POC adding a leader/follower hack for Reachy
This commit is contained in:
parent
6366c7f46e
commit
fc4df91883
|
@ -0,0 +1,342 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The Pollen Robotics team and the HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import time
|
||||
from copy import copy
|
||||
from dataclasses import dataclass, field, replace
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
from lerobot.common.robot_devices.cameras.reachy2 import ReachyCamera
|
||||
from lerobot.common.robot_devices.motors.feetech import FeetechMotorsBus
|
||||
from reachy2_sdk import ReachySDK
|
||||
|
||||
REACHY_MOTORS = [
|
||||
"neck_yaw.pos",
|
||||
"neck_pitch.pos",
|
||||
"neck_roll.pos",
|
||||
"r_shoulder_pitch.pos",
|
||||
"r_shoulder_roll.pos",
|
||||
"r_elbow_yaw.pos",
|
||||
"r_elbow_pitch.pos",
|
||||
"r_wrist_roll.pos",
|
||||
"r_wrist_pitch.pos",
|
||||
"r_wrist_yaw.pos",
|
||||
"r_gripper.pos",
|
||||
"l_shoulder_pitch.pos",
|
||||
"l_shoulder_roll.pos",
|
||||
"l_elbow_yaw.pos",
|
||||
"l_elbow_pitch.pos",
|
||||
"l_wrist_roll.pos",
|
||||
"l_wrist_pitch.pos",
|
||||
"l_wrist_yaw.pos",
|
||||
"l_gripper.pos",
|
||||
"mobile_base.vx",
|
||||
"mobile_base.vy",
|
||||
"mobile_base.vtheta",
|
||||
]
|
||||
|
||||
|
||||
@dataclass
|
||||
class ReachyManipulatorRobotConfig:
|
||||
robot_type: str | None = "reachy2"
|
||||
cameras: dict[str, ReachyCamera] = field(default_factory=lambda: {})
|
||||
ip_address: str | None = "172.17.135.207"
|
||||
# ip_address: str | None = "192.168.0.197"
|
||||
# ip_address: str | None = "localhost"
|
||||
|
||||
|
||||
class ReachyManipulatorRobot:
|
||||
"""Wrapper of ReachySDK"""
|
||||
|
||||
def __init__(self, config: ReachyRobotManipulatorConfig | None = None, **kwargs):
|
||||
if config is None:
|
||||
config = ReachyRobotManipulatorConfig()
|
||||
|
||||
# Overwrite config arguments using kwargs
|
||||
self.config = replace(config, **kwargs)
|
||||
|
||||
self.robot_type = self.config.robot_type
|
||||
self.cameras = self.config.cameras
|
||||
self.has_camera = True
|
||||
self.num_cameras = len(self.cameras)
|
||||
self.is_connected = False
|
||||
self.teleop = None
|
||||
self.logs = {}
|
||||
self.reachy = None
|
||||
self.mobile_base_available = False
|
||||
|
||||
self.state_keys = None
|
||||
self.action_keys = None
|
||||
|
||||
self.leader_arm = FeetechMotorsBus(config.leader_arm.port, config.leader_arm.motors)
|
||||
self.leader_calib_dir=config.leader_arm.calibration_dir
|
||||
|
||||
@property
|
||||
def camera_features(self) -> dict:
|
||||
cam_ft = {}
|
||||
for cam_key, cam in self.cameras.items():
|
||||
key = f"observation.images.{cam_key}"
|
||||
cam_ft[key] = {
|
||||
"shape": (cam.height, cam.width, cam.channels),
|
||||
"names": ["height", "width", "channels"],
|
||||
"info": None,
|
||||
}
|
||||
return cam_ft
|
||||
|
||||
@property
|
||||
def motor_features(self) -> dict:
|
||||
motors = REACHY_MOTORS
|
||||
# if self.mobile_base_available:
|
||||
# motors += REACHY_MOBILE_BASE
|
||||
return {
|
||||
"action": {
|
||||
"dtype": "float32",
|
||||
"shape": (len(motors),),
|
||||
"names": motors,
|
||||
},
|
||||
"observation.state": {
|
||||
"dtype": "float32",
|
||||
"shape": (len(motors),),
|
||||
"names": motors,
|
||||
},
|
||||
}
|
||||
|
||||
@property
|
||||
def features(self):
|
||||
return {**self.motor_features, **self.camera_features}
|
||||
|
||||
def connect(self) -> None:
|
||||
self.reachy = ReachySDK(host=self.config.ip_address)
|
||||
print("Connecting to Reachy")
|
||||
self.reachy.connect()
|
||||
self.is_connected = self.reachy.is_connected
|
||||
if not self.is_connected:
|
||||
print(
|
||||
f"Cannot connect to Reachy at address {self.config.ip_address}. Maybe a connection already exists."
|
||||
)
|
||||
raise ConnectionError()
|
||||
# self.reachy.turn_on()
|
||||
print(self.cameras)
|
||||
if self.cameras is not None:
|
||||
for name in self.cameras:
|
||||
print(f"Connecting camera: {name}")
|
||||
self.cameras[name].connect()
|
||||
self.is_connected = self.is_connected and self.cameras[name].is_connected
|
||||
|
||||
if not self.is_connected:
|
||||
print("Could not connect to the cameras, check that all cameras are plugged-in.")
|
||||
raise ConnectionError()
|
||||
|
||||
self.mobile_base_available = self.reachy.mobile_base is not None
|
||||
|
||||
print("Connecting to leader arm")
|
||||
self.leader_arm.connect()
|
||||
|
||||
with open(self.leader_arm.calibration_dir) as f:
|
||||
self.leader_arm.calibration = json.load(f)
|
||||
self.leader_arm.set_calibration(self.leader_arm.calibration)
|
||||
self.leader_arm.apply_calibration()
|
||||
|
||||
def run_calibration(self):
|
||||
pass
|
||||
|
||||
def teleop_step(
|
||||
self, record_data=False
|
||||
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
|
||||
if not record_data:
|
||||
return
|
||||
|
||||
|
||||
#get leader arm
|
||||
leader_pos = {}
|
||||
for name in self.leader_arm:
|
||||
before_lread_t = time.perf_counter()
|
||||
leader_pos[name] = self.leader_arm[name].read("Present_Position")
|
||||
leader_pos[name] = torch.from_numpy(leader_pos[name])
|
||||
self.logs[f"read_leader_{name}_pos_dt_s"] = time.perf_counter() - before_lread_t
|
||||
|
||||
|
||||
#TODO leader arm FK
|
||||
#TODO senf task space poses
|
||||
action = {}
|
||||
action["neck_roll.pos"] = self.reachy.head.neck.roll.goal_position
|
||||
action["neck_pitch.pos"] = self.reachy.head.neck.pitch.goal_position
|
||||
action["neck_yaw.pos"] = self.reachy.head.neck.yaw.goal_position
|
||||
|
||||
action["r_shoulder_pitch.pos"] = self.reachy.r_arm.shoulder.pitch.goal_position
|
||||
action["r_shoulder_roll.pos"] = self.reachy.r_arm.shoulder.roll.goal_position
|
||||
action["r_elbow_yaw.pos"] = self.reachy.r_arm.elbow.yaw.goal_position
|
||||
action["r_elbow_pitch.pos"] = self.reachy.r_arm.elbow.pitch.goal_position
|
||||
action["r_wrist_roll.pos"] = self.reachy.r_arm.wrist.roll.goal_position
|
||||
action["r_wrist_pitch.pos"] = self.reachy.r_arm.wrist.pitch.goal_position
|
||||
action["r_wrist_yaw.pos"] = self.reachy.r_arm.wrist.yaw.goal_position
|
||||
action["r_gripper.pos"] = self.reachy.r_arm.gripper.opening
|
||||
|
||||
action["l_shoulder_pitch.pos"] = self.reachy.l_arm.shoulder.pitch.goal_position
|
||||
action["l_shoulder_roll.pos"] = self.reachy.l_arm.shoulder.roll.goal_position
|
||||
action["l_elbow_yaw.pos"] = self.reachy.l_arm.elbow.yaw.goal_position
|
||||
action["l_elbow_pitch.pos"] = self.reachy.l_arm.elbow.pitch.goal_position
|
||||
action["l_wrist_roll.pos"] = self.reachy.l_arm.wrist.roll.goal_position
|
||||
action["l_wrist_pitch.pos"] = self.reachy.l_arm.wrist.pitch.goal_position
|
||||
action["l_wrist_yaw.pos"] = self.reachy.l_arm.wrist.yaw.goal_position
|
||||
action["l_gripper.pos"] = self.reachy.l_arm.gripper.opening
|
||||
|
||||
if self.mobile_base_available:
|
||||
last_cmd_vel = self.reachy.mobile_base.last_cmd_vel
|
||||
action["mobile_base_x.vel"] = last_cmd_vel["x"]
|
||||
action["mobile_base_y.vel"] = last_cmd_vel["y"]
|
||||
action["mobile_base_theta.vel"] = last_cmd_vel["theta"]
|
||||
else:
|
||||
action["mobile_base_x.vel"] = 0
|
||||
action["mobile_base_y.vel"] = 0
|
||||
action["mobile_base_theta.vel"] = 0
|
||||
|
||||
dtype = self.motor_features["action"]["dtype"]
|
||||
action = np.array(list(action.values()), dtype=dtype)
|
||||
# action = torch.as_tensor(list(action.values()))
|
||||
|
||||
obs_dict = self.capture_observation()
|
||||
action_dict = {}
|
||||
action_dict["action"] = action
|
||||
|
||||
return obs_dict, action_dict
|
||||
|
||||
def get_state(self) -> dict:
|
||||
# neck roll, pitch, yaw
|
||||
# r_shoulder_pitch, r_shoulder_roll, r_elbow_yaw, r_elbow_pitch, r_wrist_roll, r_wrist_pitch, r_wrist_yaw, r_gripper
|
||||
# l_shoulder_pitch, l_shoulder_roll, l_elbow_yaw, l_elbow_pitch, l_wrist_roll, l_wrist_pitch, l_wrist_yaw, l_gripper
|
||||
# mobile base x, y, theta
|
||||
if self.is_connected:
|
||||
if self.mobile_base_available:
|
||||
odometry = self.reachy.mobile_base.odometry
|
||||
else:
|
||||
odometry = {"x": 0, "y": 0, "theta": 0, "vx": 0, "vy": 0, "vtheta": 0}
|
||||
return {
|
||||
"neck_yaw.pos": self.reachy.head.neck.yaw.present_position,
|
||||
"neck_pitch.pos": self.reachy.head.neck.pitch.present_position,
|
||||
"neck_roll.pos": self.reachy.head.neck.roll.present_position,
|
||||
"r_shoulder_pitch.pos": self.reachy.r_arm.shoulder.pitch.present_position,
|
||||
"r_shoulder_roll.pos": self.reachy.r_arm.shoulder.roll.present_position,
|
||||
"r_elbow_yaw.pos": self.reachy.r_arm.elbow.yaw.present_position,
|
||||
"r_elbow_pitch.pos": self.reachy.r_arm.elbow.pitch.present_position,
|
||||
"r_wrist_roll.pos": self.reachy.r_arm.wrist.roll.present_position,
|
||||
"r_wrist_pitch.pos": self.reachy.r_arm.wrist.pitch.present_position,
|
||||
"r_wrist_yaw.pos": self.reachy.r_arm.wrist.yaw.present_position,
|
||||
"r_gripper.pos": self.reachy.r_arm.gripper.present_position,
|
||||
"l_shoulder_pitch.pos": self.reachy.l_arm.shoulder.pitch.present_position,
|
||||
"l_shoulder_roll.pos": self.reachy.l_arm.shoulder.roll.present_position,
|
||||
"l_elbow_yaw.pos": self.reachy.l_arm.elbow.yaw.present_position,
|
||||
"l_elbow_pitch.pos": self.reachy.l_arm.elbow.pitch.present_position,
|
||||
"l_wrist_roll.pos": self.reachy.l_arm.wrist.roll.present_position,
|
||||
"l_wrist_pitch.pos": self.reachy.l_arm.wrist.pitch.present_position,
|
||||
"l_wrist_yaw.pos": self.reachy.l_arm.wrist.yaw.present_position,
|
||||
"l_gripper.pos": self.reachy.l_arm.gripper.present_position,
|
||||
"mobile_base.vx": odometry["vx"],
|
||||
"mobile_base.vy": odometry["vy"],
|
||||
"mobile_base.vtheta": odometry["vtheta"],
|
||||
}
|
||||
else:
|
||||
return {}
|
||||
|
||||
def capture_observation(self) -> dict:
|
||||
if self.is_connected:
|
||||
before_read_t = time.perf_counter()
|
||||
state = self.get_state()
|
||||
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
|
||||
|
||||
if self.state_keys is None:
|
||||
self.state_keys = list(state)
|
||||
|
||||
dtype = self.motor_features["observation.state"]["dtype"]
|
||||
state = np.array(list(state.values()), dtype=dtype)
|
||||
# state = torch.as_tensor(list(state.values()))
|
||||
|
||||
# Capture images from cameras
|
||||
images = {}
|
||||
for name in self.cameras:
|
||||
# before_camread_t = time.perf_counter()
|
||||
images[name] = self.cameras[name].read() # Reachy cameras read() is not blocking?
|
||||
# print(f'name: {name} img: {images[name]}')
|
||||
if images[name] is not None:
|
||||
# images[name] = copy(images[name][0]) # seems like I need to copy?
|
||||
images[name] = torch.from_numpy(copy(images[name][0])) # seems like I need to copy?
|
||||
self.logs[f"read_camera_{name}_dt_s"] = images[name][1] # full timestamp, TODO dt
|
||||
|
||||
# Populate output dictionnaries
|
||||
obs_dict = {}
|
||||
obs_dict["observation.state"] = state
|
||||
for name in self.cameras:
|
||||
obs_dict[f"observation.images.{name}"] = images[name]
|
||||
|
||||
return obs_dict
|
||||
else:
|
||||
return {}
|
||||
|
||||
def send_action(self, action: torch.Tensor) -> torch.Tensor:
|
||||
if not self.is_connected:
|
||||
raise ConnectionError()
|
||||
|
||||
self.reachy.head.neck.yaw.goal_position = float(action[0])
|
||||
self.reachy.head.neck.pitch.goal_position = float(action[1])
|
||||
self.reachy.head.neck.roll.goal_position = float(action[2])
|
||||
|
||||
self.reachy.r_arm.shoulder.pitch.goal_position = float(action[3])
|
||||
self.reachy.r_arm.shoulder.roll.goal_position = float(action[4])
|
||||
self.reachy.r_arm.elbow.yaw.goal_position = float(action[5])
|
||||
self.reachy.r_arm.elbow.pitch.goal_position = float(action[6])
|
||||
self.reachy.r_arm.wrist.roll.goal_position = float(action[7])
|
||||
self.reachy.r_arm.wrist.roll.goal_position = float(action[8])
|
||||
self.reachy.r_arm.wrist.yaw.goal_position = float(action[9])
|
||||
self.reachy.r_arm.gripper.set_opening(float(action[10]))
|
||||
|
||||
self.reachy.l_arm.shoulder.pitch.goal_position = float(action[11])
|
||||
self.reachy.l_arm.shoulder.roll.goal_position = float(action[12])
|
||||
self.reachy.l_arm.elbow.yaw.goal_position = float(action[13])
|
||||
self.reachy.l_arm.elbow.pitch.goal_position = float(action[14])
|
||||
self.reachy.l_arm.wrist.roll.goal_position = float(action[15])
|
||||
self.reachy.l_arm.wrist.roll.goal_position = float(action[16])
|
||||
self.reachy.l_arm.wrist.yaw.goal_position = float(action[17])
|
||||
self.reachy.l_arm.gripper.set_opening(float(action[18]))
|
||||
|
||||
s = time.time()
|
||||
self.reachy.send_goal_positions(check_positions=False)
|
||||
print("send_goal_positions", time.time() - s)
|
||||
|
||||
if self.mobile_base_available:
|
||||
self.reachy.mobile_base.set_goal_speed(action[19], action[20], action[21])
|
||||
self.reachy.mobile_base.send_speed_command()
|
||||
|
||||
# TODO: what shape is the action tensor?
|
||||
# 7 dofs per arm (x2)
|
||||
# 1 dof per gripper (x2)
|
||||
# 3 dofs for the neck
|
||||
# 3 dofs for the mobile base (x, y, theta)
|
||||
# 7+7+1+1+3+3 = 22
|
||||
return action
|
||||
|
||||
def print_logs(self) -> None:
|
||||
pass
|
||||
|
||||
def disconnect(self) -> None:
|
||||
print("Disconnecting")
|
||||
self.is_connected = False
|
||||
print("Turn off")
|
||||
# self.reachy.turn_off_smoothly()
|
||||
# self.reachy.turn_off()
|
||||
print("\t turn off done")
|
||||
self.reachy.disconnect()
|
|
@ -0,0 +1,65 @@
|
|||
# [Reachy2 from Pollen Robotics](https://www.pollen-robotics.com)
|
||||
|
||||
# Requires installing extras packages
|
||||
# With pip: `pip install -e ".[reachy2]"`
|
||||
# With poetry: `poetry install --sync --extras "reachy2"`
|
||||
|
||||
|
||||
_target_: lerobot.common.robot_devices.robots.reachy2.ReachyRobotManipulatorconfig
|
||||
robot_type: reachy2
|
||||
|
||||
|
||||
leader_arm:
|
||||
main:
|
||||
_target_: lerobot.common.robot_devices.motors.feetech.FeetechMotorsBus
|
||||
port: /dev/tty.usbmodem585A0077581
|
||||
calibration_dir: .cache/calibration/so100
|
||||
motors:
|
||||
# name: (index, model)
|
||||
shoulder_pan: [1, "sts3215"]
|
||||
shoulder_lift: [2, "sts3215"]
|
||||
elbow_flex: [3, "sts3215"]
|
||||
wrist_flex: [4, "sts3215"]
|
||||
wrist_roll: [5, "sts3215"]
|
||||
gripper: [6, "sts3215"]
|
||||
|
||||
cameras:
|
||||
head_left:
|
||||
_target_: lerobot.common.robot_devices.cameras.reachy2.ReachyCamera
|
||||
name: teleop
|
||||
host: 172.17.134.85
|
||||
# host: 192.168.0.197
|
||||
# host: localhost
|
||||
port: 50065
|
||||
fps: 30
|
||||
width: 960
|
||||
height: 720
|
||||
image_type: left
|
||||
# head_right:
|
||||
# _target_: lerobot.common.robot_devices.cameras.reachy2.ReachyCamera
|
||||
# name: teleop
|
||||
# host: 172.17.135.207
|
||||
# port: 50065
|
||||
# image_type: right
|
||||
# fps: 30
|
||||
# width: 960
|
||||
# height: 720
|
||||
# torso_rgb:
|
||||
# _target_: lerobot.common.robot_devices.cameras.reachy2.ReachyCamera
|
||||
# name: depth
|
||||
# host: 172.17.135.207
|
||||
# # host: localhost
|
||||
# port: 50065
|
||||
# image_type: rgb
|
||||
# fps: 30
|
||||
# width: 1280
|
||||
# height: 720
|
||||
# torso_depth:
|
||||
# _target_: lerobot.common.robot_devices.cameras.reachy2.ReachyCamera
|
||||
# name: depth
|
||||
# host: 172.17.135.207
|
||||
# port: 50065
|
||||
# image_type: depth
|
||||
# fps: 30
|
||||
# width: 1280
|
||||
# height: 720
|
Loading…
Reference in New Issue