diff --git a/lerobot/common/robot_devices/cameras/configs.py b/lerobot/common/robot_devices/cameras/configs.py index 013419a9..41eeaf5f 100644 --- a/lerobot/common/robot_devices/cameras/configs.py +++ b/lerobot/common/robot_devices/cameras/configs.py @@ -112,3 +112,37 @@ class IntelRealSenseCameraConfig(CameraConfig): if self.rotation not in [-90, None, 90, 180]: raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})") + + +@CameraConfig.register_subclass("ros2") +@dataclass +class ROS2CameraConfig(CameraConfig): + """ + Example of tested options for any camera with a ROS 2 driver: + + ```python + ROS2CameraConfig(topic="/image_raw", fps=30, width=640, height=480) + ``` + """ + + # TODO(Yadunund): Consider converting inputs to lists to subscribe to multiple + # topics with the same node. + topic: str + fps: int | None = None + width: int | None = None + height: int | None = None + color_mode: str = "rgb" + channels: int | None = None + rotation: int | None = None + mock: bool = False + + def __post_init__(self): + if self.color_mode not in ["rgb", "bgr"]: + raise ValueError( + f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided." + ) + + self.channels = 3 + + if self.rotation not in [-90, None, 90, 180]: + raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})") diff --git a/lerobot/common/robot_devices/cameras/ros2.py b/lerobot/common/robot_devices/cameras/ros2.py new file mode 100644 index 00000000..e7f814f2 --- /dev/null +++ b/lerobot/common/robot_devices/cameras/ros2.py @@ -0,0 +1,277 @@ +""" +This file contains utilities for recording frames from cameras. For more info look at `ROS2Camera` docstring. +""" + +import argparse +import concurrent.futures +import shutil +import threading +import time +from pathlib import Path +from threading import Thread + +# TODO(Yadunund): Implement mock. +import cv2 +import numpy as np +import rclpy +from cv_bridge import CvBridge +from PIL import Image +from sensor_msgs.msg import Image as ImageMsg + +from lerobot.common.robot_devices.cameras.configs import ROS2CameraConfig +from lerobot.common.robot_devices.utils import ( + RobotDeviceAlreadyConnectedError, + RobotDeviceNotConnectedError, +) +from lerobot.common.utils.utils import capture_timestamp_utc + + +def save_image(img_array, camera_name, frame_index, images_dir): + img = Image.fromarray(img_array) + path = images_dir / f"camera_{camera_name:02d}_frame_{frame_index:06d}.png" + path.parent.mkdir(parents=True, exist_ok=True) + img.save(str(path), quality=100) + + +def save_images_from_cameras( + images_dir: Path, + topic: str | None = None, + record_time_s=2, + mock=False, +): + print("Connecting cameras") + cameras = [] + config = ROS2CameraConfig(topic=topic, mock=mock) + camera = ROS2Camera(config) + camera.connect() + print("Here 0") + print(f"ROS2Camera({camera.node.get_name()}") + cameras.append(camera) + + images_dir = Path(images_dir) + if images_dir.exists(): + shutil.rmtree( + images_dir, + ) + images_dir.mkdir(parents=True, exist_ok=True) + + print(f"Saving images to {images_dir}") + frame_index = 0 + start_time = time.perf_counter() + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + while True: + now = time.perf_counter() + + for camera in cameras: + # If we use async_read when fps is None, the loop will go full speed, and we will endup + # saving the same images from the cameras multiple times until the RAM/disk is full. + image = camera.read() + executor.submit( + save_image, + image, + config.topic, + frame_index, + images_dir, + ) + + print(f"Frame: {frame_index:04d}\tLatency (ms): {(time.perf_counter() - now) * 1000:.2f}") + + if time.perf_counter() - start_time > record_time_s: + break + + frame_index += 1 + + print(f"Images have been saved to {images_dir}") + + +class ROS2Camera: + """ + The ROS2Camera class allows to record images from cameras over ROS 2. + + An ROS2Camera instance requires a ROS 2 driver for the camera to be running with images published over a known topic, eg. /camera/image. + It is therefore compatible with any camera that has a ROS 2 driver which may be running on the same host or any other host on the same network. + + To find the topic published by the ROS 2 driver, you may list the various topics in your network: + ```bash + # Make sure to source your ROS 2 installation first. + ros2 topic list + ``` + + Example of usage: + ```python + from lerobot.common.robot_devices.cameras.configs import ROS2CameraConfig + + config = ROS2CameraConfig(node.name=0) + camera = ROS2Camera(config) + camera.connect() + color_image = camera.read() + # when done using the camera, consider disconnecting + camera.disconnect() + ``` + + Example of configuring the camera: + + Note: Apart from the topic name, all other options are only configurable + on the ROS 2 driver side. We could infer some of these settings once we + successfully subscribe to the camera topic but some layers of the lerobot + stack (eg. recording an episode) require these attributes to be present in + the CameraConfig implementation before the first frame is read from the + camera. So for now, the user needs to ensure the settings here match those + on the ROS 2 driver side. + + ```python + config = ROS2CameraConfig(topic="/image_raw", fps=30, width=640, height=480) + # Note: might error out open `camera.connect()` if these settings are not compatible with the camera + ``` + """ + + def __init__(self, config: ROS2CameraConfig): + rclpy.init() + self.config = config + self.fps = config.fps + self.width = config.width + self.height = config.height + self.channels = config.channels + self.color_mode = config.color_mode + self.mock = config.mock + + self.br: CvBridge = CvBridge() + self.image_msg: ImageMsg | None = None + self.is_connected: bool = False + self.sub = None + self.logs = {} + self.node = rclpy.create_node("lerobot_camera_node") + self.stop_event = threading.Event() + self.spin_thread = Thread(target=self.spin_node) + self.spin_thread.start() + + # TODO(aliberts): Do we keep original width/height or do we define them after rotation? + self.rotation = None + if config.rotation == -90: + self.rotation = cv2.ROTATE_90_COUNTERCLOCKWISE + elif config.rotation == 90: + self.rotation = cv2.ROTATE_90_CLOCKWISE + elif config.rotation == 180: + self.rotation = cv2.ROTATE_180 + + def spin_node(self): + while rclpy.ok() and not self.stop_event.is_set(): + rclpy.spin_once(self.node) + + def connect(self): + if self.is_connected: + raise RobotDeviceAlreadyConnectedError(f"ROS2Camera({self.config.topic}) is already connected.") + + self.sub = self.node.create_subscription(ImageMsg, self.config.topic, self.sub_cb, 10) + + while self.image_msg is None: + print(f"Waiting to receive message over {self.config.topic}") + time.sleep(1) + + print(f"Successfully connected to ROS2Camera({self.config.topic})!") + self.is_connected = True + + def sub_cb(self, msg): + self.image_msg = msg + self.width = msg.width + self.height = msg.height + + def read(self, temporary_color: str | None = None) -> np.ndarray: + """Read a frame from the camera returned in the format (height, width, channels) + (e.g. 480 x 640 x 3), contrarily to the pytorch format which is channel first. + + Note: Reading a frame is done every `camera.fps` times per second, and it is blocking. + If you are reading data from other sensors, we advise to use `camera.async_read()` which is non blocking version of `camera.read()`. + """ + if not self.is_connected or self.image_msg is None: + raise RobotDeviceNotConnectedError( + f"ROS2Camera({self.node.get_name()}) is not connected. Try running `camera.connect()` first." + ) + + start_time = time.perf_counter() + + desired_encoding = "passthrough" if temporary_color is None else temporary_color + + image = self.br.imgmsg_to_cv2(self.image_msg, desired_encoding) + + if self.rotation is not None: + image = cv2.rotate(image, self.rotation) + + # log the number of seconds it took to read the image + self.logs["delta_timestamp_s"] = time.perf_counter() - start_time + + # log the utc time at which the image was received + self.logs["timestamp_utc"] = capture_timestamp_utc() + + print(image.shape) + print(image.dtype) + + return image + + def async_read(self): + return self.read() + + def disconnect(self): + if not self.is_connected: + raise RobotDeviceNotConnectedError( + f"ROS2Camera({self.node.get_name()}) is not connected. Try running `camera.connect()` first." + ) + + if self.spin_thread is not None: + self.stop_event.set() + self.spin_thread.join() # wait for the thread to finish + self.spin_thread = None + self.stop_event = None + + self.sub = None + self.is_connected = False + + def __del__(self): + if getattr(self, "is_connected", False): + self.disconnect() + rclpy.shutdown() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Save a few frames using `ROS2Camera` for all cameras connected to the computer, or a selected subset." + ) + parser.add_argument( + "--topic", + type=str, + nargs="*", + default=None, + help="Name of the topic to subscribe to for images.", + ) + parser.add_argument( + "--fps", + type=int, + default=30, + help="Set the number of frames recorded per seconds for all cameras. If not provided, use the default fps of each camera.", + ) + parser.add_argument( + "--width", + type=str, + default=640, + help="Set the width for all cameras. If not provided, use the default width of each camera.", + ) + parser.add_argument( + "--height", + type=str, + default=480, + help="Set the height for all cameras. If not provided, use the default height of each camera.", + ) + parser.add_argument( + "--images-dir", + type=Path, + default="outputs/images_from_opencv_cameras", + help="Set directory to save a few frames for each camera.", + ) + parser.add_argument( + "--record-time-s", + type=float, + default=4.0, + help="Set the number of seconds used to record the frames. By default, 2 seconds.", + ) + args = parser.parse_args() + save_images_from_cameras(**vars(args)) diff --git a/lerobot/common/robot_devices/cameras/utils.py b/lerobot/common/robot_devices/cameras/utils.py index c6431646..48eca2e4 100644 --- a/lerobot/common/robot_devices/cameras/utils.py +++ b/lerobot/common/robot_devices/cameras/utils.py @@ -44,6 +44,10 @@ def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> list[C from lerobot.common.robot_devices.cameras.intelrealsense import IntelRealSenseCamera cameras[key] = IntelRealSenseCamera(cfg) + elif cfg.type == "ros2": + from lerobot.common.robot_devices.cameras.ros2 import ROS2Camera + + cameras[key] = ROS2Camera(cfg) else: raise ValueError(f"The camera type '{cfg.type}' is not valid.")