220 lines
9.5 KiB
Python
220 lines
9.5 KiB
Python
import collections
|
|
|
|
import numpy as np
|
|
from dm_control.suite import base
|
|
|
|
from lerobot.common.envs.aloha.constants import (
|
|
START_ARM_POSE,
|
|
normalize_puppet_gripper_position,
|
|
normalize_puppet_gripper_velocity,
|
|
unnormalize_puppet_gripper_position,
|
|
)
|
|
|
|
BOX_POSE = [None] # to be changed from outside
|
|
|
|
"""
|
|
Environment for simulated robot bi-manual manipulation, with joint position control
|
|
Action space: [left_arm_qpos (6), # absolute joint position
|
|
left_gripper_positions (1), # normalized gripper position (0: close, 1: open)
|
|
right_arm_qpos (6), # absolute joint position
|
|
right_gripper_positions (1),] # normalized gripper position (0: close, 1: open)
|
|
|
|
Observation space: {"qpos": Concat[ left_arm_qpos (6), # absolute joint position
|
|
left_gripper_position (1), # normalized gripper position (0: close, 1: open)
|
|
right_arm_qpos (6), # absolute joint position
|
|
right_gripper_qpos (1)] # normalized gripper position (0: close, 1: open)
|
|
"qvel": Concat[ left_arm_qvel (6), # absolute joint velocity (rad)
|
|
left_gripper_velocity (1), # normalized gripper velocity (pos: opening, neg: closing)
|
|
right_arm_qvel (6), # absolute joint velocity (rad)
|
|
right_gripper_qvel (1)] # normalized gripper velocity (pos: opening, neg: closing)
|
|
"images": {"main": (480x640x3)} # h, w, c, dtype='uint8'
|
|
"""
|
|
|
|
|
|
class BimanualViperXTask(base.Task):
|
|
def __init__(self, random=None):
|
|
super().__init__(random=random)
|
|
|
|
def before_step(self, action, physics):
|
|
left_arm_action = action[:6]
|
|
right_arm_action = action[7 : 7 + 6]
|
|
normalized_left_gripper_action = action[6]
|
|
normalized_right_gripper_action = action[7 + 6]
|
|
|
|
left_gripper_action = unnormalize_puppet_gripper_position(normalized_left_gripper_action)
|
|
right_gripper_action = unnormalize_puppet_gripper_position(normalized_right_gripper_action)
|
|
|
|
full_left_gripper_action = [left_gripper_action, -left_gripper_action]
|
|
full_right_gripper_action = [right_gripper_action, -right_gripper_action]
|
|
|
|
env_action = np.concatenate(
|
|
[left_arm_action, full_left_gripper_action, right_arm_action, full_right_gripper_action]
|
|
)
|
|
super().before_step(env_action, physics)
|
|
return
|
|
|
|
def initialize_episode(self, physics):
|
|
"""Sets the state of the environment at the start of each episode."""
|
|
super().initialize_episode(physics)
|
|
|
|
@staticmethod
|
|
def get_qpos(physics):
|
|
qpos_raw = physics.data.qpos.copy()
|
|
left_qpos_raw = qpos_raw[:8]
|
|
right_qpos_raw = qpos_raw[8:16]
|
|
left_arm_qpos = left_qpos_raw[:6]
|
|
right_arm_qpos = right_qpos_raw[:6]
|
|
left_gripper_qpos = [normalize_puppet_gripper_position(left_qpos_raw[6])]
|
|
right_gripper_qpos = [normalize_puppet_gripper_position(right_qpos_raw[6])]
|
|
return np.concatenate([left_arm_qpos, left_gripper_qpos, right_arm_qpos, right_gripper_qpos])
|
|
|
|
@staticmethod
|
|
def get_qvel(physics):
|
|
qvel_raw = physics.data.qvel.copy()
|
|
left_qvel_raw = qvel_raw[:8]
|
|
right_qvel_raw = qvel_raw[8:16]
|
|
left_arm_qvel = left_qvel_raw[:6]
|
|
right_arm_qvel = right_qvel_raw[:6]
|
|
left_gripper_qvel = [normalize_puppet_gripper_velocity(left_qvel_raw[6])]
|
|
right_gripper_qvel = [normalize_puppet_gripper_velocity(right_qvel_raw[6])]
|
|
return np.concatenate([left_arm_qvel, left_gripper_qvel, right_arm_qvel, right_gripper_qvel])
|
|
|
|
@staticmethod
|
|
def get_env_state(physics):
|
|
raise NotImplementedError
|
|
|
|
def get_observation(self, physics):
|
|
obs = collections.OrderedDict()
|
|
obs["qpos"] = self.get_qpos(physics)
|
|
obs["qvel"] = self.get_qvel(physics)
|
|
obs["env_state"] = self.get_env_state(physics)
|
|
obs["images"] = {}
|
|
obs["images"]["top"] = physics.render(height=480, width=640, camera_id="top")
|
|
obs["images"]["angle"] = physics.render(height=480, width=640, camera_id="angle")
|
|
obs["images"]["vis"] = physics.render(height=480, width=640, camera_id="front_close")
|
|
|
|
return obs
|
|
|
|
def get_reward(self, physics):
|
|
# return whether left gripper is holding the box
|
|
raise NotImplementedError
|
|
|
|
|
|
class TransferCubeTask(BimanualViperXTask):
|
|
def __init__(self, random=None):
|
|
super().__init__(random=random)
|
|
self.max_reward = 4
|
|
|
|
def initialize_episode(self, physics):
|
|
"""Sets the state of the environment at the start of each episode."""
|
|
# TODO Notice: this function does not randomize the env configuration. Instead, set BOX_POSE from outside
|
|
# reset qpos, control and box position
|
|
with physics.reset_context():
|
|
physics.named.data.qpos[:16] = START_ARM_POSE
|
|
np.copyto(physics.data.ctrl, START_ARM_POSE)
|
|
assert BOX_POSE[0] is not None
|
|
physics.named.data.qpos[-7:] = BOX_POSE[0]
|
|
# print(f"{BOX_POSE=}")
|
|
super().initialize_episode(physics)
|
|
|
|
@staticmethod
|
|
def get_env_state(physics):
|
|
env_state = physics.data.qpos.copy()[16:]
|
|
return env_state
|
|
|
|
def get_reward(self, physics):
|
|
# return whether left gripper is holding the box
|
|
all_contact_pairs = []
|
|
for i_contact in range(physics.data.ncon):
|
|
id_geom_1 = physics.data.contact[i_contact].geom1
|
|
id_geom_2 = physics.data.contact[i_contact].geom2
|
|
name_geom_1 = physics.model.id2name(id_geom_1, "geom")
|
|
name_geom_2 = physics.model.id2name(id_geom_2, "geom")
|
|
contact_pair = (name_geom_1, name_geom_2)
|
|
all_contact_pairs.append(contact_pair)
|
|
|
|
touch_left_gripper = ("red_box", "vx300s_left/10_left_gripper_finger") in all_contact_pairs
|
|
touch_right_gripper = ("red_box", "vx300s_right/10_right_gripper_finger") in all_contact_pairs
|
|
touch_table = ("red_box", "table") in all_contact_pairs
|
|
|
|
reward = 0
|
|
if touch_right_gripper:
|
|
reward = 1
|
|
if touch_right_gripper and not touch_table: # lifted
|
|
reward = 2
|
|
if touch_left_gripper: # attempted transfer
|
|
reward = 3
|
|
if touch_left_gripper and not touch_table: # successful transfer
|
|
reward = 4
|
|
return reward
|
|
|
|
|
|
class InsertionTask(BimanualViperXTask):
|
|
def __init__(self, random=None):
|
|
super().__init__(random=random)
|
|
self.max_reward = 4
|
|
|
|
def initialize_episode(self, physics):
|
|
"""Sets the state of the environment at the start of each episode."""
|
|
# TODO Notice: this function does not randomize the env configuration. Instead, set BOX_POSE from outside
|
|
# reset qpos, control and box position
|
|
with physics.reset_context():
|
|
physics.named.data.qpos[:16] = START_ARM_POSE
|
|
np.copyto(physics.data.ctrl, START_ARM_POSE)
|
|
assert BOX_POSE[0] is not None
|
|
physics.named.data.qpos[-7 * 2 :] = BOX_POSE[0] # two objects
|
|
# print(f"{BOX_POSE=}")
|
|
super().initialize_episode(physics)
|
|
|
|
@staticmethod
|
|
def get_env_state(physics):
|
|
env_state = physics.data.qpos.copy()[16:]
|
|
return env_state
|
|
|
|
def get_reward(self, physics):
|
|
# return whether peg touches the pin
|
|
all_contact_pairs = []
|
|
for i_contact in range(physics.data.ncon):
|
|
id_geom_1 = physics.data.contact[i_contact].geom1
|
|
id_geom_2 = physics.data.contact[i_contact].geom2
|
|
name_geom_1 = physics.model.id2name(id_geom_1, "geom")
|
|
name_geom_2 = physics.model.id2name(id_geom_2, "geom")
|
|
contact_pair = (name_geom_1, name_geom_2)
|
|
all_contact_pairs.append(contact_pair)
|
|
|
|
touch_right_gripper = ("red_peg", "vx300s_right/10_right_gripper_finger") in all_contact_pairs
|
|
touch_left_gripper = (
|
|
("socket-1", "vx300s_left/10_left_gripper_finger") in all_contact_pairs
|
|
or ("socket-2", "vx300s_left/10_left_gripper_finger") in all_contact_pairs
|
|
or ("socket-3", "vx300s_left/10_left_gripper_finger") in all_contact_pairs
|
|
or ("socket-4", "vx300s_left/10_left_gripper_finger") in all_contact_pairs
|
|
)
|
|
|
|
peg_touch_table = ("red_peg", "table") in all_contact_pairs
|
|
socket_touch_table = (
|
|
("socket-1", "table") in all_contact_pairs
|
|
or ("socket-2", "table") in all_contact_pairs
|
|
or ("socket-3", "table") in all_contact_pairs
|
|
or ("socket-4", "table") in all_contact_pairs
|
|
)
|
|
peg_touch_socket = (
|
|
("red_peg", "socket-1") in all_contact_pairs
|
|
or ("red_peg", "socket-2") in all_contact_pairs
|
|
or ("red_peg", "socket-3") in all_contact_pairs
|
|
or ("red_peg", "socket-4") in all_contact_pairs
|
|
)
|
|
pin_touched = ("red_peg", "pin") in all_contact_pairs
|
|
|
|
reward = 0
|
|
if touch_left_gripper and touch_right_gripper: # touch both
|
|
reward = 1
|
|
if (
|
|
touch_left_gripper and touch_right_gripper and (not peg_touch_table) and (not socket_touch_table)
|
|
): # grasp both
|
|
reward = 2
|
|
if peg_touch_socket and (not peg_touch_table) and (not socket_touch_table): # peg and socket touching
|
|
reward = 3
|
|
if pin_touched: # successful insertion
|
|
reward = 4
|
|
return reward
|