This commit is contained in:
yadunund 2025-04-05 12:53:39 +09:00 committed by GitHub
commit 14aebfb212
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 315 additions and 0 deletions
lerobot/common/robot_devices/cameras

View File

@ -112,3 +112,37 @@ class IntelRealSenseCameraConfig(CameraConfig):
if self.rotation not in [-90, None, 90, 180]:
raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})")
@CameraConfig.register_subclass("ros2")
@dataclass
class ROS2CameraConfig(CameraConfig):
"""
Example of tested options for any camera with a ROS 2 driver:
```python
ROS2CameraConfig(topic="/image_raw", fps=30, width=640, height=480)
```
"""
# TODO(Yadunund): Consider converting inputs to lists to subscribe to multiple
# topics with the same node.
topic: str
fps: int | None = None
width: int | None = None
height: int | None = None
color_mode: str = "rgb"
channels: int | None = None
rotation: int | None = None
mock: bool = False
def __post_init__(self):
if self.color_mode not in ["rgb", "bgr"]:
raise ValueError(
f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided."
)
self.channels = 3
if self.rotation not in [-90, None, 90, 180]:
raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})")

View File

@ -0,0 +1,277 @@
"""
This file contains utilities for recording frames from cameras. For more info look at `ROS2Camera` docstring.
"""
import argparse
import concurrent.futures
import shutil
import threading
import time
from pathlib import Path
from threading import Thread
# TODO(Yadunund): Implement mock.
import cv2
import numpy as np
import rclpy
from cv_bridge import CvBridge
from PIL import Image
from sensor_msgs.msg import Image as ImageMsg
from lerobot.common.robot_devices.cameras.configs import ROS2CameraConfig
from lerobot.common.robot_devices.utils import (
RobotDeviceAlreadyConnectedError,
RobotDeviceNotConnectedError,
)
from lerobot.common.utils.utils import capture_timestamp_utc
def save_image(img_array, camera_name, frame_index, images_dir):
img = Image.fromarray(img_array)
path = images_dir / f"camera_{camera_name:02d}_frame_{frame_index:06d}.png"
path.parent.mkdir(parents=True, exist_ok=True)
img.save(str(path), quality=100)
def save_images_from_cameras(
images_dir: Path,
topic: str | None = None,
record_time_s=2,
mock=False,
):
print("Connecting cameras")
cameras = []
config = ROS2CameraConfig(topic=topic, mock=mock)
camera = ROS2Camera(config)
camera.connect()
print("Here 0")
print(f"ROS2Camera({camera.node.get_name()}")
cameras.append(camera)
images_dir = Path(images_dir)
if images_dir.exists():
shutil.rmtree(
images_dir,
)
images_dir.mkdir(parents=True, exist_ok=True)
print(f"Saving images to {images_dir}")
frame_index = 0
start_time = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
while True:
now = time.perf_counter()
for camera in cameras:
# If we use async_read when fps is None, the loop will go full speed, and we will endup
# saving the same images from the cameras multiple times until the RAM/disk is full.
image = camera.read()
executor.submit(
save_image,
image,
config.topic,
frame_index,
images_dir,
)
print(f"Frame: {frame_index:04d}\tLatency (ms): {(time.perf_counter() - now) * 1000:.2f}")
if time.perf_counter() - start_time > record_time_s:
break
frame_index += 1
print(f"Images have been saved to {images_dir}")
class ROS2Camera:
"""
The ROS2Camera class allows to record images from cameras over ROS 2.
An ROS2Camera instance requires a ROS 2 driver for the camera to be running with images published over a known topic, eg. /camera/image.
It is therefore compatible with any camera that has a ROS 2 driver which may be running on the same host or any other host on the same network.
To find the topic published by the ROS 2 driver, you may list the various topics in your network:
```bash
# Make sure to source your ROS 2 installation first.
ros2 topic list
```
Example of usage:
```python
from lerobot.common.robot_devices.cameras.configs import ROS2CameraConfig
config = ROS2CameraConfig(node.name=0)
camera = ROS2Camera(config)
camera.connect()
color_image = camera.read()
# when done using the camera, consider disconnecting
camera.disconnect()
```
Example of configuring the camera:
Note: Apart from the topic name, all other options are only configurable
on the ROS 2 driver side. We could infer some of these settings once we
successfully subscribe to the camera topic but some layers of the lerobot
stack (eg. recording an episode) require these attributes to be present in
the CameraConfig implementation before the first frame is read from the
camera. So for now, the user needs to ensure the settings here match those
on the ROS 2 driver side.
```python
config = ROS2CameraConfig(topic="/image_raw", fps=30, width=640, height=480)
# Note: might error out open `camera.connect()` if these settings are not compatible with the camera
```
"""
def __init__(self, config: ROS2CameraConfig):
rclpy.init()
self.config = config
self.fps = config.fps
self.width = config.width
self.height = config.height
self.channels = config.channels
self.color_mode = config.color_mode
self.mock = config.mock
self.br: CvBridge = CvBridge()
self.image_msg: ImageMsg | None = None
self.is_connected: bool = False
self.sub = None
self.logs = {}
self.node = rclpy.create_node("lerobot_camera_node")
self.stop_event = threading.Event()
self.spin_thread = Thread(target=self.spin_node)
self.spin_thread.start()
# TODO(aliberts): Do we keep original width/height or do we define them after rotation?
self.rotation = None
if config.rotation == -90:
self.rotation = cv2.ROTATE_90_COUNTERCLOCKWISE
elif config.rotation == 90:
self.rotation = cv2.ROTATE_90_CLOCKWISE
elif config.rotation == 180:
self.rotation = cv2.ROTATE_180
def spin_node(self):
while rclpy.ok() and not self.stop_event.is_set():
rclpy.spin_once(self.node)
def connect(self):
if self.is_connected:
raise RobotDeviceAlreadyConnectedError(f"ROS2Camera({self.config.topic}) is already connected.")
self.sub = self.node.create_subscription(ImageMsg, self.config.topic, self.sub_cb, 10)
while self.image_msg is None:
print(f"Waiting to receive message over {self.config.topic}")
time.sleep(1)
print(f"Successfully connected to ROS2Camera({self.config.topic})!")
self.is_connected = True
def sub_cb(self, msg):
self.image_msg = msg
self.width = msg.width
self.height = msg.height
def read(self, temporary_color: str | None = None) -> np.ndarray:
"""Read a frame from the camera returned in the format (height, width, channels)
(e.g. 480 x 640 x 3), contrarily to the pytorch format which is channel first.
Note: Reading a frame is done every `camera.fps` times per second, and it is blocking.
If you are reading data from other sensors, we advise to use `camera.async_read()` which is non blocking version of `camera.read()`.
"""
if not self.is_connected or self.image_msg is None:
raise RobotDeviceNotConnectedError(
f"ROS2Camera({self.node.get_name()}) is not connected. Try running `camera.connect()` first."
)
start_time = time.perf_counter()
desired_encoding = "passthrough" if temporary_color is None else temporary_color
image = self.br.imgmsg_to_cv2(self.image_msg, desired_encoding)
if self.rotation is not None:
image = cv2.rotate(image, self.rotation)
# log the number of seconds it took to read the image
self.logs["delta_timestamp_s"] = time.perf_counter() - start_time
# log the utc time at which the image was received
self.logs["timestamp_utc"] = capture_timestamp_utc()
print(image.shape)
print(image.dtype)
return image
def async_read(self):
return self.read()
def disconnect(self):
if not self.is_connected:
raise RobotDeviceNotConnectedError(
f"ROS2Camera({self.node.get_name()}) is not connected. Try running `camera.connect()` first."
)
if self.spin_thread is not None:
self.stop_event.set()
self.spin_thread.join() # wait for the thread to finish
self.spin_thread = None
self.stop_event = None
self.sub = None
self.is_connected = False
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()
rclpy.shutdown()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Save a few frames using `ROS2Camera` for all cameras connected to the computer, or a selected subset."
)
parser.add_argument(
"--topic",
type=str,
nargs="*",
default=None,
help="Name of the topic to subscribe to for images.",
)
parser.add_argument(
"--fps",
type=int,
default=30,
help="Set the number of frames recorded per seconds for all cameras. If not provided, use the default fps of each camera.",
)
parser.add_argument(
"--width",
type=str,
default=640,
help="Set the width for all cameras. If not provided, use the default width of each camera.",
)
parser.add_argument(
"--height",
type=str,
default=480,
help="Set the height for all cameras. If not provided, use the default height of each camera.",
)
parser.add_argument(
"--images-dir",
type=Path,
default="outputs/images_from_opencv_cameras",
help="Set directory to save a few frames for each camera.",
)
parser.add_argument(
"--record-time-s",
type=float,
default=4.0,
help="Set the number of seconds used to record the frames. By default, 2 seconds.",
)
args = parser.parse_args()
save_images_from_cameras(**vars(args))

View File

@ -44,6 +44,10 @@ def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> list[C
from lerobot.common.robot_devices.cameras.intelrealsense import IntelRealSenseCamera
cameras[key] = IntelRealSenseCamera(cfg)
elif cfg.type == "ros2":
from lerobot.common.robot_devices.cameras.ros2 import ROS2Camera
cameras[key] = ROS2Camera(cfg)
else:
raise ValueError(f"The camera type '{cfg.type}' is not valid.")