Compare commits

...

24 Commits

Author SHA1 Message Date
Simon Alibert 6541982dff Merge remote-tracking branch 'origin/main' into user/aliberts/2025_02_25_refactor_robots 2025-03-20 14:48:19 +01:00
Simon Alibert 43bc9404bb Remove motors from koch teleop config 2025-03-20 14:47:53 +01:00
Simon Alibert 375499c323 Remove set_operating_mode 2025-03-20 14:47:17 +01:00
Simon Alibert 17a4447cef Add debugging init 2025-03-20 14:45:18 +01:00
Simon Alibert 287dc13d96 Remove CLI for calibration visualization + move to debugging 2025-03-20 14:44:23 +01:00
Simon Alibert 02a1cf6a4e Fix calibration visualization 2025-03-20 14:33:36 +01:00
Simon Alibert 34cd1e47bf Remove obsolete test 2025-03-20 14:07:55 +01:00
Simon Alibert 74d56834af Fix dxl calib import 2025-03-20 14:03:11 +01:00
Simon Alibert dd80dbb4cd Simplify Dxl motors bus import 2025-03-20 14:01:34 +01:00
Simon Alibert bc020ee0a4 Remove mock_feetech sdk & add feetech new tests 2025-03-20 14:00:10 +01:00
Simon Alibert a15767aff1 Fix feetech reader/writer 2025-03-20 13:59:00 +01:00
Simon Alibert 9af0a9bf37 Add mock_feetech 2025-03-20 13:58:02 +01:00
Simon Alibert e2c8bc6948 Fix packet length, remove bytearray for easier debug, improve doctrings 2025-03-20 13:57:15 +01:00
Simon Alibert 2c68c6ca40 Implement FeetechMotorsBus & move functions to calibration 2025-03-20 10:22:47 +01:00
Simon Alibert dd1f33e5ed Add pytest param ids 2025-03-20 09:44:47 +01:00
Simon Alibert 2c1bb766ff Refactor MockMotors, add return values 2025-03-20 09:40:58 +01:00
Simon Alibert c1c71fb994 Ignore patching when not on MacOS 2025-03-20 09:38:36 +01:00
Simon Alibert 2d56f35071 Improve formats & docstrings 2025-03-20 09:36:17 +01:00
Simon Alibert 64ce2669ca Add bytes stuffing 2025-03-20 09:33:33 +01:00
Cole f39652707c
add docs details for resolving firmware update issues (#627)
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
2025-03-19 19:17:07 +01:00
Steven Palma 712d5dae4f
fix(os): fix default codec for windows (#875) 2025-03-18 22:04:21 +01:00
Pepijn 952e892fe5
Use float32 instead of int (#877) 2025-03-18 16:36:37 +01:00
Pepijn e8159997c7
User/pepijn/2025 03 17 act different image shapes (#870) 2025-03-18 11:09:05 +01:00
Steven Palma 1c15bab70f
fix(codec): hot-fix for default codec in linux arm platforms (#868) 2025-03-17 13:23:11 +01:00
23 changed files with 984 additions and 941 deletions

View File

@ -51,7 +51,7 @@ For a comprehensive list and documentation of these parameters, see the ffmpeg d
### Decoding parameters
**Decoder**
We tested two video decoding backends from torchvision:
- `pyav` (default)
- `pyav`
- `video_reader` (requires to build torchvision from source)
**Requested timestamps**

View File

@ -292,6 +292,11 @@ Steps:
- Scan for devices. All 12 motors should appear.
- Select the motors one by one and move the arm. Check that the graphical indicator near the top right shows the movement.
** There is a common issue with the Dynamixel XL430-W250 motors where the motors become undiscoverable after upgrading their firmware from Mac and Windows Dynamixel Wizard2 applications. When this occurs, it is required to do a firmware recovery (Select `DYNAMIXEL Firmware Recovery` and follow the prompts). There are two known workarounds to conduct this firmware reset:
1) Install the Dynamixel Wizard on a linux machine and complete the firmware recovery
2) Use the Dynamixel U2D2 in order to perform the reset with Windows or Mac. This U2D2 can be purchased [here](https://www.robotis.us/u2d2/).
For either solution, open DYNAMIXEL Wizard 2.0 and select the appropriate port. You will likely be unable to see the motor in the GUI at this time. Select `Firmware Recovery`, carefully choose the correct model, and wait for the process to complete. Finally, re-scan to confirm the firmware recovery was successful.
**Read and Write with DynamixelMotorsBus**
To get familiar with how `DynamixelMotorsBus` communicates with the motors, you can start by reading data from them. Copy past this code in the same interactive python session:

View File

@ -69,6 +69,7 @@ from lerobot.common.datasets.video_utils import (
VideoFrame,
decode_video_frames,
encode_video_frames,
get_safe_default_codec,
get_video_info,
)
from lerobot.common.robots.utils import Robot
@ -462,7 +463,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
download_videos (bool, optional): Flag to download the videos. Note that when set to True but the
video files are already present on local disk, they won't be downloaded again. Defaults to
True.
video_backend (str | None, optional): Video backend to use for decoding videos. Defaults to torchcodec.
video_backend (str | None, optional): Video backend to use for decoding videos. Defaults to torchcodec when available int the platform; otherwise, defaults to 'pyav'.
You can also use the 'pyav' decoder used by Torchvision, which used to be the default option, or 'video_reader' which is another decoder of Torchvision.
"""
super().__init__()
@ -473,7 +474,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
self.episodes = episodes
self.tolerance_s = tolerance_s
self.revision = revision if revision else CODEBASE_VERSION
self.video_backend = video_backend if video_backend else "torchcodec"
self.video_backend = video_backend if video_backend else get_safe_default_codec()
self.delta_indices = None
# Unused attributes
@ -1027,7 +1028,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
obj.delta_timestamps = None
obj.delta_indices = None
obj.episode_data_index = None
obj.video_backend = video_backend if video_backend is not None else "torchcodec"
obj.video_backend = video_backend if video_backend is not None else get_safe_default_codec()
return obj

View File

@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import json
import logging
import subprocess
@ -27,14 +28,23 @@ import torch
import torchvision
from datasets.features.features import register_feature
from PIL import Image
from torchcodec.decoders import VideoDecoder
def get_safe_default_codec():
if importlib.util.find_spec("torchcodec"):
return "torchcodec"
else:
logging.warning(
"'torchcodec' is not available in your platform, falling back to 'pyav' as a default decoder"
)
return "pyav"
def decode_video_frames(
video_path: Path | str,
timestamps: list[float],
tolerance_s: float,
backend: str = "torchcodec",
backend: str | None = None,
) -> torch.Tensor:
"""
Decodes video frames using the specified backend.
@ -43,13 +53,15 @@ def decode_video_frames(
video_path (Path): Path to the video file.
timestamps (list[float]): List of timestamps to extract frames.
tolerance_s (float): Allowed deviation in seconds for frame retrieval.
backend (str, optional): Backend to use for decoding. Defaults to "torchcodec".
backend (str, optional): Backend to use for decoding. Defaults to "torchcodec" when available in the platform; otherwise, defaults to "pyav"..
Returns:
torch.Tensor: Decoded frames.
Currently supports torchcodec on cpu and pyav.
"""
if backend is None:
backend = get_safe_default_codec()
if backend == "torchcodec":
return decode_video_frames_torchcodec(video_path, timestamps, tolerance_s)
elif backend in ["pyav", "video_reader"]:
@ -173,6 +185,12 @@ def decode_video_frames_torchcodec(
and all subsequent frames until reaching the requested frame. The number of key frames in a video
can be adjusted during encoding to take into account decoding time and video size in bytes.
"""
if importlib.util.find_spec("torchcodec"):
from torchcodec.decoders import VideoDecoder
else:
raise ImportError("torchcodec is required but not available.")
# initialize video decoder
decoder = VideoDecoder(video_path, device=device, seek_mode="approximate")
loaded_frames = []

View File

@ -0,0 +1,3 @@
from .motors_bus import visualize_motors_bus
__all__ = ["visualize_motors_bus"]

View File

@ -0,0 +1,82 @@
"""
Usage example
```python
from lerobot.common.debugging.motors_bus import visualize_motors_bus
from lerobot.common.robots.so100 import SO100Robot, SO100RobotConfig
cfg = SO100RobotConfig(port="/dev/tty.usbmodem58760430541")
so100 = SO100Robot(cfg)
visualize_motors_bus(so100.arm)
```
"""
import time
from lerobot.common.motors import MotorsBus
from lerobot.common.motors.feetech.feetech_calibration import (
adjusted_to_homing_ticks,
adjusted_to_motor_ticks,
convert_degrees_to_ticks,
convert_ticks_to_degrees,
)
def visualize_motors_bus(motors_bus: MotorsBus):
"""
Reads each joint's (1) raw ticks, (2) homed ticks, (3) degrees, and (4) invert-adjusted ticks.
"""
if not motors_bus.is_connected:
motors_bus.connect()
# Disable torque on all motors so you can move them freely by hand
values_dict = {idx: 0 for idx in motors_bus.motor_ids}
motors_bus.write("Torque_Enable", values_dict)
print("Torque disabled on all joints.")
try:
print("\nPress Ctrl+C to quit.\n")
while True:
# Read *raw* positions (no calibration).
raw_positions = motors_bus.read("Present_Position")
# # Read *already-homed* positions
# homed_positions = motor_bus.read("Present_Position")
for name, raw_ticks in raw_positions.items():
idx = motors_bus.motors[name][0]
model = motors_bus.motors[name][1]
# homed_val = homed_positions[i] # degrees or % if linear
# Manually compute "adjusted ticks" from raw ticks
manual_adjusted = adjusted_to_homing_ticks(raw_ticks, model, motors_bus, idx)
# Convert to degrees
manual_degs = convert_ticks_to_degrees(manual_adjusted, model)
# Convert that deg back to ticks
manual_ticks = convert_degrees_to_ticks(manual_degs, model)
# Then invert them using offset & bus drive mode
inv_ticks = adjusted_to_motor_ticks(manual_ticks, model, motors_bus, idx)
print(
f"{name:15s} | "
f"RAW={raw_ticks:4d} | "
# f"HOMED_FROM_READ={homed_val:7.2f} | "
f"HOMED_TICKS={manual_adjusted:6d} | "
f"MANUAL_ADJ_DEG={manual_degs:7.2f} | "
f"MANUAL_ADJ_TICKS={manual_ticks:6d} | "
f"INV_TICKS={inv_ticks:4d} "
)
print("----------------------------------------------------")
time.sleep(0.25)
except KeyboardInterrupt:
pass
finally:
print("\nExiting. Disconnecting bus...")
motors_bus.disconnect()
if __name__ == "__main__":
visualize_motors_bus()

View File

@ -17,10 +17,8 @@
import numpy as np
from ..motors_bus import MotorsBus
from ..motors_bus import CalibrationMode, MotorsBus, TorqueMode
from .dynamixel import (
CalibrationMode,
TorqueMode,
convert_degrees_to_steps,
)

View File

@ -14,19 +14,11 @@
from copy import deepcopy
import numpy as np
from ..motors_bus import (
CalibrationMode,
JointOutOfRangeError,
MotorsBus,
assert_same_address,
get_group_sync_key,
)
from ..motors_bus import MotorsBus
PROTOCOL_VERSION = 0
BAUDRATE = 1_000_000
TIMEOUT_MS = 1000
DEFAULT_TIMEOUT_MS = 1000
MAX_ID_RANGE = 252
@ -125,94 +117,6 @@ NUM_READ_RETRY = 20
NUM_WRITE_RETRY = 20
def convert_ticks_to_degrees(ticks, model):
resolutions = MODEL_RESOLUTION[model]
# Convert the ticks to degrees
return ticks * (360.0 / resolutions)
def convert_degrees_to_ticks(degrees, model):
resolutions = MODEL_RESOLUTION[model]
# Convert degrees to motor ticks
return int(degrees * (resolutions / 360.0))
def adjusted_to_homing_ticks(raw_motor_ticks: int, model: str, motorbus, motor_id: int) -> int:
"""
Takes a raw reading [0..(res-1)] (e.g. 0..4095) and shifts it so that '2048'
becomes 0 in the homed coordinate system ([-2048..+2047] for 4096 resolution).
"""
resolutions = MODEL_RESOLUTION[model]
# Shift raw ticks by half-resolution so 2048 -> 0, then wrap [0..res-1].
ticks = (raw_motor_ticks - (resolutions // 2)) % resolutions
# If above halfway, fold it into negative territory => [-2048..+2047].
if ticks > (resolutions // 2):
ticks -= resolutions
# Flip sign if drive_mode is set.
drive_mode = 0
if motorbus.calibration is not None:
drive_mode = motorbus.calibration["drive_mode"][motor_id - 1]
if drive_mode:
ticks *= -1
return ticks
def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motorbus, motor_id: int) -> int:
"""
Inverse of adjusted_to_homing_ticks(). Takes a 'homed' position in [-2048..+2047]
and recovers the raw [0..(res-1)] ticks with 2048 as midpoint.
"""
# Flip sign if drive_mode was set.
drive_mode = 0
if motorbus.calibration is not None:
drive_mode = motorbus.calibration["drive_mode"][motor_id - 1]
if drive_mode:
adjusted_pos *= -1
resolutions = MODEL_RESOLUTION[model]
# Shift by +half-resolution and wrap into [0..res-1].
# This undoes the earlier shift by -half-resolution.
ticks = (adjusted_pos + (resolutions // 2)) % resolutions
return ticks
def convert_to_bytes(value, n_bytes: int):
import scservo_sdk as scs
# Note: No need to convert back into unsigned int, since this byte preprocessing
# already handles it for us.
if n_bytes == 1:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
]
elif n_bytes == 2:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
]
elif n_bytes == 4:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
scs.SCS_LOBYTE(scs.SCS_HIWORD(value)),
scs.SCS_HIBYTE(scs.SCS_HIWORD(value)),
]
else:
raise NotImplementedError(
f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but "
f"{n_bytes} is provided instead."
)
return data
class FeetechMotorsBus(MotorsBus):
"""
The FeetechMotorsBus class allows to efficiently read and write to the attached motors. It relies on the
@ -222,6 +126,8 @@ class FeetechMotorsBus(MotorsBus):
model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
model_resolution_table = deepcopy(MODEL_RESOLUTION)
model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE)
calibration_required = deepcopy(CALIBRATION_REQUIRED)
default_timeout = DEFAULT_TIMEOUT_MS
def __init__(
self,
@ -229,224 +135,60 @@ class FeetechMotorsBus(MotorsBus):
motors: dict[str, tuple[int, str]],
):
super().__init__(port, motors)
def _set_handlers(self):
import scservo_sdk as scs
self.port_handler = scs.PortHandler(self.port)
self.packet_handler = scs.PacketHandler(PROTOCOL_VERSION)
self.reader = scs.GroupSyncRead(self.port_handler, self.packet_handler, 0, 0)
self.writer = scs.GroupSyncWrite(self.port_handler, self.packet_handler, 0, 0)
def _set_timeout(self, timeout: int = TIMEOUT_MS):
self.port_handler.setPacketTimeoutMillis(timeout)
def broadcast_ping(self, num_retry: int | None = None):
raise NotImplementedError # TODO
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
if motor_names is None:
motor_names = self.motor_names
def calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
# TODO
return ids_values
# Convert from unsigned int32 original range [0, 2**32] to signed float32 range
values = values.astype(np.float32)
def uncalibrate_values(self, ids_values: dict[int, float]) -> dict[int, int]:
# TODO
return ids_values
for i, name in enumerate(motor_names):
calib_idx = self.calibration["motor_names"].index(name)
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
motor_idx, model = self.motors[name]
# Convert raw motor ticks to homed ticks, then convert the homed ticks to degrees
values[i] = adjusted_to_homing_ticks(values[i], model, self, motor_idx)
values[i] = convert_ticks_to_degrees(values[i], model)
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
end_pos = self.calibration["end_pos"][calib_idx]
# Rescale the present position to a nominal range [0, 100] %,
# useful for joints with linear motions like Aloha gripper
values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100
if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR):
raise JointOutOfRangeError(
f"Wrong motor position range detected for {name}. "
f"Expected to be in nominal range of [0, 100] % (a full linear translation), "
f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, "
f"but present value is {values[i]} %. "
"This might be due to a cable connection issue creating an artificial jump in motor values. "
"You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`"
)
return values
def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""Inverse of `apply_calibration`."""
if motor_names is None:
motor_names = self.motor_names
for i, name in enumerate(motor_names):
calib_idx = self.calibration["motor_names"].index(name)
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
motor_idx, model = self.motors[name]
# Convert degrees to homed ticks, then convert the homed ticks to raw ticks
values[i] = convert_degrees_to_ticks(values[i], model)
values[i] = adjusted_to_motor_ticks(values[i], model, self, motor_idx)
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
end_pos = self.calibration["end_pos"][calib_idx]
# Convert from nominal lnear range of [0, 100] % to
# actual motor range of values which can be arbitrary.
values[i] = values[i] / 100 * (end_pos - start_pos) + start_pos
values = np.round(values).astype(np.int32)
return values
def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY):
def _is_comm_success(self, comm: int) -> bool:
import scservo_sdk as scs
return_list = True
if not isinstance(motor_ids, list):
return_list = False
motor_ids = [motor_ids]
return comm == scs.COMM_SUCCESS
assert_same_address(self.model_ctrl_table, self.motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = scs.GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes)
for idx in motor_ids:
group.addParam(idx)
@staticmethod
def split_int_bytes(value: int, n_bytes: int) -> list[int]:
# Validate input
if value < 0:
raise ValueError(f"Negative values are not allowed: {value}")
for _ in range(num_retry):
comm = group.txRxPacket()
if comm == scs.COMM_SUCCESS:
break
max_value = {1: 0xFF, 2: 0xFFFF, 4: 0xFFFFFFFF}.get(n_bytes)
if max_value is None:
raise NotImplementedError(f"Unsupported byte size: {n_bytes}. Expected [1, 2, 4].")
if comm != scs.COMM_SUCCESS:
raise ConnectionError(
f"Read failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
if value > max_value:
raise ValueError(f"Value {value} exceeds the maximum for {n_bytes} bytes ({max_value}).")
values = []
for idx in motor_ids:
value = group.getData(idx, addr, bytes)
values.append(value)
if return_list:
return values
else:
return values[0]
def _read(self, data_name: str, motor_names: list[str]):
import scservo_sdk as scs
motor_ids = []
models = []
for name in motor_names:
motor_idx, model = self.motors[name]
motor_ids.append(motor_idx)
models.append(model)
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
if data_name not in self.group_readers:
# Very Important to flush the buffer!
self.port_handler.ser.reset_output_buffer()
self.port_handler.ser.reset_input_buffer()
# Create new group reader
self.group_readers[group_key] = scs.GroupSyncRead(
self.port_handler, self.packet_handler, addr, bytes
)
for idx in motor_ids:
self.group_readers[group_key].addParam(idx)
for _ in range(NUM_READ_RETRY):
comm = self.group_readers[group_key].txRxPacket()
if comm == scs.COMM_SUCCESS:
break
if comm != scs.COMM_SUCCESS:
raise ConnectionError(
f"Read failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
values = []
for idx in motor_ids:
value = self.group_readers[group_key].getData(idx, addr, bytes)
values.append(value)
values = np.array(values)
if data_name in CALIBRATION_REQUIRED and self.calibration is not None:
values = self.apply_calibration(values, motor_names)
return values
def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY):
import scservo_sdk as scs
if not isinstance(motor_ids, list):
motor_ids = [motor_ids]
if not isinstance(values, list):
values = [values]
assert_same_address(self.model_ctrl_table, motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = scs.GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes)
group.addParam(idx, data)
for _ in range(num_retry):
comm = group.txPacket()
if comm == scs.COMM_SUCCESS:
break
if comm != scs.COMM_SUCCESS:
raise ConnectionError(
f"Write failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
def _write(self, data_name: str, values: list[int], motor_names: list[str]) -> None:
import scservo_sdk as scs
motor_ids = []
models = []
for name in motor_names:
motor_idx, model = self.motors[name]
motor_ids.append(motor_idx)
models.append(model)
if data_name in CALIBRATION_REQUIRED and self.calibration is not None:
values = self.revert_calibration(values, motor_names)
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
init_group = data_name not in self.group_readers
if init_group:
self.group_writers[group_key] = scs.GroupSyncWrite(
self.port_handler, self.packet_handler, addr, bytes
)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes)
if init_group:
self.group_writers[group_key].addParam(idx, data)
else:
self.group_writers[group_key].changeParam(idx, data)
comm = self.group_writers[group_key].txPacket()
if comm != scs.COMM_SUCCESS:
raise ConnectionError(
f"Write failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
# Note: No need to convert back into unsigned int, since this byte preprocessing
# already handles it for us.
if n_bytes == 1:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
]
elif n_bytes == 2:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
]
elif n_bytes == 4:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
scs.SCS_LOBYTE(scs.SCS_HIWORD(value)),
scs.SCS_HIBYTE(scs.SCS_HIWORD(value)),
]
return data

View File

@ -13,11 +13,12 @@
# limitations under the License.
import numpy as np
from ..motors_bus import MotorsBus, TorqueMode
from .feetech import (
from ..motors_bus import (
CalibrationMode,
FeetechMotorsBus,
MotorsBus,
TorqueMode,
)
from .feetech import MODEL_RESOLUTION, FeetechMotorsBus
URL_TEMPLATE = (
"https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"
@ -140,19 +141,19 @@ def run_full_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm
) # TODO(pepijn): replace with new instruction homing pos (all motors in middle) in tutorial
input("Press Enter to continue...")
start_positions = np.zeros(len(arm.motor_indices))
end_positions = np.zeros(len(arm.motor_indices))
encoder_offsets = np.zeros(len(arm.motor_indices))
start_positions = np.zeros(len(arm.motor_ids))
end_positions = np.zeros(len(arm.motor_ids))
encoder_offsets = np.zeros(len(arm.motor_ids))
modes = get_calibration_modes(arm)
for i, motor_id in enumerate(arm.motor_indices):
for i, motor_id in enumerate(arm.motor_ids):
if modes[i] == CalibrationMode.DEGREE.name:
encoder_offsets[i] = calibrate_homing_motor(motor_id, arm)
start_positions[i] = 0
end_positions[i] = 0
for i, motor_id in enumerate(arm.motor_indices):
for i, motor_id in enumerate(arm.motor_ids):
if modes[i] == CalibrationMode.LINEAR.name:
start_positions[i], end_positions[i] = calibrate_linear_motor(motor_id, arm)
encoder_offsets[i] = 0
@ -251,3 +252,62 @@ def apply_feetech_offsets_from_calibration(motorsbus: FeetechMotorsBus, calibrat
motorsbus.write("Lock", 0)
print("Offsets have been saved to EEPROM successfully.")
def convert_ticks_to_degrees(ticks, model):
resolutions = MODEL_RESOLUTION[model]
# Convert the ticks to degrees
return ticks * (360.0 / resolutions)
def convert_degrees_to_ticks(degrees, model):
resolutions = MODEL_RESOLUTION[model]
# Convert degrees to motor ticks
return int(degrees * (resolutions / 360.0))
def adjusted_to_homing_ticks(raw_motor_ticks: int, model: str, motor_bus: MotorsBus, motor_id: int) -> int:
"""
Takes a raw reading [0..(res-1)] (e.g. 0..4095) and shifts it so that '2048'
becomes 0 in the homed coordinate system ([-2048..+2047] for 4096 resolution).
"""
resolutions = MODEL_RESOLUTION[model]
# Shift raw ticks by half-resolution so 2048 -> 0, then wrap [0..res-1].
ticks = (raw_motor_ticks - (resolutions // 2)) % resolutions
# If above halfway, fold it into negative territory => [-2048..+2047].
if ticks > (resolutions // 2):
ticks -= resolutions
# Flip sign if drive_mode is set.
drive_mode = 0
if motor_bus.calibration is not None:
drive_mode = motor_bus.calibration["drive_mode"][motor_id - 1]
if drive_mode:
ticks *= -1
return ticks
def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motor_bus: MotorsBus, motor_id: int) -> int:
"""
Inverse of adjusted_to_homing_ticks(). Takes a 'homed' position in [-2048..+2047]
and recovers the raw [0..(res-1)] ticks with 2048 as midpoint.
"""
# Flip sign if drive_mode was set.
drive_mode = 0
if motor_bus.calibration is not None:
drive_mode = motor_bus.calibration["drive_mode"][motor_id - 1]
if drive_mode:
adjusted_pos *= -1
resolutions = MODEL_RESOLUTION[model]
# Shift by +half-resolution and wrap into [0..res-1].
# This undoes the earlier shift by -half-resolution.
ticks = (adjusted_pos + (resolutions // 2)) % resolutions
return ticks

View File

@ -119,9 +119,7 @@ class ACTPolicy(PreTrainedPolicy):
batch = self.normalize_inputs(batch)
if self.config.image_features:
batch = dict(batch) # shallow copy so that adding a key doesn't modify the original
batch["observation.images"] = torch.stack(
[batch[key] for key in self.config.image_features], dim=-4
)
batch["observation.images"] = [batch[key] for key in self.config.image_features]
# If we are doing temporal ensembling, do online updates where we keep track of the number of actions
# we are ensembling over.
@ -149,9 +147,8 @@ class ACTPolicy(PreTrainedPolicy):
batch = self.normalize_inputs(batch)
if self.config.image_features:
batch = dict(batch) # shallow copy so that adding a key doesn't modify the original
batch["observation.images"] = torch.stack(
[batch[key] for key in self.config.image_features], dim=-4
)
batch["observation.images"] = [batch[key] for key in self.config.image_features]
batch = self.normalize_targets(batch)
actions_hat, (mu_hat, log_sigma_x2_hat) = self.model(batch)
@ -413,11 +410,10 @@ class ACT(nn.Module):
"actions must be provided when using the variational objective in training mode."
)
batch_size = (
batch["observation.images"]
if "observation.images" in batch
else batch["observation.environment_state"]
).shape[0]
if "observation.images" in batch:
batch_size = batch["observation.images"][0].shape[0]
else:
batch_size = batch["observation.environment_state"].shape[0]
# Prepare the latent for input to the transformer encoder.
if self.config.use_vae and "action" in batch:
@ -490,20 +486,21 @@ class ACT(nn.Module):
all_cam_features = []
all_cam_pos_embeds = []
for cam_index in range(batch["observation.images"].shape[-4]):
cam_features = self.backbone(batch["observation.images"][:, cam_index])["feature_map"]
# TODO(rcadene, alexander-soare): remove call to `.to` to speedup forward ; precompute and use
# buffer
# For a list of images, the H and W may vary but H*W is constant.
for img in batch["observation.images"]:
cam_features = self.backbone(img)["feature_map"]
cam_pos_embed = self.encoder_cam_feat_pos_embed(cam_features).to(dtype=cam_features.dtype)
cam_features = self.encoder_img_feat_input_proj(cam_features) # (B, C, h, w)
cam_features = self.encoder_img_feat_input_proj(cam_features)
# Rearrange features to (sequence, batch, dim).
cam_features = einops.rearrange(cam_features, "b c h w -> (h w) b c")
cam_pos_embed = einops.rearrange(cam_pos_embed, "b c h w -> (h w) b c")
all_cam_features.append(cam_features)
all_cam_pos_embeds.append(cam_pos_embed)
# Concatenate camera observation feature maps and positional embeddings along the width dimension,
# and move to (sequence, batch, dim).
all_cam_features = torch.cat(all_cam_features, axis=-1)
encoder_in_tokens.extend(einops.rearrange(all_cam_features, "b c h w -> (h w) b c"))
all_cam_pos_embeds = torch.cat(all_cam_pos_embeds, axis=-1)
encoder_in_pos_embed.extend(einops.rearrange(all_cam_pos_embeds, "b c h w -> (h w) b c"))
encoder_in_tokens.extend(torch.cat(all_cam_features, axis=0))
encoder_in_pos_embed.extend(torch.cat(all_cam_pos_embeds, axis=0))
# Stack all tokens along the sequence dimension.
encoder_in_tokens = torch.stack(encoder_in_tokens, axis=0)

View File

@ -27,7 +27,6 @@ from lerobot.common.motors import TorqueMode
from lerobot.common.motors.dynamixel import (
DynamixelMotorsBus,
run_arm_calibration,
set_operating_mode,
)
from ..robot import Robot
@ -103,7 +102,22 @@ class KochRobot(Robot):
self.arm.write("Torque_Enable", TorqueMode.DISABLED.value)
self.calibrate()
set_operating_mode(self.arm)
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't
# rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm,
# you could end up with a servo with a position 0 or 4095 at a crucial point See [
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11]
all_motors_except_gripper = [name for name in self.arm.motor_names if name != "gripper"]
if len(all_motors_except_gripper) > 0:
# 4 corresponds to Extended Position on Koch motors
self.arm.write("Operating_Mode", 4, all_motors_except_gripper)
# Use 'position control current based' for gripper to be limited by the limit of the current.
# For the follower gripper, it means it can grasp an object without forcing too much even tho,
# it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
# For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger
# to make it move, and it will move back to its original target position when we release the force.
# 5 corresponds to Current Controlled Position on Koch gripper motors "xl330-m077, xl330-m288"
self.arm.write("Operating_Mode", 5, "gripper")
# Set better PID values to close the gap between recorded states and actions
# TODO(rcadene): Implement an automatic procedure to set optimal PID values for each motor

View File

@ -580,7 +580,7 @@ class ManipulatorRobot:
# Used when record_data=True
follower_goal_pos[name] = goal_pos
goal_pos = goal_pos.numpy().astype(np.int32)
goal_pos = goal_pos.numpy().astype(np.float32)
self.follower_arms[name].write("Goal_Position", goal_pos)
self.logs[f"write_follower_{name}_goal_pos_dt_s"] = time.perf_counter() - before_fwrite_t
@ -702,7 +702,7 @@ class ManipulatorRobot:
action_sent.append(goal_pos)
# Send goal position to each follower
goal_pos = goal_pos.numpy().astype(np.int32)
goal_pos = goal_pos.numpy().astype(np.float32)
self.follower_arms[name].write("Goal_Position", goal_pos)
return torch.cat(action_sent)

View File

@ -28,13 +28,3 @@ class KochTeleopConfig(TeleoperatorConfig):
# Sets the arm in torque mode with the gripper motor set to this angle. This makes it possible
# to squeeze the gripper and have it spring back to an open position on its own.
gripper_open_degree: float = 35.156
mock: bool = False
# motors
shoulder_pan: tuple = (1, "xl330-m077")
shoulder_lift: tuple = (2, "xl330-m077")
elbow_flex: tuple = (3, "xl330-m077")
wrist_flex: tuple = (4, "xl330-m077")
wrist_roll: tuple = (5, "xl330-m077")
gripper: tuple = (6, "xl330-m077")

View File

@ -25,7 +25,6 @@ from lerobot.common.motors import TorqueMode
from lerobot.common.motors.dynamixel import (
DynamixelMotorsBus,
run_arm_calibration,
set_operating_mode,
)
from ..teleoperator import Teleoperator
@ -50,12 +49,12 @@ class KochTeleop(Teleoperator):
self.arm = DynamixelMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": config.shoulder_pan,
"shoulder_lift": config.shoulder_lift,
"elbow_flex": config.elbow_flex,
"wrist_flex": config.wrist_flex,
"wrist_roll": config.wrist_roll,
"gripper": config.gripper,
"shoulder_pan": (1, "xl330-m077"),
"shoulder_lift": (2, "xl330-m077"),
"elbow_flex": (3, "xl330-m077"),
"wrist_flex": (4, "xl330-m077"),
"wrist_roll": (5, "xl330-m077"),
"gripper": (6, "xl330-m077"),
},
)
@ -88,7 +87,22 @@ class KochTeleop(Teleoperator):
self.arm.write("Torque_Enable", TorqueMode.DISABLED.value)
self.calibrate()
set_operating_mode(self.arm)
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't
# rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm,
# you could end up with a servo with a position 0 or 4095 at a crucial point See [
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11]
all_motors_except_gripper = [name for name in self.arm.motor_names if name != "gripper"]
if len(all_motors_except_gripper) > 0:
# 4 corresponds to Extended Position on Koch motors
self.arm.write("Operating_Mode", 4, all_motors_except_gripper)
# Use 'position control current based' for gripper to be limited by the limit of the current.
# For the follower gripper, it means it can grasp an object without forcing too much even tho,
# it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
# For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger
# to make it move, and it will move back to its original target position when we release the force.
# 5 corresponds to Current Controlled Position on Koch gripper motors "xl330-m077, xl330-m288"
self.arm.write("Operating_Mode", 5, "gripper")
# Enable torque on the gripper and move it to 45 degrees so that we can use it as a trigger.
logging.info("Activating torque.")

View File

@ -20,6 +20,7 @@ from lerobot.common import (
policies, # noqa: F401
)
from lerobot.common.datasets.transforms import ImageTransformsConfig
from lerobot.common.datasets.video_utils import get_safe_default_codec
@dataclass
@ -35,7 +36,7 @@ class DatasetConfig:
image_transforms: ImageTransformsConfig = field(default_factory=ImageTransformsConfig)
revision: str | None = None
use_imagenet_stats: bool = True
video_backend: str = "pyav"
video_backend: str = field(default_factory=get_safe_default_codec)
@dataclass

View File

@ -1,100 +0,0 @@
"""
usage:
```python
python lerobot/scripts/calibration_visualization_so100.py \
--teleop.type=so100 \
--teleop.port=/dev/tty.usbmodem58760430541
python lerobot/scripts/calibration_visualization_so100.py \
--robot.type=so100 \
--robot.port=/dev/tty.usbmodem585A0084711
```
"""
import time
from dataclasses import dataclass
import draccus
from lerobot.common.motors.feetech.feetech import (
adjusted_to_homing_ticks,
adjusted_to_motor_ticks,
convert_degrees_to_ticks,
convert_ticks_to_degrees,
)
from lerobot.common.robots import RobotConfig
from lerobot.common.robots.so100 import SO100Robot, SO100RobotConfig # noqa: F401
from lerobot.common.teleoperators import TeleoperatorConfig
from lerobot.common.teleoperators.so100 import SO100Teleop, SO100TeleopConfig # noqa: F401
@dataclass
class DebugFeetechConfig:
teleop: TeleoperatorConfig | None = None
robot: RobotConfig | None = None
def __post_init__(self):
if bool(self.teleop) == bool(self.robot):
raise ValueError("Select a single device.")
@draccus.wrap()
def debug_feetech_positions(cfg: DebugFeetechConfig):
"""
Reads each joint's (1) raw ticks, (2) homed ticks, (3) degrees, and (4) invert-adjusted ticks.
"""
device = SO100Teleop(cfg.teleop) if cfg.teleop else SO100Robot(cfg.robot)
device.connect()
# Disable torque on all motors so you can move them freely by hand
device.arm.write("Torque_Enable", 0, motor_names=device.arm.motor_names)
print("Torque disabled on all joints.")
try:
print("\nPress Ctrl+C to quit.\n")
while True:
# Read *raw* positions (no calibration).
raw_positions = device.arm.read_with_motor_ids(
device.arm.motor_models, device.arm.motor_indices, data_name="Present_Position"
)
# Read *already-homed* positions
homed_positions = device.arm.read("Present_Position")
for i, name in enumerate(device.arm.motor_names):
motor_idx, model = device.arm.motors[name]
raw_ticks = raw_positions[i] # 0..4095
homed_val = homed_positions[i] # degrees or % if linear
# Manually compute "adjusted ticks" from raw ticks
manual_adjusted = adjusted_to_homing_ticks(raw_ticks, model, device.arm, motor_idx)
# Convert to degrees
manual_degs = convert_ticks_to_degrees(manual_adjusted, model)
# Convert that deg back to ticks
manual_ticks = convert_degrees_to_ticks(manual_degs, model)
# Then invert them using offset & bus drive mode
inv_ticks = adjusted_to_motor_ticks(manual_ticks, model, device.arm, motor_idx)
print(
f"{name:15s} | "
f"RAW={raw_ticks:4d} | "
f"HOMED_FROM_READ={homed_val:7.2f} | "
f"HOMED_TICKS={manual_adjusted:6d} | "
f"MANUAL_ADJ_DEG={manual_degs:7.2f} | "
f"MANUAL_ADJ_TICKS={manual_ticks:6d} | "
f"INV_TICKS={inv_ticks:4d} "
)
print("----------------------------------------------------")
time.sleep(0.25)
except KeyboardInterrupt:
pass
finally:
print("\nExiting. Disconnecting bus...")
device.disconnect()
if __name__ == "__main__":
debug_feetech_positions()

View File

@ -69,7 +69,7 @@ dependencies = [
"rerun-sdk>=0.21.0",
"termcolor>=2.4.0",
"torch>=2.2.1",
"torchcodec>=0.2.1",
"torchcodec>=0.2.1; sys_platform != 'win32' and (sys_platform != 'linux' or (platform_machine != 'aarch64' and platform_machine != 'arm64' and platform_machine != 'armv7l'))",
"torchvision>=0.21.0",
"wandb>=0.16.3",
"zarr>=2.17.0",

View File

@ -51,34 +51,34 @@ DXL_CRC_TABLE = [
# https://emanual.robotis.com/docs/en/dxl/protocol2/#instruction
INSTRUCTION_TYPES = {
"Ping": 0x01, # Checks whether the Packet has arrived at a device with the same ID as the specified packet ID
"Read": 0x02, # Read data from the Device
"Write": 0x03, # Write data to the Device
"Reg_Write": 0x04, # Register the Instruction Packet in standby status; Packet can later be executed using the Action command
"Action": 0x05, # Executes a Packet that was registered beforehand using Reg Write
"Factory_Reset": 0x06, # Resets the Control Table to its initial factory default settings
"Reboot": 0x08, # Reboot the Device
"Clear": 0x10, # Reset certain information stored in memory
"Control_Table_Backup": 0x20, # Store current Control Table status data to a Backup or to restore backup EEPROM data.
"Status": 0x55, # Return packet sent following the execution of an Instruction Packet
"Sync_Read": 0x82, # Read data from multiple devices with the same Address with the same length at once
"Sync_Write": 0x83, # Write data to multiple devices with the same Address with the same length at once
"Fast_Sync_Read": 0x8A, # Read data from multiple devices with the same Address with the same length at once
"Bulk_Read": 0x92, # Read data from multiple devices with different Addresses with different lengths at once
"Bulk_Write": 0x93, # Write data to multiple devices with different Addresses with different lengths at once
"Fast_Bulk_Read": 0x9A, # Read data from multiple devices with different Addresses with different lengths at once
"Ping": 0x01, # Checks whether the Packet has arrived at a device with the same ID as the specified packet ID
"Read": 0x02, # Read data from the Device
"Write": 0x03, # Write data to the Device
"Reg_Write": 0x04, # Register the Instruction Packet in standby status; Packet can later be executed using the Action command
"Action": 0x05, # Executes a Packet that was registered beforehand using Reg Write
"Factory_Reset": 0x06, # Resets the Control Table to its initial factory default settings
"Reboot": 0x08, # Reboot the Device
"Clear": 0x10, # Reset certain information stored in memory
"Control_Table_Backup": 0x20, # Store current Control Table status data to a Backup or to restore backup EEPROM data.
"Status": 0x55, # Return packet sent following the execution of an Instruction Packet
"Sync_Read": 0x82, # Read data from multiple devices with the same Address with the same length at once
"Sync_Write": 0x83, # Write data to multiple devices with the same Address with the same length at once
"Fast_Sync_Read": 0x8A, # Read data from multiple devices with the same Address with the same length at once
"Bulk_Read": 0x92, # Read data from multiple devices with different Addresses with different lengths at once
"Bulk_Write": 0x93, # Write data to multiple devices with different Addresses with different lengths at once
"Fast_Bulk_Read": 0x9A, # Read data from multiple devices with different Addresses with different lengths at once
} # fmt: skip
# https://emanual.robotis.com/docs/en/dxl/protocol2/#error
STATUS_TYPE = {
ERROR_TYPE = {
"Success": 0x00, # No error
"Result_Fail": 0x01, # Failed to process the sent Instruction Packet
"Instruction_Error": 0x02, # An undefined Instruction has been usedAction has been used without Reg Write
"Instruction_Error": 0x02, # An undefined Instruction has been usedAction has been used without Reg Write
"CRC_Error": 0x03, # The CRC of the sent Packet does not match the expected value
"Data_Range_Error": 0x04, # Data to be written to the specified Address is outside the range of the minimum/maximum value
"Data_Length_Error": 0x05, # Attempted to write Data that is shorter than the required data length of the specified Address
"Data_Range_Error": 0x04, # Data to be written to the specified Address is outside the range of the minimum/maximum value
"Data_Length_Error": 0x05, # Attempted to write Data that is shorter than the required data length of the specified Address
# (ex: when you attempt to only use 2 bytes of a register that has been defined as 4 bytes)
"Data_Limit_Error": 0x06, # Data to be written to the specified Address is outside of the configured Limit value
"Data_Limit_Error": 0x06, # Data to be written to the specified Address is outside of the configured Limit value
"Access_Error": 0x07, # Attempted to write a value to an Address that is Read Only or has not been defined
# Attempted to read a value from an Address that is Write Only or has not been defined
# Attempted to write a value to an EEPROM register while Torque was Enabled.
@ -86,64 +86,110 @@ STATUS_TYPE = {
class MockDynamixelPacketv2(abc.ABC):
@staticmethod
def _compute_crc(crc_accum: int, data_blk: list[int], data_blk_size: int) -> int:
for j in range(data_blk_size):
i = ((crc_accum >> 8) ^ data_blk[j]) & 0xFF
crc_accum = ((crc_accum << 8) ^ DXL_CRC_TABLE[i]) & 0xFFFF
return crc_accum
@classmethod
def build(cls, dxl_id: int, params: list[int], *args, **kwargs) -> bytes:
packet = cls._build(dxl_id, params, *args, **kwargs)
crc = cls._compute_crc(0, packet, len(packet) - 2)
packet[-2] = crc & 0xFF # lower byte
packet[-1] = (crc >> 8) & 0xFF # upper byte
def build(cls, dxl_id: int, params: list[int], length: list[int], *args, **kwargs) -> bytes:
packet = cls._build(dxl_id, params, length, *args, **kwargs)
packet = cls._add_stuffing(packet)
packet = cls._add_crc(packet)
return bytes(packet)
@abc.abstractclassmethod
def _build(cls, dxl_id: int, params: list[int], *args, **kwargs):
def _build(cls, dxl_id: int, params: list[int], length: int, *args, **kwargs) -> list[int]:
pass
@staticmethod
def _add_stuffing(packet: list[int]) -> list[int]:
"""
Byte stuffing is a method of adding additional data to generated instruction packets to ensure that
the packets are processed successfully. When the byte pattern "0xFF 0xFF 0xFD" appears in a packet,
byte stuffing adds 0xFD to the end of the pattern to convert it to 0xFF 0xFF 0xFD 0xFD to ensure
that it is not interpreted as the header at the start of another packet.
Source: https://emanual.robotis.com/docs/en/dxl/protocol2/#transmission-process
Args:
packet (list[int]): The raw packet without stuffing.
Returns:
list[int]: The packet stuffed if it contained a "0xFF 0xFF 0xFD" byte sequence in its data bytes.
"""
packet_length_in = dxl.DXL_MAKEWORD(packet[dxl.PKT_LENGTH_L], packet[dxl.PKT_LENGTH_H])
packet_length_out = packet_length_in
temp = [0] * dxl.TXPACKET_MAX_LEN
# FF FF FD XX ID LEN_L LEN_H
temp[dxl.PKT_HEADER0 : dxl.PKT_HEADER0 + dxl.PKT_LENGTH_H + 1] = packet[
dxl.PKT_HEADER0 : dxl.PKT_HEADER0 + dxl.PKT_LENGTH_H + 1
]
index = dxl.PKT_INSTRUCTION
for i in range(0, packet_length_in - 2): # except CRC
temp[index] = packet[i + dxl.PKT_INSTRUCTION]
index = index + 1
if (
packet[i + dxl.PKT_INSTRUCTION] == 0xFD
and packet[i + dxl.PKT_INSTRUCTION - 1] == 0xFF
and packet[i + dxl.PKT_INSTRUCTION - 2] == 0xFF
):
# FF FF FD
temp[index] = 0xFD
index = index + 1
packet_length_out = packet_length_out + 1
temp[index] = packet[dxl.PKT_INSTRUCTION + packet_length_in - 2]
temp[index + 1] = packet[dxl.PKT_INSTRUCTION + packet_length_in - 1]
index = index + 2
if packet_length_in != packet_length_out:
packet = [0] * index
packet[0:index] = temp[0:index]
packet[dxl.PKT_LENGTH_L] = dxl.DXL_LOBYTE(packet_length_out)
packet[dxl.PKT_LENGTH_H] = dxl.DXL_HIBYTE(packet_length_out)
return packet
@staticmethod
def _add_crc(packet: list[int]) -> list[int]:
crc = 0
for j in range(len(packet) - 2):
i = ((crc >> 8) ^ packet[j]) & 0xFF
crc = ((crc << 8) ^ DXL_CRC_TABLE[i]) & 0xFFFF
packet[-2] = dxl.DXL_LOBYTE(crc)
packet[-1] = dxl.DXL_HIBYTE(crc)
return packet
class MockInstructionPacket(MockDynamixelPacketv2):
"""
Helper class to build valid Dynamixel Protocol 2.0 Instruction Packets with correct CRC.
Helper class to build valid Dynamixel Protocol 2.0 Instruction Packets.
Protocol 2.0 Instruction Packet structure
(from https://emanual.robotis.com/docs/en/dxl/protocol2/#instruction-packet)
0xFF 0xFF 0xFD 0x00 # 4-byte header
<servo_id> # typically 0x01
<length L> <length H> # 2-byte length of (instruction+error+params+CRC)
<instruction> # instruction type
<parameters...> # for a 4-byte read, we have 4 param bytes
<crc_low> <crc_high> # 16-bit CRC
| Header | Packet ID | Length | Instruction | Params | CRC |
| ------------------- | --------- | ----------- | ----------- | ----------------- | ----------- |
| 0xFF 0xFF 0xFD 0x00 | ID | Len_L Len_H | Instr | Param 1 Param N | CRC_L CRC_H |
"""
@classmethod
def _build(cls, dxl_id: int, params: list[int], instruct_type: str):
def _build(cls, dxl_id: int, params: list[int], length: int, instruct_type: str) -> list[int]:
instruct_value = INSTRUCTION_TYPES[instruct_type]
# For Protocol 2.0, "length" = (number_of_params + 3),
# where:
# +1 is for <instruction> byte,
# +2 is for the CRC at the end.
# The official Dynamixel SDK sometimes uses (+7) logic for sync reads
# because it includes special sub-parameters. But at core:
# length = (instruction byte) + (len(params)) + (CRC16 =2).
packet_length = len(params) + 3
return bytearray(
[
0xFF, 0xFF, 0xFD, 0x00, # header
dxl_id, # servo id
packet_length & 0xFF, # length_l
(packet_length >> 8) & 0xFF, # length_h
instruct_value, # instruction type
*params, # data bytes
0x00, 0x00 # placeholder for CRC
]
) # fmt: skip
return [
0xFF, 0xFF, 0xFD, 0x00, # header
dxl_id, # servo id
dxl.DXL_LOBYTE(length), # length_l
dxl.DXL_HIBYTE(length), # length_h
instruct_value, # instruction type
*params, # data bytes
0x00, 0x00 # placeholder for CRC
] # fmt: skip
@classmethod
def sync_read(
@ -153,68 +199,67 @@ class MockInstructionPacket(MockDynamixelPacketv2):
data_length: int,
) -> bytes:
"""
Helper method to build a "Sync Read" broadcast instruction.
Builds a "Sync_Read" broadcast instruction.
(from https://emanual.robotis.com/docs/en/dxl/protocol2/#sync-read-0x82)
The official SDK might add some stuffing or check param_length,
but this is enough for basic compliance if you want a raw packet.
The parameters for Sync Read (Protocol 2.0) are:
The parameters for Sync_Read (Protocol 2.0) are:
param[0] = start_address L
param[1] = start_address H
param[2] = data_length L
param[3] = data_length H
param[4+] = motor IDs to read from
"""
# Example param: [LowAddr, HighAddr, LowLen, HighLen, ID1, ID2, ...]
params = [
(start_address & 0xFF),
((start_address >> 8) & 0xFF),
(data_length & 0xFF),
((data_length >> 8) & 0xFF),
] + dxl_ids
# broadcast ID: 0xFE
return cls.build(dxl_id=0xFE, instruct_type="Sync_Read", params=params)
And 'length' = (number_of_params + 7), where:
+1 is for instruction byte,
+2 is for the address bytes,
+2 is for the length bytes,
+2 is for the CRC at the end.
"""
params = [
dxl.DXL_LOBYTE(start_address),
dxl.DXL_HIBYTE(start_address),
dxl.DXL_LOBYTE(data_length),
dxl.DXL_HIBYTE(data_length),
*dxl_ids,
]
length = len(dxl_ids) + 7
return cls.build(dxl_id=dxl.BROADCAST_ID, params=params, length=length, instruct_type="Sync_Read")
class MockStatusPacket(MockDynamixelPacketv2):
"""
Helper class to build valid Dynamixel Protocol 2.0 Status Packets with correct CRC.
Helper class to build valid Dynamixel Protocol 2.0 Status Packets.
Protocol 2.0 Status Packet structure
(from https://emanual.robotis.com/docs/en/dxl/protocol2/#status-packet)
0xFF 0xFF 0xFD 0x00 # 4-byte header
<servo_id> # typically 0x01
<length L> <length H> # 2-byte length of (instruction+error+params+CRC)
0x55 # instruction = 0x55 means "status packet"
<error> # 0 if no error
<parameters...> # for a 4-byte read, we have 4 param bytes
<crc_low> <crc_high> # 16-bit CRC
| Header | Packet ID | Length | Instruction | Error | Params | CRC |
| ------------------- | --------- | ----------- | ----------- | ----- | ----------------- | ----------- |
| 0xFF 0xFF 0xFD 0x00 | ID | Len_L Len_H | 0x55 | Err | Param 1 Param N | CRC_L CRC_H |
"""
@classmethod
def _build(cls, dxl_id: int, params: list[int], status: str = "Success") -> bytearray:
status_byte = STATUS_TYPE[status]
return bytearray(
[
0xFF, 0xFF, 0xFD, 0x00, # header
dxl_id, # servo id
0x08, 0x00, # length_l, length_h = 8
0x55, # instruction = status
status_byte, # status
*params, # data bytes
0x00, 0x00 # placeholder for CRC
]
) # fmt: skip
def _build(cls, dxl_id: int, params: list[int], length: int, error: str = "Success") -> list[int]:
err_byte = ERROR_TYPE[error]
return [
0xFF, 0xFF, 0xFD, 0x00, # header
dxl_id, # servo id
dxl.DXL_LOBYTE(length), # length_l
dxl.DXL_HIBYTE(length), # length_h
0x55, # instruction = 'status'
err_byte, # error
*params, # data bytes
0x00, 0x00 # placeholder for CRC
] # fmt: skip
@classmethod
def present_position(cls, dxl_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes:
"""Builds a 'Present_Position' packet.
"""Builds a 'Present_Position' status packet.
Args:
pos (int | None, optional): Desired 'Present_Position'. If None, it will use a random value in the
min_max_range. Defaults to None.
dxl_id (int): List of the servos ids
pos (int | None, optional): Desired 'Present_Position' to be returned in the packet. If None, it
will use a random value in the min_max_range. Defaults to None.
min_max_range (tuple, optional): Min/max range to generate the position values used for when 'pos'
is None. Note that the bounds are included in the range. Defaults to (0, 4095).
@ -222,19 +267,24 @@ class MockStatusPacket(MockDynamixelPacketv2):
bytes: The raw 'Present_Position' status packet ready to be sent through serial.
"""
pos = random.randint(*min_max_range) if pos is None else pos
# [lower pos 8 bits, higher pos 8 bits, 0, 0]
params = [pos & 0xFF, (pos >> 8) & 0xFF, 0, 0]
return cls.build(dxl_id, params)
params = [dxl.DXL_LOBYTE(pos), dxl.DXL_HIBYTE(pos), 0, 0]
length = 8
return cls.build(dxl_id, params=params, length=length)
class MockPortHandler(dxl.PortHandler):
def setupPort(self, baud): # noqa: N802
"""
This class overwrite the 'setupPort' method of the Dynamixel PortHandler because it can specify
baudrates that are not supported with a serial port on MacOS.
"""
def setupPort(self, cflag_baud): # noqa: N802
if self.is_open:
self.closePort()
self.ser = serial.Serial(
port=self.port_name,
# baudrate=self.baudrate, <- This is forbidden for ttys ports (on macos at least)
# baudrate=self.baudrate, <- This will fail on MacOS
# parity = serial.PARITY_ODD,
# stopbits = serial.STOPBITS_TWO,
bytesize=serial.EIGHTBITS,
@ -248,59 +298,72 @@ class MockPortHandler(dxl.PortHandler):
class MockMotors(MockSerial):
"""
This class will simulate physical motors by responding with valid status packets upon receiving some
instruction packets. It is meant to test MotorsBus classes.
'data_name' supported:
- Present_Position
"""
ctrl_table = X_SERIES_CONTROL_TABLE
def __init__(self, dlx_ids: list[int], default_stubs: bool = True):
def __init__(self, dlx_ids: list[int]):
super().__init__()
self._ids = dlx_ids
self.open()
if default_stubs:
self._create_stubs("Present_Position")
def _create_stubs(self, data_name: str):
def build_single_motor_stubs(
self, data_name: str, return_value: int | None = None, num_invalid_try: int | None = None
) -> None:
address, length = self.ctrl_table[data_name]
# sync read all motors
sync_read_request_all = MockInstructionPacket.sync_read(self._ids, address, length)
sync_read_response_all = self._create_present_pos_send_fn(self._ids, data_name)
self.stub(
name=f"SyncRead_{data_name}_all",
receive_bytes=sync_read_request_all,
send_fn=sync_read_response_all,
)
# sync read single motors
for idx in self._ids:
sync_read_request_single = MockInstructionPacket.sync_read([idx], address, length)
sync_read_response_single = self._create_present_pos_send_fn([idx], data_name)
if data_name == "Present_Position":
sync_read_request_single = MockInstructionPacket.sync_read([idx], address, length)
sync_read_response_single = self._build_present_pos_send_fn(
[idx], [return_value], num_invalid_try
)
else:
raise NotImplementedError # TODO(aliberts): add ping?
self.stub(
name=f"SyncRead_{data_name}_{idx}",
receive_bytes=sync_read_request_single,
send_fn=sync_read_response_single,
)
def _create_present_pos_send_fn(
self, dxl_ids: list[int], data_name: str, num_invalid_try: int | None = None
def build_all_motors_stub(
self, data_name: str, return_values: list[int] | None = None, num_invalid_try: int | None = None
) -> None:
address, length = self.ctrl_table[data_name]
if data_name == "Present_Position":
sync_read_request_all = MockInstructionPacket.sync_read(self._ids, address, length)
sync_read_response_all = self._build_present_pos_send_fn(
self._ids, return_values, num_invalid_try
)
else:
raise NotImplementedError # TODO(aliberts): add ping?
self.stub(
name=f"SyncRead_{data_name}_all",
receive_bytes=sync_read_request_all,
send_fn=sync_read_response_all,
)
def _build_present_pos_send_fn(
self, dxl_ids: list[int], return_pos: list[int] | None = None, num_invalid_try: int | None = None
) -> Callable[[int], bytes]:
# if data_name == "Present_Position":
# packet_generator = MockStatusPacket.present_position
# else:
# # TODO(aliberts): add "Goal_Position"
# raise NotImplementedError
return_pos = [None for _ in dxl_ids] if return_pos is None else return_pos
assert len(return_pos) == len(dxl_ids)
def send_fn(_call_count: int) -> bytes:
if num_invalid_try is not None and num_invalid_try >= _call_count:
return bytes(0)
first_packet = MockStatusPacket.present_position(next(iter(dxl_ids)))
if len(dxl_ids) == 1:
return first_packet
packets = first_packet
for idx in dxl_ids:
packets += MockStatusPacket.present_position(dxl_id=idx)
return b""
packets = b"".join(
MockStatusPacket.present_position(idx, pos)
for idx, pos in zip(dxl_ids, return_pos, strict=True)
)
return packets
return send_fn

245
tests/mocks/mock_feetech.py Normal file
View File

@ -0,0 +1,245 @@
import abc
import random
from typing import Callable
import scservo_sdk as scs
import serial
from mock_serial import MockSerial
from lerobot.common.motors.feetech.feetech import SCS_SERIES_CONTROL_TABLE
# https://files.waveshare.com/upload/2/27/Communication_Protocol_User_Manual-EN%28191218-0923%29.pdf
INSTRUCTION_TYPES = {
"Ping": 0x01, # Checks whether the Packet has arrived at a device with the same ID as the specified packet ID
"Read": 0x02, # Read data from the Device
"Write": 0x03, # Write data to the Device
"Reg_Write": 0x04, # Register the Instruction Packet in standby status; Packet can later be executed using the Action command
"Action": 0x05, # Executes a Packet that was registered beforehand using Reg Write
"Factory_Reset": 0x06, # Resets the Control Table to its initial factory default settings
"Sync_Read": 0x82, # Read data from multiple devices with the same Address with the same length at once
"Sync_Write": 0x83, # Write data to multiple devices with the same Address with the same length at once
} # fmt: skip
ERROR_TYPE = {
"Success": 0x00,
"Voltage": 0x01,
"Angle": 0x02,
"Overheat": 0x04,
"Overele": 0x08,
"Overload": 0x20,
}
class MockFeetechPacket(abc.ABC):
@classmethod
def build(cls, scs_id: int, params: list[int], length: int, *args, **kwargs) -> bytes:
packet = cls._build(scs_id, params, length, *args, **kwargs)
packet = cls._add_checksum(packet)
return bytes(packet)
@abc.abstractclassmethod
def _build(cls, scs_id: int, params: list[int], length: int, *args, **kwargs) -> list[int]:
pass
@staticmethod
def _add_checksum(packet: list[int]) -> list[int]:
checksum = 0
for idx in range(2, len(packet) - 1): # except header & checksum
checksum += packet[idx]
packet[-1] = scs.SCS_LOBYTE(~checksum)
return packet
class MockInstructionPacket(MockFeetechPacket):
"""
Helper class to build valid Feetech Instruction Packets.
Instruction Packet structure
(from https://files.waveshare.com/upload/2/27/Communication_Protocol_User_Manual-EN%28191218-0923%29.pdf)
| Header | Packet ID | Length | Instruction | Params | Checksum |
| --------- | --------- | ------ | ----------- | ----------------- | -------- |
| 0xFF 0xFF | ID | Len | Instr | Param 1 Param N | Sum |
"""
@classmethod
def _build(cls, scs_id: int, params: list[int], length: int, instruct_type: str) -> list[int]:
instruct_value = INSTRUCTION_TYPES[instruct_type]
return [
0xFF, 0xFF, # header
scs_id, # servo id
length, # length
instruct_value, # instruction type
*params, # data bytes
0x00, # placeholder for checksum
] # fmt: skip
@classmethod
def sync_read(
cls,
scs_ids: list[int],
start_address: int,
data_length: int,
) -> bytes:
"""
Builds a "Sync_Read" broadcast instruction.
The parameters for Sync Read (Protocol 2.0) are:
param[0] = start_address
param[1] = data_length
param[2+] = motor IDs to read from
And 'length' = (number_of_params + 7), where:
+1 is for instruction byte,
+1 is for the address byte,
+1 is for the length bytes,
+1 is for the checksum at the end.
"""
params = [start_address, data_length, *scs_ids]
length = len(scs_ids) + 4
return cls.build(scs_id=scs.BROADCAST_ID, params=params, length=length, instruct_type="Sync_Read")
class MockStatusPacket(MockFeetechPacket):
"""
Helper class to build valid Feetech Status Packets.
Status Packet structure
(from https://files.waveshare.com/upload/2/27/Communication_Protocol_User_Manual-EN%28191218-0923%29.pdf)
| Header | Packet ID | Length | Error | Params | Checksum |
| --------- | --------- | ------ | ----- | ----------------- | -------- |
| 0xFF 0xFF | ID | Len | Err | Param 1 Param N | Sum |
"""
@classmethod
def _build(cls, scs_id: int, params: list[int], length: int, error: str = "Success") -> list[int]:
err_byte = ERROR_TYPE[error]
return [
0xFF, 0xFF, # header
scs_id, # servo id
length, # length
err_byte, # status
*params, # data bytes
0x00, # placeholder for checksum
] # fmt: skip
@classmethod
def present_position(cls, scs_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes:
"""Builds a 'Present_Position' status packet.
Args:
scs_id (int): List of the servos ids
pos (int | None, optional): Desired 'Present_Position' to be returned in the packet. If None, it
will use a random value in the min_max_range. Defaults to None.
min_max_range (tuple, optional): Min/max range to generate the position values used for when 'pos'
is None. Note that the bounds are included in the range. Defaults to (0, 4095).
Returns:
bytes: The raw 'Present_Position' status packet ready to be sent through serial.
"""
pos = random.randint(*min_max_range) if pos is None else pos
params = [scs.SCS_LOBYTE(pos), scs.SCS_HIBYTE(pos)]
length = 4
return cls.build(scs_id, params=params, length=length)
class MockPortHandler(scs.PortHandler):
"""
This class overwrite the 'setupPort' method of the Feetech PortHandler because it can specify
baudrates that are not supported with a serial port on MacOS.
"""
def setupPort(self, cflag_baud): # noqa: N802
if self.is_open:
self.closePort()
self.ser = serial.Serial(
port=self.port_name,
# baudrate=self.baudrate, <- This will fail on MacOS
# parity = serial.PARITY_ODD,
# stopbits = serial.STOPBITS_TWO,
bytesize=serial.EIGHTBITS,
timeout=0,
)
self.is_open = True
self.ser.reset_input_buffer()
self.tx_time_per_byte = (1000.0 / self.baudrate) * 10.0
return True
class MockMotors(MockSerial):
"""
This class will simulate physical motors by responding with valid status packets upon receiving some
instruction packets. It is meant to test MotorsBus classes.
'data_name' supported:
- Present_Position
"""
ctrl_table = SCS_SERIES_CONTROL_TABLE
def __init__(self, scs_ids: list[int]):
super().__init__()
self._ids = scs_ids
self.open()
def build_single_motor_stubs(
self, data_name: str, return_value: int | None = None, num_invalid_try: int | None = None
) -> None:
address, length = self.ctrl_table[data_name]
for idx in self._ids:
if data_name == "Present_Position":
sync_read_request_single = MockInstructionPacket.sync_read([idx], address, length)
sync_read_response_single = self._build_present_pos_send_fn(
[idx], [return_value], num_invalid_try
)
else:
raise NotImplementedError # TODO(aliberts): add ping?
self.stub(
name=f"SyncRead_{data_name}_{idx}",
receive_bytes=sync_read_request_single,
send_fn=sync_read_response_single,
)
def build_all_motors_stub(
self, data_name: str, return_values: list[int] | None = None, num_invalid_try: int | None = None
) -> None:
address, length = self.ctrl_table[data_name]
if data_name == "Present_Position":
sync_read_request_all = MockInstructionPacket.sync_read(self._ids, address, length)
sync_read_response_all = self._build_present_pos_send_fn(
self._ids, return_values, num_invalid_try
)
else:
raise NotImplementedError # TODO(aliberts): add ping?
self.stub(
name=f"SyncRead_{data_name}_all",
receive_bytes=sync_read_request_all,
send_fn=sync_read_response_all,
)
def _build_present_pos_send_fn(
self, scs_ids: list[int], return_pos: list[int] | None = None, num_invalid_try: int | None = None
) -> Callable[[int], bytes]:
return_pos = [None for _ in scs_ids] if return_pos is None else return_pos
assert len(return_pos) == len(scs_ids)
def send_fn(_call_count: int) -> bytes:
if num_invalid_try is not None and num_invalid_try >= _call_count:
return b""
packets = b"".join(
MockStatusPacket.present_position(idx, pos)
for idx, pos in zip(scs_ids, return_pos, strict=True)
)
return packets
return send_fn

View File

@ -1,151 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ruff: noqa: N802, E741
"""
Mocked classes and functions from scservo_sdk to allow for testing FeetechMotorsBus code.
Warning: These mocked versions are minimalist. They do not exactly mock every behaviors
from the original classes and functions (e.g. return types might be None instead of boolean).
"""
DEFAULT_BAUDRATE = 1_000_000
COMM_SUCCESS = 0 # tx or rx packet communication success
def convert_to_bytes(value, bytes):
# TODO(rcadene): remove need to mock `convert_to_bytes` by implemented the inverse transform
# `convert_bytes_to_value`
del bytes # unused
return value
def get_default_motor_values(motor_index):
return {
# Key (int) are from SCS_SERIES_CONTROL_TABLE
5: motor_index, # ID
6: DEFAULT_BAUDRATE, # Baud_rate
10: 0, # Drive_Mode
21: 32, # P_Coefficient
22: 32, # D_Coefficient
23: 0, # I_Coefficient
40: 0, # Torque_Enable
41: 254, # Acceleration
31: -2047, # Offset
33: 0, # Mode
55: 1, # Lock
# Set 2560 since calibration values for Aloha gripper is between start_pos=2499 and end_pos=3144
# For other joints, 2560 will be autocorrected to be in calibration range
56: 2560, # Present_Position
58: 0, # Present_Speed
69: 0, # Present_Current
85: 150, # Maximum_Acceleration
}
# Macro for Control Table Value
def SCS_MAKEWORD(a, b):
return (a & 0xFF) | ((b & 0xFF) << 8)
def SCS_MAKEDWORD(a, b):
return (a & 0xFFFF) | (b & 0xFFFF) << 16
def SCS_LOWORD(l):
return l & 0xFFFF
def SCS_HIWORD(l):
return (l >> 16) & 0xFFFF
def SCS_LOBYTE(w):
return w & 0xFF
def SCS_HIBYTE(w):
return (w >> 8) & 0xFF
class PortHandler:
def __init__(self, port):
self.port = port
# factory default baudrate
self.baudrate = DEFAULT_BAUDRATE
self.ser = SerialMock()
def openPort(self): # noqa: N802
return True
def closePort(self): # noqa: N802
pass
def setPacketTimeoutMillis(self, timeout_ms): # noqa: N802
del timeout_ms # unused
def getBaudRate(self): # noqa: N802
return self.baudrate
def setBaudRate(self, baudrate): # noqa: N802
self.baudrate = baudrate
class PacketHandler:
def __init__(self, protocol_version):
del protocol_version # unused
# Use packet_handler.data to communicate across Read and Write
self.data = {}
class GroupSyncRead:
def __init__(self, port_handler, packet_handler, address, bytes):
self.packet_handler = packet_handler
def addParam(self, motor_index): # noqa: N802
# Initialize motor default values
if motor_index not in self.packet_handler.data:
self.packet_handler.data[motor_index] = get_default_motor_values(motor_index)
def txRxPacket(self): # noqa: N802
return COMM_SUCCESS
def getData(self, index, address, bytes): # noqa: N802
return self.packet_handler.data[index][address]
class GroupSyncWrite:
def __init__(self, port_handler, packet_handler, address, bytes):
self.packet_handler = packet_handler
self.address = address
def addParam(self, index, data): # noqa: N802
if index not in self.packet_handler.data:
self.packet_handler.data[index] = get_default_motor_values(index)
self.changeParam(index, data)
def txPacket(self): # noqa: N802
return COMM_SUCCESS
def changeParam(self, index, data): # noqa: N802
self.packet_handler.data[index][self.address] = data
class SerialMock:
def reset_output_buffer(self):
pass
def reset_input_buffer(self):
pass

View File

@ -1,35 +1,51 @@
import sys
from unittest.mock import patch
import dynamixel_sdk as dxl
import pytest
from lerobot.common.motors.dynamixel.dynamixel import DynamixelMotorsBus
from tests.mocks.mock_dynamixel import MockInstructionPacket, MockMotors, MockPortHandler
from lerobot.common.motors.dynamixel import DynamixelMotorsBus
from tests.mocks.mock_dynamixel import MockMotors, MockPortHandler
@pytest.fixture(autouse=True)
def patch_port_handler():
with patch.object(dxl, "PortHandler", MockPortHandler):
yield # Patch is applied for the duration of each test
if sys.platform == "darwin":
with patch.object(dxl, "PortHandler", MockPortHandler):
yield
else:
yield
@pytest.mark.skipif(sys.platform != "darwin", reason=f"No patching needed on {sys.platform=}")
def test_autouse_patch():
"""Ensure that the autouse fixture correctly patches dxl.PortHandler with MockPortHandler."""
"""Ensures that the autouse fixture correctly patches dxl.PortHandler with MockPortHandler."""
assert dxl.PortHandler is MockPortHandler
@pytest.mark.parametrize(
"value, n_bytes, expected",
[
(0x12, 1, [0x12]), # Single byte
(0x1234, 2, [0x34, 0x12]), # Two bytes
(0x12345678, 4, [0x78, 0x56, 0x34, 0x12]), # Four bytes
(0, 1, [0x00]), # Zero with 1 byte
(0, 2, [0x00, 0x00]), # Zero with 2 bytes
(0, 4, [0x00, 0x00, 0x00, 0x00]), # Zero with 4 bytes
(255, 1, [0xFF]), # Max single byte
(65535, 2, [0xFF, 0xFF]), # Max two bytes
(4294967295, 4, [0xFF, 0xFF, 0xFF, 0xFF]), # Max four bytes
(0x12, 1, [0x12]),
(0x1234, 2, [0x34, 0x12]),
(0x12345678, 4, [0x78, 0x56, 0x34, 0x12]),
(0, 1, [0x00]),
(0, 2, [0x00, 0x00]),
(0, 4, [0x00, 0x00, 0x00, 0x00]),
(255, 1, [0xFF]),
(65535, 2, [0xFF, 0xFF]),
(4294967295, 4, [0xFF, 0xFF, 0xFF, 0xFF]),
],
ids=[
"1 byte",
"2 bytes",
"4 bytes",
"0 with 1 byte",
"0 with 2 bytes",
"0 with 4 bytes",
"max single byte",
"max two bytes",
"max four bytes",
],
) # fmt: skip
def test_split_int_bytes(value, n_bytes, expected):
@ -53,7 +69,7 @@ def test_split_int_bytes_large_number():
def test_abc_implementation():
# Instantiation should raise an error if the class doesn't implements abstract methods/properties
"""Instantiation should raise an error if the class doesn't implement abstract methods/properties."""
DynamixelMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "xl330-m077")})
@ -63,11 +79,14 @@ def test_abc_implementation():
None,
[1, 2, 3],
["dummy_1", "dummy_2", "dummy_3"],
[1, "dummy_2", 3], # Mixed
[1, "dummy_2", 3],
],
ids=["None", "by ids", "by names", "mixed"],
)
def test_read_all_motors(motors):
mock_motors = MockMotors([1, 2, 3])
positions = [1337, 42, 4016]
mock_motors.build_all_motors_stub("Present_Position", return_values=positions)
motors_bus = DynamixelMotorsBus(
port=mock_motors.port,
motors={
@ -81,13 +100,22 @@ def test_read_all_motors(motors):
pos_dict = motors_bus.read("Present_Position", motors=motors)
assert mock_motors.stubs["SyncRead_Present_Position_all"].called
assert len(pos_dict) == 3
assert all(returned_pos == pos for returned_pos, pos in zip(pos_dict.values(), positions, strict=True))
assert set(pos_dict) == {"dummy_1", "dummy_2", "dummy_3"}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
@pytest.mark.parametrize("idx", [1, 2, 3])
def test_read_single_motor_name(idx):
@pytest.mark.parametrize(
"idx, pos",
[
[1, 1337],
[2, 42],
[3, 4016],
],
)
def test_read_single_motor_by_name(idx, pos):
mock_motors = MockMotors([1, 2, 3])
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
motors_bus = DynamixelMotorsBus(
port=mock_motors.port,
motors={
@ -101,13 +129,21 @@ def test_read_single_motor_name(idx):
pos_dict = motors_bus.read("Present_Position", f"dummy_{idx}")
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
assert len(pos_dict) == 1
assert pos_dict == {f"dummy_{idx}": pos}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
@pytest.mark.parametrize("idx", [1, 2, 3])
def test_read_single_motor_id(idx):
@pytest.mark.parametrize(
"idx, pos",
[
[1, 1337],
[2, 42],
[3, 4016],
],
)
def test_read_single_motor_by_id(idx, pos):
mock_motors = MockMotors([1, 2, 3])
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
motors_bus = DynamixelMotorsBus(
port=mock_motors.port,
motors={
@ -121,28 +157,24 @@ def test_read_single_motor_id(idx):
pos_dict = motors_bus.read("Present_Position", idx)
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
assert len(pos_dict) == 1
assert pos_dict == {f"dummy_{idx}": pos}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
@pytest.mark.parametrize(
"num_retry, num_invalid_try",
"num_retry, num_invalid_try, pos",
[
[1, 2],
[2, 3],
[3, 2],
[2, 1],
[1, 2, 1337],
[2, 3, 42],
[3, 2, 4016],
[2, 1, 999],
],
)
def test_read_num_retry(num_retry, num_invalid_try):
mock_motors = MockMotors([1, 2, 3], default_stubs=None)
address, length = mock_motors.ctrl_table["Present_Position"]
receive_bytes = MockInstructionPacket.sync_read([1], address, length)
send_fn = mock_motors._create_present_pos_send_fn(
[1], "Present_Position", num_invalid_try=num_invalid_try
def test_read_num_retry(num_retry, num_invalid_try, pos):
mock_motors = MockMotors([1, 2, 3])
mock_motors.build_single_motor_stubs(
"Present_Position", return_value=pos, num_invalid_try=num_invalid_try
)
mock_motors.stub(name="num_retry", receive_bytes=receive_bytes, send_fn=send_fn)
motors_bus = DynamixelMotorsBus(
port=mock_motors.port,
motors={
@ -155,10 +187,10 @@ def test_read_num_retry(num_retry, num_invalid_try):
if num_retry >= num_invalid_try:
pos_dict = motors_bus.read("Present_Position", 1, num_retry=num_retry)
assert len(pos_dict) == 1
assert pos_dict == {"dummy_1": pos}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
else:
with pytest.raises(ConnectionError):
_ = motors_bus.read("Present_Position", 1, num_retry=num_retry)
assert mock_motors.stubs["num_retry"].calls == num_retry
assert mock_motors.stubs["SyncRead_Present_Position_1"].calls == num_retry

View File

@ -2,43 +2,195 @@ import sys
from unittest.mock import patch
import pytest
import scservo_sdk as scs
from lerobot.common.motors.feetech import FeetechMotorsBus
from tests.mocks import mock_scservo_sdk
from tests.mocks.mock_feetech import MockMotors, MockPortHandler
@pytest.fixture(autouse=True)
def patch_scservo_sdk():
with patch.dict(sys.modules, {"scservo_sdk": mock_scservo_sdk}):
def patch_port_handler():
if sys.platform == "darwin":
with patch.object(scs, "PortHandler", MockPortHandler):
yield
else:
yield
def test_patch_sdk():
assert "scservo_sdk" in sys.modules # Should be patched
assert sys.modules["scservo_sdk"] is mock_scservo_sdk # Should match the mock
@pytest.mark.skipif(sys.platform != "darwin", reason=f"No patching needed on {sys.platform=}")
def test_autouse_patch():
"""Ensures that the autouse fixture correctly patches scs.PortHandler with MockPortHandler."""
assert scs.PortHandler is MockPortHandler
@pytest.mark.parametrize(
"value, n_bytes, expected",
[
(0x12, 1, [0x12]),
(0x1234, 2, [0x34, 0x12]),
(0x12345678, 4, [0x78, 0x56, 0x34, 0x12]),
(0, 1, [0x00]),
(0, 2, [0x00, 0x00]),
(0, 4, [0x00, 0x00, 0x00, 0x00]),
(255, 1, [0xFF]),
(65535, 2, [0xFF, 0xFF]),
(4294967295, 4, [0xFF, 0xFF, 0xFF, 0xFF]),
],
ids=[
"1 byte",
"2 bytes",
"4 bytes",
"0 with 1 byte",
"0 with 2 bytes",
"0 with 4 bytes",
"max single byte",
"max two bytes",
"max four bytes",
],
) # fmt: skip
def test_split_int_bytes(value, n_bytes, expected):
assert FeetechMotorsBus.split_int_bytes(value, n_bytes) == expected
def test_split_int_bytes_invalid_n_bytes():
with pytest.raises(NotImplementedError):
FeetechMotorsBus.split_int_bytes(100, 3)
def test_split_int_bytes_negative_numbers():
with pytest.raises(ValueError):
neg = FeetechMotorsBus.split_int_bytes(-1, 1)
print(neg)
def test_split_int_bytes_large_number():
with pytest.raises(ValueError):
FeetechMotorsBus.split_int_bytes(2**32, 4) # 4-byte max is 0xFFFFFFFF
def test_abc_implementation():
# Instantiation should raise an error if the class doesn't implements abstract methods/properties
"""Instantiation should raise an error if the class doesn't implement abstract methods/properties."""
FeetechMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "sts3215")})
def test_configure_motors_all_ids_1():
# see SCS_SERIES_BAUDRATE_TABLE
smaller_baudrate = 19_200
smaller_baudrate_value = 7
# This test expect the configuration was already correct.
motors_bus = FeetechMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "sts3215")})
@pytest.mark.parametrize(
"motors",
[
None,
[1, 2, 3],
["dummy_1", "dummy_2", "dummy_3"],
[1, "dummy_2", 3],
],
ids=["None", "by ids", "by names", "mixed"],
)
def test_read_all_motors(motors):
mock_motors = MockMotors([1, 2, 3])
positions = [1337, 42, 4016]
mock_motors.build_all_motors_stub("Present_Position", return_values=positions)
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors={
"dummy_1": (1, "sts3215"),
"dummy_2": (2, "sts3215"),
"dummy_3": (3, "sts3215"),
},
)
motors_bus.connect()
motors_bus.write("Baud_Rate", [smaller_baudrate_value] * len(motors_bus))
motors_bus.set_baudrate(smaller_baudrate)
motors_bus.write("ID", [1] * len(motors_bus))
del motors_bus
pos_dict = motors_bus.read("Present_Position", motors=motors)
# Test configure
motors_bus = FeetechMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "sts3215")})
assert mock_motors.stubs["SyncRead_Present_Position_all"].called
assert all(returned_pos == pos for returned_pos, pos in zip(pos_dict.values(), positions, strict=True))
assert set(pos_dict) == {"dummy_1", "dummy_2", "dummy_3"}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
@pytest.mark.parametrize(
"idx, pos",
[
[1, 1337],
[2, 42],
[3, 4016],
],
)
def test_read_single_motor_by_name(idx, pos):
mock_motors = MockMotors([1, 2, 3])
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors={
"dummy_1": (1, "sts3215"),
"dummy_2": (2, "sts3215"),
"dummy_3": (3, "sts3215"),
},
)
motors_bus.connect()
assert motors_bus.are_motors_configured()
del motors_bus
pos_dict = motors_bus.read("Present_Position", f"dummy_{idx}")
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
assert pos_dict == {f"dummy_{idx}": pos}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
@pytest.mark.parametrize(
"idx, pos",
[
[1, 1337],
[2, 42],
[3, 4016],
],
)
def test_read_single_motor_by_id(idx, pos):
mock_motors = MockMotors([1, 2, 3])
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors={
"dummy_1": (1, "sts3215"),
"dummy_2": (2, "sts3215"),
"dummy_3": (3, "sts3215"),
},
)
motors_bus.connect()
pos_dict = motors_bus.read("Present_Position", idx)
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
assert pos_dict == {f"dummy_{idx}": pos}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
@pytest.mark.parametrize(
"num_retry, num_invalid_try, pos",
[
[1, 2, 1337],
[2, 3, 42],
[3, 2, 4016],
[2, 1, 999],
],
)
def test_read_num_retry(num_retry, num_invalid_try, pos):
mock_motors = MockMotors([1, 2, 3])
mock_motors.build_single_motor_stubs(
"Present_Position", return_value=pos, num_invalid_try=num_invalid_try
)
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors={
"dummy_1": (1, "sts3215"),
"dummy_2": (2, "sts3215"),
"dummy_3": (3, "sts3215"),
},
)
motors_bus.connect()
if num_retry >= num_invalid_try:
pos_dict = motors_bus.read("Present_Position", 1, num_retry=num_retry)
assert pos_dict == {"dummy_1": pos}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
else:
with pytest.raises(ConnectionError):
_ = motors_bus.read("Present_Position", 1, num_retry=num_retry)
assert mock_motors.stubs["SyncRead_Present_Position_1"].calls == num_retry

View File

@ -1,123 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Tests for physical motors and their mocked versions.
If the physical motors are not connected to the computer, or not working,
the test will be skipped.
Example of running a specific test:
```bash
pytest -sx tests/test_motors.py::test_find_port
pytest -sx tests/test_motors.py::test_motors_bus
```
Example of running test on real dynamixel motors connected to the computer:
```bash
pytest -sx 'tests/test_motors.py::test_motors_bus[dynamixel-False]'
```
Example of running test on a mocked version of dynamixel motors:
```bash
pytest -sx 'tests/test_motors.py::test_motors_bus[dynamixel-True]'
```
"""
# TODO(rcadene): measure fps in nightly?
# TODO(rcadene): test logs
# TODO(rcadene): test calibration
# TODO(rcadene): add compatibility with other motors bus
import time
import numpy as np
import pytest
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.scripts.find_motors_bus_port import find_port
from tests.utils import TEST_MOTOR_TYPES, make_motors_bus, require_motor
@pytest.mark.parametrize("motor_type, mock", TEST_MOTOR_TYPES)
@require_motor
def test_find_port(request, motor_type, mock):
if mock:
request.getfixturevalue("patch_builtins_input")
with pytest.raises(OSError):
find_port()
else:
find_port()
@pytest.mark.parametrize("motor_type, mock", TEST_MOTOR_TYPES)
@require_motor
def test_motors_bus(request, motor_type, mock):
if mock:
request.getfixturevalue("patch_builtins_input")
motors_bus = make_motors_bus(motor_type, mock=mock)
# Test reading and writing before connecting raises an error
with pytest.raises(DeviceNotConnectedError):
motors_bus.read("Torque_Enable")
with pytest.raises(DeviceNotConnectedError):
motors_bus.write("Torque_Enable", 1)
with pytest.raises(DeviceNotConnectedError):
motors_bus.disconnect()
# Test deleting the object without connecting first
del motors_bus
# Test connecting
motors_bus = make_motors_bus(motor_type, mock=mock)
motors_bus.connect()
# Test connecting twice raises an error
with pytest.raises(DeviceAlreadyConnectedError):
motors_bus.connect()
# Test disabling torque and reading torque on all motors
motors_bus.write("Torque_Enable", 0)
values = motors_bus.read("Torque_Enable")
assert isinstance(values, np.ndarray)
assert len(values) == len(motors_bus.motors)
assert (values == 0).all()
# Test writing torque on a specific motor
motors_bus.write("Torque_Enable", 1, "gripper")
# Test reading torque from this specific motor. It is now 1
values = motors_bus.read("Torque_Enable", "gripper")
assert len(values) == 1
assert values[0] == 1
# Test reading torque from all motors. It is 1 for the specific motor,
# and 0 on the others.
values = motors_bus.read("Torque_Enable")
gripper_index = motors_bus.motor_names.index("gripper")
assert values[gripper_index] == 1
assert values.sum() == 1 # gripper is the only motor to have torque 1
# Test writing torque on all motors and it is 1 for all.
motors_bus.write("Torque_Enable", 1)
values = motors_bus.read("Torque_Enable")
assert (values == 1).all()
# Test ordering the motors to move slightly (+1 value among 4096) and this move
# can be executed and seen by the motor position sensor
values = motors_bus.read("Present_Position")
motors_bus.write("Goal_Position", values + 1)
# Give time for the motors to move to the goal position
time.sleep(1)
new_values = motors_bus.read("Present_Position")
assert (new_values == values).all()