From 130c16b43dfdeb23a5d76cb4f6cf02a21d2ab16a Mon Sep 17 00:00:00 2001 From: Yadunund Date: Sat, 15 Mar 2025 21:04:17 -0700 Subject: [PATCH 1/2] Add support for cameras over ROS 2 Signed-off-by: Yadunund --- .../common/robot_devices/cameras/configs.py | 32 ++ lerobot/common/robot_devices/cameras/ros2.py | 289 ++++++++++++++++++ lerobot/common/robot_devices/cameras/utils.py | 4 + .../common/robot_devices/robots/configs.py | 1 + 4 files changed, 326 insertions(+) create mode 100644 lerobot/common/robot_devices/cameras/ros2.py diff --git a/lerobot/common/robot_devices/cameras/configs.py b/lerobot/common/robot_devices/cameras/configs.py index 013419a9..b7c1e37d 100644 --- a/lerobot/common/robot_devices/cameras/configs.py +++ b/lerobot/common/robot_devices/cameras/configs.py @@ -112,3 +112,35 @@ class IntelRealSenseCameraConfig(CameraConfig): if self.rotation not in [-90, None, 90, 180]: raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})") + +@CameraConfig.register_subclass("ros2") +@dataclass +class ROS2CameraConfig(CameraConfig): + """ + Example of tested options for any camera with a ROS 2 driver: + + ```python + ROS2CameraConfig(topic="/image_raw", fps=30, width=640, height=480) + ``` + """ + # TODO(Yadunund): Consider converting inputs to lists to subscribe to multiple + # topics with the same node. + topic: str + fps: int | None = None + width: int | None = None + height: int | None = None + color_mode: str = "rgb" + channels: int | None = None + rotation: int | None = None + mock: bool = False + + def __post_init__(self): + if self.color_mode not in ["rgb", "bgr"]: + raise ValueError( + f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided." + ) + + self.channels = 3 + + if self.rotation not in [-90, None, 90, 180]: + raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})") diff --git a/lerobot/common/robot_devices/cameras/ros2.py b/lerobot/common/robot_devices/cameras/ros2.py new file mode 100644 index 00000000..a6a8d0d9 --- /dev/null +++ b/lerobot/common/robot_devices/cameras/ros2.py @@ -0,0 +1,289 @@ +""" +This file contains utilities for recording frames from cameras. For more info look at `ROS2Camera` docstring. +""" + +import argparse +import concurrent.futures +import shutil +import threading +import time +from pathlib import Path +import time +from threading import Thread + +import numpy as np +from PIL import Image + +from lerobot.common.robot_devices.cameras.configs import ROS2CameraConfig +from lerobot.common.robot_devices.utils import ( + RobotDeviceAlreadyConnectedError, + RobotDeviceNotConnectedError, + busy_wait, +) +from lerobot.common.utils.utils import capture_timestamp_utc + +# TODO(Yadunund): Implement mock. +import cv2 +from cv_bridge import CvBridge +from sensor_msgs.msg import Image as ImageMsg +import rclpy + + +def save_image(img_array, camera_name, frame_index, images_dir): + img = Image.fromarray(img_array) + path = images_dir / f"camera_{camera_name:02d}_frame_{frame_index:06d}.png" + path.parent.mkdir(parents=True, exist_ok=True) + img.save(str(path), quality=100) + + +def save_images_from_cameras( + images_dir: Path, + topic: str | None = None, + record_time_s=2, + mock=False, +): + print("Connecting cameras") + cameras = [] + config = ROS2CameraConfig(topic=topic, mock=mock) + camera = ROS2Camera(config) + camera.connect() + print("Here 0") + print( + f"ROS2Camera({camera.node.get_name()}" + ) + cameras.append(camera) + + images_dir = Path(images_dir) + if images_dir.exists(): + shutil.rmtree( + images_dir, + ) + images_dir.mkdir(parents=True, exist_ok=True) + + print(f"Saving images to {images_dir}") + frame_index = 0 + start_time = time.perf_counter() + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + while True: + now = time.perf_counter() + + for camera in cameras: + # If we use async_read when fps is None, the loop will go full speed, and we will endup + # saving the same images from the cameras multiple times until the RAM/disk is full. + image = camera.read() + executor.submit( + save_image, + image, + config.topic, + frame_index, + images_dir, + ) + + print(f"Frame: {frame_index:04d}\tLatency (ms): {(time.perf_counter() - now) * 1000:.2f}") + + if time.perf_counter() - start_time > record_time_s: + break + + frame_index += 1 + + print(f"Images have been saved to {images_dir}") + + +class ROS2Camera: + """ + The ROS2Camera class allows to record images from cameras over ROS 2. + + An ROS2Camera instance requires a ROS 2 driver for the camera to be running with images published over a known topic, eg. /camera/image. + It is therefore compatible with any camera that has a ROS 2 driver which may be running on the same host or any other host on the same network. + + To find the topic published by the ROS 2 driver, you may list the various topics in your network: + ```bash + # Make sure to source your ROS 2 installation first. + ros2 topic list + ``` + + Example of usage: + ```python + from lerobot.common.robot_devices.cameras.configs import ROS2CameraConfig + + config = ROS2CameraConfig(node.name=0) + camera = ROS2Camera(config) + camera.connect() + color_image = camera.read() + # when done using the camera, consider disconnecting + camera.disconnect() + ``` + + Example of configuring the camera: + + Note: Apart from the topic name, all other options are only configurable + on the ROS 2 driver side. We could infer some of these settings once we + successfully subscribe to the camera topic but some layers of the lerobot + stack (eg. recording an episode) require these attributes to be present in + the CameraConfig implementation before the first frame is read from the + camera. So for now, the user needs to ensure the settings here match those + on the ROS 2 driver side. + + ```python + config = ROS2CameraConfig(topic="/image_raw", fps=30, width=640, height=480) + # Note: might error out open `camera.connect()` if these settings are not compatible with the camera + ``` + """ + + def __init__(self, config: ROS2CameraConfig): + rclpy.init() + self.config = config + self.fps = config.fps + self.width = config.width + self.height = config.height + self.channels = config.channels + self.color_mode = config.color_mode + self.mock = config.mock + + self.br : CvBridge = CvBridge() + self.image_msg : ImageMsg | None = None + self.is_connected : bool = False + self.sub = None + self.logs = {} + self.node = rclpy.create_node("lerobot_camera_node") + self.stop_event = threading.Event() + self.spin_thread = Thread(target=self.spin_node) + self.spin_thread.start() + + # TODO(aliberts): Do we keep original width/height or do we define them after rotation? + self.rotation = None + if config.rotation == -90: + self.rotation = cv2.ROTATE_90_COUNTERCLOCKWISE + elif config.rotation == 90: + self.rotation = cv2.ROTATE_90_CLOCKWISE + elif config.rotation == 180: + self.rotation = cv2.ROTATE_180 + + def spin_node(self): + while rclpy.ok() and not self.stop_event.is_set(): + rclpy.spin_once(self.node) + + def connect(self): + if self.is_connected: + raise RobotDeviceAlreadyConnectedError(f"ROS2Camera({self.config.topic}) is already connected.") + + self.sub = self.node.create_subscription( + ImageMsg, + self.config.topic, + self.sub_cb, + 10 + ) + + while self.image_msg is None: + print(f"Waiting to receive message over {self.config.topic}") + time.sleep(1) + + print(f"Successfully connected to ROS2Camera({self.config.topic})!") + self.is_connected = True + + def sub_cb(self, msg): + self.image_msg = msg + self.width = msg.width + self.height = msg.height + + def read(self, temporary_color: str | None = None) -> np.ndarray: + """Read a frame from the camera returned in the format (height, width, channels) + (e.g. 480 x 640 x 3), contrarily to the pytorch format which is channel first. + + Note: Reading a frame is done every `camera.fps` times per second, and it is blocking. + If you are reading data from other sensors, we advise to use `camera.async_read()` which is non blocking version of `camera.read()`. + """ + if not self.is_connected or self.image_msg is None: + raise RobotDeviceNotConnectedError( + f"ROS2Camera({self.node.get_name()}) is not connected. Try running `camera.connect()` first." + ) + + + start_time = time.perf_counter() + + desired_encoding = "passthrough" if temporary_color is None else temporary_color + + image = self.br.imgmsg_to_cv2(self.image_msg, desired_encoding) + + if self.rotation is not None: + image = cv2.rotate(image, self.rotation) + + # log the number of seconds it took to read the image + self.logs["delta_timestamp_s"] = time.perf_counter() - start_time + + # log the utc time at which the image was received + self.logs["timestamp_utc"] = capture_timestamp_utc() + + print(image.shape) + print(image.dtype) + + return image + + + def async_read(self): + return self.read() + + def disconnect(self): + if not self.is_connected: + raise RobotDeviceNotConnectedError( + f"ROS2Camera({self.node.get_name()}) is not connected. Try running `camera.connect()` first." + ) + + if self.spin_thread is not None: + self.stop_event.set() + self.spin_thread.join() # wait for the thread to finish + self.spin_thread = None + self.stop_event = None + + self.sub = None + self.is_connected = False + + def __del__(self): + if getattr(self, "is_connected", False): + self.disconnect() + rclpy.shutdown() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Save a few frames using `ROS2Camera` for all cameras connected to the computer, or a selected subset." + ) + parser.add_argument( + "--topic", + type=str, + nargs="*", + default=None, + help="Name of the topic to subscribe to for images.", + ) + parser.add_argument( + "--fps", + type=int, + default=30, + help="Set the number of frames recorded per seconds for all cameras. If not provided, use the default fps of each camera.", + ) + parser.add_argument( + "--width", + type=str, + default=640, + help="Set the width for all cameras. If not provided, use the default width of each camera.", + ) + parser.add_argument( + "--height", + type=str, + default=480, + help="Set the height for all cameras. If not provided, use the default height of each camera.", + ) + parser.add_argument( + "--images-dir", + type=Path, + default="outputs/images_from_opencv_cameras", + help="Set directory to save a few frames for each camera.", + ) + parser.add_argument( + "--record-time-s", + type=float, + default=4.0, + help="Set the number of seconds used to record the frames. By default, 2 seconds.", + ) + args = parser.parse_args() + save_images_from_cameras(**vars(args)) diff --git a/lerobot/common/robot_devices/cameras/utils.py b/lerobot/common/robot_devices/cameras/utils.py index c6431646..48eca2e4 100644 --- a/lerobot/common/robot_devices/cameras/utils.py +++ b/lerobot/common/robot_devices/cameras/utils.py @@ -44,6 +44,10 @@ def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> list[C from lerobot.common.robot_devices.cameras.intelrealsense import IntelRealSenseCamera cameras[key] = IntelRealSenseCamera(cfg) + elif cfg.type == "ros2": + from lerobot.common.robot_devices.cameras.ros2 import ROS2Camera + + cameras[key] = ROS2Camera(cfg) else: raise ValueError(f"The camera type '{cfg.type}' is not valid.") diff --git a/lerobot/common/robot_devices/robots/configs.py b/lerobot/common/robot_devices/robots/configs.py index e940b442..a970cb5e 100644 --- a/lerobot/common/robot_devices/robots/configs.py +++ b/lerobot/common/robot_devices/robots/configs.py @@ -22,6 +22,7 @@ from lerobot.common.robot_devices.cameras.configs import ( CameraConfig, IntelRealSenseCameraConfig, OpenCVCameraConfig, + ROS2CameraConfig, ) from lerobot.common.robot_devices.motors.configs import ( DynamixelMotorsBusConfig, From d978053ebf5f6db1f8cf315d5b7ed24c94bff66f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 16 Mar 2025 06:54:44 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../common/robot_devices/cameras/configs.py | 2 ++ lerobot/common/robot_devices/cameras/ros2.py | 34 ++++++------------- .../common/robot_devices/robots/configs.py | 1 - 3 files changed, 13 insertions(+), 24 deletions(-) diff --git a/lerobot/common/robot_devices/cameras/configs.py b/lerobot/common/robot_devices/cameras/configs.py index b7c1e37d..41eeaf5f 100644 --- a/lerobot/common/robot_devices/cameras/configs.py +++ b/lerobot/common/robot_devices/cameras/configs.py @@ -113,6 +113,7 @@ class IntelRealSenseCameraConfig(CameraConfig): if self.rotation not in [-90, None, 90, 180]: raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})") + @CameraConfig.register_subclass("ros2") @dataclass class ROS2CameraConfig(CameraConfig): @@ -123,6 +124,7 @@ class ROS2CameraConfig(CameraConfig): ROS2CameraConfig(topic="/image_raw", fps=30, width=640, height=480) ``` """ + # TODO(Yadunund): Consider converting inputs to lists to subscribe to multiple # topics with the same node. topic: str diff --git a/lerobot/common/robot_devices/cameras/ros2.py b/lerobot/common/robot_devices/cameras/ros2.py index a6a8d0d9..e7f814f2 100644 --- a/lerobot/common/robot_devices/cameras/ros2.py +++ b/lerobot/common/robot_devices/cameras/ros2.py @@ -8,26 +8,23 @@ import shutil import threading import time from pathlib import Path -import time from threading import Thread +# TODO(Yadunund): Implement mock. +import cv2 import numpy as np +import rclpy +from cv_bridge import CvBridge from PIL import Image +from sensor_msgs.msg import Image as ImageMsg from lerobot.common.robot_devices.cameras.configs import ROS2CameraConfig from lerobot.common.robot_devices.utils import ( RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError, - busy_wait, ) from lerobot.common.utils.utils import capture_timestamp_utc -# TODO(Yadunund): Implement mock. -import cv2 -from cv_bridge import CvBridge -from sensor_msgs.msg import Image as ImageMsg -import rclpy - def save_image(img_array, camera_name, frame_index, images_dir): img = Image.fromarray(img_array) @@ -48,9 +45,7 @@ def save_images_from_cameras( camera = ROS2Camera(config) camera.connect() print("Here 0") - print( - f"ROS2Camera({camera.node.get_name()}" - ) + print(f"ROS2Camera({camera.node.get_name()}") cameras.append(camera) images_dir = Path(images_dir) @@ -140,9 +135,9 @@ class ROS2Camera: self.color_mode = config.color_mode self.mock = config.mock - self.br : CvBridge = CvBridge() - self.image_msg : ImageMsg | None = None - self.is_connected : bool = False + self.br: CvBridge = CvBridge() + self.image_msg: ImageMsg | None = None + self.is_connected: bool = False self.sub = None self.logs = {} self.node = rclpy.create_node("lerobot_camera_node") @@ -167,12 +162,7 @@ class ROS2Camera: if self.is_connected: raise RobotDeviceAlreadyConnectedError(f"ROS2Camera({self.config.topic}) is already connected.") - self.sub = self.node.create_subscription( - ImageMsg, - self.config.topic, - self.sub_cb, - 10 - ) + self.sub = self.node.create_subscription(ImageMsg, self.config.topic, self.sub_cb, 10) while self.image_msg is None: print(f"Waiting to receive message over {self.config.topic}") @@ -198,7 +188,6 @@ class ROS2Camera: f"ROS2Camera({self.node.get_name()}) is not connected. Try running `camera.connect()` first." ) - start_time = time.perf_counter() desired_encoding = "passthrough" if temporary_color is None else temporary_color @@ -219,9 +208,8 @@ class ROS2Camera: return image - def async_read(self): - return self.read() + return self.read() def disconnect(self): if not self.is_connected: diff --git a/lerobot/common/robot_devices/robots/configs.py b/lerobot/common/robot_devices/robots/configs.py index a970cb5e..e940b442 100644 --- a/lerobot/common/robot_devices/robots/configs.py +++ b/lerobot/common/robot_devices/robots/configs.py @@ -22,7 +22,6 @@ from lerobot.common.robot_devices.cameras.configs import ( CameraConfig, IntelRealSenseCameraConfig, OpenCVCameraConfig, - ROS2CameraConfig, ) from lerobot.common.robot_devices.motors.configs import ( DynamixelMotorsBusConfig,