basic wrapper, can read position and cameras

This commit is contained in:
steve 2024-11-18 16:21:05 +01:00
parent beed4763f9
commit 58495d5c86
3 changed files with 22 additions and 59 deletions

View File

@ -21,54 +21,10 @@ import numpy as np
import torch
from lerobot.common.robot_devices.cameras.utils import Camera
from reachy2_sdk import ReachySDK
from reachy2_sdk.media.camera import CameraView
from reachy2_sdk.media.camera_manager import CameraManager
@dataclass
class ReachyCameraConfig:
fps: int | None = None
width: int | None = None
height: int | None = None
color_mode: str = "rgb"
rotation: int | None = None
mock: bool = False
class ReachyCamera:
def __init__(self, host: str, post: int,name: str, image_type: str, config: ReachyCameraConfig):
self.host=host
self.port=port
self.image_type = image_type
self.name = name
self.config = config
self.camera = Camera()
self.cam_manager=None
self.is_connected = False
self.logs = {}
def connect(self):
if not self.is_connected:
self.cam_manager=CameraManager(host=self.host, port=self.port)
self.cam_manager.initialize_cameras()#FIXME: maybe we should not re-initialize
self.is_connected=True
def read(self) -> np.ndarray:
if self.name=='teleop':
if self.image_type=='left':
return self.cam_manager.teleop.get_frame(CameraView.LEFT)
elif self.image_type=='right':
return self.cam_manager.teleop.get_frame(CameraView.RIGHT)
else:
return None
elif self.name=='depth':
if self.image_type=='depth':
return self.cam_manager.depth.get_depth_frame()
elif self.image_type=='rgb':
return self.cam_manager.depth.get_frame()
else:
return None
@dataclass
class ReachyRobotConfig:
robot_type: str | None = "Reachy2"
@ -94,8 +50,6 @@ class ReachyRobot():
self.logs = {}
self.reachy=ReachySDK(host=config.ip_address)
RobotParams.set_logging_level("WARNING")
RobotParams.set_logging_formatter("brief_console_formatter")
self.state_keys = None
self.action_keys = None
@ -107,9 +61,12 @@ class ReachyRobot():
raise ConnectionError()
self.reachy.turn_on()
for name in self.cameras:
self.cameras[name].connect()
self.is_connected = self.is_connected and self.cameras[name].is_connected
print(self.cameras)
if self.cameras is not None:
for name in self.cameras:
print(f'Connecting camera: {name}')
self.cameras[name].connect()
self.is_connected = self.is_connected and self.cameras[name].is_connected
if not self.is_connected:
print("Could not connect to the cameras, check that all cameras are plugged-in.")
@ -163,9 +120,10 @@ class ReachyRobot():
for name in self.cameras:
before_camread_t = time.perf_counter()
images[name] = self.cameras[name].read() #Reachy cameras read() is not blocking?
images[name] = torch.from_numpy(images[name])
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
print(f'name: {name} img: {images[name]}')
if images[name] is not None:
images[name] = torch.from_numpy(images[name][0])
self.logs[f"read_camera_{name}_dt_s"] = images[name][1] #full timestamp, TODO dt
# Populate output dictionnaries
obs_dict = {}
@ -188,10 +146,10 @@ class ReachyRobot():
def disconnect(self) -> None:
self.reachy.turn_off_smoothly()
self.is_connected = False
self.reachy.turn_off_smoothly()
# self.reachy.turn_off()
time.sleep(6)
def __del__(self):
self.disconnect()

View File

@ -11,25 +11,25 @@ robot_type: Reachy2
cameras:
head_left:
_target_: lerobot.common.robot_devices.robots.reachy2.Reachycamera
_target_: lerobot.common.robot_devices.cameras.reachy2.ReachyCamera
name: teleop
host: localhost
port: 50065
image_type: left
head_right:
_target_: lerobot.common.robot_devices.robots.reachy2.Reachycamera
_target_: lerobot.common.robot_devices.cameras.reachy2.ReachyCamera
name: teleop
host: localhost
port: 50065
image_type: right
torso_rgb:
_target_: lerobot.common.robot_devices.robots.reachy2.Reachycamera
_target_: lerobot.common.robot_devices.cameras.reachy2.ReachyCamera
name: depth
host: localhost
port: 50065
image_type: rgb
torso_depth:
_target_: lerobot.common.robot_devices.robots.reachy2.Reachycamera
_target_: lerobot.common.robot_devices.cameras.reachy2.ReachyCamera
name: depth
host: localhost
port: 50065

View File

@ -21,6 +21,8 @@ from lerobot.common.robot_devices.utils import busy_wait, safe_disconnect
from lerobot.common.utils.utils import (init_hydra_config, init_logging,
log_say, none_or_int)
import cv2
if __name__ == '__main__':
init_logging()
@ -30,3 +32,6 @@ if __name__ == '__main__':
robot_cfg = init_hydra_config(robot_path, robot_overrides)
robot = make_robot(robot_cfg)
print(robot.get_state())
print(robot.capture_observation())