Add real robot devices and scripts to control real robot (#288)

Co-authored-by: Simon Alibert <alibert.sim@gmail.com>
Remi 2024-07-15 17:43:10 +02:00 committed by GitHub
parent 5ffcb48a9a
commit 5e54e39795
32 changed files with 3068 additions and 46 deletions

.gitignore vendored
View File

@ -122,7 +122,6 @@ celerybeat.pid
.env
.venv
venv/
ENV/
env.bak/
venv.bak/

View File

@ -21,7 +21,7 @@ RUN echo "source /opt/venv/bin/activate" >> /root/.bashrc
COPY . /lerobot
WORKDIR /lerobot
RUN pip install --upgrade --no-cache-dir pip
RUN pip install --no-cache-dir ".[test, aloha, xarm, pusht]" \
RUN pip install --no-cache-dir ".[test, aloha, xarm, pusht, koch]" \
--extra-index-url https://download.pytorch.org/whl/cpu
# Set EGL as the rendering backend for MuJoCo

View File

@ -43,7 +43,6 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libsvtav1-dev libsvtav1enc-dev libsvtav1dec-dev \
libdav1d-dev
# Install gh cli tool
RUN (type -p wget >/dev/null || (apt update && apt-get install wget -y)) \
&& mkdir -p -m 755 /etc/apt/keyrings \

View File

@ -9,7 +9,7 @@ ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential cmake \
libglib2.0-0 libgl1-mesa-glx libegl1-mesa ffmpeg \
python${PYTHON_VERSION} python${PYTHON_VERSION}-venv \
python${PYTHON_VERSION}-dev python${PYTHON_VERSION}-venv \
&& apt-get clean && rm -rf /var/lib/apt/lists/*
@ -23,7 +23,7 @@ RUN echo "source /opt/venv/bin/activate" >> /root/.bashrc
COPY . /lerobot
WORKDIR /lerobot
RUN pip install --upgrade --no-cache-dir pip
RUN pip install --no-cache-dir ".[test, aloha, xarm, pusht]"
RUN pip install --no-cache-dir ".[test, aloha, xarm, pusht, koch]"
# Set EGL as the rendering backend for MuJoCo
ENV MUJOCO_GL="egl"

View File

@ -207,7 +207,8 @@ def encode_video_frames(
ffmpeg_args.append("-y")
ffmpeg_cmd = ["ffmpeg"] + ffmpeg_args + [str(video_path)]
subprocess.run(ffmpeg_cmd, check=True)
# redirect stdin to subprocess.DEVNULL to prevent reading random keyboard inputs from terminal
subprocess.run(ffmpeg_cmd, check=True, stdin=subprocess.DEVNULL)
@dataclass
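A minimal sketch of the pattern introduced above, using only the standard library: redirecting `stdin` to `DEVNULL` prevents a long-running child process from consuming keystrokes typed into the terminal while it runs.

```python
import subprocess

# Hypothetical command for illustration; `encode_video_frames` applies the
# same pattern to the ffmpeg command line it builds.
cmd = ["ffmpeg", "-version"]

# Without stdin=subprocess.DEVNULL, the child inherits the terminal's stdin
# and may swallow keyboard input meant for the parent process.
subprocess.run(cmd, check=True, stdin=subprocess.DEVNULL)
```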

View File

@ -19,7 +19,7 @@ import gymnasium as gym
from omegaconf import DictConfig
def make_env(cfg: DictConfig, n_envs: int | None = None) -> gym.vector.VectorEnv:
def make_env(cfg: DictConfig, n_envs: int | None = None) -> gym.vector.VectorEnv | None:
"""Makes a gym vector environment according to the evaluation config.
n_envs can be used to override eval.batch_size in the configuration. Must be at least 1.
@ -27,6 +27,9 @@ def make_env(cfg: DictConfig, n_envs: int | None = None) -> gym.vector.VectorEnv
if n_envs is not None and n_envs < 1:
raise ValueError("`n_envs must be at least 1")
if cfg.env.name == "real_world":
return
package_name = f"gym_{cfg.env.name}"
try:
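A hedged sketch of the new behavior, assuming `make_env` is imported from `lerobot.common.envs.factory`: a `real_world` config builds no simulation environment, and the caller receives `None`.

```python
from omegaconf import OmegaConf

from lerobot.common.envs.factory import make_env  # assumed import path

# With env.name == "real_world" there is nothing to simulate, so make_env
# returns None; real-world evaluation goes through control_robot.py instead.
cfg = OmegaConf.create({"env": {"name": "real_world"}, "eval": {"batch_size": 1}})
assert make_env(cfg) is None
```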

View File

@ -0,0 +1,404 @@
"""
This file contains utilities for recording frames from cameras. For more info look at `OpenCVCamera` docstring.
"""
import argparse
import concurrent.futures
import math
import shutil
import threading
import time
from dataclasses import dataclass, replace
from pathlib import Path
from threading import Thread
import cv2
import numpy as np
from PIL import Image
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from lerobot.common.utils.utils import capture_timestamp_utc
from lerobot.scripts.control_robot import busy_wait
# Use 1 thread to avoid blocking the main thread. Especially useful during data collection
# when other threads are used to save the images.
cv2.setNumThreads(1)
# The maximum opencv device index depends on your operating system. For instance,
# if you have 3 cameras, they should be associated to indices 0, 1, and 2. This is the case
# on MacOS. However, on Ubuntu the indices can differ, such as 6, 16, and 23.
# When you change the USB port or reboot the computer, the operating system might
# treat the same cameras as new devices, so we use a high upper bound when searching for indices.
MAX_OPENCV_INDEX = 60
def find_camera_indices(raise_when_empty=False, max_index_search_range=MAX_OPENCV_INDEX):
camera_ids = []
for camera_idx in range(max_index_search_range):
camera = cv2.VideoCapture(camera_idx)
is_open = camera.isOpened()
camera.release()
if is_open:
print(f"Camera found at index {camera_idx}")
camera_ids.append(camera_idx)
if raise_when_empty and len(camera_ids) == 0:
raise OSError(
"No camera was detected. Try re-plugging your cameras, re-installing `opencv2` or your camera drivers, or make sure your cameras are compatible with opencv2."
)
return camera_ids
def save_image(img_array, camera_index, frame_index, images_dir):
img = Image.fromarray(img_array)
path = images_dir / f"camera_{camera_index:02d}_frame_{frame_index:06d}.png"
path.parent.mkdir(parents=True, exist_ok=True)
img.save(str(path), quality=100)
def save_images_from_cameras(
images_dir: Path, camera_ids=None, fps=None, width=None, height=None, record_time_s=2
):
if camera_ids is None:
print("Finding available camera indices")
camera_ids = find_camera_indices()
print("Connecting cameras")
cameras = []
for cam_idx in camera_ids:
camera = OpenCVCamera(cam_idx, fps=fps, width=width, height=height)
camera.connect()
print(
f"OpenCVCamera({camera.camera_index}, fps={camera.fps}, width={camera.width}, height={camera.height}, color_mode={camera.color_mode})"
)
cameras.append(camera)
images_dir = Path(images_dir)
if images_dir.exists():
shutil.rmtree(images_dir)
images_dir.mkdir(parents=True, exist_ok=True)
print(f"Saving images to {images_dir}")
frame_index = 0
start_time = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
while True:
now = time.perf_counter()
for camera in cameras:
# If we use async_read when fps is None, the loop will run at full speed, and we will end up
# saving the same images from the cameras multiple times until the RAM/disk is full.
image = camera.read() if fps is None else camera.async_read()
executor.submit(
save_image,
image,
camera.camera_index,
frame_index,
images_dir,
)
if fps is not None:
dt_s = time.perf_counter() - now
busy_wait(1 / fps - dt_s)
if time.perf_counter() - start_time > record_time_s:
break
print(f"Frame: {frame_index:04d}\tLatency (ms): {(time.perf_counter() - now) * 1000:.2f}")
frame_index += 1
print(f"Images have been saved to {images_dir}")
@dataclass
class OpenCVCameraConfig:
"""
Example of tested options for an Intel RealSense D405:
```python
OpenCVCameraConfig(30, 640, 480)
OpenCVCameraConfig(60, 640, 480)
OpenCVCameraConfig(90, 640, 480)
OpenCVCameraConfig(30, 1280, 720)
```
"""
fps: int | None = None
width: int | None = None
height: int | None = None
color_mode: str = "rgb"
def __post_init__(self):
if self.color_mode not in ["rgb", "bgr"]:
raise ValueError(
f"Expected color_mode values are 'rgb' or 'bgr', but {self.color_mode} is provided."
)
class OpenCVCamera:
"""
The OpenCVCamera class allows you to efficiently record images from cameras. It relies on opencv2 to communicate
with the cameras. Most cameras are compatible. For more info, see the [Video I/O with OpenCV Overview](https://docs.opencv.org/4.x/d0/da7/videoio_overview.html).
An OpenCVCamera instance requires a camera index (e.g. `OpenCVCamera(camera_index=0)`). When you only have one camera,
like a laptop webcam, the camera index is usually 0, but it can differ, and it may change when you reboot your
computer or re-plug your camera. This behavior depends on your operating system.
To find the camera indices of your cameras, you can run our utility script that will save a few frames from each camera:
```bash
python lerobot/common/robot_devices/cameras/opencv.py --images-dir outputs/images_from_opencv_cameras
```
When an OpenCVCamera is instantiated, if no specific config is provided, the default fps, width, height and color_mode
of the given camera will be used.
Example of usage of the class:
```python
camera = OpenCVCamera(camera_index=0)
camera.connect()
color_image = camera.read()
# when done using the camera, consider disconnecting
camera.disconnect()
```
Example of changing default fps, width, height and color_mode:
```python
camera = OpenCVCamera(0, fps=30, width=1280, height=720)
camera.connect()  # applies the settings, might error out if these settings are not compatible with the camera
camera = OpenCVCamera(0, fps=90, width=640, height=480)
camera.connect()
camera = OpenCVCamera(0, fps=90, width=640, height=480, color_mode="bgr")
camera.connect()
```
"""
def __init__(self, camera_index: int, config: OpenCVCameraConfig | None = None, **kwargs):
if config is None:
config = OpenCVCameraConfig()
# Overwrite config arguments using kwargs
config = replace(config, **kwargs)
self.camera_index = camera_index
self.fps = config.fps
self.width = config.width
self.height = config.height
self.color_mode = config.color_mode
if not isinstance(self.camera_index, int):
raise ValueError(
f"Camera index must be provided as an int, but {self.camera_index} was given instead."
)
self.camera = None
self.is_connected = False
self.thread = None
self.stop_event = None
self.color_image = None
self.logs = {}
def connect(self):
if self.is_connected:
raise RobotDeviceAlreadyConnectedError(f"Camera {self.camera_index} is already connected.")
# First create a temporary camera trying to access `camera_index`,
# and verify it is a valid camera by calling `isOpened`.
tmp_camera = cv2.VideoCapture(self.camera_index)
is_camera_open = tmp_camera.isOpened()
# Release camera to make it accessible for `find_camera_indices`
del tmp_camera
# If the camera doesn't work, display the camera indices corresponding to
# valid cameras.
if not is_camera_open:
# Verify that the provided `camera_index` is valid before printing the traceback
available_cam_ids = find_camera_indices()
if self.camera_index not in available_cam_ids:
raise ValueError(
f"`camera_index` is expected to be one of these available cameras {available_cam_ids}, but {self.camera_index} is provided instead."
)
raise OSError(f"Can't access camera {self.camera_index}.")
# Secondly, create the camera that will be used downstream.
# Note: For some unknown reason, calling `isOpened` blocks the camera which then
# needs to be re-created.
self.camera = cv2.VideoCapture(self.camera_index)
if self.fps is not None:
self.camera.set(cv2.CAP_PROP_FPS, self.fps)
if self.width is not None:
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
if self.height is not None:
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
actual_fps = self.camera.get(cv2.CAP_PROP_FPS)
actual_width = self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)
actual_height = self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT)
if self.fps is not None and not math.isclose(self.fps, actual_fps, rel_tol=1e-3):
raise OSError(
f"Can't set {self.fps=} for camera {self.camera_index}. Actual value is {actual_fps}."
)
if self.width is not None and self.width != actual_width:
raise OSError(
f"Can't set {self.width=} for camera {self.camera_index}. Actual value is {actual_width}."
)
if self.height is not None and self.height != actual_height:
raise OSError(
f"Can't set {self.height=} for camera {self.camera_index}. Actual value is {actual_height}."
)
self.fps = actual_fps
self.width = actual_width
self.height = actual_height
self.is_connected = True
def read(self, temporary_color_mode: str | None = None) -> np.ndarray:
"""Read a frame from the camera returned in the format (height, width, channels)
(e.g. (640, 480, 3)), contrarily to the pytorch format which is channel first.
Note: Reading a frame is done every `camera.fps` times per second, and it is blocking.
If you are reading data from other sensors, we advise to use `camera.async_read()` which is non blocking version of `camera.read()`.
"""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first."
)
start_time = time.perf_counter()
ret, color_image = self.camera.read()
if not ret:
raise OSError(f"Can't capture color image from camera {self.camera_index}.")
requested_color_mode = self.color_mode if temporary_color_mode is None else temporary_color_mode
if requested_color_mode not in ["rgb", "bgr"]:
raise ValueError(
f"Expected color values are 'rgb' or 'bgr', but {requested_color_mode} is provided."
)
# OpenCV uses the BGR format by default (blue, green, red) for all operations, including displaying images.
# However, deep learning frameworks such as LeRobot use the RGB format by default to train neural networks,
# so we convert the image color from BGR to RGB.
if requested_color_mode == "rgb":
color_image = cv2.cvtColor(color_image, cv2.COLOR_BGR2RGB)
h, w, _ = color_image.shape
if h != self.height or w != self.width:
raise OSError(
f"Can't capture color image with expected height and width ({self.height} x {self.width}). ({h} x {w}) returned instead."
)
# log the number of seconds it took to read the image
self.logs["delta_timestamp_s"] = time.perf_counter() - start_time
# log the utc time at which the image was received
self.logs["timestamp_utc"] = capture_timestamp_utc()
return color_image
def read_loop(self):
while self.stop_event is None or not self.stop_event.is_set():
self.color_image = self.read()
def async_read(self):
if not self.is_connected:
raise RobotDeviceNotConnectedError(
f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first."
)
if self.thread is None:
self.stop_event = threading.Event()
self.thread = Thread(target=self.read_loop, args=())
self.thread.daemon = True
self.thread.start()
num_tries = 0
while self.color_image is None:
num_tries += 1
time.sleep(1 / self.fps)
if num_tries > self.fps and (self.thread.ident is None or not self.thread.is_alive()):
raise Exception(
"The thread responsible for `self.async_read()` took too much time to start. There might be an issue. Verify that `self.thread.start()` has been called."
)
return self.color_image
def disconnect(self):
if not self.is_connected:
raise RobotDeviceNotConnectedError(
f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first."
)
if self.thread is not None and self.thread.is_alive():
# wait for the thread to finish
self.stop_event.set()
self.thread.join()
self.thread = None
self.stop_event = None
self.camera.release()
self.camera = None
self.is_connected = False
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Save a few frames using `OpenCVCamera` for all cameras connected to the computer, or a selected subset."
)
parser.add_argument(
"--camera-ids",
type=int,
nargs="*",
default=None,
help="List of camera indices used to instantiate the `OpenCVCamera`. If not provided, find and use all available camera indices.",
)
parser.add_argument(
"--fps",
type=int,
default=None,
help="Set the number of frames recorded per seconds for all cameras. If not provided, use the default fps of each camera.",
)
parser.add_argument(
"--width",
type=int,
default=None,
help="Set the width for all cameras. If not provided, use the default width of each camera.",
)
parser.add_argument(
"--height",
type=int,
default=None,
help="Set the height for all cameras. If not provided, use the default height of each camera.",
)
parser.add_argument(
"--images-dir",
type=Path,
default="outputs/images_from_opencv_cameras",
help="Set directory to save a few frames for each camera.",
)
parser.add_argument(
"--record-time-s",
type=float,
default=2.0,
help="Set the number of seconds used to record the frames. By default, 2 seconds.",
)
args = parser.parse_args()
save_images_from_cameras(**vars(args))
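A hedged usage sketch tying this module together, reusing `OpenCVCamera` and `busy_wait` from above; the camera index is a placeholder, so run the CLI above to discover yours:

```python
import time

camera = OpenCVCamera(camera_index=0, fps=30, width=640, height=480)  # placeholder index
camera.connect()

for _ in range(90):  # ~3 seconds at 30 fps
    start = time.perf_counter()
    frame = camera.async_read()  # non-blocking; a background thread refreshes the frame
    # ... process `frame`, an RGB numpy array of shape (height, width, 3) ...
    busy_wait(1 / 30 - (time.perf_counter() - start))

camera.disconnect()
```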

View File

@ -0,0 +1,47 @@
from pathlib import Path
from typing import Protocol
import cv2
import numpy as np
def write_shape_on_image_inplace(image):
height, width = image.shape[:2]
text = f"Width: {width} Height: {height}"
# Define the font, scale, color, and thickness
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1
color = (255, 0, 0) # Blue in BGR
thickness = 2
position = (10, height - 10) # 10 pixels from the bottom-left corner
cv2.putText(image, text, position, font, font_scale, color, thickness)
def save_color_image(image, path, write_shape=False):
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
if write_shape:
write_shape_on_image_inplace(image)
cv2.imwrite(str(path), image)
def save_depth_image(depth, path, write_shape=False):
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
# Apply colormap on depth image (image must be converted to 8-bit per pixel first)
depth_image = cv2.applyColorMap(cv2.convertScaleAbs(depth, alpha=0.03), cv2.COLORMAP_JET)
if write_shape:
write_shape_on_image_inplace(depth_image)
cv2.imwrite(str(path), depth_image)
# Defines a camera type
class Camera(Protocol):
def connect(self): ...
def read(self, temporary_color_mode: str | None = None) -> np.ndarray: ...
def async_read(self) -> np.ndarray: ...
def disconnect(self): ...
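Because `Camera` is a `typing.Protocol`, any class with matching method signatures satisfies it structurally, with no inheritance needed. A minimal sketch with a hypothetical stub camera:

```python
class DummyCamera:
    """Hypothetical stand-in that satisfies the Camera protocol structurally."""

    def connect(self): ...

    def read(self, temporary_color_mode: str | None = None) -> np.ndarray:
        # Return a black test frame in (height, width, channels) format.
        return np.zeros((480, 640, 3), dtype=np.uint8)

    def async_read(self) -> np.ndarray:
        return self.read()

    def disconnect(self): ...


def grab_frame(camera: Camera) -> np.ndarray:
    # Type-checks for any object matching the protocol, real or fake.
    return camera.read()


frame = grab_frame(DummyCamera())
```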

View File

@ -0,0 +1,492 @@
import enum
import time
import traceback
from copy import deepcopy
from pathlib import Path
import numpy as np
from dynamixel_sdk import (
COMM_SUCCESS,
DXL_HIBYTE,
DXL_HIWORD,
DXL_LOBYTE,
DXL_LOWORD,
GroupSyncRead,
GroupSyncWrite,
PacketHandler,
PortHandler,
)
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from lerobot.common.utils.utils import capture_timestamp_utc
PROTOCOL_VERSION = 2.0
BAUD_RATE = 1_000_000
TIMEOUT_MS = 1000
# https://emanual.robotis.com/docs/en/dxl/x/xl330-m077
# https://emanual.robotis.com/docs/en/dxl/x/xl330-m288
# https://emanual.robotis.com/docs/en/dxl/x/xl430-w250
# https://emanual.robotis.com/docs/en/dxl/x/xm430-w350
# https://emanual.robotis.com/docs/en/dxl/x/xm540-w270
# data_name: (address, size_byte)
X_SERIES_CONTROL_TABLE = {
"Model_Number": (0, 2),
"Model_Information": (2, 4),
"Firmware_Version": (6, 1),
"ID": (7, 1),
"Baud_Rate": (8, 1),
"Return_Delay_Time": (9, 1),
"Drive_Mode": (10, 1),
"Operating_Mode": (11, 1),
"Secondary_ID": (12, 1),
"Protocol_Type": (13, 1),
"Homing_Offset": (20, 4),
"Moving_Threshold": (24, 4),
"Temperature_Limit": (31, 1),
"Max_Voltage_Limit": (32, 2),
"Min_Voltage_Limit": (34, 2),
"PWM_Limit": (36, 2),
"Current_Limit": (38, 2),
"Acceleration_Limit": (40, 4),
"Velocity_Limit": (44, 4),
"Max_Position_Limit": (48, 4),
"Min_Position_Limit": (52, 4),
"Shutdown": (63, 1),
"Torque_Enable": (64, 1),
"LED": (65, 1),
"Status_Return_Level": (68, 1),
"Registered_Instruction": (69, 1),
"Hardware_Error_Status": (70, 1),
"Velocity_I_Gain": (76, 2),
"Velocity_P_Gain": (78, 2),
"Position_D_Gain": (80, 2),
"Position_I_Gain": (82, 2),
"Position_P_Gain": (84, 2),
"Feedforward_2nd_Gain": (88, 2),
"Feedforward_1st_Gain": (90, 2),
"Bus_Watchdog": (98, 1),
"Goal_PWM": (100, 2),
"Goal_Current": (102, 2),
"Goal_Velocity": (104, 4),
"Profile_Acceleration": (108, 4),
"Profile_Velocity": (112, 4),
"Goal_Position": (116, 4),
"Realtime_Tick": (120, 2),
"Moving": (122, 1),
"Moving_Status": (123, 1),
"Present_PWM": (124, 2),
"Present_Current": (126, 2),
"Present_Velocity": (128, 4),
"Present_Position": (132, 4),
"Velocity_Trajectory": (136, 4),
"Position_Trajectory": (140, 4),
"Present_Input_Voltage": (144, 2),
"Present_Temperature": (146, 1),
}
CALIBRATION_REQUIRED = ["Goal_Position", "Present_Position"]
CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"]
MODEL_CONTROL_TABLE = {
"x_series": X_SERIES_CONTROL_TABLE,
"xl330-m077": X_SERIES_CONTROL_TABLE,
"xl330-m288": X_SERIES_CONTROL_TABLE,
"xl430-w250": X_SERIES_CONTROL_TABLE,
"xm430-w350": X_SERIES_CONTROL_TABLE,
"xm540-w270": X_SERIES_CONTROL_TABLE,
}
NUM_READ_RETRY = 10
def get_group_sync_key(data_name, motor_names):
group_key = f"{data_name}_" + "_".join(motor_names)
return group_key
def get_result_name(fn_name, data_name, motor_names):
group_key = get_group_sync_key(data_name, motor_names)
rslt_name = f"{fn_name}_{group_key}"
return rslt_name
def get_queue_name(fn_name, data_name, motor_names):
group_key = get_group_sync_key(data_name, motor_names)
queue_name = f"{fn_name}_{group_key}"
return queue_name
def get_log_name(var_name, fn_name, data_name, motor_names):
group_key = get_group_sync_key(data_name, motor_names)
log_name = f"{var_name}_{fn_name}_{group_key}"
return log_name
def assert_same_address(model_ctrl_table, motor_models, data_name):
all_addr = []
all_bytes = []
for model in motor_models:
addr, bytes = model_ctrl_table[model][data_name]
all_addr.append(addr)
all_bytes.append(bytes)
if len(set(all_addr)) != 1:
raise NotImplementedError(
f"At least two motor models use a different address for `data_name`='{data_name}' ({list(zip(motor_models, all_addr, strict=False))}). Contact a LeRobot maintainer."
)
if len(set(all_bytes)) != 1:
raise NotImplementedError(
f"At least two motor models use a different bytes representation for `data_name`='{data_name}' ({list(zip(motor_models, all_bytes, strict=False))}). Contact a LeRobot maintainer."
)
def find_available_ports():
ports = []
for path in Path("/dev").glob("tty*"):
ports.append(str(path))
return ports
def find_port():
print("Finding all available ports for the DynamixelMotorsBus.")
ports_before = find_available_ports()
print(ports_before)
print("Remove the usb cable from your DynamixelMotorsBus and press Enter when done.")
input()
time.sleep(0.5)
ports_after = find_available_ports()
ports_diff = list(set(ports_before) - set(ports_after))
if len(ports_diff) == 1:
port = ports_diff[0]
print(f"The port of this DynamixelMotorsBus is '{port}'")
print("Reconnect the usb cable.")
elif len(ports_diff) == 0:
raise OSError(f"Could not detect the port. No difference was found ({ports_diff}).")
else:
raise OSError(f"Could not detect the port. More than one port was found ({ports_diff}).")
class TorqueMode(enum.Enum):
ENABLED = 1
DISABLED = 0
class OperatingMode(enum.Enum):
VELOCITY = 1
POSITION = 3
EXTENDED_POSITION = 4
CURRENT_CONTROLLED_POSITION = 5
PWM = 16
UNKNOWN = -1
class DriveMode(enum.Enum):
NON_INVERTED = 0
INVERTED = 1
class DynamixelMotorsBus:
# TODO(rcadene): Add a script to find the motor indices without Dynamixel Wizard 2
"""
The DynamixelMotorsBus class allows you to efficiently read from and write to the motors attached to the bus. It relies on
the Python Dynamixel SDK to communicate with the motors. For more info, see the [Dynamixel SDK Documentation](https://emanual.robotis.com/docs/en/software/dynamixel/dynamixel_sdk/sample_code/python_read_write_protocol_2_0/#python-read-write-protocol-20).
A DynamixelMotorsBus instance requires a port (e.g. `DynamixelMotorsBus(port="/dev/tty.usbmodem575E0031751")`).
To find the port, you can run our utility script:
```bash
python lerobot/common/robot_devices/motors/dynamixel.py
>>> Finding all available ports for the DynamixelMotorsBus.
>>> ['/dev/tty.usbmodem575E0032081', '/dev/tty.usbmodem575E0031751']
>>> Remove the usb cable from your DynamixelMotorsBus and press Enter when done.
>>> The port of this DynamixelMotorsBus is /dev/tty.usbmodem575E0031751.
>>> Reconnect the usb cable.
```
To find the motor indices, use [Dynamixel Wizard 2](https://emanual.robotis.com/docs/en/software/dynamixel/dynamixel_wizard2).
Example of usage for 1 motor connected to the bus:
```python
motor_name = "gripper"
motor_index = 6
motor_model = "xl330-m077"
motors_bus = DynamixelMotorsBus(
port="/dev/tty.usbmodem575E0031751",
motors={motor_name: (motor_index, motor_model)},
)
motors_bus.connect()
position = motors_bus.read("Present_Position")
# Move by a few motor steps as an example
few_steps = 30
motors_bus.write("Goal_Position", position + few_steps)
# when done, consider disconnecting
motors_bus.disconnect()
```
"""
def __init__(
self,
port: str,
motors: dict[str, tuple[int, str]],
extra_model_control_table: dict[str, list[tuple]] | None = None,
):
self.port = port
self.motors = motors
self.model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
if extra_model_control_table:
self.model_ctrl_table.update(extra_model_control_table)
self.port_handler = None
self.packet_handler = None
self.calibration = None
self.is_connected = False
self.group_readers = {}
self.group_writers = {}
self.logs = {}
def connect(self):
if self.is_connected:
raise RobotDeviceAlreadyConnectedError(
f"DynamixelMotorsBus({self.port}) is already connected. Do not call `motors_bus.connect()` twice."
)
self.port_handler = PortHandler(self.port)
self.packet_handler = PacketHandler(PROTOCOL_VERSION)
try:
if not self.port_handler.openPort():
raise OSError(f"Failed to open port '{self.port}'.")
except Exception:
traceback.print_exc()
print(
"\nTry running `python lerobot/common/robot_devices/motors/dynamixel.py` to make sure you are using the correct port.\n"
)
raise
self.port_handler.setBaudRate(BAUD_RATE)
self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS)
self.is_connected = True
@property
def motor_names(self) -> list[str]:
return list(self.motors.keys())
def set_calibration(self, calibration: dict[str, tuple[int, bool]]):
self.calibration = calibration
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
if not self.calibration:
return values
if motor_names is None:
motor_names = self.motor_names
for i, name in enumerate(motor_names):
homing_offset, drive_mode = self.calibration[name]
if values[i] is not None:
if drive_mode:
values[i] *= -1
values[i] += homing_offset
return values
def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
if not self.calibration:
return values
if motor_names is None:
motor_names = self.motor_names
for i, name in enumerate(motor_names):
homing_offset, drive_mode = self.calibration[name]
if values[i] is not None:
values[i] -= homing_offset
if drive_mode:
values[i] *= -1
return values
def read(self, data_name, motor_names: str | list[str] | None = None):
if not self.is_connected:
raise RobotDeviceNotConnectedError(
f"DynamixelMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`."
)
start_time = time.perf_counter()
if motor_names is None:
motor_names = self.motor_names
if isinstance(motor_names, str):
motor_names = [motor_names]
motor_ids = []
models = []
for name in motor_names:
motor_idx, model = self.motors[name]
motor_ids.append(motor_idx)
models.append(model)
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
if group_key not in self.group_readers:
# create new group reader
self.group_readers[group_key] = GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes)
for idx in motor_ids:
self.group_readers[group_key].addParam(idx)
for _ in range(NUM_READ_RETRY):
comm = self.group_readers[group_key].txRxPacket()
if comm == COMM_SUCCESS:
break
if comm != COMM_SUCCESS:
raise ConnectionError(
f"Read failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
values = []
for idx in motor_ids:
value = self.group_readers[group_key].getData(idx, addr, bytes)
values.append(value)
values = np.array(values)
# Convert to signed int32 so that positions shifted by the homing offset can be negative, e.g. in the range [-2048, 2048].
if data_name in CONVERT_UINT32_TO_INT32_REQUIRED:
values = values.astype(np.int32)
if data_name in CALIBRATION_REQUIRED:
values = self.apply_calibration(values, motor_names)
# log the number of seconds it took to read the data from the motors
delta_ts_name = get_log_name("delta_timestamp_s", "read", data_name, motor_names)
self.logs[delta_ts_name] = time.perf_counter() - start_time
# log the utc time at which the data was received
ts_utc_name = get_log_name("timestamp_utc", "read", data_name, motor_names)
self.logs[ts_utc_name] = capture_timestamp_utc()
return values
def write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None):
if not self.is_connected:
raise RobotDeviceNotConnectedError(
f"DynamixelMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`."
)
start_time = time.perf_counter()
if motor_names is None:
motor_names = self.motor_names
if isinstance(motor_names, str):
motor_names = [motor_names]
if isinstance(values, (int, float, np.integer)):
values = [int(values)] * len(motor_names)
values = np.array(values)
motor_ids = []
models = []
for name in motor_names:
motor_idx, model = self.motors[name]
motor_ids.append(motor_idx)
models.append(model)
if data_name in CALIBRATION_REQUIRED:
values = self.revert_calibration(values, motor_names)
values = values.tolist()
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
init_group = group_key not in self.group_writers
if init_group:
self.group_writers[group_key] = GroupSyncWrite(
self.port_handler, self.packet_handler, addr, bytes
)
for idx, value in zip(motor_ids, values, strict=True):
# Note: No need to convert back into unsigned int, since this byte preprocessing
# already handles it for us.
if bytes == 1:
data = [
DXL_LOBYTE(DXL_LOWORD(value)),
]
elif bytes == 2:
data = [
DXL_LOBYTE(DXL_LOWORD(value)),
DXL_HIBYTE(DXL_LOWORD(value)),
]
elif bytes == 4:
data = [
DXL_LOBYTE(DXL_LOWORD(value)),
DXL_HIBYTE(DXL_LOWORD(value)),
DXL_LOBYTE(DXL_HIWORD(value)),
DXL_HIBYTE(DXL_HIWORD(value)),
]
else:
raise NotImplementedError(
f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but "
f"{bytes} is provided instead."
)
if init_group:
self.group_writers[group_key].addParam(idx, data)
else:
self.group_writers[group_key].changeParam(idx, data)
comm = self.group_writers[group_key].txPacket()
if comm != COMM_SUCCESS:
raise ConnectionError(
f"Write failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
# log the number of seconds it took to write the data to the motors
delta_ts_name = get_log_name("delta_timestamp_s", "write", data_name, motor_names)
self.logs[delta_ts_name] = time.perf_counter() - start_time
# TODO(rcadene): should we log the time before sending the write command?
# log the utc time when the write has been completed
ts_utc_name = get_log_name("timestamp_utc", "write", data_name, motor_names)
self.logs[ts_utc_name] = capture_timestamp_utc()
def disconnect(self):
if not self.is_connected:
raise RobotDeviceNotConnectedError(
f"DynamixelMotorsBus({self.port}) is not connected. Try running `motors_bus.connect()` first."
)
if self.port_handler is not None:
self.port_handler.closePort()
self.port_handler = None
self.packet_handler = None
self.group_readers = {}
self.group_writers = {}
self.is_connected = False
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()
if __name__ == "__main__":
# Helper to find the USB port associated with your DynamixelMotorsBus.
find_port()
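A hedged sketch of direct register access with this bus; the port and motor entry are placeholders for your own setup (use `find_port()` above to locate the port):

```python
motors_bus = DynamixelMotorsBus(
    port="/dev/tty.usbmodem575E0031751",  # placeholder port
    motors={"gripper": (6, "xl330-m077")},  # placeholder name: (index, model)
)
motors_bus.connect()

# Read returns a numpy array with one entry per motor.
position = motors_bus.read("Present_Position")

# Enable torque, then command a small relative move.
motors_bus.write("Torque_Enable", TorqueMode.ENABLED.value)
motors_bus.write("Goal_Position", position + 30)

motors_bus.disconnect()
```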

View File

@ -0,0 +1,10 @@
from typing import Protocol
class MotorsBus(Protocol):
def motor_names(self): ...
def set_calibration(self): ...
def apply_calibration(self): ...
def revert_calibration(self): ...
def read(self): ...
def write(self): ...

View File

@ -0,0 +1,46 @@
def make_robot(name):
if name == "koch":
# TODO(rcadene): Add configurable robot from command line and yaml config
# TODO(rcadene): Add example with and without cameras
from lerobot.common.robot_devices.cameras.opencv import OpenCVCamera
from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus
from lerobot.common.robot_devices.robots.koch import KochRobot
robot = KochRobot(
leader_arms={
"main": DynamixelMotorsBus(
port="/dev/tty.usbmodem575E0031751",
motors={
# name: (index, model)
"shoulder_pan": (1, "xl330-m077"),
"shoulder_lift": (2, "xl330-m077"),
"elbow_flex": (3, "xl330-m077"),
"wrist_flex": (4, "xl330-m077"),
"wrist_roll": (5, "xl330-m077"),
"gripper": (6, "xl330-m077"),
},
),
},
follower_arms={
"main": DynamixelMotorsBus(
port="/dev/tty.usbmodem575E0032081",
motors={
# name: (index, model)
"shoulder_pan": (1, "xl430-w250"),
"shoulder_lift": (2, "xl430-w250"),
"elbow_flex": (3, "xl330-m288"),
"wrist_flex": (4, "xl330-m288"),
"wrist_roll": (5, "xl330-m288"),
"gripper": (6, "xl330-m288"),
},
),
},
cameras={
"laptop": OpenCVCamera(0, fps=30, width=640, height=480),
"phone": OpenCVCamera(1, fps=30, width=640, height=480),
},
)
else:
raise ValueError(f"Robot '{name}' not found.")
return robot
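A hedged teleoperation sketch using the factory; as the TODOs above note, ports and camera indices are hard-coded here, so adjust the entries in `make_robot` to your hardware first:

```python
import time

from lerobot.scripts.control_robot import busy_wait

robot = make_robot("koch")
robot.connect()  # connects arms, loads or runs calibration, connects cameras

fps = 30
while True:
    start = time.perf_counter()
    observation, action = robot.teleop_step(record_data=True)
    busy_wait(1 / fps - (time.perf_counter() - start))
```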

View File

@ -0,0 +1,548 @@
import pickle
import time
from dataclasses import dataclass, field, replace
from pathlib import Path
import numpy as np
import torch
from lerobot.common.robot_devices.cameras.utils import Camera
from lerobot.common.robot_devices.motors.dynamixel import (
DriveMode,
DynamixelMotorsBus,
OperatingMode,
TorqueMode,
)
from lerobot.common.robot_devices.motors.utils import MotorsBus
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
URL_HORIZONTAL_POSITION = {
"follower": "https://raw.githubusercontent.com/huggingface/lerobot/main/media/koch/follower_horizontal.png",
"leader": "https://raw.githubusercontent.com/huggingface/lerobot/main/media/koch/leader_horizontal.png",
}
URL_90_DEGREE_POSITION = {
"follower": "https://raw.githubusercontent.com/huggingface/lerobot/main/media/koch/follower_90_degree.png",
"leader": "https://raw.githubusercontent.com/huggingface/lerobot/main/media/koch/leader_90_degree.png",
}
########################################################################
# Calibration logic
########################################################################
TARGET_HORIZONTAL_POSITION = np.array([0, -1024, 1024, 0, -1024, 0])
TARGET_90_DEGREE_POSITION = np.array([1024, 0, 0, 1024, 0, -1024])
GRIPPER_OPEN = np.array([-400])
def apply_homing_offset(values: np.array, homing_offset: np.array) -> np.array:
for i in range(len(values)):
if values[i] is not None:
values[i] += homing_offset[i]
return values
def apply_drive_mode(values: np.array, drive_mode: np.array) -> np.array:
for i in range(len(values)):
if values[i] is not None and drive_mode[i]:
values[i] = -values[i]
return values
def apply_calibration(values: np.array, homing_offset: np.array, drive_mode: np.array) -> np.array:
values = apply_drive_mode(values, drive_mode)
values = apply_homing_offset(values, homing_offset)
return values
def revert_calibration(values: np.array, homing_offset: np.array, drive_mode: np.array) -> np.array:
"""
Transform working position into real position for the robot.
"""
values = apply_homing_offset(
values,
np.array([-offset if offset is not None else None for offset in homing_offset]),
)
values = apply_drive_mode(values, drive_mode)
return values
def revert_appropriate_positions(positions: np.array, drive_mode: list[bool]) -> np.array:
for i, revert in enumerate(drive_mode):
if not revert and positions[i] is not None:
positions[i] = -positions[i]
return positions
def compute_corrections(positions: np.array, drive_mode: list[bool], target_position: np.array) -> np.array:
correction = revert_appropriate_positions(positions, drive_mode)
for i in range(len(positions)):
if correction[i] is not None:
if drive_mode[i]:
correction[i] -= target_position[i]
else:
correction[i] += target_position[i]
return correction
def compute_nearest_rounded_positions(positions: np.array) -> np.array:
return np.array(
[
round(positions[i] / 1024) * 1024 if positions[i] is not None else None
for i in range(len(positions))
]
)
def compute_homing_offset(
arm: DynamixelMotorsBus, drive_mode: list[bool], target_position: np.array
) -> np.array:
# Get the present positions of the servos
present_positions = apply_calibration(
arm.read("Present_Position"), np.array([0, 0, 0, 0, 0, 0]), drive_mode
)
nearest_positions = compute_nearest_rounded_positions(present_positions)
correction = compute_corrections(nearest_positions, drive_mode, target_position)
return correction
def compute_drive_mode(arm: DynamixelMotorsBus, offset: np.array):
# Get current positions
present_positions = apply_calibration(
arm.read("Present_Position"), offset, np.array([False, False, False, False, False, False])
)
nearest_positions = compute_nearest_rounded_positions(present_positions)
# construct 'drive_mode' list comparing nearest_positions and TARGET_90_DEGREE_POSITION
drive_mode = []
for i in range(len(nearest_positions)):
drive_mode.append(nearest_positions[i] != TARGET_90_DEGREE_POSITION[i])
return drive_mode
def reset_arm(arm: MotorsBus):
# To be configured, all servos must be in "torque disable" mode
arm.write("Torque_Enable", TorqueMode.DISABLED.value)
# Use 'extended position mode' for all motors except the gripper, because in joint mode the servos can't
# rotate more than 360 degrees (from 0 to 4095). A mistake during assembly could leave a servo
# at position 0 or 4095 at a crucial point. See
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11
all_motors_except_gripper = [name for name in arm.motor_names if name != "gripper"]
arm.write("Operating_Mode", OperatingMode.EXTENDED_POSITION.value, all_motors_except_gripper)
# TODO(rcadene): why?
# Use 'position control current based' for gripper
arm.write("Operating_Mode", OperatingMode.CURRENT_CONTROLLED_POSITION.value, "gripper")
# Make sure the native calibration (homing offset and drive mode) is disabled, since we use our own calibration layer to be more generic
arm.write("Homing_Offset", 0)
arm.write("Drive_Mode", DriveMode.NON_INVERTED.value)
def run_arm_calibration(arm: MotorsBus, name: str, arm_type: str):
"""Example of usage:
```python
run_arm_calibration(arm, "left", "follower")
```
"""
reset_arm(arm)
# TODO(rcadene): document what position 1 means
print(
f"Please move the '{name} {arm_type}' arm to the horizontal position (gripper fully closed, see {URL_HORIZONTAL_POSITION[arm_type]})"
)
input("Press Enter to continue...")
horizontal_homing_offset = compute_homing_offset(
arm, [False, False, False, False, False, False], TARGET_HORIZONTAL_POSITION
)
# TODO(rcadene): document what position 2 means
print(
f"Please move the '{name} {arm_type}' arm to the 90 degree position (gripper fully open, see {URL_90_DEGREE_POSITION[arm_type]})"
)
input("Press Enter to continue...")
drive_mode = compute_drive_mode(arm, horizontal_homing_offset)
homing_offset = compute_homing_offset(arm, drive_mode, TARGET_90_DEGREE_POSITION)
# Invert offset for all drive_mode servos
for i in range(len(drive_mode)):
if drive_mode[i]:
homing_offset[i] = -homing_offset[i]
print("Calibration is done!")
print("=====================================")
print(" HOMING_OFFSET: ", " ".join([str(i) for i in homing_offset]))
print(" DRIVE_MODE: ", " ".join([str(i) for i in drive_mode]))
print("=====================================")
return homing_offset, drive_mode
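For intuition: X-series encoders count 4096 steps per revolution, so 1024 steps is a quarter turn, and snapping readings to the nearest multiple of 1024 aligns each joint with the closest right angle. A hedged worked example with made-up readings:

```python
# Made-up readings for an arm physically held at the horizontal pose,
# whose target is TARGET_HORIZONTAL_POSITION = [0, -1024, 1024, 0, -1024, 0].
raw = np.array([1037, -1010, 1003, 12, -1015, 5])
nearest = compute_nearest_rounded_positions(raw)
# -> [1024, -1024, 1024, 0, -1024, 0]
offset = compute_corrections(nearest, [False] * 6, TARGET_HORIZONTAL_POSITION)
# -> [-1024, 0, 0, 0, 0, 0]: adding it maps each snapped position onto its target
calibrated = apply_calibration(raw, offset, np.array([False] * 6))
# -> [13, -1010, 1003, 12, -1015, 5], i.e. the target pose up to assembly slack
```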
########################################################################
# Alexander Koch robot arm
########################################################################
@dataclass
class KochRobotConfig:
"""
Example of usage:
```python
KochRobotConfig()
```
"""
# Define all components of the robot
leader_arms: dict[str, MotorsBus] = field(default_factory=lambda: {})
follower_arms: dict[str, MotorsBus] = field(default_factory=lambda: {})
cameras: dict[str, Camera] = field(default_factory=lambda: {})
class KochRobot:
# TODO(rcadene): Implement force feedback
"""Tau Robotics: https://tau-robotics.com
Example of highest frequency teleoperation without camera:
```python
# Defines how to communicate with the motors of the leader and follower arms
leader_arms = {
"main": DynamixelMotorsBus(
port="/dev/tty.usbmodem575E0031751",
motors={
# name: (index, model)
"shoulder_pan": (1, "xl330-m077"),
"shoulder_lift": (2, "xl330-m077"),
"elbow_flex": (3, "xl330-m077"),
"wrist_flex": (4, "xl330-m077"),
"wrist_roll": (5, "xl330-m077"),
"gripper": (6, "xl330-m077"),
},
),
}
follower_arms = {
"main": DynamixelMotorsBus(
port="/dev/tty.usbmodem575E0032081",
motors={
# name: (index, model)
"shoulder_pan": (1, "xl430-w250"),
"shoulder_lift": (2, "xl430-w250"),
"elbow_flex": (3, "xl330-m288"),
"wrist_flex": (4, "xl330-m288"),
"wrist_roll": (5, "xl330-m288"),
"gripper": (6, "xl330-m288"),
},
),
}
robot = KochRobot(leader_arms=leader_arms, follower_arms=follower_arms)
# Connect motors buses and cameras if any (Required)
robot.connect()
while True:
robot.teleop_step()
```
Example of highest frequency data collection without camera:
```python
# Assumes leader and follower arms have been instantiated already (see first example)
robot = KochRobot(leader_arms=leader_arms, follower_arms=follower_arms)
robot.connect()
while True:
observation, action = robot.teleop_step(record_data=True)
```
Example of highest frequency data collection with cameras:
```python
# Defines how to communicate with 2 cameras connected to the computer.
# Here, the webcam of the MacBook Pro and the iPhone (connected via USB to the MacBook Pro)
# can be reached using the camera indices 0 and 1, respectively. These indices can be
# arbitrary. See the documentation of `OpenCVCamera` to find your own camera indices.
cameras = {
"macbookpro": OpenCVCamera(camera_index=0, fps=30, width=640, height=480),
"iphone": OpenCVCamera(camera_index=1, fps=30, width=640, height=480),
}
# Assumes leader and follower arms have been instantiated already (see first example)
robot = KochRobot(leader_arms=leader_arms, follower_arms=follower_arms, cameras=cameras)
robot.connect()
while True:
observation, action = robot.teleop_step(record_data=True)
```
Example of controlling the robot with a policy (without running multiple policies in parallel to ensure highest frequency):
```python
# Assumes leader and follower arms + cameras have been instantiated already (see previous example)
robot = KochRobot(leader_arms=leader_arms, follower_arms=follower_arms, cameras=cameras)
robot.connect()
while True:
# Uses the follower arms and cameras to capture an observation
observation = robot.capture_observation()
# Assumes a policy has been instantiated
with torch.inference_mode():
action = policy.select_action(observation)
# Orders the robot to move
robot.send_action(action)
```
Example of disconnecting, which is not mandatory since we automatically disconnect when the object is deleted:
```python
robot.disconnect()
```
"""
def __init__(
self,
config: KochRobotConfig | None = None,
calibration_path: Path = ".cache/calibration/koch.pkl",
**kwargs,
):
if config is None:
config = KochRobotConfig()
# Overwrite config arguments using kwargs
self.config = replace(config, **kwargs)
self.calibration_path = Path(calibration_path)
self.leader_arms = self.config.leader_arms
self.follower_arms = self.config.follower_arms
self.cameras = self.config.cameras
self.is_connected = False
self.logs = {}
def connect(self):
if self.is_connected:
raise RobotDeviceAlreadyConnectedError(
"KochRobot is already connected. Do not run `robot.connect()` twice."
)
if not self.leader_arms and not self.follower_arms and not self.cameras:
raise ValueError(
"KochRobot doesn't have any device to connect. See example of usage in docstring of the class."
)
# Connect the arms
for name in self.follower_arms:
self.follower_arms[name].connect()
self.leader_arms[name].connect()
# Reset the arms and load or run calibration
if self.calibration_path.exists():
# Reset all arms before setting calibration
for name in self.follower_arms:
reset_arm(self.follower_arms[name])
for name in self.leader_arms:
reset_arm(self.leader_arms[name])
with open(self.calibration_path, "rb") as f:
calibration = pickle.load(f)
else:
# Run the calibration process, which begins by resetting all arms
calibration = self.run_calibration()
self.calibration_path.parent.mkdir(parents=True, exist_ok=True)
with open(self.calibration_path, "wb") as f:
pickle.dump(calibration, f)
# Set calibration
for name in self.follower_arms:
self.follower_arms[name].set_calibration(calibration[f"follower_{name}"])
for name in self.leader_arms:
self.leader_arms[name].set_calibration(calibration[f"leader_{name}"])
# Set better PID values to close the gap between recorded states and actions
# TODO(rcadene): Implement an automatic procedure to set optimal PID values for each motor
for name in self.follower_arms:
self.follower_arms[name].write("Position_P_Gain", 1500, "elbow_flex")
self.follower_arms[name].write("Position_I_Gain", 0, "elbow_flex")
self.follower_arms[name].write("Position_D_Gain", 600, "elbow_flex")
# Enable torque on all motors of the follower arms
for name in self.follower_arms:
self.follower_arms[name].write("Torque_Enable", 1)
# Enable torque on the gripper of the leader arms, and move it to 45 degrees,
# so that we can use it as a trigger to close the gripper of the follower arms.
for name in self.leader_arms:
self.leader_arms[name].write("Torque_Enable", 1, "gripper")
self.leader_arms[name].write("Goal_Position", GRIPPER_OPEN, "gripper")
# Connect the cameras
for name in self.cameras:
self.cameras[name].connect()
self.is_connected = True
def run_calibration(self):
calibration = {}
for name in self.follower_arms:
homing_offset, drive_mode = run_arm_calibration(self.follower_arms[name], name, "follower")
calibration[f"follower_{name}"] = {}
for idx, motor_name in enumerate(self.follower_arms[name].motor_names):
calibration[f"follower_{name}"][motor_name] = (homing_offset[idx], drive_mode[idx])
for name in self.leader_arms:
homing_offset, drive_mode = run_arm_calibration(self.leader_arms[name], name, "leader")
calibration[f"leader_{name}"] = {}
for idx, motor_name in enumerate(self.leader_arms[name].motor_names):
calibration[f"leader_{name}"][motor_name] = (homing_offset[idx], drive_mode[idx])
return calibration
def teleop_step(
self, record_data=False
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"KochRobot is not connected. You need to run `robot.connect()`."
)
# Prepare to assign the positions of the leader to the follower
leader_pos = {}
for name in self.leader_arms:
now = time.perf_counter()
leader_pos[name] = self.leader_arms[name].read("Present_Position")
self.logs[f"read_leader_{name}_pos_dt_s"] = time.perf_counter() - now
follower_goal_pos = {}
for name in self.leader_arms:
follower_goal_pos[name] = leader_pos[name]
# Send action
for name in self.follower_arms:
now = time.perf_counter()
self.follower_arms[name].write("Goal_Position", follower_goal_pos[name])
self.logs[f"write_follower_{name}_goal_pos_dt_s"] = time.perf_counter() - now
# Early exit when recording data is not requested
if not record_data:
return
# TODO(rcadene): Add velocity and other info
# Read follower position
follower_pos = {}
for name in self.follower_arms:
now = time.perf_counter()
follower_pos[name] = self.follower_arms[name].read("Present_Position")
self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - now
# Create state by concatenating follower current position
state = []
for name in self.follower_arms:
if name in follower_pos:
state.append(follower_pos[name])
state = np.concatenate(state)
# Create action by concatenating follower goal position
action = []
for name in self.follower_arms:
if name in follower_goal_pos:
action.append(follower_goal_pos[name])
action = np.concatenate(action)
# Capture images from cameras
images = {}
for name in self.cameras:
now = time.perf_counter()
images[name] = self.cameras[name].async_read()
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - now
# Populate output dictionaries and format to pytorch
obs_dict, action_dict = {}, {}
obs_dict["observation.state"] = torch.from_numpy(state)
action_dict["action"] = torch.from_numpy(action)
for name in self.cameras:
obs_dict[f"observation.images.{name}"] = torch.from_numpy(images[name])
return obs_dict, action_dict
def capture_observation(self):
"""The returned observations do not have a batch dimension."""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"KochRobot is not connected. You need to run `robot.connect()`."
)
# Read follower position
follower_pos = {}
for name in self.follower_arms:
now = time.perf_counter()
follower_pos[name] = self.follower_arms[name].read("Present_Position")
self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - now
# Create state by concatenating follower current position
state = []
for name in self.follower_arms:
if name in follower_pos:
state.append(follower_pos[name])
state = np.concatenate(state)
# Capture images from cameras
images = {}
for name in self.cameras:
now = time.perf_counter()
images[name] = self.cameras[name].async_read()
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - now
# Populate output dictionaries and format to pytorch
obs_dict = {}
obs_dict["observation.state"] = torch.from_numpy(state)
for name in self.cameras:
# Convert to pytorch format: channel first and float32 in [0,1]
img = torch.from_numpy(images[name])
img = img.type(torch.float32) / 255
img = img.permute(2, 0, 1).contiguous()
obs_dict[f"observation.images.{name}"] = img
return obs_dict
def send_action(self, action: torch.Tensor):
"""The provided action is expected to be a vector."""
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"KochRobot is not connected. You need to run `robot.connect()`."
)
from_idx = 0
to_idx = 0
follower_goal_pos = {}
for name in self.follower_arms:
to_idx += len(self.follower_arms[name].motor_names)
follower_goal_pos[name] = action[from_idx:to_idx].numpy()
from_idx = to_idx
for name in self.follower_arms:
self.follower_arms[name].write("Goal_Position", follower_goal_pos[name].astype(np.int32))
def disconnect(self):
if not self.is_connected:
raise RobotDeviceNotConnectedError(
"KochRobot is not connected. You need to run `robot.connect()` before disconnecting."
)
for name in self.follower_arms:
self.follower_arms[name].disconnect()
for name in self.leader_arms:
self.leader_arms[name].disconnect()
for name in self.cameras:
self.cameras[name].disconnect()
self.is_connected = False
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()
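A hedged sketch of how `send_action` consumes a flat action vector: the `from_idx`/`to_idx` window walks across follower arms, taking `len(motor_names)` entries for each. With the single 6-motor "main" arm from the examples above:

```python
robot = KochRobot(leader_arms=leader_arms, follower_arms=follower_arms, cameras=cameras)
robot.connect()

# The state vector has 6 entries, one per follower motor, so reusing it as an
# action commands the arm to hold its current pose.
action = robot.capture_observation()["observation.state"]
robot.send_action(action)
```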

View File

@ -0,0 +1,9 @@
from typing import Protocol
class Robot(Protocol):
def init_teleop(self): ...
def run_calibration(self): ...
def teleop_step(self, record_data=False): ...
def capture_observation(self): ...
def send_action(self, action): ...

View File

@ -0,0 +1,19 @@
class RobotDeviceNotConnectedError(Exception):
"""Exception raised when the robot device is not connected."""
def __init__(
self, message="This robot device is not connected. Try calling `robot_device.connect()` first."
):
self.message = message
super().__init__(self.message)
class RobotDeviceAlreadyConnectedError(Exception):
"""Exception raised when the robot device is already connected."""
def __init__(
self,
message="This robot device is already connected. Try not calling `robot_device.connect()` twice.",
):
self.message = message
super().__init__(self.message)
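A hedged sketch of the intended control flow around these exceptions, using a hypothetical `camera` device:

```python
try:
    camera.connect()
except RobotDeviceAlreadyConnectedError:
    pass  # already connected; safe to keep using the device

try:
    frame = camera.read()
except RobotDeviceNotConnectedError:
    camera.connect()  # connect lazily, then retry
    frame = camera.read()
```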

View File

@ -17,7 +17,7 @@ import logging
import os.path as osp
import random
from contextlib import contextmanager
from datetime import datetime
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Generator
@ -172,3 +172,7 @@ def print_cuda_memory_usage():
print("Maximum GPU Memory Allocated: {:.2f} MB".format(torch.cuda.max_memory_allocated(0) / 1024**2))
print("Current GPU Memory Reserved: {:.2f} MB".format(torch.cuda.memory_reserved(0) / 1024**2))
print("Maximum GPU Memory Reserved: {:.2f} MB".format(torch.cuda.max_memory_reserved(0) / 1024**2))
def capture_timestamp_utc():
return datetime.now(timezone.utc)

lerobot/configs/env/koch_real.yaml vendored Normal file
View File

@ -0,0 +1,10 @@
# @package _global_
fps: 30
env:
name: real_world
task: null
state_dim: 6
action_dim: 6
fps: ${fps}

View File

@ -0,0 +1,102 @@
# @package _global_
# Use `act_koch_real.yaml` to train on real-world datasets collected on Alexander Koch's robots.
# Compared to `act.yaml`, it contains 2 cameras (i.e. laptop, phone) instead of 1 camera (i.e. top).
# Also, `training.eval_freq` is set to -1. This parameter sets the frequency (in training steps) at which
# checkpoints are evaluated; setting it to -1 deactivates evaluation, because real-world evaluation is done
# through our `control_robot.py` script instead.
# Look at the documentation in the header of `control_robot.py` for more information on how to collect data, train and evaluate a policy.
#
# Example of usage for training:
# ```bash
# python lerobot/scripts/train.py \
# policy=act_koch_real \
# env=koch_real
# ```
seed: 1000
dataset_repo_id: lerobot/koch_pick_place_lego
override_dataset_stats:
observation.images.laptop:
# stats from imagenet, since we use a pretrained vision model
mean: [[[0.485]], [[0.456]], [[0.406]]] # (c,1,1)
std: [[[0.229]], [[0.224]], [[0.225]]] # (c,1,1)
observation.images.phone:
# stats from imagenet, since we use a pretrained vision model
mean: [[[0.485]], [[0.456]], [[0.406]]] # (c,1,1)
std: [[[0.229]], [[0.224]], [[0.225]]] # (c,1,1)
training:
offline_steps: 80000
online_steps: 0
eval_freq: -1
save_freq: 10000
log_freq: 100
save_checkpoint: true
batch_size: 8
lr: 1e-5
lr_backbone: 1e-5
weight_decay: 1e-4
grad_clip_norm: 10
online_steps_between_rollouts: 1
delta_timestamps:
action: "[i / ${fps} for i in range(${policy.chunk_size})]"
eval:
n_episodes: 50
batch_size: 50
# See `configuration_act.py` for more details.
policy:
name: act
# Input / output structure.
n_obs_steps: 1
chunk_size: 100
n_action_steps: 100
input_shapes:
# TODO(rcadene, alexander-soare): add variables for height and width from the dataset/env?
observation.images.laptop: [3, 480, 640]
observation.images.phone: [3, 480, 640]
observation.state: ["${env.state_dim}"]
output_shapes:
action: ["${env.action_dim}"]
# Normalization / Unnormalization
input_normalization_modes:
observation.images.laptop: mean_std
observation.images.phone: mean_std
observation.state: mean_std
output_normalization_modes:
action: mean_std
# Architecture.
# Vision backbone.
vision_backbone: resnet18
pretrained_backbone_weights: ResNet18_Weights.IMAGENET1K_V1
replace_final_stride_with_dilation: false
# Transformer layers.
pre_norm: false
dim_model: 512
n_heads: 8
dim_feedforward: 3200
feedforward_activation: relu
n_encoder_layers: 4
# Note: Although the original ACT implementation has 7 for `n_decoder_layers`, there is a bug in the code
# that means only the first layer is used. Here we match the original implementation by setting this to 1.
# See this issue https://github.com/tonyzhaozh/act/issues/25#issue-2258740521.
n_decoder_layers: 1
# VAE.
use_vae: true
latent_dim: 32
n_vae_encoder_layers: 4
# Inference.
temporal_ensemble_momentum: null
# Training and loss computation.
dropout: 0.1
kl_weight: 10.0
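For clarity, the `delta_timestamps.action` entry above is a Python list comprehension stored as a string; after Hydra resolves `${fps}` and `${policy.chunk_size}`, the training code evaluates it into action-time offsets. A hedged sketch of the result with this config's values:

```python
fps = 30
chunk_size = 100  # policy.chunk_size

# "[i / ${fps} for i in range(${policy.chunk_size})]" resolves to:
delta_timestamps = [i / fps for i in range(chunk_size)]
# [0.0, 0.0333, 0.0667, ..., 3.3]: the next 100 action steps, in seconds
# relative to the current observation.
```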

View File

@ -0,0 +1,734 @@
"""
Examples of usage:
- Unlimited teleoperation at the highest frequency (~200 Hz is expected); press CTRL+C to exit:
```bash
python lerobot/scripts/control_robot.py teleoperate
```
- Unlimited teleoperation at a limited frequency of 30 Hz, to simulate data recording frequency:
```bash
python lerobot/scripts/control_robot.py teleoperate \
--fps 30
```
- Record one episode in order to test replay:
```bash
python lerobot/scripts/control_robot.py record_dataset \
--fps 30 \
--root tmp/data \
--repo-id $USER/koch_test \
--num-episodes 1 \
--run-compute-stats 0
```
- Visualize dataset:
```bash
python lerobot/scripts/visualize_dataset.py \
--root tmp/data \
--repo-id $USER/koch_test \
--episode-index 0
```
- Replay this test episode:
```bash
python lerobot/scripts/control_robot.py replay_episode \
--fps 30 \
--root tmp/data \
--repo-id $USER/koch_test \
--episode 0
```
- Record a full dataset in order to train a policy, with 2 seconds of warmup,
30 seconds of recording for each episode, and 10 seconds to reset the environment in between episodes:
```bash
python lerobot/scripts/control_robot.py record_dataset \
--fps 30 \
--root data \
--repo-id $USER/koch_pick_place_lego \
--num-episodes 50 \
--run-compute-stats 1 \
--warmup-time-s 2 \
--episode-time-s 30 \
--reset-time-s 10
```
**NOTE**: You can use your keyboard to control the data recording flow.
- Tap the right arrow key '->' to early exit while recording an episode and go to resetting the environment.
- Tap the right arrow key '->' to early exit while resetting the environment and go to recording the next episode.
- Tap the left arrow key '<-' to early exit and re-record the current episode.
- Tap the escape key 'esc' to stop the data recording.
This might require sudo permissions to allow your terminal to monitor keyboard events.
**NOTE**: You can resume/continue data recording by running the same data recording command twice.
To start over instead of resuming (deleting the existing dataset), use `--force-override 1`.
- Train on this dataset with the ACT policy:
```bash
DATA_DIR=data python lerobot/scripts/train.py \
policy=act_koch_real \
env=koch_real \
dataset_repo_id=$USER/koch_pick_place_lego \
hydra.run.dir=outputs/train/act_koch_real
```
- Run the pretrained policy on the robot:
```bash
python lerobot/scripts/control_robot.py run_policy \
-p outputs/train/act_koch_real/checkpoints/080000/pretrained_model
```
"""
import argparse
import concurrent.futures
import json
import logging
import os
import platform
import shutil
import time
from contextlib import nullcontext
from pathlib import Path
import torch
import tqdm
from huggingface_hub import create_branch
from omegaconf import DictConfig
from PIL import Image
from termcolor import colored
from lerobot.common.datasets.compute_stats import compute_stats
from lerobot.common.datasets.lerobot_dataset import CODEBASE_VERSION, LeRobotDataset
from lerobot.common.datasets.push_dataset_to_hub.aloha_hdf5_format import to_hf_dataset
from lerobot.common.datasets.push_dataset_to_hub.utils import concatenate_episodes
from lerobot.common.datasets.utils import calculate_episode_data_index
from lerobot.common.datasets.video_utils import encode_video_frames
from lerobot.common.policies.factory import make_policy
from lerobot.common.robot_devices.robots.factory import make_robot
from lerobot.common.robot_devices.robots.utils import Robot
from lerobot.common.utils.utils import get_safe_torch_device, init_hydra_config, init_logging, set_global_seed
from lerobot.scripts.eval import get_pretrained_policy_path
from lerobot.scripts.push_dataset_to_hub import push_meta_data_to_hub, push_videos_to_hub, save_meta_data
########################################################################################
# Utilities
########################################################################################
def save_image(img_tensor, key, frame_index, episode_index, videos_dir):
img = Image.fromarray(img_tensor.numpy())
path = videos_dir / f"{key}_episode_{episode_index:06d}" / f"frame_{frame_index:06d}.png"
path.parent.mkdir(parents=True, exist_ok=True)
img.save(str(path), quality=100)
def busy_wait(seconds):
    # Significantly more accurate than `time.sleep`, and mandatory for our use case,
    # but it consumes CPU cycles.
    # TODO(rcadene): find an alternative: from Python 3.11, time.sleep is more precise
end_time = time.perf_counter() + seconds
while time.perf_counter() < end_time:
pass
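# A CPU-friendlier variant (an illustrative sketch, not used by the commands in this script):
# sleep through most of the interval, then busy-wait only the last millisecond. The name
# `precise_wait` and the 1 ms margin are assumptions, not part of the original code.
def precise_wait(seconds, busy_margin_s=0.001):
    end_time = time.perf_counter() + seconds
    # Coarse-grained sleep releases the CPU for most of the interval...
    sleep_s = seconds - busy_margin_s
    if sleep_s > 0:
        time.sleep(sleep_s)
    # ...then busy-wait the short remainder for precision.
    while time.perf_counter() < end_time:
        pass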
def none_or_int(value):
if value == "None":
return None
return int(value)
def log_control_info(robot, dt_s, episode_index=None, frame_index=None, fps=None):
log_items = []
if episode_index is not None:
log_items += [f"ep:{episode_index}"]
if frame_index is not None:
log_items += [f"frame:{frame_index}"]
def log_dt(shortname, dt_val_s):
nonlocal log_items
log_items += [f"{shortname}:{dt_val_s * 1000:5.2f} ({1/ dt_val_s:3.1f}hz)"]
# total step time displayed in milliseconds and its frequency
log_dt("dt", dt_s)
for name in robot.leader_arms:
key = f"read_leader_{name}_pos_dt_s"
if key in robot.logs:
log_dt("dtRlead", robot.logs[key])
    for name in robot.follower_arms:
        key = f"write_follower_{name}_goal_pos_dt_s"
        if key in robot.logs:
            log_dt("dtWfoll", robot.logs[key])
        key = f"read_follower_{name}_pos_dt_s"
        if key in robot.logs:
            log_dt("dtRfoll", robot.logs[key])
for name in robot.cameras:
key = f"read_camera_{name}_dt_s"
if key in robot.logs:
log_dt(f"dtR{name}", robot.logs[key])
info_str = " ".join(log_items)
if fps is not None:
actual_fps = 1 / dt_s
if actual_fps < fps - 1:
info_str = colored(info_str, "yellow")
logging.info(info_str)
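# Illustrative log line (values made up) assembled from the format strings above:
#   ep:3 frame:120 dt:33.35 (30.0hz) dtRlead: 1.20 (833.3hz) dtWfoll: 2.50 (400.0hz) dtRlaptop:32.00 (31.2hz)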
def get_is_headless():
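    """Detects if Python is running without a monitor (only DISPLAY is checked, and only on Linux; macOS and Windows are assumed to have one)."""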
if platform.system() == "Linux":
display = os.environ.get("DISPLAY")
if display is None or display == "":
return True
return False
########################################################################################
# Control modes
########################################################################################
def teleoperate(robot: Robot, fps: int | None = None, teleop_time_s: float | None = None):
# TODO(rcadene): Add option to record logs
if not robot.is_connected:
robot.connect()
start_time = time.perf_counter()
while True:
now = time.perf_counter()
robot.teleop_step()
if fps is not None:
dt_s = time.perf_counter() - now
busy_wait(1 / fps - dt_s)
dt_s = time.perf_counter() - now
log_control_info(robot, dt_s, fps=fps)
if teleop_time_s is not None and time.perf_counter() - start_time > teleop_time_s:
break
def record_dataset(
robot: Robot,
fps: int | None = None,
root="data",
repo_id="lerobot/debug",
warmup_time_s=2,
episode_time_s=10,
reset_time_s=5,
num_episodes=50,
video=True,
run_compute_stats=True,
push_to_hub=True,
num_image_writers=8,
force_override=False,
):
# TODO(rcadene): Add option to record logs
if not video:
raise NotImplementedError()
if not robot.is_connected:
robot.connect()
local_dir = Path(root) / repo_id
if local_dir.exists() and force_override:
shutil.rmtree(local_dir)
episodes_dir = local_dir / "episodes"
episodes_dir.mkdir(parents=True, exist_ok=True)
videos_dir = local_dir / "videos"
videos_dir.mkdir(parents=True, exist_ok=True)
# Logic to resume data recording
rec_info_path = episodes_dir / "data_recording_info.json"
if rec_info_path.exists():
with open(rec_info_path) as f:
rec_info = json.load(f)
episode_index = rec_info["last_episode_index"] + 1
else:
episode_index = 0
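    # The resume file only stores the last recorded episode index, e.g. {"last_episode_index": 7}
    # (it is written further below), so a rerun continues at episode 8.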
is_headless = get_is_headless()
    # Execute a few seconds without recording data, to give time
    # to the robot devices to connect and start synchronizing.
timestamp = 0
start_time = time.perf_counter()
is_warmup_print = False
while timestamp < warmup_time_s:
if not is_warmup_print:
logging.info("Warming up (no data recording)")
os.system('say "Warmup" &')
is_warmup_print = True
now = time.perf_counter()
observation, action = robot.teleop_step(record_data=True)
if not is_headless:
image_keys = [key for key in observation if "image" in key]
dt_s = time.perf_counter() - now
busy_wait(1 / fps - dt_s)
dt_s = time.perf_counter() - now
log_control_info(robot, dt_s, fps=fps)
timestamp = time.perf_counter() - start_time
    # Allow exiting early while recording an episode or resetting the environment,
    # by tapping the right arrow key '->'. This might require sudo permission
    # to allow your terminal to monitor keyboard events.
exit_early = False
rerecord_episode = False
stop_recording = False
# Only import pynput if not in a headless environment
if is_headless:
logging.info("Headless environment detected. Keyboard input will not be available.")
else:
from pynput import keyboard
def on_press(key):
nonlocal exit_early, rerecord_episode, stop_recording
try:
if key == keyboard.Key.right:
print("Right arrow key pressed. Exiting loop...")
exit_early = True
elif key == keyboard.Key.left:
print("Left arrow key pressed. Exiting loop and rerecord the last episode...")
rerecord_episode = True
exit_early = True
elif key == keyboard.Key.esc:
print("Escape key pressed. Stopping data recording...")
stop_recording = True
exit_early = True
except Exception as e:
print(f"Error handling key press: {e}")
listener = keyboard.Listener(on_press=on_press)
listener.start()
    # Save images using threads to reach high fps (30 and more).
    # Using `with` to exit smoothly if an exception is raised.
    # Using `num_image_writers` worker threads to avoid blocking the main thread.
futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=num_image_writers) as executor:
# Start recording all episodes
while episode_index < num_episodes:
logging.info(f"Recording episode {episode_index}")
os.system(f'say "Recording episode {episode_index}" &')
ep_dict = {}
frame_index = 0
timestamp = 0
start_time = time.perf_counter()
while timestamp < episode_time_s:
now = time.perf_counter()
observation, action = robot.teleop_step(record_data=True)
image_keys = [key for key in observation if "image" in key]
not_image_keys = [key for key in observation if "image" not in key]
for key in image_keys:
futures += [
executor.submit(
save_image, observation[key], key, frame_index, episode_index, videos_dir
)
]
for key in not_image_keys:
if key not in ep_dict:
ep_dict[key] = []
ep_dict[key].append(observation[key])
for key in action:
if key not in ep_dict:
ep_dict[key] = []
ep_dict[key].append(action[key])
frame_index += 1
dt_s = time.perf_counter() - now
busy_wait(1 / fps - dt_s)
dt_s = time.perf_counter() - now
log_control_info(robot, dt_s, fps=fps)
timestamp = time.perf_counter() - start_time
if exit_early:
exit_early = False
break
if not stop_recording:
                # Start resetting env while the executor is finishing
logging.info("Reset the environment")
os.system('say "Reset the environment" &')
timestamp = 0
start_time = time.perf_counter()
# During env reset we save the data and encode the videos
num_frames = frame_index
for key in image_keys:
tmp_imgs_dir = videos_dir / f"{key}_episode_{episode_index:06d}"
fname = f"{key}_episode_{episode_index:06d}.mp4"
video_path = local_dir / "videos" / fname
if video_path.exists():
video_path.unlink()
                # Store the reference to the video frames, even though the videos are not yet encoded
ep_dict[key] = []
for i in range(num_frames):
ep_dict[key].append({"path": f"videos/{fname}", "timestamp": i / fps})
for key in not_image_keys:
ep_dict[key] = torch.stack(ep_dict[key])
for key in action:
ep_dict[key] = torch.stack(ep_dict[key])
ep_dict["episode_index"] = torch.tensor([episode_index] * num_frames)
ep_dict["frame_index"] = torch.arange(0, num_frames, 1)
ep_dict["timestamp"] = torch.arange(0, num_frames, 1) / fps
done = torch.zeros(num_frames, dtype=torch.bool)
done[-1] = True
ep_dict["next.done"] = done
ep_path = episodes_dir / f"episode_{episode_index}.pth"
print("Saving episode dictionary...")
torch.save(ep_dict, ep_path)
rec_info = {
"last_episode_index": episode_index,
}
with open(rec_info_path, "w") as f:
json.dump(rec_info, f)
is_last_episode = stop_recording or (episode_index == (num_episodes - 1))
# Wait if necessary
with tqdm.tqdm(total=reset_time_s, desc="Waiting") as pbar:
while timestamp < reset_time_s and not is_last_episode:
time.sleep(1)
timestamp = time.perf_counter() - start_time
pbar.update(1)
if exit_early:
exit_early = False
break
            # Skip incrementing the episode index, which forces the current episode to be re-recorded
if rerecord_episode:
rerecord_episode = False
continue
episode_index += 1
if is_last_episode:
logging.info("Done recording")
os.system('say "Done recording"')
if not is_headless:
listener.stop()
logging.info("Waiting for threads writing the images on disk to terminate...")
for _ in tqdm.tqdm(
                    concurrent.futures.as_completed(futures), total=len(futures), desc="Writing images"
):
pass
break
num_episodes = episode_index
logging.info("Encoding videos")
os.system('say "Encoding videos" &')
# Use ffmpeg to convert frames stored as png into mp4 videos
for episode_index in tqdm.tqdm(range(num_episodes)):
for key in image_keys:
tmp_imgs_dir = videos_dir / f"{key}_episode_{episode_index:06d}"
fname = f"{key}_episode_{episode_index:06d}.mp4"
video_path = local_dir / "videos" / fname
if video_path.exists():
continue
            # note: `encode_video_frames` is a blocking call. Making it asynchronous shouldn't speed up encoding,
            # since video encoding with ffmpeg already uses multithreading.
encode_video_frames(tmp_imgs_dir, video_path, fps, overwrite=True)
shutil.rmtree(tmp_imgs_dir)
logging.info("Concatenating episodes")
ep_dicts = []
for episode_index in tqdm.tqdm(range(num_episodes)):
ep_path = episodes_dir / f"episode_{episode_index}.pth"
ep_dict = torch.load(ep_path)
ep_dicts.append(ep_dict)
data_dict = concatenate_episodes(ep_dicts)
total_frames = data_dict["frame_index"].shape[0]
data_dict["index"] = torch.arange(0, total_frames, 1)
hf_dataset = to_hf_dataset(data_dict, video)
episode_data_index = calculate_episode_data_index(hf_dataset)
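    # `episode_data_index` holds, for each episode, its first ("from") and one-past-last ("to")
    # frame index; `replay_episode` relies on it below to slice a single episode out of the dataset.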
info = {
"fps": fps,
"video": video,
}
lerobot_dataset = LeRobotDataset.from_preloaded(
repo_id=repo_id,
hf_dataset=hf_dataset,
episode_data_index=episode_data_index,
info=info,
videos_dir=videos_dir,
)
if run_compute_stats:
logging.info("Computing dataset statistics")
os.system('say "Computing dataset statistics" &')
stats = compute_stats(lerobot_dataset)
lerobot_dataset.stats = stats
    else:
        # Avoid a NameError in `save_meta_data` below when stats computation is skipped.
        stats = {}
        logging.info("Skipping computation of the dataset statistics")
    hf_dataset = hf_dataset.with_format(None)  # to remove transforms that can't be saved
hf_dataset.save_to_disk(str(local_dir / "train"))
meta_data_dir = local_dir / "meta_data"
save_meta_data(info, stats, episode_data_index, meta_data_dir)
if push_to_hub:
hf_dataset.push_to_hub(repo_id, revision="main")
push_meta_data_to_hub(repo_id, meta_data_dir, revision="main")
if video:
push_videos_to_hub(repo_id, videos_dir, revision="main")
create_branch(repo_id, repo_type="dataset", branch=CODEBASE_VERSION)
logging.info("Exiting")
os.system('say "Exiting" &')
return lerobot_dataset
def replay_episode(robot: Robot, episode: int, fps: int | None = None, root="data", repo_id="lerobot/debug"):
# TODO(rcadene): Add option to record logs
local_dir = Path(root) / repo_id
if not local_dir.exists():
        raise ValueError(f"Dataset directory does not exist: {local_dir}")
dataset = LeRobotDataset(repo_id, root=root)
items = dataset.hf_dataset.select_columns("action")
from_idx = dataset.episode_data_index["from"][episode].item()
to_idx = dataset.episode_data_index["to"][episode].item()
if not robot.is_connected:
robot.connect()
logging.info("Replaying episode")
os.system('say "Replaying episode"')
for idx in range(from_idx, to_idx):
now = time.perf_counter()
action = items[idx]["action"]
robot.send_action(action)
dt_s = time.perf_counter() - now
busy_wait(1 / fps - dt_s)
dt_s = time.perf_counter() - now
log_control_info(robot, dt_s, fps=fps)
def run_policy(robot: Robot, policy: torch.nn.Module, hydra_cfg: DictConfig, run_time_s: float | None = None):
# TODO(rcadene): Add option to record eval dataset and logs
# Check device is available
device = get_safe_torch_device(hydra_cfg.device, log=True)
policy.eval()
policy.to(device)
torch.backends.cudnn.benchmark = True
torch.backends.cuda.matmul.allow_tf32 = True
set_global_seed(hydra_cfg.seed)
fps = hydra_cfg.env.fps
if not robot.is_connected:
robot.connect()
start_time = time.perf_counter()
while True:
now = time.perf_counter()
observation = robot.capture_observation()
with (
torch.inference_mode(),
torch.autocast(device_type=device.type)
if device.type == "cuda" and hydra_cfg.use_amp
else nullcontext(),
):
            # Add a batch dimension and move inputs to the inference device.
            for name in observation:
                observation[name] = observation[name].unsqueeze(0)
                observation[name] = observation[name].to(device)
action = policy.select_action(observation)
# remove batch dimension
action = action.squeeze(0)
robot.send_action(action.to("cpu"))
dt_s = time.perf_counter() - now
busy_wait(1 / fps - dt_s)
dt_s = time.perf_counter() - now
log_control_info(robot, dt_s, fps=fps)
if run_time_s is not None and time.perf_counter() - start_time > run_time_s:
break
if __name__ == "__main__":
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="mode", required=True)
# Set common options for all the subparsers
base_parser = argparse.ArgumentParser(add_help=False)
base_parser.add_argument(
"--robot",
type=str,
default="koch",
help="Name of the robot provided to the `make_robot(name)` factory function.",
)
parser_teleop = subparsers.add_parser("teleoperate", parents=[base_parser])
parser_teleop.add_argument(
"--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)"
)
parser_record = subparsers.add_parser("record_dataset", parents=[base_parser])
parser_record.add_argument(
"--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)"
)
parser_record.add_argument(
"--root",
type=Path,
default="data",
help="Root directory where the dataset will be stored locally at '{root}/{repo_id}' (e.g. 'data/hf_username/dataset_name').",
)
parser_record.add_argument(
"--repo-id",
type=str,
default="lerobot/test",
help="Dataset identifier. By convention it should match '{hf_username}/{dataset_name}' (e.g. `lerobot/test`).",
)
parser_record.add_argument(
"--warmup-time-s",
type=int,
default=2,
help="Number of seconds before starting data collection. It allows the robot devices to warmup and synchronize.",
)
parser_record.add_argument(
"--episode-time-s",
type=int,
default=10,
help="Number of seconds for data recording for each episode.",
)
parser_record.add_argument(
"--reset-time-s",
type=int,
default=5,
help="Number of seconds for resetting the environment after each episode.",
)
parser_record.add_argument("--num-episodes", type=int, default=50, help="Number of episodes to record.")
parser_record.add_argument(
"--run-compute-stats",
type=int,
default=1,
help="By default, run the computation of the data statistics at the end of data collection. Compute intensive and not required to just replay an episode.",
)
parser_record.add_argument(
"--push-to-hub",
type=int,
default=1,
help="Upload dataset to Hugging Face hub.",
)
parser_record.add_argument(
"--num-image-writers",
type=int,
default=8,
help="Number of threads writing the frames as png images on disk. Don't set too much as you might get unstable fps due to main thread being blocked.",
)
parser_record.add_argument(
"--force-override",
type=int,
default=0,
help="By default, data recording is resumed. When set to 1, delete the local directory and start data recording from scratch.",
)
parser_replay = subparsers.add_parser("replay_episode", parents=[base_parser])
parser_replay.add_argument(
"--fps", type=none_or_int, default=None, help="Frames per second (set to None to disable)"
)
parser_replay.add_argument(
"--root",
type=Path,
default="data",
help="Root directory where the dataset will be stored locally at '{root}/{repo_id}' (e.g. 'data/hf_username/dataset_name').",
)
parser_replay.add_argument(
"--repo-id",
type=str,
default="lerobot/test",
help="Dataset identifier. By convention it should match '{hf_username}/{dataset_name}' (e.g. `lerobot/test`).",
)
parser_replay.add_argument("--episode", type=int, default=0, help="Index of the episode to replay.")
parser_policy = subparsers.add_parser("run_policy", parents=[base_parser])
parser_policy.add_argument(
"-p",
"--pretrained-policy-name-or-path",
type=str,
help=(
"Either the repo ID of a model hosted on the Hub or a path to a directory containing weights "
"saved using `Policy.save_pretrained`."
),
)
parser_policy.add_argument(
"overrides",
nargs="*",
help="Any key=value arguments to override config values (use dots for.nested=overrides)",
)
args = parser.parse_args()
init_logging()
control_mode = args.mode
robot_name = args.robot
kwargs = vars(args)
del kwargs["mode"]
del kwargs["robot"]
robot = make_robot(robot_name)
if control_mode == "teleoperate":
teleoperate(robot, **kwargs)
elif control_mode == "record_dataset":
record_dataset(robot, **kwargs)
elif control_mode == "replay_episode":
replay_episode(robot, **kwargs)
elif control_mode == "run_policy":
pretrained_policy_path = get_pretrained_policy_path(args.pretrained_policy_name_or_path)
hydra_cfg = init_hydra_config(pretrained_policy_path / "config.yaml", args.overrides)
policy = make_policy(hydra_cfg=hydra_cfg, pretrained_policy_name_or_path=pretrained_policy_path)
run_policy(robot, policy, hydra_cfg)

View File

@ -578,6 +578,29 @@ def main(
logging.info("End of eval")
def get_pretrained_policy_path(pretrained_policy_name_or_path, revision=None):
try:
pretrained_policy_path = Path(snapshot_download(pretrained_policy_name_or_path, revision=revision))
except (HFValidationError, RepositoryNotFoundError) as e:
if isinstance(e, HFValidationError):
error_message = (
"The provided pretrained_policy_name_or_path is not a valid Hugging Face Hub repo ID."
)
else:
error_message = (
"The provided pretrained_policy_name_or_path was not found on the Hugging Face Hub."
)
logging.warning(f"{error_message} Treating it as a local directory.")
pretrained_policy_path = Path(pretrained_policy_name_or_path)
if not pretrained_policy_path.is_dir() or not pretrained_policy_path.exists():
raise ValueError(
"The provided pretrained_policy_name_or_path is not a valid/existing Hugging Face Hub "
"repo ID, nor is it an existing local directory."
)
return pretrained_policy_path
if __name__ == "__main__":
init_logging()
@ -619,27 +642,9 @@ if __name__ == "__main__":
if args.pretrained_policy_name_or_path is None:
main(hydra_cfg_path=args.config, out_dir=args.out_dir, config_overrides=args.overrides)
else:
try:
pretrained_policy_path = Path(
snapshot_download(args.pretrained_policy_name_or_path, revision=args.revision)
)
except (HFValidationError, RepositoryNotFoundError) as e:
if isinstance(e, HFValidationError):
error_message = (
"The provided pretrained_policy_name_or_path is not a valid Hugging Face Hub repo ID."
)
else:
error_message = (
"The provided pretrained_policy_name_or_path was not found on the Hugging Face Hub."
)
logging.warning(f"{error_message} Treating it as a local directory.")
pretrained_policy_path = Path(args.pretrained_policy_name_or_path)
if not pretrained_policy_path.is_dir() or not pretrained_policy_path.exists():
raise ValueError(
"The provided pretrained_policy_name_or_path is not a valid/existing Hugging Face Hub "
"repo ID, nor is it an existing local directory."
)
pretrained_policy_path = get_pretrained_policy_path(
args.pretrained_policy_name_or_path, revision=args.revision
)
main(
pretrained_policy_path=pretrained_policy_path,

[4 binary image files added (not shown): 416 KiB, 446 KiB, 318 KiB, 420 KiB]
176
poetry.lock generated
View File

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
[[package]]
name = "absl-py"
@ -807,6 +807,19 @@ files = [
[package.dependencies]
pyarrow = "*"
[[package]]
name = "dynamixel-sdk"
version = "3.7.31"
description = "Dynamixel SDK 3. python package"
optional = true
python-versions = "*"
files = [
{file = "dynamixel_sdk-3.7.31-py3-none-any.whl", hash = "sha256:74e8c112ca6b0b869b196dd8c6a44ffd5dd5c1a3cb9fe2030e9933922406b466"},
]
[package.dependencies]
pyserial = "*"
[[package]]
name = "einops"
version = "0.8.0"
@ -818,6 +831,16 @@ files = [
{file = "einops-0.8.0.tar.gz", hash = "sha256:63486517fed345712a8385c100cb279108d9d47e6ae59099b07657e983deae85"},
]
[[package]]
name = "evdev"
version = "1.7.1"
description = "Bindings to the Linux input handling subsystem"
optional = true
python-versions = ">=3.6"
files = [
{file = "evdev-1.7.1.tar.gz", hash = "sha256:0c72c370bda29d857e188d931019c32651a9c1ea977c08c8d939b1ced1637fde"},
]
[[package]]
name = "exceptiongroup"
version = "1.2.1"
@ -2987,6 +3010,126 @@ cffi = ">=1.15.0"
[package.extras]
dev = ["aafigure", "matplotlib", "numpy", "pygame", "pyglet (<2.0.0)", "sphinx", "wheel"]
[[package]]
name = "pynput"
version = "1.7.7"
description = "Monitor and control user input devices"
optional = true
python-versions = "*"
files = [
{file = "pynput-1.7.7-py2.py3-none-any.whl", hash = "sha256:afc43f651684c98818de048abc76adf9f2d3d797083cb07c1f82be764a2d44cb"},
]
[package.dependencies]
evdev = {version = ">=1.3", markers = "sys_platform in \"linux\""}
pyobjc-framework-ApplicationServices = {version = ">=8.0", markers = "sys_platform == \"darwin\""}
pyobjc-framework-Quartz = {version = ">=8.0", markers = "sys_platform == \"darwin\""}
python-xlib = {version = ">=0.17", markers = "sys_platform in \"linux\""}
six = "*"
[[package]]
name = "pyobjc-core"
version = "10.3.1"
description = "Python<->ObjC Interoperability Module"
optional = true
python-versions = ">=3.8"
files = [
{file = "pyobjc_core-10.3.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:ea46d2cda17921e417085ac6286d43ae448113158afcf39e0abe484c58fb3d78"},
{file = "pyobjc_core-10.3.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:899d3c84d2933d292c808f385dc881a140cf08632907845043a333a9d7c899f9"},
{file = "pyobjc_core-10.3.1-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:6ff5823d13d0a534cdc17fa4ad47cf5bee4846ce0fd27fc40012e12b46db571b"},
{file = "pyobjc_core-10.3.1-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:2581e8e68885bcb0e11ec619e81ef28e08ee3fac4de20d8cc83bc5af5bcf4a90"},
{file = "pyobjc_core-10.3.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:ea98d4c2ec39ca29e62e0327db21418696161fb138ee6278daf2acbedf7ce504"},
{file = "pyobjc_core-10.3.1-cp38-cp38-macosx_11_0_universal2.whl", hash = "sha256:4c179c26ee2123d0aabffb9dbc60324b62b6f8614fb2c2328b09386ef59ef6d8"},
{file = "pyobjc_core-10.3.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:cb901fce65c9be420c40d8a6ee6fff5ff27c6945f44fd7191989b982baa66dea"},
{file = "pyobjc_core-10.3.1.tar.gz", hash = "sha256:b204a80ccc070f9ab3f8af423a3a25a6fd787e228508d00c4c30f8ac538ba720"},
]
[[package]]
name = "pyobjc-framework-applicationservices"
version = "10.3.1"
description = "Wrappers for the framework ApplicationServices on macOS"
optional = true
python-versions = ">=3.8"
files = [
{file = "pyobjc_framework_ApplicationServices-10.3.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:b694260d423c470cb90c3a7009cfde93e332ea6fb4b9b9526ad3acbd33460e3d"},
{file = "pyobjc_framework_ApplicationServices-10.3.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:d886ba1f65df47b77ff7546f3fc9bc7d08cfb6b3c04433b719f6b0689a2c0d1f"},
{file = "pyobjc_framework_ApplicationServices-10.3.1-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:be157f2c3ffb254064ef38249670af8cada5e519a714d2aa5da3740934d89bc8"},
{file = "pyobjc_framework_ApplicationServices-10.3.1-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:57737f41731661e4a3b78793ec9173f61242a32fa560c3e4e58484465d049c32"},
{file = "pyobjc_framework_ApplicationServices-10.3.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:c429eca69ee675e781e4e55f79e939196b47f02560ad865b1ba9ac753b90bd77"},
{file = "pyobjc_framework_ApplicationServices-10.3.1-cp38-cp38-macosx_11_0_universal2.whl", hash = "sha256:4f1814a17041a20adca454044080b52e39a4ebc567ad2c6a48866dd4beaa192a"},
{file = "pyobjc_framework_ApplicationServices-10.3.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:1252f1137f83eb2c6b9968d8c591363e8859dd2484bc9441d8f365bcfb43a0e4"},
{file = "pyobjc_framework_applicationservices-10.3.1.tar.gz", hash = "sha256:f27cb64aa4d129ce671fd42638c985eb2a56d544214a95fe3214a007eacc4790"},
]
[package.dependencies]
pyobjc-core = ">=10.3.1"
pyobjc-framework-Cocoa = ">=10.3.1"
pyobjc-framework-CoreText = ">=10.3.1"
pyobjc-framework-Quartz = ">=10.3.1"
[[package]]
name = "pyobjc-framework-cocoa"
version = "10.3.1"
description = "Wrappers for the Cocoa frameworks on macOS"
optional = true
python-versions = ">=3.8"
files = [
{file = "pyobjc_framework_Cocoa-10.3.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:4cb4f8491ab4d9b59f5187e42383f819f7a46306a4fa25b84f126776305291d1"},
{file = "pyobjc_framework_Cocoa-10.3.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:5f31021f4f8fdf873b57a97ee1f3c1620dbe285e0b4eaed73dd0005eb72fd773"},
{file = "pyobjc_framework_Cocoa-10.3.1-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:11b4e0bad4bbb44a4edda128612f03cdeab38644bbf174de0c13129715497296"},
{file = "pyobjc_framework_Cocoa-10.3.1-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:de5e62e5ccf2871a94acf3bf79646b20ea893cc9db78afa8d1fe1b0d0f7cbdb0"},
{file = "pyobjc_framework_Cocoa-10.3.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:6c5af24610ab639bd1f521ce4500484b40787f898f691b7a23da3339e6bc8b90"},
{file = "pyobjc_framework_Cocoa-10.3.1-cp38-cp38-macosx_11_0_universal2.whl", hash = "sha256:a7151186bb7805deea434fae9a4423335e6371d105f29e73cc2036c6779a9dbc"},
{file = "pyobjc_framework_Cocoa-10.3.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:743d2a1ac08027fd09eab65814c79002a1d0421d7c0074ffd1217b6560889744"},
{file = "pyobjc_framework_cocoa-10.3.1.tar.gz", hash = "sha256:1cf20714daaa986b488fb62d69713049f635c9d41a60c8da97d835710445281a"},
]
[package.dependencies]
pyobjc-core = ">=10.3.1"
[[package]]
name = "pyobjc-framework-coretext"
version = "10.3.1"
description = "Wrappers for the framework CoreText on macOS"
optional = true
python-versions = ">=3.8"
files = [
{file = "pyobjc_framework_CoreText-10.3.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:dd6123cfccc38e32be884d1a13fb62bd636ecb192b9e8ae2b8011c977dec229e"},
{file = "pyobjc_framework_CoreText-10.3.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:834142a14235bd80edaef8d3a28d1e203ed3c988810a9b78005df7c561390288"},
{file = "pyobjc_framework_CoreText-10.3.1-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:ae6c09d29eeaf30a67aa70e08a465b1f1e47d12e22b3a34ae8bc8fdb7e2e7342"},
{file = "pyobjc_framework_CoreText-10.3.1-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:51ca95df1db9401366f11a7467f64be57f9a0630d31c357237d4062df0216938"},
{file = "pyobjc_framework_CoreText-10.3.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:8b75bdc267945b3f33c937c108d79405baf9d7c4cd530f922e5df243082a5031"},
{file = "pyobjc_framework_CoreText-10.3.1-cp38-cp38-macosx_11_0_universal2.whl", hash = "sha256:029b24c338f58fc32a004256d8559507e4f366dfe4eb09d3144273d536012d90"},
{file = "pyobjc_framework_CoreText-10.3.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:418a55047dbff999fcd2b78cca167c4105587020b6c51567cfa28993bbfdc8ed"},
{file = "pyobjc_framework_coretext-10.3.1.tar.gz", hash = "sha256:b8fa2d5078ed774431ae64ba886156e319aec0b8c6cc23dabfd86778265b416f"},
]
[package.dependencies]
pyobjc-core = ">=10.3.1"
pyobjc-framework-Cocoa = ">=10.3.1"
pyobjc-framework-Quartz = ">=10.3.1"
[[package]]
name = "pyobjc-framework-quartz"
version = "10.3.1"
description = "Wrappers for the Quartz frameworks on macOS"
optional = true
python-versions = ">=3.8"
files = [
{file = "pyobjc_framework_Quartz-10.3.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:5ef4fd315ed2bc42ef77fdeb2bae28a88ec986bd7b8079a87ba3b3475348f96e"},
{file = "pyobjc_framework_Quartz-10.3.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:96578d4a3e70164efe44ad7dc320ecd4e211758ffcde5dcd694de1bbdfe090a4"},
{file = "pyobjc_framework_Quartz-10.3.1-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:ca35f92486869a41847a1703bb176aab8a53dbfd8e678d1f4d68d8e6e1581c71"},
{file = "pyobjc_framework_Quartz-10.3.1-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:00a0933267e3a46ea4afcc35d117b2efb920f06de797fa66279c52e7057e3590"},
{file = "pyobjc_framework_Quartz-10.3.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:a161bedb4c5257a02ad56a910cd7eefb28bdb0ea78607df0d70ed4efe4ea54c1"},
{file = "pyobjc_framework_Quartz-10.3.1-cp38-cp38-macosx_11_0_universal2.whl", hash = "sha256:d7a8028e117a94923a511944bfa9daf9744e212f06cf89010c60934a479863a5"},
{file = "pyobjc_framework_Quartz-10.3.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:de00c983b3267eb26fa42c6ed9f15e2bf006bde8afa7fe2b390646aa21a5d6fc"},
{file = "pyobjc_framework_quartz-10.3.1.tar.gz", hash = "sha256:b6d7e346d735c9a7f147cd78e6da79eeae416a0b7d3874644c83a23786c6f886"},
]
[package.dependencies]
pyobjc-core = ">=10.3.1"
pyobjc-framework-Cocoa = ">=10.3.1"
[[package]]
name = "pyopengl"
version = "3.1.7"
@ -3012,6 +3155,20 @@ files = [
[package.extras]
diagrams = ["jinja2", "railroad-diagrams"]
[[package]]
name = "pyserial"
version = "3.5"
description = "Python Serial Port Extension"
optional = true
python-versions = "*"
files = [
{file = "pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0"},
{file = "pyserial-3.5.tar.gz", hash = "sha256:3c77e014170dfffbd816e6ffc205e9842efb10be9f58ec16d3e8675b4925cddb"},
]
[package.extras]
cp2110 = ["hidapi"]
[[package]]
name = "pysocks"
version = "1.7.1"
@ -3095,6 +3252,20 @@ files = [
[package.dependencies]
six = ">=1.5"
[[package]]
name = "python-xlib"
version = "0.33"
description = "Python X Library"
optional = true
python-versions = "*"
files = [
{file = "python-xlib-0.33.tar.gz", hash = "sha256:55af7906a2c75ce6cb280a584776080602444f75815a7aff4d287bb2d7018b32"},
{file = "python_xlib-0.33-py2.py3-none-any.whl", hash = "sha256:c3534038d42e0df2f1392a1b30a15a4ff5fdc2b86cfa94f072bf11b10a164398"},
]
[package.dependencies]
six = ">=1.10.0"
[[package]]
name = "pytz"
version = "2024.1"
@ -4326,6 +4497,7 @@ test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools",
aloha = ["gym-aloha"]
dev = ["debugpy", "pre-commit"]
dora = ["gym-dora"]
koch = ["dynamixel-sdk", "pynput"]
pusht = ["gym-pusht"]
test = ["pytest", "pytest-cov", "pytest-mock"]
umi = ["imagecodecs"]
@ -4335,4 +4507,4 @@ xarm = ["gym-xarm"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.10,<3.13"
content-hash = "91a402588458645c146da00cccf7627c5dddad61bd1168e539900eaec99987b3"
content-hash = "2c59d869c6b1f2132070387f3d371b5b004765ae853501bbd522eb400738f2d0"

View File

@ -63,6 +63,9 @@ deepdiff = ">=7.0.1"
scikit-image = {version = "^0.23.2", optional = true}
pandas = {version = "^2.2.2", optional = true}
pytest-mock = {version = "^3.14.0", optional = true}
dynamixel-sdk = {version = "^3.7.31", optional = true}
pynput = {version = "^1.7.7", optional = true}
[tool.poetry.extras]
@ -74,6 +77,7 @@ dev = ["pre-commit", "debugpy"]
test = ["pytest", "pytest-cov", "pytest-mock"]
umi = ["imagecodecs"]
video_benchmark = ["scikit-image", "pandas"]
koch = ["dynamixel-sdk", "pynput"]
[tool.ruff]
line-length = 110

View File

@ -13,8 +13,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from .utils import DEVICE
def pytest_collection_finish():
print(f"\nTesting with {DEVICE=}")
@pytest.fixture(scope="session")
def is_koch_available():
try:
from lerobot.common.robot_devices.robots.factory import make_robot
robot = make_robot("koch")
robot.connect()
del robot
return True
except Exception as e:
print("An alexander koch robot is not available.")
print(e)
return False

125
tests/test_cameras.py Normal file
View File

@ -0,0 +1,125 @@
import numpy as np
import pytest
from lerobot.common.robot_devices.cameras.opencv import OpenCVCamera, save_images_from_cameras
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from tests.utils import require_koch
CAMERA_INDEX = 2
# Maximum absolute difference between two consecutive images recorded by a camera.
# This value depends on the camera.
MAX_PIXEL_DIFFERENCE = 25
def compute_max_pixel_difference(first_image, second_image):
return np.abs(first_image.astype(float) - second_image.astype(float)).max()
@require_koch
def test_camera(request):
"""Test assumes that `camera.read()` returns the same image when called multiple times in a row.
So the environment should not change (you shouldnt be in front of the camera) and the camera should not be moving.
Warning: The tests worked for a macbookpro camera, but I am getting assertion error (`np.allclose(color_image, async_color_image)`)
for my iphone camera and my LG monitor camera.
"""
# TODO(rcadene): measure fps in nightly?
# TODO(rcadene): test logs
# TODO(rcadene): add compatibility with other camera APIs
# Test instantiating
camera = OpenCVCamera(CAMERA_INDEX)
# Test reading, async reading, disconnecting before connecting raises an error
with pytest.raises(RobotDeviceNotConnectedError):
camera.read()
with pytest.raises(RobotDeviceNotConnectedError):
camera.async_read()
with pytest.raises(RobotDeviceNotConnectedError):
camera.disconnect()
# Test deleting the object without connecting first
del camera
# Test connecting
camera = OpenCVCamera(CAMERA_INDEX)
camera.connect()
assert camera.is_connected
assert camera.fps is not None
assert camera.width is not None
assert camera.height is not None
# Test connecting twice raises an error
with pytest.raises(RobotDeviceAlreadyConnectedError):
camera.connect()
# Test reading from the camera
color_image = camera.read()
assert isinstance(color_image, np.ndarray)
assert color_image.ndim == 3
h, w, c = color_image.shape
assert c == 3
assert w > h
    # Test read and async_read output similar images
# ...warming up as the first frames can be black
for _ in range(30):
camera.read()
color_image = camera.read()
async_color_image = camera.async_read()
print(
"max_pixel_difference between read() and async_read()",
compute_max_pixel_difference(color_image, async_color_image),
)
assert np.allclose(color_image, async_color_image, rtol=1e-5, atol=MAX_PIXEL_DIFFERENCE)
# Test disconnecting
camera.disconnect()
assert camera.camera is None
assert camera.thread is None
# Test disconnecting with `__del__`
camera = OpenCVCamera(CAMERA_INDEX)
camera.connect()
del camera
# Test acquiring a bgr image
camera = OpenCVCamera(CAMERA_INDEX, color_mode="bgr")
camera.connect()
assert camera.color_mode == "bgr"
bgr_color_image = camera.read()
assert np.allclose(color_image, bgr_color_image[:, :, [2, 1, 0]], rtol=1e-5, atol=MAX_PIXEL_DIFFERENCE)
del camera
    # TODO(rcadene): Add a test for a camera that doesn't support fps=60 and raises an OSError
# TODO(rcadene): Add a test for a camera that supports fps=60
# Test fps=10 raises an OSError
camera = OpenCVCamera(CAMERA_INDEX, fps=10)
with pytest.raises(OSError):
camera.connect()
del camera
# Test width and height can be set
camera = OpenCVCamera(CAMERA_INDEX, fps=30, width=1280, height=720)
camera.connect()
assert camera.fps == 30
assert camera.width == 1280
assert camera.height == 720
color_image = camera.read()
h, w, c = color_image.shape
assert h == 720
assert w == 1280
assert c == 3
del camera
# Test not supported width and height raise an error
camera = OpenCVCamera(CAMERA_INDEX, fps=30, width=0, height=0)
with pytest.raises(OSError):
camera.connect()
del camera
@require_koch
def test_save_images_from_cameras(tmpdir, request):
save_images_from_cameras(tmpdir, record_time_s=1)

View File

@ -0,0 +1,48 @@
from pathlib import Path
from lerobot.common.policies.factory import make_policy
from lerobot.common.robot_devices.robots.factory import make_robot
from lerobot.common.utils.utils import init_hydra_config
from lerobot.scripts.control_robot import record_dataset, replay_episode, run_policy, teleoperate
from tests.utils import DEFAULT_CONFIG_PATH, DEVICE, require_koch
@require_koch
def test_teleoperate(request):
robot = make_robot("koch")
teleoperate(robot, teleop_time_s=1)
teleoperate(robot, fps=30, teleop_time_s=1)
teleoperate(robot, fps=60, teleop_time_s=1)
del robot
@require_koch
def test_record_dataset_and_replay_episode_and_run_policy(tmpdir, request):
robot_name = "koch"
env_name = "koch_real"
policy_name = "act_koch_real"
root = Path(tmpdir)
repo_id = "lerobot/debug"
robot = make_robot(robot_name)
dataset = record_dataset(
robot, fps=30, root=root, repo_id=repo_id, warmup_time_s=1, episode_time_s=1, num_episodes=2
)
replay_episode(robot, episode=0, fps=30, root=root, repo_id=repo_id)
cfg = init_hydra_config(
DEFAULT_CONFIG_PATH,
overrides=[
f"env={env_name}",
f"policy={policy_name}",
f"device={DEVICE}",
],
)
policy = make_policy(hydra_cfg=cfg, dataset_stats=dataset.stats)
run_policy(robot, policy, cfg, run_time_s=1)
del robot

92
tests/test_motors.py Normal file
View File

@ -0,0 +1,92 @@
import time
import numpy as np
import pytest
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from tests.utils import require_koch
@require_koch
def test_motors_bus(request):
# TODO(rcadene): measure fps in nightly?
# TODO(rcadene): test logs
# TODO(rcadene): test calibration
# TODO(rcadene): add compatibility with other motors bus
from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus
# Test instantiating a common motors structure.
# Here the one from Alexander Koch follower arm.
port = "/dev/tty.usbmodem575E0032081"
motors = {
# name: (index, model)
"shoulder_pan": (1, "xl430-w250"),
"shoulder_lift": (2, "xl430-w250"),
"elbow_flex": (3, "xl330-m288"),
"wrist_flex": (4, "xl330-m288"),
"wrist_roll": (5, "xl330-m288"),
"gripper": (6, "xl330-m288"),
}
motors_bus = DynamixelMotorsBus(port, motors)
    # Test reading and writing before connecting raises an error
with pytest.raises(RobotDeviceNotConnectedError):
motors_bus.read("Torque_Enable")
with pytest.raises(RobotDeviceNotConnectedError):
motors_bus.write("Torque_Enable", 1)
with pytest.raises(RobotDeviceNotConnectedError):
motors_bus.disconnect()
# Test deleting the object without connecting first
del motors_bus
# Test connecting
motors_bus = DynamixelMotorsBus(port, motors)
motors_bus.connect()
# Test connecting twice raises an error
with pytest.raises(RobotDeviceAlreadyConnectedError):
motors_bus.connect()
# Test disabling torque and reading torque on all motors
motors_bus.write("Torque_Enable", 0)
values = motors_bus.read("Torque_Enable")
assert isinstance(values, np.ndarray)
assert len(values) == len(motors)
assert (values == 0).all()
# Test writing torque on a specific motor
motors_bus.write("Torque_Enable", 1, "gripper")
# Test reading torque from this specific motor. It is now 1
values = motors_bus.read("Torque_Enable", "gripper")
assert len(values) == 1
assert values[0] == 1
# Test reading torque from all motors. It is 1 for the specific motor,
# and 0 on the others.
values = motors_bus.read("Torque_Enable")
gripper_index = motors_bus.motor_names.index("gripper")
assert values[gripper_index] == 1
assert values.sum() == 1 # gripper is the only motor to have torque 1
# Test writing torque on all motors and it is 1 for all.
motors_bus.write("Torque_Enable", 1)
values = motors_bus.read("Torque_Enable")
assert (values == 1).all()
    # Test ordering the motors to move slightly (+1 step out of 4096) and that this move
    # can be executed and seen by the motor position sensor
    values = motors_bus.read("Present_Position")
    motors_bus.write("Goal_Position", values + 1)
    # Give time for the motors to move to the goal position
    time.sleep(1)
    new_values = motors_bus.read("Present_Position")
    assert (new_values == values + 1).all()
@require_koch
def test_find_port(request):
from lerobot.common.robot_devices.motors.dynamixel import find_port
find_port()

128
tests/test_robots.py Normal file
View File

@ -0,0 +1,128 @@
import pickle
from pathlib import Path
import pytest
import torch
from lerobot.common.robot_devices.robots.factory import make_robot
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from tests.utils import require_koch
@require_koch
def test_robot(tmpdir, request):
# TODO(rcadene): measure fps in nightly?
# TODO(rcadene): test logs
# TODO(rcadene): add compatibility with other robots
from lerobot.common.robot_devices.robots.koch import KochRobot
# Save calibration preset
calibration = {
"follower_main": {
"shoulder_pan": (-2048, False),
"shoulder_lift": (2048, True),
"elbow_flex": (-1024, False),
"wrist_flex": (2048, True),
"wrist_roll": (2048, True),
"gripper": (2048, True),
},
"leader_main": {
"shoulder_pan": (-2048, False),
"shoulder_lift": (1024, True),
"elbow_flex": (2048, True),
"wrist_flex": (-2048, False),
"wrist_roll": (2048, True),
"gripper": (2048, True),
},
}
tmpdir = Path(tmpdir)
calibration_path = tmpdir / "calibration.pkl"
calibration_path.parent.mkdir(parents=True, exist_ok=True)
with open(calibration_path, "wb") as f:
pickle.dump(calibration, f)
# Test connecting without devices raises an error
robot = KochRobot()
with pytest.raises(ValueError):
robot.connect()
del robot
# Test using robot before connecting raises an error
robot = KochRobot()
with pytest.raises(RobotDeviceNotConnectedError):
robot.teleop_step()
with pytest.raises(RobotDeviceNotConnectedError):
robot.teleop_step(record_data=True)
with pytest.raises(RobotDeviceNotConnectedError):
robot.capture_observation()
with pytest.raises(RobotDeviceNotConnectedError):
robot.send_action(None)
with pytest.raises(RobotDeviceNotConnectedError):
robot.disconnect()
# Test deleting the object without connecting first
del robot
# Test connecting
robot = make_robot("koch")
# TODO(rcadene): proper monkey patch
robot.calibration_path = calibration_path
    robot.connect()  # run the manual calibration procedure
assert robot.is_connected
# Test connecting twice raises an error
with pytest.raises(RobotDeviceAlreadyConnectedError):
robot.connect()
# Test disconnecting with `__del__`
del robot
# Test teleop can run
robot = make_robot("koch")
robot.calibration_path = calibration_path
robot.connect()
robot.teleop_step()
    # Test data recorded during teleop are well formatted
observation, action = robot.teleop_step(record_data=True)
# State
assert "observation.state" in observation
assert isinstance(observation["observation.state"], torch.Tensor)
assert observation["observation.state"].ndim == 1
dim_state = sum(len(robot.follower_arms[name].motors) for name in robot.follower_arms)
assert observation["observation.state"].shape[0] == dim_state
# Cameras
for name in robot.cameras:
assert f"observation.images.{name}" in observation
assert isinstance(observation[f"observation.images.{name}"], torch.Tensor)
assert observation[f"observation.images.{name}"].ndim == 3
# Action
assert "action" in action
assert isinstance(action["action"], torch.Tensor)
assert action["action"].ndim == 1
dim_action = sum(len(robot.follower_arms[name].motors) for name in robot.follower_arms)
assert action["action"].shape[0] == dim_action
# TODO(rcadene): test if observation and action data are returned as expected
    # Test capture_observation can run and the observations returned are the same (since the arm didn't move)
captured_observation = robot.capture_observation()
assert set(captured_observation.keys()) == set(observation.keys())
for name in captured_observation:
if "image" in name:
# TODO(rcadene): skipping image for now as it's challenging to assess equality between two consecutive frames
continue
assert torch.allclose(captured_observation[name], observation[name], atol=1)
# Test send_action can run
robot.send_action(action["action"])
# Test disconnecting
robot.disconnect()
assert not robot.is_connected
for name in robot.follower_arms:
assert not robot.follower_arms[name].is_connected
for name in robot.leader_arms:
assert not robot.leader_arms[name].is_connected
for name in robot.cameras:
assert not robot.cameras[name].is_connected
del robot

View File

@ -20,21 +20,6 @@ import pytest
from lerobot.scripts.visualize_dataset import visualize_dataset
@pytest.mark.parametrize(
"repo_id",
["lerobot/pusht"],
)
def test_visualize_dataset(tmpdir, repo_id):
rrd_path = visualize_dataset(
repo_id,
episode_index=0,
batch_size=32,
save=True,
output_dir=tmpdir,
)
assert rrd_path.exists()
@pytest.mark.parametrize(
"repo_id",
["lerobot/pusht"],

View File

@ -147,3 +147,22 @@ def require_package(package_name):
return wrapper
return decorator
def require_koch(func):
"""
Decorator that skips the test if an alexander koch robot is not available
"""
@wraps(func)
def wrapper(*args, **kwargs):
# Access the pytest request context to get the is_koch_available fixture
request = kwargs.get("request")
if request is None:
raise ValueError("The 'request' fixture must be passed to the test function as a parameter.")
if not request.getfixturevalue("is_koch_available"):
pytest.skip("An alexander koch robot is not available.")
return func(*args, **kwargs)
return wrapper
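# Hypothetical usage sketch: the decorated test must declare the `request` fixture,
# which the wrapper above uses to query the `is_koch_available` fixture from conftest.py.
#
# @require_koch
# def test_example(request):
#     ...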