diff --git a/example/b2/high_level/b2_sport_client.py b/example/b2/high_level/b2_sport_client.py new file mode 100644 index 0000000..695d28e --- /dev/null +++ b/example/b2/high_level/b2_sport_client.py @@ -0,0 +1,105 @@ +import time +import sys +from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize +from unitree_sdk2py.b2.sport.sport_client import SportClient +import math +from dataclasses import dataclass + +@dataclass +class TestOption: + name: str + id: int + +option_list = [ + TestOption(name="damp", id=0), + TestOption(name="stand_up", id=1), + TestOption(name="stand_down", id=2), + TestOption(name="move forward", id=3), + TestOption(name="move lateral", id=4), + TestOption(name="move rotate", id=5), + TestOption(name="stop_move", id=6), + TestOption(name="switch_gait", id=7), + TestOption(name="switch_gait", id=8), + TestOption(name="recovery", id=9), + TestOption(name="balanced stand", id=10) +] + +class UserInterface: + def __init__(self): + self.test_option_ = None + + def convert_to_int(self, input_str): + try: + return int(input_str) + except ValueError: + return None + + def terminal_handle(self): + input_str = input("Enter id or name: \n") + + if input_str == "list": + self.test_option_.name = None + self.test_option_.id = None + for option in option_list: + print(f"{option.name}, id: {option.id}") + return + + for option in option_list: + if input_str == option.name or self.convert_to_int(input_str) == option.id: + self.test_option_.name = option.name + self.test_option_.id = option.id + print(f"Test: {self.test_option_.name}, test_id: {self.test_option_.id}") + return + + print("No matching test option found.") + +if __name__ == "__main__": + + if len(sys.argv) < 2: + print(f"Usage: python3 {sys.argv[0]} networkInterface") + sys.exit(-1) + + print("WARNING: Please ensure there are no obstacles around the robot while running this example.") + input("Press Enter to continue...") + + ChannelFactoryInitialize(0, sys.argv[1]) + + test_option = TestOption(name=None, id=None) + user_interface = UserInterface() + user_interface.test_option_ = test_option + + sport_client = SportClient() + sport_client.SetTimeout(10.0) + sport_client.Init() + + print("Input \"list\" to list all test option ...") + + while True: + user_interface.terminal_handle() + + print(f"Updated Test Option: Name = {test_option.name}, ID = {test_option.id}") + + if test_option.id == 0: + sport_client.Damp() + elif test_option.id == 1: + sport_client.StandUp() + elif test_option.id == 2: + sport_client.StandDown() + elif test_option.id == 3: + sport_client.Move(0.3,0,0) + elif test_option.id == 4: + sport_client.Move(0,0.3,0) + elif test_option.id == 5: + sport_client.Move(0,0,0.5) + elif test_option.id == 6: + sport_client.StopMove() + elif test_option.id == 7: + sport_client.SwitchGait(0) + elif test_option.id == 8: + sport_client.SwitchGait(1) + elif test_option.id == 9: + sport_client.RecoveryStand() + elif test_option.id == 10: + sport_client.BalanceStand() + + time.sleep(1) diff --git a/example/b2/low_level/b2_stand_example.py b/example/b2/low_level/b2_stand_example.py new file mode 100644 index 0000000..a091630 --- /dev/null +++ b/example/b2/low_level/b2_stand_example.py @@ -0,0 +1,175 @@ +import time +import sys + +from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize +from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize +from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_ +from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_ +from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_ +from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_ +from unitree_sdk2py.utils.thread import RecurrentThread +import unitree_legged_const as b2 +from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient +from unitree_sdk2py.b2.sport.sport_client import SportClient + +from unitree_sdk2py.utils.crc import CRC + +class Custom: + def __init__(self): + self.Kp = 1000.0 + self.Kd = 10.0 + self.time_consume = 0 + self.rate_count = 0 + self.sin_count = 0 + self.motiontime = 0 + self.dt = 0.002 + + self.low_cmd = unitree_go_msg_dds__LowCmd_() + self.low_state = None + + self.targetPos_1 = [0.0, 1.36, -2.65, 0.0, 1.36, -2.65, + -0.2, 1.36, -2.65, 0.2, 1.36, -2.65] + + self.targetPos_2 = [0.0, 0.67, -1.3, 0.0, 0.67, -1.3, + 0.0, 0.67, -1.3, 0.0, 0.67, -1.3] + + self.targetPos_3 = [-0.5, 1.36, -2.65, 0.5, 1.36, -2.65, + -0.5, 1.36, -2.65, 0.5, 1.36, -2.65] + + self.startPos = [0.0] * 12 + self.duration_1 = 500 + self.duration_2 = 900 + self.duration_3 = 1000 + self.duration_4 = 900 + self.percent_1 = 0 + self.percent_2 = 0 + self.percent_3 = 0 + self.percent_4 = 0 + + self.firstRun = True + self.done = False + + # thread handling + self.lowCmdWriteThreadPtr = None + + self.crc = CRC() + + def Init(self): + self.InitLowCmd() + + # create publisher # + self.lowcmd_publisher = ChannelPublisher("rt/lowcmd", LowCmd_) + self.lowcmd_publisher.Init() + + # create subscriber # + self.lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_) + self.lowstate_subscriber.Init(self.LowStateMessageHandler, 10) + + self.sc = SportClient() + self.sc.SetTimeout(5.0) + self.sc.Init() + + self.msc = MotionSwitcherClient() + self.msc.SetTimeout(5.0) + self.msc.Init() + + status, result = self.msc.CheckMode() + while result['name']: + self.sc.StandDown() + self.msc.ReleaseMode() + status, result = self.msc.CheckMode() + time.sleep(1) + + + def Start(self): + self.lowCmdWriteThreadPtr = RecurrentThread( + interval=0.002, target=self.LowCmdWrite, name="writebasiccmd" + ) + self.lowCmdWriteThreadPtr.Start() + + def InitLowCmd(self): + self.low_cmd.head[0] = 0xFE + self.low_cmd.head[1] = 0xEF + self.low_cmd.level_flag = 0xFF + self.low_cmd.gpio = 0 + for i in range(20): + self.low_cmd.motor_cmd[i].mode = 0x01 + self.low_cmd.motor_cmd[i].q= b2.PosStopF + self.low_cmd.motor_cmd[i].kp = 0 + self.low_cmd.motor_cmd[i].dq = b2.VelStopF + self.low_cmd.motor_cmd[i].kd = 0 + self.low_cmd.motor_cmd[i].tau = 0 + + def LowStateMessageHandler(self, msg: LowState_): + self.low_state = msg + + def LowCmdWrite(self): + + if self.firstRun: + for i in range(12): + self.startPos[i] = self.low_state.motor_state[i].q + self.firstRun = False + + self.percent_1 += 1.0 / self.duration_1 + self.percent_1 = min(self.percent_1, 1) + if self.percent_1 < 1: + for i in range(12): + self.low_cmd.motor_cmd[i].q = (1 - self.percent_1) * self.startPos[i] + self.percent_1 * self.targetPos_1[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if (self.percent_1 == 1) and (self.percent_2 <= 1): + self.percent_2 += 1.0 / self.duration_2 + self.percent_2 = min(self.percent_2, 1) + for i in range(12): + self.low_cmd.motor_cmd[i].q = (1 - self.percent_2) * self.targetPos_1[i] + self.percent_2 * self.targetPos_2[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 < 1): + self.percent_3 += 1.0 / self.duration_3 + self.percent_3 = min(self.percent_3, 1) + for i in range(12): + self.low_cmd.motor_cmd[i].q = self.targetPos_2[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 == 1) and (self.percent_4 <= 1): + self.percent_4 += 1.0 / self.duration_4 + self.percent_4 = min(self.percent_4, 1) + for i in range(12): + self.low_cmd.motor_cmd[i].q = (1 - self.percent_4) * self.targetPos_2[i] + self.percent_4 * self.targetPos_3[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + self.low_cmd.crc = self.crc.Crc(self.low_cmd) + self.lowcmd_publisher.Write(self.low_cmd) + +if __name__ == '__main__': + + print("WARNING: Please ensure there are no obstacles around the robot while running this example.") + input("Press Enter to continue...") + + if len(sys.argv)>1: + ChannelFactoryInitialize(0, sys.argv[1]) + else: + ChannelFactoryInitialize(0) + + custom = Custom() + custom.Init() + custom.Start() + + while True: + if custom.percent_4 == 1.0: + time.sleep(1) + print("Done!") + sys.exit(-1) + time.sleep(1) \ No newline at end of file diff --git a/example/low_level/unitree_legged_const.py b/example/b2/low_level/unitree_legged_const.py similarity index 100% rename from example/low_level/unitree_legged_const.py rename to example/b2/low_level/unitree_legged_const.py diff --git a/example/b2w/high_level/b2w_sport_client.py b/example/b2w/high_level/b2w_sport_client.py new file mode 100644 index 0000000..e324f75 --- /dev/null +++ b/example/b2w/high_level/b2w_sport_client.py @@ -0,0 +1,101 @@ +import time +import sys +from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize +from unitree_sdk2py.b2.sport.sport_client import SportClient +import math +from dataclasses import dataclass + +@dataclass +class TestOption: + name: str + id: int + +option_list = [ + TestOption(name="damp", id=0), + TestOption(name="stand_up", id=1), + TestOption(name="stand_down", id=2), + TestOption(name="move forward", id=3), + TestOption(name="move lateral", id=4), + TestOption(name="move rotate", id=5), + TestOption(name="stop_move", id=6), + TestOption(name="switch_gait", id=7), + TestOption(name="switch_gait", id=8), + TestOption(name="recovery", id=9) +] + +class UserInterface: + def __init__(self): + self.test_option_ = None + + def convert_to_int(self, input_str): + try: + return int(input_str) + except ValueError: + return None + + def terminal_handle(self): + input_str = input("Enter id or name: \n") + + if input_str == "list": + self.test_option_.name = None + self.test_option_.id = None + for option in option_list: + print(f"name: {option.name}, id: {option.id}") + return + + for option in option_list: + if input_str == option.name or self.convert_to_int(input_str) == option.id: + self.test_option_.name = option.name + self.test_option_.id = option.id + print(f"Test: {self.test_option_.name}, test_id: {self.test_option_.id}") + return + + print("No matching test option found.") + +if __name__ == "__main__": + if len(sys.argv) < 2: + print(f"Usage: python3 {sys.argv[0]} networkInterface") + sys.exit(-1) + + print("WARNING: Please ensure there are no obstacles around the robot while running this example.") + input("Press Enter to continue...") + + ChannelFactoryInitialize(0, sys.argv[1]) + + test_option = TestOption(name=None, id=None) + user_interface = UserInterface() + user_interface.test_option_ = test_option + + sport_client = SportClient() + sport_client.SetTimeout(10.0) + sport_client.Init() + + print("Input \"list\" to list all test option ...") + + while True: + user_interface.terminal_handle() + + print(f"Updated Test Option: Name = {test_option.name}, ID = {test_option.id}\n") + + if test_option.id == 0: + sport_client.Damp() + elif test_option.id == 1: + sport_client.StandUp() + elif test_option.id == 2: + sport_client.StandDown() + elif test_option.id == 3: + sport_client.Move(0.3,0,0) + elif test_option.id == 4: + sport_client.Move(0,0.3,0) + elif test_option.id == 5: + sport_client.Move(0,0,0.5) + elif test_option.id == 6: + sport_client.StopMove() + elif test_option.id == 7: + sport_client.SwitchGait(0) + elif test_option.id == 8: + sport_client.SwitchGait(1) + elif test_option.id == 9: + sport_client.RecoveryStand() + + time.sleep(1) diff --git a/example/b2w/low_level/b2w_stand_example.py b/example/b2w/low_level/b2w_stand_example.py new file mode 100644 index 0000000..b535059 --- /dev/null +++ b/example/b2w/low_level/b2w_stand_example.py @@ -0,0 +1,196 @@ +import time +import sys + +from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize +from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize +from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_ +from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_ +from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_ +from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_ +from unitree_sdk2py.utils.crc import CRC +from unitree_sdk2py.utils.thread import RecurrentThread +import unitree_legged_const as b2w +from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient +from unitree_sdk2py.b2.sport.sport_client import SportClient + +class Custom: + def __init__(self): + self.Kp = 1000.0 + self.Kd = 10.0 + self.time_consume = 0 + self.rate_count = 0 + self.sin_count = 0 + self.motiontime = 0 + self.dt = 0.002 + + self.low_cmd = unitree_go_msg_dds__LowCmd_() + self.low_state = None + + self.targetPos_1 = [0.0, 1.36, -2.65, 0.0, 1.36, -2.65, + -0.2, 1.36, -2.65, 0.2, 1.36, -2.65] + + self.targetPos_2 = [0.0, 0.67, -1.3, 0.0, 0.67, -1.3, + 0.0, 0.67, -1.3, 0.0, 0.67, -1.3] + + self.targetPos_3 = [-0.65, 1.36, -2.65, 0.65, 1.36, -2.65, + -0.65, 1.36, -2.65, 0.65, 1.36, -2.65] + + self.startPos = [0.0] * 12 + self.duration_1 = 800 + self.duration_2 = 800 + self.duration_3 = 2000 + self.duration_4 = 1500 + self.percent_1 = 0 + self.percent_2 = 0 + self.percent_3 = 0 + self.percent_4 = 0 + + self.firstRun = True + self.done = False + + # thread handling + self.lowCmdWriteThreadPtr = None + + self.crc = CRC() + + def Init(self): + self.InitLowCmd() + + # create publisher # + self.lowcmd_publisher = ChannelPublisher("rt/lowcmd", LowCmd_) + self.lowcmd_publisher.Init() + + # create subscriber # + self.lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_) + self.lowstate_subscriber.Init(self.LowStateMessageHandler, 10) + + self.sc = SportClient() + self.sc.SetTimeout(5.0) + self.sc.Init() + + self.msc = MotionSwitcherClient() + self.msc.SetTimeout(5.0) + self.msc.Init() + + status, result = self.msc.CheckMode() + while result['name']: + self.sc.StandDown() + self.msc.ReleaseMode() + status, result = self.msc.CheckMode() + time.sleep(1) + + def Start(self): + self.lowCmdWriteThreadPtr = RecurrentThread( + name="writebasiccmd", interval=self.dt, target=self.LowCmdWrite, + ) + self.lowCmdWriteThreadPtr.Start() + + def InitLowCmd(self): + self.low_cmd.head[0] = 0xFE + self.low_cmd.head[1] = 0xEF + self.low_cmd.level_flag = 0xFF + self.low_cmd.gpio = 0 + for i in range(20): + self.low_cmd.motor_cmd[i].mode = 0x01 # (PMSM) mode + self.low_cmd.motor_cmd[i].q = b2w.PosStopF + self.low_cmd.motor_cmd[i].kp = 0 + self.low_cmd.motor_cmd[i].dq = b2w.VelStopF + self.low_cmd.motor_cmd[i].kd = 0 + self.low_cmd.motor_cmd[i].tau = 0 + + def LowStateMessageHandler(self, msg: LowState_): + self.low_state = msg + + def LowCmdWrite(self): + if self.firstRun: + for i in range(12): + self.startPos[i] = self.low_state.motor_state[i].q + self.firstRun = False + + self.percent_1 += 1.0 / self.duration_1 + self.percent_1 = min(self.percent_1, 1) + if self.percent_1 < 1: + for i in range(12): + self.low_cmd.motor_cmd[i].q = (1 - self.percent_1) * self.startPos[i] + self.percent_1 * self.targetPos_1[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if (self.percent_1 == 1) and (self.percent_2 <= 1): + self.percent_2 += 1.0 / self.duration_2 + self.percent_2 = min(self.percent_2, 1) + for i in range(12): + self.low_cmd.motor_cmd[i].q = (1 - self.percent_2) * self.targetPos_1[i] + self.percent_2 * self.targetPos_2[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 < 1): + self.percent_3 += 1.0 / self.duration_3 + self.percent_3 = min(self.percent_3, 1) + for i in range(12): + self.low_cmd.motor_cmd[i].q = self.targetPos_2[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if self.percent_3 < 0.4: + for i in range(12, 16): + self.low_cmd.motor_cmd[i].q = 0 + self.low_cmd.motor_cmd[i].kp = 0 + self.low_cmd.motor_cmd[i].dq = 3 + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if 0.4 <= self.percent_3 < 0.8: + for i in range(12, 16): + self.low_cmd.motor_cmd[i].q = 0 + self.low_cmd.motor_cmd[i].kp = 0 + self.low_cmd.motor_cmd[i].dq = -3 + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if self.percent_3 >= 0.8: + for i in range(12, 16): + self.low_cmd.motor_cmd[i].q = 0 + self.low_cmd.motor_cmd[i].kp = 0 + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 == 1) and (self.percent_4 <= 1): + self.percent_4 += 1.0 / self.duration_4 + self.percent_4 = min(self.percent_4, 1) + for i in range(12): + self.low_cmd.motor_cmd[i].q = (1 - self.percent_4) * self.targetPos_2[i] + self.percent_4 * self.targetPos_3[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + self.low_cmd.crc = self.crc.Crc(self.low_cmd) + self.lowcmd_publisher.Write(self.low_cmd) + +if __name__ == '__main__': + + print("WARNING: Please ensure there are no obstacles around the robot while running this example.") + input("Press Enter to continue...") + + if len(sys.argv)>1: + ChannelFactoryInitialize(0, sys.argv[1]) + else: + ChannelFactoryInitialize(0) + + custom = Custom() + custom.Init() + custom.Start() + + while True: + if custom.percent_4 == 1.0: + time.sleep(1) + print("Done!") + sys.exit(-1) + time.sleep(1) \ No newline at end of file diff --git a/example/b2w/low_level/unitree_legged_const.py b/example/b2w/low_level/unitree_legged_const.py new file mode 100644 index 0000000..7943e31 --- /dev/null +++ b/example/b2w/low_level/unitree_legged_const.py @@ -0,0 +1,24 @@ +LegID = { + "FR_0": 0, # Front right hip + "FR_1": 1, # Front right thigh + "FR_2": 2, # Front right calf + "FL_0": 3, + "FL_1": 4, + "FL_2": 5, + "RR_0": 6, + "RR_1": 7, + "RR_2": 8, + "RL_0": 9, + "RL_1": 10, + "RL_2": 11, + "FR_w": 12, # Front right wheel + "FL_w": 13, # Front left wheel + "RR_w": 14, # Rear right wheel + "RL_w": 15, # Rear left wheel +} + +HIGHLEVEL = 0xEE +LOWLEVEL = 0xFF +TRIGERLEVEL = 0xF0 +PosStopF = 2.146e9 +VelStopF = 16000.0 diff --git a/example/g1/high_level/g1_loco_client_example.py b/example/g1/high_level/g1_loco_client_example.py new file mode 100644 index 0000000..78e3ad2 --- /dev/null +++ b/example/g1/high_level/g1_loco_client_example.py @@ -0,0 +1,99 @@ +import time +import sys +from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize +from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_ +from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_ +from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient +import math +from dataclasses import dataclass + +@dataclass +class TestOption: + name: str + id: int + +option_list = [ + TestOption(name="damp", id=0), + TestOption(name="stand_up", id=1), + TestOption(name="sit", id=2), + TestOption(name="move forward", id=3), + TestOption(name="move lateral", id=4), + TestOption(name="move rotate", id=5), + TestOption(name="low stand", id=6), + TestOption(name="high stand", id=7), + TestOption(name="zero torque", id=8) +] + +class UserInterface: + def __init__(self): + self.test_option_ = None + + def convert_to_int(self, input_str): + try: + return int(input_str) + except ValueError: + return None + + def terminal_handle(self): + input_str = input("Enter id or name: \n") + + if input_str == "list": + self.test_option_.name = None + self.test_option_.id = None + for option in option_list: + print(f"{option.name}, id: {option.id}") + return + + for option in option_list: + if input_str == option.name or self.convert_to_int(input_str) == option.id: + self.test_option_.name = option.name + self.test_option_.id = option.id + print(f"Test: {self.test_option_.name}, test_id: {self.test_option_.id}") + return + + print("No matching test option found.") + +if __name__ == "__main__": + if len(sys.argv) < 2: + print(f"Usage: python3 {sys.argv[0]} networkInterface") + sys.exit(-1) + + print("WARNING: Please ensure there are no obstacles around the robot while running this example.") + input("Press Enter to continue...") + + ChannelFactoryInitialize(0, sys.argv[1]) + + test_option = TestOption(name=None, id=None) + user_interface = UserInterface() + user_interface.test_option_ = test_option + + sport_client = LocoClient() + sport_client.SetTimeout(10.0) + sport_client.Init() + + print("Input \"list\" to list all test option ...") + while True: + user_interface.terminal_handle() + + print(f"Updated Test Option: Name = {test_option.name}, ID = {test_option.id}") + + if test_option.id == 0: + sport_client.Damp() + elif test_option.id == 1: + sport_client.StandUp() + elif test_option.id == 2: + sport_client.Sit() + elif test_option.id == 3: + sport_client.Move(0.3,0,0) + elif test_option.id == 4: + sport_client.Move(0,0.3,0) + elif test_option.id == 5: + sport_client.Move(0,0,0.3) + elif test_option.id == 6: + sport_client.LowStand() + elif test_option.id == 7: + sport_client.HighStand() + elif test_option.id == 8: + sport_client.ZeroTorque() + + time.sleep(1) \ No newline at end of file diff --git a/example/g1/low_level/g1_ankle_swing_example.py b/example/g1/low_level/g1_ankle_swing_example.py new file mode 100644 index 0000000..9189912 --- /dev/null +++ b/example/g1/low_level/g1_ankle_swing_example.py @@ -0,0 +1,198 @@ +import time +import sys + +from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize +from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize +from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_ +from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowState_ +from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_ +from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowState_ +from unitree_sdk2py.utils.crc import CRC +from unitree_sdk2py.utils.thread import RecurrentThread +from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient + +import numpy as np + +G1_NUM_MOTOR = 29 + +Kp = [ + 60, 60, 60, 100, 40, 40, # legs + 60, 60, 60, 100, 40, 40, # legs + 60, 40, 40, # waist + 40, 40, 40, 40, 40, 40, 40, # arms + 40, 40, 40, 40, 40, 40, 40 # arms +] + +Kd = [ + 1, 1, 1, 2, 1, 1, # legs + 1, 1, 1, 2, 1, 1, # legs + 1, 1, 1, # waist + 1, 1, 1, 1, 1, 1, 1, # arms + 1, 1, 1, 1, 1, 1, 1 # arms +] + +class G1JointIndex: + LeftHipPitch = 0 + LeftHipRoll = 1 + LeftHipYaw = 2 + LeftKnee = 3 + LeftAnklePitch = 4 + LeftAnkleB = 4 + LeftAnkleRoll = 5 + LeftAnkleA = 5 + RightHipPitch = 6 + RightHipRoll = 7 + RightHipYaw = 8 + RightKnee = 9 + RightAnklePitch = 10 + RightAnkleB = 10 + RightAnkleRoll = 11 + RightAnkleA = 11 + WaistYaw = 12 + WaistRoll = 13 # NOTE: INVALID for g1 23dof/29dof with waist locked + WaistA = 13 # NOTE: INVALID for g1 23dof/29dof with waist locked + WaistPitch = 14 # NOTE: INVALID for g1 23dof/29dof with waist locked + WaistB = 14 # NOTE: INVALID for g1 23dof/29dof with waist locked + LeftShoulderPitch = 15 + LeftShoulderRoll = 16 + LeftShoulderYaw = 17 + LeftElbow = 18 + LeftWristRoll = 19 + LeftWristPitch = 20 # NOTE: INVALID for g1 23dof + LeftWristYaw = 21 # NOTE: INVALID for g1 23dof + RightShoulderPitch = 22 + RightShoulderRoll = 23 + RightShoulderYaw = 24 + RightElbow = 25 + RightWristRoll = 26 + RightWristPitch = 27 # NOTE: INVALID for g1 23dof + RightWristYaw = 28 # NOTE: INVALID for g1 23dof + + +class Mode: + PR = 0 # Series Control for Pitch/Roll Joints + AB = 1 # Parallel Control for A/B Joints + +class Custom: + def __init__(self): + self.time_ = 0.0 + self.control_dt_ = 0.002 # [2ms] + self.duration_ = 3.0 # [3 s] + self.counter_ = 0 + self.mode_pr_ = Mode.PR + self.mode_machine_ = 0 + self.low_cmd = unitree_hg_msg_dds__LowCmd_() + self.low_state = None + self.update_mode_machine_ = False + self.crc = CRC() + + def Init(self): + self.msc = MotionSwitcherClient() + self.msc.SetTimeout(5.0) + self.msc.Init() + + status, result = self.msc.CheckMode() + while result['name']: + self.msc.ReleaseMode() + status, result = self.msc.CheckMode() + time.sleep(1) + + # create publisher # + self.lowcmd_publisher_ = ChannelPublisher("rt/lowcmd", LowCmd_) + self.lowcmd_publisher_.Init() + + # create subscriber # + self.__lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_) + self.__lowstate_subscriber.Init(self.LowStateHandler, 10) + + def Start(self): + self.lowCmdWriteThreadPtr = RecurrentThread( + interval=self.control_dt_, target=self.__LowCmdWrite, name="control" + ) + while self.update_mode_machine_ == False: + time.sleep(1) + + if self.update_mode_machine_ == True: + self.lowCmdWriteThreadPtr.Start() + + def LowStateHandler(self, msg: LowState_): + self.low_state = msg + + if self.update_mode_machine_ == False: + self.mode_machine_ = self.low_state.mode_machine + self.update_mode_machine_ = True + + self.counter_ +=1 + if (self.counter_ % 500 == 0) : + self.counter_ = 0 + print(self.low_state.imu_state.rpy) + + def __LowCmdWrite(self): + self.time_ += self.control_dt_ + + if self.time_ < self.duration_ : + # [Stage 1]: set robot to zero posture + for i in range(G1_NUM_MOTOR): + ratio = np.clip(self.time_ / self.duration_, 0.0, 1.0) + self.low_cmd.mode_pr = Mode.PR + self.low_cmd.mode_machine = self.mode_machine_ + self.low_cmd.motor_cmd[i].mode = 1 # 1:Enable, 0:Disable + self.low_cmd.motor_cmd[i].tau = 0. + self.low_cmd.motor_cmd[i].q = (1.0 - ratio) * self.low_state.motor_state[i].q + self.low_cmd.motor_cmd[i].dq = 0. + self.low_cmd.motor_cmd[i].kp = Kp[i] + self.low_cmd.motor_cmd[i].kd = Kd[i] + + elif self.time_ < self.duration_ * 2 : + # [Stage 2]: swing ankle using PR mode + max_P = np.pi * 30.0 / 180.0 + max_R = np.pi * 10.0 / 180.0 + t = self.time_ - self.duration_ + L_P_des = max_P * np.sin(2.0 * np.pi * t) + L_R_des = max_R * np.sin(2.0 * np.pi * t) + R_P_des = max_P * np.sin(2.0 * np.pi * t) + R_R_des = -max_R * np.sin(2.0 * np.pi * t) + + self.low_cmd.mode_pr = Mode.PR + self.low_cmd.mode_machine = self.mode_machine_ + self.low_cmd.motor_cmd[G1JointIndex.LeftAnklePitch].q = L_P_des + self.low_cmd.motor_cmd[G1JointIndex.LeftAnkleRoll].q = L_R_des + self.low_cmd.motor_cmd[G1JointIndex.RightAnklePitch].q = R_P_des + self.low_cmd.motor_cmd[G1JointIndex.RightAnkleRoll].q = R_R_des + + else : + # [Stage 3]: swing ankle using AB mode + max_A = np.pi * 30.0 / 180.0 + max_B = np.pi * 10.0 / 180.0 + t = self.time_ - self.duration_ * 2 + L_A_des = max_A * np.sin(2.0 * np.pi * t) + L_B_des = max_B * np.sin(2.0 * np.pi * t + np.pi) + R_A_des = -max_A * np.sin(2.0 * np.pi * t) + R_B_des = -max_B * np.sin(2.0 * np.pi * t + np.pi) + + self.low_cmd.mode_pr = Mode.AB + self.low_cmd.mode_machine = self.mode_machine_ + self.low_cmd.motor_cmd[G1JointIndex.LeftAnkleA].q = L_A_des + self.low_cmd.motor_cmd[G1JointIndex.LeftAnkleB].q = L_B_des + self.low_cmd.motor_cmd[G1JointIndex.RightAnkleA].q = R_A_des + self.low_cmd.motor_cmd[G1JointIndex.RightAnkleB].q = R_B_des + + self.low_cmd.crc = self.crc.Crc(self.low_cmd) + self.lowcmd_publisher_.Write(self.low_cmd) + +if __name__ == '__main__': + + print("WARNING: Please ensure there are no obstacles around the robot while running this example.") + input("Press Enter to continue...") + + if len(sys.argv)>1: + ChannelFactoryInitialize(0, sys.argv[1]) + else: + ChannelFactoryInitialize(0) + + custom = Custom() + custom.Init() + custom.Start() + + while True: + time.sleep(1) \ No newline at end of file diff --git a/example/hg/readme.md b/example/g1/readme.md similarity index 100% rename from example/hg/readme.md rename to example/g1/readme.md diff --git a/example/go2/high_level/go2_sport_client.py b/example/go2/high_level/go2_sport_client.py new file mode 100644 index 0000000..d2d9ab0 --- /dev/null +++ b/example/go2/high_level/go2_sport_client.py @@ -0,0 +1,109 @@ +import time +import sys +from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize +from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_ +from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_ +from unitree_sdk2py.go2.sport.sport_client import ( + SportClient, + PathPoint, + SPORT_PATH_POINT_SIZE, +) +import math +from dataclasses import dataclass + +@dataclass +class TestOption: + name: str + id: int + +option_list = [ + TestOption(name="damp", id=0), + TestOption(name="stand_up", id=1), + TestOption(name="stand_down", id=2), + TestOption(name="move forward", id=3), + TestOption(name="move lateral", id=4), + TestOption(name="move rotate", id=5), + TestOption(name="stop_move", id=6), + TestOption(name="switch_gait", id=7), + TestOption(name="switch_gait", id=8), + TestOption(name="balanced stand", id=9), + TestOption(name="recovery", id=10) +] + +class UserInterface: + def __init__(self): + self.test_option_ = None + + def convert_to_int(self, input_str): + try: + return int(input_str) + except ValueError: + return None + + def terminal_handle(self): + input_str = input("Enter id or name: \n") + + if input_str == "list": + self.test_option_.name = None + self.test_option_.id = None + for option in option_list: + print(f"{option.name}, id: {option.id}") + return + + for option in option_list: + if input_str == option.name or self.convert_to_int(input_str) == option.id: + self.test_option_.name = option.name + self.test_option_.id = option.id + print(f"Test: {self.test_option_.name}, test_id: {self.test_option_.id}") + return + + print("No matching test option found.") + +if __name__ == "__main__": + + if len(sys.argv) < 2: + print(f"Usage: python3 {sys.argv[0]} networkInterface") + sys.exit(-1) + + print("WARNING: Please ensure there are no obstacles around the robot while running this example.") + input("Press Enter to continue...") + + ChannelFactoryInitialize(0, sys.argv[1]) + + test_option = TestOption(name=None, id=None) + user_interface = UserInterface() + user_interface.test_option_ = test_option + + sport_client = SportClient() + sport_client.SetTimeout(10.0) + sport_client.Init() + while True: + + user_interface.terminal_handle() + + print(f"Updated Test Option: Name = {test_option.name}, ID = {test_option.id}\n") + + if test_option.id == 0: + sport_client.Damp() + elif test_option.id == 1: + sport_client.StandUp() + elif test_option.id == 2: + sport_client.StandDown() + elif test_option.id == 3: + sport_client.Move(0.3,0,0) + elif test_option.id == 4: + sport_client.Move(0,0.3,0) + elif test_option.id == 5: + sport_client.Move(0,0,0.5) + elif test_option.id == 6: + sport_client.StopMove() + elif test_option.id == 7: + sport_client.SwitchGait(0) + elif test_option.id == 8: + sport_client.SwitchGait(1) + elif test_option.id == 9: + sport_client.BalanceStand() + elif test_option.id == 10: + sport_client.RecoveryStand() + + time.sleep(1) diff --git a/example/go2/low_level/go2_stand_example.py b/example/go2/low_level/go2_stand_example.py new file mode 100644 index 0000000..f2241ce --- /dev/null +++ b/example/go2/low_level/go2_stand_example.py @@ -0,0 +1,176 @@ +import time +import sys + +from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize +from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize +from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_ +from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_ +from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_ +from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_ +from unitree_sdk2py.utils.crc import CRC +from unitree_sdk2py.utils.thread import RecurrentThread +import unitree_legged_const as go2 +from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient +from unitree_sdk2py.go2.sport.sport_client import SportClient + +class Custom: + def __init__(self): + self.Kp = 60.0 + self.Kd = 5.0 + self.time_consume = 0 + self.rate_count = 0 + self.sin_count = 0 + self.motiontime = 0 + self.dt = 0.002 # 0.001~0.01 + + self.low_cmd = unitree_go_msg_dds__LowCmd_() + self.low_state = None + + self._targetPos_1 = [0.0, 1.36, -2.65, 0.0, 1.36, -2.65, + -0.2, 1.36, -2.65, 0.2, 1.36, -2.65] + self._targetPos_2 = [0.0, 0.67, -1.3, 0.0, 0.67, -1.3, + 0.0, 0.67, -1.3, 0.0, 0.67, -1.3] + self._targetPos_3 = [-0.35, 1.36, -2.65, 0.35, 1.36, -2.65, + -0.5, 1.36, -2.65, 0.5, 1.36, -2.65] + + self.startPos = [0.0] * 12 + self.duration_1 = 500 + self.duration_2 = 500 + self.duration_3 = 1000 + self.duration_4 = 900 + self.percent_1 = 0 + self.percent_2 = 0 + self.percent_3 = 0 + self.percent_4 = 0 + + self.firstRun = True + self.done = False + + # thread handling + self.lowCmdWriteThreadPtr = None + + self.crc = CRC() + + # Public methods + def Init(self): + self.InitLowCmd() + + # create publisher # + self.lowcmd_publisher = ChannelPublisher("rt/lowcmd", LowCmd_) + self.lowcmd_publisher.Init() + + # create subscriber # + self.lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_) + self.lowstate_subscriber.Init(self.LowStateMessageHandler, 10) + + self.sc = SportClient() + self.sc.SetTimeout(5.0) + self.sc.Init() + + self.msc = MotionSwitcherClient() + self.msc.SetTimeout(5.0) + self.msc.Init() + + status, result = self.msc.CheckMode() + while result['name']: + self.sc.StandDown() + self.msc.ReleaseMode() + status, result = self.msc.CheckMode() + time.sleep(1) + + def Start(self): + self.lowCmdWriteThreadPtr = RecurrentThread( + interval=0.002, target=self.LowCmdWrite, name="writebasiccmd" + ) + self.lowCmdWriteThreadPtr.Start() + + # Private methods + def InitLowCmd(self): + self.low_cmd.head[0]=0xFE + self.low_cmd.head[1]=0xEF + self.low_cmd.level_flag = 0xFF + self.low_cmd.gpio = 0 + for i in range(20): + self.low_cmd.motor_cmd[i].mode = 0x01 # (PMSM) mode + self.low_cmd.motor_cmd[i].q= go2.PosStopF + self.low_cmd.motor_cmd[i].kp = 0 + self.low_cmd.motor_cmd[i].dq = go2.VelStopF + self.low_cmd.motor_cmd[i].kd = 0 + self.low_cmd.motor_cmd[i].tau = 0 + + def LowStateMessageHandler(self, msg: LowState_): + self.low_state = msg + # print("FR_0 motor state: ", msg.motor_state[go2.LegID["FR_0"]]) + # print("IMU state: ", msg.imu_state) + # print("Battery state: voltage: ", msg.power_v, "current: ", msg.power_a) + + def LowCmdWrite(self): + + if self.firstRun: + for i in range(12): + self.startPos[i] = self.low_state.motor_state[i].q + self.firstRun = False + + self.percent_1 += 1.0 / self.duration_1 + self.percent_1 = min(self.percent_1, 1) + if self.percent_1 < 1: + for i in range(12): + self.low_cmd.motor_cmd[i].q = (1 - self.percent_1) * self.startPos[i] + self.percent_1 * self._targetPos_1[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if (self.percent_1 == 1) and (self.percent_2 <= 1): + self.percent_2 += 1.0 / self.duration_2 + self.percent_2 = min(self.percent_2, 1) + for i in range(12): + self.low_cmd.motor_cmd[i].q = (1 - self.percent_2) * self._targetPos_1[i] + self.percent_2 * self._targetPos_2[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 < 1): + self.percent_3 += 1.0 / self.duration_3 + self.percent_3 = min(self.percent_3, 1) + for i in range(12): + self.low_cmd.motor_cmd[i].q = self._targetPos_2[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 == 1) and (self.percent_4 <= 1): + self.percent_4 += 1.0 / self.duration_4 + self.percent_4 = min(self.percent_4, 1) + for i in range(12): + self.low_cmd.motor_cmd[i].q = (1 - self.percent_4) * self._targetPos_2[i] + self.percent_4 * self._targetPos_3[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + self.low_cmd.crc = self.crc.Crc(self.low_cmd) + self.lowcmd_publisher.Write(self.low_cmd) + +if __name__ == '__main__': + + print("WARNING: Please ensure there are no obstacles around the robot while running this example.") + input("Press Enter to continue...") + + if len(sys.argv)>1: + ChannelFactoryInitialize(0, sys.argv[1]) + else: + ChannelFactoryInitialize(0) + + custom = Custom() + custom.Init() + custom.Start() + + while True: + if custom.percent_4 == 1.0: + time.sleep(1) + print("Done!") + sys.exit(-1) + time.sleep(1) \ No newline at end of file diff --git a/example/go2/low_level/unitree_legged_const.py b/example/go2/low_level/unitree_legged_const.py new file mode 100644 index 0000000..153a051 --- /dev/null +++ b/example/go2/low_level/unitree_legged_const.py @@ -0,0 +1,20 @@ +LegID = { + "FR_0": 0, # Front right hip + "FR_1": 1, # Front right thigh + "FR_2": 2, # Front right calf + "FL_0": 3, + "FL_1": 4, + "FL_2": 5, + "RR_0": 6, + "RR_1": 7, + "RR_2": 8, + "RL_0": 9, + "RL_1": 10, + "RL_2": 11, +} + +HIGHLEVEL = 0xEE +LOWLEVEL = 0xFF +TRIGERLEVEL = 0xF0 +PosStopF = 2.146e9 +VelStopF = 16000.0 diff --git a/example/go2w/high_level/go2w_sport_client.py b/example/go2w/high_level/go2w_sport_client.py new file mode 100644 index 0000000..824a882 --- /dev/null +++ b/example/go2w/high_level/go2w_sport_client.py @@ -0,0 +1,99 @@ +import time +import sys +from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize +from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_ +from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_ +from unitree_sdk2py.go2.sport.sport_client import SportClient +import math +from dataclasses import dataclass + +@dataclass +class TestOption: + name: str + id: int + +option_list = [ + TestOption(name="damp", id=0), + TestOption(name="stand_up", id=1), + TestOption(name="stand_down", id=2), + TestOption(name="move", id=3), + TestOption(name="stop_move", id=4), + TestOption(name="speed_level", id=5), + TestOption(name="switch_gait", id=6), + TestOption(name="get_state", id=7), + TestOption(name="recovery", id=8), + TestOption(name="balance", id=9) +] + +class UserInterface: + def __init__(self): + self.test_option_ = None + + def convert_to_int(self, input_str): + try: + return int(input_str) + except ValueError: + return None + + def terminal_handle(self): + input_str = input("Enter id or name: \n") + + if input_str == "list": + self.test_option_.name = None + self.test_option_.id = None + for option in option_list: + print(f"{option.name}, id: {option.id}") + return + + for option in option_list: + if input_str == option.name or self.convert_to_int(input_str) == option.id: + self.test_option_.name = option.name + self.test_option_.id = option.id + print(f"Test: {self.test_option_.name}, test_id: {self.test_option_.id}") + return + + print("No matching test option found.") + +if __name__ == "__main__": + if len(sys.argv) < 2: + print(f"Usage: python3 {sys.argv[0]} networkInterface") + sys.exit(-1) + + print("WARNING: Please ensure there are no obstacles around the robot while running this example.") + input("Press Enter to continue...") + + ChannelFactoryInitialize(0, sys.argv[1]) + + test_option = TestOption(name=None, id=None) + user_interface = UserInterface() + user_interface.test_option_ = test_option + + sport_client = SportClient() + sport_client.SetTimeout(10.0) + sport_client.Init() + + while True: + user_interface.terminal_handle() + + print(f"Updated Test Option: Name = {test_option.name}, ID = {test_option.id}\n") + + if test_option.id == 0: + sport_client.Damp() + elif test_option.id == 1: + sport_client.StandUp() + elif test_option.id == 2: + sport_client.StandDown() + elif test_option.id == 3: + sport_client.Move(0.5,0,0) + elif test_option.id == 4: + sport_client.StopMove() + elif test_option.id == 5: + sport_client.SpeedLevel(1) + elif test_option.id == 6: + sport_client.SwitchGait(1) + elif test_option.id == 8: + sport_client.RecoveryStand() + elif test_option.id == 9: + sport_client.BalanceStand() + + time.sleep(1) diff --git a/example/go2w/low_level/go2w_stand_example.py b/example/go2w/low_level/go2w_stand_example.py new file mode 100644 index 0000000..e3a54de --- /dev/null +++ b/example/go2w/low_level/go2w_stand_example.py @@ -0,0 +1,196 @@ +import time +import sys + +from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize +from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize +from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_ +from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_ +from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_ +from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_ +from unitree_sdk2py.utils.crc import CRC +from unitree_sdk2py.utils.thread import RecurrentThread +import unitree_legged_const as go2w +from unitree_sdk2py.go2.robot_state.robot_state_client import RobotStateClient +from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient +from unitree_sdk2py.go2.sport.sport_client import SportClient + +class Custom: + def __init__(self): + self.Kp = 70.0 + self.Kd = 5.0 + self.time_consume = 0 + self.rate_count = 0 + self.sin_count = 0 + self.motiontime = 0 + self.dt = 0.002 + + self.low_cmd = unitree_go_msg_dds__LowCmd_() + self.low_state = None + + self.targetPos_1 = [0.0, 1.36, -2.65, 0.0, 1.36, -2.65, + -0.2, 1.36, -2.65, 0.2, 1.36, -2.65] + + self.targetPos_2 = [0.0, 0.67, -1.3, 0.0, 0.67, -1.3, + 0.0, 0.67, -1.3, 0.0, 0.67, -1.3] + + self.targetPos_3 = [-0.35, 1.36, -2.65, 0.35, 1.36, -2.65, + -0.5, 1.36, -2.65, 0.5, 1.36, -2.65] + + self.startPos = [0.0] * 12 + self.duration_1 = 500 + self.duration_2 = 500 + self.duration_3 = 2000 + self.duration_4 = 900 + self.percent_1 = 0 + self.percent_2 = 0 + self.percent_3 = 0 + self.percent_4 = 0 + + self.firstRun = True + self.done = False + + # thread handling + self.lowCmdWriteThreadPtr = None + + self.crc = CRC() + + # Public methods + def Init(self): + self.InitLowCmd() + + # create publisher # + self.lowcmd_publisher = ChannelPublisher("rt/lowcmd", LowCmd_) + self.lowcmd_publisher.Init() + + # create subscriber # + self.lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_) + self.lowstate_subscriber.Init(self.LowStateMessageHandler, 10) + + self.sc = SportClient() + self.sc.SetTimeout(5.0) + self.sc.Init() + + self.msc = MotionSwitcherClient() + self.msc.SetTimeout(5.0) + self.msc.Init() + + status, result = self.msc.CheckMode() + while result['name']: + self.sc.StandUp() + self.sc.StandDown() + self.msc.ReleaseMode() + status, result = self.msc.CheckMode() + time.sleep(1) + + def Start(self): + self.lowCmdWriteThreadPtr = RecurrentThread( + name="writebasiccmd", interval=0.002, target=self.LowCmdWrite, + ) + self.lowCmdWriteThreadPtr.Start() + + def InitLowCmd(self): + self.low_cmd.head[0] = 0xFE + self.low_cmd.head[1] = 0xEF + self.low_cmd.level_flag = 0xFF + self.low_cmd.gpio = 0 + for i in range(20): + self.low_cmd.motor_cmd[i].mode = 0x01 + self.low_cmd.motor_cmd[i].q= go2w.PosStopF + self.low_cmd.motor_cmd[i].kp = 0 + self.low_cmd.motor_cmd[i].dq = go2w.VelStopF + self.low_cmd.motor_cmd[i].kd = 0 + self.low_cmd.motor_cmd[i].tau = 0 + + def LowStateMessageHandler(self, msg: LowState_): + self.low_state = msg + + def LowCmdWrite(self): + if self.firstRun: + for i in range(12): + self.startPos[i] = self.low_state.motor_state[i].q + self.firstRun = False + + self.percent_1 += 1.0 / self.duration_1 + self.percent_1 = min(self.percent_1, 1) + if self.percent_1 < 1: + for i in range(12): + self.low_cmd.motor_cmd[i].q = (1 - self.percent_1) * self.startPos[i] + self.percent_1 * self.targetPos_1[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if (self.percent_1 == 1) and (self.percent_2 <= 1): + self.percent_2 += 1.0 / self.duration_2 + self.percent_2 = min(self.percent_2, 1) + for i in range(12): + self.low_cmd.motor_cmd[i].q = (1 - self.percent_2) * self.targetPos_1[i] + self.percent_2 * self.targetPos_2[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 < 1): + self.percent_3 += 1.0 / self.duration_3 + self.percent_3 = min(self.percent_3, 1) + for i in range(12): + self.low_cmd.motor_cmd[i].q = self.targetPos_2[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if self.percent_3 < 0.4: + for i in range(12, 16): + self.low_cmd.motor_cmd[i].q = 0 + self.low_cmd.motor_cmd[i].kp = 0.0 + self.low_cmd.motor_cmd[i].dq = 3 + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if 0.4 <= self.percent_3 < 0.8: + for i in range(12, 16): + self.low_cmd.motor_cmd[i].q = 0 + self.low_cmd.motor_cmd[i].kp = 0 + self.low_cmd.motor_cmd[i].dq = -3 + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if self.percent_3 >= 0.8: + for i in range(12, 16): + self.low_cmd.motor_cmd[i].q = 0 + self.low_cmd.motor_cmd[i].kp = 0 + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 == 1) and (self.percent_4 <= 1): + self.percent_4 += 1.0 / self.duration_4 + self.percent_4 = min(self.percent_4, 1) + for i in range(12): + self.low_cmd.motor_cmd[i].q = (1 - self.percent_4) * self.targetPos_2[i] + self.percent_4 * self.targetPos_3[i] + self.low_cmd.motor_cmd[i].dq = 0 + self.low_cmd.motor_cmd[i].kp = self.Kp + self.low_cmd.motor_cmd[i].kd = self.Kd + self.low_cmd.motor_cmd[i].tau = 0 + + self.low_cmd.crc = self.crc.Crc(self.low_cmd) + self.lowcmd_publisher.Write(self.low_cmd) + +if __name__ == '__main__': + + if len(sys.argv)>1: + ChannelFactoryInitialize(0, sys.argv[1]) + else: + ChannelFactoryInitialize(0) + + custom = Custom() + custom.Init() + custom.Start() + + while True: + if custom.percent_4 == 1.0: + time.sleep(1) + print("Done!") + sys.exit(-1) + time.sleep(1) \ No newline at end of file diff --git a/example/go2w/low_level/unitree_legged_const.py b/example/go2w/low_level/unitree_legged_const.py new file mode 100644 index 0000000..7943e31 --- /dev/null +++ b/example/go2w/low_level/unitree_legged_const.py @@ -0,0 +1,24 @@ +LegID = { + "FR_0": 0, # Front right hip + "FR_1": 1, # Front right thigh + "FR_2": 2, # Front right calf + "FL_0": 3, + "FL_1": 4, + "FL_2": 5, + "RR_0": 6, + "RR_1": 7, + "RR_2": 8, + "RL_0": 9, + "RL_1": 10, + "RL_2": 11, + "FR_w": 12, # Front right wheel + "FL_w": 13, # Front left wheel + "RR_w": 14, # Rear right wheel + "RL_w": 15, # Rear left wheel +} + +HIGHLEVEL = 0xEE +LOWLEVEL = 0xFF +TRIGERLEVEL = 0xF0 +PosStopF = 2.146e9 +VelStopF = 16000.0 diff --git a/example/h1/high_level/h1_loco_client_example.py b/example/h1/high_level/h1_loco_client_example.py new file mode 100644 index 0000000..4432d8a --- /dev/null +++ b/example/h1/high_level/h1_loco_client_example.py @@ -0,0 +1,96 @@ +import time +import sys +from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize +from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_ +from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_ +from unitree_sdk2py.h1.loco.h1_loco_client import LocoClient +import math +from dataclasses import dataclass + +@dataclass +class TestOption: + name: str + id: int + +option_list = [ + TestOption(name="damp", id=0), + TestOption(name="stand_up", id=1), + TestOption(name="move forward", id=3), + TestOption(name="move lateral", id=4), + TestOption(name="move rotate", id=5), + TestOption(name="low stand", id=6), + TestOption(name="high stand", id=7), + TestOption(name="zero torque", id=8) +] + +class UserInterface: + def __init__(self): + self.test_option_ = None + + def convert_to_int(self, input_str): + try: + return int(input_str) + except ValueError: + return None + + def terminal_handle(self): + input_str = input("Enter id or name: \n") + + if input_str == "list": + self.test_option_.name = None + self.test_option_.id = None + for option in option_list: + print(f"{option.name}, id: {option.id}") + return + + for option in option_list: + if input_str == option.name or self.convert_to_int(input_str) == option.id: + self.test_option_.name = option.name + self.test_option_.id = option.id + print(f"Test: {self.test_option_.name}, test_id: {self.test_option_.id}") + return + + print("No matching test option found.") + +if __name__ == "__main__": + + print("WARNING: Please ensure there are no obstacles around the robot while running this example.") + input("Press Enter to continue...") + + if len(sys.argv)>1: + ChannelFactoryInitialize(0, sys.argv[1]) + else: + ChannelFactoryInitialize(0) + + test_option = TestOption(name=None, id=None) + user_interface = UserInterface() + user_interface.test_option_ = test_option + + sport_client = LocoClient() + sport_client.SetTimeout(10.0) + sport_client.Init() + + while True: + + user_interface.terminal_handle() + + print(f"Updated Test Option: Name = {test_option.name}, ID = {test_option.id}\n") + + if test_option.id == 0: + sport_client.Damp() + elif test_option.id == 1: + sport_client.StandUp() + elif test_option.id == 3: + sport_client.Move(0.3,0,0) + elif test_option.id == 4: + sport_client.Move(0,0.3,0) + elif test_option.id == 5: + sport_client.Move(0,0,0.3) + elif test_option.id == 6: + sport_client.LowStand() + elif test_option.id == 7: + sport_client.HighStand() + elif test_option.id == 8: + sport_client.ZeroTorque() + + time.sleep(1) \ No newline at end of file diff --git a/example/h1/low_level/h1_low_level_example.py b/example/h1/low_level/h1_low_level_example.py new file mode 100644 index 0000000..51a9111 --- /dev/null +++ b/example/h1/low_level/h1_low_level_example.py @@ -0,0 +1,167 @@ +import time +import sys + +from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize +from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize +from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_ +from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_ +from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_ +from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_ +from unitree_sdk2py.utils.crc import CRC +from unitree_sdk2py.utils.thread import RecurrentThread +from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient +import unitree_legged_const as h1 +import numpy as np + +H1_NUM_MOTOR = 20 + +class H1JointIndex: + # Right leg + kRightHipYaw = 8 + kRightHipRoll = 0 + kRightHipPitch = 1 + kRightKnee = 2 + kRightAnkle = 11 + # Left leg + kLeftHipYaw = 7 + kLeftHipRoll = 3 + kLeftHipPitch = 4 + kLeftKnee = 5 + kLeftAnkle = 10 + + kWaistYaw = 6 + + kNotUsedJoint = 9 + + # Right arm + kRightShoulderPitch = 12 + kRightShoulderRoll = 13 + kRightShoulderYaw = 14 + kRightElbow = 15 + # Left arm + kLeftShoulderPitch = 16 + kLeftShoulderRoll = 17 + kLeftShoulderYaw = 18 + kLeftElbow = 19 + +class Custom: + def __init__(self): + self.time_ = 0.0 + self.control_dt_ = 0.01 + self.duration_ = 10.0 + self.counter_ = 0 + self.kp_low_ = 60.0 + self.kp_high_ = 200.0 + self.kd_low_ = 1.5 + self.kd_high_ = 5.0 + self.low_cmd = unitree_go_msg_dds__LowCmd_() + self.InitLowCmd() + self.low_state = None + self.crc = CRC() + + def Init(self): + # # create publisher # + self.lowcmd_publisher_ = ChannelPublisher("rt/lowcmd", LowCmd_) + self.lowcmd_publisher_.Init() + + # # create subscriber # + self.lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_) + self.lowstate_subscriber.Init(self.LowStateHandler, 10) + + self.msc = MotionSwitcherClient() + self.msc.SetTimeout(5.0) + self.msc.Init() + + status, result = self.msc.CheckMode() + while result['name']: + self.msc.ReleaseMode() + status, result = self.msc.CheckMode() + time.sleep(1) + + self.report_rpy_ptr_ = RecurrentThread( + interval=0.1, target=self.ReportRPY, name="report_rpy" + ) + + self.report_rpy_ptr_.Start() + + def is_weak_motor(self,motor_index): + return motor_index in { + H1JointIndex.kLeftAnkle, + H1JointIndex.kRightAnkle, + H1JointIndex.kRightShoulderPitch, + H1JointIndex.kRightShoulderRoll, + H1JointIndex.kRightShoulderYaw, + H1JointIndex.kRightElbow, + H1JointIndex.kLeftShoulderPitch, + H1JointIndex.kLeftShoulderRoll, + H1JointIndex.kLeftShoulderYaw, + H1JointIndex.kLeftElbow, + } + + def InitLowCmd(self): + self.low_cmd.head[0] = 0xFE + self.low_cmd.head[1] = 0xEF + self.low_cmd.level_flag = 0xFF + self.low_cmd.gpio = 0 + for i in range(H1_NUM_MOTOR): + if self.is_weak_motor(i): + self.low_cmd.motor_cmd[i].mode = 0x01 + else: + self.low_cmd.motor_cmd[i].mode = 0x0A + self.low_cmd.motor_cmd[i].q= h1.PosStopF + self.low_cmd.motor_cmd[i].kp = 0 + self.low_cmd.motor_cmd[i].dq = h1.VelStopF + self.low_cmd.motor_cmd[i].kd = 0 + self.low_cmd.motor_cmd[i].tau = 0 + + def Start(self): + self.lowCmdWriteThreadPtr = RecurrentThread( + interval=self.control_dt_, target=self.LowCmdWrite, name="control" + ) + self.lowCmdWriteThreadPtr.Start() + + def LowStateHandler(self, msg: LowState_): + self.low_state = msg + + def ReportRPY(self): + print("rpy: [",self.low_state.imu_state.rpy[0],", " + ,self.low_state.imu_state.rpy[1],", " + ,self.low_state.imu_state.rpy[2],"]" + ) + + def LowCmdWrite(self): + self.time_ += self.control_dt_ + self.time_ = np.clip(self.time_ , 0.0, self.duration_) + + # set robot to zero posture + for i in range(H1_NUM_MOTOR): + ratio = self.time_ / self.duration_ + self.low_cmd.motor_cmd[i].tau = 0. + self.low_cmd.motor_cmd[i].q = (1.0 - ratio) * self.low_state.motor_state[i].q + self.low_cmd.motor_cmd[i].dq = 0. + self.low_cmd.motor_cmd[i].kp = self.kp_low_ if self.is_weak_motor(i) else self.kp_high_ + self.low_cmd.motor_cmd[i].kd = self.kd_low_ if self.is_weak_motor(i) else self.kd_high_ + + self.low_cmd.crc = self.crc.Crc(self.low_cmd) + self.lowcmd_publisher_.Write(self.low_cmd) + +if __name__ == '__main__': + + print("WARNING: Please ensure there are no obstacles around the robot while running this example.") + input("Press Enter to continue...") + + if len(sys.argv)>1: + ChannelFactoryInitialize(0, sys.argv[1]) + else: + ChannelFactoryInitialize(0) + + custom = Custom() + custom.Init() + custom.Start() + + while True: + if custom.time_ == custom.duration_: + time.sleep(1) + print("Done!") + sys.exit(-1) + time.sleep(1) \ No newline at end of file diff --git a/example/h1/low_level/unitree_legged_const.py b/example/h1/low_level/unitree_legged_const.py new file mode 100644 index 0000000..c9112ea --- /dev/null +++ b/example/h1/low_level/unitree_legged_const.py @@ -0,0 +1,5 @@ +HIGHLEVEL = 0xEE +LOWLEVEL = 0xFF +TRIGERLEVEL = 0xF0 +PosStopF = 2.146e9 +VelStopF = 16000.0 diff --git a/example/hg/arm_sdk.py b/example/hg/arm_sdk.py deleted file mode 100644 index ff6d6b4..0000000 --- a/example/hg/arm_sdk.py +++ /dev/null @@ -1,133 +0,0 @@ -import time -import numpy as np -from enum import IntEnum -from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize - -# from user_data import * -from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_ -from unitree_sdk2py.idl.unitree_hg.msg.dds_ import MotorCmd_ -from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_ -import unitree_sdk2py.idl.unitree_hg.msg.dds_ as dds -print(dds.LowCmd_) -print(dds.MotorCmd_) - -kTopicArmSDK = "rt/arm_sdk" -kPi = 3.141592654 -kPi_2 = 1.57079632 - - -class JointIndex(IntEnum): - - # // Left arm - kLeftShoulderPitch = 13 - kLeftShoulderRoll = 14 - kLeftShoulderYaw = 15 - kLeftElbow = 16 - kLeftWistYaw = 17 - kLeftWistPitch = 18 - kLeftWistRoll = 19 - # // Right arm - kRightShoulderPitch = 20 - kRightShoulderRoll = 21 - kRightShoulderYaw = 22 - kRightElbow = 23 - kRightWistYaw = 24 - kRightWistPitch = 25 - kRightWistRoll = 26 - - kWaistYaw = 12 - - -kNotUsedJoint = 27 - - -if __name__ == "__main__": - ChannelFactoryInitialize() - - # Create a publisher to publish the data defined in UserData class - arm_sdk_publisher = ChannelPublisher('rt/arm_sdk', LowCmd_) - # pub = ChannelPublisher("rt/lowcmd", LowCmd_) - arm_sdk_publisher.Init() - - msg = unitree_hg_msg_dds__LowCmd_() - - weight = 0 - weight_rate = 0.2 - - kp = 60 - kd = 1.5 - dq = 0 - tau_ff = 0 - - control_dt = 0.02 - max_joint_velocity = 0.5 - - delta_weight = weight_rate * control_dt - max_joint_delta = max_joint_velocity * control_dt - - init_pos = np.zeros(15) - target_pos = np.array( - [ - 0.0, - kPi_2, - 0.0, - kPi_2, - 0.0, - 0.0, - 0.0, - 0.0, - -kPi_2, - 0.0, - kPi_2, - 0.0, - 0.0, - 0.0, - 0.0, - ] - ) - print("Initailizing arms ...") - - init_time = 5 - init_time_steps = int(init_time / control_dt) - - for i in range(init_time_steps): - weight += delta_weight - weight = max(min(weight, 1.0), 0) - msg.motor_cmd[kNotUsedJoint].q = weight * weight - - for idx, joint in enumerate(JointIndex): - msg.motor_cmd[joint].q = init_pos[idx] - msg.motor_cmd[joint].dq = dq - msg.motor_cmd[joint].kp = kp - msg.motor_cmd[joint].kd = kd - msg.motor_cmd[joint].tau = tau_ff - - arm_sdk_publisher.Write(msg) - - time.sleep(control_dt) - - print("Done!") - - period = 5 - num_time_steps = int(period / control_dt) - - current_jpos_des = np.zeros_like(init_pos) - - for i in range(num_time_steps): - for j in range(len(current_jpos_des)): - delta = target_pos[j] - current_jpos_des[j] - clamped_delta = np.clip(delta, -max_joint_delta, max_joint_delta) - current_jpos_des[j] += clamped_delta - - for idx, joint in enumerate(JointIndex): - msg.motor_cmd[joint].q = current_jpos_des[idx] - msg.motor_cmd[joint].dq = dq - msg.motor_cmd[joint].kp = kp - msg.motor_cmd[joint].kd = kd - msg.motor_cmd[joint].tau = tau_ff - - arm_sdk_publisher.Write(msg) - - time.sleep(control_dt) - - exit() \ No newline at end of file diff --git a/example/hg/low_cmd.py b/example/hg/low_cmd.py deleted file mode 100644 index 331bfe5..0000000 --- a/example/hg/low_cmd.py +++ /dev/null @@ -1,50 +0,0 @@ -import time -import numpy as np -from enum import IntEnum -from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize -import sys - -# from user_data import * -from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_ -from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_ -import unitree_sdk2py.idl.unitree_hg.msg.dds_ as dds -from unitree_sdk2py.utils.crc import CRC - - -kTopicArmSDK = "rt/lowcmd" -kPi = 3.141592654 -kPi_2 = 1.57079632 -kNumMotors = 35 - -if __name__ == "__main__": - - if len(sys.argv)>1: - ChannelFactoryInitialize(0, sys.argv[1]) - else: - ChannelFactoryInitialize(0) - - # 初始化发布节点 - low_cmd_publisher = ChannelPublisher(kTopicArmSDK, LowCmd_) - low_cmd_publisher.Init() - # 初始化LowCmd_ - msg = unitree_hg_msg_dds__LowCmd_() - # CRC校验实例 - crc = CRC() - # 控制时间参数 - control_dt = 0.02 - init_time = 20 - init_time_steps = int(init_time / control_dt) - - # 控制参数设置 - msg.mode_pr = 0 - msg.mode_machine = 4 - for i in range(init_time_steps): - for idx in range(kNumMotors): - msg.motor_cmd[idx].mode = 0x01 - msg.crc = crc.Crc(msg) - # print(msg.crc) - low_cmd_publisher.Write(msg) - time.sleep(control_dt) - - print("Done!") - exit() diff --git a/example/high_level/read_highstate.py b/example/high_level/read_highstate.py deleted file mode 100644 index a3cec53..0000000 --- a/example/high_level/read_highstate.py +++ /dev/null @@ -1,24 +0,0 @@ -import time -import sys -from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize - -from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_ -from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_ - - -def HighStateHandler(msg: SportModeState_): - print("Position: ", msg.position) - print("Velocity: ", msg.velocity) - print("Yaw velocity: ", msg.yaw_speed) - print("Foot position in body frame: ", msg.foot_position_body) - print("Foot velocity in body frame: ", msg.foot_speed_body) - - -if __name__ == "__main__": - # sys.argv[1]: name of the network interface - ChannelFactoryInitialize(0, sys.argv[1]) - sub = ChannelSubscriber("rt/sportmodestate", SportModeState_) - sub.Init(HighStateHandler, 10) - - while True: - time.sleep(10.0) diff --git a/example/high_level/sportmode_test.py b/example/high_level/sportmode_test.py deleted file mode 100644 index 845ddc5..0000000 --- a/example/high_level/sportmode_test.py +++ /dev/null @@ -1,144 +0,0 @@ -import time -import sys -from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize -from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_ -from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_ -from unitree_sdk2py.go2.sport.sport_client import ( - SportClient, - PathPoint, - SPORT_PATH_POINT_SIZE, -) -import math - - -class SportModeTest: - def __init__(self) -> None: - # Time count - self.t = 0 - self.dt = 0.01 - - # Initial poition and yaw - self.px0 = 0 - self.py0 = 0 - self.yaw0 = 0 - - self.client = SportClient() # Create a sport client - self.client.SetTimeout(10.0) - self.client.Init() - - def GetInitState(self, robot_state: SportModeState_): - self.px0 = robot_state.position[0] - self.py0 = robot_state.position[1] - self.yaw0 = robot_state.imu_state.rpy[2] - - def StandUpDown(self): - self.client.StandDown() - print("Stand down !!!") - time.sleep(1) - - self.client.StandUp() - print("Stand up !!!") - time.sleep(1) - - self.client.StandDown() - print("Stand down !!!") - time.sleep(1) - - self.client.Damp() - - def VelocityMove(self): - elapsed_time = 1 - for i in range(int(elapsed_time / self.dt)): - self.client.Move(0.3, 0, 0.3) # vx, vy vyaw - time.sleep(self.dt) - self.client.StopMove() - - def BalanceAttitude(self): - self.client.Euler(0.1, 0.2, 0.3) # roll, pitch, yaw - self.client.BalanceStand() - - def TrajectoryFollow(self): - time_seg = 0.2 - time_temp = self.t - time_seg - path = [] - for i in range(SPORT_PATH_POINT_SIZE): - time_temp += time_seg - - px_local = 0.5 * math.sin(0.5 * time_temp) - py_local = 0 - yaw_local = 0 - vx_local = 0.25 * math.cos(0.5 * time_temp) - vy_local = 0 - vyaw_local = 0 - - path_point_tmp = PathPoint(0, 0, 0, 0, 0, 0, 0) - - path_point_tmp.timeFromStart = i * time_seg - path_point_tmp.x = ( - px_local * math.cos(self.yaw0) - - py_local * math.sin(self.yaw0) - + self.px0 - ) - path_point_tmp.y = ( - px_local * math.sin(self.yaw0) - + py_local * math.cos(self.yaw0) - + self.py0 - ) - path_point_tmp.yaw = yaw_local + self.yaw0 - path_point_tmp.vx = vx_local * math.cos(self.yaw0) - vy_local * math.sin( - self.yaw0 - ) - path_point_tmp.vy = vx_local * math.sin(self.yaw0) + vy_local * math.cos( - self.yaw0 - ) - path_point_tmp.vyaw = vyaw_local - - path.append(path_point_tmp) - - self.client.TrajectoryFollow(path) - - def SpecialMotions(self): - self.client.RecoveryStand() - print("RecoveryStand !!!") - time.sleep(1) - - self.client.Stretch() - print("Sit !!!") - time.sleep(1) - - self.client.RecoveryStand() - print("RecoveryStand !!!") - time.sleep(1) - - -# Robot state -robot_state = unitree_go_msg_dds__SportModeState_() -def HighStateHandler(msg: SportModeState_): - global robot_state - robot_state = msg - - -if __name__ == "__main__": - if len(sys.argv)>1: - ChannelFactoryInitialize(0, sys.argv[1]) - else: - ChannelFactoryInitialize(0) - - sub = ChannelSubscriber("rt/sportmodestate", SportModeState_) - sub.Init(HighStateHandler, 10) - time.sleep(1) - - test = SportModeTest() - test.GetInitState(robot_state) - - print("Start test !!!") - while True: - test.t += test.dt - - test.StandUpDown() - # test.VelocityMove() - # test.BalanceAttitude() - # test.TrajectoryFollow() - # test.SpecialMotions() - - time.sleep(test.dt) diff --git a/example/low_level/lowlevel_control.py b/example/low_level/lowlevel_control.py deleted file mode 100644 index 638c5f2..0000000 --- a/example/low_level/lowlevel_control.py +++ /dev/null @@ -1,60 +0,0 @@ -import time -import sys - -from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize -from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize -from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_ -from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_ -from unitree_sdk2py.utils.crc import CRC -from unitree_sdk2py.utils.thread import Thread -import unitree_legged_const as go2 - -crc = CRC() - -if __name__ == '__main__': - - if len(sys.argv)>1: - ChannelFactoryInitialize(0, sys.argv[1]) - else: - ChannelFactoryInitialize(0) - # Create a publisher to publish the data defined in UserData class - pub = ChannelPublisher("rt/lowcmd", LowCmd_) - pub.Init() - - cmd = unitree_go_msg_dds__LowCmd_() - cmd.head[0]=0xFE - cmd.head[1]=0xEF - cmd.level_flag = 0xFF - cmd.gpio = 0 - for i in range(20): - cmd.motor_cmd[i].mode = 0x01 # (PMSM) mode - cmd.motor_cmd[i].q= go2.PosStopF - cmd.motor_cmd[i].kp = 0 - cmd.motor_cmd[i].dq = go2.VelStopF - cmd.motor_cmd[i].kd = 0 - cmd.motor_cmd[i].tau = 0 - - while True: - # Toque controle, set RL_2 toque - cmd.motor_cmd[go2.LegID["RL_2"]].q = 0.0 # Set to stop position(rad) - cmd.motor_cmd[go2.LegID["RL_2"]].kp = 0.0 - cmd.motor_cmd[go2.LegID["RL_2"]].dq = 0.0 # Set to stop angular velocity(rad/s) - cmd.motor_cmd[go2.LegID["RL_2"]].kd = 0.0 - cmd.motor_cmd[go2.LegID["RL_2"]].tau = 1.0 # target toque is set to 1N.m - - # Poinstion(rad) control, set RL_0 rad - cmd.motor_cmd[go2.LegID["RL_0"]].q = 0.0 # Taregt angular(rad) - cmd.motor_cmd[go2.LegID["RL_0"]].kp = 10.0 # Poinstion(rad) control kp gain - cmd.motor_cmd[go2.LegID["RL_0"]].dq = 0.0 # Taregt angular velocity(rad/ss) - cmd.motor_cmd[go2.LegID["RL_0"]].kd = 1.0 # Poinstion(rad) control kd gain - cmd.motor_cmd[go2.LegID["RL_0"]].tau = 0.0 # Feedforward toque 1N.m - - cmd.crc = crc.Crc(cmd) - - #Publish message - if pub.Write(cmd): - print("Publish success. msg:", cmd.crc) - else: - print("Waitting for subscriber.") - - time.sleep(0.002) \ No newline at end of file diff --git a/example/low_level/read_lowstate.py b/example/low_level/read_lowstate.py deleted file mode 100644 index 8653822..0000000 --- a/example/low_level/read_lowstate.py +++ /dev/null @@ -1,27 +0,0 @@ -import time -import sys -from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize -from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_ -from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_ - -import unitree_legged_const as go2 - - -def LowStateHandler(msg: LowState_): - - # print front right hip motor states - print("FR_0 motor state: ", msg.motor_state[go2.LegID["FR_0"]]) - print("IMU state: ", msg.imu_state) - print("Battery state: voltage: ", msg.power_v, "current: ", msg.power_a) - - -if __name__ == "__main__": - if len(sys.argv)>1: - ChannelFactoryInitialize(0, sys.argv[1]) - else: - ChannelFactoryInitialize(0) - sub = ChannelSubscriber("rt/lowstate", LowState_) - sub.Init(LowStateHandler, 10) - - while True: - time.sleep(10.0) diff --git a/example/test_crc.py b/example/test_crc.py deleted file mode 100644 index e4cf992..0000000 --- a/example/test_crc.py +++ /dev/null @@ -1,46 +0,0 @@ -import time -import sys - -from unitree_sdk2py.core.channel import ChannelFactoryInitialize -from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_ -from unitree_sdk2py.utils.crc import CRC - -crc = CRC() - -PosStopF = 2.146e9 -VelStopF = 16000.0 - -if __name__ == "__main__": - ChannelFactoryInitialize(0) - - cmd = unitree_go_msg_dds__LowCmd_() - cmd.head[0] = 0xFE - cmd.head[1] = 0xEF - cmd.level_flag = 0xFF - cmd.gpio = 0 - for i in range(20): - cmd.motor_cmd[i].mode = 0x01 - cmd.motor_cmd[i].q = PosStopF - cmd.motor_cmd[i].kp = 0 - cmd.motor_cmd[i].dq = VelStopF - cmd.motor_cmd[i].kd = 0 - cmd.motor_cmd[i].tau = 0 - - cmd.motor_cmd[0].q = 0.0 - cmd.motor_cmd[0].kp = 0.0 - cmd.motor_cmd[0].dq = 0.0 - cmd.motor_cmd[0].kd = 0.0 - cmd.motor_cmd[0].tau = 1.0 - - cmd.motor_cmd[1].q = 0.0 - cmd.motor_cmd[1].kp = 10.0 - cmd.motor_cmd[1].dq = 0.0 - cmd.motor_cmd[1].kd = 1.0 - cmd.motor_cmd[1].tau = 0.0 - - now = time.perf_counter() - - - cmd.crc = crc.Crc(cmd) - - print("CRC:", cmd.crc, "Time cost:", (time.perf_counter() - now)*1000) diff --git a/unitree_sdk2py/comm/motion_switcher/__init__.py b/unitree_sdk2py/comm/motion_switcher/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/unitree_sdk2py/comm/motion_switcher/motion_switcher_api.py b/unitree_sdk2py/comm/motion_switcher/motion_switcher_api.py new file mode 100644 index 0000000..ddb9a42 --- /dev/null +++ b/unitree_sdk2py/comm/motion_switcher/motion_switcher_api.py @@ -0,0 +1,29 @@ +""" +" service name +""" +MOTION_SWITCHER_SERVICE_NAME = "motion_switcher" + + +""" +" service api version +""" +MOTION_SWITCHER_API_VERSION = "1.0.0.1" + + +""" +" api id +""" +MOTION_SWITCHER_API_ID_CHECK_MODE = 1001 +MOTION_SWITCHER_API_ID_SELECT_MODE = 1002 +MOTION_SWITCHER_API_ID_RELEASE_MODE = 1003 +MOTION_SWITCHER_API_ID_SET_SILENT = 1004 +MOTION_SWITCHER_API_ID_GET_SILENT = 1005 + +# """ +# " error code +# """ +# # client side +# SPORT_ERR_CLIENT_POINT_PATH = 4101 +# # server side +# SPORT_ERR_SERVER_OVERTIME = 4201 +# SPORT_ERR_SERVER_NOT_INIT = 4202 diff --git a/unitree_sdk2py/comm/motion_switcher/motion_switcher_client.py b/unitree_sdk2py/comm/motion_switcher/motion_switcher_client.py new file mode 100644 index 0000000..74f223f --- /dev/null +++ b/unitree_sdk2py/comm/motion_switcher/motion_switcher_client.py @@ -0,0 +1,42 @@ +import json + +from ...rpc.client import Client +from .motion_switcher_api import * + +""" +" class MotionSwitcherClient +""" +class MotionSwitcherClient(Client): + def __init__(self): + super().__init__(MOTION_SWITCHER_SERVICE_NAME, False) + + + def Init(self): + # set api version + self._SetApiVerson(MOTION_SWITCHER_API_VERSION) + + # regist api + self._RegistApi(MOTION_SWITCHER_API_ID_CHECK_MODE, 0) + self._RegistApi(MOTION_SWITCHER_API_ID_SELECT_MODE, 0) + self._RegistApi(MOTION_SWITCHER_API_ID_RELEASE_MODE, 0) + self._RegistApi(MOTION_SWITCHER_API_ID_SET_SILENT, 0) + self._RegistApi(MOTION_SWITCHER_API_ID_GET_SILENT, 0) + + # 1001 + def CheckMode(self): + p = {} + parameter = json.dumps(p) + code, data = self._Call(MOTION_SWITCHER_API_ID_CHECK_MODE, parameter) + if code == 0: + return code, json.loads(data) + else: + return code, None + + # 1003 + def ReleaseMode(self): + p = {} + parameter = json.dumps(p) + code, data = self._Call(MOTION_SWITCHER_API_ID_RELEASE_MODE, parameter) + + return code, None + \ No newline at end of file diff --git a/unitree_sdk2py/g1/loco/g1_loco_api.py b/unitree_sdk2py/g1/loco/g1_loco_api.py new file mode 100644 index 0000000..6b2151f --- /dev/null +++ b/unitree_sdk2py/g1/loco/g1_loco_api.py @@ -0,0 +1,31 @@ +""" +" service name +""" +LOCO_SERVICE_NAME = "loco" + + +""" +" service api version +""" +LOCO_API_VERSION = "1.0.0.0" + + +""" +" api id +""" +ROBOT_API_ID_LOCO_GET_FSM_ID = 7001 +ROBOT_API_ID_LOCO_GET_FSM_MODE = 7002 +ROBOT_API_ID_LOCO_GET_BALANCE_MODE = 7003 +ROBOT_API_ID_LOCO_GET_SWING_HEIGHT = 7004 +ROBOT_API_ID_LOCO_GET_STAND_HEIGHT = 7005 +ROBOT_API_ID_LOCO_GET_PHASE = 7006 # deprecated + +ROBOT_API_ID_LOCO_SET_FSM_ID = 7101 +ROBOT_API_ID_LOCO_SET_BALANCE_MODE = 7102 +ROBOT_API_ID_LOCO_SET_SWING_HEIGHT = 7103 +ROBOT_API_ID_LOCO_SET_STAND_HEIGHT = 7104 +ROBOT_API_ID_LOCO_SET_VELOCITY = 7105 + +""" +" error code +""" \ No newline at end of file diff --git a/unitree_sdk2py/g1/loco/g1_loco_client.py b/unitree_sdk2py/g1/loco/g1_loco_client.py new file mode 100644 index 0000000..4d49ead --- /dev/null +++ b/unitree_sdk2py/g1/loco/g1_loco_client.py @@ -0,0 +1,101 @@ +import json + +from ...rpc.client import Client +from .g1_loco_api import * + +""" +" class SportClient +""" +class LocoClient(Client): + def __init__(self): + super().__init__(LOCO_SERVICE_NAME, False) + + + def Init(self): + # set api version + self._SetApiVerson(LOCO_API_VERSION) + + # regist api + self._RegistApi(ROBOT_API_ID_LOCO_GET_FSM_ID, 0) + self._RegistApi(ROBOT_API_ID_LOCO_GET_FSM_MODE, 0) + self._RegistApi(ROBOT_API_ID_LOCO_GET_BALANCE_MODE, 0) + self._RegistApi(ROBOT_API_ID_LOCO_GET_SWING_HEIGHT, 0) + self._RegistApi(ROBOT_API_ID_LOCO_GET_STAND_HEIGHT, 0) + self._RegistApi(ROBOT_API_ID_LOCO_GET_PHASE, 0) # deprecated + + self._RegistApi(ROBOT_API_ID_LOCO_SET_FSM_ID, 0) + self._RegistApi(ROBOT_API_ID_LOCO_SET_BALANCE_MODE, 0) + self._RegistApi(ROBOT_API_ID_LOCO_SET_SWING_HEIGHT, 0) + self._RegistApi(ROBOT_API_ID_LOCO_SET_STAND_HEIGHT, 0) + self._RegistApi(ROBOT_API_ID_LOCO_SET_VELOCITY, 0) + + # 7101 + def SetFsmId(self, fsm_id: int): + p = {} + p["data"] = fsm_id + parameter = json.dumps(p) + code, data = self._Call(ROBOT_API_ID_LOCO_SET_FSM_ID, parameter) + return code + + # 7102 + def SetBalanceMode(self, balance_mode: int): + p = {} + p["data"] = balance_mode + parameter = json.dumps(p) + code, data = self._Call(ROBOT_API_ID_LOCO_SET_BALANCE_MODE, parameter) + return code + + # 7104 + def SetStandHeight(self, stand_height: float): + p = {} + p["data"] = stand_height + parameter = json.dumps(p) + code, data = self._Call(ROBOT_API_ID_LOCO_SET_STAND_HEIGHT, parameter) + return code + + # 7105 + def SetVelocity(self, vx: float, vy: float, omega: float, duration: float = 1.0): + p = {} + velocity = [vx,vy,omega] + p["velocity"] = velocity + p["duration"] = duration + parameter = json.dumps(p) + code, data = self._Call(ROBOT_API_ID_LOCO_SET_VELOCITY, parameter) + return code + + def Damp(self): + self.SetFsmId(1) + + def Start(self): + self.SetFsmId(200) + + def Squat(self): + self.SetFsmId(2) + + def Sit(self): + self.SetFsmId(3) + + def StandUp(self): + self.SetFsmId(4) + + def ZeroTorque(self): + self.SetFsmId(0) + + def StopMove(self): + self.SetVelocity(0., 0., 0.) + + def HighStand(self): + UINT32_MAX = (1 << 32) - 1 + self.SetStandHeight(UINT32_MAX) + + def LowStand(self): + UINT32_MIN = 0 + self.SetStandHeight(UINT32_MIN) + + def Move(self, vx: float, vy: float, vyaw: float, continous_move: bool = False): + duration = 864000.0 if continous_move else 1 + self.SetVelocity(vx, vy, vyaw, duration) + + def BalanceStand(self, balance_mode: int): + self.SetBalanceMode(balance_mode) + \ No newline at end of file diff --git a/unitree_sdk2py/h1/loco/h1_loco_api.py b/unitree_sdk2py/h1/loco/h1_loco_api.py new file mode 100644 index 0000000..bd8ddbe --- /dev/null +++ b/unitree_sdk2py/h1/loco/h1_loco_api.py @@ -0,0 +1,31 @@ +""" +" service name +""" +LOCO_SERVICE_NAME = "loco" + + +""" +" service api version +""" +LOCO_API_VERSION = "2.0.0.0" + + +""" +" api id +""" +ROBOT_API_ID_LOCO_GET_FSM_ID = 8001 +ROBOT_API_ID_LOCO_GET_FSM_MODE = 8002 +ROBOT_API_ID_LOCO_GET_BALANCE_MODE = 8003 +ROBOT_API_ID_LOCO_GET_SWING_HEIGHT = 8004 +ROBOT_API_ID_LOCO_GET_STAND_HEIGHT = 8005 +ROBOT_API_ID_LOCO_GET_PHASE = 8006 # deprecated + +ROBOT_API_ID_LOCO_SET_FSM_ID = 8101 +ROBOT_API_ID_LOCO_SET_BALANCE_MODE = 8102 +ROBOT_API_ID_LOCO_SET_SWING_HEIGHT = 8103 +ROBOT_API_ID_LOCO_SET_STAND_HEIGHT = 8104 +ROBOT_API_ID_LOCO_SET_VELOCITY = 8105 + +""" +" error code +""" \ No newline at end of file diff --git a/unitree_sdk2py/h1/loco/h1_loco_client.py b/unitree_sdk2py/h1/loco/h1_loco_client.py new file mode 100644 index 0000000..3bc01e3 --- /dev/null +++ b/unitree_sdk2py/h1/loco/h1_loco_client.py @@ -0,0 +1,83 @@ +import json + +from ...rpc.client import Client +from .h1_loco_api import * + +""" +" class SportClient +""" +class LocoClient(Client): + def __init__(self): + super().__init__(LOCO_SERVICE_NAME, False) + + + def Init(self): + # set api version + self._SetApiVerson(LOCO_API_VERSION) + + # regist api + self._RegistApi(ROBOT_API_ID_LOCO_GET_FSM_ID, 0) + self._RegistApi(ROBOT_API_ID_LOCO_GET_FSM_MODE, 0) + self._RegistApi(ROBOT_API_ID_LOCO_GET_BALANCE_MODE, 0) + self._RegistApi(ROBOT_API_ID_LOCO_GET_SWING_HEIGHT, 0) + self._RegistApi(ROBOT_API_ID_LOCO_GET_STAND_HEIGHT, 0) + self._RegistApi(ROBOT_API_ID_LOCO_GET_PHASE, 0) # deprecated + + self._RegistApi(ROBOT_API_ID_LOCO_SET_FSM_ID, 0) + self._RegistApi(ROBOT_API_ID_LOCO_SET_BALANCE_MODE, 0) + self._RegistApi(ROBOT_API_ID_LOCO_SET_SWING_HEIGHT, 0) + self._RegistApi(ROBOT_API_ID_LOCO_SET_STAND_HEIGHT, 0) + self._RegistApi(ROBOT_API_ID_LOCO_SET_VELOCITY, 0) + + # 8101 + def SetFsmId(self, fsm_id: int): + p = {} + p["data"] = fsm_id + parameter = json.dumps(p) + code, data = self._Call(ROBOT_API_ID_LOCO_SET_FSM_ID, parameter) + return code + + # 8104 + def SetStandHeight(self, stand_height: float): + p = {} + p["data"] = stand_height + parameter = json.dumps(p) + code, data = self._Call(ROBOT_API_ID_LOCO_SET_STAND_HEIGHT, parameter) + return code + + # 8105 + def SetVelocity(self, vx: float, vy: float, omega: float, duration: float = 1.0): + p = {} + velocity = [vx,vy,omega] + p["velocity"] = velocity + p["duration"] = duration + parameter = json.dumps(p) + code, data = self._Call(ROBOT_API_ID_LOCO_SET_VELOCITY, parameter) + return code + + def Damp(self): + self.SetFsmId(1) + + def Start(self): + self.SetFsmId(204) + + def StandUp(self): + self.SetFsmId(2) + + def ZeroTorque(self): + self.SetFsmId(0) + + def StopMove(self): + self.SetVelocity(0., 0., 0.) + + def HighStand(self): + UINT32_MAX = (1 << 32) - 1 + self.SetStandHeight(UINT32_MAX) + + def LowStand(self): + UINT32_MIN = 0 + self.SetStandHeight(UINT32_MIN) + + def Move(self, vx: float, vy: float, vyaw: float, continous_move: bool = False): + duration = 864000.0 if continous_move else 1 + self.SetVelocity(vx, vy, vyaw, duration) \ No newline at end of file