From cb10f97ccc81754848147e4921c37752b166ff1f Mon Sep 17 00:00:00 2001 From: Steven Palma Date: Tue, 15 Apr 2025 17:47:51 +0200 Subject: [PATCH] test(cameras): add opencv camera patch tests suite --- lerobot/common/cameras/camera.py | 2 +- .../common/cameras/opencv/camera_opencv.py | 46 +- .../cameras/opencv/configuration_opencv.py | 1 - tests/cameras/mock_cv2.py | 101 --- tests/cameras/test_cv2_camera.py | 614 ++++++++++++++++++ 5 files changed, 626 insertions(+), 138 deletions(-) delete mode 100644 tests/cameras/mock_cv2.py create mode 100644 tests/cameras/test_cv2_camera.py diff --git a/lerobot/common/cameras/camera.py b/lerobot/common/cameras/camera.py index 5d9ac509..2fbdb0dc 100644 --- a/lerobot/common/cameras/camera.py +++ b/lerobot/common/cameras/camera.py @@ -9,7 +9,7 @@ class Camera(abc.ABC): pass @abc.abstractmethod - def read(self, temporary_color: str | None = None) -> np.ndarray: + def read(self, temporary_color_mode: str | None = None) -> np.ndarray: pass @abc.abstractmethod diff --git a/lerobot/common/cameras/opencv/camera_opencv.py b/lerobot/common/cameras/opencv/camera_opencv.py index f4bfb507..d35a459e 100644 --- a/lerobot/common/cameras/opencv/camera_opencv.py +++ b/lerobot/common/cameras/opencv/camera_opencv.py @@ -24,8 +24,8 @@ import shutil import threading import time from pathlib import Path -from threading import Thread +import cv2 import numpy as np from PIL import Image @@ -46,12 +46,12 @@ from .configuration_opencv import OpenCVCameraConfig MAX_OPENCV_INDEX = 60 -def find_cameras(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX, mock=False) -> list[dict]: +def find_cameras(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX) -> list[dict]: cameras = [] if platform.system() == "Linux": print("Linux detected. Finding available camera indices through scanning '/dev/video*' ports") possible_ports = [str(port) for port in Path("/dev").glob("video*")] - ports = _find_cameras(possible_ports, mock=mock) + ports = _find_cameras(possible_ports) for port in ports: cameras.append( { @@ -65,7 +65,7 @@ def find_cameras(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX f"scanning all indices from 0 to {MAX_OPENCV_INDEX}" ) possible_indices = range(max_index_search_range) - indices = _find_cameras(possible_indices, mock=mock) + indices = _find_cameras(possible_indices) for index in indices: cameras.append( { @@ -77,14 +77,7 @@ def find_cameras(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX return cameras -def _find_cameras( - possible_camera_ids: list[int | str], raise_when_empty=False, mock=False -) -> list[int | str]: - if mock: - import tests.cameras.mock_cv2 as cv2 - else: - import cv2 - +def _find_cameras(possible_camera_ids: list[int | str], raise_when_empty=False) -> list[int | str]: camera_ids = [] for camera_idx in possible_camera_ids: camera = cv2.VideoCapture(camera_idx) @@ -128,20 +121,19 @@ def save_images_from_cameras( width=None, height=None, record_time_s=2, - mock=False, ): """ Initializes all the cameras and saves images to the directory. Useful to visually identify the camera associated to a given camera index. """ if camera_ids is None or len(camera_ids) == 0: - camera_infos = find_cameras(mock=mock) + camera_infos = find_cameras() camera_ids = [cam["index"] for cam in camera_infos] print("Connecting cameras") cameras = [] for cam_idx in camera_ids: - config = OpenCVCameraConfig(camera_index=cam_idx, fps=fps, width=width, height=height, mock=mock) + config = OpenCVCameraConfig(camera_index=cam_idx, fps=fps, width=width, height=height) camera = OpenCVCamera(config) camera.connect() print( @@ -260,7 +252,6 @@ class OpenCVCamera(Camera): self.fps = config.fps self.channels = config.channels self.color_mode = config.color_mode - self.mock = config.mock self.camera = None self.is_connected = False @@ -269,11 +260,6 @@ class OpenCVCamera(Camera): self.color_image = None self.logs = {} - if self.mock: - import tests.cameras.mock_cv2 as cv2 - else: - import cv2 - self.rotation = None if config.rotation == -90: self.rotation = cv2.ROTATE_90_COUNTERCLOCKWISE @@ -286,14 +272,9 @@ class OpenCVCamera(Camera): if self.is_connected: raise DeviceAlreadyConnectedError(f"OpenCVCamera({self.camera_index}) is already connected.") - if self.mock: - import tests.cameras.mock_cv2 as cv2 - else: - import cv2 - - # Use 1 thread to avoid blocking the main thread. Especially useful during data collection - # when other threads are used to save the images. - cv2.setNumThreads(1) + # Use 1 thread to avoid blocking the main thread. Especially useful during data collection + # when other threads are used to save the images. + cv2.setNumThreads(1) backend = ( cv2.CAP_V4L2 @@ -398,11 +379,6 @@ class OpenCVCamera(Camera): # However, Deep Learning framework such as LeRobot uses RGB format as default to train neural networks, # so we convert the image color from BGR to RGB. if requested_color_mode == "rgb": - if self.mock: - import tests.cameras.mock_cv2 as cv2 - else: - import cv2 - color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB) h, w, _ = color_image.shape @@ -439,7 +415,7 @@ class OpenCVCamera(Camera): if self.thread is None: self.stop_event = threading.Event() - self.thread = Thread(target=self.read_loop, args=()) + self.thread = threading.Thread(target=self.read_loop, args=()) self.thread.daemon = True self.thread.start() diff --git a/lerobot/common/cameras/opencv/configuration_opencv.py b/lerobot/common/cameras/opencv/configuration_opencv.py index 983199bf..b8a20fc9 100644 --- a/lerobot/common/cameras/opencv/configuration_opencv.py +++ b/lerobot/common/cameras/opencv/configuration_opencv.py @@ -24,7 +24,6 @@ class OpenCVCameraConfig(CameraConfig): color_mode: str = "rgb" channels: int | None = None rotation: int | None = None - mock: bool = False def __post_init__(self): if self.color_mode not in ["rgb", "bgr"]: diff --git a/tests/cameras/mock_cv2.py b/tests/cameras/mock_cv2.py deleted file mode 100644 index eeaf859c..00000000 --- a/tests/cameras/mock_cv2.py +++ /dev/null @@ -1,101 +0,0 @@ -# Copyright 2024 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from functools import cache - -import numpy as np - -CAP_V4L2 = 200 -CAP_DSHOW = 700 -CAP_AVFOUNDATION = 1200 -CAP_ANY = -1 - -CAP_PROP_FPS = 5 -CAP_PROP_FRAME_WIDTH = 3 -CAP_PROP_FRAME_HEIGHT = 4 -COLOR_RGB2BGR = 4 -COLOR_BGR2RGB = 4 - -ROTATE_90_COUNTERCLOCKWISE = 2 -ROTATE_90_CLOCKWISE = 0 -ROTATE_180 = 1 - - -@cache -def _generate_image(width: int, height: int): - return np.random.randint(0, 256, size=(height, width, 3), dtype=np.uint8) - - -def cvtColor(color_image, color_conversion): # noqa: N802 - if color_conversion in [COLOR_RGB2BGR, COLOR_BGR2RGB]: - return color_image[:, :, [2, 1, 0]] - else: - raise NotImplementedError(color_conversion) - - -def rotate(color_image, rotation): - if rotation is None: - return color_image - elif rotation == ROTATE_90_CLOCKWISE: - return np.rot90(color_image, k=1) - elif rotation == ROTATE_180: - return np.rot90(color_image, k=2) - elif rotation == ROTATE_90_COUNTERCLOCKWISE: - return np.rot90(color_image, k=3) - else: - raise NotImplementedError(rotation) - - -class VideoCapture: - def __init__(self, *args, **kwargs): - self._mock_dict = { - CAP_PROP_FPS: 30, - CAP_PROP_FRAME_WIDTH: 640, - CAP_PROP_FRAME_HEIGHT: 480, - } - self._is_opened = True - - def isOpened(self): # noqa: N802 - return self._is_opened - - def set(self, propId: int, value: float) -> bool: # noqa: N803 - if not self._is_opened: - raise RuntimeError("Camera is not opened") - self._mock_dict[propId] = value - return True - - def get(self, propId: int) -> float: # noqa: N803 - if not self._is_opened: - raise RuntimeError("Camera is not opened") - value = self._mock_dict[propId] - if value == 0: - if propId == CAP_PROP_FRAME_HEIGHT: - value = 480 - elif propId == CAP_PROP_FRAME_WIDTH: - value = 640 - return value - - def read(self): - if not self._is_opened: - raise RuntimeError("Camera is not opened") - h = self.get(CAP_PROP_FRAME_HEIGHT) - w = self.get(CAP_PROP_FRAME_WIDTH) - ret = True - return ret, _generate_image(width=w, height=h) - - def release(self): - self._is_opened = False - - def __del__(self): - if self._is_opened: - self.release() diff --git a/tests/cameras/test_cv2_camera.py b/tests/cameras/test_cv2_camera.py new file mode 100644 index 00000000..a21c4b11 --- /dev/null +++ b/tests/cameras/test_cv2_camera.py @@ -0,0 +1,614 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# Note: The current test approach uses mock/patch per management decision. +# Future maintainers may consider Dependency Injection for improved testability. +# As it currently stands, it is brittle, requires complex mocks and discourages refactoring. +# Everytime that we change the implementation code, we might need to change the tests. +import unittest +from pathlib import Path +from unittest.mock import MagicMock, call, patch + +import numpy as np + +from lerobot.common.cameras.opencv import OpenCVCamera, OpenCVCameraConfig + +# We might need to mock these +from lerobot.common.errors import ( + DeviceAlreadyConnectedError, + DeviceNotConnectedError, +) + +MODULE_PATH = "lerobot.common.cameras.opencv.camera_opencv" + +# Define constants that might be used by mocks +MOCK_CV2_CAP_PROP_FPS = 30 +MOCK_CV2_CAP_PROP_FRAME_WIDTH = 1280 +MOCK_CV2_CAP_PROP_FRAME_HEIGHT = 720 +MOCK_CV2_ROTATE_90_CLOCKWISE = 90 +MOCK_CV2_ROTATE_180 = 180 +MOCK_CV2_ROTATE_90_COUNTERCLOCKWISE = -90 +MOCK_CV2_COLOR_BGR2RGB = 99 +MOCK_CV2_CAP_V4L2 = 91 +MOCK_CV2_CAP_DSHOW = 92 +MOCK_CV2_CAP_AVFOUNDATION = 93 +MOCK_CV2_CAP_ANY = 90 + + +# Helper function to create a realistic-looking dummy image +def create_dummy_image(height, width, channels=3): + return np.random.randint(0, 256, size=(height, width, channels), dtype=np.uint8) + + +# --- Test Class --- + + +class TestOpenCVCamera(unittest.TestCase): + def setUp(self): + # Default config used in many tests + self.default_config = OpenCVCameraConfig( + camera_index=0, + width=MOCK_CV2_CAP_PROP_FRAME_WIDTH, + height=MOCK_CV2_CAP_PROP_FRAME_HEIGHT, + fps=MOCK_CV2_CAP_PROP_FPS, + color_mode="rgb", + rotation=None, + ) + # Create a default dummy image based on config + self.dummy_bgr_image = create_dummy_image(self.default_config.height, self.default_config.width) + self.dummy_rgb_image = self.dummy_bgr_image[..., ::-1] # Simple BGR -> RGB simulation + + self.patch_dependencies() + + # --- Mock Setup --- + def patch_dependencies(self): + # Mock the cv2 module itself + patcher_cv2 = patch(f"{MODULE_PATH}.cv2", autospec=True) + self.mock_cv2 = patcher_cv2.start() + self.addCleanup(patcher_cv2.stop) + + # Mock the VideoCapture class returned by cv2.VideoCapture + self.mock_capture_instance = MagicMock() + self.mock_cv2.VideoCapture.return_value = self.mock_capture_instance + + # Assign mock constants (important for comparisons and calls) + self.mock_cv2.CAP_PROP_FPS = MOCK_CV2_CAP_PROP_FPS + self.mock_cv2.CAP_PROP_FRAME_WIDTH = MOCK_CV2_CAP_PROP_FRAME_WIDTH + self.mock_cv2.CAP_PROP_FRAME_HEIGHT = MOCK_CV2_CAP_PROP_FRAME_HEIGHT + self.mock_cv2.ROTATE_90_CLOCKWISE = MOCK_CV2_ROTATE_90_CLOCKWISE + self.mock_cv2.ROTATE_180 = MOCK_CV2_ROTATE_180 + self.mock_cv2.ROTATE_90_COUNTERCLOCKWISE = MOCK_CV2_ROTATE_90_COUNTERCLOCKWISE + self.mock_cv2.COLOR_BGR2RGB = MOCK_CV2_COLOR_BGR2RGB + self.mock_cv2.CAP_V4L2 = MOCK_CV2_CAP_V4L2 + self.mock_cv2.CAP_DSHOW = MOCK_CV2_CAP_DSHOW + self.mock_cv2.CAP_AVFOUNDATION = MOCK_CV2_CAP_AVFOUNDATION + self.mock_cv2.CAP_ANY = MOCK_CV2_CAP_ANY + + patcher_platform = patch(f"{MODULE_PATH}.platform.system") + self.mock_platform_system = patcher_platform.start() + self.addCleanup(patcher_platform.stop) + self.mock_platform_system.return_value = "Darwin" # Default to macOS + + mock_is_valid_unix_path = patch(f"{MODULE_PATH}.is_valid_unix_path") + self.mock_is_valid_unix_path = mock_is_valid_unix_path.start() + self.addCleanup(mock_is_valid_unix_path.stop) + self.mock_is_valid_unix_path.return_value = True + + patcher_time_sleep = patch(f"{MODULE_PATH}.time.sleep", return_value=None) + self.mock_sleep = patcher_time_sleep.start() + self.addCleanup(patcher_time_sleep.stop) + + # Mock threading + patcher_thread = patch(f"{MODULE_PATH}.threading.Thread", autospec=True) + self.mock_thread_class = patcher_thread.start() + self.addCleanup(patcher_thread.stop) + self.mock_thread_instance = MagicMock() + self.mock_thread_class.return_value = self.mock_thread_instance + + patcher_event = patch(f"{MODULE_PATH}.threading.Event", autospec=True) + self.mock_event_class = patcher_event.start() + self.addCleanup(patcher_event.stop) + self.mock_event_instance = MagicMock() + self.mock_event_class.return_value = self.mock_event_instance + self.mock_event_instance.is_set.return_value = False # Default to not set + + # Mock internal utility functions if they interact with system/cv2 + patcher_find_cameras = patch(f"{MODULE_PATH}.find_cameras") + self.mock_find_cameras = patcher_find_cameras.start() + self.addCleanup(patcher_find_cameras.stop) + + patcher_get_index = patch(f"{MODULE_PATH}.get_camera_index_from_unix_port") + self.mock_get_camera_index_from_unix_port = patcher_get_index.start() + self.addCleanup(patcher_get_index.stop) + + # --- Test __init__ --- + def test_init_defaults(self): + cam = OpenCVCamera(self.default_config) + + self.assertEqual(cam.camera_index, 0) + self.assertEqual(cam.capture_width, MOCK_CV2_CAP_PROP_FRAME_WIDTH) + self.assertEqual(cam.capture_height, MOCK_CV2_CAP_PROP_FRAME_HEIGHT) + self.assertEqual(cam.width, MOCK_CV2_CAP_PROP_FRAME_WIDTH) # No rotation + self.assertEqual(cam.height, MOCK_CV2_CAP_PROP_FRAME_HEIGHT) # No rotation + self.assertEqual(cam.fps, MOCK_CV2_CAP_PROP_FPS) + self.assertEqual(cam.color_mode, "rgb") + self.assertIsNone(cam.rotation) # Rotation 0 + self.assertIsNone(cam.port) + self.assertFalse(cam.is_connected) + + def test_init_with_rotation_90(self): + config = self.default_config + config.rotation = 90 + cam = OpenCVCamera(config) + self.assertEqual(cam.width, MOCK_CV2_CAP_PROP_FRAME_HEIGHT) # Swapped + self.assertEqual(cam.height, MOCK_CV2_CAP_PROP_FRAME_WIDTH) # Swapped + self.assertEqual(cam.rotation, MOCK_CV2_ROTATE_90_CLOCKWISE) + + def test_init_with_rotation_minus_90(self): + config = self.default_config + config.rotation = -90 + cam = OpenCVCamera(config) + self.assertEqual(cam.width, MOCK_CV2_CAP_PROP_FRAME_HEIGHT) # Swapped + self.assertEqual(cam.height, MOCK_CV2_CAP_PROP_FRAME_WIDTH) # Swapped + self.assertEqual(cam.rotation, MOCK_CV2_ROTATE_90_COUNTERCLOCKWISE) + + def test_init_with_rotation_180(self): + config = self.default_config + config.rotation = 180 + cam = OpenCVCamera(config) + self.assertEqual(cam.width, MOCK_CV2_CAP_PROP_FRAME_WIDTH) # Not swapped + self.assertEqual(cam.height, MOCK_CV2_CAP_PROP_FRAME_HEIGHT) # Not swapped + self.assertEqual(cam.rotation, MOCK_CV2_ROTATE_180) + + def test_init_linux_with_index(self): + self.mock_platform_system.return_value = "Linux" + cam = OpenCVCamera(self.default_config) + self.assertEqual(cam.camera_index, 0) + self.assertIsInstance(cam.port, Path) + self.assertEqual(str(cam.port), "/dev/video0") + + def test_init_linux_with_valid_path(self): + self.mock_platform_system.return_value = "Linux" + self.mock_is_valid_unix_path.return_value = True + self.mock_get_camera_index_from_unix_port.return_value = 2 + config = self.default_config + config.camera_index = "/dev/video2" + cam = OpenCVCamera(config) + + self.assertIsInstance(cam.port, Path) + self.assertEqual(str(cam.port), "/dev/video2") + self.assertEqual(cam.camera_index, 2) + self.mock_is_valid_unix_path.assert_called_once_with("/dev/video2") + + def test_init_linux_with_invalid_path(self): + self.mock_platform_system.return_value = "Linux" + self.mock_is_valid_unix_path.return_value = False + config = self.default_config + config.camera_index = "[*?:[/invalid/path" + with self.assertRaisesRegex(ValueError, "Please check the provided camera_index"): + OpenCVCamera(config) + + # --- Test connect --- + def test_connect_success(self): + cam = OpenCVCamera(self.default_config) + + # Mock the temporary camera check + mock_tmp_capture = MagicMock() + mock_tmp_capture.isOpened.return_value = True + # First call to VideoCapture is the temporary one + # Second call is the real one + self.mock_cv2.VideoCapture.side_effect = [mock_tmp_capture, self.mock_capture_instance] + + # Mock the actual camera setup + self.mock_capture_instance.get.side_effect = [ + self.default_config.fps, + self.default_config.width, + self.default_config.height, + ] + + cam.connect() + + # Check temporary camera interactions + self.assertEqual(self.mock_cv2.VideoCapture.call_count, 2) + self.mock_cv2.VideoCapture.assert_any_call( + self.default_config.camera_index, MOCK_CV2_CAP_AVFOUNDATION + ) + mock_tmp_capture.isOpened.assert_called_once() + mock_tmp_capture.release.assert_called_once() + + # Check actual camera interactions + self.mock_capture_instance.set.assert_has_calls( + [ + call(MOCK_CV2_CAP_PROP_FPS, self.default_config.fps), + call(MOCK_CV2_CAP_PROP_FRAME_WIDTH, self.default_config.width), + call(MOCK_CV2_CAP_PROP_FRAME_HEIGHT, self.default_config.height), + ] + ) + self.mock_capture_instance.get.assert_has_calls( + [ + call(MOCK_CV2_CAP_PROP_FPS), + call(MOCK_CV2_CAP_PROP_FRAME_WIDTH), + call(MOCK_CV2_CAP_PROP_FRAME_HEIGHT), + ] + ) + self.assertTrue(cam.is_connected) + self.assertIsNotNone(cam.camera) + self.mock_cv2.setNumThreads.assert_called_once_with(1) + # Check stored actual values (after rounding) + self.assertEqual(cam.fps, self.default_config.fps) + self.assertEqual(cam.capture_width, self.default_config.width) + self.assertEqual(cam.capture_height, self.default_config.height) + + def test_connect_success_linux(self): + self.mock_platform_system.return_value = "Linux" + cam = OpenCVCamera(self.default_config) + + mock_tmp_capture = MagicMock() + mock_tmp_capture.isOpened.return_value = True + self.mock_cv2.VideoCapture.side_effect = [mock_tmp_capture, self.mock_capture_instance] + self.mock_capture_instance.get.side_effect = [ + MOCK_CV2_CAP_PROP_FPS, + MOCK_CV2_CAP_PROP_FRAME_WIDTH, + MOCK_CV2_CAP_PROP_FRAME_HEIGHT, + ] # Floats are ok + + cam.connect() + + self.assertTrue(cam.is_connected) + self.assertIsNotNone(cam.camera) + self.assertEqual(self.mock_cv2.VideoCapture.call_count, 2) + # Check it uses the port path and correct backend + self.mock_cv2.VideoCapture.assert_any_call("/dev/video0", MOCK_CV2_CAP_V4L2) + + def test_connect_already_connected(self): + cam = OpenCVCamera(self.default_config) + # Simulate already connected state + cam.is_connected = True + with self.assertRaisesRegex(DeviceAlreadyConnectedError, "already connected"): + cam.connect() + cam.is_connected = False # To avoid warning when running the destructor + + def test_connect_camera_not_opened_invalid_index(self): + config = self.default_config + config.camera_index = 99 + cam = OpenCVCamera(config) + + # Mock temporary check failing + mock_tmp_capture = MagicMock() + mock_tmp_capture.isOpened.return_value = False + self.mock_cv2.VideoCapture.return_value = mock_tmp_capture # Only tmp is created + + # Mock find_cameras returning *other* indices + self.mock_find_cameras.return_value = [ + {"index": 0, "port": None}, + {"index": 1, "port": None}, + {"index": 2, "port": None}, + ] + + with self.assertRaisesRegex( + ValueError, "expected to be one of these available cameras \\[0, 1, 2\\], but 99 is provided" + ): + cam.connect() + + self.assertFalse(cam.is_connected) + self.mock_find_cameras.assert_called_once() + mock_tmp_capture.release.assert_called_once() # Should still release tmp capture + + def test_connect_fps_mismatch(self): + config = self.default_config + config.fps = 999 + cam = OpenCVCamera(config) + + mock_tmp_capture = MagicMock() + mock_tmp_capture.isOpened.return_value = True + self.mock_cv2.VideoCapture.side_effect = [mock_tmp_capture, self.mock_capture_instance] + + # Mock get to return a different FPS + self.mock_capture_instance.get.side_effect = [ + MOCK_CV2_CAP_PROP_FPS, # Different FPS + self.default_config.width, + self.default_config.height, + ] + + with self.assertRaisesRegex( + OSError, f"Can't set self.fps=999 .* Actual value is {MOCK_CV2_CAP_PROP_FPS}" + ): + cam.connect() + self.assertFalse(cam.is_connected) + + def test_connect_width_mismatch(self): + config = self.default_config + config.width = 9999 + cam = OpenCVCamera(config) + + mock_tmp_capture = MagicMock() + mock_tmp_capture.isOpened.return_value = True + self.mock_cv2.VideoCapture.side_effect = [mock_tmp_capture, self.mock_capture_instance] + + self.mock_capture_instance.get.side_effect = [ + self.default_config.fps, + MOCK_CV2_CAP_PROP_FRAME_WIDTH, # Different width + self.default_config.height, + ] + + with self.assertRaisesRegex( + OSError, f"Can't set self.capture_width=9999 .* Actual value is {MOCK_CV2_CAP_PROP_FRAME_WIDTH}" + ): + cam.connect() + self.assertFalse(cam.is_connected) + + def test_connect_height_mismatch(self): + config = self.default_config + config.height = 9999 + cam = OpenCVCamera(config) + + mock_tmp_capture = MagicMock() + mock_tmp_capture.isOpened.return_value = True + self.mock_cv2.VideoCapture.side_effect = [mock_tmp_capture, self.mock_capture_instance] + + self.mock_capture_instance.get.side_effect = [ + self.default_config.fps, + self.default_config.width, + MOCK_CV2_CAP_PROP_FRAME_HEIGHT, # Different height + ] + + with self.assertRaisesRegex( + OSError, f"Can't set self.capture_height=9999 .* Actual value is {MOCK_CV2_CAP_PROP_FRAME_HEIGHT}" + ): + cam.connect() + self.assertFalse(cam.is_connected) + + # --- Test read --- + def test_read_not_connected(self): + cam = OpenCVCamera(self.default_config) + with self.assertRaisesRegex(DeviceNotConnectedError, "not connected"): + cam.read() + + def test_read_success_rgb(self): + cam = OpenCVCamera(self.default_config) + cam.camera = self.mock_capture_instance # Simulate connection + cam.is_connected = True + cam.capture_width = MOCK_CV2_CAP_PROP_FRAME_WIDTH # Set dimensions as if connect succeeded + cam.capture_height = MOCK_CV2_CAP_PROP_FRAME_HEIGHT + + self.mock_capture_instance.read.return_value = (True, self.dummy_bgr_image) + self.mock_cv2.cvtColor.return_value = self.dummy_rgb_image # Mock conversion + + img = cam.read() + + self.mock_capture_instance.read.assert_called_once() + self.mock_cv2.cvtColor.assert_called_once_with(self.dummy_bgr_image, MOCK_CV2_COLOR_BGR2RGB) + self.mock_cv2.rotate.assert_not_called() + + np.testing.assert_array_equal(img, self.dummy_rgb_image) + + def test_read_success_bgr(self): + config = self.default_config + config.color_mode = "bgr" # Request BGR + cam = OpenCVCamera(config) + cam.camera = self.mock_capture_instance + cam.is_connected = True + cam.capture_width = MOCK_CV2_CAP_PROP_FRAME_WIDTH + cam.capture_height = MOCK_CV2_CAP_PROP_FRAME_HEIGHT + + self.mock_capture_instance.read.return_value = (True, self.dummy_bgr_image) + + img = cam.read() + + self.mock_capture_instance.read.assert_called_once() + self.mock_cv2.cvtColor.assert_not_called() # Should not convert + np.testing.assert_array_equal(img, self.dummy_bgr_image) # Expect BGR + + def test_read_success_with_rotation(self): + config = self.default_config + config.rotation = 90 + cam = OpenCVCamera(config) + cam.camera = self.mock_capture_instance + cam.is_connected = True + cam.capture_width = MOCK_CV2_CAP_PROP_FRAME_WIDTH # Original capture dimensions + cam.capture_height = MOCK_CV2_CAP_PROP_FRAME_HEIGHT + + self.mock_capture_instance.read.return_value = (True, self.dummy_bgr_image) + self.mock_cv2.cvtColor.return_value = self.dummy_rgb_image + rotated_image = np.rot90(self.dummy_rgb_image) # Simulate rotation + self.mock_cv2.rotate.return_value = rotated_image # Mock rotation result + + img = cam.read() + + self.mock_capture_instance.read.assert_called_once() + self.mock_cv2.cvtColor.assert_called_once_with(self.dummy_bgr_image, MOCK_CV2_COLOR_BGR2RGB) + self.mock_cv2.rotate.assert_called_once_with(self.dummy_rgb_image, MOCK_CV2_ROTATE_90_CLOCKWISE) + np.testing.assert_array_equal(img, rotated_image) + + def test_read_capture_fails(self): + cam = OpenCVCamera(self.default_config) + cam.camera = self.mock_capture_instance + cam.is_connected = True + + self.mock_capture_instance.read.return_value = (False, None) # Simulate read failure + + with self.assertRaisesRegex(OSError, f"Can't capture color image from camera {cam.camera_index}"): + cam.read() + + def test_read_invalid_temporary_color_mode(self): + cam = OpenCVCamera(self.default_config) + cam.camera = self.mock_capture_instance + cam.is_connected = True + cam.capture_width = MOCK_CV2_CAP_PROP_FRAME_WIDTH + cam.capture_height = MOCK_CV2_CAP_PROP_FRAME_HEIGHT + self.mock_capture_instance.read.return_value = ( + True, + self.dummy_bgr_image, + ) # Need read to succeed initially + + with self.assertRaisesRegex( + ValueError, "Expected color values are 'rgb' or 'bgr', but xyz is provided" + ): + cam.read(temporary_color_mode="xyz") + + def test_read_dimension_mismatch(self): + wrong_dim_image = create_dummy_image(240, 320) # Different dimensions + cam = OpenCVCamera(self.default_config) + + cam.camera = self.mock_capture_instance + cam.is_connected = True + cam.capture_width = MOCK_CV2_CAP_PROP_FRAME_WIDTH # Expected dims + cam.capture_height = MOCK_CV2_CAP_PROP_FRAME_HEIGHT + + self.mock_capture_instance.read.return_value = (True, wrong_dim_image) + self.mock_cv2.cvtColor.return_value = wrong_dim_image + + with self.assertRaisesRegex( + OSError, + f"Can't capture color image with expected height and width \\({MOCK_CV2_CAP_PROP_FRAME_HEIGHT} x {MOCK_CV2_CAP_PROP_FRAME_WIDTH}\\). \\(240 x 320\\) returned instead.", + ): + cam.read() + + # --- Test async_read --- + def test_async_read_not_connected(self): + cam = OpenCVCamera(self.default_config) + with self.assertRaisesRegex(DeviceNotConnectedError, "not connected"): + cam.async_read() + + # TODO(Steven): Not sure about this one + @patch.object(OpenCVCamera, "read", autospec=True) # Mock the instance's read method + def test_async_read_starts_thread_and_returns_image(self, mock_instance_read): + cam = OpenCVCamera(self.default_config) + cam.is_connected = True # Simulate connection + + # TODO(Steven): This is dirty + def mock_read_effect(self_cam_instance): + self_cam_instance.color_image = self.dummy_rgb_image + return self.dummy_rgb_image + + mock_instance_read.side_effect = mock_read_effect + + def mock_start(): + cam.read_loop() + + self.mock_thread_instance.start.side_effect = mock_start + + # Make Event.is_set return False initially, then True after disconnect/stop + self.mock_event_instance.is_set.side_effect = [False, True] + + img = cam.async_read() + + # Assert thread was created and started + self.mock_event_class.assert_called_once() + self.mock_thread_class.assert_called_once_with(target=cam.read_loop, args=()) + self.mock_thread_instance.start.assert_called_once() + self.assertTrue(self.mock_thread_instance.daemon) + self.assertIsNotNone(cam.thread) + self.assertIsNotNone(cam.stop_event) + mock_instance_read.assert_called_once_with(cam) + + cam.is_connected = False # To avoid warning when running the destructor + + # Assert the image returned by async_read is the one set by the mocked read + np.testing.assert_array_equal(img, self.dummy_rgb_image) + + @patch.object(OpenCVCamera, "read", autospec=True) + def test_async_read_timeout(self, mock_instance_read): + config = self.default_config + cam = OpenCVCamera(config) + cam.is_connected = True + + # Mock 'read' so it *never* sets color_image + def mock_read_effect(self_cam_instance): + self_cam_instance.color_image = None + return None + + mock_instance_read.side_effect = mock_read_effect + + with self.assertRaisesRegex(TimeoutError, "Timed out waiting for async_read"): + cam.async_read() + + # Assert thread was created and started + self.mock_event_class.assert_called_once() + self.mock_thread_class.assert_called_once_with(target=cam.read_loop, args=()) + self.mock_thread_instance.start.assert_called_once() + self.assertTrue(self.mock_thread_instance.daemon) + self.assertIsNotNone(cam.thread) + self.assertIsNotNone(cam.stop_event) + + cam.is_connected = False # To avoid warning when running the destructor + + # --- Test disconnect --- + def test_disconnect_not_connected(self): + cam = OpenCVCamera(self.default_config) + with self.assertRaisesRegex(DeviceNotConnectedError, "not connected"): + cam.disconnect() + + def test_disconnect_no_thread(self): + cam = OpenCVCamera(self.default_config) + cam.camera = self.mock_capture_instance + cam.is_connected = True + + cam.disconnect() + + self.assertIsNone(cam.camera) + self.assertFalse(cam.is_connected) + self.assertIsNone(cam.thread) + self.assertIsNone(cam.stop_event) + + self.mock_capture_instance.release.assert_called_once() + self.mock_event_instance.set.assert_not_called() + self.mock_thread_instance.join.assert_not_called() + + @patch.object(OpenCVCamera, "read", autospec=True) + def test_disconnect_with_thread(self, mock_instance_read): + cam = OpenCVCamera(self.default_config) + cam.camera = self.mock_capture_instance + cam.is_connected = True + + # TODO(Steven): This is dirty + def mock_read_effect(self_cam_instance): + self_cam_instance.color_image = self.dummy_rgb_image + return self.dummy_rgb_image + + mock_instance_read.side_effect = mock_read_effect + + def mock_start(): + cam.read_loop() + + self.mock_thread_instance.start.side_effect = mock_start + + # Make Event.is_set return False initially, then True after disconnect/stop + self.mock_event_instance.is_set.side_effect = [False, True] + + # Start the thread + _img = cam.async_read() + + self.assertIsNotNone(cam.thread) + self.assertIsNotNone(cam.stop_event) + + cam.disconnect() + + # Check thread management + self.mock_event_instance.set.assert_called_once() + self.mock_thread_instance.join.assert_called_once() + + # Check camera release + self.mock_capture_instance.release.assert_called_once() + + # Check state reset + self.assertIsNone(cam.camera) + self.assertFalse(cam.is_connected) + self.assertIsNone(cam.thread) + self.assertIsNone(cam.stop_event) + + +if __name__ == "__main__": + unittest.main(argv=["first-arg-is-ignored"], exit=False)