Compare commits

..

19 Commits

Author SHA1 Message Date
Simon Alibert 2292b514aa Fix calibration functions 2025-03-25 17:58:54 +01:00
Simon Alibert 1f1a01a798 Rename CalibrationMode -> MotorNormMode 2025-03-25 17:42:18 +01:00
Simon Alibert faa476f0d2 Remove deprecated scripts 2025-03-25 17:33:05 +01:00
Simon Alibert 5130b69ece Merge remote-tracking branch 'origin/main' into user/aliberts/2025_02_25_refactor_robots 2025-03-25 16:25:47 +01:00
Simon Alibert aed85241b7 Merge branch 'user/aliberts/2025_02_25_refactor_robots' of github.com:huggingface/lerobot into user/aliberts/2025_02_25_refactor_robots 2025-03-25 16:24:40 +01:00
Pepijn 21c3ac42ee
Add new calibration method for robot refactor (#896)
Co-authored-by: Simon Alibert <simon.alibert@huggingface.co>
2025-03-25 16:24:04 +01:00
Simon Alibert 2d3a5fb2be (WIP) _async_read 2025-03-25 15:37:18 +01:00
Simon Alibert a631e4c11c Rename idx -> id_ 2025-03-25 15:36:36 +01:00
Simon Alibert 222d6f104e Rename idx -> id_ 2025-03-25 14:20:12 +01:00
Simon Alibert 7a3b424cd3 Add calibration 2025-03-25 14:13:55 +01:00
AlexC 2c22f7d76d
Add offline mode in the configuration for wandb logging (#897)
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2025-03-25 13:44:49 +01:00
Simon Alibert af295fadb5 Fix imports 2025-03-25 12:48:58 +01:00
Simon Alibert 9644e2b086 Fix visualize_motors_bus 2025-03-25 12:47:44 +01:00
Simon Alibert 6ccf083127 Update so100 imports 2025-03-25 12:32:38 +01:00
Simon Alibert bb774e7acd Update Koch imports 2025-03-25 12:31:51 +01:00
Simon Alibert dcbbeab80b Move DriveMode & TorqueMode 2025-03-25 12:30:07 +01:00
Simon Alibert b71ac34214 Add test_motors_bus 2025-03-25 12:11:56 +01:00
Qizhi Chen a774af2eab
fix pi0 action padding name (#893)
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2025-03-25 11:24:46 +01:00
Steven Palma 725b446ad6
fix(deps): constrain PyAV version to resolve OpenCV-python ffmpeg version conflict (#883)
Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
2025-03-24 23:40:22 +01:00
35 changed files with 608 additions and 1611 deletions

View File

@ -46,13 +46,6 @@ Using `uv`:
uv sync --extra "dynamixel"
```
/!\ For Linux only, ffmpeg and opencv requires conda install for now. Run this exact sequence of commands:
```bash
conda install -c conda-forge ffmpeg
pip uninstall opencv-python
conda install -c conda-forge "opencv>=4.10.0"
```
You are now ready to plug the 5V power supply to the motor bus of the leader arm (the smaller one) since all its motors only require 5V.
Then plug the 12V power supply to the motor bus of the follower arm. It has two motors that need 12V, and the rest will be powered with 5V through the voltage convertor.
@ -834,11 +827,6 @@ It contains:
- `dtRphone:33.84 (29.5hz)` which is the delta time of capturing an image from the phone camera in the thread running asynchronously.
Troubleshooting:
- On Linux, if you encounter a hanging issue when using cameras, uninstall opencv and re-install it with conda:
```bash
pip uninstall opencv-python
conda install -c conda-forge opencv=4.10.0
```
- On Linux, if you encounter any issue during video encoding with `ffmpeg: unknown encoder libsvtav1`, you can:
- install with conda-forge by running `conda install -c conda-forge ffmpeg` (it should be compiled with `libsvtav1`),
- or, install [Homebrew](https://brew.sh) and run `brew install ffmpeg` (it should be compiled with `libsvtav1`),

View File

@ -1,3 +0,0 @@
from .motors_bus import visualize_motors_bus
__all__ = ["visualize_motors_bus"]

View File

@ -1,82 +0,0 @@
"""
Usage example
```python
from lerobot.common.debugging.motors_bus import visualize_motors_bus
from lerobot.common.robots.so100 import SO100Robot, SO100RobotConfig
cfg = SO100RobotConfig(port="/dev/tty.usbmodem58760430541")
so100 = SO100Robot(cfg)
visualize_motors_bus(so100.arm)
```
"""
import time
from lerobot.common.motors import MotorsBus
from lerobot.common.motors.feetech.feetech_calibration import (
adjusted_to_homing_ticks,
adjusted_to_motor_ticks,
convert_degrees_to_ticks,
convert_ticks_to_degrees,
)
def visualize_motors_bus(motors_bus: MotorsBus):
"""
Reads each joint's (1) raw ticks, (2) homed ticks, (3) degrees, and (4) invert-adjusted ticks.
"""
if not motors_bus.is_connected:
motors_bus.connect()
# Disable torque on all motors so you can move them freely by hand
values_dict = {idx: 0 for idx in motors_bus.motor_ids}
motors_bus.write("Torque_Enable", values_dict)
print("Torque disabled on all joints.")
try:
print("\nPress Ctrl+C to quit.\n")
while True:
# Read *raw* positions (no calibration).
raw_positions = motors_bus.read("Present_Position")
# # Read *already-homed* positions
# homed_positions = motor_bus.read("Present_Position")
for name, raw_ticks in raw_positions.items():
idx = motors_bus.motors[name][0]
model = motors_bus.motors[name][1]
# homed_val = homed_positions[i] # degrees or % if linear
# Manually compute "adjusted ticks" from raw ticks
manual_adjusted = adjusted_to_homing_ticks(raw_ticks, model, motors_bus, idx)
# Convert to degrees
manual_degs = convert_ticks_to_degrees(manual_adjusted, model)
# Convert that deg back to ticks
manual_ticks = convert_degrees_to_ticks(manual_degs, model)
# Then invert them using offset & bus drive mode
inv_ticks = adjusted_to_motor_ticks(manual_ticks, model, motors_bus, idx)
print(
f"{name:15s} | "
f"RAW={raw_ticks:4d} | "
# f"HOMED_FROM_READ={homed_val:7.2f} | "
f"HOMED_TICKS={manual_adjusted:6d} | "
f"MANUAL_ADJ_DEG={manual_degs:7.2f} | "
f"MANUAL_ADJ_TICKS={manual_ticks:6d} | "
f"INV_TICKS={inv_ticks:4d} "
)
print("----------------------------------------------------")
time.sleep(0.25)
except KeyboardInterrupt:
pass
finally:
print("\nExiting. Disconnecting bus...")
motors_bus.disconnect()
if __name__ == "__main__":
visualize_motors_bus()

View File

@ -1 +1 @@
from .motors_bus import CalibrationMode, DriveMode, Motor, MotorsBus, TorqueMode
from .motors_bus import Motor, MotorNormMode, MotorsBus

View File

@ -0,0 +1,178 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import select
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from .dynamixel.dynamixel import DynamixelMotorsBus
from .feetech.feetech import FeetechMotorsBus
from .motors_bus import MotorsBus
@dataclass
class MotorCalibration:
name: str
drive_mode: int
homing_offset: int
range_min: int
range_max: int
def find_offset(motorbus: MotorsBus):
input("Move robot to the middle of its range of motion and press ENTER....")
for name in motorbus.names:
# Also reset to defaults
if isinstance(motorbus, FeetechMotorsBus):
motorbus.write("Offset", name, 0, raw_value=True)
motorbus.write("Min_Angle_Limit", name, 0, raw_value=True)
motorbus.write("Max_Angle_Limit", name, 4095, raw_value=True)
elif isinstance(motorbus, DynamixelMotorsBus):
motorbus.write("Homing_Offset", name, 0, raw_value=True)
motorbus.write("Min_Position_Limit", name, 0, raw_value=True)
motorbus.write("Max_Position_Limit", name, 4095, raw_value=True)
else:
raise ValueError("Motorbus instance is unknown")
middle_values = motorbus.sync_read("Present_Position", raw_values=True)
offsets = {}
for name, pos in middle_values.items():
offset = pos - 2047 # Center the middle reading at 2047.
set_offset(motorbus, offset, name)
offsets[name] = offset
return offsets
def find_min_max(motorbus: MotorsBus):
print("Move all joints (except wrist_roll; id = 5) sequentially through their entire ranges of motion.")
print("Recording positions. Press ENTER to stop...")
recorded_data = {name: [] for name in motorbus.names}
while True:
positions = motorbus.sync_read("Present_Position", raw_values=True)
for name in motorbus.names:
recorded_data[name].append(positions[name])
time.sleep(0.01)
# Check if user pressed Enter
ready_to_read, _, _ = select.select([sys.stdin], [], [], 0)
if ready_to_read:
line = sys.stdin.readline()
if line.strip() == "":
break
min_max = {}
for name in motorbus.names:
motor_values = recorded_data[name]
raw_min = min(motor_values)
raw_max = max(motor_values)
if name == "wrist_roll":
physical_min = 0
physical_max = 4095
else:
physical_min = int(raw_min)
physical_max = int(raw_max)
set_min_max(motorbus, physical_min, physical_max, name)
min_max[name] = {"min": physical_min, "max": physical_max}
return min_max
def set_calibration(motorbus: MotorsBus, calibration_fpath: Path) -> None:
with open(calibration_fpath) as f:
calibration = json.load(f)
motorbus.calibration = {int(id_): val for id_, val in calibration.items()}
for _, cal_data in motorbus.calibration.items():
name = cal_data.get("name")
if name not in motorbus.names:
print(f"Motor name '{name}' from calibration not found in arm names.")
continue
set_offset(motorbus, cal_data["homing_offset"], name)
set_min_max(motorbus, cal_data["min"], cal_data["max"], name)
def set_offset(motorbus: MotorsBus, homing_offset: int, name: str):
homing_offset = int(homing_offset)
# Clamp to [-2047..+2047]
if homing_offset > 2047:
homing_offset = 2047
print(
f"Warning: '{homing_offset}' is getting clamped because its larger then 2047; This should not happen!"
)
elif homing_offset < -2047:
homing_offset = -2047
print(
f"Warning: '{homing_offset}' is getting clamped because its smaller then -2047; This should not happen!"
)
direction_bit = 1 if homing_offset < 0 else 0 # Determine the direction (sign) bit and magnitude
magnitude = abs(homing_offset)
servo_offset = (
direction_bit << 11
) | magnitude # Combine sign bit (bit 11) with the magnitude (bits 0..10)
if isinstance(motorbus, FeetechMotorsBus):
motorbus.write("Offset", name, servo_offset, raw_value=True)
read_offset = motorbus.sync_read("Offset", name, raw_values=True)
elif isinstance(motorbus, DynamixelMotorsBus):
motorbus.write("Homing_Offset", name, servo_offset, raw_value=True)
read_offset = motorbus.sync_read("Homing_Offset", name, raw_values=True)
else:
raise ValueError("Motorbus instance is unknown")
if read_offset[name] != servo_offset:
raise ValueError(
f"Offset not set correctly for motor '{name}'. read: {read_offset} does not equal {servo_offset}"
)
def set_min_max(motorbus: MotorsBus, min: int, max: int, name: str):
if isinstance(motorbus, FeetechMotorsBus):
motorbus.write("Min_Angle_Limit", name, min, raw_value=True)
motorbus.write("Max_Angle_Limit", name, max, raw_value=True)
read_min = motorbus.sync_read("Min_Angle_Limit", name, raw_values=True)
read_max = motorbus.sync_read("Max_Angle_Limit", name, raw_values=True)
elif isinstance(motorbus, DynamixelMotorsBus):
motorbus.write("Min_Position_Limit", name, min, raw_value=True)
motorbus.write("Max_Position_Limit", name, max, raw_value=True)
read_min = motorbus.sync_read("Min_Position_Limit", name, raw_values=True)
read_max = motorbus.sync_read("Max_Position_Limit", name, raw_values=True)
else:
raise ValueError("Motorbus instance is unknown")
if read_min[name] != min:
raise ValueError(
f"Min is not set correctly for motor '{name}'. read: {read_min} does not equal {min}"
)
if read_max[name] != max:
raise ValueError(
f"Max is not set correctly for motor '{name}'. read: {read_max} does not equal {max}"
)

View File

@ -1,3 +1,3 @@
from .dynamixel import DynamixelMotorsBus, OperatingMode
from .dynamixel import DriveMode, DynamixelMotorsBus, OperatingMode, TorqueMode
from .dynamixel_calibration import run_arm_calibration
from .tables import *

View File

@ -65,6 +65,16 @@ class OperatingMode(Enum):
PWM = 16
class DriveMode(Enum):
NON_INVERTED = 0
INVERTED = 1
class TorqueMode(Enum):
ENABLED = 1
DISABLED = 0
class DynamixelMotorsBus(MotorsBus):
"""
The Dynamixel implementation for a MotorsBus. It relies on the python dynamixel sdk to communicate with

View File

@ -17,7 +17,8 @@
import numpy as np
from ..motors_bus import CalibrationMode, MotorsBus, TorqueMode
from ..motors_bus import MotorNormMode, MotorsBus
from .dynamixel import TorqueMode
from .tables import MODEL_RESOLUTION
URL_TEMPLATE = (
@ -132,13 +133,13 @@ def run_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type
print()
# Joints with rotational motions are expressed in degrees in nominal range of [-180, 180]
calib_mode = [CalibrationMode.DEGREE.name] * len(arm.names)
calib_mode = [MotorNormMode.DEGREE.name] * len(arm.names)
# TODO(rcadene): make type of joints (DEGREE or LINEAR) configurable from yaml?
if robot_type in ["aloha"] and "gripper" in arm.names:
# Joints with linear motions (like gripper of Aloha) are expressed in nominal range of [0, 100]
calib_idx = arm.names.index("gripper")
calib_mode[calib_idx] = CalibrationMode.LINEAR.name
calib_mode[calib_idx] = MotorNormMode.LINEAR.name
calib_data = {
"homing_offset": homing_offset.tolist(),

View File

@ -1,895 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import enum
import logging
import math
import time
import traceback
from copy import deepcopy
import numpy as np
import tqdm
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.utils.utils import capture_timestamp_utc
PROTOCOL_VERSION = 2.0
BAUDRATE = 1_000_000
TIMEOUT_MS = 1000
MAX_ID_RANGE = 252
# The following bounds define the lower and upper joints range (after calibration).
# For joints in degree (i.e. revolute joints), their nominal range is [-180, 180] degrees
# which corresponds to a half rotation on the left and half rotation on the right.
# Some joints might require higher range, so we allow up to [-270, 270] degrees until
# an error is raised.
LOWER_BOUND_DEGREE = -270
UPPER_BOUND_DEGREE = 270
# For joints in percentage (i.e. joints that move linearly like the prismatic joint of a gripper),
# their nominal range is [0, 100] %. For instance, for Aloha gripper, 0% is fully
# closed, and 100% is fully open. To account for slight calibration issue, we allow up to
# [-10, 110] until an error is raised.
LOWER_BOUND_LINEAR = -10
UPPER_BOUND_LINEAR = 110
HALF_TURN_DEGREE = 180
# https://emanual.robotis.com/docs/en/dxl/x/xl330-m077
# https://emanual.robotis.com/docs/en/dxl/x/xl330-m288
# https://emanual.robotis.com/docs/en/dxl/x/xl430-w250
# https://emanual.robotis.com/docs/en/dxl/x/xm430-w350
# https://emanual.robotis.com/docs/en/dxl/x/xm540-w270
# https://emanual.robotis.com/docs/en/dxl/x/xc430-w150
# data_name: (address, size_byte)
X_SERIES_CONTROL_TABLE = {
"Model_Number": (0, 2),
"Model_Information": (2, 4),
"Firmware_Version": (6, 1),
"ID": (7, 1),
"Baud_Rate": (8, 1),
"Return_Delay_Time": (9, 1),
"Drive_Mode": (10, 1),
"Operating_Mode": (11, 1),
"Secondary_ID": (12, 1),
"Protocol_Type": (13, 1),
"Homing_Offset": (20, 4),
"Moving_Threshold": (24, 4),
"Temperature_Limit": (31, 1),
"Max_Voltage_Limit": (32, 2),
"Min_Voltage_Limit": (34, 2),
"PWM_Limit": (36, 2),
"Current_Limit": (38, 2),
"Acceleration_Limit": (40, 4),
"Velocity_Limit": (44, 4),
"Max_Position_Limit": (48, 4),
"Min_Position_Limit": (52, 4),
"Shutdown": (63, 1),
"Torque_Enable": (64, 1),
"LED": (65, 1),
"Status_Return_Level": (68, 1),
"Registered_Instruction": (69, 1),
"Hardware_Error_Status": (70, 1),
"Velocity_I_Gain": (76, 2),
"Velocity_P_Gain": (78, 2),
"Position_D_Gain": (80, 2),
"Position_I_Gain": (82, 2),
"Position_P_Gain": (84, 2),
"Feedforward_2nd_Gain": (88, 2),
"Feedforward_1st_Gain": (90, 2),
"Bus_Watchdog": (98, 1),
"Goal_PWM": (100, 2),
"Goal_Current": (102, 2),
"Goal_Velocity": (104, 4),
"Profile_Acceleration": (108, 4),
"Profile_Velocity": (112, 4),
"Goal_Position": (116, 4),
"Realtime_Tick": (120, 2),
"Moving": (122, 1),
"Moving_Status": (123, 1),
"Present_PWM": (124, 2),
"Present_Current": (126, 2),
"Present_Velocity": (128, 4),
"Present_Position": (132, 4),
"Velocity_Trajectory": (136, 4),
"Position_Trajectory": (140, 4),
"Present_Input_Voltage": (144, 2),
"Present_Temperature": (146, 1),
}
X_SERIES_BAUDRATE_TABLE = {
0: 9_600,
1: 57_600,
2: 115_200,
3: 1_000_000,
4: 2_000_000,
5: 3_000_000,
6: 4_000_000,
}
CALIBRATION_REQUIRED = ["Goal_Position", "Present_Position"]
CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"]
MODEL_CONTROL_TABLE = {
"x_series": X_SERIES_CONTROL_TABLE,
"xl330-m077": X_SERIES_CONTROL_TABLE,
"xl330-m288": X_SERIES_CONTROL_TABLE,
"xl430-w250": X_SERIES_CONTROL_TABLE,
"xm430-w350": X_SERIES_CONTROL_TABLE,
"xm540-w270": X_SERIES_CONTROL_TABLE,
"xc430-w150": X_SERIES_CONTROL_TABLE,
}
MODEL_RESOLUTION = {
"x_series": 4096,
"xl330-m077": 4096,
"xl330-m288": 4096,
"xl430-w250": 4096,
"xm430-w350": 4096,
"xm540-w270": 4096,
"xc430-w150": 4096,
}
MODEL_BAUDRATE_TABLE = {
"x_series": X_SERIES_BAUDRATE_TABLE,
"xl330-m077": X_SERIES_BAUDRATE_TABLE,
"xl330-m288": X_SERIES_BAUDRATE_TABLE,
"xl430-w250": X_SERIES_BAUDRATE_TABLE,
"xm430-w350": X_SERIES_BAUDRATE_TABLE,
"xm540-w270": X_SERIES_BAUDRATE_TABLE,
"xc430-w150": X_SERIES_BAUDRATE_TABLE,
}
NUM_READ_RETRY = 10
NUM_WRITE_RETRY = 10
def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str]) -> np.ndarray:
"""This function converts the degree range to the step range for indicating motors rotation.
It assumes a motor achieves a full rotation by going from -180 degree position to +180.
The motor resolution (e.g. 4096) corresponds to the number of steps needed to achieve a full rotation.
"""
resolutions = [MODEL_RESOLUTION[model] for model in models]
steps = degrees / 180 * np.array(resolutions) / 2
steps = steps.astype(int)
return steps
def convert_to_bytes(value, bytes, mock=False):
if mock:
return value
import dynamixel_sdk as dxl
# Note: No need to convert back into unsigned int, since this byte preprocessing
# already handles it for us.
if bytes == 1:
data = [
dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)),
]
elif bytes == 2:
data = [
dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)),
dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)),
]
elif bytes == 4:
data = [
dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)),
dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)),
dxl.DXL_LOBYTE(dxl.DXL_HIWORD(value)),
dxl.DXL_HIBYTE(dxl.DXL_HIWORD(value)),
]
else:
raise NotImplementedError(
f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but "
f"{bytes} is provided instead."
)
return data
def get_group_sync_key(data_name, motor_names):
group_key = f"{data_name}_" + "_".join(motor_names)
return group_key
def get_result_name(fn_name, data_name, motor_names):
group_key = get_group_sync_key(data_name, motor_names)
rslt_name = f"{fn_name}_{group_key}"
return rslt_name
def get_queue_name(fn_name, data_name, motor_names):
group_key = get_group_sync_key(data_name, motor_names)
queue_name = f"{fn_name}_{group_key}"
return queue_name
def get_log_name(var_name, fn_name, data_name, motor_names):
group_key = get_group_sync_key(data_name, motor_names)
log_name = f"{var_name}_{fn_name}_{group_key}"
return log_name
def assert_same_address(model_ctrl_table, motor_models, data_name):
all_addr = []
all_bytes = []
for model in motor_models:
addr, bytes = model_ctrl_table[model][data_name]
all_addr.append(addr)
all_bytes.append(bytes)
if len(set(all_addr)) != 1:
raise NotImplementedError(
f"At least two motor models use a different address for `data_name`='{data_name}' ({list(zip(motor_models, all_addr, strict=False))}). Contact a LeRobot maintainer."
)
if len(set(all_bytes)) != 1:
raise NotImplementedError(
f"At least two motor models use a different bytes representation for `data_name`='{data_name}' ({list(zip(motor_models, all_bytes, strict=False))}). Contact a LeRobot maintainer."
)
class TorqueMode(enum.Enum):
ENABLED = 1
DISABLED = 0
class DriveMode(enum.Enum):
NON_INVERTED = 0
INVERTED = 1
class CalibrationMode(enum.Enum):
# Joints with rotational motions are expressed in degrees in nominal range of [-180, 180]
DEGREE = 0
# Joints with linear motions (like gripper of Aloha) are expressed in nominal range of [0, 100]
LINEAR = 1
class JointOutOfRangeError(Exception):
def __init__(self, message="Joint is out of range"):
self.message = message
super().__init__(self.message)
class DynamixelMotorsBus:
"""
The DynamixelMotorsBus class allows to efficiently read and write to the attached motors. It relies on
the python dynamixel sdk to communicate with the motors. For more info, see the [Dynamixel SDK Documentation](https://emanual.robotis.com/docs/en/software/dynamixel/dynamixel_sdk/sample_code/python_read_write_protocol_2_0/#python-read-write-protocol-20).
A DynamixelMotorsBus instance requires a port (e.g. `DynamixelMotorsBus(port="/dev/tty.usbmodem575E0031751"`)).
To find the port, you can run our utility script:
```bash
python lerobot/scripts/find_motors_bus_port.py
>>> Finding all available ports for the MotorBus.
>>> ['/dev/tty.usbmodem575E0032081', '/dev/tty.usbmodem575E0031751']
>>> Remove the usb cable from your DynamixelMotorsBus and press Enter when done.
>>> The port of this DynamixelMotorsBus is /dev/tty.usbmodem575E0031751.
>>> Reconnect the usb cable.
```
Example of usage for 1 motor connected to the bus:
```python
motor_name = "gripper"
motor_index = 6
motor_model = "xl330-m288"
motors_bus = DynamixelMotorsBus(
port="/dev/tty.usbmodem575E0031751",
motors={motor_name: (motor_index, motor_model)},
)
motors_bus.connect()
position = motors_bus.read("Present_Position")
# move from a few motor steps as an example
few_steps = 30
motors_bus.write("Goal_Position", position + few_steps)
# when done, consider disconnecting
motors_bus.disconnect()
```
"""
def __init__(
self,
port: str,
motors: dict[str, tuple[int, str]],
mock: bool = False,
):
self.port = port
self.motors = motors
self.mock = mock
self.model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
self.model_resolution = deepcopy(MODEL_RESOLUTION)
self.port_handler = None
self.packet_handler = None
self.calibration = None
self.is_connected = False
self.group_readers = {}
self.group_writers = {}
self.logs = {}
def connect(self):
if self.is_connected:
raise DeviceAlreadyConnectedError(
f"DynamixelMotorsBus({self.port}) is already connected. Do not call `motors_bus.connect()` twice."
)
if self.mock:
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
self.port_handler = dxl.PortHandler(self.port)
self.packet_handler = dxl.PacketHandler(PROTOCOL_VERSION)
try:
if not self.port_handler.openPort():
raise OSError(f"Failed to open port '{self.port}'.")
except Exception:
traceback.print_exc()
print(
"\nTry running `python lerobot/scripts/find_motors_bus_port.py` to make sure you are using the correct port.\n"
)
raise
# Allow to read and write
self.is_connected = True
self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS)
def reconnect(self):
if self.mock:
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
self.port_handler = dxl.PortHandler(self.port)
self.packet_handler = dxl.PacketHandler(PROTOCOL_VERSION)
if not self.port_handler.openPort():
raise OSError(f"Failed to open port '{self.port}'.")
self.is_connected = True
def are_motors_configured(self):
# Only check the motor indices and not baudrate, since if the motor baudrates are incorrect,
# a ConnectionError will be raised anyway.
try:
return (self.motor_indices == self.read("ID")).all()
except ConnectionError as e:
print(e)
return False
def find_motor_indices(self, possible_ids=None, num_retry=2):
if possible_ids is None:
possible_ids = range(MAX_ID_RANGE)
indices = []
for idx in tqdm.tqdm(possible_ids):
try:
present_idx = self.read_with_motor_ids(self.motor_models, [idx], "ID", num_retry=num_retry)[0]
except ConnectionError:
continue
if idx != present_idx:
# sanity check
raise OSError(
"Motor index used to communicate through the bus is not the same as the one present in the motor memory. The motor memory might be damaged."
)
indices.append(idx)
return indices
def set_bus_baudrate(self, baudrate):
present_bus_baudrate = self.port_handler.getBaudRate()
if present_bus_baudrate != baudrate:
print(f"Setting bus baud rate to {baudrate}. Previously {present_bus_baudrate}.")
self.port_handler.setBaudRate(baudrate)
if self.port_handler.getBaudRate() != baudrate:
raise OSError("Failed to write bus baud rate.")
@property
def motor_names(self) -> list[str]:
return list(self.motors.keys())
@property
def motor_models(self) -> list[str]:
return [model for _, model in self.motors.values()]
@property
def motor_indices(self) -> list[int]:
return [idx for idx, _ in self.motors.values()]
def set_calibration(self, calibration: dict[str, list]):
self.calibration = calibration
def apply_calibration_autocorrect(self, values: np.ndarray | list, motor_names: list[str] | None):
"""This function applies the calibration, automatically detects out of range errors for motors values and attempts to correct.
For more info, see docstring of `apply_calibration` and `autocorrect_calibration`.
"""
try:
values = self.apply_calibration(values, motor_names)
except JointOutOfRangeError as e:
print(e)
self.autocorrect_calibration(values, motor_names)
values = self.apply_calibration(values, motor_names)
return values
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with
a "zero position" at 0 degree.
Note: We say "nominal degree range" since the motors can take values outside this range. For instance, 190 degrees, if the motor
rotate more than a half a turn from the zero position. However, most motors can't rotate more than 180 degrees and will stay in this range.
Joints values are original in [0, 2**32[ (unsigned int32). Each motor are expected to complete a full rotation
when given a goal position that is + or - their resolution. For instance, dynamixel xl330-m077 have a resolution of 4096, and
at any position in their original range, let's say the position 56734, they complete a full rotation clockwise by moving to 60830,
or anticlockwise by moving to 52638. The position in the original range is arbitrary and might change a lot between each motor.
To harmonize between motors of the same model, different robots, or even models of different brands, we propose to work
in the centered nominal degree range ]-180, 180[.
"""
if motor_names is None:
motor_names = self.motor_names
# Convert from unsigned int32 original range [0, 2**32] to signed float32 range
values = values.astype(np.float32)
for i, name in enumerate(motor_names):
calib_idx = self.calibration["motor_names"].index(name)
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
drive_mode = self.calibration["drive_mode"][calib_idx]
homing_offset = self.calibration["homing_offset"][calib_idx]
_, model = self.motors[name]
resolution = self.model_resolution[model]
# Update direction of rotation of the motor to match between leader and follower.
# In fact, the motor of the leader for a given joint can be assembled in an
# opposite direction in term of rotation than the motor of the follower on the same joint.
if drive_mode:
values[i] *= -1
# Convert from range [-2**31, 2**31] to
# nominal range [-resolution//2, resolution//2] (e.g. [-2048, 2048])
values[i] += homing_offset
# Convert from range [-resolution//2, resolution//2] to
# universal float32 centered degree range [-180, 180]
# (e.g. 2048 / (4096 // 2) * 180 = 180)
values[i] = values[i] / (resolution // 2) * HALF_TURN_DEGREE
if (values[i] < LOWER_BOUND_DEGREE) or (values[i] > UPPER_BOUND_DEGREE):
raise JointOutOfRangeError(
f"Wrong motor position range detected for {name}. "
f"Expected to be in nominal range of [-{HALF_TURN_DEGREE}, {HALF_TURN_DEGREE}] degrees (a full rotation), "
f"with a maximum range of [{LOWER_BOUND_DEGREE}, {UPPER_BOUND_DEGREE}] degrees to account for joints that can rotate a bit more, "
f"but present value is {values[i]} degree. "
"This might be due to a cable connection issue creating an artificial 360 degrees jump in motor values. "
"You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`"
)
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
end_pos = self.calibration["end_pos"][calib_idx]
# Rescale the present position to a nominal range [0, 100] %,
# useful for joints with linear motions like Aloha gripper
values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100
if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR):
raise JointOutOfRangeError(
f"Wrong motor position range detected for {name}. "
f"Expected to be in nominal range of [0, 100] % (a full linear translation), "
f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, "
f"but present value is {values[i]} %. "
"This might be due to a cable connection issue creating an artificial jump in motor values. "
"You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`"
)
return values
def autocorrect_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""This function automatically detects issues with values of motors after calibration, and correct for these issues.
Some motors might have values outside of expected maximum bounds after calibration.
For instance, for a joint in degree, its value can be outside [-270, 270] degrees, which is totally unexpected given
a nominal range of [-180, 180] degrees, which represents half a turn to the left or right starting from zero position.
Known issues:
#1: Motor value randomly shifts of a full turn, caused by hardware/connection errors.
#2: Motor internal homing offset is shifted by a full turn, caused by using default calibration (e.g Aloha).
#3: motor internal homing offset is shifted by less or more than a full turn, caused by using default calibration
or by human error during manual calibration.
Issues #1 and #2 can be solved by shifting the calibration homing offset by a full turn.
Issue #3 will be visually detected by user and potentially captured by the safety feature `max_relative_target`,
that will slow down the motor, raise an error asking to recalibrate. Manual recalibrating will solve the issue.
Note: A full turn corresponds to 360 degrees but also to 4096 steps for a motor resolution of 4096.
"""
if motor_names is None:
motor_names = self.motor_names
# Convert from unsigned int32 original range [0, 2**32] to signed float32 range
values = values.astype(np.float32)
for i, name in enumerate(motor_names):
calib_idx = self.calibration["motor_names"].index(name)
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
drive_mode = self.calibration["drive_mode"][calib_idx]
homing_offset = self.calibration["homing_offset"][calib_idx]
_, model = self.motors[name]
resolution = self.model_resolution[model]
# Update direction of rotation of the motor to match between leader and follower.
# In fact, the motor of the leader for a given joint can be assembled in an
# opposite direction in term of rotation than the motor of the follower on the same joint.
if drive_mode:
values[i] *= -1
# Convert from initial range to range [-180, 180] degrees
calib_val = (values[i] + homing_offset) / (resolution // 2) * HALF_TURN_DEGREE
in_range = (calib_val > LOWER_BOUND_DEGREE) and (calib_val < UPPER_BOUND_DEGREE)
# Solve this inequality to find the factor to shift the range into [-180, 180] degrees
# values[i] = (values[i] + homing_offset + resolution * factor) / (resolution // 2) * HALF_TURN_DEGREE
# - HALF_TURN_DEGREE <= (values[i] + homing_offset + resolution * factor) / (resolution // 2) * HALF_TURN_DEGREE <= HALF_TURN_DEGREE
# (- (resolution // 2) - values[i] - homing_offset) / resolution <= factor <= ((resolution // 2) - values[i] - homing_offset) / resolution
low_factor = (-(resolution // 2) - values[i] - homing_offset) / resolution
upp_factor = ((resolution // 2) - values[i] - homing_offset) / resolution
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
end_pos = self.calibration["end_pos"][calib_idx]
# Convert from initial range to range [0, 100] in %
calib_val = (values[i] - start_pos) / (end_pos - start_pos) * 100
in_range = (calib_val > LOWER_BOUND_LINEAR) and (calib_val < UPPER_BOUND_LINEAR)
# Solve this inequality to find the factor to shift the range into [0, 100] %
# values[i] = (values[i] - start_pos + resolution * factor) / (end_pos + resolution * factor - start_pos - resolution * factor) * 100
# values[i] = (values[i] - start_pos + resolution * factor) / (end_pos - start_pos) * 100
# 0 <= (values[i] - start_pos + resolution * factor) / (end_pos - start_pos) * 100 <= 100
# (start_pos - values[i]) / resolution <= factor <= (end_pos - values[i]) / resolution
low_factor = (start_pos - values[i]) / resolution
upp_factor = (end_pos - values[i]) / resolution
if not in_range:
# Get first integer between the two bounds
if low_factor < upp_factor:
factor = math.ceil(low_factor)
if factor > upp_factor:
raise ValueError(f"No integer found between bounds [{low_factor=}, {upp_factor=}]")
else:
factor = math.ceil(upp_factor)
if factor > low_factor:
raise ValueError(f"No integer found between bounds [{low_factor=}, {upp_factor=}]")
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
out_of_range_str = f"{LOWER_BOUND_DEGREE} < {calib_val} < {UPPER_BOUND_DEGREE} degrees"
in_range_str = f"{LOWER_BOUND_DEGREE} < {calib_val} < {UPPER_BOUND_DEGREE} degrees"
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
out_of_range_str = f"{LOWER_BOUND_LINEAR} < {calib_val} < {UPPER_BOUND_LINEAR} %"
in_range_str = f"{LOWER_BOUND_LINEAR} < {calib_val} < {UPPER_BOUND_LINEAR} %"
logging.warning(
f"Auto-correct calibration of motor '{name}' by shifting value by {abs(factor)} full turns, "
f"from '{out_of_range_str}' to '{in_range_str}'."
)
# A full turn corresponds to 360 degrees but also to 4096 steps for a motor resolution of 4096.
self.calibration["homing_offset"][calib_idx] += resolution * factor
def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""Inverse of `apply_calibration`."""
if motor_names is None:
motor_names = self.motor_names
for i, name in enumerate(motor_names):
calib_idx = self.calibration["motor_names"].index(name)
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
drive_mode = self.calibration["drive_mode"][calib_idx]
homing_offset = self.calibration["homing_offset"][calib_idx]
_, model = self.motors[name]
resolution = self.model_resolution[model]
# Convert from nominal 0-centered degree range [-180, 180] to
# 0-centered resolution range (e.g. [-2048, 2048] for resolution=4096)
values[i] = values[i] / HALF_TURN_DEGREE * (resolution // 2)
# Subtract the homing offsets to come back to actual motor range of values
# which can be arbitrary.
values[i] -= homing_offset
# Remove drive mode, which is the rotation direction of the motor, to come back to
# actual motor rotation direction which can be arbitrary.
if drive_mode:
values[i] *= -1
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
end_pos = self.calibration["end_pos"][calib_idx]
# Convert from nominal lnear range of [0, 100] % to
# actual motor range of values which can be arbitrary.
values[i] = values[i] / 100 * (end_pos - start_pos) + start_pos
values = np.round(values).astype(np.int32)
return values
def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY):
if self.mock:
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
return_list = True
if not isinstance(motor_ids, list):
return_list = False
motor_ids = [motor_ids]
assert_same_address(self.model_ctrl_table, self.motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = dxl.GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes)
for idx in motor_ids:
group.addParam(idx)
for _ in range(num_retry):
comm = group.txRxPacket()
if comm == dxl.COMM_SUCCESS:
break
if comm != dxl.COMM_SUCCESS:
raise ConnectionError(
f"Read failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
values = []
for idx in motor_ids:
value = group.getData(idx, addr, bytes)
values.append(value)
if return_list:
return values
else:
return values[0]
def read(self, data_name, motor_names: str | list[str] | None = None):
if not self.is_connected:
raise DeviceNotConnectedError(
f"DynamixelMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`."
)
start_time = time.perf_counter()
if self.mock:
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
if motor_names is None:
motor_names = self.motor_names
if isinstance(motor_names, str):
motor_names = [motor_names]
motor_ids = []
models = []
for name in motor_names:
motor_idx, model = self.motors[name]
motor_ids.append(motor_idx)
models.append(model)
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
if data_name not in self.group_readers:
# create new group reader
self.group_readers[group_key] = dxl.GroupSyncRead(
self.port_handler, self.packet_handler, addr, bytes
)
for idx in motor_ids:
self.group_readers[group_key].addParam(idx)
for _ in range(NUM_READ_RETRY):
comm = self.group_readers[group_key].txRxPacket()
if comm == dxl.COMM_SUCCESS:
break
if comm != dxl.COMM_SUCCESS:
raise ConnectionError(
f"Read failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
values = []
for idx in motor_ids:
value = self.group_readers[group_key].getData(idx, addr, bytes)
values.append(value)
values = np.array(values)
# Convert to signed int to use range [-2048, 2048] for our motor positions.
if data_name in CONVERT_UINT32_TO_INT32_REQUIRED:
values = values.astype(np.int32)
if data_name in CALIBRATION_REQUIRED and self.calibration is not None:
values = self.apply_calibration_autocorrect(values, motor_names)
# log the number of seconds it took to read the data from the motors
delta_ts_name = get_log_name("delta_timestamp_s", "read", data_name, motor_names)
self.logs[delta_ts_name] = time.perf_counter() - start_time
# log the utc time at which the data was received
ts_utc_name = get_log_name("timestamp_utc", "read", data_name, motor_names)
self.logs[ts_utc_name] = capture_timestamp_utc()
return values
def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY):
if self.mock:
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
if not isinstance(motor_ids, list):
motor_ids = [motor_ids]
if not isinstance(values, list):
values = [values]
assert_same_address(self.model_ctrl_table, motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = dxl.GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
group.addParam(idx, data)
for _ in range(num_retry):
comm = group.txPacket()
if comm == dxl.COMM_SUCCESS:
break
if comm != dxl.COMM_SUCCESS:
raise ConnectionError(
f"Write failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
def write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None):
if not self.is_connected:
raise DeviceNotConnectedError(
f"DynamixelMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`."
)
start_time = time.perf_counter()
if self.mock:
import tests.mock_dynamixel_sdk as dxl
else:
import dynamixel_sdk as dxl
if motor_names is None:
motor_names = self.motor_names
if isinstance(motor_names, str):
motor_names = [motor_names]
if isinstance(values, (int, float, np.integer)):
values = [int(values)] * len(motor_names)
values = np.array(values)
motor_ids = []
models = []
for name in motor_names:
motor_idx, model = self.motors[name]
motor_ids.append(motor_idx)
models.append(model)
if data_name in CALIBRATION_REQUIRED and self.calibration is not None:
values = self.revert_calibration(values, motor_names)
values = values.tolist()
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
init_group = data_name not in self.group_readers
if init_group:
self.group_writers[group_key] = dxl.GroupSyncWrite(
self.port_handler, self.packet_handler, addr, bytes
)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
if init_group:
self.group_writers[group_key].addParam(idx, data)
else:
self.group_writers[group_key].changeParam(idx, data)
comm = self.group_writers[group_key].txPacket()
if comm != dxl.COMM_SUCCESS:
raise ConnectionError(
f"Write failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
# log the number of seconds it took to write the data to the motors
delta_ts_name = get_log_name("delta_timestamp_s", "write", data_name, motor_names)
self.logs[delta_ts_name] = time.perf_counter() - start_time
# TODO(rcadene): should we log the time before sending the write command?
# log the utc time when the write has been completed
ts_utc_name = get_log_name("timestamp_utc", "write", data_name, motor_names)
self.logs[ts_utc_name] = capture_timestamp_utc()
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(
f"DynamixelMotorsBus({self.port}) is not connected. Try running `motors_bus.connect()` first."
)
if self.port_handler is not None:
self.port_handler.closePort()
self.port_handler = None
self.packet_handler = None
self.group_readers = {}
self.group_writers = {}
self.is_connected = False
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()
def set_operating_mode(arm: DynamixelMotorsBus):
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
raise ValueError("To run set robot preset, the torque must be disabled on all motors.")
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't
# rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm,
# you could end up with a servo with a position 0 or 4095 at a crucial point See [
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11]
all_motors_except_gripper = [name for name in arm.motor_names if name != "gripper"]
if len(all_motors_except_gripper) > 0:
# 4 corresponds to Extended Position on Koch motors
arm.write("Operating_Mode", 4, all_motors_except_gripper)
# Use 'position control current based' for gripper to be limited by the limit of the current.
# For the follower gripper, it means it can grasp an object without forcing too much even tho,
# it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
# For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger
# to make it move, and it will move back to its original target position when we release the force.
# 5 corresponds to Current Controlled Position on Koch gripper motors "xl330-m077, xl330-m288"
arm.write("Operating_Mode", 5, "gripper")

View File

@ -1,3 +1,2 @@
from .feetech import FeetechMotorsBus, OperatingMode
from .feetech_calibration import apply_feetech_offsets_from_calibration, run_full_arm_calibration
from .feetech import DriveMode, FeetechMotorsBus, OperatingMode, TorqueMode
from .tables import *

View File

@ -47,6 +47,16 @@ class OperatingMode(Enum):
STEP = 3
class DriveMode(Enum):
NON_INVERTED = 0
INVERTED = 1
class TorqueMode(Enum):
ENABLED = 1
DISABLED = 0
class FeetechMotorsBus(MotorsBus):
"""
The FeetechMotorsBus class allows to efficiently read and write to the attached motors. It relies on the

View File

@ -1,314 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from ..motors_bus import (
CalibrationMode,
MotorsBus,
TorqueMode,
)
from .feetech import FeetechMotorsBus
from .tables import MODEL_RESOLUTION
URL_TEMPLATE = (
"https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"
)
def disable_torque(arm: MotorsBus):
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
raise ValueError("To run calibration, the torque must be disabled on all motors.")
def get_calibration_modes(arm: MotorsBus):
"""Returns calibration modes for each motor (DEGREE for rotational, LINEAR for gripper)."""
return [
CalibrationMode.LINEAR.name if name == "gripper" else CalibrationMode.DEGREE.name
for name in arm.names
]
def reset_offset(motor_id, motor_bus):
# Open the write lock, changes to EEPROM do NOT persist yet
motor_bus.write("Lock", 1)
# Set offset to 0
motor_name = motor_bus.motor_names[motor_id - 1]
motor_bus.write("Offset", 0, motor_names=[motor_name])
# Close the write lock, changes to EEPROM do persist
motor_bus.write("Lock", 0)
# Confirm that the offset is zero by reading it back
confirmed_offset = motor_bus.read("Offset")[motor_id - 1]
print(f"Offset for motor {motor_id} reset to: {confirmed_offset}")
return confirmed_offset
def calibrate_homing_motor(motor_id, motor_bus):
reset_offset(motor_id, motor_bus)
home_ticks = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts at 0
print(f"Encoder offset (present position in homing position): {home_ticks}")
return home_ticks
def calibrate_linear_motor(motor_id, motor_bus):
motor_names = motor_bus.motor_names
motor_name = motor_names[motor_id - 1]
reset_offset(motor_id, motor_bus)
input(f"Close the {motor_name}, then press Enter...")
start_pos = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts ar 0
print(f" [Motor {motor_id}] start position recorded: {start_pos}")
input(f"Open the {motor_name} fully, then press Enter...")
end_pos = motor_bus.read("Present_Position")[motor_id - 1] # Read index starts ar 0
print(f" [Motor {motor_id}] end position recorded: {end_pos}")
return start_pos, end_pos
def single_motor_calibration(arm: MotorsBus, motor_id: int):
"""Calibrates a single motor and returns its calibration data for updating the calibration file."""
disable_torque(arm)
print(f"\n--- Calibrating Motor {motor_id} ---")
start_pos = 0
end_pos = 0
encoder_offset = 0
if motor_id == 6:
start_pos, end_pos = calibrate_linear_motor(motor_id, arm)
else:
input("Move the motor to (zero) position, then press Enter...")
encoder_offset = calibrate_homing_motor(motor_id, arm)
print(f"Calibration for motor ID:{motor_id} done.")
# Create a calibration dictionary for the single motor
calib_dict = {
"homing_offset": int(encoder_offset),
"drive_mode": 0,
"start_pos": int(start_pos),
"end_pos": int(end_pos),
"calib_mode": get_calibration_modes(arm)[motor_id - 1],
"motor_name": arm.names[motor_id - 1],
}
return calib_dict
def run_full_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
"""
Runs a full calibration process for all motors in a robotic arm.
This function calibrates each motor in the arm, determining encoder offsets and
start/end positions for linear and rotational motors. The calibration data is then
stored in a dictionary for later use.
**Calibration Process:**
- The user is prompted to move the arm to its homing position before starting.
- Motors with rotational motion are calibrated using a homing method.
- Linear actuators (e.g., grippers) are calibrated separately.
- Encoder offsets, start positions, and end positions are recorded.
**Example Usage:**
```python
run_full_arm_calibration(arm, "so100", "left", "follower")
```
"""
disable_torque(arm)
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
print("\nMove arm to homing position (middle)")
print(
"See: " + URL_TEMPLATE.format(robot=robot_type, arm=arm_type, position="zero")
) # TODO(pepijn): replace with new instruction homing pos (all motors in middle) in tutorial
input("Press Enter to continue...")
start_positions = np.zeros(len(arm.ids))
end_positions = np.zeros(len(arm.ids))
encoder_offsets = np.zeros(len(arm.ids))
modes = get_calibration_modes(arm)
for i, motor_id in enumerate(arm.ids):
if modes[i] == CalibrationMode.DEGREE.name:
encoder_offsets[i] = calibrate_homing_motor(motor_id, arm)
start_positions[i] = 0
end_positions[i] = 0
for i, motor_id in enumerate(arm.ids):
if modes[i] == CalibrationMode.LINEAR.name:
start_positions[i], end_positions[i] = calibrate_linear_motor(motor_id, arm)
encoder_offsets[i] = 0
print("\nMove arm to rest position")
input("Press Enter to continue...")
print(f"\n calibration of {robot_type} {arm_name} {arm_type} done!")
# Force drive_mode values (can be static)
drive_modes = [0, 1, 0, 0, 1, 0]
calib_dict = {
"homing_offset": encoder_offsets.astype(int).tolist(),
"drive_mode": drive_modes,
"start_pos": start_positions.astype(int).tolist(),
"end_pos": end_positions.astype(int).tolist(),
"calib_mode": get_calibration_modes(arm),
"motor_names": arm.names,
}
return calib_dict
def run_full_auto_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm_type: str):
"""TODO(pepijn): Add this method later as extra
Example of usage:
```python
run_full_auto_arm_calibration(arm, "so100", "left", "follower")
```
"""
print(f"\nRunning calibration of {robot_type} {arm_name} {arm_type}...")
def apply_feetech_offsets_from_calibration(motorsbus: FeetechMotorsBus, calibration_dict: dict):
"""
Reads 'calibration_dict' containing 'homing_offset' and 'motor_names',
then writes each motor's offset to the servo's internal Offset (0x1F) in EPROM.
This version is modified so each homed position (originally 0) will now read
2047, i.e. 180° away from 0 in the 4096-count circle. Offsets are permanently
stored in EEPROM, so the servo's Present_Position is hardware-shifted even
after power cycling.
Steps:
1) Subtract 2047 from the old offset (so 0 -> 2047).
2) Clamp to [-2047..+2047].
3) Encode sign bit and magnitude into a 12-bit number.
"""
homing_offsets = calibration_dict["homing_offset"]
motor_names = calibration_dict["motor_names"]
start_pos = calibration_dict["start_pos"]
# Open the write lock, changes to EEPROM do NOT persist yet
motorsbus.write("Lock", 1)
# For each motor, set the 'Offset' parameter
for m_name, old_offset in zip(motor_names, homing_offsets, strict=False):
# If bus doesnt have a motor named m_name, skip
if m_name not in motorsbus.motors:
print(f"Warning: '{m_name}' not found in motorsbus.motors; skipping offset.")
continue
if m_name == "gripper":
old_offset = start_pos # If gripper set the offset to the start position of the gripper
continue
# Shift the offset so the homed position reads 2047
new_offset = old_offset - 2047
# Clamp to [-2047..+2047]
if new_offset > 2047:
new_offset = 2047
print(
f"Warning: '{new_offset}' is getting clamped because its larger then 2047; This should not happen!"
)
elif new_offset < -2047:
new_offset = -2047
print(
f"Warning: '{new_offset}' is getting clamped because its smaller then -2047; This should not happen!"
)
# Determine the direction (sign) bit and magnitude
direction_bit = 1 if new_offset < 0 else 0
magnitude = abs(new_offset)
# Combine sign bit (bit 11) with the magnitude (bits 0..10)
servo_offset = (direction_bit << 11) | magnitude
# Write offset to servo
motorsbus.write("Offset", servo_offset, motor_names=m_name)
print(
f"Set offset for {m_name}: "
f"old_offset={old_offset}, new_offset={new_offset}, servo_encoded={magnitude} + direction={direction_bit}"
)
motorsbus.write("Lock", 0)
print("Offsets have been saved to EEPROM successfully.")
def convert_ticks_to_degrees(ticks, model):
resolutions = MODEL_RESOLUTION[model]
# Convert the ticks to degrees
return ticks * (360.0 / resolutions)
def convert_degrees_to_ticks(degrees, model):
resolutions = MODEL_RESOLUTION[model]
# Convert degrees to motor ticks
return int(degrees * (resolutions / 360.0))
def adjusted_to_homing_ticks(raw_motor_ticks: int, model: str, motor_bus: MotorsBus, motor_id: int) -> int:
"""
Takes a raw reading [0..(res-1)] (e.g. 0..4095) and shifts it so that '2048'
becomes 0 in the homed coordinate system ([-2048..+2047] for 4096 resolution).
"""
resolutions = MODEL_RESOLUTION[model]
# Shift raw ticks by half-resolution so 2048 -> 0, then wrap [0..res-1].
ticks = (raw_motor_ticks - (resolutions // 2)) % resolutions
# If above halfway, fold it into negative territory => [-2048..+2047].
if ticks > (resolutions // 2):
ticks -= resolutions
# Flip sign if drive_mode is set.
drive_mode = 0
if motor_bus.calibration is not None:
drive_mode = motor_bus.calibration["drive_mode"][motor_id - 1]
if drive_mode:
ticks *= -1
return ticks
def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motor_bus: MotorsBus, motor_id: int) -> int:
"""
Inverse of adjusted_to_homing_ticks(). Takes a 'homed' position in [-2048..+2047]
and recovers the raw [0..(res-1)] ticks with 2048 as midpoint.
"""
# Flip sign if drive_mode was set.
drive_mode = 0
if motor_bus.calibration is not None:
drive_mode = motor_bus.calibration["drive_mode"][motor_id - 1]
if drive_mode:
adjusted_pos *= -1
resolutions = MODEL_RESOLUTION[model]
# Shift by +half-resolution and wrap into [0..res-1].
# This undoes the earlier shift by -half-resolution.
ticks = (adjusted_pos + (resolutions // 2)) % resolutions
return ticks

View File

@ -20,12 +20,10 @@
# https://github.com/astral-sh/ruff/issues/3711
import abc
import json
import logging
from dataclasses import dataclass
from enum import Enum
from functools import cached_property
from pathlib import Path
from pprint import pformat
from typing import Protocol, TypeAlias, overload
@ -43,19 +41,18 @@ logger = logging.getLogger(__name__)
def get_ctrl_table(model_ctrl_table: dict[str, dict], model: str) -> dict[str, tuple[int, int]]:
try:
return model_ctrl_table[model]
except KeyError:
raise KeyError(f"Control table for {model=} not found.") from None
ctrl_table = model_ctrl_table.get(model)
if ctrl_table is None:
raise KeyError(f"Control table for {model=} not found.")
return ctrl_table
def get_address(model_ctrl_table: dict[str, dict], model: str, data_name: str) -> tuple[int, int]:
ctrl_table = get_ctrl_table(model_ctrl_table, model)
try:
addr, bytes = ctrl_table[data_name]
return addr, bytes
except KeyError:
raise KeyError(f"Address for '{data_name}' not found in {model} control table.") from None
addr_bytes = ctrl_table.get(data_name)
if addr_bytes is None:
raise KeyError(f"Address for '{data_name}' not found in {model} control table.")
return addr_bytes
def assert_same_address(model_ctrl_table: dict[str, dict], motor_models: list[str], data_name: str) -> None:
@ -69,27 +66,17 @@ def assert_same_address(model_ctrl_table: dict[str, dict], motor_models: list[st
if len(set(all_addr)) != 1:
raise NotImplementedError(
f"At least two motor models use a different address for `data_name`='{data_name}'"
f"({list(zip(motor_models, all_addr, strict=False))}). Contact a LeRobot maintainer."
f"({list(zip(motor_models, all_addr, strict=False))})."
)
if len(set(all_bytes)) != 1:
raise NotImplementedError(
f"At least two motor models use a different bytes representation for `data_name`='{data_name}'"
f"({list(zip(motor_models, all_bytes, strict=False))}). Contact a LeRobot maintainer."
f"({list(zip(motor_models, all_bytes, strict=False))})."
)
class TorqueMode(Enum):
ENABLED = 1
DISABLED = 0
class DriveMode(Enum):
NON_INVERTED = 0
INVERTED = 1
class CalibrationMode(Enum):
class MotorNormMode(Enum):
DEGREE = 0
RANGE_0_100 = 1
RANGE_M100_100 = 2
@ -100,7 +87,7 @@ class CalibrationMode(Enum):
class Motor:
id: int
model: str
calibration: CalibrationMode
norm_mode: MotorNormMode
class JointOutOfRangeError(Exception):
@ -425,12 +412,6 @@ class MotorsBus(abc.ABC):
logger.error(e)
return False
def set_calibration(self, calibration_fpath: Path) -> None:
with open(calibration_fpath) as f:
calibration = json.load(f)
self.calibration = {int(id_): val for id_, val in calibration.items()}
@abc.abstractmethod
def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
pass
@ -592,10 +573,14 @@ class MotorsBus(abc.ABC):
# This could be at the cost of increase latency between the moment the data is produced by the motors and
# the moment it is used by a policy.
# def _async_read(self, motor_ids: list[str], address: int, n_bytes: int):
# self.reader.rxPacket()
# self.reader.txPacket()
# if self.sync_reader.start_address != address or self.sync_reader.data_length != n_bytes or ...:
# self._setup_sync_reader(motor_ids, address, n_bytes)
# else:
# self.sync_reader.rxPacket()
# self.sync_reader.txPacket()
# for id_ in motor_ids:
# value = self.reader.getData(id_, address, n_bytes)
# value = self.sync_reader.getData(id_, address, n_bytes)
def sync_write(
self,

View File

@ -313,7 +313,7 @@ class PI0Policy(PreTrainedPolicy):
state = self.prepare_state(batch)
lang_tokens, lang_masks = self.prepare_language(batch)
actions = self.prepare_action(batch)
actions_is_pad = batch.get("actions_is_pad")
actions_is_pad = batch.get("action_is_pad")
loss_dict = {}
losses = self.model.forward(images, img_masks, lang_tokens, lang_masks, state, actions, noise, time)

View File

@ -22,10 +22,11 @@ from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import CalibrationMode, Motor, TorqueMode
from lerobot.common.motors import Motor, MotorNormMode
from lerobot.common.motors.dynamixel import (
DynamixelMotorsBus,
OperatingMode,
TorqueMode,
run_arm_calibration,
)
@ -52,12 +53,12 @@ class KochRobot(Robot):
self.arm = DynamixelMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "xl430-w250", CalibrationMode.RANGE_M100_100),
"shoulder_lift": Motor(2, "xl430-w250", CalibrationMode.RANGE_M100_100),
"elbow_flex": Motor(3, "xl330-m288", CalibrationMode.RANGE_M100_100),
"wrist_flex": Motor(4, "xl330-m288", CalibrationMode.RANGE_M100_100),
"wrist_roll": Motor(5, "xl330-m288", CalibrationMode.RANGE_M100_100),
"gripper": Motor(6, "xl330-m288", CalibrationMode.RANGE_0_100),
"shoulder_pan": Motor(1, "xl430-w250", MotorNormMode.RANGE_M100_100),
"shoulder_lift": Motor(2, "xl430-w250", MotorNormMode.RANGE_M100_100),
"elbow_flex": Motor(3, "xl330-m288", MotorNormMode.RANGE_M100_100),
"wrist_flex": Motor(4, "xl330-m288", MotorNormMode.RANGE_M100_100),
"wrist_roll": Motor(5, "xl330-m288", MotorNormMode.RANGE_M100_100),
"gripper": Motor(6, "xl330-m288", MotorNormMode.RANGE_0_100),
},
)
self.cameras = make_cameras_from_configs(config.cameras)
@ -124,7 +125,8 @@ class KochRobot(Robot):
@property
def is_connected(self) -> bool:
return self.arm.is_connected # TODO(aliberts): add cam.is_connected for cam in self.cameras
# TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.arm.is_connected
def connect(self) -> None:
if self.is_connected:
@ -147,6 +149,7 @@ class KochRobot(Robot):
self.is_connected = True
def calibrate(self) -> None:
# TODO(pepijn): Do calibration in same way as so100
"""After calibration all motors function in human interpretable ranges.
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].

View File

@ -113,12 +113,6 @@ git clone https://github.com/huggingface/lerobot.git ~/lerobot
cd ~/lerobot && pip install -e ".[feetech]"
```
*EXTRA: For Linux only (not Mac)*: install extra dependencies for recording datasets:
```bash
conda install -y -c conda-forge ffmpeg
pip uninstall -y opencv-python
conda install -y -c conda-forge "opencv>=4.10.0"
```
Great :hugs:! You are now done installing LeRobot and we can begin assembling the SO100 arms and Mobile base :robot:.
Every time you now want to use LeRobot you can go to the `~/lerobot` folder where we installed LeRobot and run one of the commands.

View File

@ -18,7 +18,6 @@ and send orders to its motors.
# TODO(rcadene, aliberts): reorganize the codebase into one file per robot, with the associated
# calibration procedure, to make it easy for people to add their own robot.
import json
import time
import warnings
from dataclasses import dataclass, field
@ -81,73 +80,6 @@ class ManipulatorRobotConfig(RobotConfig):
)
def apply_feetech_offsets_from_calibration(motorsbus, calibration_dict: dict):
"""
Reads 'calibration_dict' containing 'homing_offset' and 'motor_names',
then writes each motor's offset to the servo's internal Offset (0x1F) in EPROM.
This version is modified so each homed position (originally 0) will now read
2047, i.e. 180° away from 0 in the 4096-count circle. Offsets are permanently
stored in EEPROM, so the servo's Present_Position is hardware-shifted even
after power cycling.
Steps:
1) Subtract 2047 from the old offset (so 0 -> 2047).
2) Clamp to [-2047..+2047].
3) Encode sign bit and magnitude into a 12-bit number.
"""
homing_offsets = calibration_dict["homing_offset"]
motor_names = calibration_dict["motor_names"]
start_pos = calibration_dict["start_pos"]
# Open the write lock, changes to EEPROM do NOT persist yet
motorsbus.write("Lock", 1)
# For each motor, set the 'Offset' parameter
for m_name, old_offset in zip(motor_names, homing_offsets, strict=False):
# If bus doesnt have a motor named m_name, skip
if m_name not in motorsbus.motors:
print(f"Warning: '{m_name}' not found in motorsbus.motors; skipping offset.")
continue
if m_name == "gripper":
old_offset = start_pos # If gripper set the offset to the start position of the gripper
continue
# Shift the offset so the homed position reads 2047
new_offset = old_offset - 2047
# Clamp to [-2047..+2047]
if new_offset > 2047:
new_offset = 2047
print(
f"Warning: '{new_offset}' is getting clamped because its larger then 2047; This should not happen!"
)
elif new_offset < -2047:
new_offset = -2047
print(
f"Warning: '{new_offset}' is getting clamped because its smaller then -2047; This should not happen!"
)
# Determine the direction (sign) bit and magnitude
direction_bit = 1 if new_offset < 0 else 0
magnitude = abs(new_offset)
# Combine sign bit (bit 11) with the magnitude (bits 0..10)
servo_offset = (direction_bit << 11) | magnitude
# Write offset to servo
motorsbus.write("Offset", servo_offset, motor_names=m_name)
print(
f"Set offset for {m_name}: "
f"old_offset={old_offset}, new_offset={new_offset}, servo_encoded={magnitude} + direction={direction_bit}"
)
motorsbus.write("Lock", 0)
print("Offsets have been saved to EEPROM successfully.")
class ManipulatorRobot:
# TODO(rcadene): Implement force feedback
"""This class allows to control any manipulator robot of various number of motors.
@ -347,8 +279,6 @@ class ManipulatorRobot:
for name in self.leader_arms:
self.leader_arms[name].write("Torque_Enable", TorqueMode.DISABLED.value)
self.activate_calibration()
# Set robot preset (e.g. torque in leader gripper for Koch v1.1)
if self.robot_type in ["koch", "koch_bimanual"]:
self.set_koch_robot_preset()
@ -385,61 +315,6 @@ class ManipulatorRobot:
self.is_connected = True
def activate_calibration(self):
"""After calibration all motors function in human interpretable ranges.
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
"""
def load_or_run_calibration_(name, arm, arm_type):
arm_id = get_arm_id(name, arm_type)
arm_calib_path = self.calibration_dir / f"{arm_id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
calibration = json.load(f)
else:
# TODO(rcadene): display a warning in __init__ if calibration file not available
print(f"Missing calibration file '{arm_calib_path}'")
if self.robot_type in ["koch", "koch_bimanual", "aloha"]:
from lerobot.common.motors.dynamixel.dynamixel_calibration import run_arm_calibration
calibration = run_arm_calibration(arm, self.robot_type, name, arm_type)
elif self.robot_type in ["so100", "moss", "lekiwi"]:
from lerobot.common.motors.feetech.feetech_calibration import (
run_full_arm_calibration,
)
calibration = run_full_arm_calibration(arm, self.robot_type, name, arm_type)
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
json.dump(calibration, f)
return calibration
# For each follower arm
for name, arm_bus in self.follower_arms.items():
calibration = load_or_run_calibration_(name, arm_bus, "follower")
arm_bus.set_calibration(calibration)
# If this is a Feetech robot, also set the servo offset into EEPROM
if self.robot_type in ["so100", "lekiwi"]:
apply_feetech_offsets_from_calibration(arm_bus, calibration)
# For each leader arm
for name, arm_bus in self.leader_arms.items():
calibration = load_or_run_calibration_(name, arm_bus, "leader")
arm_bus.set_calibration(calibration)
# Optionally also set offset for leader if you want the servo offsets as well
if self.robot_type in ["so100", "lekiwi"]:
apply_feetech_offsets_from_calibration(arm_bus, calibration)
def set_koch_robot_preset(self):
def set_operating_mode_(arm):
from lerobot.common.motors.dynamixel.dynamixel import TorqueMode
@ -540,9 +415,6 @@ class ManipulatorRobot:
# Set I_Coefficient and D_Coefficient to default value 0 and 32
self.follower_arms[name].write("I_Coefficient", 0)
self.follower_arms[name].write("D_Coefficient", 32)
# Close the write lock so that Maximum_Acceleration gets written to EPROM address,
# which is mandatory for Maximum_Acceleration to take effect after rebooting.
self.follower_arms[name].write("Lock", 0)
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
# the motors. Note: this configuration is not in the official STS3215 Memory Table
self.follower_arms[name].write("Maximum_Acceleration", 254)

View File

@ -36,13 +36,6 @@ git clone https://github.com/huggingface/lerobot.git ~/lerobot
cd ~/lerobot && pip install -e ".[feetech]"
```
For Linux only (not Mac), install extra dependencies for recording datasets:
```bash
conda install -y -c conda-forge ffmpeg
pip uninstall -y opencv-python
conda install -y -c conda-forge "opencv>=4.10.0"
```
## Configure the motors
Follow steps 1 of the [assembly video](https://www.youtube.com/watch?v=DA91NJOtMic) which illustrates the use of our scripts below.

View File

@ -62,12 +62,6 @@ git clone https://github.com/huggingface/lerobot.git ~/lerobot
cd ~/lerobot && pip install -e ".[feetech]"
```
*EXTRA: For Linux only (not Mac)*: install extra dependencies for recording datasets:
```bash
conda install -y -c conda-forge ffmpeg
pip uninstall -y opencv-python
conda install -y -c conda-forge "opencv>=4.10.0"
```
Great :hugs:! You are now done installing LeRobot and we can begin assembling the SO100 arms :robot:.
Every time you now want to use LeRobot you can go to the `~/lerobot` folder where we installed LeRobot and run one of the commands.

View File

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import time
@ -22,11 +23,12 @@ import numpy as np
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import CalibrationMode, Motor, TorqueMode
from lerobot.common.motors import Motor, MotorNormMode
from lerobot.common.motors.calibration import find_min_max, find_offset, set_calibration
from lerobot.common.motors.feetech import (
FeetechMotorsBus,
OperatingMode,
apply_feetech_offsets_from_calibration,
TorqueMode,
)
from ..robot import Robot
@ -51,12 +53,12 @@ class SO100Robot(Robot):
self.arm = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "sts3215", CalibrationMode.RANGE_M100_100),
"shoulder_lift": Motor(2, "sts3215", CalibrationMode.RANGE_M100_100),
"elbow_flex": Motor(3, "sts3215", CalibrationMode.RANGE_M100_100),
"wrist_flex": Motor(4, "sts3215", CalibrationMode.RANGE_M100_100),
"wrist_roll": Motor(5, "sts3215", CalibrationMode.RANGE_M100_100),
"gripper": Motor(6, "sts3215", CalibrationMode.RANGE_0_100),
"shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100),
"shoulder_lift": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100),
"elbow_flex": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100),
"wrist_flex": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100),
"wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100),
"gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
},
)
self.cameras = make_cameras_from_configs(config.cameras)
@ -96,15 +98,17 @@ class SO100Robot(Robot):
# Set I_Coefficient and D_Coefficient to default value 0 and 32
self.arm.write("I_Coefficient", name, 0)
self.arm.write("D_Coefficient", name, 32)
# Close the write lock so that Maximum_Acceleration gets written to EPROM address,
# which is mandatory for Maximum_Acceleration to take effect after rebooting.
self.arm.write("Lock", name, 0)
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
# the motors. Note: this configuration is not in the official STS3215 Memory Table
self.arm.write("Maximum_Acceleration", name, 254)
self.arm.write("Acceleration", name, 254)
self.calibrate()
if not self.calibration_fpath.exists():
print("Calibration file not found. Running calibration.")
self.calibrate()
else:
print("Calibration file found. Loading calibration data.")
set_calibration(self.arm, self.calibration_fpath)
for name in self.arm.names:
self.arm.write("Torque_Enable", name, TorqueMode.ENABLED.value)
@ -134,15 +138,25 @@ class SO100Robot(Robot):
cam.connect()
def calibrate(self) -> None:
raise NotImplementedError
print(f"\nRunning calibration of {self.name} robot")
def set_calibration(self) -> None:
if not self.calibration_fpath.exists():
logging.error("Calibration file not found. Please run calibration first")
raise FileNotFoundError(self.calibration_fpath)
offsets = find_offset(self.arm)
min_max = find_min_max(self.arm)
self.arm.set_calibration(self.calibration_fpath)
apply_feetech_offsets_from_calibration(self.arm, self.arm.calibration)
calibration = {}
for name in self.arm.names:
motor_id = self.arm.motors[name].id
calibration[str(motor_id)] = {
"name": name,
"homing_offset": offsets.get(name, 0),
"drive_mode": 0,
"min": min_max[name]["min"],
"max": min_max[name]["max"],
}
with open(self.calibration_fpath, "w") as f:
json.dump(calibration, f, indent=4)
print("Calibration saved to", self.calibration_fpath)
def get_observation(self) -> dict[str, np.ndarray]:
"""The returned observations do not have a batch dimension."""

View File

@ -50,13 +50,6 @@ cd ~/lerobot && pip install -e ".[stretch]"
> **Note:** If you get this message, you can ignore it: `ERROR: pip's dependency resolver does not currently take into account all the packages that are installed.`
For Linux only (not Mac), install extra dependencies for recording datasets:
```bash
conda install -y -c conda-forge ffmpeg
pip uninstall -y opencv-python
conda install -y -c conda-forge "opencv>=4.10.0"
```
7. Run a [system check](https://docs.hello-robot.com/0.3/getting_started/stretch_hardware_overview/#system-check) to make sure your robot is ready:
```bash
stretch_system_check.py

View File

@ -35,13 +35,6 @@ git clone https://github.com/huggingface/lerobot.git ~/lerobot
cd ~/lerobot && pip install -e ".[dynamixel, intelrealsense]"
```
For Linux only (not Mac), install extra dependencies for recording datasets:
```bash
conda install -y -c conda-forge ffmpeg
pip uninstall -y opencv-python
conda install -y -c conda-forge "opencv>=4.10.0"
```
## Teleoperate
**/!\ FOR SAFETY, READ THIS /!\**

View File

@ -13,7 +13,7 @@ import numpy as np
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import CalibrationMode, Motor, TorqueMode
from lerobot.common.motors import Motor, MotorNormMode, TorqueMode
from lerobot.common.motors.dynamixel import (
DynamixelMotorsBus,
run_arm_calibration,
@ -43,15 +43,15 @@ class ViperXRobot(Robot):
self.arm = DynamixelMotorsBus(
port=self.config.port,
motors={
"waist": Motor(1, "xm540-w270", CalibrationMode.RANGE_M100_100),
"shoulder": Motor(2, "xm540-w270", CalibrationMode.RANGE_M100_100),
"shoulder_shadow": Motor(3, "xm540-w270", CalibrationMode.RANGE_M100_100),
"elbow": Motor(4, "xm540-w270", CalibrationMode.RANGE_M100_100),
"elbow_shadow": Motor(5, "xm540-w270", CalibrationMode.RANGE_M100_100),
"forearm_roll": Motor(6, "xm540-w270", CalibrationMode.RANGE_M100_100),
"wrist_angle": Motor(7, "xm540-w270", CalibrationMode.RANGE_M100_100),
"wrist_rotate": Motor(8, "xm430-w350", CalibrationMode.RANGE_M100_100),
"gripper": Motor(9, "xm430-w350", CalibrationMode.RANGE_0_100),
"waist": Motor(1, "xm540-w270", MotorNormMode.RANGE_M100_100),
"shoulder": Motor(2, "xm540-w270", MotorNormMode.RANGE_M100_100),
"shoulder_shadow": Motor(3, "xm540-w270", MotorNormMode.RANGE_M100_100),
"elbow": Motor(4, "xm540-w270", MotorNormMode.RANGE_M100_100),
"elbow_shadow": Motor(5, "xm540-w270", MotorNormMode.RANGE_M100_100),
"forearm_roll": Motor(6, "xm540-w270", MotorNormMode.RANGE_M100_100),
"wrist_angle": Motor(7, "xm540-w270", MotorNormMode.RANGE_M100_100),
"wrist_rotate": Motor(8, "xm430-w350", MotorNormMode.RANGE_M100_100),
"gripper": Motor(9, "xm430-w350", MotorNormMode.RANGE_0_100),
},
)
self.cameras = make_cameras_from_configs(config.cameras)

View File

@ -21,10 +21,11 @@ import time
import numpy as np
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import CalibrationMode, Motor, TorqueMode
from lerobot.common.motors import Motor, MotorNormMode
from lerobot.common.motors.dynamixel import (
DynamixelMotorsBus,
OperatingMode,
TorqueMode,
run_arm_calibration,
)
@ -50,12 +51,12 @@ class KochTeleop(Teleoperator):
self.arm = DynamixelMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "xl330-m077", CalibrationMode.RANGE_M100_100),
"shoulder_lift": Motor(2, "xl330-m077", CalibrationMode.RANGE_M100_100),
"elbow_flex": Motor(3, "xl330-m077", CalibrationMode.RANGE_M100_100),
"wrist_flex": Motor(4, "xl330-m077", CalibrationMode.RANGE_M100_100),
"wrist_roll": Motor(5, "xl330-m077", CalibrationMode.RANGE_M100_100),
"gripper": Motor(6, "xl330-m077", CalibrationMode.RANGE_0_100),
"shoulder_pan": Motor(1, "xl330-m077", MotorNormMode.RANGE_M100_100),
"shoulder_lift": Motor(2, "xl330-m077", MotorNormMode.RANGE_M100_100),
"elbow_flex": Motor(3, "xl330-m077", MotorNormMode.RANGE_M100_100),
"wrist_flex": Motor(4, "xl330-m077", MotorNormMode.RANGE_M100_100),
"wrist_roll": Motor(5, "xl330-m077", MotorNormMode.RANGE_M100_100),
"gripper": Motor(6, "xl330-m077", MotorNormMode.RANGE_0_100),
},
)

View File

@ -14,17 +14,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import time
import numpy as np
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import CalibrationMode, Motor, TorqueMode
from lerobot.common.motors import Motor, MotorNormMode
from lerobot.common.motors.calibration import find_min_max, find_offset, set_calibration
from lerobot.common.motors.feetech import (
FeetechMotorsBus,
apply_feetech_offsets_from_calibration,
)
from lerobot.common.motors.feetech.feetech import TorqueMode
from ..teleoperator import Teleoperator
from .configuration_so100 import SO100TeleopConfig
@ -46,12 +48,12 @@ class SO100Teleop(Teleoperator):
self.arm = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "sts3215", CalibrationMode.RANGE_M100_100),
"shoulder_lift": Motor(2, "sts3215", CalibrationMode.RANGE_M100_100),
"elbow_flex": Motor(3, "sts3215", CalibrationMode.RANGE_M100_100),
"wrist_flex": Motor(4, "sts3215", CalibrationMode.RANGE_M100_100),
"wrist_roll": Motor(5, "sts3215", CalibrationMode.RANGE_M100_100),
"gripper": Motor(6, "sts3215", CalibrationMode.RANGE_0_100),
"shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100),
"shoulder_lift": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100),
"elbow_flex": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100),
"wrist_flex": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100),
"wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100),
"gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
},
)
@ -70,6 +72,13 @@ class SO100Teleop(Teleoperator):
return {}
def configure(self) -> None:
if not self.calibration_fpath.exists():
print("Calibration file not found. Running calibration.")
self.calibrate()
else:
print("Calibration file found. Loading calibration data.")
set_calibration(self.arm, self.calibration_fpath)
# We assume that at connection time, arm is in a rest position,
# and torque can be safely disabled to run calibration.
for name in self.arm.names:
@ -88,13 +97,32 @@ class SO100Teleop(Teleoperator):
logging.info("Connecting arm.")
self.arm.connect()
self.configure()
self.calibrate()
# Check arm can be read
self.arm.sync_read("Present_Position")
def calibrate(self) -> None:
raise NotImplementedError
print(f"\nRunning calibration of {self.name} teleop")
for name in self.arm.names:
self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value)
offsets = find_offset(self.arm)
min_max = find_min_max(self.arm)
calibration = {}
for name in self.arm.names:
motor_id = self.arm.motors[name].id
calibration[str(motor_id)] = {
"name": name,
"homing_offset": offsets.get(name, 0),
"drive_mode": 0,
"min": min_max[name]["min"],
"max": min_max[name]["max"],
}
with open(self.calibration_fpath, "w") as f:
json.dump(calibration, f, indent=4)
print("Calibration saved to", self.calibration_fpath)
def set_calibration(self) -> None:
"""After calibration all motors function in human interpretable ranges.
@ -106,7 +134,6 @@ class SO100Teleop(Teleoperator):
raise FileNotFoundError(self.calibration_fpath)
self.arm.set_calibration(self.calibration_fpath)
apply_feetech_offsets_from_calibration(self.arm, self.arm.calibration)
def get_action(self) -> np.ndarray:
"""The returned action does not have a batch dimension."""

View File

@ -21,7 +21,7 @@ import time
import numpy as np
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import CalibrationMode, Motor, TorqueMode
from lerobot.common.motors import Motor, MotorNormMode, TorqueMode
from lerobot.common.motors.dynamixel import (
DynamixelMotorsBus,
run_arm_calibration,
@ -47,15 +47,15 @@ class WidowXTeleop(Teleoperator):
self.arm = DynamixelMotorsBus(
port=self.config.port,
motors={
"waist": Motor(1, "xm430-w350", CalibrationMode.RANGE_M100_100),
"shoulder": Motor(2, "xm430-w350", CalibrationMode.RANGE_M100_100),
"shoulder_shadow": Motor(3, "xm430-w350", CalibrationMode.RANGE_M100_100),
"elbow": Motor(4, "xm430-w350", CalibrationMode.RANGE_M100_100),
"elbow_shadow": Motor(5, "xm430-w350", CalibrationMode.RANGE_M100_100),
"forearm_roll": Motor(6, "xm430-w350", CalibrationMode.RANGE_M100_100),
"wrist_angle": Motor(7, "xm430-w350", CalibrationMode.RANGE_M100_100),
"wrist_rotate": Motor(8, "xl430-w250", CalibrationMode.RANGE_M100_100),
"gripper": Motor(9, "xc430-w150", CalibrationMode.RANGE_0_100),
"waist": Motor(1, "xm430-w350", MotorNormMode.RANGE_M100_100),
"shoulder": Motor(2, "xm430-w350", MotorNormMode.RANGE_M100_100),
"shoulder_shadow": Motor(3, "xm430-w350", MotorNormMode.RANGE_M100_100),
"elbow": Motor(4, "xm430-w350", MotorNormMode.RANGE_M100_100),
"elbow_shadow": Motor(5, "xm430-w350", MotorNormMode.RANGE_M100_100),
"forearm_roll": Motor(6, "xm430-w350", MotorNormMode.RANGE_M100_100),
"wrist_angle": Motor(7, "xm430-w350", MotorNormMode.RANGE_M100_100),
"wrist_rotate": Motor(8, "xl430-w250", MotorNormMode.RANGE_M100_100),
"gripper": Motor(9, "xc430-w150", MotorNormMode.RANGE_0_100),
},
)

View File

@ -90,6 +90,7 @@ class WandBLogger:
# TODO(rcadene): split train and eval, and run async eval with job_type="eval"
job_type="train_eval",
resume="must" if cfg.resume else None,
mode=self.cfg.mode if self.cfg.mode in ["online", "offline", "disabled"] else "online",
)
print(colored("Logs will be synced with wandb.", "blue", attrs=["bold"]))
logging.info(f"Track this run --> {colored(wandb.run.get_url(), 'yellow', attrs=['bold'])}")

View File

@ -48,6 +48,7 @@ class WandBConfig:
entity: str | None = None
notes: str | None = None
run_id: str | None = None
mode: str | None = None # Allowed values: 'online', 'offline' 'disabled'. Defaults to 'online'
@dataclass

View File

@ -62,7 +62,7 @@ dependencies = [
"omegaconf>=2.3.0",
"opencv-python>=4.9.0",
"packaging>=24.2",
"av>=12.0.5",
"av>=12.0.5,<13.0.0",
"pymunk>=6.6.0",
"pynput>=1.7.7",
"pyzmq>=26.2.1",

View File

@ -281,9 +281,9 @@ class MockInstructionPacket(MockDynamixelPacketv2):
+2 is for the CRC at the end.
"""
data = []
for idx, value in ids_values.items():
for id_, value in ids_values.items():
split_value = DynamixelMotorsBus._split_int_to_bytes(value, data_length)
data += [idx, *split_value]
data += [id_, *split_value]
params = [
dxl.DXL_LOBYTE(start_address),
dxl.DXL_HIBYTE(start_address),
@ -444,10 +444,10 @@ class MockMotors(MockSerial):
self, ids_models: dict[int, list[int]] | None = None, num_invalid_try: int = 0
) -> str:
ping_request = MockInstructionPacket.ping(dxl.BROADCAST_ID)
return_packets = b"".join(MockStatusPacket.ping(idx, model) for idx, model in ids_models.items())
return_packets = b"".join(MockStatusPacket.ping(id_, model) for id_, model in ids_models.items())
ping_response = self._build_send_fn(return_packets, num_invalid_try)
stub_name = "Ping_" + "_".join([str(idx) for idx in ids_models])
stub_name = "Ping_" + "_".join([str(id_) for id_ in ids_models])
self.stub(
name=stub_name,
receive_bytes=ping_request,
@ -482,10 +482,10 @@ class MockMotors(MockSerial):
address, length = self.ctrl_table[data_name]
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
return_packets = b"".join(
MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items()
MockStatusPacket.present_position(id_, pos) for id_, pos in ids_values.items()
)
sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(idx) for idx in ids_values])
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(id_) for id_ in ids_values])
self.stub(
name=stub_name,
receive_bytes=sync_read_request,
@ -498,7 +498,7 @@ class MockMotors(MockSerial):
) -> str:
address, length = self.ctrl_table[data_name]
sync_read_request = MockInstructionPacket.sync_write(ids_values, address, length)
stub_name = f"Sync_Write_{data_name}_" + "_".join([str(idx) for idx in ids_values])
stub_name = f"Sync_Write_{data_name}_" + "_".join([str(id_) for id_ in ids_values])
self.stub(
name=stub_name,
receive_bytes=sync_read_request,

View File

@ -46,8 +46,8 @@ class MockFeetechPacket(abc.ABC):
@staticmethod
def _add_checksum(packet: list[int]) -> list[int]:
checksum = 0
for idx in range(2, len(packet) - 1): # except header & checksum
checksum += packet[idx]
for id_ in range(2, len(packet) - 1): # except header & checksum
checksum += packet[id_]
packet[-1] = scs.SCS_LOBYTE(~checksum)
@ -171,9 +171,9 @@ class MockInstructionPacket(MockFeetechPacket):
+1 is for the checksum at the end.
"""
data = []
for idx, value in ids_values.items():
for id_, value in ids_values.items():
split_value = FeetechMotorsBus._split_int_to_bytes(value, data_length)
data += [idx, *split_value]
data += [id_, *split_value]
params = [start_address, data_length, *data]
length = len(ids_values) * (1 + data_length) + 4
return cls.build(scs_id=scs.BROADCAST_ID, params=params, length=length, instruct_type="Sync_Write")
@ -331,9 +331,9 @@ class MockMotors(MockSerial):
def build_broadcast_ping_stub(self, ids: list[int] | None = None, num_invalid_try: int = 0) -> str:
ping_request = MockInstructionPacket.ping(scs.BROADCAST_ID)
return_packets = b"".join(MockStatusPacket.ping(idx) for idx in ids)
return_packets = b"".join(MockStatusPacket.ping(id_) for id_ in ids)
ping_response = self._build_send_fn(return_packets, num_invalid_try)
stub_name = "Ping_" + "_".join([str(idx) for idx in ids])
stub_name = "Ping_" + "_".join([str(id_) for id_ in ids])
self.stub(
name=stub_name,
receive_bytes=ping_request,
@ -386,17 +386,17 @@ class MockMotors(MockSerial):
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
if data_name == "Present_Position":
return_packets = b"".join(
MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items()
MockStatusPacket.present_position(id_, pos) for id_, pos in ids_values.items()
)
elif data_name == "Model_Number":
return_packets = b"".join(
MockStatusPacket.model_number(idx, model_nb) for idx, model_nb in ids_values.items()
MockStatusPacket.model_number(id_, model_nb) for id_, model_nb in ids_values.items()
)
else:
raise NotImplementedError
sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(idx) for idx in ids_values])
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(id_) for id_ in ids_values])
self.stub(
name=stub_name,
receive_bytes=sync_read_request,
@ -409,7 +409,7 @@ class MockMotors(MockSerial):
) -> str:
address, length = self.ctrl_table[data_name]
sync_read_request = MockInstructionPacket.sync_write(ids_values, address, length)
stub_name = f"Sync_Write_{data_name}_" + "_".join([str(idx) for idx in ids_values])
stub_name = f"Sync_Write_{data_name}_" + "_".join([str(id_) for id_ in ids_values])
self.stub(
name=stub_name,
receive_bytes=sync_read_request,

View File

@ -0,0 +1,147 @@
import sys
from typing import Generator
from unittest.mock import Mock, call, patch
import pytest
import scservo_sdk as scs
from lerobot.common.motors import Motor, MotorNormMode
from lerobot.common.motors.calibration import find_min_max, find_offset, set_min_max, set_offset
from lerobot.common.motors.feetech import FeetechMotorsBus
from tests.mocks.mock_feetech import MockMotors, MockPortHandler
@pytest.fixture(autouse=True)
def patch_port_handler():
if sys.platform == "darwin":
with patch.object(scs, "PortHandler", MockPortHandler):
yield
else:
yield
@pytest.fixture
def mock_motors() -> Generator[MockMotors, None, None]:
motors = MockMotors()
motors.open()
yield motors
motors.close()
@pytest.fixture
def dummy_motors() -> dict[str, Motor]:
return {
"dummy_1": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100),
"wrist_roll": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100),
"dummy_3": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100),
}
@pytest.fixture(autouse=True)
def patch_broadcast_ping():
with patch.object(FeetechMotorsBus, "broadcast_ping", return_value={1: 777, 2: 777, 3: 777}):
yield
@pytest.mark.skipif(sys.platform != "darwin", reason=f"No patching needed on {sys.platform=}")
def test_autouse_patch():
"""Ensures that the autouse fixture correctly patches scs.PortHandler with MockPortHandler."""
assert scs.PortHandler is MockPortHandler
@pytest.mark.parametrize(
"motor_names, read_values",
[
(
["dummy_1"],
[{"dummy_1": 3000}],
),
],
ids=["two-motors"],
)
def test_find_offset(mock_motors, dummy_motors, motor_names, read_values):
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
with (
patch("builtins.input", return_value=""),
patch("lerobot.common.motors.calibration.set_offset") as mock_set_offset,
):
motors_bus.sync_read = Mock(side_effect=[{"dummy_1": 3000}])
motors_bus.motor_names = motor_names
motors_bus.write = Mock(return_value=None)
find_offset(motors_bus)
# Compute the expected offset: 3000 - 2047 = 953.
expected_calls = [call(motors_bus, 953, "dummy_1")]
mock_set_offset.assert_has_calls(expected_calls, any_order=False)
def test_find_min_max(mock_motors, dummy_motors):
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.motor_names = list(dummy_motors.keys())
read_side_effect = [
{"dummy_1": 10, "wrist_roll": 20, "dummy_3": 30}, # For first sync_read call.
{"dummy_1": 4000, "wrist_roll": 2000, "dummy_3": 100}, # For second sync_read call.
{"dummy_1": 100, "wrist_roll": 4050, "dummy_3": 2010}, # For third sync_read call.
]
motors_bus.sync_read = Mock(side_effect=read_side_effect)
select_returns = [
([], [], []), # First iteration: no input.
([], [], []), # Second iteration.
([sys.stdin], [], []), # Third iteration: simulate pressing ENTER.
]
with (
patch("lerobot.common.motors.calibration.set_min_max") as mock_set_min_max,
patch("lerobot.common.motors.calibration.select.select", side_effect=select_returns),
patch("sys.stdin.readline", return_value="\n"),
):
find_min_max(motors_bus)
mock_set_min_max.assert_any_call(motors_bus, 10, 4000, "dummy_1")
mock_set_min_max.assert_any_call(motors_bus, 0, 4095, "wrist_roll") # wrist_roll is forced to [0,4095]
mock_set_min_max.assert_any_call(motors_bus, 30, 2010, "dummy_3")
assert mock_set_min_max.call_count == 3
def test_set_offset_clamping(mock_motors, dummy_motors):
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.sync_read = Mock(return_value={"dummy_1": 2047})
motors_bus.write = Mock()
# A very large offset should be clamped to +2047.
set_offset(motors_bus, 9999, "dummy_1")
motors_bus.write.assert_any_call("Offset", "dummy_1", 2047, raw_value=True)
def test_set_min_max(mock_motors, dummy_motors):
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
def _sync_read_side_effect(data_name, motors, *, raw_values=False):
if data_name == "Min_Angle_Limit":
return {"dummy_1": 100}
elif data_name == "Max_Angle_Limit":
return {"dummy_1": 3000}
return {}
motors_bus.sync_read = Mock(side_effect=_sync_read_side_effect)
motors_bus.write = Mock()
set_min_max(motors_bus, 100, 3000, "dummy_1")
motors_bus.write.assert_any_call("Min_Angle_Limit", "dummy_1", 100, raw_value=True)
motors_bus.write.assert_any_call("Max_Angle_Limit", "dummy_1", 3000, raw_value=True)

View File

@ -5,7 +5,7 @@ from unittest.mock import patch
import dynamixel_sdk as dxl
import pytest
from lerobot.common.motors import CalibrationMode, Motor
from lerobot.common.motors import Motor, MotorNormMode
from lerobot.common.motors.dynamixel import MODEL_NUMBER, DynamixelMotorsBus
from tests.mocks.mock_dynamixel import MockMotors, MockPortHandler
@ -30,9 +30,9 @@ def mock_motors() -> Generator[MockMotors, None, None]:
@pytest.fixture
def dummy_motors() -> dict[str, Motor]:
return {
"dummy_1": Motor(1, "xl430-w250", CalibrationMode.RANGE_M100_100),
"dummy_2": Motor(2, "xm540-w270", CalibrationMode.RANGE_M100_100),
"dummy_3": Motor(3, "xl330-m077", CalibrationMode.RANGE_M100_100),
"dummy_1": Motor(1, "xl430-w250", MotorNormMode.RANGE_M100_100),
"dummy_2": Motor(2, "xm540-w270", MotorNormMode.RANGE_M100_100),
"dummy_3": Motor(3, "xl330-m077", MotorNormMode.RANGE_M100_100),
}
@ -92,17 +92,17 @@ def test_abc_implementation(dummy_motors):
DynamixelMotorsBus(port="/dev/dummy-port", motors=dummy_motors)
@pytest.mark.parametrize("idx", [1, 2, 3])
def test_ping(idx, mock_motors, dummy_motors):
expected_model_nb = MODEL_NUMBER[dummy_motors[f"dummy_{idx}"].model]
stub_name = mock_motors.build_ping_stub(idx, expected_model_nb)
@pytest.mark.parametrize("id_", [1, 2, 3])
def test_ping(id_, mock_motors, dummy_motors):
expected_model_nb = MODEL_NUMBER[dummy_motors[f"dummy_{id_}"].model]
stub_name = mock_motors.build_ping_stub(id_, expected_model_nb)
motors_bus = DynamixelMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
ping_model_nb = motors_bus.ping(idx)
ping_model_nb = motors_bus.ping(id_)
assert ping_model_nb == expected_model_nb
assert mock_motors.stubs[stub_name].called

View File

@ -5,7 +5,7 @@ from unittest.mock import patch
import pytest
import scservo_sdk as scs
from lerobot.common.motors import CalibrationMode, Motor
from lerobot.common.motors import Motor, MotorNormMode
from lerobot.common.motors.feetech import MODEL_NUMBER, FeetechMotorsBus
from tests.mocks.mock_feetech import MockMotors, MockPortHandler
@ -30,9 +30,9 @@ def mock_motors() -> Generator[MockMotors, None, None]:
@pytest.fixture
def dummy_motors() -> dict[str, Motor]:
return {
"dummy_1": Motor(1, "sts3215", CalibrationMode.RANGE_M100_100),
"dummy_2": Motor(2, "sts3215", CalibrationMode.RANGE_M100_100),
"dummy_3": Motor(3, "sts3215", CalibrationMode.RANGE_M100_100),
"dummy_1": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100),
"dummy_2": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100),
"dummy_3": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100),
}
@ -93,18 +93,18 @@ def test_abc_implementation(dummy_motors):
# @pytest.mark.skip("TODO")
@pytest.mark.parametrize("idx", [1, 2, 3])
def test_ping(idx, mock_motors, dummy_motors):
expected_model_nb = MODEL_NUMBER[dummy_motors[f"dummy_{idx}"].model]
ping_stub = mock_motors.build_ping_stub(idx)
mobel_nb_stub = mock_motors.build_read_stub("Model_Number", idx, expected_model_nb)
@pytest.mark.parametrize("id_", [1, 2, 3])
def test_ping(id_, mock_motors, dummy_motors):
expected_model_nb = MODEL_NUMBER[dummy_motors[f"dummy_{id_}"].model]
ping_stub = mock_motors.build_ping_stub(id_)
mobel_nb_stub = mock_motors.build_read_stub("Model_Number", id_, expected_model_nb)
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
ping_model_nb = motors_bus.ping(idx)
ping_model_nb = motors_bus.ping(id_)
assert ping_model_nb == expected_model_nb
assert mock_motors.stubs[ping_stub].called

View File

@ -0,0 +1,87 @@
import re
import pytest
from lerobot.common.motors.motors_bus import assert_same_address, get_address, get_ctrl_table
# TODO(aliberts)
# class DummyMotorsBus(MotorsBus):
# def __init__(self, port: str, motors: dict[str, Motor]):
# super().__init__(port, motors)
@pytest.fixture
def ctrl_table_1() -> dict:
return {
"Firmware_Version": (0, 1),
"Model_Number": (1, 2),
"Present_Position": (3, 4),
"Goal_Position": (7, 2),
}
@pytest.fixture
def ctrl_table_2() -> dict:
return {
"Model_Number": (0, 2),
"Firmware_Version": (2, 1),
"Present_Position": (3, 4),
"Goal_Position": (7, 4),
"Lock": (7, 4),
}
@pytest.fixture
def model_ctrl_table(ctrl_table_1, ctrl_table_2) -> dict:
return {
"model_1": ctrl_table_1,
"model_2": ctrl_table_2,
}
def test_get_ctrl_table(model_ctrl_table, ctrl_table_1):
model = "model_1"
ctrl_table = get_ctrl_table(model_ctrl_table, model)
assert ctrl_table == ctrl_table_1
def test_get_ctrl_table_error(model_ctrl_table):
model = "model_99"
with pytest.raises(KeyError, match=f"Control table for {model=} not found."):
get_ctrl_table(model_ctrl_table, model)
def test_get_address(model_ctrl_table):
addr, n_bytes = get_address(model_ctrl_table, "model_1", "Firmware_Version")
assert addr == 0
assert n_bytes == 1
def test_get_address_error(model_ctrl_table):
model = "model_1"
data_name = "Lock"
with pytest.raises(KeyError, match=f"Address for '{data_name}' not found in {model} control table."):
get_address(model_ctrl_table, "model_1", data_name)
def test_assert_same_address(model_ctrl_table):
models = ["model_1", "model_2"]
assert_same_address(model_ctrl_table, models, "Present_Position")
def test_assert_same_address_different_addresses(model_ctrl_table):
models = ["model_1", "model_2"]
with pytest.raises(
NotImplementedError,
match=re.escape("At least two motor models use a different address"),
):
assert_same_address(model_ctrl_table, models, "Model_Number")
def test_assert_same_address_different_bytes(model_ctrl_table):
models = ["model_1", "model_2"]
with pytest.raises(
NotImplementedError,
match=re.escape("At least two motor models use a different bytes representation"),
):
assert_same_address(model_ctrl_table, models, "Goal_Position")