196 lines
7.2 KiB
Python
196 lines
7.2 KiB
Python
import time
|
|
import sys
|
|
|
|
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize
|
|
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
|
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_
|
|
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_
|
|
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_
|
|
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_
|
|
from unitree_sdk2py.utils.crc import CRC
|
|
from unitree_sdk2py.utils.thread import RecurrentThread
|
|
import unitree_legged_const as b2w
|
|
from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient
|
|
from unitree_sdk2py.b2.sport.sport_client import SportClient
|
|
|
|
class Custom:
|
|
def __init__(self):
|
|
self.Kp = 1000.0
|
|
self.Kd = 10.0
|
|
self.time_consume = 0
|
|
self.rate_count = 0
|
|
self.sin_count = 0
|
|
self.motiontime = 0
|
|
self.dt = 0.002
|
|
|
|
self.low_cmd = unitree_go_msg_dds__LowCmd_()
|
|
self.low_state = None
|
|
|
|
self.targetPos_1 = [0.0, 1.36, -2.65, 0.0, 1.36, -2.65,
|
|
-0.2, 1.36, -2.65, 0.2, 1.36, -2.65]
|
|
|
|
self.targetPos_2 = [0.0, 0.67, -1.3, 0.0, 0.67, -1.3,
|
|
0.0, 0.67, -1.3, 0.0, 0.67, -1.3]
|
|
|
|
self.targetPos_3 = [-0.65, 1.36, -2.65, 0.65, 1.36, -2.65,
|
|
-0.65, 1.36, -2.65, 0.65, 1.36, -2.65]
|
|
|
|
self.startPos = [0.0] * 12
|
|
self.duration_1 = 800
|
|
self.duration_2 = 800
|
|
self.duration_3 = 2000
|
|
self.duration_4 = 1500
|
|
self.percent_1 = 0
|
|
self.percent_2 = 0
|
|
self.percent_3 = 0
|
|
self.percent_4 = 0
|
|
|
|
self.firstRun = True
|
|
self.done = False
|
|
|
|
# thread handling
|
|
self.lowCmdWriteThreadPtr = None
|
|
|
|
self.crc = CRC()
|
|
|
|
def Init(self):
|
|
self.InitLowCmd()
|
|
|
|
# create publisher #
|
|
self.lowcmd_publisher = ChannelPublisher("rt/lowcmd", LowCmd_)
|
|
self.lowcmd_publisher.Init()
|
|
|
|
# create subscriber #
|
|
self.lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_)
|
|
self.lowstate_subscriber.Init(self.LowStateMessageHandler, 10)
|
|
|
|
self.sc = SportClient()
|
|
self.sc.SetTimeout(5.0)
|
|
self.sc.Init()
|
|
|
|
self.msc = MotionSwitcherClient()
|
|
self.msc.SetTimeout(5.0)
|
|
self.msc.Init()
|
|
|
|
status, result = self.msc.CheckMode()
|
|
while result['name']:
|
|
self.sc.StandDown()
|
|
self.msc.ReleaseMode()
|
|
status, result = self.msc.CheckMode()
|
|
time.sleep(1)
|
|
|
|
def Start(self):
|
|
self.lowCmdWriteThreadPtr = RecurrentThread(
|
|
name="writebasiccmd", interval=self.dt, target=self.LowCmdWrite,
|
|
)
|
|
self.lowCmdWriteThreadPtr.Start()
|
|
|
|
def InitLowCmd(self):
|
|
self.low_cmd.head[0] = 0xFE
|
|
self.low_cmd.head[1] = 0xEF
|
|
self.low_cmd.level_flag = 0xFF
|
|
self.low_cmd.gpio = 0
|
|
for i in range(20):
|
|
self.low_cmd.motor_cmd[i].mode = 0x01 # (PMSM) mode
|
|
self.low_cmd.motor_cmd[i].q = b2w.PosStopF
|
|
self.low_cmd.motor_cmd[i].kp = 0
|
|
self.low_cmd.motor_cmd[i].dq = b2w.VelStopF
|
|
self.low_cmd.motor_cmd[i].kd = 0
|
|
self.low_cmd.motor_cmd[i].tau = 0
|
|
|
|
def LowStateMessageHandler(self, msg: LowState_):
|
|
self.low_state = msg
|
|
|
|
def LowCmdWrite(self):
|
|
if self.firstRun:
|
|
for i in range(12):
|
|
self.startPos[i] = self.low_state.motor_state[i].q
|
|
self.firstRun = False
|
|
|
|
self.percent_1 += 1.0 / self.duration_1
|
|
self.percent_1 = min(self.percent_1, 1)
|
|
if self.percent_1 < 1:
|
|
for i in range(12):
|
|
self.low_cmd.motor_cmd[i].q = (1 - self.percent_1) * self.startPos[i] + self.percent_1 * self.targetPos_1[i]
|
|
self.low_cmd.motor_cmd[i].dq = 0
|
|
self.low_cmd.motor_cmd[i].kp = self.Kp
|
|
self.low_cmd.motor_cmd[i].kd = self.Kd
|
|
self.low_cmd.motor_cmd[i].tau = 0
|
|
|
|
if (self.percent_1 == 1) and (self.percent_2 <= 1):
|
|
self.percent_2 += 1.0 / self.duration_2
|
|
self.percent_2 = min(self.percent_2, 1)
|
|
for i in range(12):
|
|
self.low_cmd.motor_cmd[i].q = (1 - self.percent_2) * self.targetPos_1[i] + self.percent_2 * self.targetPos_2[i]
|
|
self.low_cmd.motor_cmd[i].dq = 0
|
|
self.low_cmd.motor_cmd[i].kp = self.Kp
|
|
self.low_cmd.motor_cmd[i].kd = self.Kd
|
|
self.low_cmd.motor_cmd[i].tau = 0
|
|
|
|
if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 < 1):
|
|
self.percent_3 += 1.0 / self.duration_3
|
|
self.percent_3 = min(self.percent_3, 1)
|
|
for i in range(12):
|
|
self.low_cmd.motor_cmd[i].q = self.targetPos_2[i]
|
|
self.low_cmd.motor_cmd[i].dq = 0
|
|
self.low_cmd.motor_cmd[i].kp = self.Kp
|
|
self.low_cmd.motor_cmd[i].kd = self.Kd
|
|
self.low_cmd.motor_cmd[i].tau = 0
|
|
|
|
if self.percent_3 < 0.4:
|
|
for i in range(12, 16):
|
|
self.low_cmd.motor_cmd[i].q = 0
|
|
self.low_cmd.motor_cmd[i].kp = 0
|
|
self.low_cmd.motor_cmd[i].dq = 3
|
|
self.low_cmd.motor_cmd[i].kd = self.Kd
|
|
self.low_cmd.motor_cmd[i].tau = 0
|
|
|
|
if 0.4 <= self.percent_3 < 0.8:
|
|
for i in range(12, 16):
|
|
self.low_cmd.motor_cmd[i].q = 0
|
|
self.low_cmd.motor_cmd[i].kp = 0
|
|
self.low_cmd.motor_cmd[i].dq = -3
|
|
self.low_cmd.motor_cmd[i].kd = self.Kd
|
|
self.low_cmd.motor_cmd[i].tau = 0
|
|
|
|
if self.percent_3 >= 0.8:
|
|
for i in range(12, 16):
|
|
self.low_cmd.motor_cmd[i].q = 0
|
|
self.low_cmd.motor_cmd[i].kp = 0
|
|
self.low_cmd.motor_cmd[i].dq = 0
|
|
self.low_cmd.motor_cmd[i].kd = self.Kd
|
|
self.low_cmd.motor_cmd[i].tau = 0
|
|
|
|
if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 == 1) and (self.percent_4 <= 1):
|
|
self.percent_4 += 1.0 / self.duration_4
|
|
self.percent_4 = min(self.percent_4, 1)
|
|
for i in range(12):
|
|
self.low_cmd.motor_cmd[i].q = (1 - self.percent_4) * self.targetPos_2[i] + self.percent_4 * self.targetPos_3[i]
|
|
self.low_cmd.motor_cmd[i].dq = 0
|
|
self.low_cmd.motor_cmd[i].kp = self.Kp
|
|
self.low_cmd.motor_cmd[i].kd = self.Kd
|
|
self.low_cmd.motor_cmd[i].tau = 0
|
|
|
|
self.low_cmd.crc = self.crc.Crc(self.low_cmd)
|
|
self.lowcmd_publisher.Write(self.low_cmd)
|
|
|
|
if __name__ == '__main__':
|
|
|
|
print("WARNING: Please ensure there are no obstacles around the robot while running this example.")
|
|
input("Press Enter to continue...")
|
|
|
|
if len(sys.argv)>1:
|
|
ChannelFactoryInitialize(0, sys.argv[1])
|
|
else:
|
|
ChannelFactoryInitialize(0)
|
|
|
|
custom = Custom()
|
|
custom.Init()
|
|
custom.Start()
|
|
|
|
while True:
|
|
if custom.percent_4 == 1.0:
|
|
time.sleep(1)
|
|
print("Done!")
|
|
sys.exit(-1)
|
|
time.sleep(1) |