update high-level and low-level control examples
This commit is contained in:
parent
6cf6a18471
commit
c52f48b0b5
|
@ -0,0 +1,105 @@
|
|||
import time
|
||||
import sys
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.b2.sport.sport_client import SportClient
|
||||
import math
|
||||
from dataclasses import dataclass
|
||||
|
||||
@dataclass
|
||||
class TestOption:
|
||||
name: str
|
||||
id: int
|
||||
|
||||
option_list = [
|
||||
TestOption(name="damp", id=0),
|
||||
TestOption(name="stand_up", id=1),
|
||||
TestOption(name="stand_down", id=2),
|
||||
TestOption(name="move forward", id=3),
|
||||
TestOption(name="move lateral", id=4),
|
||||
TestOption(name="move rotate", id=5),
|
||||
TestOption(name="stop_move", id=6),
|
||||
TestOption(name="switch_gait", id=7),
|
||||
TestOption(name="switch_gait", id=8),
|
||||
TestOption(name="recovery", id=9),
|
||||
TestOption(name="balanced stand", id=10)
|
||||
]
|
||||
|
||||
class UserInterface:
|
||||
def __init__(self):
|
||||
self.test_option_ = None
|
||||
|
||||
def convert_to_int(self, input_str):
|
||||
try:
|
||||
return int(input_str)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def terminal_handle(self):
|
||||
input_str = input("Enter id or name: \n")
|
||||
|
||||
if input_str == "list":
|
||||
self.test_option_.name = None
|
||||
self.test_option_.id = None
|
||||
for option in option_list:
|
||||
print(f"{option.name}, id: {option.id}")
|
||||
return
|
||||
|
||||
for option in option_list:
|
||||
if input_str == option.name or self.convert_to_int(input_str) == option.id:
|
||||
self.test_option_.name = option.name
|
||||
self.test_option_.id = option.id
|
||||
print(f"Test: {self.test_option_.name}, test_id: {self.test_option_.id}")
|
||||
return
|
||||
|
||||
print("No matching test option found.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
print(f"Usage: python3 {sys.argv[0]} networkInterface")
|
||||
sys.exit(-1)
|
||||
|
||||
print("WARNING: Please ensure there are no obstacles around the robot while running this example.")
|
||||
input("Press Enter to continue...")
|
||||
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
|
||||
test_option = TestOption(name=None, id=None)
|
||||
user_interface = UserInterface()
|
||||
user_interface.test_option_ = test_option
|
||||
|
||||
sport_client = SportClient()
|
||||
sport_client.SetTimeout(10.0)
|
||||
sport_client.Init()
|
||||
|
||||
print("Input \"list\" to list all test option ...")
|
||||
|
||||
while True:
|
||||
user_interface.terminal_handle()
|
||||
|
||||
print(f"Updated Test Option: Name = {test_option.name}, ID = {test_option.id}")
|
||||
|
||||
if test_option.id == 0:
|
||||
sport_client.Damp()
|
||||
elif test_option.id == 1:
|
||||
sport_client.StandUp()
|
||||
elif test_option.id == 2:
|
||||
sport_client.StandDown()
|
||||
elif test_option.id == 3:
|
||||
sport_client.Move(0.3,0,0)
|
||||
elif test_option.id == 4:
|
||||
sport_client.Move(0,0.3,0)
|
||||
elif test_option.id == 5:
|
||||
sport_client.Move(0,0,0.5)
|
||||
elif test_option.id == 6:
|
||||
sport_client.StopMove()
|
||||
elif test_option.id == 7:
|
||||
sport_client.SwitchGait(0)
|
||||
elif test_option.id == 8:
|
||||
sport_client.SwitchGait(1)
|
||||
elif test_option.id == 9:
|
||||
sport_client.RecoveryStand()
|
||||
elif test_option.id == 10:
|
||||
sport_client.BalanceStand()
|
||||
|
||||
time.sleep(1)
|
|
@ -0,0 +1,175 @@
|
|||
import time
|
||||
import sys
|
||||
|
||||
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_
|
||||
from unitree_sdk2py.utils.thread import RecurrentThread
|
||||
import unitree_legged_const as b2
|
||||
from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient
|
||||
from unitree_sdk2py.b2.sport.sport_client import SportClient
|
||||
|
||||
from unitree_sdk2py.utils.crc import CRC
|
||||
|
||||
class Custom:
|
||||
def __init__(self):
|
||||
self.Kp = 1000.0
|
||||
self.Kd = 10.0
|
||||
self.time_consume = 0
|
||||
self.rate_count = 0
|
||||
self.sin_count = 0
|
||||
self.motiontime = 0
|
||||
self.dt = 0.002
|
||||
|
||||
self.low_cmd = unitree_go_msg_dds__LowCmd_()
|
||||
self.low_state = None
|
||||
|
||||
self.targetPos_1 = [0.0, 1.36, -2.65, 0.0, 1.36, -2.65,
|
||||
-0.2, 1.36, -2.65, 0.2, 1.36, -2.65]
|
||||
|
||||
self.targetPos_2 = [0.0, 0.67, -1.3, 0.0, 0.67, -1.3,
|
||||
0.0, 0.67, -1.3, 0.0, 0.67, -1.3]
|
||||
|
||||
self.targetPos_3 = [-0.5, 1.36, -2.65, 0.5, 1.36, -2.65,
|
||||
-0.5, 1.36, -2.65, 0.5, 1.36, -2.65]
|
||||
|
||||
self.startPos = [0.0] * 12
|
||||
self.duration_1 = 500
|
||||
self.duration_2 = 900
|
||||
self.duration_3 = 1000
|
||||
self.duration_4 = 900
|
||||
self.percent_1 = 0
|
||||
self.percent_2 = 0
|
||||
self.percent_3 = 0
|
||||
self.percent_4 = 0
|
||||
|
||||
self.firstRun = True
|
||||
self.done = False
|
||||
|
||||
# thread handling
|
||||
self.lowCmdWriteThreadPtr = None
|
||||
|
||||
self.crc = CRC()
|
||||
|
||||
def Init(self):
|
||||
self.InitLowCmd()
|
||||
|
||||
# create publisher #
|
||||
self.lowcmd_publisher = ChannelPublisher("rt/lowcmd", LowCmd_)
|
||||
self.lowcmd_publisher.Init()
|
||||
|
||||
# create subscriber #
|
||||
self.lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_)
|
||||
self.lowstate_subscriber.Init(self.LowStateMessageHandler, 10)
|
||||
|
||||
self.sc = SportClient()
|
||||
self.sc.SetTimeout(5.0)
|
||||
self.sc.Init()
|
||||
|
||||
self.msc = MotionSwitcherClient()
|
||||
self.msc.SetTimeout(5.0)
|
||||
self.msc.Init()
|
||||
|
||||
status, result = self.msc.CheckMode()
|
||||
while result['name']:
|
||||
self.sc.StandDown()
|
||||
self.msc.ReleaseMode()
|
||||
status, result = self.msc.CheckMode()
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def Start(self):
|
||||
self.lowCmdWriteThreadPtr = RecurrentThread(
|
||||
interval=0.002, target=self.LowCmdWrite, name="writebasiccmd"
|
||||
)
|
||||
self.lowCmdWriteThreadPtr.Start()
|
||||
|
||||
def InitLowCmd(self):
|
||||
self.low_cmd.head[0] = 0xFE
|
||||
self.low_cmd.head[1] = 0xEF
|
||||
self.low_cmd.level_flag = 0xFF
|
||||
self.low_cmd.gpio = 0
|
||||
for i in range(20):
|
||||
self.low_cmd.motor_cmd[i].mode = 0x01
|
||||
self.low_cmd.motor_cmd[i].q= b2.PosStopF
|
||||
self.low_cmd.motor_cmd[i].kp = 0
|
||||
self.low_cmd.motor_cmd[i].dq = b2.VelStopF
|
||||
self.low_cmd.motor_cmd[i].kd = 0
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
def LowStateMessageHandler(self, msg: LowState_):
|
||||
self.low_state = msg
|
||||
|
||||
def LowCmdWrite(self):
|
||||
|
||||
if self.firstRun:
|
||||
for i in range(12):
|
||||
self.startPos[i] = self.low_state.motor_state[i].q
|
||||
self.firstRun = False
|
||||
|
||||
self.percent_1 += 1.0 / self.duration_1
|
||||
self.percent_1 = min(self.percent_1, 1)
|
||||
if self.percent_1 < 1:
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = (1 - self.percent_1) * self.startPos[i] + self.percent_1 * self.targetPos_1[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if (self.percent_1 == 1) and (self.percent_2 <= 1):
|
||||
self.percent_2 += 1.0 / self.duration_2
|
||||
self.percent_2 = min(self.percent_2, 1)
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = (1 - self.percent_2) * self.targetPos_1[i] + self.percent_2 * self.targetPos_2[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 < 1):
|
||||
self.percent_3 += 1.0 / self.duration_3
|
||||
self.percent_3 = min(self.percent_3, 1)
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = self.targetPos_2[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 == 1) and (self.percent_4 <= 1):
|
||||
self.percent_4 += 1.0 / self.duration_4
|
||||
self.percent_4 = min(self.percent_4, 1)
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = (1 - self.percent_4) * self.targetPos_2[i] + self.percent_4 * self.targetPos_3[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
self.low_cmd.crc = self.crc.Crc(self.low_cmd)
|
||||
self.lowcmd_publisher.Write(self.low_cmd)
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
print("WARNING: Please ensure there are no obstacles around the robot while running this example.")
|
||||
input("Press Enter to continue...")
|
||||
|
||||
if len(sys.argv)>1:
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
else:
|
||||
ChannelFactoryInitialize(0)
|
||||
|
||||
custom = Custom()
|
||||
custom.Init()
|
||||
custom.Start()
|
||||
|
||||
while True:
|
||||
if custom.percent_4 == 1.0:
|
||||
time.sleep(1)
|
||||
print("Done!")
|
||||
sys.exit(-1)
|
||||
time.sleep(1)
|
|
@ -0,0 +1,101 @@
|
|||
import time
|
||||
import sys
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.b2.sport.sport_client import SportClient
|
||||
import math
|
||||
from dataclasses import dataclass
|
||||
|
||||
@dataclass
|
||||
class TestOption:
|
||||
name: str
|
||||
id: int
|
||||
|
||||
option_list = [
|
||||
TestOption(name="damp", id=0),
|
||||
TestOption(name="stand_up", id=1),
|
||||
TestOption(name="stand_down", id=2),
|
||||
TestOption(name="move forward", id=3),
|
||||
TestOption(name="move lateral", id=4),
|
||||
TestOption(name="move rotate", id=5),
|
||||
TestOption(name="stop_move", id=6),
|
||||
TestOption(name="switch_gait", id=7),
|
||||
TestOption(name="switch_gait", id=8),
|
||||
TestOption(name="recovery", id=9)
|
||||
]
|
||||
|
||||
class UserInterface:
|
||||
def __init__(self):
|
||||
self.test_option_ = None
|
||||
|
||||
def convert_to_int(self, input_str):
|
||||
try:
|
||||
return int(input_str)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def terminal_handle(self):
|
||||
input_str = input("Enter id or name: \n")
|
||||
|
||||
if input_str == "list":
|
||||
self.test_option_.name = None
|
||||
self.test_option_.id = None
|
||||
for option in option_list:
|
||||
print(f"name: {option.name}, id: {option.id}")
|
||||
return
|
||||
|
||||
for option in option_list:
|
||||
if input_str == option.name or self.convert_to_int(input_str) == option.id:
|
||||
self.test_option_.name = option.name
|
||||
self.test_option_.id = option.id
|
||||
print(f"Test: {self.test_option_.name}, test_id: {self.test_option_.id}")
|
||||
return
|
||||
|
||||
print("No matching test option found.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) < 2:
|
||||
print(f"Usage: python3 {sys.argv[0]} networkInterface")
|
||||
sys.exit(-1)
|
||||
|
||||
print("WARNING: Please ensure there are no obstacles around the robot while running this example.")
|
||||
input("Press Enter to continue...")
|
||||
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
|
||||
test_option = TestOption(name=None, id=None)
|
||||
user_interface = UserInterface()
|
||||
user_interface.test_option_ = test_option
|
||||
|
||||
sport_client = SportClient()
|
||||
sport_client.SetTimeout(10.0)
|
||||
sport_client.Init()
|
||||
|
||||
print("Input \"list\" to list all test option ...")
|
||||
|
||||
while True:
|
||||
user_interface.terminal_handle()
|
||||
|
||||
print(f"Updated Test Option: Name = {test_option.name}, ID = {test_option.id}\n")
|
||||
|
||||
if test_option.id == 0:
|
||||
sport_client.Damp()
|
||||
elif test_option.id == 1:
|
||||
sport_client.StandUp()
|
||||
elif test_option.id == 2:
|
||||
sport_client.StandDown()
|
||||
elif test_option.id == 3:
|
||||
sport_client.Move(0.3,0,0)
|
||||
elif test_option.id == 4:
|
||||
sport_client.Move(0,0.3,0)
|
||||
elif test_option.id == 5:
|
||||
sport_client.Move(0,0,0.5)
|
||||
elif test_option.id == 6:
|
||||
sport_client.StopMove()
|
||||
elif test_option.id == 7:
|
||||
sport_client.SwitchGait(0)
|
||||
elif test_option.id == 8:
|
||||
sport_client.SwitchGait(1)
|
||||
elif test_option.id == 9:
|
||||
sport_client.RecoveryStand()
|
||||
|
||||
time.sleep(1)
|
|
@ -0,0 +1,196 @@
|
|||
import time
|
||||
import sys
|
||||
|
||||
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_
|
||||
from unitree_sdk2py.utils.crc import CRC
|
||||
from unitree_sdk2py.utils.thread import RecurrentThread
|
||||
import unitree_legged_const as b2w
|
||||
from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient
|
||||
from unitree_sdk2py.b2.sport.sport_client import SportClient
|
||||
|
||||
class Custom:
|
||||
def __init__(self):
|
||||
self.Kp = 1000.0
|
||||
self.Kd = 10.0
|
||||
self.time_consume = 0
|
||||
self.rate_count = 0
|
||||
self.sin_count = 0
|
||||
self.motiontime = 0
|
||||
self.dt = 0.002
|
||||
|
||||
self.low_cmd = unitree_go_msg_dds__LowCmd_()
|
||||
self.low_state = None
|
||||
|
||||
self.targetPos_1 = [0.0, 1.36, -2.65, 0.0, 1.36, -2.65,
|
||||
-0.2, 1.36, -2.65, 0.2, 1.36, -2.65]
|
||||
|
||||
self.targetPos_2 = [0.0, 0.67, -1.3, 0.0, 0.67, -1.3,
|
||||
0.0, 0.67, -1.3, 0.0, 0.67, -1.3]
|
||||
|
||||
self.targetPos_3 = [-0.65, 1.36, -2.65, 0.65, 1.36, -2.65,
|
||||
-0.65, 1.36, -2.65, 0.65, 1.36, -2.65]
|
||||
|
||||
self.startPos = [0.0] * 12
|
||||
self.duration_1 = 800
|
||||
self.duration_2 = 800
|
||||
self.duration_3 = 2000
|
||||
self.duration_4 = 1500
|
||||
self.percent_1 = 0
|
||||
self.percent_2 = 0
|
||||
self.percent_3 = 0
|
||||
self.percent_4 = 0
|
||||
|
||||
self.firstRun = True
|
||||
self.done = False
|
||||
|
||||
# thread handling
|
||||
self.lowCmdWriteThreadPtr = None
|
||||
|
||||
self.crc = CRC()
|
||||
|
||||
def Init(self):
|
||||
self.InitLowCmd()
|
||||
|
||||
# create publisher #
|
||||
self.lowcmd_publisher = ChannelPublisher("rt/lowcmd", LowCmd_)
|
||||
self.lowcmd_publisher.Init()
|
||||
|
||||
# create subscriber #
|
||||
self.lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_)
|
||||
self.lowstate_subscriber.Init(self.LowStateMessageHandler, 10)
|
||||
|
||||
self.sc = SportClient()
|
||||
self.sc.SetTimeout(5.0)
|
||||
self.sc.Init()
|
||||
|
||||
self.msc = MotionSwitcherClient()
|
||||
self.msc.SetTimeout(5.0)
|
||||
self.msc.Init()
|
||||
|
||||
status, result = self.msc.CheckMode()
|
||||
while result['name']:
|
||||
self.sc.StandDown()
|
||||
self.msc.ReleaseMode()
|
||||
status, result = self.msc.CheckMode()
|
||||
time.sleep(1)
|
||||
|
||||
def Start(self):
|
||||
self.lowCmdWriteThreadPtr = RecurrentThread(
|
||||
name="writebasiccmd", interval=self.dt, target=self.LowCmdWrite,
|
||||
)
|
||||
self.lowCmdWriteThreadPtr.Start()
|
||||
|
||||
def InitLowCmd(self):
|
||||
self.low_cmd.head[0] = 0xFE
|
||||
self.low_cmd.head[1] = 0xEF
|
||||
self.low_cmd.level_flag = 0xFF
|
||||
self.low_cmd.gpio = 0
|
||||
for i in range(20):
|
||||
self.low_cmd.motor_cmd[i].mode = 0x01 # (PMSM) mode
|
||||
self.low_cmd.motor_cmd[i].q = b2w.PosStopF
|
||||
self.low_cmd.motor_cmd[i].kp = 0
|
||||
self.low_cmd.motor_cmd[i].dq = b2w.VelStopF
|
||||
self.low_cmd.motor_cmd[i].kd = 0
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
def LowStateMessageHandler(self, msg: LowState_):
|
||||
self.low_state = msg
|
||||
|
||||
def LowCmdWrite(self):
|
||||
if self.firstRun:
|
||||
for i in range(12):
|
||||
self.startPos[i] = self.low_state.motor_state[i].q
|
||||
self.firstRun = False
|
||||
|
||||
self.percent_1 += 1.0 / self.duration_1
|
||||
self.percent_1 = min(self.percent_1, 1)
|
||||
if self.percent_1 < 1:
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = (1 - self.percent_1) * self.startPos[i] + self.percent_1 * self.targetPos_1[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if (self.percent_1 == 1) and (self.percent_2 <= 1):
|
||||
self.percent_2 += 1.0 / self.duration_2
|
||||
self.percent_2 = min(self.percent_2, 1)
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = (1 - self.percent_2) * self.targetPos_1[i] + self.percent_2 * self.targetPos_2[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 < 1):
|
||||
self.percent_3 += 1.0 / self.duration_3
|
||||
self.percent_3 = min(self.percent_3, 1)
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = self.targetPos_2[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if self.percent_3 < 0.4:
|
||||
for i in range(12, 16):
|
||||
self.low_cmd.motor_cmd[i].q = 0
|
||||
self.low_cmd.motor_cmd[i].kp = 0
|
||||
self.low_cmd.motor_cmd[i].dq = 3
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if 0.4 <= self.percent_3 < 0.8:
|
||||
for i in range(12, 16):
|
||||
self.low_cmd.motor_cmd[i].q = 0
|
||||
self.low_cmd.motor_cmd[i].kp = 0
|
||||
self.low_cmd.motor_cmd[i].dq = -3
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if self.percent_3 >= 0.8:
|
||||
for i in range(12, 16):
|
||||
self.low_cmd.motor_cmd[i].q = 0
|
||||
self.low_cmd.motor_cmd[i].kp = 0
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 == 1) and (self.percent_4 <= 1):
|
||||
self.percent_4 += 1.0 / self.duration_4
|
||||
self.percent_4 = min(self.percent_4, 1)
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = (1 - self.percent_4) * self.targetPos_2[i] + self.percent_4 * self.targetPos_3[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
self.low_cmd.crc = self.crc.Crc(self.low_cmd)
|
||||
self.lowcmd_publisher.Write(self.low_cmd)
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
print("WARNING: Please ensure there are no obstacles around the robot while running this example.")
|
||||
input("Press Enter to continue...")
|
||||
|
||||
if len(sys.argv)>1:
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
else:
|
||||
ChannelFactoryInitialize(0)
|
||||
|
||||
custom = Custom()
|
||||
custom.Init()
|
||||
custom.Start()
|
||||
|
||||
while True:
|
||||
if custom.percent_4 == 1.0:
|
||||
time.sleep(1)
|
||||
print("Done!")
|
||||
sys.exit(-1)
|
||||
time.sleep(1)
|
|
@ -0,0 +1,24 @@
|
|||
LegID = {
|
||||
"FR_0": 0, # Front right hip
|
||||
"FR_1": 1, # Front right thigh
|
||||
"FR_2": 2, # Front right calf
|
||||
"FL_0": 3,
|
||||
"FL_1": 4,
|
||||
"FL_2": 5,
|
||||
"RR_0": 6,
|
||||
"RR_1": 7,
|
||||
"RR_2": 8,
|
||||
"RL_0": 9,
|
||||
"RL_1": 10,
|
||||
"RL_2": 11,
|
||||
"FR_w": 12, # Front right wheel
|
||||
"FL_w": 13, # Front left wheel
|
||||
"RR_w": 14, # Rear right wheel
|
||||
"RL_w": 15, # Rear left wheel
|
||||
}
|
||||
|
||||
HIGHLEVEL = 0xEE
|
||||
LOWLEVEL = 0xFF
|
||||
TRIGERLEVEL = 0xF0
|
||||
PosStopF = 2.146e9
|
||||
VelStopF = 16000.0
|
|
@ -0,0 +1,99 @@
|
|||
import time
|
||||
import sys
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
|
||||
from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient
|
||||
import math
|
||||
from dataclasses import dataclass
|
||||
|
||||
@dataclass
|
||||
class TestOption:
|
||||
name: str
|
||||
id: int
|
||||
|
||||
option_list = [
|
||||
TestOption(name="damp", id=0),
|
||||
TestOption(name="stand_up", id=1),
|
||||
TestOption(name="sit", id=2),
|
||||
TestOption(name="move forward", id=3),
|
||||
TestOption(name="move lateral", id=4),
|
||||
TestOption(name="move rotate", id=5),
|
||||
TestOption(name="low stand", id=6),
|
||||
TestOption(name="high stand", id=7),
|
||||
TestOption(name="zero torque", id=8)
|
||||
]
|
||||
|
||||
class UserInterface:
|
||||
def __init__(self):
|
||||
self.test_option_ = None
|
||||
|
||||
def convert_to_int(self, input_str):
|
||||
try:
|
||||
return int(input_str)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def terminal_handle(self):
|
||||
input_str = input("Enter id or name: \n")
|
||||
|
||||
if input_str == "list":
|
||||
self.test_option_.name = None
|
||||
self.test_option_.id = None
|
||||
for option in option_list:
|
||||
print(f"{option.name}, id: {option.id}")
|
||||
return
|
||||
|
||||
for option in option_list:
|
||||
if input_str == option.name or self.convert_to_int(input_str) == option.id:
|
||||
self.test_option_.name = option.name
|
||||
self.test_option_.id = option.id
|
||||
print(f"Test: {self.test_option_.name}, test_id: {self.test_option_.id}")
|
||||
return
|
||||
|
||||
print("No matching test option found.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) < 2:
|
||||
print(f"Usage: python3 {sys.argv[0]} networkInterface")
|
||||
sys.exit(-1)
|
||||
|
||||
print("WARNING: Please ensure there are no obstacles around the robot while running this example.")
|
||||
input("Press Enter to continue...")
|
||||
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
|
||||
test_option = TestOption(name=None, id=None)
|
||||
user_interface = UserInterface()
|
||||
user_interface.test_option_ = test_option
|
||||
|
||||
sport_client = LocoClient()
|
||||
sport_client.SetTimeout(10.0)
|
||||
sport_client.Init()
|
||||
|
||||
print("Input \"list\" to list all test option ...")
|
||||
while True:
|
||||
user_interface.terminal_handle()
|
||||
|
||||
print(f"Updated Test Option: Name = {test_option.name}, ID = {test_option.id}")
|
||||
|
||||
if test_option.id == 0:
|
||||
sport_client.Damp()
|
||||
elif test_option.id == 1:
|
||||
sport_client.StandUp()
|
||||
elif test_option.id == 2:
|
||||
sport_client.Sit()
|
||||
elif test_option.id == 3:
|
||||
sport_client.Move(0.3,0,0)
|
||||
elif test_option.id == 4:
|
||||
sport_client.Move(0,0.3,0)
|
||||
elif test_option.id == 5:
|
||||
sport_client.Move(0,0,0.3)
|
||||
elif test_option.id == 6:
|
||||
sport_client.LowStand()
|
||||
elif test_option.id == 7:
|
||||
sport_client.HighStand()
|
||||
elif test_option.id == 8:
|
||||
sport_client.ZeroTorque()
|
||||
|
||||
time.sleep(1)
|
|
@ -0,0 +1,198 @@
|
|||
import time
|
||||
import sys
|
||||
|
||||
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_
|
||||
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowState_
|
||||
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_
|
||||
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowState_
|
||||
from unitree_sdk2py.utils.crc import CRC
|
||||
from unitree_sdk2py.utils.thread import RecurrentThread
|
||||
from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient
|
||||
|
||||
import numpy as np
|
||||
|
||||
G1_NUM_MOTOR = 29
|
||||
|
||||
Kp = [
|
||||
60, 60, 60, 100, 40, 40, # legs
|
||||
60, 60, 60, 100, 40, 40, # legs
|
||||
60, 40, 40, # waist
|
||||
40, 40, 40, 40, 40, 40, 40, # arms
|
||||
40, 40, 40, 40, 40, 40, 40 # arms
|
||||
]
|
||||
|
||||
Kd = [
|
||||
1, 1, 1, 2, 1, 1, # legs
|
||||
1, 1, 1, 2, 1, 1, # legs
|
||||
1, 1, 1, # waist
|
||||
1, 1, 1, 1, 1, 1, 1, # arms
|
||||
1, 1, 1, 1, 1, 1, 1 # arms
|
||||
]
|
||||
|
||||
class G1JointIndex:
|
||||
LeftHipPitch = 0
|
||||
LeftHipRoll = 1
|
||||
LeftHipYaw = 2
|
||||
LeftKnee = 3
|
||||
LeftAnklePitch = 4
|
||||
LeftAnkleB = 4
|
||||
LeftAnkleRoll = 5
|
||||
LeftAnkleA = 5
|
||||
RightHipPitch = 6
|
||||
RightHipRoll = 7
|
||||
RightHipYaw = 8
|
||||
RightKnee = 9
|
||||
RightAnklePitch = 10
|
||||
RightAnkleB = 10
|
||||
RightAnkleRoll = 11
|
||||
RightAnkleA = 11
|
||||
WaistYaw = 12
|
||||
WaistRoll = 13 # NOTE: INVALID for g1 23dof/29dof with waist locked
|
||||
WaistA = 13 # NOTE: INVALID for g1 23dof/29dof with waist locked
|
||||
WaistPitch = 14 # NOTE: INVALID for g1 23dof/29dof with waist locked
|
||||
WaistB = 14 # NOTE: INVALID for g1 23dof/29dof with waist locked
|
||||
LeftShoulderPitch = 15
|
||||
LeftShoulderRoll = 16
|
||||
LeftShoulderYaw = 17
|
||||
LeftElbow = 18
|
||||
LeftWristRoll = 19
|
||||
LeftWristPitch = 20 # NOTE: INVALID for g1 23dof
|
||||
LeftWristYaw = 21 # NOTE: INVALID for g1 23dof
|
||||
RightShoulderPitch = 22
|
||||
RightShoulderRoll = 23
|
||||
RightShoulderYaw = 24
|
||||
RightElbow = 25
|
||||
RightWristRoll = 26
|
||||
RightWristPitch = 27 # NOTE: INVALID for g1 23dof
|
||||
RightWristYaw = 28 # NOTE: INVALID for g1 23dof
|
||||
|
||||
|
||||
class Mode:
|
||||
PR = 0 # Series Control for Pitch/Roll Joints
|
||||
AB = 1 # Parallel Control for A/B Joints
|
||||
|
||||
class Custom:
|
||||
def __init__(self):
|
||||
self.time_ = 0.0
|
||||
self.control_dt_ = 0.002 # [2ms]
|
||||
self.duration_ = 3.0 # [3 s]
|
||||
self.counter_ = 0
|
||||
self.mode_pr_ = Mode.PR
|
||||
self.mode_machine_ = 0
|
||||
self.low_cmd = unitree_hg_msg_dds__LowCmd_()
|
||||
self.low_state = None
|
||||
self.update_mode_machine_ = False
|
||||
self.crc = CRC()
|
||||
|
||||
def Init(self):
|
||||
self.msc = MotionSwitcherClient()
|
||||
self.msc.SetTimeout(5.0)
|
||||
self.msc.Init()
|
||||
|
||||
status, result = self.msc.CheckMode()
|
||||
while result['name']:
|
||||
self.msc.ReleaseMode()
|
||||
status, result = self.msc.CheckMode()
|
||||
time.sleep(1)
|
||||
|
||||
# create publisher #
|
||||
self.lowcmd_publisher_ = ChannelPublisher("rt/lowcmd", LowCmd_)
|
||||
self.lowcmd_publisher_.Init()
|
||||
|
||||
# create subscriber #
|
||||
self.__lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_)
|
||||
self.__lowstate_subscriber.Init(self.LowStateHandler, 10)
|
||||
|
||||
def Start(self):
|
||||
self.lowCmdWriteThreadPtr = RecurrentThread(
|
||||
interval=self.control_dt_, target=self.__LowCmdWrite, name="control"
|
||||
)
|
||||
while self.update_mode_machine_ == False:
|
||||
time.sleep(1)
|
||||
|
||||
if self.update_mode_machine_ == True:
|
||||
self.lowCmdWriteThreadPtr.Start()
|
||||
|
||||
def LowStateHandler(self, msg: LowState_):
|
||||
self.low_state = msg
|
||||
|
||||
if self.update_mode_machine_ == False:
|
||||
self.mode_machine_ = self.low_state.mode_machine
|
||||
self.update_mode_machine_ = True
|
||||
|
||||
self.counter_ +=1
|
||||
if (self.counter_ % 500 == 0) :
|
||||
self.counter_ = 0
|
||||
print(self.low_state.imu_state.rpy)
|
||||
|
||||
def __LowCmdWrite(self):
|
||||
self.time_ += self.control_dt_
|
||||
|
||||
if self.time_ < self.duration_ :
|
||||
# [Stage 1]: set robot to zero posture
|
||||
for i in range(G1_NUM_MOTOR):
|
||||
ratio = np.clip(self.time_ / self.duration_, 0.0, 1.0)
|
||||
self.low_cmd.mode_pr = Mode.PR
|
||||
self.low_cmd.mode_machine = self.mode_machine_
|
||||
self.low_cmd.motor_cmd[i].mode = 1 # 1:Enable, 0:Disable
|
||||
self.low_cmd.motor_cmd[i].tau = 0.
|
||||
self.low_cmd.motor_cmd[i].q = (1.0 - ratio) * self.low_state.motor_state[i].q
|
||||
self.low_cmd.motor_cmd[i].dq = 0.
|
||||
self.low_cmd.motor_cmd[i].kp = Kp[i]
|
||||
self.low_cmd.motor_cmd[i].kd = Kd[i]
|
||||
|
||||
elif self.time_ < self.duration_ * 2 :
|
||||
# [Stage 2]: swing ankle using PR mode
|
||||
max_P = np.pi * 30.0 / 180.0
|
||||
max_R = np.pi * 10.0 / 180.0
|
||||
t = self.time_ - self.duration_
|
||||
L_P_des = max_P * np.sin(2.0 * np.pi * t)
|
||||
L_R_des = max_R * np.sin(2.0 * np.pi * t)
|
||||
R_P_des = max_P * np.sin(2.0 * np.pi * t)
|
||||
R_R_des = -max_R * np.sin(2.0 * np.pi * t)
|
||||
|
||||
self.low_cmd.mode_pr = Mode.PR
|
||||
self.low_cmd.mode_machine = self.mode_machine_
|
||||
self.low_cmd.motor_cmd[G1JointIndex.LeftAnklePitch].q = L_P_des
|
||||
self.low_cmd.motor_cmd[G1JointIndex.LeftAnkleRoll].q = L_R_des
|
||||
self.low_cmd.motor_cmd[G1JointIndex.RightAnklePitch].q = R_P_des
|
||||
self.low_cmd.motor_cmd[G1JointIndex.RightAnkleRoll].q = R_R_des
|
||||
|
||||
else :
|
||||
# [Stage 3]: swing ankle using AB mode
|
||||
max_A = np.pi * 30.0 / 180.0
|
||||
max_B = np.pi * 10.0 / 180.0
|
||||
t = self.time_ - self.duration_ * 2
|
||||
L_A_des = max_A * np.sin(2.0 * np.pi * t)
|
||||
L_B_des = max_B * np.sin(2.0 * np.pi * t + np.pi)
|
||||
R_A_des = -max_A * np.sin(2.0 * np.pi * t)
|
||||
R_B_des = -max_B * np.sin(2.0 * np.pi * t + np.pi)
|
||||
|
||||
self.low_cmd.mode_pr = Mode.AB
|
||||
self.low_cmd.mode_machine = self.mode_machine_
|
||||
self.low_cmd.motor_cmd[G1JointIndex.LeftAnkleA].q = L_A_des
|
||||
self.low_cmd.motor_cmd[G1JointIndex.LeftAnkleB].q = L_B_des
|
||||
self.low_cmd.motor_cmd[G1JointIndex.RightAnkleA].q = R_A_des
|
||||
self.low_cmd.motor_cmd[G1JointIndex.RightAnkleB].q = R_B_des
|
||||
|
||||
self.low_cmd.crc = self.crc.Crc(self.low_cmd)
|
||||
self.lowcmd_publisher_.Write(self.low_cmd)
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
print("WARNING: Please ensure there are no obstacles around the robot while running this example.")
|
||||
input("Press Enter to continue...")
|
||||
|
||||
if len(sys.argv)>1:
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
else:
|
||||
ChannelFactoryInitialize(0)
|
||||
|
||||
custom = Custom()
|
||||
custom.Init()
|
||||
custom.Start()
|
||||
|
||||
while True:
|
||||
time.sleep(1)
|
|
@ -0,0 +1,109 @@
|
|||
import time
|
||||
import sys
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
|
||||
from unitree_sdk2py.go2.sport.sport_client import (
|
||||
SportClient,
|
||||
PathPoint,
|
||||
SPORT_PATH_POINT_SIZE,
|
||||
)
|
||||
import math
|
||||
from dataclasses import dataclass
|
||||
|
||||
@dataclass
|
||||
class TestOption:
|
||||
name: str
|
||||
id: int
|
||||
|
||||
option_list = [
|
||||
TestOption(name="damp", id=0),
|
||||
TestOption(name="stand_up", id=1),
|
||||
TestOption(name="stand_down", id=2),
|
||||
TestOption(name="move forward", id=3),
|
||||
TestOption(name="move lateral", id=4),
|
||||
TestOption(name="move rotate", id=5),
|
||||
TestOption(name="stop_move", id=6),
|
||||
TestOption(name="switch_gait", id=7),
|
||||
TestOption(name="switch_gait", id=8),
|
||||
TestOption(name="balanced stand", id=9),
|
||||
TestOption(name="recovery", id=10)
|
||||
]
|
||||
|
||||
class UserInterface:
|
||||
def __init__(self):
|
||||
self.test_option_ = None
|
||||
|
||||
def convert_to_int(self, input_str):
|
||||
try:
|
||||
return int(input_str)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def terminal_handle(self):
|
||||
input_str = input("Enter id or name: \n")
|
||||
|
||||
if input_str == "list":
|
||||
self.test_option_.name = None
|
||||
self.test_option_.id = None
|
||||
for option in option_list:
|
||||
print(f"{option.name}, id: {option.id}")
|
||||
return
|
||||
|
||||
for option in option_list:
|
||||
if input_str == option.name or self.convert_to_int(input_str) == option.id:
|
||||
self.test_option_.name = option.name
|
||||
self.test_option_.id = option.id
|
||||
print(f"Test: {self.test_option_.name}, test_id: {self.test_option_.id}")
|
||||
return
|
||||
|
||||
print("No matching test option found.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
print(f"Usage: python3 {sys.argv[0]} networkInterface")
|
||||
sys.exit(-1)
|
||||
|
||||
print("WARNING: Please ensure there are no obstacles around the robot while running this example.")
|
||||
input("Press Enter to continue...")
|
||||
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
|
||||
test_option = TestOption(name=None, id=None)
|
||||
user_interface = UserInterface()
|
||||
user_interface.test_option_ = test_option
|
||||
|
||||
sport_client = SportClient()
|
||||
sport_client.SetTimeout(10.0)
|
||||
sport_client.Init()
|
||||
while True:
|
||||
|
||||
user_interface.terminal_handle()
|
||||
|
||||
print(f"Updated Test Option: Name = {test_option.name}, ID = {test_option.id}\n")
|
||||
|
||||
if test_option.id == 0:
|
||||
sport_client.Damp()
|
||||
elif test_option.id == 1:
|
||||
sport_client.StandUp()
|
||||
elif test_option.id == 2:
|
||||
sport_client.StandDown()
|
||||
elif test_option.id == 3:
|
||||
sport_client.Move(0.3,0,0)
|
||||
elif test_option.id == 4:
|
||||
sport_client.Move(0,0.3,0)
|
||||
elif test_option.id == 5:
|
||||
sport_client.Move(0,0,0.5)
|
||||
elif test_option.id == 6:
|
||||
sport_client.StopMove()
|
||||
elif test_option.id == 7:
|
||||
sport_client.SwitchGait(0)
|
||||
elif test_option.id == 8:
|
||||
sport_client.SwitchGait(1)
|
||||
elif test_option.id == 9:
|
||||
sport_client.BalanceStand()
|
||||
elif test_option.id == 10:
|
||||
sport_client.RecoveryStand()
|
||||
|
||||
time.sleep(1)
|
|
@ -0,0 +1,176 @@
|
|||
import time
|
||||
import sys
|
||||
|
||||
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_
|
||||
from unitree_sdk2py.utils.crc import CRC
|
||||
from unitree_sdk2py.utils.thread import RecurrentThread
|
||||
import unitree_legged_const as go2
|
||||
from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient
|
||||
from unitree_sdk2py.go2.sport.sport_client import SportClient
|
||||
|
||||
class Custom:
|
||||
def __init__(self):
|
||||
self.Kp = 60.0
|
||||
self.Kd = 5.0
|
||||
self.time_consume = 0
|
||||
self.rate_count = 0
|
||||
self.sin_count = 0
|
||||
self.motiontime = 0
|
||||
self.dt = 0.002 # 0.001~0.01
|
||||
|
||||
self.low_cmd = unitree_go_msg_dds__LowCmd_()
|
||||
self.low_state = None
|
||||
|
||||
self._targetPos_1 = [0.0, 1.36, -2.65, 0.0, 1.36, -2.65,
|
||||
-0.2, 1.36, -2.65, 0.2, 1.36, -2.65]
|
||||
self._targetPos_2 = [0.0, 0.67, -1.3, 0.0, 0.67, -1.3,
|
||||
0.0, 0.67, -1.3, 0.0, 0.67, -1.3]
|
||||
self._targetPos_3 = [-0.35, 1.36, -2.65, 0.35, 1.36, -2.65,
|
||||
-0.5, 1.36, -2.65, 0.5, 1.36, -2.65]
|
||||
|
||||
self.startPos = [0.0] * 12
|
||||
self.duration_1 = 500
|
||||
self.duration_2 = 500
|
||||
self.duration_3 = 1000
|
||||
self.duration_4 = 900
|
||||
self.percent_1 = 0
|
||||
self.percent_2 = 0
|
||||
self.percent_3 = 0
|
||||
self.percent_4 = 0
|
||||
|
||||
self.firstRun = True
|
||||
self.done = False
|
||||
|
||||
# thread handling
|
||||
self.lowCmdWriteThreadPtr = None
|
||||
|
||||
self.crc = CRC()
|
||||
|
||||
# Public methods
|
||||
def Init(self):
|
||||
self.InitLowCmd()
|
||||
|
||||
# create publisher #
|
||||
self.lowcmd_publisher = ChannelPublisher("rt/lowcmd", LowCmd_)
|
||||
self.lowcmd_publisher.Init()
|
||||
|
||||
# create subscriber #
|
||||
self.lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_)
|
||||
self.lowstate_subscriber.Init(self.LowStateMessageHandler, 10)
|
||||
|
||||
self.sc = SportClient()
|
||||
self.sc.SetTimeout(5.0)
|
||||
self.sc.Init()
|
||||
|
||||
self.msc = MotionSwitcherClient()
|
||||
self.msc.SetTimeout(5.0)
|
||||
self.msc.Init()
|
||||
|
||||
status, result = self.msc.CheckMode()
|
||||
while result['name']:
|
||||
self.sc.StandDown()
|
||||
self.msc.ReleaseMode()
|
||||
status, result = self.msc.CheckMode()
|
||||
time.sleep(1)
|
||||
|
||||
def Start(self):
|
||||
self.lowCmdWriteThreadPtr = RecurrentThread(
|
||||
interval=0.002, target=self.LowCmdWrite, name="writebasiccmd"
|
||||
)
|
||||
self.lowCmdWriteThreadPtr.Start()
|
||||
|
||||
# Private methods
|
||||
def InitLowCmd(self):
|
||||
self.low_cmd.head[0]=0xFE
|
||||
self.low_cmd.head[1]=0xEF
|
||||
self.low_cmd.level_flag = 0xFF
|
||||
self.low_cmd.gpio = 0
|
||||
for i in range(20):
|
||||
self.low_cmd.motor_cmd[i].mode = 0x01 # (PMSM) mode
|
||||
self.low_cmd.motor_cmd[i].q= go2.PosStopF
|
||||
self.low_cmd.motor_cmd[i].kp = 0
|
||||
self.low_cmd.motor_cmd[i].dq = go2.VelStopF
|
||||
self.low_cmd.motor_cmd[i].kd = 0
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
def LowStateMessageHandler(self, msg: LowState_):
|
||||
self.low_state = msg
|
||||
# print("FR_0 motor state: ", msg.motor_state[go2.LegID["FR_0"]])
|
||||
# print("IMU state: ", msg.imu_state)
|
||||
# print("Battery state: voltage: ", msg.power_v, "current: ", msg.power_a)
|
||||
|
||||
def LowCmdWrite(self):
|
||||
|
||||
if self.firstRun:
|
||||
for i in range(12):
|
||||
self.startPos[i] = self.low_state.motor_state[i].q
|
||||
self.firstRun = False
|
||||
|
||||
self.percent_1 += 1.0 / self.duration_1
|
||||
self.percent_1 = min(self.percent_1, 1)
|
||||
if self.percent_1 < 1:
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = (1 - self.percent_1) * self.startPos[i] + self.percent_1 * self._targetPos_1[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if (self.percent_1 == 1) and (self.percent_2 <= 1):
|
||||
self.percent_2 += 1.0 / self.duration_2
|
||||
self.percent_2 = min(self.percent_2, 1)
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = (1 - self.percent_2) * self._targetPos_1[i] + self.percent_2 * self._targetPos_2[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 < 1):
|
||||
self.percent_3 += 1.0 / self.duration_3
|
||||
self.percent_3 = min(self.percent_3, 1)
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = self._targetPos_2[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 == 1) and (self.percent_4 <= 1):
|
||||
self.percent_4 += 1.0 / self.duration_4
|
||||
self.percent_4 = min(self.percent_4, 1)
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = (1 - self.percent_4) * self._targetPos_2[i] + self.percent_4 * self._targetPos_3[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
self.low_cmd.crc = self.crc.Crc(self.low_cmd)
|
||||
self.lowcmd_publisher.Write(self.low_cmd)
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
print("WARNING: Please ensure there are no obstacles around the robot while running this example.")
|
||||
input("Press Enter to continue...")
|
||||
|
||||
if len(sys.argv)>1:
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
else:
|
||||
ChannelFactoryInitialize(0)
|
||||
|
||||
custom = Custom()
|
||||
custom.Init()
|
||||
custom.Start()
|
||||
|
||||
while True:
|
||||
if custom.percent_4 == 1.0:
|
||||
time.sleep(1)
|
||||
print("Done!")
|
||||
sys.exit(-1)
|
||||
time.sleep(1)
|
|
@ -0,0 +1,20 @@
|
|||
LegID = {
|
||||
"FR_0": 0, # Front right hip
|
||||
"FR_1": 1, # Front right thigh
|
||||
"FR_2": 2, # Front right calf
|
||||
"FL_0": 3,
|
||||
"FL_1": 4,
|
||||
"FL_2": 5,
|
||||
"RR_0": 6,
|
||||
"RR_1": 7,
|
||||
"RR_2": 8,
|
||||
"RL_0": 9,
|
||||
"RL_1": 10,
|
||||
"RL_2": 11,
|
||||
}
|
||||
|
||||
HIGHLEVEL = 0xEE
|
||||
LOWLEVEL = 0xFF
|
||||
TRIGERLEVEL = 0xF0
|
||||
PosStopF = 2.146e9
|
||||
VelStopF = 16000.0
|
|
@ -0,0 +1,99 @@
|
|||
import time
|
||||
import sys
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
|
||||
from unitree_sdk2py.go2.sport.sport_client import SportClient
|
||||
import math
|
||||
from dataclasses import dataclass
|
||||
|
||||
@dataclass
|
||||
class TestOption:
|
||||
name: str
|
||||
id: int
|
||||
|
||||
option_list = [
|
||||
TestOption(name="damp", id=0),
|
||||
TestOption(name="stand_up", id=1),
|
||||
TestOption(name="stand_down", id=2),
|
||||
TestOption(name="move", id=3),
|
||||
TestOption(name="stop_move", id=4),
|
||||
TestOption(name="speed_level", id=5),
|
||||
TestOption(name="switch_gait", id=6),
|
||||
TestOption(name="get_state", id=7),
|
||||
TestOption(name="recovery", id=8),
|
||||
TestOption(name="balance", id=9)
|
||||
]
|
||||
|
||||
class UserInterface:
|
||||
def __init__(self):
|
||||
self.test_option_ = None
|
||||
|
||||
def convert_to_int(self, input_str):
|
||||
try:
|
||||
return int(input_str)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def terminal_handle(self):
|
||||
input_str = input("Enter id or name: \n")
|
||||
|
||||
if input_str == "list":
|
||||
self.test_option_.name = None
|
||||
self.test_option_.id = None
|
||||
for option in option_list:
|
||||
print(f"{option.name}, id: {option.id}")
|
||||
return
|
||||
|
||||
for option in option_list:
|
||||
if input_str == option.name or self.convert_to_int(input_str) == option.id:
|
||||
self.test_option_.name = option.name
|
||||
self.test_option_.id = option.id
|
||||
print(f"Test: {self.test_option_.name}, test_id: {self.test_option_.id}")
|
||||
return
|
||||
|
||||
print("No matching test option found.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) < 2:
|
||||
print(f"Usage: python3 {sys.argv[0]} networkInterface")
|
||||
sys.exit(-1)
|
||||
|
||||
print("WARNING: Please ensure there are no obstacles around the robot while running this example.")
|
||||
input("Press Enter to continue...")
|
||||
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
|
||||
test_option = TestOption(name=None, id=None)
|
||||
user_interface = UserInterface()
|
||||
user_interface.test_option_ = test_option
|
||||
|
||||
sport_client = SportClient()
|
||||
sport_client.SetTimeout(10.0)
|
||||
sport_client.Init()
|
||||
|
||||
while True:
|
||||
user_interface.terminal_handle()
|
||||
|
||||
print(f"Updated Test Option: Name = {test_option.name}, ID = {test_option.id}\n")
|
||||
|
||||
if test_option.id == 0:
|
||||
sport_client.Damp()
|
||||
elif test_option.id == 1:
|
||||
sport_client.StandUp()
|
||||
elif test_option.id == 2:
|
||||
sport_client.StandDown()
|
||||
elif test_option.id == 3:
|
||||
sport_client.Move(0.5,0,0)
|
||||
elif test_option.id == 4:
|
||||
sport_client.StopMove()
|
||||
elif test_option.id == 5:
|
||||
sport_client.SpeedLevel(1)
|
||||
elif test_option.id == 6:
|
||||
sport_client.SwitchGait(1)
|
||||
elif test_option.id == 8:
|
||||
sport_client.RecoveryStand()
|
||||
elif test_option.id == 9:
|
||||
sport_client.BalanceStand()
|
||||
|
||||
time.sleep(1)
|
|
@ -0,0 +1,196 @@
|
|||
import time
|
||||
import sys
|
||||
|
||||
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_
|
||||
from unitree_sdk2py.utils.crc import CRC
|
||||
from unitree_sdk2py.utils.thread import RecurrentThread
|
||||
import unitree_legged_const as go2w
|
||||
from unitree_sdk2py.go2.robot_state.robot_state_client import RobotStateClient
|
||||
from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient
|
||||
from unitree_sdk2py.go2.sport.sport_client import SportClient
|
||||
|
||||
class Custom:
|
||||
def __init__(self):
|
||||
self.Kp = 70.0
|
||||
self.Kd = 5.0
|
||||
self.time_consume = 0
|
||||
self.rate_count = 0
|
||||
self.sin_count = 0
|
||||
self.motiontime = 0
|
||||
self.dt = 0.002
|
||||
|
||||
self.low_cmd = unitree_go_msg_dds__LowCmd_()
|
||||
self.low_state = None
|
||||
|
||||
self.targetPos_1 = [0.0, 1.36, -2.65, 0.0, 1.36, -2.65,
|
||||
-0.2, 1.36, -2.65, 0.2, 1.36, -2.65]
|
||||
|
||||
self.targetPos_2 = [0.0, 0.67, -1.3, 0.0, 0.67, -1.3,
|
||||
0.0, 0.67, -1.3, 0.0, 0.67, -1.3]
|
||||
|
||||
self.targetPos_3 = [-0.35, 1.36, -2.65, 0.35, 1.36, -2.65,
|
||||
-0.5, 1.36, -2.65, 0.5, 1.36, -2.65]
|
||||
|
||||
self.startPos = [0.0] * 12
|
||||
self.duration_1 = 500
|
||||
self.duration_2 = 500
|
||||
self.duration_3 = 2000
|
||||
self.duration_4 = 900
|
||||
self.percent_1 = 0
|
||||
self.percent_2 = 0
|
||||
self.percent_3 = 0
|
||||
self.percent_4 = 0
|
||||
|
||||
self.firstRun = True
|
||||
self.done = False
|
||||
|
||||
# thread handling
|
||||
self.lowCmdWriteThreadPtr = None
|
||||
|
||||
self.crc = CRC()
|
||||
|
||||
# Public methods
|
||||
def Init(self):
|
||||
self.InitLowCmd()
|
||||
|
||||
# create publisher #
|
||||
self.lowcmd_publisher = ChannelPublisher("rt/lowcmd", LowCmd_)
|
||||
self.lowcmd_publisher.Init()
|
||||
|
||||
# create subscriber #
|
||||
self.lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_)
|
||||
self.lowstate_subscriber.Init(self.LowStateMessageHandler, 10)
|
||||
|
||||
self.sc = SportClient()
|
||||
self.sc.SetTimeout(5.0)
|
||||
self.sc.Init()
|
||||
|
||||
self.msc = MotionSwitcherClient()
|
||||
self.msc.SetTimeout(5.0)
|
||||
self.msc.Init()
|
||||
|
||||
status, result = self.msc.CheckMode()
|
||||
while result['name']:
|
||||
self.sc.StandUp()
|
||||
self.sc.StandDown()
|
||||
self.msc.ReleaseMode()
|
||||
status, result = self.msc.CheckMode()
|
||||
time.sleep(1)
|
||||
|
||||
def Start(self):
|
||||
self.lowCmdWriteThreadPtr = RecurrentThread(
|
||||
name="writebasiccmd", interval=0.002, target=self.LowCmdWrite,
|
||||
)
|
||||
self.lowCmdWriteThreadPtr.Start()
|
||||
|
||||
def InitLowCmd(self):
|
||||
self.low_cmd.head[0] = 0xFE
|
||||
self.low_cmd.head[1] = 0xEF
|
||||
self.low_cmd.level_flag = 0xFF
|
||||
self.low_cmd.gpio = 0
|
||||
for i in range(20):
|
||||
self.low_cmd.motor_cmd[i].mode = 0x01
|
||||
self.low_cmd.motor_cmd[i].q= go2w.PosStopF
|
||||
self.low_cmd.motor_cmd[i].kp = 0
|
||||
self.low_cmd.motor_cmd[i].dq = go2w.VelStopF
|
||||
self.low_cmd.motor_cmd[i].kd = 0
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
def LowStateMessageHandler(self, msg: LowState_):
|
||||
self.low_state = msg
|
||||
|
||||
def LowCmdWrite(self):
|
||||
if self.firstRun:
|
||||
for i in range(12):
|
||||
self.startPos[i] = self.low_state.motor_state[i].q
|
||||
self.firstRun = False
|
||||
|
||||
self.percent_1 += 1.0 / self.duration_1
|
||||
self.percent_1 = min(self.percent_1, 1)
|
||||
if self.percent_1 < 1:
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = (1 - self.percent_1) * self.startPos[i] + self.percent_1 * self.targetPos_1[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if (self.percent_1 == 1) and (self.percent_2 <= 1):
|
||||
self.percent_2 += 1.0 / self.duration_2
|
||||
self.percent_2 = min(self.percent_2, 1)
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = (1 - self.percent_2) * self.targetPos_1[i] + self.percent_2 * self.targetPos_2[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 < 1):
|
||||
self.percent_3 += 1.0 / self.duration_3
|
||||
self.percent_3 = min(self.percent_3, 1)
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = self.targetPos_2[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if self.percent_3 < 0.4:
|
||||
for i in range(12, 16):
|
||||
self.low_cmd.motor_cmd[i].q = 0
|
||||
self.low_cmd.motor_cmd[i].kp = 0.0
|
||||
self.low_cmd.motor_cmd[i].dq = 3
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if 0.4 <= self.percent_3 < 0.8:
|
||||
for i in range(12, 16):
|
||||
self.low_cmd.motor_cmd[i].q = 0
|
||||
self.low_cmd.motor_cmd[i].kp = 0
|
||||
self.low_cmd.motor_cmd[i].dq = -3
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if self.percent_3 >= 0.8:
|
||||
for i in range(12, 16):
|
||||
self.low_cmd.motor_cmd[i].q = 0
|
||||
self.low_cmd.motor_cmd[i].kp = 0
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
if (self.percent_1 == 1) and (self.percent_2 == 1) and (self.percent_3 == 1) and (self.percent_4 <= 1):
|
||||
self.percent_4 += 1.0 / self.duration_4
|
||||
self.percent_4 = min(self.percent_4, 1)
|
||||
for i in range(12):
|
||||
self.low_cmd.motor_cmd[i].q = (1 - self.percent_4) * self.targetPos_2[i] + self.percent_4 * self.targetPos_3[i]
|
||||
self.low_cmd.motor_cmd[i].dq = 0
|
||||
self.low_cmd.motor_cmd[i].kp = self.Kp
|
||||
self.low_cmd.motor_cmd[i].kd = self.Kd
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
self.low_cmd.crc = self.crc.Crc(self.low_cmd)
|
||||
self.lowcmd_publisher.Write(self.low_cmd)
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
if len(sys.argv)>1:
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
else:
|
||||
ChannelFactoryInitialize(0)
|
||||
|
||||
custom = Custom()
|
||||
custom.Init()
|
||||
custom.Start()
|
||||
|
||||
while True:
|
||||
if custom.percent_4 == 1.0:
|
||||
time.sleep(1)
|
||||
print("Done!")
|
||||
sys.exit(-1)
|
||||
time.sleep(1)
|
|
@ -0,0 +1,24 @@
|
|||
LegID = {
|
||||
"FR_0": 0, # Front right hip
|
||||
"FR_1": 1, # Front right thigh
|
||||
"FR_2": 2, # Front right calf
|
||||
"FL_0": 3,
|
||||
"FL_1": 4,
|
||||
"FL_2": 5,
|
||||
"RR_0": 6,
|
||||
"RR_1": 7,
|
||||
"RR_2": 8,
|
||||
"RL_0": 9,
|
||||
"RL_1": 10,
|
||||
"RL_2": 11,
|
||||
"FR_w": 12, # Front right wheel
|
||||
"FL_w": 13, # Front left wheel
|
||||
"RR_w": 14, # Rear right wheel
|
||||
"RL_w": 15, # Rear left wheel
|
||||
}
|
||||
|
||||
HIGHLEVEL = 0xEE
|
||||
LOWLEVEL = 0xFF
|
||||
TRIGERLEVEL = 0xF0
|
||||
PosStopF = 2.146e9
|
||||
VelStopF = 16000.0
|
|
@ -0,0 +1,96 @@
|
|||
import time
|
||||
import sys
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
|
||||
from unitree_sdk2py.h1.loco.h1_loco_client import LocoClient
|
||||
import math
|
||||
from dataclasses import dataclass
|
||||
|
||||
@dataclass
|
||||
class TestOption:
|
||||
name: str
|
||||
id: int
|
||||
|
||||
option_list = [
|
||||
TestOption(name="damp", id=0),
|
||||
TestOption(name="stand_up", id=1),
|
||||
TestOption(name="move forward", id=3),
|
||||
TestOption(name="move lateral", id=4),
|
||||
TestOption(name="move rotate", id=5),
|
||||
TestOption(name="low stand", id=6),
|
||||
TestOption(name="high stand", id=7),
|
||||
TestOption(name="zero torque", id=8)
|
||||
]
|
||||
|
||||
class UserInterface:
|
||||
def __init__(self):
|
||||
self.test_option_ = None
|
||||
|
||||
def convert_to_int(self, input_str):
|
||||
try:
|
||||
return int(input_str)
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def terminal_handle(self):
|
||||
input_str = input("Enter id or name: \n")
|
||||
|
||||
if input_str == "list":
|
||||
self.test_option_.name = None
|
||||
self.test_option_.id = None
|
||||
for option in option_list:
|
||||
print(f"{option.name}, id: {option.id}")
|
||||
return
|
||||
|
||||
for option in option_list:
|
||||
if input_str == option.name or self.convert_to_int(input_str) == option.id:
|
||||
self.test_option_.name = option.name
|
||||
self.test_option_.id = option.id
|
||||
print(f"Test: {self.test_option_.name}, test_id: {self.test_option_.id}")
|
||||
return
|
||||
|
||||
print("No matching test option found.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
print("WARNING: Please ensure there are no obstacles around the robot while running this example.")
|
||||
input("Press Enter to continue...")
|
||||
|
||||
if len(sys.argv)>1:
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
else:
|
||||
ChannelFactoryInitialize(0)
|
||||
|
||||
test_option = TestOption(name=None, id=None)
|
||||
user_interface = UserInterface()
|
||||
user_interface.test_option_ = test_option
|
||||
|
||||
sport_client = LocoClient()
|
||||
sport_client.SetTimeout(10.0)
|
||||
sport_client.Init()
|
||||
|
||||
while True:
|
||||
|
||||
user_interface.terminal_handle()
|
||||
|
||||
print(f"Updated Test Option: Name = {test_option.name}, ID = {test_option.id}\n")
|
||||
|
||||
if test_option.id == 0:
|
||||
sport_client.Damp()
|
||||
elif test_option.id == 1:
|
||||
sport_client.StandUp()
|
||||
elif test_option.id == 3:
|
||||
sport_client.Move(0.3,0,0)
|
||||
elif test_option.id == 4:
|
||||
sport_client.Move(0,0.3,0)
|
||||
elif test_option.id == 5:
|
||||
sport_client.Move(0,0,0.3)
|
||||
elif test_option.id == 6:
|
||||
sport_client.LowStand()
|
||||
elif test_option.id == 7:
|
||||
sport_client.HighStand()
|
||||
elif test_option.id == 8:
|
||||
sport_client.ZeroTorque()
|
||||
|
||||
time.sleep(1)
|
|
@ -0,0 +1,167 @@
|
|||
import time
|
||||
import sys
|
||||
|
||||
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_
|
||||
from unitree_sdk2py.utils.crc import CRC
|
||||
from unitree_sdk2py.utils.thread import RecurrentThread
|
||||
from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient
|
||||
import unitree_legged_const as h1
|
||||
import numpy as np
|
||||
|
||||
H1_NUM_MOTOR = 20
|
||||
|
||||
class H1JointIndex:
|
||||
# Right leg
|
||||
kRightHipYaw = 8
|
||||
kRightHipRoll = 0
|
||||
kRightHipPitch = 1
|
||||
kRightKnee = 2
|
||||
kRightAnkle = 11
|
||||
# Left leg
|
||||
kLeftHipYaw = 7
|
||||
kLeftHipRoll = 3
|
||||
kLeftHipPitch = 4
|
||||
kLeftKnee = 5
|
||||
kLeftAnkle = 10
|
||||
|
||||
kWaistYaw = 6
|
||||
|
||||
kNotUsedJoint = 9
|
||||
|
||||
# Right arm
|
||||
kRightShoulderPitch = 12
|
||||
kRightShoulderRoll = 13
|
||||
kRightShoulderYaw = 14
|
||||
kRightElbow = 15
|
||||
# Left arm
|
||||
kLeftShoulderPitch = 16
|
||||
kLeftShoulderRoll = 17
|
||||
kLeftShoulderYaw = 18
|
||||
kLeftElbow = 19
|
||||
|
||||
class Custom:
|
||||
def __init__(self):
|
||||
self.time_ = 0.0
|
||||
self.control_dt_ = 0.01
|
||||
self.duration_ = 10.0
|
||||
self.counter_ = 0
|
||||
self.kp_low_ = 60.0
|
||||
self.kp_high_ = 200.0
|
||||
self.kd_low_ = 1.5
|
||||
self.kd_high_ = 5.0
|
||||
self.low_cmd = unitree_go_msg_dds__LowCmd_()
|
||||
self.InitLowCmd()
|
||||
self.low_state = None
|
||||
self.crc = CRC()
|
||||
|
||||
def Init(self):
|
||||
# # create publisher #
|
||||
self.lowcmd_publisher_ = ChannelPublisher("rt/lowcmd", LowCmd_)
|
||||
self.lowcmd_publisher_.Init()
|
||||
|
||||
# # create subscriber #
|
||||
self.lowstate_subscriber = ChannelSubscriber("rt/lowstate", LowState_)
|
||||
self.lowstate_subscriber.Init(self.LowStateHandler, 10)
|
||||
|
||||
self.msc = MotionSwitcherClient()
|
||||
self.msc.SetTimeout(5.0)
|
||||
self.msc.Init()
|
||||
|
||||
status, result = self.msc.CheckMode()
|
||||
while result['name']:
|
||||
self.msc.ReleaseMode()
|
||||
status, result = self.msc.CheckMode()
|
||||
time.sleep(1)
|
||||
|
||||
self.report_rpy_ptr_ = RecurrentThread(
|
||||
interval=0.1, target=self.ReportRPY, name="report_rpy"
|
||||
)
|
||||
|
||||
self.report_rpy_ptr_.Start()
|
||||
|
||||
def is_weak_motor(self,motor_index):
|
||||
return motor_index in {
|
||||
H1JointIndex.kLeftAnkle,
|
||||
H1JointIndex.kRightAnkle,
|
||||
H1JointIndex.kRightShoulderPitch,
|
||||
H1JointIndex.kRightShoulderRoll,
|
||||
H1JointIndex.kRightShoulderYaw,
|
||||
H1JointIndex.kRightElbow,
|
||||
H1JointIndex.kLeftShoulderPitch,
|
||||
H1JointIndex.kLeftShoulderRoll,
|
||||
H1JointIndex.kLeftShoulderYaw,
|
||||
H1JointIndex.kLeftElbow,
|
||||
}
|
||||
|
||||
def InitLowCmd(self):
|
||||
self.low_cmd.head[0] = 0xFE
|
||||
self.low_cmd.head[1] = 0xEF
|
||||
self.low_cmd.level_flag = 0xFF
|
||||
self.low_cmd.gpio = 0
|
||||
for i in range(H1_NUM_MOTOR):
|
||||
if self.is_weak_motor(i):
|
||||
self.low_cmd.motor_cmd[i].mode = 0x01
|
||||
else:
|
||||
self.low_cmd.motor_cmd[i].mode = 0x0A
|
||||
self.low_cmd.motor_cmd[i].q= h1.PosStopF
|
||||
self.low_cmd.motor_cmd[i].kp = 0
|
||||
self.low_cmd.motor_cmd[i].dq = h1.VelStopF
|
||||
self.low_cmd.motor_cmd[i].kd = 0
|
||||
self.low_cmd.motor_cmd[i].tau = 0
|
||||
|
||||
def Start(self):
|
||||
self.lowCmdWriteThreadPtr = RecurrentThread(
|
||||
interval=self.control_dt_, target=self.LowCmdWrite, name="control"
|
||||
)
|
||||
self.lowCmdWriteThreadPtr.Start()
|
||||
|
||||
def LowStateHandler(self, msg: LowState_):
|
||||
self.low_state = msg
|
||||
|
||||
def ReportRPY(self):
|
||||
print("rpy: [",self.low_state.imu_state.rpy[0],", "
|
||||
,self.low_state.imu_state.rpy[1],", "
|
||||
,self.low_state.imu_state.rpy[2],"]"
|
||||
)
|
||||
|
||||
def LowCmdWrite(self):
|
||||
self.time_ += self.control_dt_
|
||||
self.time_ = np.clip(self.time_ , 0.0, self.duration_)
|
||||
|
||||
# set robot to zero posture
|
||||
for i in range(H1_NUM_MOTOR):
|
||||
ratio = self.time_ / self.duration_
|
||||
self.low_cmd.motor_cmd[i].tau = 0.
|
||||
self.low_cmd.motor_cmd[i].q = (1.0 - ratio) * self.low_state.motor_state[i].q
|
||||
self.low_cmd.motor_cmd[i].dq = 0.
|
||||
self.low_cmd.motor_cmd[i].kp = self.kp_low_ if self.is_weak_motor(i) else self.kp_high_
|
||||
self.low_cmd.motor_cmd[i].kd = self.kd_low_ if self.is_weak_motor(i) else self.kd_high_
|
||||
|
||||
self.low_cmd.crc = self.crc.Crc(self.low_cmd)
|
||||
self.lowcmd_publisher_.Write(self.low_cmd)
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
print("WARNING: Please ensure there are no obstacles around the robot while running this example.")
|
||||
input("Press Enter to continue...")
|
||||
|
||||
if len(sys.argv)>1:
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
else:
|
||||
ChannelFactoryInitialize(0)
|
||||
|
||||
custom = Custom()
|
||||
custom.Init()
|
||||
custom.Start()
|
||||
|
||||
while True:
|
||||
if custom.time_ == custom.duration_:
|
||||
time.sleep(1)
|
||||
print("Done!")
|
||||
sys.exit(-1)
|
||||
time.sleep(1)
|
|
@ -0,0 +1,5 @@
|
|||
HIGHLEVEL = 0xEE
|
||||
LOWLEVEL = 0xFF
|
||||
TRIGERLEVEL = 0xF0
|
||||
PosStopF = 2.146e9
|
||||
VelStopF = 16000.0
|
|
@ -1,133 +0,0 @@
|
|||
import time
|
||||
import numpy as np
|
||||
from enum import IntEnum
|
||||
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize
|
||||
|
||||
# from user_data import *
|
||||
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_
|
||||
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import MotorCmd_
|
||||
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_
|
||||
import unitree_sdk2py.idl.unitree_hg.msg.dds_ as dds
|
||||
print(dds.LowCmd_)
|
||||
print(dds.MotorCmd_)
|
||||
|
||||
kTopicArmSDK = "rt/arm_sdk"
|
||||
kPi = 3.141592654
|
||||
kPi_2 = 1.57079632
|
||||
|
||||
|
||||
class JointIndex(IntEnum):
|
||||
|
||||
# // Left arm
|
||||
kLeftShoulderPitch = 13
|
||||
kLeftShoulderRoll = 14
|
||||
kLeftShoulderYaw = 15
|
||||
kLeftElbow = 16
|
||||
kLeftWistYaw = 17
|
||||
kLeftWistPitch = 18
|
||||
kLeftWistRoll = 19
|
||||
# // Right arm
|
||||
kRightShoulderPitch = 20
|
||||
kRightShoulderRoll = 21
|
||||
kRightShoulderYaw = 22
|
||||
kRightElbow = 23
|
||||
kRightWistYaw = 24
|
||||
kRightWistPitch = 25
|
||||
kRightWistRoll = 26
|
||||
|
||||
kWaistYaw = 12
|
||||
|
||||
|
||||
kNotUsedJoint = 27
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
ChannelFactoryInitialize()
|
||||
|
||||
# Create a publisher to publish the data defined in UserData class
|
||||
arm_sdk_publisher = ChannelPublisher('rt/arm_sdk', LowCmd_)
|
||||
# pub = ChannelPublisher("rt/lowcmd", LowCmd_)
|
||||
arm_sdk_publisher.Init()
|
||||
|
||||
msg = unitree_hg_msg_dds__LowCmd_()
|
||||
|
||||
weight = 0
|
||||
weight_rate = 0.2
|
||||
|
||||
kp = 60
|
||||
kd = 1.5
|
||||
dq = 0
|
||||
tau_ff = 0
|
||||
|
||||
control_dt = 0.02
|
||||
max_joint_velocity = 0.5
|
||||
|
||||
delta_weight = weight_rate * control_dt
|
||||
max_joint_delta = max_joint_velocity * control_dt
|
||||
|
||||
init_pos = np.zeros(15)
|
||||
target_pos = np.array(
|
||||
[
|
||||
0.0,
|
||||
kPi_2,
|
||||
0.0,
|
||||
kPi_2,
|
||||
0.0,
|
||||
0.0,
|
||||
0.0,
|
||||
0.0,
|
||||
-kPi_2,
|
||||
0.0,
|
||||
kPi_2,
|
||||
0.0,
|
||||
0.0,
|
||||
0.0,
|
||||
0.0,
|
||||
]
|
||||
)
|
||||
print("Initailizing arms ...")
|
||||
|
||||
init_time = 5
|
||||
init_time_steps = int(init_time / control_dt)
|
||||
|
||||
for i in range(init_time_steps):
|
||||
weight += delta_weight
|
||||
weight = max(min(weight, 1.0), 0)
|
||||
msg.motor_cmd[kNotUsedJoint].q = weight * weight
|
||||
|
||||
for idx, joint in enumerate(JointIndex):
|
||||
msg.motor_cmd[joint].q = init_pos[idx]
|
||||
msg.motor_cmd[joint].dq = dq
|
||||
msg.motor_cmd[joint].kp = kp
|
||||
msg.motor_cmd[joint].kd = kd
|
||||
msg.motor_cmd[joint].tau = tau_ff
|
||||
|
||||
arm_sdk_publisher.Write(msg)
|
||||
|
||||
time.sleep(control_dt)
|
||||
|
||||
print("Done!")
|
||||
|
||||
period = 5
|
||||
num_time_steps = int(period / control_dt)
|
||||
|
||||
current_jpos_des = np.zeros_like(init_pos)
|
||||
|
||||
for i in range(num_time_steps):
|
||||
for j in range(len(current_jpos_des)):
|
||||
delta = target_pos[j] - current_jpos_des[j]
|
||||
clamped_delta = np.clip(delta, -max_joint_delta, max_joint_delta)
|
||||
current_jpos_des[j] += clamped_delta
|
||||
|
||||
for idx, joint in enumerate(JointIndex):
|
||||
msg.motor_cmd[joint].q = current_jpos_des[idx]
|
||||
msg.motor_cmd[joint].dq = dq
|
||||
msg.motor_cmd[joint].kp = kp
|
||||
msg.motor_cmd[joint].kd = kd
|
||||
msg.motor_cmd[joint].tau = tau_ff
|
||||
|
||||
arm_sdk_publisher.Write(msg)
|
||||
|
||||
time.sleep(control_dt)
|
||||
|
||||
exit()
|
|
@ -1,50 +0,0 @@
|
|||
import time
|
||||
import numpy as np
|
||||
from enum import IntEnum
|
||||
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize
|
||||
import sys
|
||||
|
||||
# from user_data import *
|
||||
from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_
|
||||
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_
|
||||
import unitree_sdk2py.idl.unitree_hg.msg.dds_ as dds
|
||||
from unitree_sdk2py.utils.crc import CRC
|
||||
|
||||
|
||||
kTopicArmSDK = "rt/lowcmd"
|
||||
kPi = 3.141592654
|
||||
kPi_2 = 1.57079632
|
||||
kNumMotors = 35
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
if len(sys.argv)>1:
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
else:
|
||||
ChannelFactoryInitialize(0)
|
||||
|
||||
# 初始化发布节点
|
||||
low_cmd_publisher = ChannelPublisher(kTopicArmSDK, LowCmd_)
|
||||
low_cmd_publisher.Init()
|
||||
# 初始化LowCmd_
|
||||
msg = unitree_hg_msg_dds__LowCmd_()
|
||||
# CRC校验实例
|
||||
crc = CRC()
|
||||
# 控制时间参数
|
||||
control_dt = 0.02
|
||||
init_time = 20
|
||||
init_time_steps = int(init_time / control_dt)
|
||||
|
||||
# 控制参数设置
|
||||
msg.mode_pr = 0
|
||||
msg.mode_machine = 4
|
||||
for i in range(init_time_steps):
|
||||
for idx in range(kNumMotors):
|
||||
msg.motor_cmd[idx].mode = 0x01
|
||||
msg.crc = crc.Crc(msg)
|
||||
# print(msg.crc)
|
||||
low_cmd_publisher.Write(msg)
|
||||
time.sleep(control_dt)
|
||||
|
||||
print("Done!")
|
||||
exit()
|
|
@ -1,24 +0,0 @@
|
|||
import time
|
||||
import sys
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
|
||||
|
||||
|
||||
def HighStateHandler(msg: SportModeState_):
|
||||
print("Position: ", msg.position)
|
||||
print("Velocity: ", msg.velocity)
|
||||
print("Yaw velocity: ", msg.yaw_speed)
|
||||
print("Foot position in body frame: ", msg.foot_position_body)
|
||||
print("Foot velocity in body frame: ", msg.foot_speed_body)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# sys.argv[1]: name of the network interface
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
sub = ChannelSubscriber("rt/sportmodestate", SportModeState_)
|
||||
sub.Init(HighStateHandler, 10)
|
||||
|
||||
while True:
|
||||
time.sleep(10.0)
|
|
@ -1,144 +0,0 @@
|
|||
import time
|
||||
import sys
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
|
||||
from unitree_sdk2py.go2.sport.sport_client import (
|
||||
SportClient,
|
||||
PathPoint,
|
||||
SPORT_PATH_POINT_SIZE,
|
||||
)
|
||||
import math
|
||||
|
||||
|
||||
class SportModeTest:
|
||||
def __init__(self) -> None:
|
||||
# Time count
|
||||
self.t = 0
|
||||
self.dt = 0.01
|
||||
|
||||
# Initial poition and yaw
|
||||
self.px0 = 0
|
||||
self.py0 = 0
|
||||
self.yaw0 = 0
|
||||
|
||||
self.client = SportClient() # Create a sport client
|
||||
self.client.SetTimeout(10.0)
|
||||
self.client.Init()
|
||||
|
||||
def GetInitState(self, robot_state: SportModeState_):
|
||||
self.px0 = robot_state.position[0]
|
||||
self.py0 = robot_state.position[1]
|
||||
self.yaw0 = robot_state.imu_state.rpy[2]
|
||||
|
||||
def StandUpDown(self):
|
||||
self.client.StandDown()
|
||||
print("Stand down !!!")
|
||||
time.sleep(1)
|
||||
|
||||
self.client.StandUp()
|
||||
print("Stand up !!!")
|
||||
time.sleep(1)
|
||||
|
||||
self.client.StandDown()
|
||||
print("Stand down !!!")
|
||||
time.sleep(1)
|
||||
|
||||
self.client.Damp()
|
||||
|
||||
def VelocityMove(self):
|
||||
elapsed_time = 1
|
||||
for i in range(int(elapsed_time / self.dt)):
|
||||
self.client.Move(0.3, 0, 0.3) # vx, vy vyaw
|
||||
time.sleep(self.dt)
|
||||
self.client.StopMove()
|
||||
|
||||
def BalanceAttitude(self):
|
||||
self.client.Euler(0.1, 0.2, 0.3) # roll, pitch, yaw
|
||||
self.client.BalanceStand()
|
||||
|
||||
def TrajectoryFollow(self):
|
||||
time_seg = 0.2
|
||||
time_temp = self.t - time_seg
|
||||
path = []
|
||||
for i in range(SPORT_PATH_POINT_SIZE):
|
||||
time_temp += time_seg
|
||||
|
||||
px_local = 0.5 * math.sin(0.5 * time_temp)
|
||||
py_local = 0
|
||||
yaw_local = 0
|
||||
vx_local = 0.25 * math.cos(0.5 * time_temp)
|
||||
vy_local = 0
|
||||
vyaw_local = 0
|
||||
|
||||
path_point_tmp = PathPoint(0, 0, 0, 0, 0, 0, 0)
|
||||
|
||||
path_point_tmp.timeFromStart = i * time_seg
|
||||
path_point_tmp.x = (
|
||||
px_local * math.cos(self.yaw0)
|
||||
- py_local * math.sin(self.yaw0)
|
||||
+ self.px0
|
||||
)
|
||||
path_point_tmp.y = (
|
||||
px_local * math.sin(self.yaw0)
|
||||
+ py_local * math.cos(self.yaw0)
|
||||
+ self.py0
|
||||
)
|
||||
path_point_tmp.yaw = yaw_local + self.yaw0
|
||||
path_point_tmp.vx = vx_local * math.cos(self.yaw0) - vy_local * math.sin(
|
||||
self.yaw0
|
||||
)
|
||||
path_point_tmp.vy = vx_local * math.sin(self.yaw0) + vy_local * math.cos(
|
||||
self.yaw0
|
||||
)
|
||||
path_point_tmp.vyaw = vyaw_local
|
||||
|
||||
path.append(path_point_tmp)
|
||||
|
||||
self.client.TrajectoryFollow(path)
|
||||
|
||||
def SpecialMotions(self):
|
||||
self.client.RecoveryStand()
|
||||
print("RecoveryStand !!!")
|
||||
time.sleep(1)
|
||||
|
||||
self.client.Stretch()
|
||||
print("Sit !!!")
|
||||
time.sleep(1)
|
||||
|
||||
self.client.RecoveryStand()
|
||||
print("RecoveryStand !!!")
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
# Robot state
|
||||
robot_state = unitree_go_msg_dds__SportModeState_()
|
||||
def HighStateHandler(msg: SportModeState_):
|
||||
global robot_state
|
||||
robot_state = msg
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv)>1:
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
else:
|
||||
ChannelFactoryInitialize(0)
|
||||
|
||||
sub = ChannelSubscriber("rt/sportmodestate", SportModeState_)
|
||||
sub.Init(HighStateHandler, 10)
|
||||
time.sleep(1)
|
||||
|
||||
test = SportModeTest()
|
||||
test.GetInitState(robot_state)
|
||||
|
||||
print("Start test !!!")
|
||||
while True:
|
||||
test.t += test.dt
|
||||
|
||||
test.StandUpDown()
|
||||
# test.VelocityMove()
|
||||
# test.BalanceAttitude()
|
||||
# test.TrajectoryFollow()
|
||||
# test.SpecialMotions()
|
||||
|
||||
time.sleep(test.dt)
|
|
@ -1,60 +0,0 @@
|
|||
import time
|
||||
import sys
|
||||
|
||||
from unitree_sdk2py.core.channel import ChannelPublisher, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_
|
||||
from unitree_sdk2py.utils.crc import CRC
|
||||
from unitree_sdk2py.utils.thread import Thread
|
||||
import unitree_legged_const as go2
|
||||
|
||||
crc = CRC()
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
if len(sys.argv)>1:
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
else:
|
||||
ChannelFactoryInitialize(0)
|
||||
# Create a publisher to publish the data defined in UserData class
|
||||
pub = ChannelPublisher("rt/lowcmd", LowCmd_)
|
||||
pub.Init()
|
||||
|
||||
cmd = unitree_go_msg_dds__LowCmd_()
|
||||
cmd.head[0]=0xFE
|
||||
cmd.head[1]=0xEF
|
||||
cmd.level_flag = 0xFF
|
||||
cmd.gpio = 0
|
||||
for i in range(20):
|
||||
cmd.motor_cmd[i].mode = 0x01 # (PMSM) mode
|
||||
cmd.motor_cmd[i].q= go2.PosStopF
|
||||
cmd.motor_cmd[i].kp = 0
|
||||
cmd.motor_cmd[i].dq = go2.VelStopF
|
||||
cmd.motor_cmd[i].kd = 0
|
||||
cmd.motor_cmd[i].tau = 0
|
||||
|
||||
while True:
|
||||
# Toque controle, set RL_2 toque
|
||||
cmd.motor_cmd[go2.LegID["RL_2"]].q = 0.0 # Set to stop position(rad)
|
||||
cmd.motor_cmd[go2.LegID["RL_2"]].kp = 0.0
|
||||
cmd.motor_cmd[go2.LegID["RL_2"]].dq = 0.0 # Set to stop angular velocity(rad/s)
|
||||
cmd.motor_cmd[go2.LegID["RL_2"]].kd = 0.0
|
||||
cmd.motor_cmd[go2.LegID["RL_2"]].tau = 1.0 # target toque is set to 1N.m
|
||||
|
||||
# Poinstion(rad) control, set RL_0 rad
|
||||
cmd.motor_cmd[go2.LegID["RL_0"]].q = 0.0 # Taregt angular(rad)
|
||||
cmd.motor_cmd[go2.LegID["RL_0"]].kp = 10.0 # Poinstion(rad) control kp gain
|
||||
cmd.motor_cmd[go2.LegID["RL_0"]].dq = 0.0 # Taregt angular velocity(rad/ss)
|
||||
cmd.motor_cmd[go2.LegID["RL_0"]].kd = 1.0 # Poinstion(rad) control kd gain
|
||||
cmd.motor_cmd[go2.LegID["RL_0"]].tau = 0.0 # Feedforward toque 1N.m
|
||||
|
||||
cmd.crc = crc.Crc(cmd)
|
||||
|
||||
#Publish message
|
||||
if pub.Write(cmd):
|
||||
print("Publish success. msg:", cmd.crc)
|
||||
else:
|
||||
print("Waitting for subscriber.")
|
||||
|
||||
time.sleep(0.002)
|
|
@ -1,27 +0,0 @@
|
|||
import time
|
||||
import sys
|
||||
from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelFactoryInitialize
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_
|
||||
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_
|
||||
|
||||
import unitree_legged_const as go2
|
||||
|
||||
|
||||
def LowStateHandler(msg: LowState_):
|
||||
|
||||
# print front right hip motor states
|
||||
print("FR_0 motor state: ", msg.motor_state[go2.LegID["FR_0"]])
|
||||
print("IMU state: ", msg.imu_state)
|
||||
print("Battery state: voltage: ", msg.power_v, "current: ", msg.power_a)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv)>1:
|
||||
ChannelFactoryInitialize(0, sys.argv[1])
|
||||
else:
|
||||
ChannelFactoryInitialize(0)
|
||||
sub = ChannelSubscriber("rt/lowstate", LowState_)
|
||||
sub.Init(LowStateHandler, 10)
|
||||
|
||||
while True:
|
||||
time.sleep(10.0)
|
|
@ -1,46 +0,0 @@
|
|||
import time
|
||||
import sys
|
||||
|
||||
from unitree_sdk2py.core.channel import ChannelFactoryInitialize
|
||||
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_
|
||||
from unitree_sdk2py.utils.crc import CRC
|
||||
|
||||
crc = CRC()
|
||||
|
||||
PosStopF = 2.146e9
|
||||
VelStopF = 16000.0
|
||||
|
||||
if __name__ == "__main__":
|
||||
ChannelFactoryInitialize(0)
|
||||
|
||||
cmd = unitree_go_msg_dds__LowCmd_()
|
||||
cmd.head[0] = 0xFE
|
||||
cmd.head[1] = 0xEF
|
||||
cmd.level_flag = 0xFF
|
||||
cmd.gpio = 0
|
||||
for i in range(20):
|
||||
cmd.motor_cmd[i].mode = 0x01
|
||||
cmd.motor_cmd[i].q = PosStopF
|
||||
cmd.motor_cmd[i].kp = 0
|
||||
cmd.motor_cmd[i].dq = VelStopF
|
||||
cmd.motor_cmd[i].kd = 0
|
||||
cmd.motor_cmd[i].tau = 0
|
||||
|
||||
cmd.motor_cmd[0].q = 0.0
|
||||
cmd.motor_cmd[0].kp = 0.0
|
||||
cmd.motor_cmd[0].dq = 0.0
|
||||
cmd.motor_cmd[0].kd = 0.0
|
||||
cmd.motor_cmd[0].tau = 1.0
|
||||
|
||||
cmd.motor_cmd[1].q = 0.0
|
||||
cmd.motor_cmd[1].kp = 10.0
|
||||
cmd.motor_cmd[1].dq = 0.0
|
||||
cmd.motor_cmd[1].kd = 1.0
|
||||
cmd.motor_cmd[1].tau = 0.0
|
||||
|
||||
now = time.perf_counter()
|
||||
|
||||
|
||||
cmd.crc = crc.Crc(cmd)
|
||||
|
||||
print("CRC:", cmd.crc, "Time cost:", (time.perf_counter() - now)*1000)
|
|
@ -0,0 +1,29 @@
|
|||
"""
|
||||
" service name
|
||||
"""
|
||||
MOTION_SWITCHER_SERVICE_NAME = "motion_switcher"
|
||||
|
||||
|
||||
"""
|
||||
" service api version
|
||||
"""
|
||||
MOTION_SWITCHER_API_VERSION = "1.0.0.1"
|
||||
|
||||
|
||||
"""
|
||||
" api id
|
||||
"""
|
||||
MOTION_SWITCHER_API_ID_CHECK_MODE = 1001
|
||||
MOTION_SWITCHER_API_ID_SELECT_MODE = 1002
|
||||
MOTION_SWITCHER_API_ID_RELEASE_MODE = 1003
|
||||
MOTION_SWITCHER_API_ID_SET_SILENT = 1004
|
||||
MOTION_SWITCHER_API_ID_GET_SILENT = 1005
|
||||
|
||||
# """
|
||||
# " error code
|
||||
# """
|
||||
# # client side
|
||||
# SPORT_ERR_CLIENT_POINT_PATH = 4101
|
||||
# # server side
|
||||
# SPORT_ERR_SERVER_OVERTIME = 4201
|
||||
# SPORT_ERR_SERVER_NOT_INIT = 4202
|
|
@ -0,0 +1,42 @@
|
|||
import json
|
||||
|
||||
from ...rpc.client import Client
|
||||
from .motion_switcher_api import *
|
||||
|
||||
"""
|
||||
" class MotionSwitcherClient
|
||||
"""
|
||||
class MotionSwitcherClient(Client):
|
||||
def __init__(self):
|
||||
super().__init__(MOTION_SWITCHER_SERVICE_NAME, False)
|
||||
|
||||
|
||||
def Init(self):
|
||||
# set api version
|
||||
self._SetApiVerson(MOTION_SWITCHER_API_VERSION)
|
||||
|
||||
# regist api
|
||||
self._RegistApi(MOTION_SWITCHER_API_ID_CHECK_MODE, 0)
|
||||
self._RegistApi(MOTION_SWITCHER_API_ID_SELECT_MODE, 0)
|
||||
self._RegistApi(MOTION_SWITCHER_API_ID_RELEASE_MODE, 0)
|
||||
self._RegistApi(MOTION_SWITCHER_API_ID_SET_SILENT, 0)
|
||||
self._RegistApi(MOTION_SWITCHER_API_ID_GET_SILENT, 0)
|
||||
|
||||
# 1001
|
||||
def CheckMode(self):
|
||||
p = {}
|
||||
parameter = json.dumps(p)
|
||||
code, data = self._Call(MOTION_SWITCHER_API_ID_CHECK_MODE, parameter)
|
||||
if code == 0:
|
||||
return code, json.loads(data)
|
||||
else:
|
||||
return code, None
|
||||
|
||||
# 1003
|
||||
def ReleaseMode(self):
|
||||
p = {}
|
||||
parameter = json.dumps(p)
|
||||
code, data = self._Call(MOTION_SWITCHER_API_ID_RELEASE_MODE, parameter)
|
||||
|
||||
return code, None
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
"""
|
||||
" service name
|
||||
"""
|
||||
LOCO_SERVICE_NAME = "loco"
|
||||
|
||||
|
||||
"""
|
||||
" service api version
|
||||
"""
|
||||
LOCO_API_VERSION = "1.0.0.0"
|
||||
|
||||
|
||||
"""
|
||||
" api id
|
||||
"""
|
||||
ROBOT_API_ID_LOCO_GET_FSM_ID = 7001
|
||||
ROBOT_API_ID_LOCO_GET_FSM_MODE = 7002
|
||||
ROBOT_API_ID_LOCO_GET_BALANCE_MODE = 7003
|
||||
ROBOT_API_ID_LOCO_GET_SWING_HEIGHT = 7004
|
||||
ROBOT_API_ID_LOCO_GET_STAND_HEIGHT = 7005
|
||||
ROBOT_API_ID_LOCO_GET_PHASE = 7006 # deprecated
|
||||
|
||||
ROBOT_API_ID_LOCO_SET_FSM_ID = 7101
|
||||
ROBOT_API_ID_LOCO_SET_BALANCE_MODE = 7102
|
||||
ROBOT_API_ID_LOCO_SET_SWING_HEIGHT = 7103
|
||||
ROBOT_API_ID_LOCO_SET_STAND_HEIGHT = 7104
|
||||
ROBOT_API_ID_LOCO_SET_VELOCITY = 7105
|
||||
|
||||
"""
|
||||
" error code
|
||||
"""
|
|
@ -0,0 +1,101 @@
|
|||
import json
|
||||
|
||||
from ...rpc.client import Client
|
||||
from .g1_loco_api import *
|
||||
|
||||
"""
|
||||
" class SportClient
|
||||
"""
|
||||
class LocoClient(Client):
|
||||
def __init__(self):
|
||||
super().__init__(LOCO_SERVICE_NAME, False)
|
||||
|
||||
|
||||
def Init(self):
|
||||
# set api version
|
||||
self._SetApiVerson(LOCO_API_VERSION)
|
||||
|
||||
# regist api
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_GET_FSM_ID, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_GET_FSM_MODE, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_GET_BALANCE_MODE, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_GET_SWING_HEIGHT, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_GET_STAND_HEIGHT, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_GET_PHASE, 0) # deprecated
|
||||
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_SET_FSM_ID, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_SET_BALANCE_MODE, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_SET_SWING_HEIGHT, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_SET_STAND_HEIGHT, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_SET_VELOCITY, 0)
|
||||
|
||||
# 7101
|
||||
def SetFsmId(self, fsm_id: int):
|
||||
p = {}
|
||||
p["data"] = fsm_id
|
||||
parameter = json.dumps(p)
|
||||
code, data = self._Call(ROBOT_API_ID_LOCO_SET_FSM_ID, parameter)
|
||||
return code
|
||||
|
||||
# 7102
|
||||
def SetBalanceMode(self, balance_mode: int):
|
||||
p = {}
|
||||
p["data"] = balance_mode
|
||||
parameter = json.dumps(p)
|
||||
code, data = self._Call(ROBOT_API_ID_LOCO_SET_BALANCE_MODE, parameter)
|
||||
return code
|
||||
|
||||
# 7104
|
||||
def SetStandHeight(self, stand_height: float):
|
||||
p = {}
|
||||
p["data"] = stand_height
|
||||
parameter = json.dumps(p)
|
||||
code, data = self._Call(ROBOT_API_ID_LOCO_SET_STAND_HEIGHT, parameter)
|
||||
return code
|
||||
|
||||
# 7105
|
||||
def SetVelocity(self, vx: float, vy: float, omega: float, duration: float = 1.0):
|
||||
p = {}
|
||||
velocity = [vx,vy,omega]
|
||||
p["velocity"] = velocity
|
||||
p["duration"] = duration
|
||||
parameter = json.dumps(p)
|
||||
code, data = self._Call(ROBOT_API_ID_LOCO_SET_VELOCITY, parameter)
|
||||
return code
|
||||
|
||||
def Damp(self):
|
||||
self.SetFsmId(1)
|
||||
|
||||
def Start(self):
|
||||
self.SetFsmId(200)
|
||||
|
||||
def Squat(self):
|
||||
self.SetFsmId(2)
|
||||
|
||||
def Sit(self):
|
||||
self.SetFsmId(3)
|
||||
|
||||
def StandUp(self):
|
||||
self.SetFsmId(4)
|
||||
|
||||
def ZeroTorque(self):
|
||||
self.SetFsmId(0)
|
||||
|
||||
def StopMove(self):
|
||||
self.SetVelocity(0., 0., 0.)
|
||||
|
||||
def HighStand(self):
|
||||
UINT32_MAX = (1 << 32) - 1
|
||||
self.SetStandHeight(UINT32_MAX)
|
||||
|
||||
def LowStand(self):
|
||||
UINT32_MIN = 0
|
||||
self.SetStandHeight(UINT32_MIN)
|
||||
|
||||
def Move(self, vx: float, vy: float, vyaw: float, continous_move: bool = False):
|
||||
duration = 864000.0 if continous_move else 1
|
||||
self.SetVelocity(vx, vy, vyaw, duration)
|
||||
|
||||
def BalanceStand(self, balance_mode: int):
|
||||
self.SetBalanceMode(balance_mode)
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
"""
|
||||
" service name
|
||||
"""
|
||||
LOCO_SERVICE_NAME = "loco"
|
||||
|
||||
|
||||
"""
|
||||
" service api version
|
||||
"""
|
||||
LOCO_API_VERSION = "2.0.0.0"
|
||||
|
||||
|
||||
"""
|
||||
" api id
|
||||
"""
|
||||
ROBOT_API_ID_LOCO_GET_FSM_ID = 8001
|
||||
ROBOT_API_ID_LOCO_GET_FSM_MODE = 8002
|
||||
ROBOT_API_ID_LOCO_GET_BALANCE_MODE = 8003
|
||||
ROBOT_API_ID_LOCO_GET_SWING_HEIGHT = 8004
|
||||
ROBOT_API_ID_LOCO_GET_STAND_HEIGHT = 8005
|
||||
ROBOT_API_ID_LOCO_GET_PHASE = 8006 # deprecated
|
||||
|
||||
ROBOT_API_ID_LOCO_SET_FSM_ID = 8101
|
||||
ROBOT_API_ID_LOCO_SET_BALANCE_MODE = 8102
|
||||
ROBOT_API_ID_LOCO_SET_SWING_HEIGHT = 8103
|
||||
ROBOT_API_ID_LOCO_SET_STAND_HEIGHT = 8104
|
||||
ROBOT_API_ID_LOCO_SET_VELOCITY = 8105
|
||||
|
||||
"""
|
||||
" error code
|
||||
"""
|
|
@ -0,0 +1,83 @@
|
|||
import json
|
||||
|
||||
from ...rpc.client import Client
|
||||
from .h1_loco_api import *
|
||||
|
||||
"""
|
||||
" class SportClient
|
||||
"""
|
||||
class LocoClient(Client):
|
||||
def __init__(self):
|
||||
super().__init__(LOCO_SERVICE_NAME, False)
|
||||
|
||||
|
||||
def Init(self):
|
||||
# set api version
|
||||
self._SetApiVerson(LOCO_API_VERSION)
|
||||
|
||||
# regist api
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_GET_FSM_ID, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_GET_FSM_MODE, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_GET_BALANCE_MODE, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_GET_SWING_HEIGHT, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_GET_STAND_HEIGHT, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_GET_PHASE, 0) # deprecated
|
||||
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_SET_FSM_ID, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_SET_BALANCE_MODE, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_SET_SWING_HEIGHT, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_SET_STAND_HEIGHT, 0)
|
||||
self._RegistApi(ROBOT_API_ID_LOCO_SET_VELOCITY, 0)
|
||||
|
||||
# 8101
|
||||
def SetFsmId(self, fsm_id: int):
|
||||
p = {}
|
||||
p["data"] = fsm_id
|
||||
parameter = json.dumps(p)
|
||||
code, data = self._Call(ROBOT_API_ID_LOCO_SET_FSM_ID, parameter)
|
||||
return code
|
||||
|
||||
# 8104
|
||||
def SetStandHeight(self, stand_height: float):
|
||||
p = {}
|
||||
p["data"] = stand_height
|
||||
parameter = json.dumps(p)
|
||||
code, data = self._Call(ROBOT_API_ID_LOCO_SET_STAND_HEIGHT, parameter)
|
||||
return code
|
||||
|
||||
# 8105
|
||||
def SetVelocity(self, vx: float, vy: float, omega: float, duration: float = 1.0):
|
||||
p = {}
|
||||
velocity = [vx,vy,omega]
|
||||
p["velocity"] = velocity
|
||||
p["duration"] = duration
|
||||
parameter = json.dumps(p)
|
||||
code, data = self._Call(ROBOT_API_ID_LOCO_SET_VELOCITY, parameter)
|
||||
return code
|
||||
|
||||
def Damp(self):
|
||||
self.SetFsmId(1)
|
||||
|
||||
def Start(self):
|
||||
self.SetFsmId(204)
|
||||
|
||||
def StandUp(self):
|
||||
self.SetFsmId(2)
|
||||
|
||||
def ZeroTorque(self):
|
||||
self.SetFsmId(0)
|
||||
|
||||
def StopMove(self):
|
||||
self.SetVelocity(0., 0., 0.)
|
||||
|
||||
def HighStand(self):
|
||||
UINT32_MAX = (1 << 32) - 1
|
||||
self.SetStandHeight(UINT32_MAX)
|
||||
|
||||
def LowStand(self):
|
||||
UINT32_MIN = 0
|
||||
self.SetStandHeight(UINT32_MIN)
|
||||
|
||||
def Move(self, vx: float, vy: float, vyaw: float, continous_move: bool = False):
|
||||
duration = 864000.0 if continous_move else 1
|
||||
self.SetVelocity(vx, vy, vyaw, duration)
|
Loading…
Reference in New Issue