310 lines
11 KiB
Python
310 lines
11 KiB
Python
'''
|
|
Author: liu xiaoyan
|
|
Date: 2023-12-01 14:30:12
|
|
LastEditors: WZX 17839623189@168.com
|
|
LastEditTime: 2023-12-05 17:01:44
|
|
FilePath: /wzx/zkmetaapi/wukong/ZKMetaUnit/ZKMeta.py
|
|
'''
|
|
import os, sys
|
|
import time
|
|
import math
|
|
from typing import List
|
|
from .utils import MQTT, parse_robot_state, calcu_vecloty, calcu_distance
|
|
sys.path.append("/home/unitree/alg/prejects/ZK-AI/zkdog/ZKDogApi/src")
|
|
import robot_interface as sdk
|
|
|
|
HIGHLEVEL = 0xee
|
|
LOWLEVEL = 0xff
|
|
|
|
# cmake -DPYTHON_LIBRARY=/home/unitree/python3.8/lib \
|
|
# -DPYTHON_INCLUDE_DIR=/home/unitree/python3.8/include/python3.8 \
|
|
# -DPYTHON_EXECUTABLE=/home/unitree/python3.8/bin/python3.8 \
|
|
# ..
|
|
|
|
class ZKDOG:
|
|
def __init__(self) -> None:
|
|
self.udp = sdk.UDP(HIGHLEVEL, 8080, "192.168.123.161", 8082)
|
|
self.MQTT = MQTT()
|
|
self.cmd = sdk.HighCmd()
|
|
self.state = sdk.HighState()
|
|
self.udp.InitCmdData(self.cmd)
|
|
self.udp.Recv()
|
|
self.udp.GetRecv(self.state)
|
|
self.cmd.gaitType = 0
|
|
self.cmd.speedLevel = 0
|
|
self.cmd.footRaiseHeight = 0
|
|
self.cmd.bodyHeight = 0
|
|
self.cmd.euler = [0, 0, 0]
|
|
self.cmd.velocity = [0, 0]
|
|
self.cmd.yawSpeed = 0.0
|
|
self.cmd.reserve = 0
|
|
self.flag = 1
|
|
self.MQTT.run()
|
|
|
|
def check_temp(self):
|
|
while True:
|
|
time.sleep(1) # Check every 1 second
|
|
with self.MQTT.lock:
|
|
return parse_robot_state(self.MQTT.bytes)
|
|
|
|
def moving_unlocked(self, times: int = 1000):
|
|
while times > 0:
|
|
time.sleep(0.002)
|
|
self.udp.Recv()
|
|
self.udp.GetRecv(self.state)
|
|
self.recovery()
|
|
self.cmd.mode = 6
|
|
times -= 1
|
|
self.udp.SetSend(self.cmd)
|
|
self.udp.Send()
|
|
|
|
def recovery(self):
|
|
self.cmd.mode = 0
|
|
self.cmd.gaitType = 0
|
|
self.cmd.speedLevel = 0
|
|
self.cmd.footRaiseHeight = 0
|
|
self.cmd.bodyHeight = 0
|
|
self.cmd.euler = [0, 0, 0]
|
|
self.cmd.velocity = [0, 0]
|
|
self.cmd.yawSpeed = 0.0
|
|
self.cmd.reserve = 0
|
|
|
|
|
|
def forward(self, distance: float = 1.0, gaitType: int = 0,
|
|
velocaity: List=[1, 0], yawSpeed: int =0, footRaiseHeight: float=0.1):
|
|
'''
|
|
@ description: this is a function for moving forward and backward.
|
|
Parameters:
|
|
@distance: moving distance.
|
|
@gaitType: sport mode. Notice: Except not modification.
|
|
@velocaity: except max moving speed.
|
|
@yawSpeed: Except set the value=0 when moving.
|
|
@footRaiseHeight: Except not modification.
|
|
EXAMPLE 1: moving forward 0.1 m
|
|
zkdog = ZKDOG()
|
|
zkdog.forward(distance=0.1)
|
|
EXAMPLE 2: moving forward 10.1 m at 1m/s
|
|
zkdog.forward(distance=10.1, velocaity=[1, 0])
|
|
EXAMPLE 3: moving backward 5m at 1m/s
|
|
zkdog.forward(distance=5, velocaity=[-1, 0])
|
|
|
|
'''
|
|
if self.flag == 0:
|
|
return False
|
|
if self.flag == 1:
|
|
self.moving_unlocked()
|
|
with self.MQTT.lock:
|
|
past_dog_info = parse_robot_state(self.MQTT.bytes)
|
|
past_point = past_dog_info["position"]
|
|
current_point = past_point
|
|
if distance > 2:
|
|
while calcu_distance(current_point, past_point) < distance - 8*(math.pow(velocaity[0], 2)/12 + math.pow(velocaity[0], 2)/8):
|
|
time.sleep(0.1)
|
|
self.udp.Recv()
|
|
self.udp.GetRecv(self.state)
|
|
self.recovery()
|
|
self.cmd.mode = 2
|
|
self.cmd.gaitType = gaitType
|
|
current_point = parse_robot_state(self.MQTT.bytes)["position"]
|
|
self.cmd.velocity = velocaity
|
|
self.cmd.yawSpeed = yawSpeed
|
|
self.cmd.footRaiseHeight = footRaiseHeight
|
|
self.udp.SetSend(self.cmd)
|
|
self.udp.Send()
|
|
current_point = parse_robot_state(self.MQTT.bytes)["position"]
|
|
time.sleep(2)
|
|
current_point = parse_robot_state(self.MQTT.bytes)["position"]
|
|
else:
|
|
while distance - calcu_distance(current_point, past_point) > 0.1:
|
|
time.sleep(0.1)
|
|
self.udp.Recv()
|
|
self.udp.GetRecv(self.state)
|
|
self.recovery()
|
|
self.cmd.mode = 2
|
|
self.cmd.gaitType = gaitType
|
|
current_point = parse_robot_state(self.MQTT.bytes)["position"]
|
|
self.cmd.velocity = [0.15 if velocaity[0]>0 else -0.15, 0]
|
|
self.cmd.yawSpeed = yawSpeed
|
|
self.cmd.footRaiseHeight = footRaiseHeight
|
|
self.udp.SetSend(self.cmd)
|
|
self.udp.Send()
|
|
current_point = parse_robot_state(self.MQTT.bytes)["position"]
|
|
self.recovery()
|
|
self.udp.SetSend(self.cmd)
|
|
self.udp.Send()
|
|
self.is_sitdown = False
|
|
self.is_standup = True
|
|
return True
|
|
|
|
|
|
def rolling(self, tag: int = 90, gaitType: int = 0,
|
|
velocaity: List=[0, 0], yawSpeed: int = 1, footRaiseHeight: float=0.1):
|
|
'''
|
|
@ description: rolling
|
|
Parameters:
|
|
@tag: Rotation Angle
|
|
@gaitType: sport mode, Notice Expect not modification
|
|
@velocaity: moving velocaity, Notice Expect not modification.
|
|
@yawSpeed: Arc velocity. Expect range from [-3, 3].
|
|
@footRaiseHeight: Expect range from [0, 0.8]
|
|
|
|
EXAMPLE: 180 degrees left at 1 rad/s
|
|
zkdog = ZKDOG()
|
|
zkdog.rolling(tag=180, yawSpeed=1)
|
|
'''
|
|
|
|
if self.flag == 0:
|
|
return False
|
|
if self.flag == 1:
|
|
self.moving_unlocked()
|
|
with self.MQTT.lock:
|
|
past_dog_info = parse_robot_state(self.MQTT.bytes)
|
|
past_yaw = past_dog_info["rpy"][-1]
|
|
current_yaw = past_yaw
|
|
param_rads = 180/math.pi
|
|
send_time = 0.1
|
|
yaw_different = current_yaw - past_yaw
|
|
tmp_yawSpeed = yawSpeed
|
|
count = 0
|
|
start_time = time.time()
|
|
while yaw_different < tag:
|
|
self.udp.Recv()
|
|
self.udp.GetRecv(self.state)
|
|
self.recovery()
|
|
self.cmd.mode = 2
|
|
self.cmd.gaitType = gaitType
|
|
self.cmd.velocity = velocaity # -1 ~ +1
|
|
# 速度控制
|
|
if tag-yaw_different > abs(yawSpeed)*param_rads * send_time*5:
|
|
self.cmd.yawSpeed = yawSpeed
|
|
else:
|
|
tmp_yawSpeed = tmp_yawSpeed * 0.8
|
|
self.cmd.yawSpeed = tmp_yawSpeed if abs(tmp_yawSpeed) > 0.1 else 0.1 if yawSpeed > 0 else -0.1
|
|
self.cmd.footRaiseHeight = footRaiseHeight
|
|
self.udp.SetSend(self.cmd)
|
|
self.udp.Send()
|
|
time.sleep(send_time)
|
|
# 数值获取
|
|
current_yaw = parse_robot_state(self.MQTT.bytes)["rpy"][-1]
|
|
|
|
if yawSpeed < 0:
|
|
yaw_different = (count+1)*360+past_yaw-current_yaw if past_yaw-current_yaw < 0 else past_yaw-current_yaw+count*360
|
|
|
|
else:
|
|
yaw_different = (count+1)*360+current_yaw-past_yaw if current_yaw-past_yaw < 0 else current_yaw - past_yaw+count*360
|
|
end_time = time.time()
|
|
if end_time-start_time > 10 and past_yaw-2 <= current_yaw <= past_yaw+2:
|
|
count += 1
|
|
start_time = time.time()
|
|
|
|
self.recovery()
|
|
self.udp.SetSend(self.cmd)
|
|
self.udp.Send()
|
|
self.is_sitdown = False
|
|
self.is_standup = True
|
|
return True
|
|
|
|
def __sitdown(self):
|
|
'''
|
|
description: sitdown
|
|
return {*}
|
|
'''
|
|
if self.flag == 0:
|
|
return False
|
|
|
|
if self.flag == 1:
|
|
times = 0
|
|
sitdown_cmd = [1, 6, 5, 7]
|
|
# 1 -> 6 -> 5 -> 7
|
|
while times < 4000:
|
|
time.sleep(0.002)
|
|
# self.recovery()
|
|
self.cmd.mode = sitdown_cmd[times//1000]
|
|
times += 1
|
|
self.udp.SetSend(self.cmd)
|
|
self.udp.Send()
|
|
self.flag = 0
|
|
return True
|
|
|
|
def __standup(self):
|
|
'''
|
|
description: stand up
|
|
return {*}
|
|
'''
|
|
times = 0
|
|
standup_cmd = [7, 5, 6]
|
|
if self.flag == 1:
|
|
return False
|
|
if self.flag == 0:
|
|
while times < 3000:
|
|
time.sleep(0.002)
|
|
self.cmd.mode = standup_cmd[times//1000]
|
|
times += 1
|
|
self.udp.SetSend(self.cmd)
|
|
self.udp.Send()
|
|
self.flag = 1
|
|
return True
|
|
|
|
def getcurrenrstate(self):
|
|
return self.state.mode
|
|
|
|
def turn_left(self, tag=90):
|
|
try:
|
|
res = self.rolling(tag=tag)
|
|
return {"type": res}
|
|
except Exception as e:
|
|
return {"type": False, "msg": e}
|
|
|
|
def turn_right(self, tag=90):
|
|
try:
|
|
res = self.rolling(tag=tag, yawSpeed = -1)
|
|
return {"type": res}
|
|
except Exception as e:
|
|
return {"type": False, "msg": e}
|
|
|
|
def up(self, tag=1):
|
|
try:
|
|
res = self.forward(distance=tag)
|
|
return {"type": res}
|
|
except Exception as e:
|
|
return {"type": False, "msg": e}
|
|
|
|
def down(self, tag=1):
|
|
try:
|
|
res = self.forward(distance=tag, velocaity=[-1, 0])
|
|
return {"type": res}
|
|
except Exception as e:
|
|
return {"type": False, "msg": e}
|
|
|
|
def bark(self, tag="bark"):
|
|
try:
|
|
tag="bark"
|
|
if not os.system(f"ssh unitree@192.168.123.13 -t 'aplay -D plughw:2,0 /home/unitree/tmp/{tag}'.wav"):
|
|
return {"type": True}
|
|
except Exception as e:
|
|
return {"type": False, "msg": e}
|
|
|
|
def sitdown(self, tag=1):
|
|
try:
|
|
res = self.__sitdown()
|
|
return {"type": res}
|
|
except Exception as e:
|
|
return {"type": False, "msg": e}
|
|
|
|
def standup(self, tag=1):
|
|
try:
|
|
res = self.__standup()
|
|
return {"type": res}
|
|
except Exception as e:
|
|
return {"type": False, "msg": e}
|
|
|
|
if __name__=="__main__":
|
|
zd = ZKDOG()
|
|
|
|
print(zd.down(tag=1))
|
|
print(zd.turn_right(tag=180))
|
|
print(zd.up(tag=1))
|
|
print(zd.down(tag=1))
|
|
# zd.sitdown()
|
|
# zd.standup()
|
|
# zd.bark() |