Move & organize cameras, add base class

This commit is contained in:
Simon Alibert 2025-03-03 18:01:48 +01:00
parent 48469ec674
commit 212c6095a2
10 changed files with 187 additions and 161 deletions

View File

@ -0,0 +1,4 @@
from .camera import Camera
from .configs import CameraConfig
__all__ = ["Camera", "CameraConfig"]

View File

@ -0,0 +1,25 @@
import abc
import numpy as np
class Camera(abc.ABC):
@abc.abstractmethod
def connect(self):
pass
@abc.abstractmethod
def read(self, temporary_color: str | None = None) -> np.ndarray:
pass
@abc.abstractmethod
def async_read(self) -> np.ndarray:
pass
@abc.abstractmethod
def disconnect(self):
pass
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()

View File

@ -9,92 +9,3 @@ class CameraConfig(draccus.ChoiceRegistry, abc.ABC):
@property
def type(self) -> str:
return self.get_choice_name(self.__class__)
@CameraConfig.register_subclass("opencv")
@dataclass
class OpenCVCameraConfig(CameraConfig):
"""
Example of tested options for Intel Real Sense D405:
```python
OpenCVCameraConfig(0, 30, 640, 480)
OpenCVCameraConfig(0, 60, 640, 480)
OpenCVCameraConfig(0, 90, 640, 480)
OpenCVCameraConfig(0, 30, 1280, 720)
```
"""
camera_index: int
fps: int | None = None
width: int | None = None
height: int | None = None
color_mode: str = "rgb"
channels: int | None = None
rotation: int | None = None
mock: bool = False
def __post_init__(self):
if self.color_mode not in ["rgb", "bgr"]:
raise ValueError(
f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided."
)
self.channels = 3
if self.rotation not in [-90, None, 90, 180]:
raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})")
@CameraConfig.register_subclass("intelrealsense")
@dataclass
class IntelRealSenseCameraConfig(CameraConfig):
"""
Example of tested options for Intel Real Sense D405:
```python
IntelRealSenseCameraConfig(128422271347, 30, 640, 480)
IntelRealSenseCameraConfig(128422271347, 60, 640, 480)
IntelRealSenseCameraConfig(128422271347, 90, 640, 480)
IntelRealSenseCameraConfig(128422271347, 30, 1280, 720)
IntelRealSenseCameraConfig(128422271347, 30, 640, 480, use_depth=True)
IntelRealSenseCameraConfig(128422271347, 30, 640, 480, rotation=90)
```
"""
name: str | None = None
serial_number: int | None = None
fps: int | None = None
width: int | None = None
height: int | None = None
color_mode: str = "rgb"
channels: int | None = None
use_depth: bool = False
force_hardware_reset: bool = True
rotation: int | None = None
mock: bool = False
def __post_init__(self):
# bool is stronger than is None, since it works with empty strings
if bool(self.name) and bool(self.serial_number):
raise ValueError(
f"One of them must be set: name or serial_number, but {self.name=} and {self.serial_number=} provided."
)
if self.color_mode not in ["rgb", "bgr"]:
raise ValueError(
f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided."
)
self.channels = 3
at_least_one_is_not_none = self.fps is not None or self.width is not None or self.height is not None
at_least_one_is_none = self.fps is None or self.width is None or self.height is None
if at_least_one_is_not_none and at_least_one_is_none:
raise ValueError(
"For `fps`, `width` and `height`, either all of them need to be set, or none of them, "
f"but {self.fps=}, {self.width=}, {self.height=} were provided."
)
if self.rotation not in [-90, None, 90, 180]:
raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})")

View File

@ -0,0 +1,4 @@
from .camera_realsense import RealSenseCamera
from .configuration_realsense import RealSenseCameraConfig
__all__ = ["RealSenseCamera", "RealSenseCameraConfig"]

View File

