initial support for Reachy2. Add basic camera access and joint positions

This commit is contained in:
Steve Nguyen 2024-11-15 18:02:46 +01:00
parent 963738d983
commit beed4763f9
5 changed files with 1756 additions and 1410 deletions

View File

@ -0,0 +1,197 @@
#!/usr/bin/env python
# Copyright 2024 The Pollen Robotics team and the HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from dataclasses import dataclass, field, replace
import numpy as np
import torch
from lerobot.common.robot_devices.cameras.utils import Camera
from reachy2_sdk import ReachySDK
from reachy2_sdk.media.camera import CameraView
from reachy2_sdk.media.camera_manager import CameraManager
@dataclass
class ReachyCameraConfig:
fps: int | None = None
width: int | None = None
height: int | None = None
color_mode: str = "rgb"
rotation: int | None = None
mock: bool = False
class ReachyCamera:
def __init__(self, host: str, post: int,name: str, image_type: str, config: ReachyCameraConfig):
self.host=host
self.port=port
self.image_type = image_type
self.name = name
self.config = config
self.camera = Camera()
self.cam_manager=None
self.is_connected = False
self.logs = {}
def connect(self):
if not self.is_connected:
self.cam_manager=CameraManager(host=self.host, port=self.port)
self.cam_manager.initialize_cameras()#FIXME: maybe we should not re-initialize
self.is_connected=True
def read(self) -> np.ndarray:
if self.name=='teleop':
if self.image_type=='left':
return self.cam_manager.teleop.get_frame(CameraView.LEFT)
elif self.image_type=='right':
return self.cam_manager.teleop.get_frame(CameraView.RIGHT)
else:
return None
elif self.name=='depth':
if self.image_type=='depth':
return self.cam_manager.depth.get_depth_frame()
elif self.image_type=='rgb':
return self.cam_manager.depth.get_frame()
else:
return None
@dataclass
class ReachyRobotConfig:
robot_type: str | None = "Reachy2"
cameras: dict[str, Camera] = field(default_factory=lambda: {})
ip_address: str | None = "localhost"
class ReachyRobot():
"""Wrapper of ReachySDK """
def __init__(self, config: ReachyRobotConfig | None = None, **kwargs):
if config is None:
config = ReachyRobotConfig()
# Overwrite config arguments using kwargs
self.config = replace(config, **kwargs)
self.robot_type = self.config.robot_type
self.cameras = self.config.cameras
self.is_connected = False #at init Reachy2 is in fact connected...
self.teleop = None
self.logs = {}
self.reachy=ReachySDK(host=config.ip_address)
RobotParams.set_logging_level("WARNING")
RobotParams.set_logging_formatter("brief_console_formatter")
self.state_keys = None
self.action_keys = None
def connect(self) -> None:
self.reachy.is_connected = self.reachy.connect()
if not self.is_connected:
print(f'Cannot connect to Reachy at address {self.config.ip_address}. Maybe a connection already exists.')
raise ConnectionError()
self.reachy.turn_on()
for name in self.cameras:
self.cameras[name].connect()
self.is_connected = self.is_connected and self.cameras[name].is_connected
if not self.is_connected:
print("Could not connect to the cameras, check that all cameras are plugged-in.")
raise ConnectionError()
def teleop_step(
self, record_data=False
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
# TODO
return None
def get_state(self) -> dict:
return {
"neck_yaw.pos": np.radians(self.reachy.head.neck.yaw.present_position),
"neck_pitch.pos": np.radians(self.reachy.head.neck.pitch.present_position),
"neck_roll.pos": np.radians(self.reachy.head.neck.roll.present_position),
"r_shoulder_pitch.pos": np.radians(self.reachy.r_arm.shoulder.pitch.present_position),
"r_shoulder_roll.pos": np.radians(self.reachy.r_arm.shoulder.roll.present_position),
"r_elbow_yaw.pos": np.radians(self.reachy.r_arm.elbow.yaw.present_position),
"r_elbow_pitch.pos": np.radians(self.reachy.r_arm.elbow.pitch.present_position),
"r_wrist_roll.pos": np.radians(self.reachy.r_arm.wrist.roll.present_position),
"r_wrist_pitch.pos": np.radians(self.reachy.r_arm.wrist.pitch.present_position),
"r_wrist_yaw.pos": np.radians(self.reachy.r_arm.wrist.yaw.present_position),
"r_gripper.pos": np.radians(self.reachy.r_arm.gripper.present_position),
"l_shouldel_pitch.pos": np.radians(self.reachy.l_arm.shoulder.pitch.present_position),
"l_shouldel_roll.pos": np.radians(self.reachy.l_arm.shoulder.roll.present_position),
"l_elbow_yaw.pos": np.radians(self.reachy.l_arm.elbow.yaw.present_position),
"l_elbow_pitch.pos": np.radians(self.reachy.l_arm.elbow.pitch.present_position),
"l_wrist_roll.pos": np.radians(self.reachy.l_arm.wrist.roll.present_position),
"l_wrist_pitch.pos": np.radians(self.reachy.l_arm.wrist.pitch.present_position),
"l_wrist_yaw.pos": np.radians(self.reachy.l_arm.wrist.yaw.present_position),
"l_gripper.pos": np.radians(self.reachy.l_arm.gripper.present_position),
#TODO mobile base
}
def capture_observation(self) -> dict:
before_read_t = time.perf_counter()
state = self.get_state()
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
if self.state_keys is None:
self.state_keys = list(state)
state = torch.as_tensor(list(state.values()))
# Capture images from cameras
images = {}
for name in self.cameras:
before_camread_t = time.perf_counter()
images[name] = self.cameras[name].read() #Reachy cameras read() is not blocking?
images[name] = torch.from_numpy(images[name])
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
# Populate output dictionnaries
obs_dict = {}
obs_dict["observation.state"] = state
for name in self.cameras:
obs_dict[f"observation.images.{name}"] = images[name]
return obs_dict
def send_action(self, action: torch.Tensor) -> torch.Tensor:
if not self.is_connected:
raise ConnectionError()
#TODO
return action
def print_logs(self) -> None:
pass
def disconnect(self) -> None:
self.reachy.turn_off_smoothly()
self.is_connected = False
def __del__(self):
self.disconnect()

View File

@ -0,0 +1,36 @@
# [Reachy2 from Pollen Robotics](https://www.pollen-robotics.com)
# Requires installing extras packages
# With pip: `pip install -e ".[reachy2]"`
# With poetry: `poetry install --sync --extras "reachy2"`
_target_: lerobot.common.robot_devices.robots.reachy2.ReachyRobot
robot_type: Reachy2
cameras:
head_left:
_target_: lerobot.common.robot_devices.robots.reachy2.Reachycamera
name: teleop
host: localhost
port: 50065
image_type: left
head_right:
_target_: lerobot.common.robot_devices.robots.reachy2.Reachycamera
name: teleop
host: localhost
port: 50065
image_type: right
torso_rgb:
_target_: lerobot.common.robot_devices.robots.reachy2.Reachycamera
name: depth
host: localhost
port: 50065
image_type: rgb
torso_depth:
_target_: lerobot.common.robot_devices.robots.reachy2.Reachycamera
name: depth
host: localhost
port: 50065
image_type: depth

View File

@ -0,0 +1,32 @@
import argparse
import logging
import time
from pathlib import Path
from typing import List
# from safetensors.torch import load_file, save_file
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
from lerobot.common.datasets.populate_dataset import (create_lerobot_dataset,
delete_current_episode,
init_dataset,
save_current_episode)
from lerobot.common.robot_devices.control_utils import (
control_loop, has_method, init_keyboard_listener, init_policy,
log_control_info, record_episode, reset_environment,
sanity_check_dataset_name, stop_recording, warmup_record)
from lerobot.common.robot_devices.robots.factory import make_robot
from lerobot.common.robot_devices.robots.utils import Robot
from lerobot.common.robot_devices.utils import busy_wait, safe_disconnect
from lerobot.common.utils.utils import (init_hydra_config, init_logging,
log_say, none_or_int)
if __name__ == '__main__':
init_logging()
control_mode = "test"
robot_path = "lerobot/configs/robot/reachy2.yaml"
robot_overrides = None
robot_cfg = init_hydra_config(robot_path, robot_overrides)
robot = make_robot(robot_cfg)

2899
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -71,6 +71,7 @@ pyrealsense2 = {version = ">=2.55.1.6486", markers = "sys_platform != 'darwin'",
pyrender = {git = "https://github.com/mmatl/pyrender.git", markers = "sys_platform == 'linux'", optional = true}
hello-robot-stretch-body = {version = ">=0.7.27", markers = "sys_platform == 'linux'", optional = true}
pyserial = {version = ">=3.5", optional = true}
reachy2-sdk = {git = "https://github.com/pollen-robotics/reachy2-sdk", branch="450-opencv-dependency-version", optional = true}
[tool.poetry.extras]
@ -86,6 +87,7 @@ dynamixel = ["dynamixel-sdk", "pynput"]
feetech = ["feetech-servo-sdk", "pynput"]
intelrealsense = ["pyrealsense2"]
stretch = ["hello-robot-stretch-body", "pyrender", "pyrealsense2", "pynput"]
reachy2 = ["reachy2-sdk"]
[tool.ruff]
line-length = 110