import mujoco import numpy as np import pygame import sys from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelPublisher from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_ from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_ from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_ from unitree_sdk2py.idl.unitree_go.msg.dds_ import WirelessController_ from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowCmd_ from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_ from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_ from unitree_sdk2py.idl.default import unitree_go_msg_dds__WirelessController_ from unitree_sdk2py.utils.thread import RecurrentThread TOPIC_LOWCMD = "rt/lowcmd" TOPIC_LOWSTATE = "rt/lowstate" TOPIC_HIGHSTATE = "rt/sportmodestate" TOPIC_WIRELESS_CONTROLLER = "rt/wirelesscontroller" MOTOR_SENSOR_NUM = 3 class UnitreeSdk2Bridge: def __init__(self, mj_model, mj_data): self.mj_model = mj_model self.mj_data = mj_data self.num_motor = self.mj_model.nu self.dim_motor_sensor = MOTOR_SENSOR_NUM * self.num_motor self.have_imu = False self.have_frame_sensor = False self.dt = self.mj_model.opt.timestep # Check sensor for i in range(self.dim_motor_sensor, self.mj_model.nsensor): name = mujoco.mj_id2name( self.mj_model, mujoco._enums.mjtObj.mjOBJ_SENSOR, i ) if name == "imu_quat": self.have_imu_ = True if name == "frame_pos": self.have_frame_sensor_ = True # Unitree sdk2 message self.low_state = unitree_go_msg_dds__LowState_() self.low_state_puber = ChannelPublisher(TOPIC_LOWSTATE, LowState_) self.low_state_puber.Init() self.lowStateThread = RecurrentThread( interval=self.dt, target=self.PublishLowState, name="sim_lowstate" ) self.lowStateThread.Start() self.high_state = unitree_go_msg_dds__SportModeState_() self.high_state_puber = ChannelPublisher(TOPIC_HIGHSTATE, SportModeState_) self.high_state_puber.Init() self.HighStateThread = RecurrentThread( interval=self.dt, target=self.PublishHighState, name="sim_highstate" ) self.HighStateThread.Start() self.wireless_controller = unitree_go_msg_dds__WirelessController_() self.wireless_controller_puber = ChannelPublisher( TOPIC_WIRELESS_CONTROLLER, WirelessController_ ) self.wireless_controller_puber.Init() self.WirelessControllerThread = RecurrentThread( interval=0.01, target=self.PublishWirelessController, name="sim_wireless_controller", ) self.WirelessControllerThread.Start() self.low_cmd_suber = ChannelSubscriber(TOPIC_LOWCMD, LowCmd_) self.low_cmd_suber.Init(self.LowCmdHandler, 10) # joystick self.key_map = { "R1": 0, "L1": 1, "start": 2, "select": 3, "R2": 4, "L2": 5, "F1": 6, "F2": 7, "A": 8, "B": 9, "X": 10, "Y": 11, "up": 12, "right": 13, "down": 14, "left": 15, } self.joystick = None def LowCmdHandler(self, msg: LowCmd_): if self.mj_data != None: for i in range(self.num_motor): self.mj_data.ctrl[i] = ( msg.motor_cmd[i].tau + msg.motor_cmd[i].kp * (msg.motor_cmd[i].q - self.mj_data.sensordata[i]) + msg.motor_cmd[i].kd * ( msg.motor_cmd[i].dq - self.mj_data.sensordata[i + self.num_motor] ) ) def PublishLowState(self): if self.mj_data != None: for i in range(self.num_motor): self.low_state.motor_state[i].q = self.mj_data.sensordata[i] self.low_state.motor_state[i].dq = self.mj_data.sensordata[ i + self.num_motor ] self.low_state.motor_state[i].tau_est = self.mj_data.sensordata[ i + 2 * self.num_motor ] if self.have_frame_sensor_: self.low_state.imu_state.quaternion[0] = self.mj_data.sensordata[ self.dim_motor_sensor + 0 ] self.low_state.imu_state.quaternion[1] = self.mj_data.sensordata[ self.dim_motor_sensor + 1 ] self.low_state.imu_state.quaternion[2] = self.mj_data.sensordata[ self.dim_motor_sensor + 2 ] self.low_state.imu_state.quaternion[3] = self.mj_data.sensordata[ self.dim_motor_sensor + 3 ] self.low_state.imu_state.gyroscope[0] = self.mj_data.sensordata[ self.dim_motor_sensor + 4 ] self.low_state.imu_state.gyroscope[1] = self.mj_data.sensordata[ self.dim_motor_sensor + 5 ] self.low_state.imu_state.gyroscope[2] = self.mj_data.sensordata[ self.dim_motor_sensor + 6 ] self.low_state.imu_state.accelerometer[0] = self.mj_data.sensordata[ self.dim_motor_sensor + 7 ] self.low_state.imu_state.accelerometer[1] = self.mj_data.sensordata[ self.dim_motor_sensor + 8 ] self.low_state.imu_state.accelerometer[2] = self.mj_data.sensordata[ self.dim_motor_sensor + 9 ] self.low_state_puber.Write(self.low_state) def PublishHighState(self): if self.mj_data != None: self.high_state.position[0] = self.mj_data.sensordata[ self.dim_motor_sensor + 10 ] self.high_state.position[1] = self.mj_data.sensordata[ self.dim_motor_sensor + 11 ] self.high_state.position[2] = self.mj_data.sensordata[ self.dim_motor_sensor + 12 ] self.high_state.velocity[0] = self.mj_data.sensordata[ self.dim_motor_sensor + 13 ] self.high_state.velocity[1] = self.mj_data.sensordata[ self.dim_motor_sensor + 14 ] self.high_state.velocity[2] = self.mj_data.sensordata[ self.dim_motor_sensor + 15 ] self.high_state_puber.Write(self.high_state) def PublishWirelessController(self): if self.joystick != None: pygame.event.get() key_state = [0]*16 key_state[self.key_map["R1"]] = self.joystick.get_button(self.button_id["RB"]) key_state[self.key_map["L1"]] = self.joystick.get_button(self.button_id["LB"]) key_state[self.key_map["start"]] = self.joystick.get_button(self.button_id["START"]) key_state[self.key_map["select"]] = self.joystick.get_button(self.button_id["SELECT"]) key_state[self.key_map["R2"]] = (self.joystick.get_axis(self.axis_id["RT"])>0) key_state[self.key_map["L2"]] = (self.joystick.get_axis(self.axis_id["LT"])>0) key_state[self.key_map["F1"]] = 0 key_state[self.key_map["F2"]] = 0 key_state[self.key_map["A"]] = self.joystick.get_button(self.button_id["A"]) key_state[self.key_map["B"]] = self.joystick.get_button(self.button_id["B"]) key_state[self.key_map["X"]] = self.joystick.get_button(self.button_id["X"]) key_state[self.key_map["Y"]] = self.joystick.get_button(self.button_id["Y"]) key_state[self.key_map["up"]] = (self.joystick.get_hat(0)[1]>0) key_state[self.key_map["right"]] = (self.joystick.get_hat(0)[0]>0) key_state[self.key_map["down"]] = (self.joystick.get_hat(0)[1]<0) key_state[self.key_map["left"]] = (self.joystick.get_hat(0)[0]<0) key_value = 0 for i in range(16): key_value += key_state[i] << i self.wireless_controller.keys = key_value self.wireless_controller.lx = self.joystick.get_axis(self.axis_id["LX"]) self.wireless_controller.ly = -self.joystick.get_axis(self.axis_id["LY"]) self.wireless_controller.rx = self.joystick.get_axis(self.axis_id["RX"]) self.wireless_controller.ry = -self.joystick.get_axis(self.axis_id["RY"]) self.wireless_controller_puber.Write(self.wireless_controller) def SetupJoystick(self, device_id=0, js_type="xbox"): pygame.init() pygame.joystick.init() joystick_count = pygame.joystick.get_count() if joystick_count > 0: self.joystick = pygame.joystick.Joystick(device_id) self.joystick.init() else: print("No gamepad detected.") sys.exit() if js_type == "xbox": self.axis_id = { "LX": 0, # Left stick axis x "LY": 1, # Left stick axis y "RX": 3, # Right stick axis x "RY": 4, # Right stick axis y "LT": 2, # Left trigger "RT": 5, # Right trigger "DX": 6, # Directional pad x "DY": 7, # Directional pad y } self.button_id = { "X": 2, "Y": 3, "B": 1, "A": 0, "LB": 4, "RB": 5, "SELECT": 6, "START": 7, } elif js_type == "stitch": self.axis_id = { "LX": 0, # Left stick axis x "LY": 1, # Left stick axis y "RX": 2, # Right stick axis x "RY": 3, # Right stick axis y "LT": 5, # Left trigger "RT": 4, # Right trigger "DX": 6, # Directional pad x "DY": 7, # Directional pad y } self.button_id = { "X": 3, "Y": 4, "B": 1, "A": 0, "LB": 6, "RB": 7, "SELECT": 10, "START": 11, } else: print("Unsupported gamepad. ") def PrintSceneInformation(self): print(" ") print("<<------------- Link ------------->> ") for i in range(self.mj_model.nbody): name = mujoco.mj_id2name(self.mj_model, mujoco._enums.mjtObj.mjOBJ_BODY, i) if name: print("link_index:", i, ", name:", name) print(" ") print("<<------------- Joint ------------->> ") for i in range(self.mj_model.njnt): name = mujoco.mj_id2name(self.mj_model, mujoco._enums.mjtObj.mjOBJ_JOINT, i) if name: print("joint_index:", i, ", name:", name) print(" ") print("<<------------- Actuator ------------->>") for i in range(self.mj_model.nu): name = mujoco.mj_id2name( self.mj_model, mujoco._enums.mjtObj.mjOBJ_ACTUATOR, i ) if name: print("actuator_index:", i, ", name:", name) print(" ") print("<<------------- Sensor ------------->>") index = 0 for i in range(self.mj_model.nsensor): name = mujoco.mj_id2name( self.mj_model, mujoco._enums.mjtObj.mjOBJ_SENSOR, i ) if name: print( "sensor_index:", index, ", name:", name, ", dim:", self.mj_model.sensor_dim[i], ) index = index + self.mj_model.sensor_dim[i] print(" ") class ElasticBand: def __init__(self): self.stiffness = 200 self.damping = 100 self.point = np.array([0, 0, 3]) self.length = 0 self.enable = True def Advance(self, x, dx): """ Args: δx: desired position - current position dx: current velocity """ δx = self.point - x distance = np.linalg.norm(δx) direction = δx / distance v = np.dot(dx, direction) f = (self.stiffness * (distance - self.length) - self.damping * v) * direction return f def MujuocoKeyCallback(self, key): glfw = mujoco.glfw.glfw if key == glfw.KEY_7: self.length -= 0.1 if key == glfw.KEY_8: self.length += 0.1 if key == glfw.KEY_9: self.enable = not self.enable