197 lines
6.5 KiB
Python
197 lines
6.5 KiB
Python
import struct
|
|
import time
|
|
import numpy as np
|
|
import numpy.linalg as LA
|
|
from scipy.spatial.transform import Rotation as R
|
|
|
|
from cyclonedds.domain import DomainParticipant
|
|
from go2py_messages.msg.dds_ import Go2pyLowCmd_
|
|
from go2py_messages.msg.dds_ import Go2pyHighCmd_
|
|
from go2py_messages.msg.dds_ import Go2pyState_
|
|
from cyclonedds.topic import Topic
|
|
from cyclonedds.pub import DataWriter
|
|
|
|
from cyclonedds.domain import DomainParticipant
|
|
from cyclonedds.topic import Topic
|
|
from cyclonedds.sub import DataReader
|
|
from cyclonedds.util import duration
|
|
from threading import Thread
|
|
from scipy.spatial.transform import Rotation
|
|
from Go2Py.joy import xKeySwitch, xRockerBtn
|
|
|
|
|
|
class GO2Real():
|
|
def __init__(
|
|
self,
|
|
mode='lowlevel', # 'highlevel' or 'lowlevel'
|
|
vx_max=0.5,
|
|
vy_max=0.4,
|
|
ωz_max=0.5,
|
|
):
|
|
assert mode in ['highlevel', 'lowlevel'], "mode should be either 'highlevel' or 'lowlevel'"
|
|
self.mode = mode
|
|
self.simulated = False
|
|
self.prestanding_q = np.array([0.0, 1.26186061, -2.5,
|
|
0.0, 1.25883281, -2.5,
|
|
0.0, 1.27193761, -2.6,
|
|
0.0, 1.27148342, -2.6])
|
|
|
|
self.sitting_q = np.array([-0.02495611, 1.26249647, -2.82826662,
|
|
0.04563564, 1.2505368, -2.7933557,
|
|
-0.30623949, 1.28283751, -2.82314873,
|
|
0.26400229, 1.29355574, -2.84276843])
|
|
|
|
self.standing_q = np.array([0.0, 0.77832842, -1.56065452,
|
|
0.0, 0.76754963, -1.56634164,
|
|
0.0, 0.76681757, -1.53601146,
|
|
0.0, 0.75422204, -1.53229916])
|
|
self.latest_command_stamp = time.time()
|
|
self.highcmd_topic_name = "rt/go2/twist_cmd"
|
|
self.lowcmd_topic_name = "rt/go2py/low_cmd"
|
|
self.highcmd_topic_name = "rt/go2py/high_cmd"
|
|
self.lowstate_topic_name = "rt/go2py/state"
|
|
|
|
self.participant = DomainParticipant()
|
|
self.lowstate_topic = Topic(self.participant, self.lowstate_topic_name, Go2pyState_)
|
|
self.state_reader = DataReader(self.participant, self.lowstate_topic)
|
|
|
|
self.lowcmd_topic = Topic(self.participant, self.lowcmd_topic_name, Go2pyLowCmd_)
|
|
self.lowcmd_writer = DataWriter(self.participant, self.lowcmd_topic)
|
|
|
|
self.highcmd_topic = Topic(self.participant, self.highcmd_topic_name, Go2pyHighCmd_)
|
|
self.highcmd_writer = DataWriter(self.participant, self.highcmd_topic)
|
|
self.vx_max = vx_max
|
|
self.vy_max = vy_max
|
|
self.P_v_max = np.diag([1 / self.vx_max**2, 1 / self.vy_max**2])
|
|
self.ωz_max = ωz_max
|
|
self.ωz_min = -ωz_max
|
|
|
|
self.state = None
|
|
self.setCommands = {'lowlevel': self.setCommandsLow,
|
|
'highlevel': self.setCommandsHigh}[self.mode]
|
|
self.state_thread = Thread(target=self.state_update)
|
|
self.running = True
|
|
self.state_thread.start()
|
|
|
|
def state_update(self):
|
|
"""
|
|
Retrieve the state of the robot
|
|
"""
|
|
while self.running:
|
|
for msg in self.state_reader.take_iter(timeout=duration(milliseconds=1.)):
|
|
self.state = msg
|
|
|
|
def getIMU(self):
|
|
accel = self.state.accel
|
|
gyro = self.state.gyro
|
|
quat = self.state.quat
|
|
temp = self.state.imu_temp
|
|
return {'accel': accel, 'gyro': gyro, 'quat': quat, 'temp': temp}
|
|
|
|
def getFootContacts(self):
|
|
"""Returns the raw foot contact forces"""
|
|
footContacts = self.state.contact
|
|
return footContacts
|
|
|
|
def getJointStates(self):
|
|
"""Returns the joint angles (q) and velocities (dq) of the robot"""
|
|
return {'q': self.state.q,
|
|
'dq': self.state.dq,
|
|
'tau_est': self.state.tau,
|
|
'temperature': self.state.motor_temp}
|
|
|
|
def getRemoteState(self):
|
|
"""A method to get the state of the wireless remote control.
|
|
Returns a xRockerBtn object:
|
|
- head: [head1, head2]
|
|
- keySwitch: xKeySwitch object
|
|
- lx: float
|
|
- rx: float
|
|
- ry: float
|
|
- L2: float
|
|
- ly: float
|
|
"""
|
|
wirelessRemote = self.state.wireless_remote[:24]
|
|
binary_data = bytes(wirelessRemote)
|
|
|
|
format_str = "<2BH5f"
|
|
data = struct.unpack(format_str, binary_data)
|
|
|
|
head = list(data[:2])
|
|
lx = data[3]
|
|
rx = data[4]
|
|
ry = data[5]
|
|
L2 = data[6]
|
|
ly = data[7]
|
|
|
|
_btn = bin(data[2])[2:].zfill(16)
|
|
btn = [int(char) for char in _btn]
|
|
btn.reverse()
|
|
|
|
keySwitch = xKeySwitch(*btn)
|
|
rockerBtn = xRockerBtn(head, keySwitch, lx, rx, ry, L2, ly)
|
|
return rockerBtn
|
|
|
|
def getCommandFromRemote(self):
|
|
"""Do not use directly for control!!!"""
|
|
rockerBtn = self.getRemoteState()
|
|
|
|
lx = rockerBtn.lx
|
|
ly = rockerBtn.ly
|
|
rx = rockerBtn.rx
|
|
|
|
v_x = ly * self.vx_max
|
|
v_y = lx * self.vy_max
|
|
ω = rx * self.ωz_max
|
|
|
|
return v_x, v_y, ω
|
|
|
|
def getBatteryState(self):
|
|
"""Returns the battery percentage of the robot"""
|
|
return self.state.soc
|
|
|
|
def setCommandsHigh(self, v_x, v_y, ω_z, bodyHeight=0.0, footRaiseHeight=0.0, mode=2):
|
|
self.cmd_watchdog_timer = time.time()
|
|
_v_x, _v_y, _ω_z = self.clip_velocity(v_x, v_y, ω_z)
|
|
highcmd = Go2pyHighCmd_(
|
|
_v_x,
|
|
_v_y,
|
|
_ω_z
|
|
)
|
|
self.highcmd_writer.write(highcmd)
|
|
|
|
def setCommandsLow(self, q_des, dq_des, kp, kd, tau_ff):
|
|
assert q_des.size == dq_des.size == kp.size == kd.size == tau_ff.size == 12, "q, dq, kp, kd, tau_ff should have size 12"
|
|
lowcmd = Go2pyLowCmd_(
|
|
q_des,
|
|
dq_des,
|
|
kp,
|
|
kd,
|
|
tau_ff
|
|
)
|
|
self.lowcmd_writer.write(lowcmd)
|
|
self.latest_command_stamp = time.time()
|
|
|
|
def close(self):
|
|
self.running = False
|
|
|
|
def clip_velocity(self, v_x, v_y, ω_z):
|
|
_v = np.array([[v_x], [v_y]])
|
|
_scale = np.sqrt(_v.T @ self.P_v_max @ _v)[0, 0]
|
|
|
|
if _scale > 1.0:
|
|
scale = 1.0 / _scale
|
|
else:
|
|
scale = 1.0
|
|
|
|
return scale * v_x, scale * v_y, np.clip(ω_z, self.ωz_min, self.ωz_max)
|
|
|
|
def overheat(self):
|
|
return False
|
|
|
|
def getGravityInBody(self):
|
|
q = self.getIMU()['quat']
|
|
R = Rotation.from_quat(q).as_matrix()
|
|
g_in_body = R.T @ np.array([0.0, 0.0, -1.0]).reshape(3, 1)
|
|
return g_in_body
|