# parkour/onboard_codes/go2/unitree_ros2_real.py

import os, sys
import rclpy
from rclpy.node import Node
from unitree_go.msg import (
    WirelessController,
    LowState,
    # MotorState,
    # IMUState,
    LowCmd,
    # MotorCmd,
)
from std_msgs.msg import Float32MultiArray
from sensor_msgs.msg import Image # used by the forward-depth subscriber below

if os.uname().machine in ["x86_64", "amd64"]:
    sys.path.append(os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "x86",
    ))
elif os.uname().machine == "aarch64":
    sys.path.append(os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "aarch64",
    ))
from crc_module import get_crc

from multiprocessing import Process
from collections import OrderedDict
import numpy as np
import torch

@torch.jit.script
def quat_from_euler_xyz(roll, pitch, yaw):
    cy = torch.cos(yaw * 0.5)
    sy = torch.sin(yaw * 0.5)
    cr = torch.cos(roll * 0.5)
    sr = torch.sin(roll * 0.5)
    cp = torch.cos(pitch * 0.5)
    sp = torch.sin(pitch * 0.5)
    qw = cy * cr * cp + sy * sr * sp
    qx = cy * sr * cp - sy * cr * sp
    qy = cy * cr * sp + sy * sr * cp
    qz = sy * cr * cp - cy * sr * sp
    return torch.stack([qx, qy, qz, qw], dim=-1)
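
# Sanity-check sketch (hedged, illustrative only, not part of the original file):
# zero Euler angles should map to the identity quaternion in x, y, z, w order:
#   q = quat_from_euler_xyz(torch.zeros(1), torch.zeros(1), torch.zeros(1))
#   assert torch.allclose(q, torch.tensor([[0., 0., 0., 1.]]))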

@torch.jit.script
def quat_rotate_inverse(q, v):
    """ q must be in x, y, z, w order """
    shape = q.shape
    q_w = q[:, -1]
    q_vec = q[:, :3]
    a = v * (2.0 * q_w ** 2 - 1.0).unsqueeze(-1)
    b = torch.cross(q_vec, v, dim=-1) * q_w.unsqueeze(-1) * 2.0
    c = q_vec * torch.bmm(q_vec.view(shape[0], 1, 3), v.view(shape[0], 3, 1)).squeeze(-1) * 2.0
    return a - b + c
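
# Usage sketch (hedged, illustrative only): with the identity quaternion, the
# inverse rotation leaves the world-frame gravity vector unchanged:
#   g = torch.tensor([[0., 0., -1.]])
#   q_id = torch.tensor([[0., 0., 0., 1.]])  # x, y, z, w
#   assert torch.allclose(quat_rotate_inverse(q_id, g), g)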

@torch.jit.script
def copysign(a, b):
    # type: (float, Tensor) -> Tensor
    a = torch.tensor(a, device=b.device, dtype=torch.float).repeat(b.shape[0])
    return torch.abs(a) * torch.sign(b)

@torch.jit.script
def get_euler_xyz(q):
    qx, qy, qz, qw = 0, 1, 2, 3
    # roll (x-axis rotation)
    sinr_cosp = 2.0 * (q[:, qw] * q[:, qx] + q[:, qy] * q[:, qz])
    cosr_cosp = q[:, qw] * q[:, qw] - q[:, qx] * q[:, qx] - q[:, qy] * q[:, qy] + q[:, qz] * q[:, qz]
    roll = torch.atan2(sinr_cosp, cosr_cosp)
    # pitch (y-axis rotation)
    sinp = 2.0 * (q[:, qw] * q[:, qy] - q[:, qz] * q[:, qx])
    pitch = torch.where(torch.abs(sinp) >= 1, copysign(np.pi / 2.0, sinp), torch.asin(sinp))
    # yaw (z-axis rotation)
    siny_cosp = 2.0 * (q[:, qw] * q[:, qz] + q[:, qx] * q[:, qy])
    cosy_cosp = q[:, qw] * q[:, qw] + q[:, qx] * q[:, qx] - q[:, qy] * q[:, qy] - q[:, qz] * q[:, qz]
    yaw = torch.atan2(siny_cosp, cosy_cosp)
    return roll % (2*np.pi), pitch % (2*np.pi), yaw % (2*np.pi)
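
# Round-trip sketch (hedged): Euler -> quaternion -> Euler should agree modulo
# 2*pi for angles away from the pitch singularity at +-pi/2:
#   r = p = y = torch.tensor([0.3])
#   rr, pp, yy = get_euler_xyz(quat_from_euler_xyz(r, p, y))
#   # rr, pp, yy are each close to 0.3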

class RobotCfgs:
    class H1:
        pass

    class Go2:
        NUM_DOF = 12
        NUM_ACTIONS = 12
        dof_map = [ # from isaacgym simulation joint order to real robot joint order
            3, 4, 5,
            0, 1, 2,
            9, 10, 11,
            6, 7, 8,
        ]
        dof_names = [ # NOTE: order matters. This list is the order in simulation.
            "FL_hip_joint",
            "FL_thigh_joint",
            "FL_calf_joint",
            "FR_hip_joint",
            "FR_thigh_joint",
            "FR_calf_joint",
            "RL_hip_joint",
            "RL_thigh_joint",
            "RL_calf_joint",
            "RR_hip_joint",
            "RR_thigh_joint",
            "RR_calf_joint",
        ]
        dof_signs = [1.] * 12
        joint_limits_high = torch.tensor([
            1.0472, 3.4907, -0.83776,
            1.0472, 3.4907, -0.83776,
            1.0472, 4.5379, -0.83776,
            1.0472, 4.5379, -0.83776,
        ], device= "cpu", dtype= torch.float32)
        joint_limits_low = torch.tensor([
            -1.0472, -1.5708, -2.7227,
            -1.0472, -1.5708, -2.7227,
            -1.0472, -0.5236, -2.7227,
            -1.0472, -0.5236, -2.7227,
        ], device= "cpu", dtype= torch.float32)
        torque_limits = torch.tensor([ # from urdf and in simulation order
            25, 40, 40,
            25, 40, 40,
            25, 40, 40,
            25, 40, 40,
        ], device= "cpu", dtype= torch.float32)
        turn_on_motor_mode = [0x01] * 12
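
# Index-mapping sketch (hedged, illustrative only): dof_map converts a simulation
# joint index to the real robot's motor index, e.g. sim joint 0 ("FL_hip_joint")
# drives real motor 3:
#   real_idx = RobotCfgs.Go2.dof_map[0]  # == 3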

class UnitreeRos2Real(Node):
    """ A proxy implementation of the real robot (e.g. Unitree H1 or Go2). """
    class WirelessButtons:
        R1 = 0b00000001 # 1
        L1 = 0b00000010 # 2
        start = 0b00000100 # 4
        select = 0b00001000 # 8
        R2 = 0b00010000 # 16
        L2 = 0b00100000 # 32
        F1 = 0b01000000 # 64
        F2 = 0b10000000 # 128
        A = 0b100000000 # 256
        B = 0b1000000000 # 512
        X = 0b10000000000 # 1024
        Y = 0b100000000000 # 2048
        up = 0b1000000000000 # 4096
        right = 0b10000000000000 # 8192
        down = 0b100000000000000 # 16384
        left = 0b1000000000000000 # 32768
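    # Usage sketch (hedged, illustrative only): `msg.keys` from the wireless
    # controller is a bitmask, so a press is detected by AND-ing with a flag:
    #   if msg.keys & self.WirelessButtons.A:
    #       ...  # the A button is currently pressed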
    def __init__(self,
            robot_namespace= None,
            low_state_topic= "/lowstate",
            low_cmd_topic= "/lowcmd",
            joy_stick_topic= "/wirelesscontroller",
            forward_depth_topic= None, # if None but depth access is still needed, set to the str "pyrealsense"
            forward_depth_embedding_topic= "/forward_depth_embedding",
            cfg= dict(),
            lin_vel_deadband= 0.1,
            ang_vel_deadband= 0.1,
            cmd_px_range= [0.4, 1.0], # check joy_stick_callback (p for positive, n for negative)
            cmd_nx_range= [0.4, 0.8], # check joy_stick_callback (p for positive, n for negative)
            cmd_py_range= [0.4, 0.8], # check joy_stick_callback (p for positive, n for negative)
            cmd_ny_range= [0.4, 0.8], # check joy_stick_callback (p for positive, n for negative)
            cmd_pyaw_range= [0.4, 1.6], # check joy_stick_callback (p for positive, n for negative)
            cmd_nyaw_range= [0.4, 1.6], # check joy_stick_callback (p for positive, n for negative)
            replace_obs_with_embeddings= [], # a list of strings, e.g. ["forward_depth"]; the corresponding obs will then be produced by _get_forward_depth_embedding_obs()
            move_by_wireless_remote= True, # if True, the robot will be controlled by the wireless remote
            model_device= "cpu",
            dof_pos_protect_ratio= 1.1, # if the dof_pos exceeds the joint range scaled by this ratio, the process will shut down.
            robot_class_name= "H1",
            dryrun= True, # if True, the robot will not send commands to the real motors
        ):
        super().__init__("unitree_ros2_real")
        self.NUM_DOF = getattr(RobotCfgs, robot_class_name).NUM_DOF
        self.NUM_ACTIONS = getattr(RobotCfgs, robot_class_name).NUM_ACTIONS
        self.robot_namespace = robot_namespace
        self.low_state_topic = low_state_topic
        # Generate a unique cmd topic so that the low_cmd will not be sent to the robot's motors.
        self.low_cmd_topic = low_cmd_topic if not dryrun else low_cmd_topic + "_dryrun_" + str(np.random.randint(0, 65535))
        self.joy_stick_topic = joy_stick_topic
        self.forward_depth_topic = forward_depth_topic
        self.forward_depth_embedding_topic = forward_depth_embedding_topic
        self.cfg = cfg
        self.lin_vel_deadband = lin_vel_deadband
        self.ang_vel_deadband = ang_vel_deadband
        self.cmd_px_range = cmd_px_range
        self.cmd_nx_range = cmd_nx_range
        self.cmd_py_range = cmd_py_range
        self.cmd_ny_range = cmd_ny_range
        self.cmd_pyaw_range = cmd_pyaw_range
        self.cmd_nyaw_range = cmd_nyaw_range
        self.replace_obs_with_embeddings = replace_obs_with_embeddings
        self.move_by_wireless_remote = move_by_wireless_remote
        self.model_device = model_device
        self.dof_pos_protect_ratio = dof_pos_protect_ratio
        self.robot_class_name = robot_class_name
        self.dryrun = dryrun

        self.dof_map = getattr(RobotCfgs, robot_class_name).dof_map
        self.dof_names = getattr(RobotCfgs, robot_class_name).dof_names
        self.dof_signs = getattr(RobotCfgs, robot_class_name).dof_signs
        self.turn_on_motor_mode = getattr(RobotCfgs, robot_class_name).turn_on_motor_mode

        self.parse_config()
    def parse_config(self):
        """ Parse and set attributes from the config dict; initialize buffers to speed up the computation. """
        self.up_axis_idx = 2 # 2 for z, 1 for y -> adapt gravity accordingly
        self.gravity_vec = torch.zeros((1, 3), device= self.model_device, dtype= torch.float32)
        self.gravity_vec[:, self.up_axis_idx] = -1

        # observations
        self.clip_obs = self.cfg["normalization"]["clip_observations"]
        self.obs_scales = self.cfg["normalization"]["obs_scales"]
        for k, v in self.obs_scales.items():
            if isinstance(v, (list, tuple)):
                self.obs_scales[k] = torch.tensor(v, device= self.model_device, dtype= torch.float32)
        # check whether there are embeddings in obs_components and launch the encoder process later
        if len(self.replace_obs_with_embeddings) > 0:
            for comp in self.replace_obs_with_embeddings:
                self.get_logger().warn(f"{comp} will be replaced with its embedding in get_obs; don't forget to launch the corresponding process before running the policy.")
        self.obs_segments = self.get_obs_segment_from_components(self.cfg["env"]["obs_components"])
        self.num_obs = self.get_num_obs_from_components(self.cfg["env"]["obs_components"])
        if "privileged_obs_components" in self.cfg["env"].keys():
            self.privileged_obs_segments = self.get_obs_segment_from_components(self.cfg["env"]["privileged_obs_components"])
            self.num_privileged_obs = self.get_num_obs_from_components(self.cfg["env"]["privileged_obs_components"])
        for obs_component in self.cfg["env"]["obs_components"]:
            if "orientation_cmds" in obs_component:
                self.roll_pitch_yaw_cmd = torch.zeros(1, 3, device= self.model_device, dtype= torch.float32)

        # controls
        self.control_type = self.cfg["control"]["control_type"]
        if not (self.control_type == "P"):
            raise NotImplementedError("Only position control is supported for now.")
        self.p_gains = []
        for i in range(self.NUM_DOF):
            name = self.dof_names[i] # set p_gains in simulation order
            for k, v in self.cfg["control"]["stiffness"].items():
                if k in name:
                    self.p_gains.append(v)
                    break # only one match
        self.p_gains = torch.tensor(self.p_gains, device= self.model_device, dtype= torch.float32)
        self.d_gains = []
        for i in range(self.NUM_DOF):
            name = self.dof_names[i] # set d_gains in simulation order
            for k, v in self.cfg["control"]["damping"].items():
                if k in name:
                    self.d_gains.append(v)
                    break
        self.d_gains = torch.tensor(self.d_gains, device= self.model_device, dtype= torch.float32)
        self.default_dof_pos = torch.zeros(self.NUM_DOF, device= self.model_device, dtype= torch.float32)
        self.dof_pos_ = torch.empty(1, self.NUM_DOF, device= self.model_device, dtype= torch.float32)
        self.dof_vel_ = torch.empty(1, self.NUM_DOF, device= self.model_device, dtype= torch.float32)
        for i in range(self.NUM_DOF):
            name = self.dof_names[i]
            default_joint_angle = self.cfg["init_state"]["default_joint_angles"][name]
            # in simulation order.
            self.default_dof_pos[i] = default_joint_angle
        self.computer_clip_torque = self.cfg["control"].get("computer_clip_torque", True)
        self.get_logger().info("Computer Clip Torque (onboard) is " + str(self.computer_clip_torque))
        self.torque_limits = getattr(RobotCfgs, self.robot_class_name).torque_limits.to(self.model_device)
        if self.computer_clip_torque:
            assert hasattr(self, "torque_limits") and (len(self.torque_limits) == self.NUM_DOF), f"torque_limits must be set with the length of {self.NUM_DOF} if computer_clip_torque is True"
            self.get_logger().info("[Env] torque limit: " + ",".join("{:.1f}".format(x) for x in self.torque_limits))

        # actions
        self.num_actions = self.NUM_ACTIONS
        self.action_scale = self.cfg["control"]["action_scale"]
        self.get_logger().info("[Env] action scale: {:.1f}".format(self.action_scale))
        self.clip_actions = self.cfg["normalization"]["clip_actions"]
        if self.cfg["normalization"].get("clip_actions_method", None) == "hard":
            self.get_logger().info("clip_actions_method with hard mode")
            self.get_logger().info("clip_actions_high: " + str(self.cfg["normalization"]["clip_actions_high"]))
            self.get_logger().info("clip_actions_low: " + str(self.cfg["normalization"]["clip_actions_low"]))
            self.clip_actions_method = "hard"
            self.clip_actions_low = torch.tensor(self.cfg["normalization"]["clip_actions_low"], device= self.model_device, dtype= torch.float32)
            self.clip_actions_high = torch.tensor(self.cfg["normalization"]["clip_actions_high"], device= self.model_device, dtype= torch.float32)
        else:
            self.get_logger().info("clip_actions_method is " + str(self.cfg["normalization"].get("clip_actions_method", None)))
        # keep the batch dimension so that _get_last_actions_obs concatenates cleanly in get_obs
        self.actions = torch.zeros(1, self.NUM_ACTIONS, device= self.model_device, dtype= torch.float32)

        # hardware related, in simulation order
        self.joint_limits_high = getattr(RobotCfgs, self.robot_class_name).joint_limits_high.to(self.model_device)
        self.joint_limits_low = getattr(RobotCfgs, self.robot_class_name).joint_limits_low.to(self.model_device)
        joint_pos_mid = (self.joint_limits_high + self.joint_limits_low) / 2
        joint_pos_range = (self.joint_limits_high - self.joint_limits_low) / 2
        self.joint_pos_protect_high = joint_pos_mid + joint_pos_range * self.dof_pos_protect_ratio
        self.joint_pos_protect_low = joint_pos_mid - joint_pos_range * self.dof_pos_protect_ratio
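    # Config sketch (hedged): parse_config expects at least the following keys,
    # typically loaded from the training run's dumped config; the values shown
    # here are illustrative placeholders, not recommended settings:
    #   cfg = dict(
    #       env= dict(obs_components= ["ang_vel", "projected_gravity", "commands", "dof_pos", "dof_vel", "last_actions"]),
    #       normalization= dict(clip_observations= 100., clip_actions= 100., obs_scales= dict(ang_vel= 0.25, dof_pos= 1.0, dof_vel= 0.05)),
    #       control= dict(control_type= "P", action_scale= 0.25, stiffness= dict(joint= 40.), damping= dict(joint= 1.)),
    #       init_state= dict(default_joint_angles= {name: 0. for name in RobotCfgs.Go2.dof_names}),
    #   )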
    def start_ros_handlers(self):
        """ After initializing the env and policy, register ROS-related callbacks and topics.
        """
        # ROS publishers
        self.low_cmd_pub = self.create_publisher(
            LowCmd,
            self.low_cmd_topic,
            1
        )
        self.low_cmd_buffer = LowCmd()

        # ROS subscribers
        self.low_state_sub = self.create_subscription(
            LowState,
            self.low_state_topic,
            self._low_state_callback,
            1
        )
        self.joy_stick_sub = self.create_subscription(
            WirelessController,
            self.joy_stick_topic,
            self._joy_stick_callback,
            1
        )
        if self.forward_depth_topic is not None:
            self.forward_camera_sub = self.create_subscription(
                Image,
                self.forward_depth_topic,
                self._forward_depth_callback,
                1
            )
        if self.forward_depth_embedding_topic is not None and "forward_depth" in self.replace_obs_with_embeddings:
            self.forward_depth_embedding_sub = self.create_subscription(
                Float32MultiArray,
                self.forward_depth_embedding_topic,
                self._forward_depth_embedding_callback,
                1,
            )
        self.get_logger().info("ROS handlers started, waiting to receive critical low state and wireless controller messages.")
        if not self.dryrun:
            self.get_logger().warn(f"You are running the code in no-dryrun mode and publishing to '{self.low_cmd_topic}'. Please stay safe.")
        else:
            self.get_logger().warn(f"You are publishing low cmd to '{self.low_cmd_topic}' because of dryrun mode. Please check and stay safe.")
        while rclpy.ok():
            rclpy.spin_once(self)
            if hasattr(self, "low_state_buffer") and hasattr(self, "joy_stick_buffer"):
                break
        self.get_logger().info("Low state message received, the robot is ready to go.")
""" ROS callbacks and handlers that update the buffer """
def _low_state_callback(self, msg):
""" store and handle proprioception data """
self.low_state_buffer = msg # keep the latest low state
# refresh dof_pos and dof_vel
for sim_idx in range(self.NUM_DOF):
real_idx = self.dof_map[sim_idx]
self.dof_pos_[0, sim_idx] = self.low_state_buffer.motor_state[real_idx].q * self.dof_signs[sim_idx]
for sim_idx in range(self.NUM_DOF):
real_idx = self.dof_map[sim_idx]
self.dof_vel_[0, sim_idx] = self.low_state_buffer.motor_state[real_idx].dq * self.dof_signs[sim_idx]
# automatic safety check
for sim_idx in range(self.NUM_DOF):
real_idx = self.dof_map[sim_idx]
if self.dof_pos_[0, sim_idx] > self.joint_pos_protect_high[sim_idx] or \
self.dof_pos_[0, sim_idx] < self.joint_pos_protect_low[sim_idx]:
self.get_logger().error(f"Joint {sim_idx}(sim), {real_idx}(real) position out of range at {self.low_state_buffer.motor_state[real_idx].q}")
self.get_logger().error("The motors and this process shuts down.")
self._turn_off_motors()
raise SystemExit()
    def _joy_stick_callback(self, msg):
        self.joy_stick_buffer = msg
        if self.move_by_wireless_remote:
            # left-y for forward/backward
            ly = msg.ly
            if ly > self.lin_vel_deadband:
                vx = (ly - self.lin_vel_deadband) / (1 - self.lin_vel_deadband) # (0, 1)
                vx = vx * (self.cmd_px_range[1] - self.cmd_px_range[0]) + self.cmd_px_range[0]
            elif ly < -self.lin_vel_deadband:
                vx = (ly + self.lin_vel_deadband) / (1 - self.lin_vel_deadband) # (-1, 0)
                vx = vx * (self.cmd_nx_range[1] - self.cmd_nx_range[0]) - self.cmd_nx_range[0]
            else:
                vx = 0
            # left-x for turning left/right
            lx = -msg.lx
            if lx > self.ang_vel_deadband:
                yaw = (lx - self.ang_vel_deadband) / (1 - self.ang_vel_deadband)
                yaw = yaw * (self.cmd_pyaw_range[1] - self.cmd_pyaw_range[0]) + self.cmd_pyaw_range[0]
            elif lx < -self.ang_vel_deadband:
                yaw = (lx + self.ang_vel_deadband) / (1 - self.ang_vel_deadband)
                yaw = yaw * (self.cmd_nyaw_range[1] - self.cmd_nyaw_range[0]) - self.cmd_nyaw_range[0]
            else:
                yaw = 0
            # right-x for side moving left/right
            rx = -msg.rx
            if rx > self.lin_vel_deadband:
                vy = (rx - self.lin_vel_deadband) / (1 - self.lin_vel_deadband)
                vy = vy * (self.cmd_py_range[1] - self.cmd_py_range[0]) + self.cmd_py_range[0]
            elif rx < -self.lin_vel_deadband:
                vy = (rx + self.lin_vel_deadband) / (1 - self.lin_vel_deadband)
                vy = vy * (self.cmd_ny_range[1] - self.cmd_ny_range[0]) - self.cmd_ny_range[0]
            else:
                vy = 0
            self.xyyaw_command = torch.tensor([vx, vy, yaw], device= self.model_device, dtype= torch.float32)

        # refer to the Unitree Remote Control data structure; msg.keys is a bit mask
        # 00000000 00000001 means pressing the 0-th button (R1)
        # 00000000 00000010 means pressing the 1-th button (L1)
        # 10000000 00000000 means pressing the 15-th button (left)
        if (msg.keys & self.WirelessButtons.R2) or (msg.keys & self.WirelessButtons.L2): # R2 or L2 is pressed
            self.get_logger().warn("R2 or L2 is pressed, shutting down the motors and this process.")
            self._turn_off_motors()
            raise SystemExit()

        # roll-pitch target
        if hasattr(self, "roll_pitch_yaw_cmd"):
            if (msg.keys & self.WirelessButtons.up):
                self.roll_pitch_yaw_cmd[0, 1] += 0.1
                self.get_logger().info("Pitch Command: " + str(self.roll_pitch_yaw_cmd))
            if (msg.keys & self.WirelessButtons.down):
                self.roll_pitch_yaw_cmd[0, 1] -= 0.1
                self.get_logger().info("Pitch Command: " + str(self.roll_pitch_yaw_cmd))
            if (msg.keys & self.WirelessButtons.left):
                self.roll_pitch_yaw_cmd[0, 0] -= 0.1
                self.get_logger().info("Roll Command: " + str(self.roll_pitch_yaw_cmd))
            if (msg.keys & self.WirelessButtons.right):
                self.roll_pitch_yaw_cmd[0, 0] += 0.1
                self.get_logger().info("Roll Command: " + str(self.roll_pitch_yaw_cmd))
    def _forward_depth_callback(self, msg):
        """ Store and handle depth camera data. """
        pass

    def _forward_depth_embedding_callback(self, msg):
        self.forward_depth_embedding_buffer = torch.tensor(msg.data, device= self.model_device, dtype= torch.float32).view(1, -1)
    """ Done: ROS callbacks and handlers that update the buffer """
""" refresh observation buffer and corresponding sub-functions """
def _get_lin_vel_obs(self):
return torch.zeros(1, 3, device= self.model_device, dtype= torch.float32)
def _get_ang_vel_obs(self):
buffer = torch.from_numpy(self.low_state_buffer.imu_state.gyroscope).unsqueeze(0)
return buffer
def _get_projected_gravity_obs(self):
quat_xyzw = torch.tensor([
self.low_state_buffer.imu_state.quaternion[1],
self.low_state_buffer.imu_state.quaternion[2],
self.low_state_buffer.imu_state.quaternion[3],
self.low_state_buffer.imu_state.quaternion[0],
], device= self.model_device, dtype= torch.float32).unsqueeze(0)
return quat_rotate_inverse(
quat_xyzw,
self.gravity_vec,
)
def _get_commands_obs(self):
return self.xyyaw_command.unsqueeze(0) # (1, 3)
def _get_dof_pos_obs(self):
return self.dof_pos_ - self.default_dof_pos.unsqueeze(0)
def _get_dof_vel_obs(self):
return self.dof_vel_
def _get_last_actions_obs(self):
return self.actions
def _get_forward_depth_embedding_obs(self):
return self.forward_depth_embedding_buffer
def _get_forward_depth_obs(self):
raise NotImplementedError()
def _get_orientation_cmds_obs(self):
return quat_rotate_inverse(
quat_from_euler_xyz(self.roll_pitch_yaw_cmd[:, 0], self.roll_pitch_yaw_cmd[:, 1], self.roll_pitch_yaw_cmd[:, 2]),
self.gravity_vec,
)
    def get_num_obs_from_components(self, components):
        obs_segments = self.get_obs_segment_from_components(components)
        num_obs = 0
        for k, v in obs_segments.items():
            num_obs += np.prod(v)
        return num_obs

    def get_obs_segment_from_components(self, components):
        """ The observation segments are defined as an OrderedDict mapping each
        component name to its tensor shape, in observation order.
        """
        segments = OrderedDict()
        if "lin_vel" in components:
            print("Warning: lin_vel is not typically available or accurate enough on the real robot. Will return zeros.")
            segments["lin_vel"] = (3,)
        if "ang_vel" in components:
            segments["ang_vel"] = (3,)
        if "projected_gravity" in components:
            segments["projected_gravity"] = (3,)
        if "commands" in components:
            segments["commands"] = (3,)
        if "dof_pos" in components:
            segments["dof_pos"] = (self.NUM_DOF,)
        if "dof_vel" in components:
            segments["dof_vel"] = (self.NUM_DOF,)
        if "last_actions" in components:
            segments["last_actions"] = (self.NUM_ACTIONS,)
        if "height_measurements" in components:
            print("Warning: height_measurements is not typically available on the real robot.")
            segments["height_measurements"] = (1, len(self.cfg["terrain"]["measured_points_x"]), len(self.cfg["terrain"]["measured_points_y"]))
        if "forward_depth" in components:
            if "output_resolution" in self.cfg["sensor"]["forward_camera"]:
                segments["forward_depth"] = (1, *self.cfg["sensor"]["forward_camera"]["output_resolution"])
            else:
                segments["forward_depth"] = (1, *self.cfg["sensor"]["forward_camera"]["resolution"])
        if "base_pose" in components:
            segments["base_pose"] = (6,) # xyz + rpy
        if "robot_config" in components:
            """ Related to the robot_config_buffer attribute. Be careful when changing this. """
            # robot shape friction
            # CoM (center of mass) x, y, z
            # base mass (payload)
            # motor strength for each joint
            print("Warning: robot_config is not typically available on the real robot.")
            segments["robot_config"] = (1 + 3 + 1 + self.NUM_ACTIONS,)

        """ NOTE: The following components are not directly set in legged_robot.py.
        Please check the order or extend the class implementation if needed.
        """
        if "joints_target" in components:
            # o[0] for the target value, o[1] for whether the target should be tracked (1) or not (0)
            segments["joints_target"] = (2, self.NUM_DOF)
        if "projected_gravity_target" in components:
            # projected_gravity target that the robot should track;
            # the last value is a mask of whether to follow the target or not
            segments["projected_gravity_target"] = (3+1,)
        if "orientation_cmds" in components:
            segments["orientation_cmds"] = (3,)
        return segments
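    # Example (hedged sketch): with components ["ang_vel", "dof_pos", "last_actions"]
    # on Go2 (NUM_DOF == NUM_ACTIONS == 12), this returns
    #   OrderedDict([("ang_vel", (3,)), ("dof_pos", (12,)), ("last_actions", (12,))])
    # and get_num_obs_from_components returns 3 + 12 + 12 = 27.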
    def get_obs(self, obs_segments= None):
        """ Extract from the buffers and build the 1d observation tensor.
        Each _get_..._obs function does not do the obs_scale multiplication.
        NOTE: obs_buffer has the batch dimension, whose size is 1.
        """
        if obs_segments is None:
            obs_segments = self.obs_segments
        obs_buffer = []
        for k, v in obs_segments.items():
            if k in self.replace_obs_with_embeddings:
                obs_component_value = getattr(self, "_get_" + k + "_embedding_obs")()
            else:
                obs_component_value = getattr(self, "_get_" + k + "_obs")() * self.obs_scales.get(k, 1.0)
            obs_buffer.append(obs_component_value)
        obs_buffer = torch.cat(obs_buffer, dim=1)
        obs_buffer = torch.clamp(obs_buffer, -self.clip_obs, self.clip_obs)
        return obs_buffer
    """ Done: refresh observation buffer and corresponding sub-functions """
""" Control related functions """
def clip_action_before_scale(self, action):
action = torch.clip(action, -self.clip_actions, self.clip_actions)
if getattr(self, "clip_actions_method", None) == "hard":
action = torch.clip(action, self.clip_actions_low, self.clip_actions_high)
return action
def clip_by_torque_limit(self, actions_scaled):
""" Different from simulation, we reverse the process and clip the actions directly,
so that the PD controller runs in robot but not our script.
"""
control_type = self.cfg["control"]["control_type"]
if control_type == "P":
p_limits_low = (-self.torque_limits) + self.d_gains*self.dof_vel_
p_limits_high = (self.torque_limits) + self.d_gains*self.dof_vel_
actions_low = (p_limits_low/self.p_gains) - self.default_dof_pos + self.dof_pos_
actions_high = (p_limits_high/self.p_gains) - self.default_dof_pos + self.dof_pos_
else:
raise NotImplementedError
return torch.clip(actions_scaled, actions_low, actions_high)
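    # Derivation (hedged sketch): the onboard PD controller computes
    #   tau = p_gain * (q_target - q) - d_gain * dq
    # with q_target = action_scaled + default_dof_pos. Requiring |tau| <= torque_limit
    # and solving for action_scaled gives exactly the bounds used above:
    #   action_scaled <= ( torque_limit + d_gain * dq) / p_gain - default_dof_pos + q
    #   action_scaled >= (-torque_limit + d_gain * dq) / p_gain - default_dof_pos + q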
    def send_action(self, actions):
        """ Send the action to the robot motors, doing the same preprocessing
        as env.step in simulation.
        Thus, `actions` has the batch dimension, whose size is 1.
        """
        self.actions = self.clip_action_before_scale(actions)
        if self.computer_clip_torque:
            clipped_scaled_action = self.clip_by_torque_limit(actions * self.action_scale)
        else:
            self.get_logger().warn("Computer Clip Torque is False, the robot may be damaged.", throttle_duration_sec= 1)
            clipped_scaled_action = actions * self.action_scale
        robot_coordinates_action = clipped_scaled_action + self.default_dof_pos.unsqueeze(0)
        self._publish_legs_cmd(robot_coordinates_action[0])
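    # Control-loop sketch (hedged; `policy` and the loop timing are illustrative
    # and not defined in this file):
    #   obs = node.get_obs()          # shape (1, num_obs)
    #   actions = policy(obs)         # shape (1, NUM_ACTIONS)
    #   node.send_action(actions)     # clip, scale, shift by default pose, publish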
""" Done: Control related functions """
""" functions that actually publish the commands and take effect """
def _publish_legs_cmd(self, robot_coordinates_action: torch.Tensor):
""" Publish the joint commands to the robot legs in robot coordinates system.
robot_coordinates_action: shape (NUM_DOF,), in simulation order.
"""
for sim_idx in range(self.NUM_DOF):
real_idx = self.dof_map[sim_idx]
if not self.dryrun:
self.low_cmd_buffer.motor_cmd[real_idx].mode = self.turn_on_motor_mode[sim_idx]
self.low_cmd_buffer.motor_cmd[real_idx].q = robot_coordinates_action[sim_idx].item() * self.dof_signs[sim_idx]
self.low_cmd_buffer.motor_cmd[real_idx].dq = 0.
self.low_cmd_buffer.motor_cmd[real_idx].tau = 0.
self.low_cmd_buffer.motor_cmd[real_idx].kp = self.p_gains[sim_idx].item()
self.low_cmd_buffer.motor_cmd[real_idx].kd = self.d_gains[sim_idx].item()
self.low_cmd_buffer.crc = get_crc(self.low_cmd_buffer)
self.low_cmd_pub.publish(self.low_cmd_buffer)
def _turn_off_motors(self):
""" Turn off the motors """
for sim_idx in range(self.NUM_DOF):
real_idx = self.dof_map[sim_idx]
self.low_cmd_buffer.motor_cmd[real_idx].mode = 0x00
self.low_cmd_buffer.motor_cmd[real_idx].q = 0.
self.low_cmd_buffer.motor_cmd[real_idx].dq = 0.
self.low_cmd_buffer.motor_cmd[real_idx].tau = 0.
self.low_cmd_buffer.motor_cmd[real_idx].kp = 0.
self.low_cmd_buffer.motor_cmd[real_idx].kd = 0.
self.low_cmd_buffer.crc = get_crc(self.low_cmd_buffer)
self.low_cmd_pub.publish(self.low_cmd_buffer)
""" Done: functions that actually publish the commands and take effect """