diff --git a/lerobot/common/cameras/camera.py b/lerobot/common/cameras/camera.py index 5d9ac509..2fbdb0dc 100644 --- a/lerobot/common/cameras/camera.py +++ b/lerobot/common/cameras/camera.py @@ -9,7 +9,7 @@ class Camera(abc.ABC): pass @abc.abstractmethod - def read(self, temporary_color: str | None = None) -> np.ndarray: + def read(self, temporary_color_mode: str | None = None) -> np.ndarray: pass @abc.abstractmethod diff --git a/lerobot/common/cameras/interface_camera_sdk.py b/lerobot/common/cameras/interface_camera_sdk.py new file mode 100644 index 00000000..e4c6788c --- /dev/null +++ b/lerobot/common/cameras/interface_camera_sdk.py @@ -0,0 +1,305 @@ +# ruff: noqa: N802,N803 +import abc +from typing import Optional, Tuple + +import numpy as np + + +# --- Interface Definition --- +class IVideoCapture(abc.ABC): + """Interface for the cv2.VideoCapture class.""" + + @abc.abstractmethod + def __init__(self, index: int | str, backend: Optional[int] = None): + pass + + @abc.abstractmethod + def isOpened(self) -> bool: + pass + + @abc.abstractmethod + def release(self) -> None: + pass + + @abc.abstractmethod + def set(self, propId: int, value: float) -> bool: + pass + + @abc.abstractmethod + def get(self, propId: int) -> float: + pass + + @abc.abstractmethod + def read(self) -> Tuple[bool, Optional[np.ndarray]]: + pass + + +class IOpenCVSDK(abc.ABC): + """Interface defining the contract for OpenCV SDK interactions.""" + + # --- Constants --- + CAP_PROP_FPS: int + CAP_PROP_FRAME_WIDTH: int + CAP_PROP_FRAME_HEIGHT: int + COLOR_BGR2RGB: int + ROTATE_90_COUNTERCLOCKWISE: int + ROTATE_90_CLOCKWISE: int + ROTATE_180: int + CAP_V4L2: int + CAP_DSHOW: int + CAP_AVFOUNDATION: int + CAP_ANY: int + + # --- Inner Class Type Hint --- + VideoCapture: type[IVideoCapture] + + # --- Methods --- + @abc.abstractmethod + def setNumThreads(self, nthreads: int) -> None: + pass + + @abc.abstractmethod + def cvtColor(self, src: np.ndarray, code: int) -> np.ndarray: + pass + + @abc.abstractmethod + def rotate(self, src: np.ndarray, rotateCode: int) -> np.ndarray: + pass + + +# --- Real SDK Adapter --- +class OpenCVSDKAdapter(IOpenCVSDK): + """Adapts the real cv2 library to the IOpenCVSDK interface.""" + + _cv2 = None + + def __init__(self): + try: + import cv2 + + OpenCVSDKAdapter._cv2 = cv2 + except ImportError as e: + raise ImportError( + "OpenCV (cv2) is not installed. Please install it to use the real camera." + ) from e + + # --- Constants --- + self.CAP_PROP_FPS = self._cv2.CAP_PROP_FPS + self.CAP_PROP_FRAME_WIDTH = self._cv2.CAP_PROP_FRAME_WIDTH + self.CAP_PROP_FRAME_HEIGHT = self._cv2.CAP_PROP_FRAME_HEIGHT + self.COLOR_BGR2RGB = self._cv2.COLOR_BGR2RGB + self.ROTATE_90_COUNTERCLOCKWISE = self._cv2.ROTATE_90_COUNTERCLOCKWISE + self.ROTATE_90_CLOCKWISE = self._cv2.ROTATE_90_CLOCKWISE + self.ROTATE_180 = self._cv2.ROTATE_180 + self.CAP_V4L2 = self._cv2.CAP_V4L2 + self.CAP_DSHOW = self._cv2.CAP_DSHOW + self.CAP_AVFOUNDATION = self._cv2.CAP_AVFOUNDATION + self.CAP_ANY = self._cv2.CAP_ANY + + # --- Inner Class Implementation --- + class RealVideoCapture(IVideoCapture): + def __init__(self, index: int | str, backend: Optional[int] = None): + self._cap = OpenCVSDKAdapter._cv2.VideoCapture(index, backend) + + def isOpened(self) -> bool: + return self._cap.isOpened() + + def release(self) -> None: + self._cap.release() + + def set(self, propId: int, value: float) -> bool: + return self._cap.set(propId, value) + + def get(self, propId: int) -> float: + return self._cap.get(propId) + + def read(self) -> Tuple[bool, Optional[np.ndarray]]: + return self._cap.read() + + def __del__(self): + if hasattr(self, "_cap") and self._cap and self._cap.isOpened(): + self._cap.release() + + self.VideoCapture = RealVideoCapture + + # --- Methods --- + def setNumThreads(self, nthreads: int) -> None: + self._cv2.setNumThreads(nthreads) + + def cvtColor(self, src: np.ndarray, code: int) -> np.ndarray: + return self._cv2.cvtColor(src, code) + + def rotate(self, src: np.ndarray, rotateCode: int) -> np.ndarray: + return self._cv2.rotate(src, rotateCode) + + +# Emulates the cheap USB camera +VALID_INDICES = {0, 1, 2, "/dev/video0", "/dev/video1", "/dev/video2"} +DEFAULT_FPS = 30.0 +DEFAULT_WIDTH = 1280 +DEFAULT_HEIGHT = 720 + + +# --- Fake SDK Adapter --- +class FakeOpenCVSDKAdapter(IOpenCVSDK): + """Implements the IOpenCVSDK interface with fake behavior for testing.""" + + # --- Constants --- + CAP_PROP_FPS = DEFAULT_FPS + CAP_PROP_FRAME_WIDTH = DEFAULT_WIDTH + CAP_PROP_FRAME_HEIGHT = DEFAULT_HEIGHT + COLOR_BGR2RGB = 99 + ROTATE_90_COUNTERCLOCKWISE = -90 + ROTATE_90_CLOCKWISE = 90 + ROTATE_180 = 180 + CAP_V4L2 = 91 + CAP_DSHOW = 92 + CAP_AVFOUNDATION = 93 + CAP_ANY = 90 + + _cameras_opened: dict[int | str, bool] = {} + _camera_properties: dict[tuple[int | str, int], float] = {} + _simulated_image: np.ndarray = np.random.randint( + 0, 256, (DEFAULT_HEIGHT, DEFAULT_WIDTH, 3), dtype=np.uint8 + ) + _simulated_fps: float = DEFAULT_FPS + _image_read_count: int = 0 + _fail_read_after: Optional[int] = None # Simulate read failure + + @classmethod + def init_configure_fake( + cls, + simulated_image: Optional[np.ndarray] = None, + simulated_fps: Optional[float] = None, + fail_read_after: Optional[int] = None, + ): + if simulated_image is not None: + cls._simulated_image = simulated_image + if simulated_fps is not None: + cls._simulated_fps = simulated_fps + cls._fail_read_after = fail_read_after + cls._image_read_count = 0 + cls._cameras_opened = {} + cls._camera_properties = {} + + @classmethod + def configure_fake_simulated_image(cls, simulated_image: Optional[np.ndarray] = None): + if simulated_image is not None: + cls._simulated_image = simulated_image + + @classmethod + def configure_fail_read_after(cls, fail_read_after: Optional[int] = None): + cls._fail_read_after = fail_read_after + + @classmethod + def configure_fake_simulated_fps(cls, simulated_fps: Optional[float] = None): + if simulated_fps is not None: + cls._simulated_fps = simulated_fps + + # --- Inner Class Implementation --- + class FakeVideoCapture(IVideoCapture): + def __init__(self, index: int | str, backend: Optional[int] = None): + self.index = index + self.backend = backend + valid_indices = VALID_INDICES + if self.index in valid_indices: + FakeOpenCVSDKAdapter._cameras_opened[self.index] = True + print(f"[FAKE SDK] Opened camera {self.index}") + # Set some default fake properties + FakeOpenCVSDKAdapter._camera_properties[(self.index, FakeOpenCVSDKAdapter.CAP_PROP_FPS)] = ( + DEFAULT_FPS + ) + FakeOpenCVSDKAdapter._camera_properties[ + (self.index, FakeOpenCVSDKAdapter.CAP_PROP_FRAME_WIDTH) + ] = float(FakeOpenCVSDKAdapter._simulated_image.shape[1]) + FakeOpenCVSDKAdapter._camera_properties[ + (self.index, FakeOpenCVSDKAdapter.CAP_PROP_FRAME_HEIGHT) + ] = float(FakeOpenCVSDKAdapter._simulated_image.shape[0]) + else: + FakeOpenCVSDKAdapter._cameras_opened[self.index] = False + print(f"[FAKE SDK] Failed to open camera {self.index}") + + def isOpened(self) -> bool: + return FakeOpenCVSDKAdapter._cameras_opened.get(self.index, False) + + def release(self) -> None: + if self.index in FakeOpenCVSDKAdapter._cameras_opened: + FakeOpenCVSDKAdapter._cameras_opened[self.index] = False + print(f"[FAKE SDK] Released camera {self.index}") + # Clear properties on release + props_to_remove = [k for k in FakeOpenCVSDKAdapter._camera_properties if k[0] == self.index] + for k in props_to_remove: + del FakeOpenCVSDKAdapter._camera_properties[k] + + def set(self, propId: int, value: float) -> bool: + if not self.isOpened(): + return False + print( + f"[FAKE SDK] Ignoring set property {propId} = {value} for camera {self.index} to preserve state." + ) + # FakeOpenCVSDKAdapter._camera_properties[(self.index, propId)] = value + # Simulate failure for specific unrealistic settings if needed + return True + + def get(self, propId: int) -> float: + if not self.isOpened(): + return 0.0 # Or raise error? Mimic cv2 behavior + val = FakeOpenCVSDKAdapter._camera_properties.get((self.index, propId)) + print(f"[FAKE SDK] Get property {propId} for camera {self.index} -> {val}") + return val + + def read(self) -> Tuple[bool, Optional[np.ndarray]]: + if not self.isOpened(): + print(f"[FAKE SDK] Read failed: Camera {self.index} not open.") + return False, None + + FakeOpenCVSDKAdapter._image_read_count += 1 + if ( + FakeOpenCVSDKAdapter._fail_read_after is not None + and FakeOpenCVSDKAdapter._image_read_count > FakeOpenCVSDKAdapter._fail_read_after + ): + print( + f"[FAKE SDK] Simulated read failure for camera {self.index} after {FakeOpenCVSDKAdapter._fail_read_after} reads." + ) + return False, None + + print( + f"[FAKE SDK] Read image from camera {self.index} (read #{FakeOpenCVSDKAdapter._image_read_count})" + ) + # Return a copy to prevent modification issues if the caller changes it + return True, FakeOpenCVSDKAdapter._simulated_image.copy() + + def __del__(self): + # Ensure cleanup if garbage collected + self.release() + + VideoCapture = FakeVideoCapture # Assign inner class + + # --- Methods --- + def setNumThreads(self, nthreads: int) -> None: + print(f"[FAKE SDK] setNumThreads({nthreads}) called.") + # No actual behavior needed in fake + + def cvtColor(self, src: np.ndarray, code: int) -> np.ndarray: + print(f"[FAKE SDK] cvtColor called with code {code}.") + # Just return the source image, or simulate channel swap if needed + if code == self.COLOR_BGR2RGB and src.shape[2] == 3: + print("[FAKE SDK] Simulating BGR -> RGB conversion.") + return src[..., ::-1] + return src.copy() + + def rotate(self, src: np.ndarray, rotateCode: int) -> np.ndarray: + print(f"[FAKE SDK] rotate called with code {rotateCode}.") + if rotateCode == self.ROTATE_90_COUNTERCLOCKWISE: + print("[FAKE SDK] Simulating 90 degree counter-clockwise rotation.") + rotated_img = np.rot90(np.rot90(np.rot90(src.copy()))) + return rotated_img + elif rotateCode == self.ROTATE_90_CLOCKWISE: + print("[FAKE SDK] Simulating 90 degree clockwise rotation.") + rotated_img = np.rot90(src.copy()) + return rotated_img + elif rotateCode == self.ROTATE_180: + print("[FAKE SDK] Simulating 180 degree rotation.") + rotated_img = np.rot90(np.rot90(src.copy())) + return rotated_img + return src.copy() diff --git a/lerobot/common/cameras/opencv/camera_opencv.py b/lerobot/common/cameras/opencv/camera_opencv.py index f4bfb507..4ed4a17c 100644 --- a/lerobot/common/cameras/opencv/camera_opencv.py +++ b/lerobot/common/cameras/opencv/camera_opencv.py @@ -24,7 +24,6 @@ import shutil import threading import time from pathlib import Path -from threading import Thread import numpy as np from PIL import Image @@ -36,6 +35,7 @@ from lerobot.common.utils.robot_utils import ( from lerobot.common.utils.utils import capture_timestamp_utc from ..camera import Camera +from ..interface_camera_sdk import IOpenCVSDK, OpenCVSDKAdapter from .configuration_opencv import OpenCVCameraConfig # The maximum opencv device index depends on your operating system. For instance, @@ -46,12 +46,17 @@ from .configuration_opencv import OpenCVCameraConfig MAX_OPENCV_INDEX = 60 -def find_cameras(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX, mock=False) -> list[dict]: +def find_cameras( + raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX, cv2_sdk: IOpenCVSDK = None +) -> list[dict]: + if cv2_sdk is None: + cv2_sdk = OpenCVSDKAdapter() + cameras = [] if platform.system() == "Linux": print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports") possible_ports = [str(port) for port in Path("/dev").glob("video*")] - ports = _find_cameras(possible_ports, mock=mock) + ports = _find_cameras(possible_ports, cv2_sdk=cv2_sdk) for port in ports: cameras.append( { @@ -65,7 +70,7 @@ def find_cameras(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX f"scanning all indices from 0 to {MAX_OPENCV_INDEX}" ) possible_indices = range(max_index_search_range) - indices = _find_cameras(possible_indices, mock=mock) + indices = _find_cameras(possible_indices, cv2_sdk=cv2_sdk) for index in indices: cameras.append( { @@ -78,16 +83,14 @@ def find_cameras(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX def _find_cameras( - possible_camera_ids: list[int | str], raise_when_empty=False, mock=False + possible_camera_ids: list[int | str], raise_when_empty=False, cv2_sdk: IOpenCVSDK = None ) -> list[int | str]: - if mock: - import tests.cameras.mock_cv2 as cv2 - else: - import cv2 + if cv2_sdk is None: + cv2_sdk = OpenCVSDKAdapter() camera_ids = [] for camera_idx in possible_camera_ids: - camera = cv2.VideoCapture(camera_idx) + camera = cv2_sdk.VideoCapture(camera_idx) is_open = camera.isOpened() camera.release() @@ -128,21 +131,24 @@ def save_images_from_cameras( width=None, height=None, record_time_s=2, - mock=False, + cv2_sdk: IOpenCVSDK = None, ): """ Initializes all the cameras and saves images to the directory. Useful to visually identify the camera associated to a given camera index. """ + if cv2_sdk is None: + cv2_sdk = OpenCVSDKAdapter() + if camera_ids is None or len(camera_ids) == 0: - camera_infos = find_cameras(mock=mock) + camera_infos = find_cameras(cv2_sdk=cv2_sdk) camera_ids = [cam["index"] for cam in camera_infos] print("Connecting cameras") cameras = [] for cam_idx in camera_ids: - config = OpenCVCameraConfig(camera_index=cam_idx, fps=fps, width=width, height=height, mock=mock) - camera = OpenCVCamera(config) + config = OpenCVCameraConfig(camera_index=cam_idx, fps=fps, width=width, height=height) + camera = OpenCVCamera(config, cv2_sdk=cv2_sdk) camera.connect() print( f"OpenCVCamera({camera.camera_index}, fps={camera.fps}, width={camera.capture_width}, " @@ -229,11 +235,16 @@ class OpenCVCamera(Camera): ``` """ - def __init__(self, config: OpenCVCameraConfig): + def __init__(self, config: OpenCVCameraConfig, cv2_sdk: IOpenCVSDK = None): self.config = config self.camera_index = config.camera_index self.port = None + if cv2_sdk is None: + cv2_sdk = OpenCVSDKAdapter() + + self.cv2_sdk = cv2_sdk + # Linux uses ports for connecting to cameras if platform.system() == "Linux": if isinstance(self.camera_index, int): @@ -260,7 +271,6 @@ class OpenCVCamera(Camera): self.fps = config.fps self.channels = config.channels self.color_mode = config.color_mode - self.mock = config.mock self.camera = None self.is_connected = False @@ -269,46 +279,38 @@ class OpenCVCamera(Camera): self.color_image = None self.logs = {} - if self.mock: - import tests.cameras.mock_cv2 as cv2 - else: - import cv2 - self.rotation = None if config.rotation == -90: - self.rotation = cv2.ROTATE_90_COUNTERCLOCKWISE + self.rotation = cv2_sdk.ROTATE_90_COUNTERCLOCKWISE elif config.rotation == 90: - self.rotation = cv2.ROTATE_90_CLOCKWISE + self.rotation = cv2_sdk.ROTATE_90_CLOCKWISE elif config.rotation == 180: - self.rotation = cv2.ROTATE_180 + self.rotation = cv2_sdk.ROTATE_180 def connect(self): if self.is_connected: raise DeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.") - if self.mock: - import tests.cameras.mock_cv2 as cv2 - else: - import cv2 + cv2_sdk = self.cv2_sdk - # Use 1 thread to avoid blocking the main thread. Especially useful during data collection - # when other threads are used to save the images. - cv2.setNumThreads(1) + # Use 1 thread to avoid blocking the main thread. Especially useful during data collection + # when other threads are used to save the images. + cv2_sdk.setNumThreads(1) backend = ( - cv2.CAP_V4L2 + cv2_sdk.CAP_V4L2 if platform.system() == "Linux" - else cv2.CAP_DSHOW + else cv2_sdk.CAP_DSHOW if platform.system() == "Windows" - else cv2.CAP_AVFOUNDATION + else cv2_sdk.CAP_AVFOUNDATION if platform.system() == "Darwin" - else cv2.CAP_ANY + else cv2_sdk.CAP_ANY ) camera_idx = f"/dev/video{self.camera_index}" if platform.system() == "Linux" else self.camera_index # First create a temporary camera trying to access `camera_index`, # and verify it is a valid camera by calling `isOpened`. - tmp_camera = cv2.VideoCapture(camera_idx, backend) + tmp_camera = cv2_sdk.VideoCapture(camera_idx, backend) is_camera_open = tmp_camera.isOpened() # Release camera to make it accessible for `find_camera_indices` tmp_camera.release() @@ -318,7 +320,7 @@ class OpenCVCamera(Camera): # valid cameras. if not is_camera_open: # Verify that the provided `camera_index` is valid before printing the traceback - cameras_info = find_cameras() + cameras_info = find_cameras(cv2_sdk=cv2_sdk) available_cam_ids = [cam["index"] for cam in cameras_info] if self.camera_index not in available_cam_ids: raise ValueError( @@ -331,18 +333,18 @@ class OpenCVCamera(Camera): # Secondly, create the camera that will be used downstream. # Note: For some unknown reason, calling `isOpened` blocks the camera which then # needs to be re-created. - self.camera = cv2.VideoCapture(camera_idx, backend) + self.camera = cv2_sdk.VideoCapture(camera_idx, backend) if self.fps is not None: - self.camera.set(cv2.CAP_PROP_FPS, self.fps) + self.camera.set(cv2_sdk.CAP_PROP_FPS, self.fps) if self.capture_width is not None: - self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.capture_width) + self.camera.set(cv2_sdk.CAP_PROP_FRAME_WIDTH, self.capture_width) if self.capture_height is not None: - self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.capture_height) + self.camera.set(cv2_sdk.CAP_PROP_FRAME_HEIGHT, self.capture_height) - actual_fps = self.camera.get(cv2.CAP_PROP_FPS) - actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH) - actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT) + actual_fps = self.camera.get(cv2_sdk.CAP_PROP_FPS) + actual_width = self.camera.get(cv2_sdk.CAP_PROP_FRAME_WIDTH) + actual_height = self.camera.get(cv2_sdk.CAP_PROP_FRAME_HEIGHT) # Using `math.isclose` since actual fps can be a float (e.g. 29.9 instead of 30) if self.fps is not None and not math.isclose(self.fps, actual_fps, rel_tol=1e-3): @@ -380,6 +382,8 @@ class OpenCVCamera(Camera): f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first." ) + cv2_sdk = self.cv2_sdk + start_time = time.perf_counter() ret, color_image = self.camera.read() @@ -398,12 +402,7 @@ class OpenCVCamera(Camera): # However, Deep Learning framework such as LeRobot uses RGB format as default to train neural networks, # so we convert the image color from BGR to RGB. if requested_color_mode == "rgb": - if self.mock: - import tests.cameras.mock_cv2 as cv2 - else: - import cv2 - - color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) + color_image = cv2_sdk.cvtColor(color_image, cv2_sdk.COLOR_BGR2RGB) h, w, _ = color_image.shape if h != self.capture_height or w != self.capture_width: @@ -412,7 +411,7 @@ class OpenCVCamera(Camera): ) if self.rotation is not None: - color_image = cv2.rotate(color_image, self.rotation) + color_image = cv2_sdk.rotate(color_image, self.rotation) # log the number of seconds it took to read the image self.logs["delta_timestamp_s"] = time.perf_counter() - start_time @@ -439,7 +438,7 @@ class OpenCVCamera(Camera): if self.thread is None: self.stop_event = threading.Event() - self.thread = Thread(target=self.read_loop, args=()) + self.thread = threading.Thread(target=self.read_loop, args=()) self.thread.daemon = True self.thread.start() diff --git a/lerobot/common/cameras/opencv/configuration_opencv.py b/lerobot/common/cameras/opencv/configuration_opencv.py index 983199bf..b8a20fc9 100644 --- a/lerobot/common/cameras/opencv/configuration_opencv.py +++ b/lerobot/common/cameras/opencv/configuration_opencv.py @@ -24,7 +24,6 @@ class OpenCVCameraConfig(CameraConfig): color_mode: str = "rgb" channels: int | None = None rotation: int | None = None - mock: bool = False def __post_init__(self): if self.color_mode not in ["rgb", "bgr"]: diff --git a/tests/cameras/mock_cv2.py b/tests/cameras/mock_cv2.py deleted file mode 100644 index eeaf859c..00000000 --- a/tests/cameras/mock_cv2.py +++ /dev/null @@ -1,101 +0,0 @@ -# Copyright 2024 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from functools import cache - -import numpy as np - -CAP_V4L2 = 200 -CAP_DSHOW = 700 -CAP_AVFOUNDATION = 1200 -CAP_ANY = -1 - -CAP_PROP_FPS = 5 -CAP_PROP_FRAME_WIDTH = 3 -CAP_PROP_FRAME_HEIGHT = 4 -COLOR_RGB2BGR = 4 -COLOR_BGR2RGB = 4 - -ROTATE_90_COUNTERCLOCKWISE = 2 -ROTATE_90_CLOCKWISE = 0 -ROTATE_180 = 1 - - -@cache -def _generate_image(width: int, height: int): - return np.random.randint(0, 256, size=(height, width, 3), dtype=np.uint8) - - -def cvtColor(color_image, color_conversion): # noqa: N802 - if color_conversion in [COLOR_RGB2BGR, COLOR_BGR2RGB]: - return color_image[:, :, [2, 1, 0]] - else: - raise NotImplementedError(color_conversion) - - -def rotate(color_image, rotation): - if rotation is None: - return color_image - elif rotation == ROTATE_90_CLOCKWISE: - return np.rot90(color_image, k=1) - elif rotation == ROTATE_180: - return np.rot90(color_image, k=2) - elif rotation == ROTATE_90_COUNTERCLOCKWISE: - return np.rot90(color_image, k=3) - else: - raise NotImplementedError(rotation) - - -class VideoCapture: - def __init__(self, *args, **kwargs): - self._mock_dict = { - CAP_PROP_FPS: 30, - CAP_PROP_FRAME_WIDTH: 640, - CAP_PROP_FRAME_HEIGHT: 480, - } - self._is_opened = True - - def isOpened(self): # noqa: N802 - return self._is_opened - - def set(self, propId: int, value: float) -> bool: # noqa: N803 - if not self._is_opened: - raise RuntimeError("Camera is not opened") - self._mock_dict[propId] = value - return True - - def get(self, propId: int) -> float: # noqa: N803 - if not self._is_opened: - raise RuntimeError("Camera is not opened") - value = self._mock_dict[propId] - if value == 0: - if propId == CAP_PROP_FRAME_HEIGHT: - value = 480 - elif propId == CAP_PROP_FRAME_WIDTH: - value = 640 - return value - - def read(self): - if not self._is_opened: - raise RuntimeError("Camera is not opened") - h = self.get(CAP_PROP_FRAME_HEIGHT) - w = self.get(CAP_PROP_FRAME_WIDTH) - ret = True - return ret, _generate_image(width=w, height=h) - - def release(self): - self._is_opened = False - - def __del__(self): - if self._is_opened: - self.release() diff --git a/tests/cameras/test_cv2_camera.py b/tests/cameras/test_cv2_camera.py new file mode 100644 index 00000000..fcac6bca --- /dev/null +++ b/tests/cameras/test_cv2_camera.py @@ -0,0 +1,439 @@ +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import platform +import unittest +from pathlib import Path +from unittest.mock import MagicMock, patch + +import numpy as np +import pytest + +from lerobot.common.cameras.interface_camera_sdk import FakeOpenCVSDKAdapter, IVideoCapture, OpenCVSDKAdapter +from lerobot.common.cameras.opencv import OpenCVCamera, OpenCVCameraConfig +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError + +MODULE_PATH = "lerobot.common.cameras.opencv.camera_opencv" + +# Define constants that might be used by mocks +MOCK_CV2_CAP_PROP_FPS = FakeOpenCVSDKAdapter.CAP_PROP_FPS +MOCK_CV2_CAP_PROP_FRAME_WIDTH = FakeOpenCVSDKAdapter.CAP_PROP_FRAME_WIDTH +MOCK_CV2_CAP_PROP_FRAME_HEIGHT = FakeOpenCVSDKAdapter.CAP_PROP_FRAME_HEIGHT + +# TODO(Steven): Consider a CLI argument to set this +# Emulates the cheap USB camera in index 0 +LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS = ( + os.getenv("LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS", "False").lower() == "true" +) + + +# Helper function to create a realistic-looking dummy image +def create_dummy_image(height, width, channels=3): + return np.random.randint(0, 256, size=(height, width, channels), dtype=np.uint8) + + +# --- Test Class --- + + +class TestOpenCVCamera(unittest.TestCase): + def setUp(self): + # Emulates the cheap USB camera in index 0 + self.default_config = OpenCVCameraConfig( + camera_index=0, + width=MOCK_CV2_CAP_PROP_FRAME_WIDTH, + height=MOCK_CV2_CAP_PROP_FRAME_HEIGHT, + fps=MOCK_CV2_CAP_PROP_FPS, + color_mode="rgb", + rotation=None, + ) + # TODO(Steven): Consider calling this only once, instead of in every test + if LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS: + # Use the real OpenCV SDK + self.test_sdk = OpenCVSDKAdapter() + # self.addCleanup() + else: + # Create a default dummy image based on config + self.dummy_bgr_image = create_dummy_image(self.default_config.height, self.default_config.width) + self.dummy_rgb_image = self.dummy_bgr_image[..., ::-1] # Simple BGR -> RGB simulation + FakeOpenCVSDKAdapter.init_configure_fake(simulated_image=self.dummy_bgr_image) + self.test_sdk = FakeOpenCVSDKAdapter() + # self.addCleanup() + + patcher_platform = patch(f"{MODULE_PATH}.platform.system") + self.mock_platform_system = patcher_platform.start() + self.addCleanup(patcher_platform.stop) + self.mock_platform_system.return_value = "Darwin" # Default to macOS + + patcher_is_valid_unix_path = patch(f"{MODULE_PATH}.is_valid_unix_path") + self.mock_is_valid_unix_path = patcher_is_valid_unix_path.start() + self.addCleanup(patcher_is_valid_unix_path.stop) + self.mock_is_valid_unix_path.return_value = True + + patcher_time_sleep = patch(f"{MODULE_PATH}.time.sleep", return_value=None) + self.mock_sleep = patcher_time_sleep.start() + self.addCleanup(patcher_time_sleep.stop) + + # Mock threading + patcher_thread = patch(f"{MODULE_PATH}.threading.Thread", autospec=True) + self.mock_thread_class = patcher_thread.start() + self.addCleanup(patcher_thread.stop) + self.mock_thread_instance = MagicMock() + self.mock_thread_class.return_value = self.mock_thread_instance + + patcher_event = patch(f"{MODULE_PATH}.threading.Event", autospec=True) + self.mock_event_class = patcher_event.start() + self.addCleanup(patcher_event.stop) + self.mock_event_instance = MagicMock() + self.mock_event_class.return_value = self.mock_event_instance + self.mock_event_instance.is_set.return_value = False # Default to not set + + # --- Tests __init__ --- + def test_init_defaults(self): + cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk) + + self.assertEqual(cam.camera_index, 0) + self.assertEqual(cam.capture_width, MOCK_CV2_CAP_PROP_FRAME_WIDTH) + self.assertEqual(cam.capture_height, MOCK_CV2_CAP_PROP_FRAME_HEIGHT) + self.assertEqual(cam.width, MOCK_CV2_CAP_PROP_FRAME_WIDTH) # No rotation + self.assertEqual(cam.height, MOCK_CV2_CAP_PROP_FRAME_HEIGHT) # No rotation + self.assertEqual(cam.fps, MOCK_CV2_CAP_PROP_FPS) + self.assertEqual(cam.color_mode, "rgb") + self.assertIsNone(cam.rotation) # Rotation 0 + self.assertIsNone(cam.port) + self.assertFalse(cam.is_connected) + + def test_init_with_rotation_90(self): + config = self.default_config + config.rotation = 90 + cam = OpenCVCamera(config, cv2_sdk=self.test_sdk) + self.assertEqual(cam.width, MOCK_CV2_CAP_PROP_FRAME_HEIGHT) # Swapped + self.assertEqual(cam.height, MOCK_CV2_CAP_PROP_FRAME_WIDTH) # Swapped + self.assertEqual(cam.rotation, self.test_sdk.ROTATE_90_CLOCKWISE) + + def test_init_with_rotation_minus_90(self): + config = self.default_config + config.rotation = -90 + cam = OpenCVCamera(config, cv2_sdk=self.test_sdk) + self.assertEqual(cam.width, MOCK_CV2_CAP_PROP_FRAME_HEIGHT) # Swapped + self.assertEqual(cam.height, MOCK_CV2_CAP_PROP_FRAME_WIDTH) # Swapped + self.assertEqual(cam.rotation, self.test_sdk.ROTATE_90_COUNTERCLOCKWISE) + + def test_init_with_rotation_180(self): + config = self.default_config + config.rotation = 180 + cam = OpenCVCamera(config, cv2_sdk=self.test_sdk) + self.assertEqual(cam.width, MOCK_CV2_CAP_PROP_FRAME_WIDTH) # Swapped + self.assertEqual(cam.height, MOCK_CV2_CAP_PROP_FRAME_HEIGHT) # Swapped + self.assertEqual(cam.rotation, self.test_sdk.ROTATE_180) + + @pytest.mark.skipif( + LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS and platform.system() != "Linux", + reason="Not valid for real camera for other platform than Linux", + ) + def test_init_linux_with_index(self): + self.mock_platform_system.return_value = "Linux" + cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk) + self.assertEqual(cam.camera_index, 0) + self.assertIsInstance(cam.port, Path) + self.assertEqual(str(cam.port), "/dev/video0") + + @pytest.mark.skipif( + LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS and platform.system() != "Linux", + reason="Not valid for real camera for other platform than Linux", + ) + def test_init_linux_with_valid_path(self): + self.mock_platform_system.return_value = "Linux" + config = self.default_config + config.camera_index = "/dev/video2" + cam = OpenCVCamera(config, cv2_sdk=self.test_sdk) + + self.assertIsInstance(cam.port, Path) + self.assertEqual(str(cam.port), "/dev/video2") + self.assertEqual(cam.camera_index, 2) + + @pytest.mark.skipif( + LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS and platform.system() != "Linux", + reason="Not valid for real camera for other platform than Linux", + ) + def test_init_linux_with_invalid_path(self): + self.mock_platform_system.return_value = "Linux" + self.mock_is_valid_unix_path.return_value = False + config = self.default_config + config.camera_index = "[*?:[/invalid/path" + with self.assertRaisesRegex(ValueError, "Please check the provided camera_index"): + OpenCVCamera(config, cv2_sdk=self.test_sdk) + + # --- Test connect --- + def test_connect_success(self): + cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk) + cam.connect() + + self.assertTrue(cam.is_connected) + self.assertIsNotNone(cam.camera) + self.assertIsInstance(cam.camera, IVideoCapture) + self.assertEqual(cam.fps, self.default_config.fps) + self.assertEqual(cam.capture_width, self.default_config.width) + self.assertEqual(cam.capture_height, self.default_config.height) + + @pytest.mark.skipif( + LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS and platform.system() != "Linux", + reason="Not valid for real camera for other platform than Linux", + ) + def test_connect_success_linux(self): + self.mock_platform_system.return_value = "Linux" + cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk) + + cam.connect() + + self.assertTrue(cam.is_connected) + self.assertIsNotNone(cam.camera) + self.assertIsInstance(cam.camera, IVideoCapture) + # self.assertIn('/dev/video0', self.test_sdk._cameras_opened) + # self.assertEqual(self.test_sdk.FakeVideoCapture.backend, self.test_sdk.CAP_V4L2) + + def test_connect_already_connected(self): + cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk) + # Simulate already connected state + cam.is_connected = True + with self.assertRaisesRegex(DeviceAlreadyConnectedError, "already connected"): + cam.connect() + cam.is_connected = False # To avoid warning when running the destructor + + def test_connect_camera_not_opened_invalid_index(self): + config = self.default_config + config.camera_index = 99 # Invalid index in fakeSDK != (0,1,2) + cam = OpenCVCamera(config, cv2_sdk=self.test_sdk) + + with self.assertRaisesRegex( + ValueError, "expected to be one of these available cameras \\[0, 1, 2\\], but 99 is provided" + ): + cam.connect() + self.assertFalse(cam.is_connected) + + def test_connect_fps_mismatch(self): + config = self.default_config + config.fps = 999 # Different FPS + cam = OpenCVCamera(config, cv2_sdk=self.test_sdk) + + with self.assertRaisesRegex( + OSError, f"Can't set self.fps=999 .* Actual value is {MOCK_CV2_CAP_PROP_FPS}" + ): + cam.connect() + self.assertFalse(cam.is_connected) + + def test_connect_width_mismatch(self): + config = self.default_config + config.width = 9999 + cam = OpenCVCamera(config, cv2_sdk=self.test_sdk) + + with self.assertRaisesRegex( + OSError, f"Can't set self.capture_width=9999.* Actual value is {MOCK_CV2_CAP_PROP_FRAME_WIDTH}" + ): + cam.connect() + self.assertFalse(cam.is_connected) + + def test_connect_height_mismatch(self): + config = self.default_config + config.height = 9999 + cam = OpenCVCamera(config, cv2_sdk=self.test_sdk) + + with self.assertRaisesRegex( + OSError, f"Can't set self.capture_height=9999.* Actual value is {MOCK_CV2_CAP_PROP_FRAME_HEIGHT}" + ): + cam.connect() + self.assertFalse(cam.is_connected) + + # --- Test read --- + def test_read_not_connected(self): + cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk) + with self.assertRaisesRegex(DeviceNotConnectedError, "not connected"): + cam.read() + + def test_read_success_rgb(self): + cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk) + cam.connect() + + img = cam.read() + if isinstance(self.test_sdk, FakeOpenCVSDKAdapter): + # When using fake SDK, verify exact match with dummy image + np.testing.assert_array_equal(img, self.dummy_rgb_image) + else: + # When using real camera, verify basic properties of the captured image + self.assertIsInstance(img, np.ndarray) + + def test_read_success_bgr(self): + config = self.default_config + config.color_mode = "bgr" + cam = OpenCVCamera(config, cv2_sdk=self.test_sdk) + cam.connect() + + img = cam.read() + if isinstance(self.test_sdk, FakeOpenCVSDKAdapter): + # When using fake SDK, verify exact match with dummy image + np.testing.assert_array_equal(img, self.dummy_bgr_image) + else: + # When using real camera, verify basic properties of the captured image + self.assertIsInstance(img, np.ndarray) + + def test_read_success_with_rotation(self): + config = self.default_config + config.rotation = 90 + cam = OpenCVCamera(config, cv2_sdk=self.test_sdk) # Sets cam.rotation internally + cam.connect() + + img = cam.read() + if isinstance(self.test_sdk, FakeOpenCVSDKAdapter): + # When using fake SDK, verify exact match with dummy image + rotated_image = np.rot90(self.dummy_rgb_image) # Simulate rotation + np.testing.assert_array_equal(img, rotated_image) + else: + # When using real camera, verify basic properties of the captured image + self.assertEqual(img.shape, (self.default_config.width, self.default_config.height, 3)) + + @pytest.mark.skipif(LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS, reason="Not valid for real camera") + def test_read_capture_fails(self): + FakeOpenCVSDKAdapter.configure_fail_read_after(fail_read_after=0) # Simulate read failure + cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk) + cam.connect() + + with self.assertRaisesRegex(OSError, f"Can't capture color image from camera {cam.camera_index}"): + cam.read() + + def test_read_invalid_temporary_color_mode(self): + cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk) + cam.connect() + + with self.assertRaisesRegex( + ValueError, "Expected color values are 'rgb' or 'bgr', but xyz is provided" + ): + cam.read(temporary_color_mode="xyz") + + @pytest.mark.skipif(LEROBOT_USE_REAL_OPENCV_CAMERA_TESTS, reason="Not valid for real camera") + def test_read_dimension_mismatch(self): + wrong_dim_image = create_dummy_image(240, 320) + cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk) + cam.connect() + FakeOpenCVSDKAdapter.configure_fake_simulated_image( + simulated_image=wrong_dim_image + ) # Different dimensions + + with self.assertRaisesRegex( + OSError, + f"Can't capture color image with expected height and width \\({MOCK_CV2_CAP_PROP_FRAME_HEIGHT} x {MOCK_CV2_CAP_PROP_FRAME_WIDTH}\\). \\(240 x 320\\) returned instead.", + ): + cam.read() + + # --- Async Read Tests --- + def test_async_read_not_connected(self): + cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk) + with self.assertRaisesRegex(DeviceNotConnectedError, "not connected"): + cam.async_read() + + # TODO(Steven): This is dirty, but at least we don't have to manually mock the read + def test_async_read_starts_thread_and_returns_image( + self, + ): + cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk) + cam.connect() + + def mock_start(): + cam.read_loop() + + self.mock_thread_instance.start.side_effect = mock_start + + # Make Event.is_set return False initially, then True after disconnect/stop + self.mock_event_instance.is_set.side_effect = [False, True] + + img = cam.async_read() + + # Assert thread was created and started + self.mock_event_class.assert_called_once() + self.mock_thread_class.assert_called_once_with(target=cam.read_loop, args=()) + self.mock_thread_instance.start.assert_called_once() + self.assertTrue(self.mock_thread_instance.daemon) + self.assertIsNotNone(cam.thread) + self.assertIsNotNone(cam.stop_event) + + if isinstance(self.test_sdk, FakeOpenCVSDKAdapter): + # When using fake SDK, verify exact match with dummy image + np.testing.assert_array_equal(img, self.dummy_rgb_image) + else: + # When using real camera, verify basic properties of the captured image + self.assertIsInstance(img, np.ndarray) + + def test_async_read_timeout(self): + config = self.default_config + cam = OpenCVCamera(config, cv2_sdk=self.test_sdk) + cam.connect() + + with self.assertRaisesRegex(TimeoutError, "Timed out waiting for async_read"): + cam.async_read() + + # Assert thread was created and started + self.mock_event_class.assert_called_once() + self.mock_thread_class.assert_called_once_with(target=cam.read_loop, args=()) + self.mock_thread_instance.start.assert_called_once() + self.assertTrue(self.mock_thread_instance.daemon) + self.assertIsNotNone(cam.thread) + self.assertIsNotNone(cam.stop_event) + + # --- Disconnection Tests --- + def test_disconnect_not_connected(self): + cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk) + with self.assertRaisesRegex(DeviceNotConnectedError, "not connected"): + cam.disconnect() + + def test_disconnect_no_thread(self): + cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk) + cam.connect() + + cam.disconnect() + + self.assertIsNone(cam.camera) + self.assertFalse(cam.is_connected) + self.assertIsNone(cam.thread) + self.assertIsNone(cam.stop_event) + + def test_disconnect_with_thread(self): + cam = OpenCVCamera(self.default_config, cv2_sdk=self.test_sdk) + cam.connect() + + def mock_start(): + cam.read_loop() + + self.mock_thread_instance.start.side_effect = mock_start + + # Make Event.is_set return False initially, then True after disconnect/stop + self.mock_event_instance.is_set.side_effect = [False, True] + + _img = cam.async_read() + + self.assertIsNotNone(cam.thread) + self.assertIsNotNone(cam.stop_event) + + cam.disconnect() + + # Check thread management + self.mock_event_instance.set.assert_called_once() + self.mock_thread_instance.join.assert_called_once() + + # Check state reset + self.assertIsNone(cam.camera) + self.assertFalse(cam.is_connected) + self.assertIsNone(cam.thread) + self.assertIsNone(cam.stop_event) + + +if __name__ == "__main__": + unittest.main(argv=["first-arg-is-ignored"], exit=False)