import collections

import numpy as np
from aloha.constants import (
    PUPPET_GRIPPER_POSITION_CLOSE,
    START_ARM_POSE,
    normalize_puppet_gripper_position,
    normalize_puppet_gripper_velocity,
    unnormalize_puppet_gripper_position,
)
from aloha.utils import sample_box_pose, sample_insertion_pose
from dm_control.suite import base

"""
Environment for simulated robot bi-manual manipulation, with end-effector control.

Action space:      [left_arm_pose (7),             # position and quaternion for end effector
                    left_gripper_positions (1),    # normalized gripper position (0: close, 1: open)
                    right_arm_pose (7),            # position and quaternion for end effector
                    right_gripper_positions (1)]   # normalized gripper position (0: close, 1: open)

Observation space: {"qpos": Concat[ left_arm_qpos (6),          # absolute joint position
                                    left_gripper_position (1),  # normalized gripper position (0: close, 1: open)
                                    right_arm_qpos (6),         # absolute joint position
                                    right_gripper_qpos (1)]     # normalized gripper position (0: close, 1: open)
                    "qvel": Concat[ left_arm_qvel (6),          # absolute joint velocity (rad)
                                    left_gripper_velocity (1),  # normalized gripper velocity (pos: opening, neg: closing)
                                    right_arm_qvel (6),         # absolute joint velocity (rad)
                                    right_gripper_qvel (1)]     # normalized gripper velocity (pos: opening, neg: closing)
                    "images": {"top":   (480x640x3),            # h, w, c, dtype='uint8'
                               "angle": (480x640x3),
                               "vis":   (480x640x3)}}
"""


class BimanualViperXEndEffectorTask(base.Task):
    def __init__(self, random=None):
        super().__init__(random=random)

    def before_step(self, action, physics):
        a_len = len(action) // 2
        action_left = action[:a_len]
        action_right = action[a_len:]

        # set mocap position and quat
        # left
        np.copyto(physics.data.mocap_pos[0], action_left[:3])
        np.copyto(physics.data.mocap_quat[0], action_left[3:7])
        # right
        np.copyto(physics.data.mocap_pos[1], action_right[:3])
        np.copyto(physics.data.mocap_quat[1], action_right[3:7])

        # set gripper
        g_left_ctrl = unnormalize_puppet_gripper_position(action_left[7])
        g_right_ctrl = unnormalize_puppet_gripper_position(action_right[7])
        np.copyto(physics.data.ctrl, np.array([g_left_ctrl, -g_left_ctrl, g_right_ctrl, -g_right_ctrl]))

    def initialize_robots(self, physics):
        # reset joint position
        physics.named.data.qpos[:16] = START_ARM_POSE

        # reset mocap to align with end effector
        # to obtain these numbers:
        # (1) make an ee_sim env and reset to the same start_pose
        # (2) get env._physics.named.data.xpos['vx300s_left/gripper_link']
        #     get env._physics.named.data.xquat['vx300s_left/gripper_link']
        #     repeat the same for the right side
        # left
        np.copyto(physics.data.mocap_pos[0], [-0.31718881, 0.5, 0.29525084])
        np.copyto(physics.data.mocap_quat[0], [1, 0, 0, 0])
        # right
        np.copyto(physics.data.mocap_pos[1], np.array([0.31718881, 0.49999888, 0.29525084]))
        np.copyto(physics.data.mocap_quat[1], [1, 0, 0, 0])

        # reset gripper control
        close_gripper_control = np.array(
            [
                PUPPET_GRIPPER_POSITION_CLOSE,
                -PUPPET_GRIPPER_POSITION_CLOSE,
                PUPPET_GRIPPER_POSITION_CLOSE,
                -PUPPET_GRIPPER_POSITION_CLOSE,
            ]
        )
        np.copyto(physics.data.ctrl, close_gripper_control)

    def initialize_episode(self, physics):
        """Sets the state of the environment at the start of each episode."""
        super().initialize_episode(physics)

    @staticmethod
    def get_qpos(physics):
        qpos_raw = physics.data.qpos.copy()
        left_qpos_raw = qpos_raw[:8]
        right_qpos_raw = qpos_raw[8:16]
        left_arm_qpos = left_qpos_raw[:6]
        right_arm_qpos = right_qpos_raw[:6]
        left_gripper_qpos = [normalize_puppet_gripper_position(left_qpos_raw[6])]
        right_gripper_qpos = [normalize_puppet_gripper_position(right_qpos_raw[6])]
        return np.concatenate([left_arm_qpos, left_gripper_qpos, right_arm_qpos, right_gripper_qpos])

    @staticmethod
    def get_qvel(physics):
        qvel_raw = physics.data.qvel.copy()
        left_qvel_raw = qvel_raw[:8]
        right_qvel_raw = qvel_raw[8:16]
        left_arm_qvel = left_qvel_raw[:6]
        right_arm_qvel = right_qvel_raw[:6]
        left_gripper_qvel = [normalize_puppet_gripper_velocity(left_qvel_raw[6])]
        right_gripper_qvel = [normalize_puppet_gripper_velocity(right_qvel_raw[6])]
        return np.concatenate([left_arm_qvel, left_gripper_qvel, right_arm_qvel, right_gripper_qvel])

    @staticmethod
    def get_env_state(physics):
        raise NotImplementedError

    def get_observation(self, physics):
        # note: it is important to do .copy()
        obs = collections.OrderedDict()
        obs["qpos"] = self.get_qpos(physics)
        obs["qvel"] = self.get_qvel(physics)
        obs["env_state"] = self.get_env_state(physics)
        obs["images"] = {}
        obs["images"]["top"] = physics.render(height=480, width=640, camera_id="top")
        obs["images"]["angle"] = physics.render(height=480, width=640, camera_id="angle")
        obs["images"]["vis"] = physics.render(height=480, width=640, camera_id="front_close")
        # used in scripted policy to obtain starting pose
        obs["mocap_pose_left"] = np.concatenate(
            [physics.data.mocap_pos[0], physics.data.mocap_quat[0]]
        ).copy()
        obs["mocap_pose_right"] = np.concatenate(
            [physics.data.mocap_pos[1], physics.data.mocap_quat[1]]
        ).copy()
        # used when replaying joint trajectory
        obs["gripper_ctrl"] = physics.data.ctrl.copy()
        return obs

    def get_reward(self, physics):
        raise NotImplementedError


class TransferCubeEndEffectorTask(BimanualViperXEndEffectorTask):
    def __init__(self, random=None):
        super().__init__(random=random)
        self.max_reward = 4

    def initialize_episode(self, physics):
        """Sets the state of the environment at the start of each episode."""
        self.initialize_robots(physics)
        # randomize box position
        cube_pose = sample_box_pose()
        box_start_idx = physics.model.name2id("red_box_joint", "joint")
        np.copyto(physics.data.qpos[box_start_idx : box_start_idx + 7], cube_pose)
        # print(f"randomized cube position to {cube_pose}")
        super().initialize_episode(physics)

    @staticmethod
    def get_env_state(physics):
        env_state = physics.data.qpos.copy()[16:]
        return env_state

    def get_reward(self, physics):
        # staged reward: touched by right gripper (1), lifted (2),
        # touched by left gripper (3), transferred and held off the table (4)
        all_contact_pairs = []
        for i_contact in range(physics.data.ncon):
            id_geom_1 = physics.data.contact[i_contact].geom1
            id_geom_2 = physics.data.contact[i_contact].geom2
            name_geom_1 = physics.model.id2name(id_geom_1, "geom")
            name_geom_2 = physics.model.id2name(id_geom_2, "geom")
            contact_pair = (name_geom_1, name_geom_2)
            all_contact_pairs.append(contact_pair)

        touch_left_gripper = ("red_box", "vx300s_left/10_left_gripper_finger") in all_contact_pairs
        touch_right_gripper = ("red_box", "vx300s_right/10_right_gripper_finger") in all_contact_pairs
        touch_table = ("red_box", "table") in all_contact_pairs

        reward = 0
        if touch_right_gripper:
            reward = 1
        if touch_right_gripper and not touch_table:  # lifted
            reward = 2
        if touch_left_gripper:  # attempted transfer
            reward = 3
        if touch_left_gripper and not touch_table:  # successful transfer
            reward = 4
        return reward


class InsertionEndEffectorTask(BimanualViperXEndEffectorTask):
    def __init__(self, random=None):
        super().__init__(random=random)
        self.max_reward = 4

    def initialize_episode(self, physics):
        """Sets the state of the environment at the start of each episode."""
        self.initialize_robots(physics)
        # randomize peg and socket position
        peg_pose, socket_pose = sample_insertion_pose()

        def id2index(j_id):
            # hacky: the first 16 qpos entries belong to the two arms;
            # each object free joint after that has a 7-dim pose
            return 16 + (j_id - 16) * 7

        peg_start_id = physics.model.name2id("red_peg_joint", "joint")
        peg_start_idx = id2index(peg_start_id)
        np.copyto(physics.data.qpos[peg_start_idx : peg_start_idx + 7], peg_pose)
        # print(f"randomized peg position to {peg_pose}")

        socket_start_id = physics.model.name2id("blue_socket_joint", "joint")
        socket_start_idx = id2index(socket_start_id)
        np.copyto(physics.data.qpos[socket_start_idx : socket_start_idx + 7], socket_pose)
        # print(f"randomized socket position to {socket_pose}")

        super().initialize_episode(physics)

    @staticmethod
    def get_env_state(physics):
        env_state = physics.data.qpos.copy()[16:]
        return env_state

    def get_reward(self, physics):
        # staged reward: both objects touched (1), both grasped and lifted (2),
        # peg touching socket (3), peg touching the pin, i.e. inserted (4)
        all_contact_pairs = []
        for i_contact in range(physics.data.ncon):
            id_geom_1 = physics.data.contact[i_contact].geom1
            id_geom_2 = physics.data.contact[i_contact].geom2
            name_geom_1 = physics.model.id2name(id_geom_1, "geom")
            name_geom_2 = physics.model.id2name(id_geom_2, "geom")
            contact_pair = (name_geom_1, name_geom_2)
            all_contact_pairs.append(contact_pair)

        touch_right_gripper = ("red_peg", "vx300s_right/10_right_gripper_finger") in all_contact_pairs
        touch_left_gripper = (
            ("socket-1", "vx300s_left/10_left_gripper_finger") in all_contact_pairs
            or ("socket-2", "vx300s_left/10_left_gripper_finger") in all_contact_pairs
            or ("socket-3", "vx300s_left/10_left_gripper_finger") in all_contact_pairs
            or ("socket-4", "vx300s_left/10_left_gripper_finger") in all_contact_pairs
        )

        peg_touch_table = ("red_peg", "table") in all_contact_pairs
        socket_touch_table = (
            ("socket-1", "table") in all_contact_pairs
            or ("socket-2", "table") in all_contact_pairs
            or ("socket-3", "table") in all_contact_pairs
            or ("socket-4", "table") in all_contact_pairs
        )
        peg_touch_socket = (
            ("red_peg", "socket-1") in all_contact_pairs
            or ("red_peg", "socket-2") in all_contact_pairs
            or ("red_peg", "socket-3") in all_contact_pairs
            or ("red_peg", "socket-4") in all_contact_pairs
        )
        pin_touched = ("red_peg", "pin") in all_contact_pairs

        reward = 0
        if touch_left_gripper and touch_right_gripper:  # touch both
            reward = 1
        if (
            touch_left_gripper
            and touch_right_gripper
            and (not peg_touch_table)
            and (not socket_touch_table)
        ):  # grasp both
            reward = 2
        if peg_touch_socket and (not peg_touch_table) and (not socket_touch_table):  # peg and socket touching
            reward = 3
        if pin_touched:  # successful insertion
            reward = 4
        return reward
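

# -----------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the task definitions above).
# Shows one way these tasks could be wrapped in a dm_control environment.
# The scene XML path, time limit, and control timestep below are assumptions
# for illustration; the XML must define the two ViperX arms, the mocap bodies,
# the object joints, and the "top"/"angle"/"front_close" cameras that the
# tasks and get_observation expect.
if __name__ == "__main__":
    from dm_control import mujoco
    from dm_control.rl import control

    xml_path = "assets/bimanual_viperx_ee_transfer_cube.xml"  # hypothetical scene file
    physics = mujoco.Physics.from_xml_path(xml_path)
    task = TransferCubeEndEffectorTask(random=0)
    env = control.Environment(
        physics,
        task,
        time_limit=20,
        control_timestep=0.02,  # assumed simulation DT; match your setup
        flat_observation=False,
    )

    ts = env.reset()
    # Action layout: [left xyz+quat (7), left gripper (1),
    #                 right xyz+quat (7), right gripper (1)]
    action = np.concatenate(
        [
            ts.observation["mocap_pose_left"], [1.0],   # hold left pose, gripper open
            ts.observation["mocap_pose_right"], [1.0],  # hold right pose, gripper open
        ]
    )
    ts = env.step(action)
    print("reward:", ts.reward, "qpos shape:", ts.observation["qpos"].shape)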