adding missing functions
This commit is contained in:
parent
fa7eb860ea
commit
52b7bde587
|
@ -24,22 +24,20 @@ from reachy2_sdk import ReachySDK
|
|||
from copy import copy
|
||||
|
||||
|
||||
|
||||
@dataclass
|
||||
class ReachyRobotConfig:
|
||||
robot_type: str | None = "Reachy2"
|
||||
cameras: dict[str, Camera] = field(default_factory=lambda: {})
|
||||
ip_address: str | None = "localhost"
|
||||
|
||||
class ReachyRobot():
|
||||
"""Wrapper of ReachySDK """
|
||||
|
||||
class ReachyRobot:
|
||||
"""Wrapper of ReachySDK"""
|
||||
|
||||
def __init__(self, config: ReachyRobotConfig | None = None, **kwargs):
|
||||
if config is None:
|
||||
config = ReachyRobotConfig()
|
||||
|
||||
|
||||
|
||||
# Overwrite config arguments using kwargs
|
||||
self.config = replace(config, **kwargs)
|
||||
|
||||
|
@ -48,8 +46,9 @@ class ReachyRobot():
|
|||
self.is_connected = False
|
||||
self.teleop = None
|
||||
self.logs = {}
|
||||
self.reachy=ReachySDK(host=config.ip_address)
|
||||
self.is_connected=True #at init Reachy2 is in fact connected...
|
||||
self.reachy: ReachySDK = ReachySDK(host=config.ip_address)
|
||||
self.is_connected = True # at init Reachy2 is in fact connected...
|
||||
self.mobile_base_available = self.reachy.mobile_base is not None
|
||||
|
||||
self.state_keys = None
|
||||
self.action_keys = None
|
||||
|
@ -58,60 +57,141 @@ class ReachyRobot():
|
|||
print("Connecting to Reachy")
|
||||
self.reachy.is_connected = self.reachy.connect()
|
||||
if not self.is_connected:
|
||||
print(f'Cannot connect to Reachy at address {self.config.ip_address}. Maybe a connection already exists.')
|
||||
print(
|
||||
f"Cannot connect to Reachy at address {self.config.ip_address}. Maybe a connection already exists."
|
||||
)
|
||||
raise ConnectionError()
|
||||
|
||||
self.reachy.turn_on()
|
||||
print(self.cameras)
|
||||
if self.cameras is not None:
|
||||
for name in self.cameras:
|
||||
print(f'Connecting camera: {name}')
|
||||
print(f"Connecting camera: {name}")
|
||||
self.cameras[name].connect()
|
||||
self.is_connected = self.is_connected and self.cameras[name].is_connected
|
||||
self.is_connected = (
|
||||
self.is_connected and self.cameras[name].is_connected
|
||||
)
|
||||
|
||||
if not self.is_connected:
|
||||
print("Could not connect to the cameras, check that all cameras are plugged-in.")
|
||||
print(
|
||||
"Could not connect to the cameras, check that all cameras are plugged-in."
|
||||
)
|
||||
raise ConnectionError()
|
||||
|
||||
def run_calibration(self):
|
||||
pass
|
||||
|
||||
def teleop_step(
|
||||
self, record_data=False
|
||||
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
|
||||
# TODO
|
||||
if not record_data:
|
||||
return
|
||||
action = {}
|
||||
action["neck_roll.pos"] = self.reachy.head.neck.roll.goal_position
|
||||
action["neck_pitch.pos"] = self.reachy.head.neck.pitch.goal_position
|
||||
action["neck_yaw.pos"] = self.reachy.head.neck.yaw.goal_position
|
||||
|
||||
return None
|
||||
action["r_shoulder_pitch.pos"] = self.reachy.r_arm.shoulder.pitch.goal_position
|
||||
action["r_shoulder_roll.pos"] = self.reachy.r_arm.shoulder.roll.goal_position
|
||||
action["r_elbow_yaw.pos"] = self.reachy.r_arm.elbow.yaw.goal_position
|
||||
action["r_elbow_pitch.pos"] = self.reachy.r_arm.elbow.pitch.goal_position
|
||||
action["r_wrist_roll.pos"] = self.reachy.r_arm.wrist.roll.goal_position
|
||||
action["r_wrist_pitch.pos"] = self.reachy.r_arm.wrist.pitch.goal_position
|
||||
action["r_wrist_yaw.pos"] = self.reachy.r_arm.wrist.yaw.goal_position
|
||||
action["r_gripper.pos"] = self.reachy.r_arm.gripper.opening
|
||||
|
||||
action["l_shoulder_pitch.pos"] = self.reachy.l_arm.shoulder.pitch.goal_position
|
||||
action["l_shoulder_roll.pos"] = self.reachy.l_arm.shoulder.roll.goal_position
|
||||
action["l_elbow_yaw.pos"] = self.reachy.l_arm.elbow.yaw.goal_position
|
||||
action["l_elbow_pitch.pos"] = self.reachy.l_arm.elbow.pitch.goal_position
|
||||
action["l_wrist_roll.pos"] = self.reachy.l_arm.wrist.roll.goal_position
|
||||
action["l_wrist_pitch.pos"] = self.reachy.l_arm.wrist.pitch.goal_position
|
||||
action["l_wrist_yaw.pos"] = self.reachy.l_arm.wrist.yaw.goal_position
|
||||
action["l_gripper.pos"] = self.reachy.l_arm.gripper.opening
|
||||
|
||||
if self.mobile_base_available:
|
||||
last_cmd_vel = self.reachy.mobile_base.last_cmd_vel
|
||||
action["mobile_base_x.vel"] = last_cmd_vel["x"]
|
||||
action["mobile_base_y.vel"] = last_cmd_vel["y"]
|
||||
action["mobile_base_theta.vel"] = last_cmd_vel["theta"]
|
||||
|
||||
obs_dict = self.capture_observation()
|
||||
action_dict = {}
|
||||
action_dict["action"] = action
|
||||
|
||||
return obs_dict, action_dict
|
||||
|
||||
def get_state(self) -> dict:
|
||||
|
||||
# neck roll, pitch, yaw
|
||||
# r_shoulder_pitch, r_shoulder_roll, r_elbow_yaw, r_elbow_pitch, r_wrist_roll, r_wrist_pitch, r_wrist_yaw, r_gripper
|
||||
# l_shoulder_pitch, l_shoulder_roll, l_elbow_yaw, l_elbow_pitch, l_wrist_roll, l_wrist_pitch, l_wrist_yaw, l_gripper
|
||||
# mobile base x, y, theta
|
||||
if self.is_connected:
|
||||
if self.mobile_base_available:
|
||||
odometry = self.reachy.mobile_base.odometry
|
||||
else:
|
||||
odometry = {"x": 0, "y": 0, "theta": 0, "vx": 0, "vy": 0, "vtheta": 0}
|
||||
return {
|
||||
"neck_yaw.pos": np.radians(self.reachy.head.neck.yaw.present_position),
|
||||
"neck_pitch.pos": np.radians(self.reachy.head.neck.pitch.present_position),
|
||||
"neck_roll.pos": np.radians(self.reachy.head.neck.roll.present_position),
|
||||
"r_shoulder_pitch.pos": np.radians(self.reachy.r_arm.shoulder.pitch.present_position),
|
||||
"r_shoulder_roll.pos": np.radians(self.reachy.r_arm.shoulder.roll.present_position),
|
||||
"r_elbow_yaw.pos": np.radians(self.reachy.r_arm.elbow.yaw.present_position),
|
||||
"r_elbow_pitch.pos": np.radians(self.reachy.r_arm.elbow.pitch.present_position),
|
||||
"r_wrist_roll.pos": np.radians(self.reachy.r_arm.wrist.roll.present_position),
|
||||
"r_wrist_pitch.pos": np.radians(self.reachy.r_arm.wrist.pitch.present_position),
|
||||
"r_wrist_yaw.pos": np.radians(self.reachy.r_arm.wrist.yaw.present_position),
|
||||
"neck_pitch.pos": np.radians(
|
||||
self.reachy.head.neck.pitch.present_position
|
||||
),
|
||||
"neck_roll.pos": np.radians(
|
||||
self.reachy.head.neck.roll.present_position
|
||||
),
|
||||
"r_shoulder_pitch.pos": np.radians(
|
||||
self.reachy.r_arm.shoulder.pitch.present_position
|
||||
),
|
||||
"r_shoulder_roll.pos": np.radians(
|
||||
self.reachy.r_arm.shoulder.roll.present_position
|
||||
),
|
||||
"r_elbow_yaw.pos": np.radians(
|
||||
self.reachy.r_arm.elbow.yaw.present_position
|
||||
),
|
||||
"r_elbow_pitch.pos": np.radians(
|
||||
self.reachy.r_arm.elbow.pitch.present_position
|
||||
),
|
||||
"r_wrist_roll.pos": np.radians(
|
||||
self.reachy.r_arm.wrist.roll.present_position
|
||||
),
|
||||
"r_wrist_pitch.pos": np.radians(
|
||||
self.reachy.r_arm.wrist.pitch.present_position
|
||||
),
|
||||
"r_wrist_yaw.pos": np.radians(
|
||||
self.reachy.r_arm.wrist.yaw.present_position
|
||||
),
|
||||
"r_gripper.pos": np.radians(self.reachy.r_arm.gripper.present_position),
|
||||
"l_shouldel_pitch.pos": np.radians(self.reachy.l_arm.shoulder.pitch.present_position),
|
||||
"l_shouldel_roll.pos": np.radians(self.reachy.l_arm.shoulder.roll.present_position),
|
||||
"l_elbow_yaw.pos": np.radians(self.reachy.l_arm.elbow.yaw.present_position),
|
||||
"l_elbow_pitch.pos": np.radians(self.reachy.l_arm.elbow.pitch.present_position),
|
||||
"l_wrist_roll.pos": np.radians(self.reachy.l_arm.wrist.roll.present_position),
|
||||
"l_wrist_pitch.pos": np.radians(self.reachy.l_arm.wrist.pitch.present_position),
|
||||
"l_wrist_yaw.pos": np.radians(self.reachy.l_arm.wrist.yaw.present_position),
|
||||
"l_shoulder_pitch.pos": np.radians(
|
||||
self.reachy.l_arm.shoulder.pitch.present_position
|
||||
),
|
||||
"l_shoulder_roll.pos": np.radians(
|
||||
self.reachy.l_arm.shoulder.roll.present_position
|
||||
),
|
||||
"l_elbow_yaw.pos": np.radians(
|
||||
self.reachy.l_arm.elbow.yaw.present_position
|
||||
),
|
||||
"l_elbow_pitch.pos": np.radians(
|
||||
self.reachy.l_arm.elbow.pitch.present_position
|
||||
),
|
||||
"l_wrist_roll.pos": np.radians(
|
||||
self.reachy.l_arm.wrist.roll.present_position
|
||||
),
|
||||
"l_wrist_pitch.pos": np.radians(
|
||||
self.reachy.l_arm.wrist.pitch.present_position
|
||||
),
|
||||
"l_wrist_yaw.pos": np.radians(
|
||||
self.reachy.l_arm.wrist.yaw.present_position
|
||||
),
|
||||
"l_gripper.pos": np.radians(self.reachy.l_arm.gripper.present_position),
|
||||
#TODO mobile base
|
||||
"mobile_base.vx": odometry["vx"],
|
||||
"mobile_base.vy": odometry["vy"],
|
||||
"mobile_base.vtheta": odometry["vtheta"],
|
||||
}
|
||||
else:
|
||||
return {}
|
||||
|
||||
def capture_observation(self) -> dict:
|
||||
|
||||
if self.is_connected:
|
||||
|
||||
before_read_t = time.perf_counter()
|
||||
state = self.get_state()
|
||||
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
|
||||
|
@ -125,11 +205,17 @@ class ReachyRobot():
|
|||
images = {}
|
||||
for name in self.cameras:
|
||||
before_camread_t = time.perf_counter()
|
||||
images[name] = self.cameras[name].read() #Reachy cameras read() is not blocking?
|
||||
images[name] = self.cameras[
|
||||
name
|
||||
].read() # Reachy cameras read() is not blocking?
|
||||
# print(f'name: {name} img: {images[name]}')
|
||||
if images[name] is not None:
|
||||
images[name] = torch.from_numpy(copy(images[name][0])) #seems like I need to copy?
|
||||
self.logs[f"read_camera_{name}_dt_s"] = images[name][1] #full timestamp, TODO dt
|
||||
images[name] = torch.from_numpy(
|
||||
copy(images[name][0])
|
||||
) # seems like I need to copy?
|
||||
self.logs[f"read_camera_{name}_dt_s"] = images[name][
|
||||
1
|
||||
] # full timestamp, TODO dt
|
||||
|
||||
# Populate output dictionnaries
|
||||
obs_dict = {}
|
||||
|
@ -140,18 +226,50 @@ class ReachyRobot():
|
|||
return obs_dict
|
||||
else:
|
||||
return {}
|
||||
def send_action(self, action: torch.Tensor) -> torch.Tensor:
|
||||
|
||||
def send_action(self, action: torch.Tensor) -> torch.Tensor:
|
||||
if not self.is_connected:
|
||||
raise ConnectionError()
|
||||
|
||||
#TODO: what shape is the action tensor?
|
||||
self.reachy.head.neck.yaw.goal_position = action[0]
|
||||
self.reachy.head.neck.pitch.goal_position = action[1]
|
||||
self.reachy.head.neck.roll.goal_position = action[2]
|
||||
|
||||
self.reachy.r_arm.shoulder.pitch.goal_position = action[3]
|
||||
self.reachy.r_arm.shoulder.roll.goal_position = action[4]
|
||||
self.reachy.r_arm.elbow.yaw.goal_position = action[5]
|
||||
self.reachy.r_arm.elbow.pitch.goal_position = action[6]
|
||||
self.reachy.r_arm.wrist.roll.goal_position = action[7]
|
||||
self.reachy.r_arm.pitch.roll.goal_position = action[8]
|
||||
self.reachy.r_arm.wrist.yaw.goal_position = action[9]
|
||||
self.reachy.r_arm.gripper.set_opening(action[10])
|
||||
|
||||
self.reachy.l_arm.shoulder.pitch.goal_position = action[11]
|
||||
self.reachy.l_arm.shoulder.roll.goal_position = action[12]
|
||||
self.reachy.l_arm.elbow.yaw.goal_position = action[13]
|
||||
self.reachy.l_arm.elbow.pitch.goal_position = action[14]
|
||||
self.reachy.l_arm.wrist.roll.goal_position = action[15]
|
||||
self.reachy.l_arm.pitch.roll.goal_position = action[16]
|
||||
self.reachy.l_arm.wrist.yaw.goal_position = action[17]
|
||||
self.reachy.l_arm.gripper.set_opening(action[18])
|
||||
|
||||
self.reachy.send_goal_positions()
|
||||
|
||||
if self.mobile_base_available:
|
||||
self.reachy.mobile_base.set_goal_speed(action[19], action[20], action[21])
|
||||
self.reachy.mobile_base.send_speed_command()
|
||||
|
||||
# TODO: what shape is the action tensor?
|
||||
# 7 dofs per arm (x2)
|
||||
# 1 dof per gripper (x2)
|
||||
# 3 dofs for the neck
|
||||
# 3 dofs for the mobile base (x, y, theta)
|
||||
# 7+7+1+1+3+3 = 22
|
||||
return action
|
||||
|
||||
def print_logs(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
def disconnect(self) -> None:
|
||||
print("Disconnecting")
|
||||
self.is_connected = False
|
||||
|
|
Loading…
Reference in New Issue