@ -17,14 +17,15 @@ from threading import Thread
import numpy as np
from PIL import Image
from lerobot.common.cameras.configs import IntelRealSenseCameraConfig
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.utils.robot_utils import (
RobotDeviceAlreadyConnectedError,
RobotDeviceNotConnectedError,
busy_wait,
)
from lerobot.common.utils.utils import capture_timestamp_utc
from ..camera import Camera
from .configuration_realsense import RealSenseCameraConfig
SERIAL_NUMBER_INDEX = 1
@ -94,13 +95,11 @@ def save_images_from_cameras(
cameras = []
for cam_sn in serial_numbers:
print(f"{cam_sn=}")
config = IntelRealSenseCameraConfig(
serial_number=cam_sn, fps=fps, width=width, height=height, mock=mock
)
camera = IntelRealSenseCamera(config)
config = RealSenseCameraConfig(serial_number=cam_sn, fps=fps, width=width, height=height, mock=mock)
camera = RealSenseCamera(config)
camera.connect()
print(
f"IntelRealSenseCamera({camera.serial_number}, fps={camera.fps}, width={camera.width}, height={camera.height}, color_mode={camera.color_mode})"
f"RealSenseCamera({camera.serial_number}, fps={camera.fps}, width={camera.width}, height={camera.height}, color_mode={camera.color_mode})"
)
cameras.append(camera)
@ -152,11 +151,11 @@ def save_images_from_cameras(
camera.disconnect()
class IntelRealSenseCamera:
class RealSenseCamera(Camera):
"""
The IntelRealSenseCamera class is similar to OpenCVCamera class but adds additional features for Intel Real Sense cameras:
The RealSenseCamera class is similar to OpenCVCamera class but adds additional features for Intel Real Sense cameras:
- is instantiated with the serial number of the camera - won't randomly change as it can be the case of OpenCVCamera for Linux,
- can also be instantiated with the camera's name — if it's unique using IntelRealSenseCamera.init_from_name(),
- can also be instantiated with the camera's name — if it's unique using RealSenseCamera.init_from_name(),
- depth map can be returned.
To find the camera indices of your cameras, you can run our utility script that will save a few frames for each camera:
@ -164,15 +163,15 @@ class IntelRealSenseCamera:
python lerobot/common/robot_devices/cameras/intelrealsense.py --images-dir outputs/images_from_intelrealsense_cameras
```
When an IntelRealSenseCamera is instantiated, if no specific config is provided, the default fps, width, height and color_mode
When an RealSenseCamera is instantiated, if no specific config is provided, the default fps, width, height and color_mode
of the given camera will be used.
Example of instantiating with a serial number:
```python
from lerobot.common.robot_devices.cameras.configs import IntelRealSenseCameraConfig
from lerobot.common.robot_devices.cameras.configs import RealSenseCameraConfig
config = IntelRealSenseCameraConfig(serial_number=128422271347)
camera = IntelRealSenseCamera(config)
config = RealSenseCameraConfig(serial_number=128422271347)
camera = RealSenseCamera(config)
camera.connect()
color_image = camera.read()
# when done using the camera, consider disconnecting
@ -181,21 +180,21 @@ class IntelRealSenseCamera:
Example of instantiating with a name if it's unique:
```
config = IntelRealSenseCameraConfig(name="Intel RealSense D405")
config = RealSenseCameraConfig(name="Intel RealSense D405")
```
Example of changing default fps, width, height and color_mode:
```python
config = IntelRealSenseCameraConfig(serial_number=128422271347, fps=30, width=1280, height=720)
config = IntelRealSenseCameraConfig(serial_number=128422271347, fps=90, width=640, height=480)
config = IntelRealSenseCameraConfig(serial_number=128422271347, fps=90, width=640, height=480, color_mode="bgr")
config = RealSenseCameraConfig(serial_number=128422271347, fps=30, width=1280, height=720)
config = RealSenseCameraConfig(serial_number=128422271347, fps=90, width=640, height=480)
config = RealSenseCameraConfig(serial_number=128422271347, fps=90, width=640, height=480, color_mode="bgr")
# Note: might error out upon `camera.connect()` if these settings are not compatible with the camera
```
Example of returning depth:
```python
config = IntelRealSenseCameraConfig(serial_number=128422271347, use_depth=True)
camera = IntelRealSenseCamera(config)
config = RealSenseCameraConfig(serial_number=128422271347, use_depth=True)
camera = RealSenseCamera(config)
camera.connect()
color_image, depth_map = camera.read()
```
@ -203,7 +202,7 @@ class IntelRealSenseCamera:
def __init__(
self,
config: IntelRealSenseCameraConfig,
config: RealSenseCameraConfig,
):
self.config = config
if config.name is not None:
@ -258,9 +257,7 @@ class IntelRealSenseCamera:
def connect(self):
if self.is_connected:
raise RobotDeviceAlreadyConnectedError(
f"IntelRealSenseCamera({self.serial_number}) is already connected."
)
raise DeviceAlreadyConnectedError(f"RealSenseCamera({self.serial_number}) is already connected.")
if self.mock:
import tests.mock_pyrealsense2 as rs
@ -302,7 +299,7 @@ class IntelRealSenseCamera:
"To find the serial number you should use, run `python lerobot/common/robot_devices/cameras/intelrealsense.py`."
)
raise OSError(f"Can't access IntelRealSenseCamera({self.serial_number}).")
raise OSError(f"Can't access RealSenseCamera({self.serial_number}).")
color_stream = profile.get_stream(rs.stream.color)
color_profile = color_stream.as_video_stream_profile()
@ -314,15 +311,15 @@ class IntelRealSenseCamera:
if self.fps is not None and not math.isclose(self.fps, actual_fps, rel_tol=1e-3):
# Using `OSError` since it's a broad that encompasses issues related to device communication
raise OSError(
f"Can't set {self.fps=} for IntelRealSenseCamera({self.serial_number}). Actual value is {actual_fps}."
f"Can't set {self.fps=} for RealSenseCamera({self.serial_number}). Actual value is {actual_fps}."
)
if self.width is not None and self.width != actual_width:
raise OSError(
f"Can't set {self.width=} for IntelRealSenseCamera({self.serial_number}). Actual value is {actual_width}."
f"Can't set {self.width=} for RealSenseCamera({self.serial_number}). Actual value is {actual_width}."
)
if self.height is not None and self.height != actual_height:
raise OSError(
f"Can't set {self.height=} for IntelRealSenseCamera({self.serial_number}). Actual value is {actual_height}."
f"Can't set {self.height=} for RealSenseCamera({self.serial_number}). Actual value is {actual_height}."
)
self.fps = round(actual_fps)
@ -342,8 +339,8 @@ class IntelRealSenseCamera:
If you are reading data from other sensors, we advise to use `camera.async_read()` which is non blocking version of `camera.read()`.
"""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
f"IntelRealSenseCamera({self.serial_number}) is not connected. Try running `camera.connect()` first."
raise DeviceNotConnectedError(
f"RealSenseCamera({self.serial_number}) is not connected. Try running `camera.connect()` first."
)
if self.mock:
@ -358,7 +355,7 @@ class IntelRealSenseCamera:
color_frame = frame.get_color_frame()
if not color_frame:
raise OSError(f"Can't capture color image from IntelRealSenseCamera({self.serial_number}).")
raise OSError(f"Can't capture color image from RealSenseCamera({self.serial_number}).")
color_image = np.asanyarray(color_frame.get_data())
@ -390,7 +387,7 @@ class IntelRealSenseCamera:
if self.use_depth:
depth_frame = frame.get_depth_frame()
if not depth_frame:
raise OSError(f"Can't capture depth image from IntelRealSenseCamera({self.serial_number}).")
raise OSError(f"Can't capture depth image from RealSenseCamera({self.serial_number}).")
depth_map = np.asanyarray(depth_frame.get_data())
@ -417,8 +414,8 @@ class IntelRealSenseCamera:
def async_read(self):
"""Access the latest color image"""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
f"IntelRealSenseCamera({self.serial_number}) is not connected. Try running `camera.connect()` first."
raise DeviceNotConnectedError(
f"RealSenseCamera({self.serial_number}) is not connected. Try running `camera.connect()` first."
)
if self.thread is None:
@ -444,8 +441,8 @@ class IntelRealSenseCamera:
def disconnect(self):
if not self.is_connected:
raise RobotDeviceNotConnectedError(
f"IntelRealSenseCamera({self.serial_number}) is not connected. Try running `camera.connect()` first."
raise DeviceNotConnectedError(
f"RealSenseCamera({self.serial_number}) is not connected. Try running `camera.connect()` first."
)
if self.thread is not None and self.thread.is_alive():
@ -467,14 +464,14 @@ class IntelRealSenseCamera:
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Save a few frames using `IntelRealSenseCamera` for all cameras connected to the computer, or a selected subset."
description="Save a few frames using `RealSenseCamera` for all cameras connected to the computer, or a selected subset."
)
parser.add_argument(
"--serial-numbers",
type=int,
nargs="*",
default=None,
help="List of serial numbers used to instantiate the `IntelRealSenseCamera`. If not provided, find and use all available camera indices.",
help="List of serial numbers used to instantiate the `RealSenseCamera`. If not provided, find and use all available camera indices.",
)
parser.add_argument(
"--fps",

View File

@ -0,0 +1,57 @@
from dataclasses import dataclass
from ..configs import CameraConfig
@CameraConfig.register_subclass("intelrealsense")
@dataclass
class RealSenseCameraConfig(CameraConfig):
"""
Example of tested options for Intel Real Sense D405:
```python
RealSenseCameraConfig(128422271347, 30, 640, 480)
RealSenseCameraConfig(128422271347, 60, 640, 480)
RealSenseCameraConfig(128422271347, 90, 640, 480)
RealSenseCameraConfig(128422271347, 30, 1280, 720)
RealSenseCameraConfig(128422271347, 30, 640, 480, use_depth=True)
RealSenseCameraConfig(128422271347, 30, 640, 480, rotation=90)
```
"""
name: str | None = None
serial_number: int | None = None
fps: int | None = None
width: int | None = None
height: int | None = None
color_mode: str = "rgb"
channels: int | None = None
use_depth: bool = False
force_hardware_reset: bool = True
rotation: int | None = None
mock: bool = False
def __post_init__(self):
# bool is stronger than is None, since it works with empty strings
if bool(self.name) and bool(self.serial_number):
raise ValueError(
f"One of them must be set: name or serial_number, but {self.name=} and {self.serial_number=} provided."
)
if self.color_mode not in ["rgb", "bgr"]:
raise ValueError(
f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided."
)
self.channels = 3
at_least_one_is_not_none = self.fps is not None or self.width is not None or self.height is not None
at_least_one_is_none = self.fps is None or self.width is None or self.height is None
if at_least_one_is_not_none and at_least_one_is_none:
raise ValueError(
"For `fps`, `width` and `height`, either all of them need to be set, or none of them, "
f"but {self.fps=}, {self.width=}, {self.height=} were provided."
)
if self.rotation not in [-90, None, 90, 180]:
raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})")

View File

@ -0,0 +1,4 @@
from .camera_opencv import OpenCVCamera
from .configuration_opencv import OpenCVCameraConfig
__all__ = ["OpenCVCamera", "OpenCVCameraConfig"]

View File

@ -15,14 +15,15 @@ from threading import Thread
import numpy as np
from PIL import Image
from lerobot.common.cameras.configs import OpenCVCameraConfig
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.utils.robot_utils import (
RobotDeviceAlreadyConnectedError,
RobotDeviceNotConnectedError,
busy_wait,
)
from lerobot.common.utils.utils import capture_timestamp_utc
from ..camera import Camera
from .configuration_opencv import OpenCVCameraConfig
# The maximum opencv device index depends on your operating system. For instance,
# if you have 3 cameras, they should be associated to index 0, 1, and 2. This is the case
# on MacOS. However, on Ubuntu, the indices are different like 6, 16, 23.
@ -176,7 +177,7 @@ def save_images_from_cameras(
print(f"Images have been saved to {images_dir}")
class OpenCVCamera:
class OpenCVCamera(Camera):
"""
The OpenCVCamera class allows to efficiently record images from cameras. It relies on opencv2 to communicate
with the cameras. Most cameras are compatible. For more info, see the [Video I/O with OpenCV Overview](https://docs.opencv.org/4.x/d0/da7/videoio_overview.html).
@ -260,7 +261,7 @@ class OpenCVCamera:
def connect(self):
if self.is_connected:
raise RobotDeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.")
raise DeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.")
if self.mock:
import tests.mock_cv2 as cv2
@ -339,7 +340,7 @@ class OpenCVCamera:
If you are reading data from other sensors, we advise to use `camera.async_read()` which is non blocking version of `camera.read()`.
"""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
raise DeviceNotConnectedError(
f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first."
)
@ -396,7 +397,7 @@ class OpenCVCamera:
def async_read(self):
if not self.is_connected:
raise RobotDeviceNotConnectedError(
raise DeviceNotConnectedError(
f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first."
)
@ -418,7 +419,7 @@ class OpenCVCamera:
def disconnect(self):
if not self.is_connected:
raise RobotDeviceNotConnectedError(
raise DeviceNotConnectedError(
f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first."
)

View File

@ -0,0 +1,38 @@
from dataclasses import dataclass
from ..configs import CameraConfig
@CameraConfig.register_subclass("opencv")
@dataclass
class OpenCVCameraConfig(CameraConfig):
"""
Example of tested options for Intel Real Sense D405:
```python
OpenCVCameraConfig(0, 30, 640, 480)
OpenCVCameraConfig(0, 60, 640, 480)
OpenCVCameraConfig(0, 90, 640, 480)
OpenCVCameraConfig(0, 30, 1280, 720)
```
"""
camera_index: int
fps: int | None = None
width: int | None = None
height: int | None = None
color_mode: str = "rgb"
channels: int | None = None
rotation: int | None = None
mock: bool = False
def __post_init__(self):
if self.color_mode not in ["rgb", "bgr"]:
raise ValueError(
f"`color_mode` is expected to be 'rgb' or 'bgr', but {self.color_mode} is provided."
)
self.channels = 3
if self.rotation not in [-90, None, 90, 180]:
raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})")

View File

@ -1,35 +1,20 @@
from typing import Protocol
import numpy as np
from lerobot.common.cameras.configs import (
CameraConfig,
IntelRealSenseCameraConfig,
OpenCVCameraConfig,
)
from .camera import Camera
from .configs import CameraConfig
# Defines a camera type
class Camera(Protocol):
def connect(self): ...
def read(self, temporary_color: str | None = None) -> np.ndarray: ...
def async_read(self) -> np.ndarray: ...
def disconnect(self): ...
def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> list[Camera]:
def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> dict[str, Camera]:
cameras = {}
for key, cfg in camera_configs.items():
if cfg.type == "opencv":
from lerobot.common.cameras.opencv import OpenCVCamera
from .opencv import OpenCVCamera
cameras[key] = OpenCVCamera(cfg)
elif cfg.type == "intelrealsense":
from lerobot.common.cameras.intelrealsense import IntelRealSenseCamera
from .intel.camera_realsense import RealSenseCamera
cameras[key] = IntelRealSenseCamera(cfg)
cameras[key] = RealSenseCamera(cfg)
else:
raise ValueError(f"The motor type '{cfg.type}' is not valid.")
@ -38,16 +23,16 @@ def make_cameras_from_configs(camera_configs: dict[str, CameraConfig]) -> list[C
def make_camera(camera_type, **kwargs) -> Camera:
if camera_type == "opencv":
from lerobot.common.cameras.opencv import OpenCVCamera
from .opencv import OpenCVCamera, OpenCVCameraConfig
config = OpenCVCameraConfig(**kwargs)
return OpenCVCamera(config)
elif camera_type == "intelrealsense":
from lerobot.common.cameras.intelrealsense import IntelRealSenseCamera
from .intel import RealSenseCamera, RealSenseCameraConfig
config = IntelRealSenseCameraConfig(**kwargs)
return IntelRealSenseCamera(config)
config = RealSenseCameraConfig(**kwargs)
return RealSenseCamera(config)
else:
raise ValueError(f"The camera type '{camera_type}' is not valid.")