import struct import threading import time import numpy as np import numpy.linalg as LA from scipy.spatial.transform import Rotation as R from cyclonedds.domain import DomainParticipant from go2py_common.msg.dds_ import Go2pyLowCmd_ from unitree_go.msg.dds_ import LowState_ from cyclonedds.topic import Topic from cyclonedds.pub import DataWriter from cyclonedds.domain import DomainParticipant from cyclonedds.topic import Topic from cyclonedds.sub import DataReader from cyclonedds.util import duration from threading import Thread from scipy.spatial.transform import Rotation from Go2Py.joy import xKeySwitch, xRockerBtn class GO2Real(): def __init__( self, mode = 'lowlevel', # 'highlevel' or 'lowlevel' vx_max=0.5, vy_max=0.4, ωz_max=0.5, ): assert mode in ['highlevel', 'lowlevel'], "mode should be either 'highlevel' or 'lowlevel'" self.mode = mode if self.mode == 'highlevel': raise NotImplementedError('DDS interface for the highlevel commands is not implemented yet. Please use our ROS2 interface.') self.simulated = False self.prestanding_q = np.array([ 0.0, 1.26186061, -2.5, 0.0, 1.25883281, -2.5, 0.0, 1.27193761, -2.6, 0.0, 1.27148342, -2.6]) self.sitting_q = np.array([-0.02495611, 1.26249647, -2.82826662, 0.04563564, 1.2505368 , -2.7933557 , -0.30623949, 1.28283751, -2.82314873, 0.26400229, 1.29355574, -2.84276843]) self.standing_q = np.array([ 0.0, 0.77832842, -1.56065452, 0.0, 0.76754963, -1.56634164, 0.0, 0.76681757, -1.53601146, 0.0, 0.75422204, -1.53229916]) self.latest_command_stamp = time.time() self.highcmd_topic_name = "rt/go2/twist_cmd" self.lowcmd_topic_name = "rt/go2/lowcmd" self.lowstate_topic_name = "rt/lowstate" self.participant = DomainParticipant() self.lowstate_topic = Topic(self.participant, self.lowstate_topic_name, LowState_) self.lowstate_reader = DataReader(self.participant, self.lowstate_topic) self.lowcmd_topic = Topic(self.participant, self.lowcmd_topic_name, Go2pyLowCmd_) self.lowcmd_writer = DataWriter(self.participant, self.lowcmd_topic) self.state = None self.setCommands = {'lowlevel':self.setCommandsLow}[self.mode] self.lowstate_thread = Thread(target = self.lowstate_update) self.running = True self.lowstate_thread.start() def lowstate_update(self): """ Retrieve the state of the robot """ while self.running: for msg in self.lowstate_reader.take_iter(timeout=duration(milliseconds=1.)): self.state = msg def getIMU(self): accel = self.state.imu_state.accelerometer gyro = self.state.imu_state.gyroscope quat = self.state.imu_state.quaternion rpy = self.state.imu_state.rpy temp = self.state.imu_state.temperature return {'accel':accel, 'gyro':gyro, 'quat':quat, "rpy":rpy, 'temp':temp} def getFootContacts(self): """Returns the raw foot contact forces""" footContacts = self.state.foot_force return np.array(footContacts) def getJointStates(self): """Returns the joint angles (q) and velocities (dq) of the robot""" motor_state = np.array([[self.state.motor_state[i].q, self.state.motor_state[i].dq, self.state.motor_state[i].ddq, self.state.motor_state[i].tau_est, self.state.motor_state[i].temperature] for i in range(12)]) return {'q':motor_state[:,0], 'dq':motor_state[:,1], 'ddq':motor_state[:,2], 'tau_est':motor_state[:,3], 'temperature':motor_state[:,4]} def getRemoteState(self): """A method to get the state of the wireless remote control. Returns a xRockerBtn object: - head: [head1, head2] - keySwitch: xKeySwitch object - lx: float - rx: float - ry: float - L2: float - ly: float """ wirelessRemote = self.state.wireless_remote[:24] binary_data = bytes(wirelessRemote) format_str = "<2BH5f" data = struct.unpack(format_str, binary_data) head = list(data[:2]) lx = data[3] rx = data[4] ry = data[5] L2 = data[6] ly = data[7] _btn = bin(data[2])[2:].zfill(16) btn = [int(char) for char in _btn] btn.reverse() keySwitch = xKeySwitch(*btn) rockerBtn = xRockerBtn(head, keySwitch, lx, rx, ry, L2, ly) return rockerBtn def getCommandFromRemote(self): """Do not use directly for control!!!""" rockerBtn = self.getRemoteState() lx = rockerBtn.lx ly = rockerBtn.ly rx = rockerBtn.rx v_x = ly * self.vx_max v_y = lx * self.vy_max ω = rx * self.ωz_max return v_x, v_y, ω def getBatteryState(self): """Returns the battery percentage of the robot""" batteryState = self.state.bms_state return batteryState.soc def setCommandsHigh(self, v_x, v_y, ω_z, bodyHeight=0.0, footRaiseHeight=0.0, mode=2): self.cmd_watchdog_timer = time.time() _v_x, _v_y, _ω_z = self.clip_velocity(v_x, v_y, ω_z) self.highcmd.header.stamp = self.get_clock().now().to_msg() self.highcmd.header.frame_id = "base_link" self.highcmd.twist.linear.x = _v_x self.highcmd.twist.linear.y = _v_y self.highcmd.twist.angular.z = _ω_z self.highcmd_publisher.publish(self.highcmd) def setCommandsLow(self, q_des, dq_des, kp, kd, tau_ff): assert q_des.size == dq_des.size == kp.size == kd.size == tau_ff.size == 12, "q, dq, kp, kd, tau_ff should have size 12" lowcmd = Go2pyLowCmd_( q_des, dq_des, kp, kd, tau_ff ) self.lowcmd_writer.write(lowcmd) self.latest_command_stamp = time.time() def close(self): self.running = False def check_calf_collision(self, q): self.pin_robot.update(q) in_collision = self.pin_robot.check_calf_collision(q) return in_collision def clip_velocity(self, v_x, v_y, ω_z): _v = np.array([[v_x], [v_y]]) _scale = np.sqrt(_v.T @ self.P_v_max @ _v)[0, 0] if _scale > 1.0: scale = 1.0 / _scale else: scale = 1.0 return scale * v_x, scale * v_y, np.clip(ω_z, self.ωz_min, self.ωz_max) def overheat(self): return False def getGravityInBody(self): q = self.getIMU()['quat'] R = Rotation.from_quat([q[1], q[2], q[3], q[0]]).as_matrix() g_in_body = R.T@np.array([0.0, 0.0, -1.0]).reshape(3, 1) return g_in_body