[pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
This commit is contained in:
pre-commit-ci[bot] 2025-03-16 06:54:44 +00:00
parent 130c16b43d
commit d978053ebf
3 changed files with 13 additions and 24 deletions

View File

@ -113,6 +113,7 @@ class IntelRealSenseCameraConfig(CameraConfig):
if self.rotation not in [-90, None, 90, 180]: if self.rotation not in [-90, None, 90, 180]:
raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})") raise ValueError(f"`rotation` must be in [-90, None, 90, 180] (got {self.rotation})")
@CameraConfig.register_subclass("ros2") @CameraConfig.register_subclass("ros2")
@dataclass @dataclass
class ROS2CameraConfig(CameraConfig): class ROS2CameraConfig(CameraConfig):
@ -123,6 +124,7 @@ class ROS2CameraConfig(CameraConfig):
ROS2CameraConfig(topic="/image_raw", fps=30, width=640, height=480) ROS2CameraConfig(topic="/image_raw", fps=30, width=640, height=480)
``` ```
""" """
# TODO(Yadunund): Consider converting inputs to lists to subscribe to multiple # TODO(Yadunund): Consider converting inputs to lists to subscribe to multiple
# topics with the same node. # topics with the same node.
topic: str topic: str

View File

@ -8,26 +8,23 @@ import shutil
import threading import threading
import time import time
from pathlib import Path from pathlib import Path
import time
from threading import Thread from threading import Thread
# TODO(Yadunund): Implement mock.
import cv2
import numpy as np import numpy as np
import rclpy
from cv_bridge import CvBridge
from PIL import Image from PIL import Image
from sensor_msgs.msg import Image as ImageMsg
from lerobot.common.robot_devices.cameras.configs import ROS2CameraConfig from lerobot.common.robot_devices.cameras.configs import ROS2CameraConfig
from lerobot.common.robot_devices.utils import ( from lerobot.common.robot_devices.utils import (
RobotDeviceAlreadyConnectedError, RobotDeviceAlreadyConnectedError,
RobotDeviceNotConnectedError, RobotDeviceNotConnectedError,
busy_wait,
) )
from lerobot.common.utils.utils import capture_timestamp_utc from lerobot.common.utils.utils import capture_timestamp_utc
# TODO(Yadunund): Implement mock.
import cv2
from cv_bridge import CvBridge
from sensor_msgs.msg import Image as ImageMsg
import rclpy
def save_image(img_array, camera_name, frame_index, images_dir): def save_image(img_array, camera_name, frame_index, images_dir):
img = Image.fromarray(img_array) img = Image.fromarray(img_array)
@ -48,9 +45,7 @@ def save_images_from_cameras(
camera = ROS2Camera(config) camera = ROS2Camera(config)
camera.connect() camera.connect()
print("Here 0") print("Here 0")
print( print(f"ROS2Camera({camera.node.get_name()}")
f"ROS2Camera({camera.node.get_name()}"
)
cameras.append(camera) cameras.append(camera)
images_dir = Path(images_dir) images_dir = Path(images_dir)
@ -140,9 +135,9 @@ class ROS2Camera:
self.color_mode = config.color_mode self.color_mode = config.color_mode
self.mock = config.mock self.mock = config.mock
self.br : CvBridge = CvBridge() self.br: CvBridge = CvBridge()
self.image_msg : ImageMsg | None = None self.image_msg: ImageMsg | None = None
self.is_connected : bool = False self.is_connected: bool = False
self.sub = None self.sub = None
self.logs = {} self.logs = {}
self.node = rclpy.create_node("lerobot_camera_node") self.node = rclpy.create_node("lerobot_camera_node")
@ -167,12 +162,7 @@ class ROS2Camera:
if self.is_connected: if self.is_connected:
raise RobotDeviceAlreadyConnectedError(f"ROS2Camera({self.config.topic}) is already connected.") raise RobotDeviceAlreadyConnectedError(f"ROS2Camera({self.config.topic}) is already connected.")
self.sub = self.node.create_subscription( self.sub = self.node.create_subscription(ImageMsg, self.config.topic, self.sub_cb, 10)
ImageMsg,
self.config.topic,
self.sub_cb,
10
)
while self.image_msg is None: while self.image_msg is None:
print(f"Waiting to receive message over {self.config.topic}") print(f"Waiting to receive message over {self.config.topic}")
@ -198,7 +188,6 @@ class ROS2Camera:
f"ROS2Camera({self.node.get_name()}) is not connected. Try running `camera.connect()` first." f"ROS2Camera({self.node.get_name()}) is not connected. Try running `camera.connect()` first."
) )
start_time = time.perf_counter() start_time = time.perf_counter()
desired_encoding = "passthrough" if temporary_color is None else temporary_color desired_encoding = "passthrough" if temporary_color is None else temporary_color
@ -219,9 +208,8 @@ class ROS2Camera:
return image return image
def async_read(self): def async_read(self):
return self.read() return self.read()
def disconnect(self): def disconnect(self):
if not self.is_connected: if not self.is_connected:

View File

@ -22,7 +22,6 @@ from lerobot.common.robot_devices.cameras.configs import (
CameraConfig, CameraConfig,
IntelRealSenseCameraConfig, IntelRealSenseCameraConfig,
OpenCVCameraConfig, OpenCVCameraConfig,
ROS2CameraConfig,
) )
from lerobot.common.robot_devices.motors.configs import ( from lerobot.common.robot_devices.motors.configs import (
DynamixelMotorsBusConfig, DynamixelMotorsBusConfig,