import numpy as np import torch import torch.nn.functional as F from torch.autograd import Variable import json import os import os.path as osp from collections import OrderedDict from typing import Tuple import rospy from unitree_legged_msgs.msg import LowState from unitree_legged_msgs.msg import LegsCmd from unitree_legged_msgs.msg import Float32MultiArrayStamped from std_msgs.msg import Float32MultiArray from geometry_msgs.msg import Twist, Pose from nav_msgs.msg import Odometry from sensor_msgs.msg import Image import ros_numpy @torch.no_grad() def resize2d(img, size): return (F.adaptive_avg_pool2d(Variable(img), size)).data @torch.jit.script def quat_rotate_inverse(q, v): shape = q.shape q_w = q[:, -1] q_vec = q[:, :3] a = v * (2.0 * q_w ** 2 - 1.0).unsqueeze(-1) b = torch.cross(q_vec, v, dim=-1) * q_w.unsqueeze(-1) * 2.0 c = q_vec * \ torch.bmm(q_vec.view(shape[0], 1, 3), v.view( shape[0], 3, 1)).squeeze(-1) * 2.0 return a - b + c class UnitreeA1Real: """ This is the handler that works for ROS 1 on unitree. """ def __init__(self, robot_namespace= "a112138", low_state_topic= "/low_state", legs_cmd_topic= "/legs_cmd", forward_depth_topic = "/camera/depth/image_rect_raw", forward_depth_embedding_dims = None, odom_topic= "/odom/filtered", lin_vel_deadband= 0.1, ang_vel_deadband= 0.1, move_by_wireless_remote= False, # if True, command will not listen to move_cmd_subscriber, but wireless remote. cfg= dict(), extra_cfg= dict(), model_device= torch.device("cpu"), ): """ NOTE: * Must call start_ros() before using this class's get_obs() and send_action() * Joint order of simulation and of real A1 protocol are different, see dof_names * We store all joints values in the order of simulation in this class Args: forward_depth_embedding_dims: If a real number, the obs will not be built as a normal env. The segment of obs will be subsituted by the embedding of forward depth image from the ROS topic. cfg: same config from a1_config but a dict object. extra_cfg: some other configs that is hard to load from file. """ self.model_device = model_device self.num_envs = 1 self.robot_namespace = robot_namespace self.low_state_topic = low_state_topic self.legs_cmd_topic = legs_cmd_topic self.forward_depth_topic = forward_depth_topic self.forward_depth_embedding_dims = forward_depth_embedding_dims self.odom_topic = odom_topic self.lin_vel_deadband = lin_vel_deadband self.ang_vel_deadband = ang_vel_deadband self.move_by_wireless_remote = move_by_wireless_remote self.cfg = cfg self.extra_cfg = dict( torque_limits= torch.tensor([33.5] * 12, dtype= torch.float32, device= self.model_device, requires_grad= False), # Nm # torque_limits= torch.tensor([1, 5, 5] * 4, dtype= torch.float32, device= self.model_device, requires_grad= False), # Nm dof_map= [ # from isaacgym simulation joint order to URDF order 3, 4, 5, 0, 1, 2, 9, 10,11, 6, 7, 8, ], # real_joint_idx = dof_map[sim_joint_idx] dof_names= [ # NOTE: order matters. This list is the order in simulation. "FL_hip_joint", "FL_thigh_joint", "FL_calf_joint", "FR_hip_joint", "FR_thigh_joint", "FR_calf_joint", "RL_hip_joint", "RL_thigh_joint", "RL_calf_joint", "RR_hip_joint", "RR_thigh_joint", "RR_calf_joint", ], # motor strength is multiplied directly to the action. motor_strength= torch.ones(12, dtype= torch.float32, device= self.model_device, requires_grad= False), ); self.extra_cfg.update(extra_cfg) if "torque_limits" in self.cfg["control"]: if isinstance(self.cfg["control"]["torque_limits"], (tuple, list)): for i in range(len(self.cfg["control"]["torque_limits"])): self.extra_cfg["torque_limits"][i] = self.cfg["control"]["torque_limits"][i] else: self.extra_cfg["torque_limits"][:] = self.cfg["control"]["torque_limits"] self.command_buf = torch.zeros((self.num_envs, 3,), device= self.model_device, dtype= torch.float32) # zeros for initialization self.actions = torch.zeros((1, 12), device= model_device, dtype= torch.float32) self.process_configs() def start_ros(self): # initialze several buffers so that the system works even without message update. # self.low_state_buffer = LowState() # not initialized, let input message update it. self.base_position_buffer = torch.zeros((self.num_envs, 3), device= self.model_device, requires_grad= False) self.legs_cmd_publisher = rospy.Publisher( self.robot_namespace + self.legs_cmd_topic, LegsCmd, queue_size= 1, ) # self.debug_publisher = rospy.Publisher( # "/DNNmodel_debug", # Float32MultiArray, # queue_size= 1, # ) # NOTE: this launches the subscriber callback function self.low_state_subscriber = rospy.Subscriber( self.robot_namespace + self.low_state_topic, LowState, self.update_low_state, queue_size= 1, ) self.odom_subscriber = rospy.Subscriber( self.robot_namespace + self.odom_topic, Odometry, self.update_base_pose, queue_size= 1, ) if not self.move_by_wireless_remote: self.move_cmd_subscriber = rospy.Subscriber( "/cmd_vel", Twist, self.update_move_cmd, queue_size= 1, ) if "forward_depth" in self.all_obs_components: if not self.forward_depth_embedding_dims: self.forward_depth_subscriber = rospy.Subscriber( self.robot_namespace + self.forward_depth_topic, Image, self.update_forward_depth, queue_size= 1, ) else: self.forward_depth_subscriber = rospy.Subscriber( self.robot_namespace + self.forward_depth_topic, Float32MultiArrayStamped, self.update_forward_depth_embedding, queue_size= 1, ) self.pose_cmd_subscriber = rospy.Subscriber( "/body_pose", Pose, self.dummy_handler, queue_size= 1, ) def wait_untill_ros_working(self): rate = rospy.Rate(100) while not hasattr(self, "low_state_buffer"): rate.sleep() rospy.loginfo("UnitreeA1Real.low_state_buffer acquired, stop waiting.") def process_configs(self): self.up_axis_idx = 2 # 2 for z, 1 for y -> adapt gravity accordingly self.gravity_vec = torch.zeros((self.num_envs, 3), dtype= torch.float32) self.gravity_vec[:, self.up_axis_idx] = -1 self.obs_scales = self.cfg["normalization"]["obs_scales"] self.obs_scales["dof_pos"] = torch.tensor(self.obs_scales["dof_pos"], device= self.model_device, dtype= torch.float32) if not isinstance(self.cfg["control"]["damping"]["joint"], (list, tuple)): self.cfg["control"]["damping"]["joint"] = [self.cfg["control"]["damping"]["joint"]] * 12 if not isinstance(self.cfg["control"]["stiffness"]["joint"], (list, tuple)): self.cfg["control"]["stiffness"]["joint"] = [self.cfg["control"]["stiffness"]["joint"]] * 12 self.d_gains = torch.tensor(self.cfg["control"]["damping"]["joint"], device= self.model_device, dtype= torch.float32) self.p_gains = torch.tensor(self.cfg["control"]["stiffness"]["joint"], device= self.model_device, dtype= torch.float32) self.default_dof_pos = torch.zeros(12, device= self.model_device, dtype= torch.float32) for i in range(12): name = self.extra_cfg["dof_names"][i] default_joint_angle = self.cfg["init_state"]["default_joint_angles"][name] self.default_dof_pos[i] = default_joint_angle self.computer_clip_torque = self.cfg["control"].get("computer_clip_torque", True) rospy.loginfo("Computer Clip Torque (onboard) is " + str(self.computer_clip_torque)) if self.computer_clip_torque: self.torque_limits = self.extra_cfg["torque_limits"] rospy.loginfo("[Env] torque limit: {:.1f} {:.1f} {:.1f}".format(*self.torque_limits[:3])) self.commands_scale = torch.tensor([ self.obs_scales["lin_vel"], self.obs_scales["lin_vel"], self.obs_scales["lin_vel"], ], device= self.model_device, requires_grad= False) self.obs_segments = self.get_obs_segment_from_components(self.cfg["env"]["obs_components"]) self.num_obs = self.get_num_obs_from_components(self.cfg["env"]["obs_components"]) components = self.cfg["env"].get("privileged_obs_components", None) self.privileged_obs_segments = None if components is None else self.get_num_obs_from_components(components) self.num_privileged_obs = None if components is None else self.get_num_obs_from_components(components) self.all_obs_components = self.cfg["env"]["obs_components"] + (self.cfg["env"].get("privileged_obs_components", []) if components is not None else []) # store config values to attributes to improve speed self.clip_obs = self.cfg["normalization"]["clip_observations"] self.control_type = self.cfg["control"]["control_type"] self.action_scale = self.cfg["control"]["action_scale"] rospy.loginfo("[Env] action scale: {:.1f}".format(self.action_scale)) self.clip_actions = self.cfg["normalization"]["clip_actions"] if self.cfg["normalization"].get("clip_actions_method", None) == "hard": rospy.loginfo("clip_actions_method with hard mode") rospy.loginfo("clip_actions_high: " + str(self.cfg["normalization"]["clip_actions_high"])) rospy.loginfo("clip_actions_low: " + str(self.cfg["normalization"]["clip_actions_low"])) self.clip_actions_method = "hard" self.clip_actions_low = torch.tensor(self.cfg["normalization"]["clip_actions_low"], device= self.model_device, dtype= torch.float32) self.clip_actions_high = torch.tensor(self.cfg["normalization"]["clip_actions_high"], device= self.model_device, dtype= torch.float32) else: rospy.loginfo("clip_actions_method is " + str(self.cfg["normalization"].get("clip_actions_method", None))) self.dof_map = self.extra_cfg["dof_map"] # get ROS params for hardware configs self.joint_limits_high = torch.tensor([ rospy.get_param(self.robot_namespace + "/joint_limits/{}_max".format(s)) \ for s in ["hip", "thigh", "calf"] * 4 ]) self.joint_limits_low = torch.tensor([ rospy.get_param(self.robot_namespace + "/joint_limits/{}_min".format(s)) \ for s in ["hip", "thigh", "calf"] * 4 ]) if "forward_depth" in self.all_obs_components: resolution = self.cfg["sensor"]["forward_camera"].get( "output_resolution", self.cfg["sensor"]["forward_camera"]["resolution"], ) if not self.forward_depth_embedding_dims: self.forward_depth_buf = torch.zeros( (self.num_envs, *resolution), device= self.model_device, dtype= torch.float32, ) else: self.forward_depth_embedding_buf = torch.zeros( (1, self.forward_depth_embedding_dims), device= self.model_device, dtype= torch.float32, ) def _init_height_points(self): """ Returns points at which the height measurments are sampled (in base frame) Returns: [torch.Tensor]: Tensor of shape (num_envs, self.num_height_points, 3) """ return None def _get_heights(self): """ TODO: get estimated terrain heights around the robot base """ # currently return a zero tensor with valid size return torch.zeros(self.num_envs, 187, device= self.model_device, requires_grad= False) def clip_action_before_scale(self, actions): actions = torch.clip(actions, -self.clip_actions, self.clip_actions) if getattr(self, "clip_actions_method", None) == "hard": actions = torch.clip(actions, self.clip_actions_low, self.clip_actions_high) return actions def clip_by_torque_limit(self, actions_scaled): """ Different from simulation, we reverse the process and clip the actions directly, so that the PD controller runs in robot but not our script. """ control_type = self.cfg["control"]["control_type"] if control_type == "P": p_limits_low = (-self.torque_limits) + self.d_gains*self.dof_vel p_limits_high = (self.torque_limits) + self.d_gains*self.dof_vel actions_low = (p_limits_low/self.p_gains) - self.default_dof_pos + self.dof_pos actions_high = (p_limits_high/self.p_gains) - self.default_dof_pos + self.dof_pos else: raise NotImplementedError return torch.clip(actions_scaled, actions_low, actions_high) """ Get obs components and cat to a single obs input """ def _get_proprioception_obs(self): # base_ang_vel = quat_rotate_inverse( # torch.tensor(self.low_state_buffer.imu.quaternion).unsqueeze(0), # torch.tensor(self.low_state_buffer.imu.gyroscope).unsqueeze(0), # ).to(self.model_device) # NOTE: Different from the isaacgym. # The anglar velocity is already in base frame, no need to rotate base_ang_vel = torch.tensor(self.low_state_buffer.imu.gyroscope, device= self.model_device).unsqueeze(0) projected_gravity = quat_rotate_inverse( torch.tensor(self.low_state_buffer.imu.quaternion).unsqueeze(0), self.gravity_vec, ).to(self.model_device) self.dof_pos = dof_pos = torch.tensor([ self.low_state_buffer.motorState[self.dof_map[i]].q for i in range(12) ], dtype= torch.float32, device= self.model_device).unsqueeze(0) self.dof_vel = dof_vel = torch.tensor([ self.low_state_buffer.motorState[self.dof_map[i]].dq for i in range(12) ], dtype= torch.float32, device= self.model_device).unsqueeze(0) # rospy.loginfo_throttle(5, # "projected_gravity: " + \ # ", ".join([str(i) for i in projected_gravity[0].cpu().tolist()]) # ) # rospy.loginfo_throttle(5, "Hacking projected_gravity y -= 0.15") # projected_gravity[:, 1] -= 0.15 return torch.cat([ torch.zeros((1, 3), device= self.model_device), # no linear velocity base_ang_vel * self.obs_scales["ang_vel"], projected_gravity, self.command_buf * self.commands_scale, (dof_pos - self.default_dof_pos) * self.obs_scales["dof_pos"], dof_vel * self.obs_scales["dof_vel"], self.actions ], dim= -1) def _get_forward_depth_obs(self): if not self.forward_depth_embedding_dims: return self.forward_depth_buf.flatten(start_dim= 1) else: if self.low_state_get_time.to_sec() - self.forward_depth_embedding_stamp.to_sec() > 0.4: rospy.logerr("Getting depth embedding later than low_state later than 0.4s") return self.forward_depth_embedding_buf.flatten(start_dim= 1) def compute_observation(self): """ use the updated low_state_buffer to compute observation vector """ assert hasattr(self, "legs_cmd_publisher"), "start_ros() not called, ROS handlers are not initialized!" obs_segments = self.obs_segments obs = [] for k, v in obs_segments.items(): obs.append( getattr(self, "_get_" + k + "_obs")() * \ self.obs_scales.get(k, 1.) ) obs = torch.cat(obs, dim= 1) self.obs_buf = obs """ The methods combined with outer model forms the step function NOTE: the outer user handles the loop frequency. """ def send_action(self, actions): """ The function that send commands to the real robot. """ self.actions = self.clip_action_before_scale(actions) if self.computer_clip_torque: robot_coordinates_action = self.clip_by_torque_limit(actions * self.action_scale) + self.default_dof_pos.unsqueeze(0) else: rospy.logwarn_throttle(60, "You are using control without any torque clip. The network might output torques larger than the system can provide.") robot_coordinates_action = self.actions * self.action_scale + self.default_dof_pos.unsqueeze(0) # debugging and logging # transfered_action = torch.zeros_like(self.actions[0]) # for i in range(12): # transfered_action[self.dof_map[i]] = self.actions[0, i] + self.default_dof_pos[i] # self.debug_publisher.publish(Float32MultiArray(data= # transfered_action\ # .cpu().numpy().astype(np.float32).tolist() # )) # restrict the target action delta in order to avoid robot shutdown (maybe there is another solution) # robot_coordinates_action = torch.clip( # robot_coordinates_action, # self.dof_pos - 0.3, # self.dof_pos + 0.3, # ) # wrap the message and publish self.publish_legs_cmd(robot_coordinates_action) def publish_legs_cmd(self, robot_coordinates_action, kp= None, kd= None): """ publish the joint position directly to the robot. NOTE: The joint order from input should be in simulation order. The value should be absolute value rather than related to dof_pos. """ robot_coordinates_action = torch.clip( robot_coordinates_action.cpu(), self.joint_limits_low, self.joint_limits_high, ) legs_cmd = LegsCmd() for sim_joint_idx in range(12): real_joint_idx = self.dof_map[sim_joint_idx] legs_cmd.cmd[real_joint_idx].mode = 10 legs_cmd.cmd[real_joint_idx].q = robot_coordinates_action[0, sim_joint_idx] if self.control_type == "P" else rospy.get_param(self.robot_namespace + "/PosStopF", (2.146e+9)) legs_cmd.cmd[real_joint_idx].dq = 0. legs_cmd.cmd[real_joint_idx].tau = 0. legs_cmd.cmd[real_joint_idx].Kp = self.p_gains[sim_joint_idx] if kp is None else kp legs_cmd.cmd[real_joint_idx].Kd = self.d_gains[sim_joint_idx] if kd is None else kd self.legs_cmd_publisher.publish(legs_cmd) def get_obs(self): """ The function that refreshes the buffer and return the observation vector. """ self.compute_observation() self.obs_buf = torch.clip(self.obs_buf, -self.clip_obs, self.clip_obs) return self.obs_buf.to(self.model_device) """ Copied from legged_robot_field. Please check whether these are consistent. """ def get_obs_segment_from_components(self, components): segments = OrderedDict() if "proprioception" in components: segments["proprioception"] = (48,) if "height_measurements" in components: segments["height_measurements"] = (187,) if "forward_depth" in components: resolution = self.cfg["sensor"]["forward_camera"].get( "output_resolution", self.cfg["sensor"]["forward_camera"]["resolution"], ) segments["forward_depth"] = (1, *resolution) # The following components are only for rebuilding the non-actor module. # DO NOT use these in actor network and check consistency with simulator implementation. if "base_pose" in components: segments["base_pose"] = (6,) # xyz + rpy if "robot_config" in components: segments["robot_config"] = (1 + 3 + 1 + 12,) if "engaging_block" in components: # This could be wrong, please check the implementation of BarrierTrack segments["engaging_block"] = (1 + (4 + 1) + 2,) if "sidewall_distance" in components: segments["sidewall_distance"] = (2,) return segments def get_num_obs_from_components(self, components): obs_segments = self.get_obs_segment_from_components(components) num_obs = 0 for k, v in obs_segments.items(): num_obs += np.prod(v) return num_obs """ ROS callbacks and handlers that update the buffer """ def update_low_state(self, ros_msg): self.low_state_buffer = ros_msg if self.move_by_wireless_remote: self.command_buf[0, 0] = self.low_state_buffer.wirelessRemote.ly self.command_buf[0, 1] = -self.low_state_buffer.wirelessRemote.lx # right-moving stick is positive self.command_buf[0, 2] = -self.low_state_buffer.wirelessRemote.rx # right-moving stick is positive # set the command to zero if it is too small if np.linalg.norm(self.command_buf[0, :2]) < self.lin_vel_deadband: self.command_buf[0, :2] = 0. if np.abs(self.command_buf[0, 2]) < self.ang_vel_deadband: self.command_buf[0, 2] = 0. self.low_state_get_time = rospy.Time.now() def update_base_pose(self, ros_msg): """ update robot odometry for position """ self.base_position_buffer[0, 0] = ros_msg.pose.pose.position.x self.base_position_buffer[0, 1] = ros_msg.pose.pose.position.y self.base_position_buffer[0, 2] = ros_msg.pose.pose.position.z def update_move_cmd(self, ros_msg): self.command_buf[0, 0] = ros_msg.linear.x self.command_buf[0, 1] = ros_msg.linear.y self.command_buf[0, 2] = ros_msg.angular.z def update_forward_depth(self, ros_msg): # TODO not checked. self.forward_depth_header = ros_msg.header buf = ros_numpy.numpify(ros_msg) self.forward_depth_buf = resize2d( torch.from_numpy(buf.astype(np.float32)).unsqueeze(0).unsqueeze(0).to(self.model_device), self.forward_depth_buf.shape[-2:], ) def update_forward_depth_embedding(self, ros_msg): rospy.loginfo_once("a1_ros_run recieved forward depth embedding.") self.forward_depth_embedding_stamp = ros_msg.header.stamp self.forward_depth_embedding_buf[:] = torch.tensor(ros_msg.data).unsqueeze(0) # (1, d) def dummy_handler(self, ros_msg): """ To meet the need of teleop-legged-robots requirements """ pass