634 lines
30 KiB
Python
634 lines
30 KiB
Python
import os, sys
|
|
|
|
import rclpy
|
|
from rclpy.node import Node
|
|
from unitree_go.msg import (
|
|
WirelessController,
|
|
LowState,
|
|
# MotorState,
|
|
# IMUState,
|
|
LowCmd,
|
|
# MotorCmd,
|
|
)
|
|
from std_msgs.msg import Float32MultiArray
|
|
if os.uname().machine in ["x86_64", "amd64"]:
|
|
sys.path.append(os.path.join(
|
|
os.path.dirname(os.path.abspath(__file__)),
|
|
"x86",
|
|
))
|
|
elif os.uname().machine == "aarch64":
|
|
sys.path.append(os.path.join(
|
|
os.path.dirname(os.path.abspath(__file__)),
|
|
"aarch64",
|
|
))
|
|
from crc_module import get_crc
|
|
|
|
from multiprocessing import Process
|
|
from collections import OrderedDict
|
|
import numpy as np
|
|
import torch
|
|
|
|
@torch.jit.script
|
|
def quat_from_euler_xyz(roll, pitch, yaw):
|
|
cy = torch.cos(yaw * 0.5)
|
|
sy = torch.sin(yaw * 0.5)
|
|
cr = torch.cos(roll * 0.5)
|
|
sr = torch.sin(roll * 0.5)
|
|
cp = torch.cos(pitch * 0.5)
|
|
sp = torch.sin(pitch * 0.5)
|
|
|
|
qw = cy * cr * cp + sy * sr * sp
|
|
qx = cy * sr * cp - sy * cr * sp
|
|
qy = cy * cr * sp + sy * sr * cp
|
|
qz = sy * cr * cp - cy * sr * sp
|
|
|
|
return torch.stack([qx, qy, qz, qw], dim=-1)
|
|
|
|
@torch.jit.script
|
|
def quat_rotate_inverse(q, v):
|
|
""" q must be in x, y, z, w order """
|
|
shape = q.shape
|
|
q_w = q[:, -1]
|
|
q_vec = q[:, :3]
|
|
a = v * (2.0 * q_w ** 2 - 1.0).unsqueeze(-1)
|
|
b = torch.cross(q_vec, v, dim=-1) * q_w.unsqueeze(-1) * 2.0
|
|
c = q_vec * \
|
|
torch.bmm(q_vec.view(shape[0], 1, 3), v.view(
|
|
shape[0], 3, 1)).squeeze(-1) * 2.0
|
|
return a - b + c
|
|
|
|
@torch.jit.script
|
|
def copysign(a, b):
|
|
# type: (float, Tensor) -> Tensor
|
|
a = torch.tensor(a, device=b.device, dtype=torch.float).repeat(b.shape[0])
|
|
return torch.abs(a) * torch.sign(b)
|
|
|
|
@torch.jit.script
|
|
def get_euler_xyz(q):
|
|
qx, qy, qz, qw = 0, 1, 2, 3
|
|
# roll (x-axis rotation)
|
|
sinr_cosp = 2.0 * (q[:, qw] * q[:, qx] + q[:, qy] * q[:, qz])
|
|
cosr_cosp = q[:, qw] * q[:, qw] - q[:, qx] * \
|
|
q[:, qx] - q[:, qy] * q[:, qy] + q[:, qz] * q[:, qz]
|
|
roll = torch.atan2(sinr_cosp, cosr_cosp)
|
|
|
|
# pitch (y-axis rotation)
|
|
sinp = 2.0 * (q[:, qw] * q[:, qy] - q[:, qz] * q[:, qx])
|
|
pitch = torch.where(torch.abs(sinp) >= 1, copysign(
|
|
np.pi / 2.0, sinp), torch.asin(sinp))
|
|
|
|
# yaw (z-axis rotation)
|
|
siny_cosp = 2.0 * (q[:, qw] * q[:, qz] + q[:, qx] * q[:, qy])
|
|
cosy_cosp = q[:, qw] * q[:, qw] + q[:, qx] * \
|
|
q[:, qx] - q[:, qy] * q[:, qy] - q[:, qz] * q[:, qz]
|
|
yaw = torch.atan2(siny_cosp, cosy_cosp)
|
|
|
|
return roll % (2*np.pi), pitch % (2*np.pi), yaw % (2*np.pi)
|
|
|
|
class RobotCfgs:
|
|
class H1:
|
|
pass
|
|
|
|
class Go2:
|
|
NUM_DOF = 12
|
|
NUM_ACTIONS = 12
|
|
dof_map = [ # from isaacgym simulation joint order to real robot joint order
|
|
3, 4, 5,
|
|
0, 1, 2,
|
|
9, 10, 11,
|
|
6, 7, 8,
|
|
]
|
|
dof_names = [ # NOTE: order matters. This list is the order in simulation.
|
|
"FL_hip_joint",
|
|
"FL_thigh_joint",
|
|
"FL_calf_joint",
|
|
"FR_hip_joint",
|
|
"FR_thigh_joint",
|
|
"FR_calf_joint",
|
|
"RL_hip_joint",
|
|
"RL_thigh_joint",
|
|
"RL_calf_joint",
|
|
"RR_hip_joint",
|
|
"RR_thigh_joint",
|
|
"RR_calf_joint",
|
|
]
|
|
dof_signs = [1.] * 12
|
|
joint_limits_high = torch.tensor([
|
|
1.0472, 3.4907, -0.83776,
|
|
1.0472, 3.4907, -0.83776,
|
|
1.0472, 4.5379, -0.83776,
|
|
1.0472, 4.5379, -0.83776,
|
|
], device= "cpu", dtype= torch.float32)
|
|
joint_limits_low = torch.tensor([
|
|
-1.0472, -1.5708, -2.7227,
|
|
-1.0472, -1.5708, -2.7227,
|
|
-1.0472, -0.5236, -2.7227,
|
|
-1.0472, -0.5236, -2.7227,
|
|
], device= "cpu", dtype= torch.float32)
|
|
torque_limits = torch.tensor([ # from urdf and in simulation order
|
|
25, 40, 40,
|
|
25, 40, 40,
|
|
25, 40, 40,
|
|
25, 40, 40,
|
|
], device= "cpu", dtype= torch.float32)
|
|
turn_on_motor_mode = [0x01] * 12
|
|
|
|
|
|
class UnitreeRos2Real(Node):
|
|
""" A proxy implementation of the real H1 robot. """
|
|
class WirelessButtons:
|
|
R1 = 0b00000001 # 1
|
|
L1 = 0b00000010 # 2
|
|
start = 0b00000100 # 4
|
|
select = 0b00001000 # 8
|
|
R2 = 0b00010000 # 16
|
|
L2 = 0b00100000 # 32
|
|
F1 = 0b01000000 # 64
|
|
F2 = 0b10000000 # 128
|
|
A = 0b100000000 # 256
|
|
B = 0b1000000000 # 512
|
|
X = 0b10000000000 # 1024
|
|
Y = 0b100000000000 # 2048
|
|
up = 0b1000000000000 # 4096
|
|
right = 0b10000000000000 # 8192
|
|
down = 0b100000000000000 # 16384
|
|
left = 0b1000000000000000 # 32768
|
|
|
|
def __init__(self,
|
|
robot_namespace= None,
|
|
low_state_topic= "/lowstate",
|
|
low_cmd_topic= "/lowcmd",
|
|
joy_stick_topic= "/wirelesscontroller",
|
|
forward_depth_topic= None, # if None and still need access, set to str "pyrealsense"
|
|
forward_depth_embedding_topic= "/forward_depth_embedding",
|
|
cfg= dict(),
|
|
lin_vel_deadband= 0.1,
|
|
ang_vel_deadband= 0.1,
|
|
cmd_px_range= [0.4, 1.0], # check joy_stick_callback (p for positive, n for negative)
|
|
cmd_nx_range= [0.4, 0.8], # check joy_stick_callback (p for positive, n for negative)
|
|
cmd_py_range= [0.4, 0.8], # check joy_stick_callback (p for positive, n for negative)
|
|
cmd_ny_range= [0.4, 0.8], # check joy_stick_callback (p for positive, n for negative)
|
|
cmd_pyaw_range= [0.4, 1.6], # check joy_stick_callback (p for positive, n for negative)
|
|
cmd_nyaw_range= [0.4, 1.6], # check joy_stick_callback (p for positive, n for negative)
|
|
replace_obs_with_embeddings= [], # a list of strings, e.g. ["forward_depth"] then the corrseponding obs will be processed by _get_forward_depth_embedding_obs()
|
|
move_by_wireless_remote= True, # if True, the robot will be controlled by a wireless remote
|
|
model_device= "cpu",
|
|
dof_pos_protect_ratio= 1.1, # if the dof_pos is out of the range of this ratio, the process will shutdown.
|
|
robot_class_name= "H1",
|
|
dryrun= True, # if True, the robot will not send commands to the real robot
|
|
):
|
|
super().__init__("unitree_ros2_real")
|
|
self.NUM_DOF = getattr(RobotCfgs, robot_class_name).NUM_DOF
|
|
self.NUM_ACTIONS = getattr(RobotCfgs, robot_class_name).NUM_ACTIONS
|
|
self.robot_namespace = robot_namespace
|
|
self.low_state_topic = low_state_topic
|
|
# Generate a unique cmd topic so that the low_cmd will not send to the robot's motor.
|
|
self.low_cmd_topic = low_cmd_topic if not dryrun else low_cmd_topic + "_dryrun_" + str(np.random.randint(0, 65535))
|
|
self.joy_stick_topic = joy_stick_topic
|
|
self.forward_depth_topic = forward_depth_topic
|
|
self.forward_depth_embedding_topic = forward_depth_embedding_topic
|
|
self.cfg = cfg
|
|
self.lin_vel_deadband = lin_vel_deadband
|
|
self.ang_vel_deadband = ang_vel_deadband
|
|
self.cmd_px_range = cmd_px_range
|
|
self.cmd_nx_range = cmd_nx_range
|
|
self.cmd_py_range = cmd_py_range
|
|
self.cmd_ny_range = cmd_ny_range
|
|
self.cmd_pyaw_range = cmd_pyaw_range
|
|
self.cmd_nyaw_range = cmd_nyaw_range
|
|
self.replace_obs_with_embeddings = replace_obs_with_embeddings
|
|
self.move_by_wireless_remote = move_by_wireless_remote
|
|
self.model_device = model_device
|
|
self.dof_pos_protect_ratio = dof_pos_protect_ratio
|
|
self.robot_class_name = robot_class_name
|
|
self.dryrun = dryrun
|
|
|
|
self.dof_map = getattr(RobotCfgs, robot_class_name).dof_map
|
|
self.dof_names = getattr(RobotCfgs, robot_class_name).dof_names
|
|
self.dof_signs = getattr(RobotCfgs, robot_class_name).dof_signs
|
|
self.turn_on_motor_mode = getattr(RobotCfgs, robot_class_name).turn_on_motor_mode
|
|
|
|
self.parse_config()
|
|
|
|
def parse_config(self):
|
|
""" parse, set attributes from config dict, initialize buffers to speed up the computation """
|
|
self.up_axis_idx = 2 # 2 for z, 1 for y -> adapt gravity accordingly
|
|
self.gravity_vec = torch.zeros((1, 3), device= self.model_device, dtype= torch.float32)
|
|
self.gravity_vec[:, self.up_axis_idx] = -1
|
|
|
|
# observations
|
|
self.clip_obs = self.cfg["normalization"]["clip_observations"]
|
|
self.obs_scales = self.cfg["normalization"]["obs_scales"]
|
|
for k, v in self.obs_scales.items():
|
|
if isinstance(v, (list, tuple)):
|
|
self.obs_scales[k] = torch.tensor(v, device= self.model_device, dtype= torch.float32)
|
|
# check whether there are embeddings in obs_components and launch encoder process later
|
|
if len(self.replace_obs_with_embeddings) > 0:
|
|
for comp in self.replace_obs_with_embeddings:
|
|
self.get_logger().warn(f"{comp} will be replaced with its embedding when get_obs, don't forget to launch the corresponding process before running the policy.")
|
|
self.obs_segments = self.get_obs_segment_from_components(self.cfg["env"]["obs_components"])
|
|
self.num_obs = self.get_num_obs_from_components(self.cfg["env"]["obs_components"])
|
|
if "privileged_obs_components" in self.cfg["env"].keys():
|
|
self.privileged_obs_segments = self.get_obs_segment_from_components(self.cfg["env"]["privileged_obs_components"])
|
|
self.num_privileged_obs = self.get_num_obs_from_components(self.cfg["env"]["privileged_obs_components"])
|
|
for obs_component in self.cfg["env"]["obs_components"]:
|
|
if "orientation_cmds" in obs_component:
|
|
self.roll_pitch_yaw_cmd = torch.zeros(1, 3, device= self.model_device, dtype= torch.float32)
|
|
|
|
# controls
|
|
self.control_type = self.cfg["control"]["control_type"]
|
|
if not (self.control_type == "P"):
|
|
raise NotImplementedError("Only position control is supported for now.")
|
|
self.p_gains = []
|
|
for i in range(self.NUM_DOF):
|
|
name = self.dof_names[i] # set p_gains in simulation order
|
|
for k, v in self.cfg["control"]["stiffness"].items():
|
|
if k in name:
|
|
self.p_gains.append(v)
|
|
break # only one match
|
|
self.p_gains = torch.tensor(self.p_gains, device= self.model_device, dtype= torch.float32)
|
|
self.d_gains = []
|
|
for i in range(self.NUM_DOF):
|
|
name = self.dof_names[i] # set d_gains in simulation order
|
|
for k, v in self.cfg["control"]["damping"].items():
|
|
if k in name:
|
|
self.d_gains.append(v)
|
|
break
|
|
self.d_gains = torch.tensor(self.d_gains, device= self.model_device, dtype= torch.float32)
|
|
self.default_dof_pos = torch.zeros(self.NUM_DOF, device= self.model_device, dtype= torch.float32)
|
|
self.dof_pos_ = torch.empty(1, self.NUM_DOF, device= self.model_device, dtype= torch.float32)
|
|
self.dof_vel_ = torch.empty(1, self.NUM_DOF, device= self.model_device, dtype= torch.float32)
|
|
for i in range(self.NUM_DOF):
|
|
name = self.dof_names[i]
|
|
default_joint_angle = self.cfg["init_state"]["default_joint_angles"][name]
|
|
# in simulation order.
|
|
self.default_dof_pos[i] = default_joint_angle
|
|
self.computer_clip_torque = self.cfg["control"].get("computer_clip_torque", True)
|
|
self.get_logger().info("Computer Clip Torque (onboard) is " + str(self.computer_clip_torque))
|
|
self.torque_limits = getattr(RobotCfgs, self.robot_class_name).torque_limits.to(self.model_device)
|
|
if self.computer_clip_torque:
|
|
assert hasattr(self, "torque_limits") and (len(self.torque_limits) == self.NUM_DOF), f"torque_limits must be set with the length of {self.NUM_DOF} if computer_clip_torque is True"
|
|
self.get_logger().info("[Env] torque limit: " + ",".join("{:.1f}".format(x) for x in self.torque_limits))
|
|
|
|
# actions
|
|
self.num_actions = self.NUM_ACTIONS
|
|
self.action_scale = self.cfg["control"]["action_scale"]
|
|
self.get_logger().info("[Env] action scale: {:.1f}".format(self.action_scale))
|
|
self.clip_actions = self.cfg["normalization"]["clip_actions"]
|
|
if self.cfg["normalization"].get("clip_actions_method", None) == "hard":
|
|
self.get_logger().info("clip_actions_method with hard mode")
|
|
self.get_logger().info("clip_actions_high: " + str(self.cfg["normalization"]["clip_actions_high"]))
|
|
self.get_logger().info("clip_actions_low: " + str(self.cfg["normalization"]["clip_actions_low"]))
|
|
self.clip_actions_method = "hard"
|
|
self.clip_actions_low = torch.tensor(self.cfg["normalization"]["clip_actions_low"], device= self.model_device, dtype= torch.float32)
|
|
self.clip_actions_high = torch.tensor(self.cfg["normalization"]["clip_actions_high"], device= self.model_device, dtype= torch.float32)
|
|
else:
|
|
self.get_logger().info("clip_actions_method is " + str(self.cfg["normalization"].get("clip_actions_method", None)))
|
|
self.actions = torch.zeros(self.NUM_ACTIONS, device= self.model_device, dtype= torch.float32)
|
|
|
|
|
|
# hardware related, in simulation order
|
|
self.joint_limits_high = getattr(RobotCfgs, self.robot_class_name).joint_limits_high.to(self.model_device)
|
|
self.joint_limits_low = getattr(RobotCfgs, self.robot_class_name).joint_limits_low.to(self.model_device)
|
|
joint_pos_mid = (self.joint_limits_high + self.joint_limits_low) / 2
|
|
joint_pos_range = (self.joint_limits_high - self.joint_limits_low) / 2
|
|
self.joint_pos_protect_high = joint_pos_mid + joint_pos_range * self.dof_pos_protect_ratio
|
|
self.joint_pos_protect_low = joint_pos_mid - joint_pos_range * self.dof_pos_protect_ratio
|
|
|
|
def start_ros_handlers(self):
|
|
""" after initializing the env and policy, register ros related callbacks and topics
|
|
"""
|
|
|
|
# ROS publishers
|
|
self.low_cmd_pub = self.create_publisher(
|
|
LowCmd,
|
|
self.low_cmd_topic,
|
|
1
|
|
)
|
|
self.low_cmd_buffer = LowCmd()
|
|
|
|
# ROS subscribers
|
|
self.low_state_sub = self.create_subscription(
|
|
LowState,
|
|
self.low_state_topic,
|
|
self._low_state_callback,
|
|
1
|
|
)
|
|
self.joy_stick_sub = self.create_subscription(
|
|
WirelessController,
|
|
self.joy_stick_topic,
|
|
self._joy_stick_callback,
|
|
1
|
|
)
|
|
|
|
if self.forward_depth_topic is not None:
|
|
self.forward_camera_sub = self.create_subscription(
|
|
Image,
|
|
self.forward_depth_topic,
|
|
self._forward_depth_callback,
|
|
1
|
|
)
|
|
|
|
if self.forward_depth_embedding_topic is not None and "forward_depth" in self.replace_obs_with_embeddings:
|
|
self.forward_depth_embedding_sub = self.create_subscription(
|
|
Float32MultiArray,
|
|
self.forward_depth_embedding_topic,
|
|
self._forward_depth_embedding_callback,
|
|
1,
|
|
)
|
|
|
|
self.get_logger().info("ROS handlers started, waiting to recieve critical low state and wireless controller messages.")
|
|
if not self.dryrun:
|
|
self.get_logger().warn(f"You are running the code in no-dryrun mode and publishing to '{self.low_cmd_topic}', Please keep safe.")
|
|
else:
|
|
self.get_logger().warn(f"You are publishing low cmd to '{self.low_cmd_topic}' because of dryrun mode, Please check and be safe.")
|
|
while rclpy.ok():
|
|
rclpy.spin_once(self)
|
|
if hasattr(self, "low_state_buffer") and hasattr(self, "joy_stick_buffer"):
|
|
break
|
|
self.get_logger().info("Low state message received, the robot is ready to go.")
|
|
|
|
""" ROS callbacks and handlers that update the buffer """
|
|
def _low_state_callback(self, msg):
|
|
""" store and handle proprioception data """
|
|
self.low_state_buffer = msg # keep the latest low state
|
|
|
|
# refresh dof_pos and dof_vel
|
|
for sim_idx in range(self.NUM_DOF):
|
|
real_idx = self.dof_map[sim_idx]
|
|
self.dof_pos_[0, sim_idx] = self.low_state_buffer.motor_state[real_idx].q * self.dof_signs[sim_idx]
|
|
for sim_idx in range(self.NUM_DOF):
|
|
real_idx = self.dof_map[sim_idx]
|
|
self.dof_vel_[0, sim_idx] = self.low_state_buffer.motor_state[real_idx].dq * self.dof_signs[sim_idx]
|
|
# automatic safety check
|
|
for sim_idx in range(self.NUM_DOF):
|
|
real_idx = self.dof_map[sim_idx]
|
|
if self.dof_pos_[0, sim_idx] > self.joint_pos_protect_high[sim_idx] or \
|
|
self.dof_pos_[0, sim_idx] < self.joint_pos_protect_low[sim_idx]:
|
|
self.get_logger().error(f"Joint {sim_idx}(sim), {real_idx}(real) position out of range at {self.low_state_buffer.motor_state[real_idx].q}")
|
|
self.get_logger().error("The motors and this process shuts down.")
|
|
self._turn_off_motors()
|
|
raise SystemExit()
|
|
|
|
def _joy_stick_callback(self, msg):
|
|
self.joy_stick_buffer = msg
|
|
if self.move_by_wireless_remote:
|
|
# left-y for forward/backward
|
|
ly = msg.ly
|
|
if ly > self.lin_vel_deadband:
|
|
vx = (ly - self.lin_vel_deadband) / (1 - self.lin_vel_deadband) # (0, 1)
|
|
vx = vx * (self.cmd_px_range[1] - self.cmd_px_range[0]) + self.cmd_px_range[0]
|
|
elif ly < -self.lin_vel_deadband:
|
|
vx = (ly + self.lin_vel_deadband) / (1 - self.lin_vel_deadband) # (-1, 0)
|
|
vx = vx * (self.cmd_nx_range[1] - self.cmd_nx_range[0]) - self.cmd_nx_range[0]
|
|
else:
|
|
vx = 0
|
|
# left-x for turning left/right
|
|
lx = -msg.lx
|
|
if lx > self.ang_vel_deadband:
|
|
yaw = (lx - self.ang_vel_deadband) / (1 - self.ang_vel_deadband)
|
|
yaw = yaw * (self.cmd_pyaw_range[1] - self.cmd_pyaw_range[0]) + self.cmd_pyaw_range[0]
|
|
elif lx < -self.ang_vel_deadband:
|
|
yaw = (lx + self.ang_vel_deadband) / (1 - self.ang_vel_deadband)
|
|
yaw = yaw * (self.cmd_nyaw_range[1] - self.cmd_nyaw_range[0]) - self.cmd_nyaw_range[0]
|
|
else:
|
|
yaw = 0
|
|
# right-x for side moving left/right
|
|
rx = -msg.rx
|
|
if rx > self.lin_vel_deadband:
|
|
vy = (rx - self.lin_vel_deadband) / (1 - self.lin_vel_deadband)
|
|
vy = vy * (self.cmd_py_range[1] - self.cmd_py_range[0]) + self.cmd_py_range[0]
|
|
elif rx < -self.lin_vel_deadband:
|
|
vy = (rx + self.lin_vel_deadband) / (1 - self.lin_vel_deadband)
|
|
vy = vy * (self.cmd_ny_range[1] - self.cmd_ny_range[0]) - self.cmd_ny_range[0]
|
|
else:
|
|
vy = 0
|
|
self.xyyaw_command = torch.tensor([vx, vy, yaw], device= self.model_device, dtype= torch.float32)
|
|
|
|
# refer to Unitree Remote Control data structure, msg.keys is a bit mask
|
|
# 00000000 00000001 means pressing the 0-th button (R1)
|
|
# 00000000 00000010 means pressing the 1-th button (L1)
|
|
# 10000000 00000000 means pressing the 15-th button (left)
|
|
if (msg.keys & self.WirelessButtons.R2) or (msg.keys & self.WirelessButtons.L2): # R2 or L2 is pressed
|
|
self.get_logger().warn("R2 or L2 is pressed, the motors and this process shuts down.")
|
|
self._turn_off_motors()
|
|
raise SystemExit()
|
|
|
|
# roll-pitch target
|
|
if hasattr(self, "roll_pitch_yaw_cmd"):
|
|
if (msg.keys & self.WirelessButtons.up):
|
|
self.roll_pitch_yaw_cmd[0, 1] += 0.1
|
|
self.get_logger().info("Pitch Command: " + str(self.roll_pitch_yaw_cmd))
|
|
if (msg.keys & self.WirelessButtons.down):
|
|
self.roll_pitch_yaw_cmd[0, 1] -= 0.1
|
|
self.get_logger().info("Pitch Command: " + str(self.roll_pitch_yaw_cmd))
|
|
if (msg.keys & self.WirelessButtons.left):
|
|
self.roll_pitch_yaw_cmd[0, 0] -= 0.1
|
|
self.get_logger().info("Roll Command: " + str(self.roll_pitch_yaw_cmd))
|
|
if (msg.keys & self.WirelessButtons.right):
|
|
self.roll_pitch_yaw_cmd[0, 0] += 0.1
|
|
self.get_logger().info("Roll Command: " + str(self.roll_pitch_yaw_cmd))
|
|
|
|
def _forward_depth_callback(self, msg):
|
|
""" store and handle depth camera data """
|
|
pass
|
|
|
|
def _forward_depth_embedding_callback(self, msg):
|
|
self.forward_depth_embedding_buffer = torch.tensor(msg.data, device= self.model_device, dtype= torch.float32).view(1, -1)
|
|
|
|
""" Done: ROS callbacks and handlers that update the buffer """
|
|
|
|
""" refresh observation buffer and corresponding sub-functions """
|
|
def _get_lin_vel_obs(self):
|
|
return torch.zeros(1, 3, device= self.model_device, dtype= torch.float32)
|
|
|
|
def _get_ang_vel_obs(self):
|
|
buffer = torch.from_numpy(self.low_state_buffer.imu_state.gyroscope).unsqueeze(0)
|
|
return buffer
|
|
|
|
def _get_projected_gravity_obs(self):
|
|
quat_xyzw = torch.tensor([
|
|
self.low_state_buffer.imu_state.quaternion[1],
|
|
self.low_state_buffer.imu_state.quaternion[2],
|
|
self.low_state_buffer.imu_state.quaternion[3],
|
|
self.low_state_buffer.imu_state.quaternion[0],
|
|
], device= self.model_device, dtype= torch.float32).unsqueeze(0)
|
|
return quat_rotate_inverse(
|
|
quat_xyzw,
|
|
self.gravity_vec,
|
|
)
|
|
|
|
def _get_commands_obs(self):
|
|
return self.xyyaw_command.unsqueeze(0) # (1, 3)
|
|
|
|
def _get_dof_pos_obs(self):
|
|
return self.dof_pos_ - self.default_dof_pos.unsqueeze(0)
|
|
|
|
def _get_dof_vel_obs(self):
|
|
return self.dof_vel_
|
|
|
|
def _get_last_actions_obs(self):
|
|
return self.actions
|
|
|
|
def _get_forward_depth_embedding_obs(self):
|
|
return self.forward_depth_embedding_buffer
|
|
|
|
def _get_forward_depth_obs(self):
|
|
raise NotImplementedError()
|
|
|
|
def _get_orientation_cmds_obs(self):
|
|
return quat_rotate_inverse(
|
|
quat_from_euler_xyz(self.roll_pitch_yaw_cmd[:, 0], self.roll_pitch_yaw_cmd[:, 1], self.roll_pitch_yaw_cmd[:, 2]),
|
|
self.gravity_vec,
|
|
)
|
|
|
|
def get_num_obs_from_components(self, components):
|
|
obs_segments = self.get_obs_segment_from_components(components)
|
|
num_obs = 0
|
|
for k, v in obs_segments.items():
|
|
num_obs += np.prod(v)
|
|
return num_obs
|
|
|
|
def get_obs_segment_from_components(self, components):
|
|
""" Observation segment is defined as a list of lists/ints defining the tensor shape with
|
|
corresponding order.
|
|
"""
|
|
segments = OrderedDict()
|
|
if "lin_vel" in components:
|
|
print("Warning: lin_vel is not typically available or accurate enough on the real robot. Will return zeros.")
|
|
segments["lin_vel"] = (3,)
|
|
if "ang_vel" in components:
|
|
segments["ang_vel"] = (3,)
|
|
if "projected_gravity" in components:
|
|
segments["projected_gravity"] = (3,)
|
|
if "commands" in components:
|
|
segments["commands"] = (3,)
|
|
if "dof_pos" in components:
|
|
segments["dof_pos"] = (self.NUM_DOF,)
|
|
if "dof_vel" in components:
|
|
segments["dof_vel"] = (self.NUM_DOF,)
|
|
if "last_actions" in components:
|
|
segments["last_actions"] = (self.NUM_ACTIONS,)
|
|
if "height_measurements" in components:
|
|
print("Warning: height_measurements is not typically available on the real robot.")
|
|
segments["height_measurements"] = (1, len(self.cfg["terrain"]["measured_points_x"]), len(self.cfg["terrain"]["measured_points_y"]))
|
|
if "forward_depth" in components:
|
|
if "output_resolution" in self.cfg["sensor"]["forward_camera"]:
|
|
segments["forward_depth"] = (1, *self.cfg["sensor"]["forward_camera"]["output_resolution"])
|
|
else:
|
|
segments["forward_depth"] = (1, *self.cfg["sensor"]["forward_camera"]["resolution"])
|
|
if "base_pose" in components:
|
|
segments["base_pose"] = (6,) # xyz + rpy
|
|
if "robot_config" in components:
|
|
""" Related to robot_config_buffer attribute, Be careful to change. """
|
|
# robot shape friction
|
|
# CoM (Center of Mass) x, y, z
|
|
# base mass (payload)
|
|
# motor strength for each joint
|
|
print("Warning: height_measurements is not typically available on the real robot.")
|
|
segments["robot_config"] = (1 + 3 + 1 + self.NUM_ACTIONS,)
|
|
|
|
""" NOTE: The following components are not directly set in legged_robot.py.
|
|
Please check the order or extend the class implementation if needed.
|
|
"""
|
|
if "joints_target" in components:
|
|
# o[0] for target value, 0[1] for wether the target should be tracked (1) or not (0)
|
|
segments["joints_target"] = (2, self.NUM_DOF)
|
|
if "projected_gravity_target" in components:
|
|
# projected_gravity for which the robot should track the target
|
|
# last value as a mask of whether to follow the target or not
|
|
segments["projected_gravity_target"] = (3+1,)
|
|
if "orientation_cmds" in components:
|
|
segments["orientation_cmds"] = (3,)
|
|
return segments
|
|
|
|
def get_obs(self, obs_segments= None):
|
|
""" Extract from the buffers and build the 1d observation tensor
|
|
Each get ... obs function does not do the obs_scale multiplication.
|
|
NOTE: obs_buffer has the batch dimension, whose size is 1.
|
|
"""
|
|
if obs_segments is None:
|
|
obs_segments = self.obs_segments
|
|
obs_buffer = []
|
|
for k, v in obs_segments.items():
|
|
if k in self.replace_obs_with_embeddings:
|
|
obs_component_value = getattr(self, "_get_" + k + "_embedding_obs")()
|
|
else:
|
|
obs_component_value = getattr(self, "_get_" + k + "_obs")() * self.obs_scales.get(k, 1.0)
|
|
obs_buffer.append(obs_component_value)
|
|
obs_buffer = torch.cat(obs_buffer, dim=1)
|
|
obs_buffer = torch.clamp(obs_buffer, -self.clip_obs, self.clip_obs)
|
|
return obs_buffer
|
|
""" Done: refresh observation buffer and corresponding sub-functions """
|
|
|
|
""" Control related functions """
|
|
def clip_action_before_scale(self, action):
|
|
action = torch.clip(action, -self.clip_actions, self.clip_actions)
|
|
if getattr(self, "clip_actions_method", None) == "hard":
|
|
action = torch.clip(action, self.clip_actions_low, self.clip_actions_high)
|
|
return action
|
|
|
|
def clip_by_torque_limit(self, actions_scaled):
|
|
""" Different from simulation, we reverse the process and clip the actions directly,
|
|
so that the PD controller runs in robot but not our script.
|
|
"""
|
|
control_type = self.cfg["control"]["control_type"]
|
|
if control_type == "P":
|
|
p_limits_low = (-self.torque_limits) + self.d_gains*self.dof_vel_
|
|
p_limits_high = (self.torque_limits) + self.d_gains*self.dof_vel_
|
|
actions_low = (p_limits_low/self.p_gains) - self.default_dof_pos + self.dof_pos_
|
|
actions_high = (p_limits_high/self.p_gains) - self.default_dof_pos + self.dof_pos_
|
|
else:
|
|
raise NotImplementedError
|
|
|
|
return torch.clip(actions_scaled, actions_low, actions_high)
|
|
|
|
def send_action(self, actions):
|
|
""" Send the action to the robot motors, which does the preprocessing
|
|
just like env.step in simulation.
|
|
Thus, the actions has the batch dimension, whose size is 1.
|
|
"""
|
|
self.actions = self.clip_action_before_scale(actions)
|
|
if self.computer_clip_torque:
|
|
clipped_scaled_action = self.clip_by_torque_limit(actions * self.action_scale)
|
|
else:
|
|
self.get_logger().warn("Computer Clip Torque is False, the robot may be damaged.", throttle_duration_sec= 1)
|
|
clipped_scaled_action = actions * self.action_scale
|
|
robot_coordinates_action = clipped_scaled_action + self.default_dof_pos.unsqueeze(0)
|
|
|
|
self._publish_legs_cmd(robot_coordinates_action[0])
|
|
|
|
""" Done: Control related functions """
|
|
|
|
""" functions that actually publish the commands and take effect """
|
|
def _publish_legs_cmd(self, robot_coordinates_action: torch.Tensor):
|
|
""" Publish the joint commands to the robot legs in robot coordinates system.
|
|
robot_coordinates_action: shape (NUM_DOF,), in simulation order.
|
|
"""
|
|
for sim_idx in range(self.NUM_DOF):
|
|
real_idx = self.dof_map[sim_idx]
|
|
if not self.dryrun:
|
|
self.low_cmd_buffer.motor_cmd[real_idx].mode = self.turn_on_motor_mode[sim_idx]
|
|
self.low_cmd_buffer.motor_cmd[real_idx].q = robot_coordinates_action[sim_idx].item() * self.dof_signs[sim_idx]
|
|
self.low_cmd_buffer.motor_cmd[real_idx].dq = 0.
|
|
self.low_cmd_buffer.motor_cmd[real_idx].tau = 0.
|
|
self.low_cmd_buffer.motor_cmd[real_idx].kp = self.p_gains[sim_idx].item()
|
|
self.low_cmd_buffer.motor_cmd[real_idx].kd = self.d_gains[sim_idx].item()
|
|
|
|
self.low_cmd_buffer.crc = get_crc(self.low_cmd_buffer)
|
|
self.low_cmd_pub.publish(self.low_cmd_buffer)
|
|
|
|
def _turn_off_motors(self):
|
|
""" Turn off the motors """
|
|
for sim_idx in range(self.NUM_DOF):
|
|
real_idx = self.dof_map[sim_idx]
|
|
self.low_cmd_buffer.motor_cmd[real_idx].mode = 0x00
|
|
self.low_cmd_buffer.motor_cmd[real_idx].q = 0.
|
|
self.low_cmd_buffer.motor_cmd[real_idx].dq = 0.
|
|
self.low_cmd_buffer.motor_cmd[real_idx].tau = 0.
|
|
self.low_cmd_buffer.motor_cmd[real_idx].kp = 0.
|
|
self.low_cmd_buffer.motor_cmd[real_idx].kd = 0.
|
|
self.low_cmd_buffer.crc = get_crc(self.low_cmd_buffer)
|
|
self.low_cmd_pub.publish(self.low_cmd_buffer)
|
|
""" Done: functions that actually publish the commands and take effect """
|