Compare commits

...

10 Commits

Author SHA1 Message Date
Simon Alibert 9e34c1d731 Move feetech table & cleanup 2025-03-22 01:24:48 +01:00
Simon Alibert 857f335be9 Improve feetech mocking 2025-03-22 01:19:51 +01:00
Simon Alibert fc4a95f187 Add CRC docstring 2025-03-22 00:50:01 +01:00
Simon Alibert 4fe1880887 Add ping testing 2025-03-22 00:40:22 +01:00
Simon Alibert 6fa859fa19 Improve dynamixel mocking 2025-03-22 00:39:41 +01:00
Simon Alibert 2abfa5838d Improve read ergonomics & typing, rm find_motor_indices 2025-03-22 00:34:07 +01:00
Simon Alibert 3d119c0ccb Add single value write 2025-03-21 12:31:41 +01:00
Simon Alibert a32081757d Add Motor class 2025-03-21 12:13:44 +01:00
Simon Alibert 56c04ffc53 Move dxl table & cleanup 2025-03-21 11:28:11 +01:00
Simon Alibert 715d4557af Ruff ignore F401 & F403 for init files 2025-03-21 11:22:02 +01:00
13 changed files with 693 additions and 558 deletions

View File

@ -1,3 +1 @@
from .motors_bus import CalibrationMode, DriveMode, MotorsBus, TorqueMode
__all__ = ["CalibrationMode", "DriveMode", "MotorsBus", "TorqueMode"]
from .motors_bus import CalibrationMode, DriveMode, Motor, MotorsBus, TorqueMode

View File

@ -1,4 +1,3 @@
from .dynamixel import DynamixelMotorsBus
from .dynamixel_calibration import run_arm_calibration
__all__ = ["DynamixelMotorsBus", "run_arm_calibration"]
from .tables import *

View File

@ -20,153 +20,17 @@
from copy import deepcopy
import numpy as np
from ..motors_bus import MotorsBus
from .tables import MODEL_BAUDRATE_TABLE, MODEL_CONTROL_TABLE, MODEL_RESOLUTION
PROTOCOL_VERSION = 2.0
BAUDRATE = 1_000_000
DEFAULT_TIMEOUT_MS = 1000
MAX_ID_RANGE = 252
# The following bounds define the lower and upper joints range (after calibration).
# For joints in degree (i.e. revolute joints), their nominal range is [-180, 180] degrees
# which corresponds to a half rotation on the left and half rotation on the right.
# Some joints might require higher range, so we allow up to [-270, 270] degrees until
# an error is raised.
LOWER_BOUND_DEGREE = -270
UPPER_BOUND_DEGREE = 270
# For joints in percentage (i.e. joints that move linearly like the prismatic joint of a gripper),
# their nominal range is [0, 100] %. For instance, for Aloha gripper, 0% is fully
# closed, and 100% is fully open. To account for slight calibration issue, we allow up to
# [-10, 110] until an error is raised.
LOWER_BOUND_LINEAR = -10
UPPER_BOUND_LINEAR = 110
HALF_TURN_DEGREE = 180
# https://emanual.robotis.com/docs/en/dxl/x/xl330-m077
# https://emanual.robotis.com/docs/en/dxl/x/xl330-m288
# https://emanual.robotis.com/docs/en/dxl/x/xl430-w250
# https://emanual.robotis.com/docs/en/dxl/x/xm430-w350
# https://emanual.robotis.com/docs/en/dxl/x/xm540-w270
# https://emanual.robotis.com/docs/en/dxl/x/xc430-w150
# data_name: (address, size_byte)
X_SERIES_CONTROL_TABLE = {
"Model_Number": (0, 2),
"Model_Information": (2, 4),
"Firmware_Version": (6, 1),
"ID": (7, 1),
"Baud_Rate": (8, 1),
"Return_Delay_Time": (9, 1),
"Drive_Mode": (10, 1),
"Operating_Mode": (11, 1),
"Secondary_ID": (12, 1),
"Protocol_Type": (13, 1),
"Homing_Offset": (20, 4),
"Moving_Threshold": (24, 4),
"Temperature_Limit": (31, 1),
"Max_Voltage_Limit": (32, 2),
"Min_Voltage_Limit": (34, 2),
"PWM_Limit": (36, 2),
"Current_Limit": (38, 2),
"Acceleration_Limit": (40, 4),
"Velocity_Limit": (44, 4),
"Max_Position_Limit": (48, 4),
"Min_Position_Limit": (52, 4),
"Shutdown": (63, 1),
"Torque_Enable": (64, 1),
"LED": (65, 1),
"Status_Return_Level": (68, 1),
"Registered_Instruction": (69, 1),
"Hardware_Error_Status": (70, 1),
"Velocity_I_Gain": (76, 2),
"Velocity_P_Gain": (78, 2),
"Position_D_Gain": (80, 2),
"Position_I_Gain": (82, 2),
"Position_P_Gain": (84, 2),
"Feedforward_2nd_Gain": (88, 2),
"Feedforward_1st_Gain": (90, 2),
"Bus_Watchdog": (98, 1),
"Goal_PWM": (100, 2),
"Goal_Current": (102, 2),
"Goal_Velocity": (104, 4),
"Profile_Acceleration": (108, 4),
"Profile_Velocity": (112, 4),
"Goal_Position": (116, 4),
"Realtime_Tick": (120, 2),
"Moving": (122, 1),
"Moving_Status": (123, 1),
"Present_PWM": (124, 2),
"Present_Current": (126, 2),
"Present_Velocity": (128, 4),
"Present_Position": (132, 4),
"Velocity_Trajectory": (136, 4),
"Position_Trajectory": (140, 4),
"Present_Input_Voltage": (144, 2),
"Present_Temperature": (146, 1),
}
X_SERIES_BAUDRATE_TABLE = {
0: 9_600,
1: 57_600,
2: 115_200,
3: 1_000_000,
4: 2_000_000,
5: 3_000_000,
6: 4_000_000,
}
CALIBRATION_REQUIRED = ["Goal_Position", "Present_Position"]
CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"]
MODEL_CONTROL_TABLE = {
"x_series": X_SERIES_CONTROL_TABLE,
"xl330-m077": X_SERIES_CONTROL_TABLE,
"xl330-m288": X_SERIES_CONTROL_TABLE,
"xl430-w250": X_SERIES_CONTROL_TABLE,
"xm430-w350": X_SERIES_CONTROL_TABLE,
"xm540-w270": X_SERIES_CONTROL_TABLE,
"xc430-w150": X_SERIES_CONTROL_TABLE,
}
MODEL_RESOLUTION = {
"x_series": 4096,
"xl330-m077": 4096,
"xl330-m288": 4096,
"xl430-w250": 4096,
"xm430-w350": 4096,
"xm540-w270": 4096,
"xc430-w150": 4096,
}
MODEL_BAUDRATE_TABLE = {
"x_series": X_SERIES_BAUDRATE_TABLE,
"xl330-m077": X_SERIES_BAUDRATE_TABLE,
"xl330-m288": X_SERIES_BAUDRATE_TABLE,
"xl430-w250": X_SERIES_BAUDRATE_TABLE,
"xm430-w350": X_SERIES_BAUDRATE_TABLE,
"xm540-w270": X_SERIES_BAUDRATE_TABLE,
"xc430-w150": X_SERIES_BAUDRATE_TABLE,
}
NUM_READ_RETRY = 10
NUM_WRITE_RETRY = 10
def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str]) -> np.ndarray:
"""This function converts the degree range to the step range for indicating motors rotation.
It assumes a motor achieves a full rotation by going from -180 degree position to +180.
The motor resolution (e.g. 4096) corresponds to the number of steps needed to achieve a full rotation.
"""
resolutions = [MODEL_RESOLUTION[model] for model in models]
steps = degrees / 180 * np.array(resolutions) / 2
steps = steps.astype(int)
return steps
class DynamixelMotorsBus(MotorsBus):
"""
@ -194,10 +58,16 @@ class DynamixelMotorsBus(MotorsBus):
self.reader = dxl.GroupSyncRead(self.port_handler, self.packet_handler, 0, 0)
self.writer = dxl.GroupSyncWrite(self.port_handler, self.packet_handler, 0, 0)
def broadcast_ping(self) -> tuple[list, int]:
data_list, comm = self.packet_handler.broadcastPing(self.port_handler)
# TODO(aliberts): translate data_list into {id: model}
return data_list, comm
def broadcast_ping(
self, num_retry: int = 0, raise_on_error: bool = False
) -> dict[int, list[int, int]] | None:
for _ in range(1 + num_retry):
data_list, comm = self.packet_handler.broadcastPing(self.port_handler)
if self._is_comm_success(comm):
return data_list
if raise_on_error:
raise ConnectionError(f"Broadcast ping returned a {comm} comm error.")
def calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
# TODO

View File

@ -0,0 +1,110 @@
# data_name: (address, size_byte)
# https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#control-table
X_SERIES_CONTROL_TABLE = {
"Model_Number": (0, 2),
"Model_Information": (2, 4),
"Firmware_Version": (6, 1),
"ID": (7, 1),
"Baud_Rate": (8, 1),
"Return_Delay_Time": (9, 1),
"Drive_Mode": (10, 1),
"Operating_Mode": (11, 1),
"Secondary_ID": (12, 1),
"Protocol_Type": (13, 1),
"Homing_Offset": (20, 4),
"Moving_Threshold": (24, 4),
"Temperature_Limit": (31, 1),
"Max_Voltage_Limit": (32, 2),
"Min_Voltage_Limit": (34, 2),
"PWM_Limit": (36, 2),
"Current_Limit": (38, 2),
"Acceleration_Limit": (40, 4),
"Velocity_Limit": (44, 4),
"Max_Position_Limit": (48, 4),
"Min_Position_Limit": (52, 4),
"Shutdown": (63, 1),
"Torque_Enable": (64, 1),
"LED": (65, 1),
"Status_Return_Level": (68, 1),
"Registered_Instruction": (69, 1),
"Hardware_Error_Status": (70, 1),
"Velocity_I_Gain": (76, 2),
"Velocity_P_Gain": (78, 2),
"Position_D_Gain": (80, 2),
"Position_I_Gain": (82, 2),
"Position_P_Gain": (84, 2),
"Feedforward_2nd_Gain": (88, 2),
"Feedforward_1st_Gain": (90, 2),
"Bus_Watchdog": (98, 1),
"Goal_PWM": (100, 2),
"Goal_Current": (102, 2),
"Goal_Velocity": (104, 4),
"Profile_Acceleration": (108, 4),
"Profile_Velocity": (112, 4),
"Goal_Position": (116, 4),
"Realtime_Tick": (120, 2),
"Moving": (122, 1),
"Moving_Status": (123, 1),
"Present_PWM": (124, 2),
"Present_Current": (126, 2),
"Present_Velocity": (128, 4),
"Present_Position": (132, 4),
"Velocity_Trajectory": (136, 4),
"Position_Trajectory": (140, 4),
"Present_Input_Voltage": (144, 2),
"Present_Temperature": (146, 1),
}
# https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#baud-rate8
X_SERIES_BAUDRATE_TABLE = {
0: 9_600,
1: 57_600,
2: 115_200,
3: 1_000_000,
4: 2_000_000,
5: 3_000_000,
6: 4_000_000,
}
# {model: model_resolution}
# https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#specifications
MODEL_RESOLUTION = {
"x_series": 4096,
"xl330-m077": 4096,
"xl330-m288": 4096,
"xl430-w250": 4096,
"xm430-w350": 4096,
"xm540-w270": 4096,
"xc430-w150": 4096,
}
# {model: model_number}
# https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#control-table-of-eeprom-area
MODELS_TABLE = {
"xl330-m077": 1190,
"xl330-m288": 1200,
"xl430-w250": 1060,
"xm430-w350": 1020,
"xm540-w270": 1120,
"xc430-w150": 1070,
}
MODEL_CONTROL_TABLE = {
"x_series": X_SERIES_CONTROL_TABLE,
"xl330-m077": X_SERIES_CONTROL_TABLE,
"xl330-m288": X_SERIES_CONTROL_TABLE,
"xl430-w250": X_SERIES_CONTROL_TABLE,
"xm430-w350": X_SERIES_CONTROL_TABLE,
"xm540-w270": X_SERIES_CONTROL_TABLE,
"xc430-w150": X_SERIES_CONTROL_TABLE,
}
MODEL_BAUDRATE_TABLE = {
"x_series": X_SERIES_BAUDRATE_TABLE,
"xl330-m077": X_SERIES_BAUDRATE_TABLE,
"xl330-m288": X_SERIES_BAUDRATE_TABLE,
"xl430-w250": X_SERIES_BAUDRATE_TABLE,
"xm430-w350": X_SERIES_BAUDRATE_TABLE,
"xm540-w270": X_SERIES_BAUDRATE_TABLE,
"xc430-w150": X_SERIES_BAUDRATE_TABLE,
}

View File

@ -1,8 +1,3 @@
from .feetech import FeetechMotorsBus
from .feetech_calibration import apply_feetech_offsets_from_calibration, run_full_arm_calibration
__all__ = [
"FeetechMotorsBus",
"apply_feetech_offsets_from_calibration",
"run_full_arm_calibration",
]
from .tables import *

View File

@ -15,6 +15,12 @@
from copy import deepcopy
from ..motors_bus import MotorsBus
from .tables import (
CALIBRATION_REQUIRED,
MODEL_BAUDRATE_TABLE,
MODEL_CONTROL_TABLE,
MODEL_RESOLUTION,
)
PROTOCOL_VERSION = 0
BAUDRATE = 1_000_000
@ -22,100 +28,6 @@ DEFAULT_TIMEOUT_MS = 1000
MAX_ID_RANGE = 252
# For joints in percentage (i.e. joints that move linearly like the prismatic joint of a gripper),
# their nominal range is [0, 100] %. For instance, for Aloha gripper, 0% is fully
# closed, and 100% is fully open. To account for slight calibration issue, we allow up to
# [-10, 110] until an error is raised.
LOWER_BOUND_LINEAR = -10
UPPER_BOUND_LINEAR = 110
HALF_TURN_DEGREE = 180
# See this link for STS3215 Memory Table:
# https://docs.google.com/spreadsheets/d/1GVs7W1VS1PqdhA1nW-abeyAHhTUxKUdR/edit?usp=sharing&ouid=116566590112741600240&rtpof=true&sd=true
# data_name: (address, size_byte)
SCS_SERIES_CONTROL_TABLE = {
"Model": (3, 2),
"ID": (5, 1),
"Baud_Rate": (6, 1),
"Return_Delay": (7, 1),
"Response_Status_Level": (8, 1),
"Min_Angle_Limit": (9, 2),
"Max_Angle_Limit": (11, 2),
"Max_Temperature_Limit": (13, 1),
"Max_Voltage_Limit": (14, 1),
"Min_Voltage_Limit": (15, 1),
"Max_Torque_Limit": (16, 2),
"Phase": (18, 1),
"Unloading_Condition": (19, 1),
"LED_Alarm_Condition": (20, 1),
"P_Coefficient": (21, 1),
"D_Coefficient": (22, 1),
"I_Coefficient": (23, 1),
"Minimum_Startup_Force": (24, 2),
"CW_Dead_Zone": (26, 1),
"CCW_Dead_Zone": (27, 1),
"Protection_Current": (28, 2),
"Angular_Resolution": (30, 1),
"Offset": (31, 2),
"Mode": (33, 1),
"Protective_Torque": (34, 1),
"Protection_Time": (35, 1),
"Overload_Torque": (36, 1),
"Speed_closed_loop_P_proportional_coefficient": (37, 1),
"Over_Current_Protection_Time": (38, 1),
"Velocity_closed_loop_I_integral_coefficient": (39, 1),
"Torque_Enable": (40, 1),
"Acceleration": (41, 1),
"Goal_Position": (42, 2),
"Goal_Time": (44, 2),
"Goal_Speed": (46, 2),
"Torque_Limit": (48, 2),
"Lock": (55, 1),
"Present_Position": (56, 2),
"Present_Speed": (58, 2),
"Present_Load": (60, 2),
"Present_Voltage": (62, 1),
"Present_Temperature": (63, 1),
"Status": (65, 1),
"Moving": (66, 1),
"Present_Current": (69, 2),
# Not in the Memory Table
"Maximum_Acceleration": (85, 2),
}
SCS_SERIES_BAUDRATE_TABLE = {
0: 1_000_000,
1: 500_000,
2: 250_000,
3: 128_000,
4: 115_200,
5: 57_600,
6: 38_400,
7: 19_200,
}
CALIBRATION_REQUIRED = ["Goal_Position", "Present_Position"]
MODEL_CONTROL_TABLE = {
"scs_series": SCS_SERIES_CONTROL_TABLE,
"sts3215": SCS_SERIES_CONTROL_TABLE,
}
MODEL_RESOLUTION = {
"scs_series": 4096,
"sts3215": 4096,
}
MODEL_BAUDRATE_TABLE = {
"scs_series": SCS_SERIES_BAUDRATE_TABLE,
"sts3215": SCS_SERIES_BAUDRATE_TABLE,
}
# High number of retries is needed for feetech compared to dynamixel motors.
NUM_READ_RETRY = 20
NUM_WRITE_RETRY = 20
class FeetechMotorsBus(MotorsBus):
"""

View File

@ -0,0 +1,80 @@
# See this link for STS3215 Memory Table:
# https://docs.google.com/spreadsheets/d/1GVs7W1VS1PqdhA1nW-abeyAHhTUxKUdR/edit?usp=sharing&ouid=116566590112741600240&rtpof=true&sd=true
# data_name: (address, size_byte)
SCS_SERIES_CONTROL_TABLE = {
"Model": (3, 2),
"ID": (5, 1),
"Baud_Rate": (6, 1),
"Return_Delay": (7, 1),
"Response_Status_Level": (8, 1),
"Min_Angle_Limit": (9, 2),
"Max_Angle_Limit": (11, 2),
"Max_Temperature_Limit": (13, 1),
"Max_Voltage_Limit": (14, 1),
"Min_Voltage_Limit": (15, 1),
"Max_Torque_Limit": (16, 2),
"Phase": (18, 1),
"Unloading_Condition": (19, 1),
"LED_Alarm_Condition": (20, 1),
"P_Coefficient": (21, 1),
"D_Coefficient": (22, 1),
"I_Coefficient": (23, 1),
"Minimum_Startup_Force": (24, 2),
"CW_Dead_Zone": (26, 1),
"CCW_Dead_Zone": (27, 1),
"Protection_Current": (28, 2),
"Angular_Resolution": (30, 1),
"Offset": (31, 2),
"Mode": (33, 1),
"Protective_Torque": (34, 1),
"Protection_Time": (35, 1),
"Overload_Torque": (36, 1),
"Speed_closed_loop_P_proportional_coefficient": (37, 1),
"Over_Current_Protection_Time": (38, 1),
"Velocity_closed_loop_I_integral_coefficient": (39, 1),
"Torque_Enable": (40, 1),
"Acceleration": (41, 1),
"Goal_Position": (42, 2),
"Goal_Time": (44, 2),
"Goal_Speed": (46, 2),
"Torque_Limit": (48, 2),
"Lock": (55, 1),
"Present_Position": (56, 2),
"Present_Speed": (58, 2),
"Present_Load": (60, 2),
"Present_Voltage": (62, 1),
"Present_Temperature": (63, 1),
"Status": (65, 1),
"Moving": (66, 1),
"Present_Current": (69, 2),
# Not in the Memory Table
"Maximum_Acceleration": (85, 2),
}
SCS_SERIES_BAUDRATE_TABLE = {
0: 1_000_000,
1: 500_000,
2: 250_000,
3: 128_000,
4: 115_200,
5: 57_600,
6: 38_400,
7: 19_200,
}
MODEL_CONTROL_TABLE = {
"scs_series": SCS_SERIES_CONTROL_TABLE,
"sts3215": SCS_SERIES_CONTROL_TABLE,
}
MODEL_RESOLUTION = {
"scs_series": 4096,
"sts3215": 4096,
}
MODEL_BAUDRATE_TABLE = {
"scs_series": SCS_SERIES_BAUDRATE_TABLE,
"sts3215": SCS_SERIES_BAUDRATE_TABLE,
}
CALIBRATION_REQUIRED = ["Goal_Position", "Present_Position"]

View File

@ -22,19 +22,22 @@
import abc
import json
import time
from dataclasses import dataclass
from enum import Enum
from functools import cached_property
from pathlib import Path
from pprint import pformat
from typing import Protocol
from typing import Protocol, TypeAlias, overload
import serial
import tqdm
from deepdiff import DeepDiff
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.utils.utils import capture_timestamp_utc
NameOrID: TypeAlias = str | int
Value: TypeAlias = int | float
MAX_ID_RANGE = 252
@ -197,6 +200,12 @@ class GroupSyncWrite(Protocol):
def txPacket(self): ...
@dataclass
class Motor:
id: int
model: str
class MotorsBus(abc.ABC):
"""The main LeRobot class for implementing motors buses.
@ -248,7 +257,7 @@ class MotorsBus(abc.ABC):
def __init__(
self,
port: str,
motors: dict[str, tuple[int, str]],
motors: dict[str, Motor],
):
self.port = port
self.motors = motors
@ -262,8 +271,8 @@ class MotorsBus(abc.ABC):
self.logs = {} # TODO(aliberts): use subclass logger
self.calibration = None
self._id_to_model = dict(self.motors.values())
self._id_to_name = {idx: name for name, (idx, _) in self.motors.items()}
self._id_to_model = {m.id: m.model for m in self.motors.values()}
self._id_to_name = {m.id: name for name, m in self.motors.items()}
def __len__(self):
return len(self.motors)
@ -278,39 +287,39 @@ class MotorsBus(abc.ABC):
@cached_property
def _has_different_ctrl_tables(self) -> bool:
if len(self.motor_models) < 2:
if len(self.models) < 2:
return False
first_table = self.model_ctrl_table[self.motor_models[0]]
return any(DeepDiff(first_table, self.model_ctrl_table[model]) for model in self.motor_models[1:])
first_table = self.model_ctrl_table[self.models[0]]
return any(DeepDiff(first_table, self.model_ctrl_table[model]) for model in self.models[1:])
def idx_to_model(self, idx: int) -> str:
return self._id_to_model[idx]
def id_to_model(self, motor_id: int) -> str:
return self._id_to_model[motor_id]
def idx_to_name(self, idx: int) -> str:
return self._id_to_name[idx]
def id_to_name(self, motor_id: int) -> str:
return self._id_to_name[motor_id]
@cached_property
def motor_names(self) -> list[str]:
def names(self) -> list[str]:
return list(self.motors)
@cached_property
def motor_models(self) -> list[str]:
return [model for _, model in self.motors.values()]
def models(self) -> list[str]:
return [m.model for m in self.motors.values()]
@cached_property
def motor_ids(self) -> list[int]:
return [idx for idx, _ in self.motors.values()]
def ids(self) -> list[int]:
return [m.id for m in self.motors.values()]
def _validate_motors(self) -> None:
# TODO(aliberts): improve error messages for this (display problematics values)
if len(self.motor_ids) != len(set(self.motor_ids)):
if len(self.ids) != len(set(self.ids)):
raise ValueError("Some motors have the same id.")
if len(self.motor_names) != len(set(self.motor_names)):
if len(self.names) != len(set(self.names)):
raise ValueError("Some motors have the same name.")
if any(model not in self.model_resolution_table for model in self.motor_models):
if any(model not in self.model_resolution_table for model in self.models):
raise ValueError("Some motors models are not available.")
@property
@ -347,46 +356,25 @@ class MotorsBus(abc.ABC):
"""
try:
# TODO(aliberts): use ping instead
return (self.motor_ids == self.read("ID")).all()
return (self.ids == self.read("ID")).all()
except ConnectionError as e:
print(e)
return False
def ping(self, motor: str | int, num_retry: int | None = None) -> int:
idx = self.get_safe_id(motor)
for _ in range(num_retry):
model_number, comm, _ = self.packet_handler.ping(self.port, idx)
def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> int | None:
idx = self.get_motor_id(motor)
for _ in range(1 + num_retry):
model_number, comm, error = self.packet_handler.ping(self.port_handler, idx)
if self._is_comm_success(comm):
return model_number
# TODO(aliberts): Should we?
return comm
if raise_on_error:
raise ConnectionError(f"Ping motor {motor} returned a {error} error code.")
@abc.abstractmethod
def broadcast_ping(self, num_retry: int | None = None):
...
# TODO(aliberts): this will replace 'find_motor_indices'
def find_motor_indices(self, possible_ids: list[str] = None, num_retry: int = 2):
if possible_ids is None:
possible_ids = range(MAX_ID_RANGE)
indices = []
for idx in tqdm.tqdm(possible_ids):
try:
present_idx = self.read("ID", idx, num_retry=num_retry)[0]
except ConnectionError:
continue
if idx != present_idx:
# sanity check
raise OSError(
"Motor index used to communicate through the bus is not the same as the one present in the motor "
"memory. The motor memory might be damaged."
)
indices.append(idx)
return indices
def broadcast_ping(
self, num_retry: int = 0, raise_on_error: bool = False
) -> dict[int, list[int, int]] | None: ...
def set_baudrate(self, baudrate) -> None:
present_bus_baudrate = self.port_handler.getBaudRate()
@ -453,17 +441,23 @@ class MotorsBus(abc.ABC):
"""
pass
def get_safe_id(self, motor: str | int) -> int:
def get_motor_id(self, motor: NameOrID) -> int:
if isinstance(motor, str):
return self.motors[motor][0]
return self.motors[motor].id
elif isinstance(motor, int):
return motor
else:
raise ValueError(f"{motor} should be int or str.")
raise TypeError(f"'{motor}' should be int, str.")
@overload
def read(self, data_name: str, motors: None = ...) -> dict[str, Value]: ...
@overload
def read(self, data_name: str, motors: NameOrID) -> dict[NameOrID, Value]: ...
@overload
def read(self, data_name: str, motors: list[NameOrID]) -> dict[NameOrID, Value]: ...
def read(
self, data_name: str, motors: str | int | list[str | int] | None = None, num_retry: int = 1
) -> dict[str, float]:
self, data_name: str, motors: NameOrID | list[NameOrID] | None = None, num_retry: int = 0
) -> dict[NameOrID, Value]:
if not self.is_connected:
raise DeviceNotConnectedError(
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
@ -471,18 +465,22 @@ class MotorsBus(abc.ABC):
start_time = time.perf_counter()
id_key_map: dict[int, NameOrID] = {}
if motors is None:
motors = self.motor_ids
id_key_map = {m.id: name for name, m in self.motors.items()}
elif isinstance(motors, (str, int)):
id_key_map = {self.get_motor_id(motors): motors}
elif isinstance(motors, list):
id_key_map = {self.get_motor_id(m): m for m in motors}
else:
raise TypeError(motors)
if isinstance(motors, (str, int)):
motors = [motors]
motor_ids = [self.get_safe_id(motor) for motor in motors]
motor_ids = list(id_key_map)
if self._has_different_ctrl_tables:
models = [self.idx_to_model(idx) for idx in motor_ids]
models = [self.id_to_model(idx) for idx in motor_ids]
assert_same_address(self.model_ctrl_table, models, data_name)
model = self.idx_to_model(next(iter(motor_ids)))
model = self.id_to_model(next(iter(motor_ids)))
addr, n_bytes = self.model_ctrl_table[model][data_name]
comm, ids_values = self._read(motor_ids, addr, n_bytes, num_retry)
@ -495,8 +493,7 @@ class MotorsBus(abc.ABC):
if data_name in self.calibration_required and self.calibration is not None:
ids_values = self.calibrate_values(ids_values)
# TODO(aliberts): return keys in the same format we got them?
ids_values = {self.idx_to_name(idx): val for idx, val in ids_values.items()}
keys_values = {id_key_map[idx]: val for idx, val in ids_values.items()}
# log the number of seconds it took to read the data from the motors
delta_ts_name = get_log_name("delta_timestamp_s", "read", data_name, motor_ids)
@ -506,10 +503,10 @@ class MotorsBus(abc.ABC):
ts_utc_name = get_log_name("timestamp_utc", "read", data_name, motor_ids)
self.logs[ts_utc_name] = capture_timestamp_utc()
return ids_values
return keys_values
def _read(
self, motor_ids: list[str], address: int, n_bytes: int, num_retry: int = 1
self, motor_ids: list[str], address: int, n_bytes: int, num_retry: int = 0
) -> tuple[int, dict[int, int]]:
self.reader.clearParam()
self.reader.start_address = address
@ -523,7 +520,7 @@ class MotorsBus(abc.ABC):
for idx in motor_ids:
self.reader.addParam(idx)
for _ in range(num_retry):
for _ in range(1 + num_retry):
comm = self.reader.txRxPacket()
if self._is_comm_success(comm):
break
@ -540,7 +537,7 @@ class MotorsBus(abc.ABC):
# for idx in motor_ids:
# value = self.reader.getData(idx, address, n_bytes)
def write(self, data_name: str, values_dict: dict[str | int, int], num_retry: int = 1) -> None:
def write(self, data_name: str, values: int | dict[NameOrID, int], num_retry: int = 0) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
@ -548,16 +545,21 @@ class MotorsBus(abc.ABC):
start_time = time.perf_counter()
ids_values = {self.get_safe_id(motor): val for motor, val in values_dict.items()}
if isinstance(values, int):
ids_values = {id_: values for id_ in self.ids}
elif isinstance(values, dict):
ids_values = {self.get_motor_id(motor): val for motor, val in values.items()}
else:
raise ValueError(f"'values' is expected to be a single value or a dict. Got {values}")
if self._has_different_ctrl_tables:
models = [self.idx_to_model(idx) for idx in ids_values]
models = [self.id_to_model(idx) for idx in ids_values]
assert_same_address(self.model_ctrl_table, models, data_name)
if data_name in self.calibration_required and self.calibration is not None:
ids_values = self.uncalibrate_values(ids_values)
model = self.idx_to_model(next(iter(ids_values)))
model = self.id_to_model(next(iter(ids_values)))
addr, n_bytes = self.model_ctrl_table[model][data_name]
comm = self._write(ids_values, addr, n_bytes, num_retry)
@ -575,7 +577,7 @@ class MotorsBus(abc.ABC):
ts_utc_name = get_log_name("timestamp_utc", "write", data_name, list(ids_values))
self.logs[ts_utc_name] = capture_timestamp_utc()
def _write(self, ids_values: dict[int, int], address: int, n_bytes: int, num_retry: int = 1) -> int:
def _write(self, ids_values: dict[int, int], address: int, n_bytes: int, num_retry: int = 0) -> int:
self.writer.clearParam()
self.writer.start_address = address
self.writer.data_length = n_bytes
@ -584,7 +586,7 @@ class MotorsBus(abc.ABC):
data = self.split_int_bytes(value, n_bytes)
self.writer.addParam(idx, data)
for _ in range(num_retry):
for _ in range(1 + num_retry):
comm = self.writer.txPacket()
if self._is_comm_success(comm):
break

View File

@ -108,6 +108,9 @@ exclude = ["tests/artifacts/**/*.safetensors"]
[tool.ruff.lint]
select = ["E4", "E7", "E9", "F", "I", "N", "B", "C4", "SIM"]
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401", "F403"]
[tool.bandit]
exclude_dirs = [
"tests",

View File

@ -6,47 +6,42 @@ import dynamixel_sdk as dxl
import serial
from mock_serial import MockSerial
from lerobot.common.motors.dynamixel.dynamixel import X_SERIES_CONTROL_TABLE
from lerobot.common.motors.dynamixel import X_SERIES_CONTROL_TABLE
# https://emanual.robotis.com/docs/en/dxl/crc/
DXL_CRC_TABLE = [
0x0000, 0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014,
0x8011, 0x8033, 0x0036, 0x003C, 0x8039, 0x0028, 0x802D,
0x8027, 0x0022, 0x8063, 0x0066, 0x006C, 0x8069, 0x0078,
0x807D, 0x8077, 0x0072, 0x0050, 0x8055, 0x805F, 0x005A,
0x804B, 0x004E, 0x0044, 0x8041, 0x80C3, 0x00C6, 0x00CC,
0x80C9, 0x00D8, 0x80DD, 0x80D7, 0x00D2, 0x00F0, 0x80F5,
0x80FF, 0x00FA, 0x80EB, 0x00EE, 0x00E4, 0x80E1, 0x00A0,
0x80A5, 0x80AF, 0x00AA, 0x80BB, 0x00BE, 0x00B4, 0x80B1,
0x8093, 0x0096, 0x009C, 0x8099, 0x0088, 0x808D, 0x8087,
0x0082, 0x8183, 0x0186, 0x018C, 0x8189, 0x0198, 0x819D,
0x8197, 0x0192, 0x01B0, 0x81B5, 0x81BF, 0x01BA, 0x81AB,
0x01AE, 0x01A4, 0x81A1, 0x01E0, 0x81E5, 0x81EF, 0x01EA,
0x81FB, 0x01FE, 0x01F4, 0x81F1, 0x81D3, 0x01D6, 0x01DC,
0x81D9, 0x01C8, 0x81CD, 0x81C7, 0x01C2, 0x0140, 0x8145,
0x814F, 0x014A, 0x815B, 0x015E, 0x0154, 0x8151, 0x8173,
0x0176, 0x017C, 0x8179, 0x0168, 0x816D, 0x8167, 0x0162,
0x8123, 0x0126, 0x012C, 0x8129, 0x0138, 0x813D, 0x8137,
0x0132, 0x0110, 0x8115, 0x811F, 0x011A, 0x810B, 0x010E,
0x0104, 0x8101, 0x8303, 0x0306, 0x030C, 0x8309, 0x0318,
0x831D, 0x8317, 0x0312, 0x0330, 0x8335, 0x833F, 0x033A,
0x832B, 0x032E, 0x0324, 0x8321, 0x0360, 0x8365, 0x836F,
0x036A, 0x837B, 0x037E, 0x0374, 0x8371, 0x8353, 0x0356,
0x035C, 0x8359, 0x0348, 0x834D, 0x8347, 0x0342, 0x03C0,
0x83C5, 0x83CF, 0x03CA, 0x83DB, 0x03DE, 0x03D4, 0x83D1,
0x83F3, 0x03F6, 0x03FC, 0x83F9, 0x03E8, 0x83ED, 0x83E7,
0x03E2, 0x83A3, 0x03A6, 0x03AC, 0x83A9, 0x03B8, 0x83BD,
0x83B7, 0x03B2, 0x0390, 0x8395, 0x839F, 0x039A, 0x838B,
0x038E, 0x0384, 0x8381, 0x0280, 0x8285, 0x828F, 0x028A,
0x829B, 0x029E, 0x0294, 0x8291, 0x82B3, 0x02B6, 0x02BC,
0x82B9, 0x02A8, 0x82AD, 0x82A7, 0x02A2, 0x82E3, 0x02E6,
0x02EC, 0x82E9, 0x02F8, 0x82FD, 0x82F7, 0x02F2, 0x02D0,
0x82D5, 0x82DF, 0x02DA, 0x82CB, 0x02CE, 0x02C4, 0x82C1,
0x8243, 0x0246, 0x024C, 0x8249, 0x0258, 0x825D, 0x8257,
0x0252, 0x0270, 0x8275, 0x827F, 0x027A, 0x826B, 0x026E,
0x0264, 0x8261, 0x0220, 0x8225, 0x822F, 0x022A, 0x823B,
0x023E, 0x0234, 0x8231, 0x8213, 0x0216, 0x021C, 0x8219,
0x0208, 0x820D, 0x8207, 0x0202
0x0000, 0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014, 0x8011,
0x8033, 0x0036, 0x003C, 0x8039, 0x0028, 0x802D, 0x8027, 0x0022,
0x8063, 0x0066, 0x006C, 0x8069, 0x0078, 0x807D, 0x8077, 0x0072,
0x0050, 0x8055, 0x805F, 0x005A, 0x804B, 0x004E, 0x0044, 0x8041,
0x80C3, 0x00C6, 0x00CC, 0x80C9, 0x00D8, 0x80DD, 0x80D7, 0x00D2,
0x00F0, 0x80F5, 0x80FF, 0x00FA, 0x80EB, 0x00EE, 0x00E4, 0x80E1,
0x00A0, 0x80A5, 0x80AF, 0x00AA, 0x80BB, 0x00BE, 0x00B4, 0x80B1,
0x8093, 0x0096, 0x009C, 0x8099, 0x0088, 0x808D, 0x8087, 0x0082,
0x8183, 0x0186, 0x018C, 0x8189, 0x0198, 0x819D, 0x8197, 0x0192,
0x01B0, 0x81B5, 0x81BF, 0x01BA, 0x81AB, 0x01AE, 0x01A4, 0x81A1,
0x01E0, 0x81E5, 0x81EF, 0x01EA, 0x81FB, 0x01FE, 0x01F4, 0x81F1,
0x81D3, 0x01D6, 0x01DC, 0x81D9, 0x01C8, 0x81CD, 0x81C7, 0x01C2,
0x0140, 0x8145, 0x814F, 0x014A, 0x815B, 0x015E, 0x0154, 0x8151,
0x8173, 0x0176, 0x017C, 0x8179, 0x0168, 0x816D, 0x8167, 0x0162,
0x8123, 0x0126, 0x012C, 0x8129, 0x0138, 0x813D, 0x8137, 0x0132,
0x0110, 0x8115, 0x811F, 0x011A, 0x810B, 0x010E, 0x0104, 0x8101,
0x8303, 0x0306, 0x030C, 0x8309, 0x0318, 0x831D, 0x8317, 0x0312,
0x0330, 0x8335, 0x833F, 0x033A, 0x832B, 0x032E, 0x0324, 0x8321,
0x0360, 0x8365, 0x836F, 0x036A, 0x837B, 0x037E, 0x0374, 0x8371,
0x8353, 0x0356, 0x035C, 0x8359, 0x0348, 0x834D, 0x8347, 0x0342,
0x03C0, 0x83C5, 0x83CF, 0x03CA, 0x83DB, 0x03DE, 0x03D4, 0x83D1,
0x83F3, 0x03F6, 0x03FC, 0x83F9, 0x03E8, 0x83ED, 0x83E7, 0x03E2,
0x83A3, 0x03A6, 0x03AC, 0x83A9, 0x03B8, 0x83BD, 0x83B7, 0x03B2,
0x0390, 0x8395, 0x839F, 0x039A, 0x838B, 0x038E, 0x0384, 0x8381,
0x0280, 0x8285, 0x828F, 0x028A, 0x829B, 0x029E, 0x0294, 0x8291,
0x82B3, 0x02B6, 0x02BC, 0x82B9, 0x02A8, 0x82AD, 0x82A7, 0x02A2,
0x82E3, 0x02E6, 0x02EC, 0x82E9, 0x02F8, 0x82FD, 0x82F7, 0x02F2,
0x02D0, 0x82D5, 0x82DF, 0x02DA, 0x82CB, 0x02CE, 0x02C4, 0x82C1,
0x8243, 0x0246, 0x024C, 0x8249, 0x0258, 0x825D, 0x8257, 0x0252,
0x0270, 0x8275, 0x827F, 0x027A, 0x826B, 0x026E, 0x0264, 0x8261,
0x0220, 0x8225, 0x822F, 0x022A, 0x823B, 0x023E, 0x0234, 0x8231,
0x8213, 0x0216, 0x021C, 0x8219, 0x0208, 0x820D, 0x8207, 0x0202
] # fmt: skip
# https://emanual.robotis.com/docs/en/dxl/protocol2/#instruction
@ -154,6 +149,17 @@ class MockDynamixelPacketv2(abc.ABC):
@staticmethod
def _add_crc(packet: list[int]) -> list[int]:
"""Computes and add CRC to the packet.
https://emanual.robotis.com/docs/en/dxl/crc/
https://en.wikipedia.org/wiki/Cyclic_redundancy_check
Args:
packet (list[int]): The raw packet without CRC (but with placeholders for it).
Returns:
list[int]: The raw packet with a valid CRC.
"""
crc = 0
for j in range(len(packet) - 2):
i = ((crc >> 8) ^ packet[j]) & 0xFF
@ -191,6 +197,20 @@ class MockInstructionPacket(MockDynamixelPacketv2):
0x00, 0x00 # placeholder for CRC
] # fmt: skip
@classmethod
def ping(
cls,
dxl_id: int,
) -> bytes:
"""
Builds a "Ping" broadcast instruction.
(from https://emanual.robotis.com/docs/en/dxl/protocol2/#ping-0x01)
No parameters required.
"""
params, length = [], 3
return cls.build(dxl_id=dxl_id, params=params, length=length, instruct_type="Ping")
@classmethod
def sync_read(
cls,
@ -252,12 +272,32 @@ class MockStatusPacket(MockDynamixelPacketv2):
0x00, 0x00 # placeholder for CRC
] # fmt: skip
@classmethod
def ping(cls, dxl_id: int, model_nb: int = 1190, firm_ver: int = 50) -> bytes:
"""Builds a 'Ping' status packet.
https://emanual.robotis.com/docs/en/dxl/protocol2/#ping-0x01
Args:
dxl_id (int): ID of the servo responding.
model_nb (int, optional): Desired 'model number' to be returned in the packet. Defaults to 1190
which corresponds to a XL330-M077-T.
firm_ver (int, optional): Desired 'firmware version' to be returned in the packet.
Defaults to 50.
Returns:
bytes: The raw 'Ping' status packet ready to be sent through serial.
"""
params = [dxl.DXL_LOBYTE(model_nb), dxl.DXL_HIBYTE(model_nb), firm_ver]
length = 7
return cls.build(dxl_id, params=params, length=length)
@classmethod
def present_position(cls, dxl_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes:
"""Builds a 'Present_Position' status packet.
Args:
dxl_id (int): List of the servos ids
dxl_id (int): Servo id
pos (int | None, optional): Desired 'Present_Position' to be returned in the packet. If None, it
will use a random value in the min_max_range. Defaults to None.
min_max_range (tuple, optional): Min/max range to generate the position values used for when 'pos'
@ -308,62 +348,69 @@ class MockMotors(MockSerial):
ctrl_table = X_SERIES_CONTROL_TABLE
def __init__(self, dlx_ids: list[int]):
def __init__(self):
super().__init__()
self._ids = dlx_ids
self.open()
def build_single_motor_stubs(
self, data_name: str, return_value: int | None = None, num_invalid_try: int | None = None
) -> None:
address, length = self.ctrl_table[data_name]
for idx in self._ids:
if data_name == "Present_Position":
sync_read_request_single = MockInstructionPacket.sync_read([idx], address, length)
sync_read_response_single = self._build_present_pos_send_fn(
[idx], [return_value], num_invalid_try
)
else:
raise NotImplementedError # TODO(aliberts): add ping?
self.stub(
name=f"SyncRead_{data_name}_{idx}",
receive_bytes=sync_read_request_single,
send_fn=sync_read_response_single,
)
def build_all_motors_stub(
self, data_name: str, return_values: list[int] | None = None, num_invalid_try: int | None = None
) -> None:
address, length = self.ctrl_table[data_name]
if data_name == "Present_Position":
sync_read_request_all = MockInstructionPacket.sync_read(self._ids, address, length)
sync_read_response_all = self._build_present_pos_send_fn(
self._ids, return_values, num_invalid_try
)
else:
raise NotImplementedError # TODO(aliberts): add ping?
self.stub(
name=f"SyncRead_{data_name}_all",
receive_bytes=sync_read_request_all,
send_fn=sync_read_response_all,
def build_broadcast_ping_stub(
self, ids_models_firmwares: dict[int, list[int]] | None = None, num_invalid_try: int = 0
) -> str:
ping_request = MockInstructionPacket.ping(dxl.BROADCAST_ID)
return_packets = b"".join(
MockStatusPacket.ping(idx, model, firm_ver)
for idx, (model, firm_ver) in ids_models_firmwares.items()
)
ping_response = self._build_send_fn(return_packets, num_invalid_try)
def _build_present_pos_send_fn(
self, dxl_ids: list[int], return_pos: list[int] | None = None, num_invalid_try: int | None = None
) -> Callable[[int], bytes]:
return_pos = [None for _ in dxl_ids] if return_pos is None else return_pos
assert len(return_pos) == len(dxl_ids)
stub_name = "Ping_" + "_".join([str(idx) for idx in ids_models_firmwares])
self.stub(
name=stub_name,
receive_bytes=ping_request,
send_fn=ping_response,
)
return stub_name
def build_ping_stub(
self, dxl_id: int, model_nb: int, firm_ver: int = 50, num_invalid_try: int = 0
) -> str:
ping_request = MockInstructionPacket.ping(dxl_id)
return_packet = MockStatusPacket.ping(dxl_id, model_nb, firm_ver)
ping_response = self._build_send_fn(return_packet, num_invalid_try)
stub_name = f"Ping_{dxl_id}"
self.stub(
name=stub_name,
receive_bytes=ping_request,
send_fn=ping_response,
)
return stub_name
def build_sync_read_stub(
self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0
) -> str:
address, length = self.ctrl_table[data_name]
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
if data_name != "Present_Position":
raise NotImplementedError
return_packets = b"".join(
MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items()
)
sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(idx) for idx in ids_values])
self.stub(
name=stub_name,
receive_bytes=sync_read_request,
send_fn=sync_read_response,
)
return stub_name
@staticmethod
def _build_send_fn(packet: bytes, num_invalid_try: int = 0) -> Callable[[int], bytes]:
def send_fn(_call_count: int) -> bytes:
if num_invalid_try is not None and num_invalid_try >= _call_count:
if num_invalid_try >= _call_count:
return b""
packets = b"".join(
MockStatusPacket.present_position(idx, pos)
for idx, pos in zip(dxl_ids, return_pos, strict=True)
)
return packets
return packet
return send_fn

View File

@ -6,7 +6,7 @@ import scservo_sdk as scs
import serial
from mock_serial import MockSerial
from lerobot.common.motors.feetech.feetech import SCS_SERIES_CONTROL_TABLE
from lerobot.common.motors.feetech.tables import SCS_SERIES_CONTROL_TABLE
# https://files.waveshare.com/upload/2/27/Communication_Protocol_User_Manual-EN%28191218-0923%29.pdf
INSTRUCTION_TYPES = {
@ -77,6 +77,19 @@ class MockInstructionPacket(MockFeetechPacket):
0x00, # placeholder for checksum
] # fmt: skip
@classmethod
def ping(
cls,
scs_id: int,
) -> bytes:
"""
Builds a "Ping" broadcast instruction.
No parameters required.
"""
params, length = [], 2
return cls.build(scs_id=scs_id, params=params, length=length, instruct_type="Ping")
@classmethod
def sync_read(
cls,
@ -128,6 +141,25 @@ class MockStatusPacket(MockFeetechPacket):
0x00, # placeholder for checksum
] # fmt: skip
@classmethod
def ping(cls, scs_id: int, model_nb: int = 1190, firm_ver: int = 50) -> bytes:
"""Builds a 'Ping' status packet.
Args:
scs_id (int): ID of the servo responding.
model_nb (int, optional): Desired 'model number' to be returned in the packet. Defaults to 1190
which corresponds to a XL330-M077-T.
firm_ver (int, optional): Desired 'firmware version' to be returned in the packet.
Defaults to 50.
Returns:
bytes: The raw 'Ping' status packet ready to be sent through serial.
"""
# raise NotImplementedError
params = [scs.SCS_LOBYTE(model_nb), scs.SCS_HIBYTE(model_nb), firm_ver]
length = 2
return cls.build(scs_id, params=params, length=length)
@classmethod
def present_position(cls, scs_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes:
"""Builds a 'Present_Position' status packet.
@ -184,62 +216,69 @@ class MockMotors(MockSerial):
ctrl_table = SCS_SERIES_CONTROL_TABLE
def __init__(self, scs_ids: list[int]):
def __init__(self):
super().__init__()
self._ids = scs_ids
self.open()
def build_single_motor_stubs(
self, data_name: str, return_value: int | None = None, num_invalid_try: int | None = None
) -> None:
address, length = self.ctrl_table[data_name]
for idx in self._ids:
if data_name == "Present_Position":
sync_read_request_single = MockInstructionPacket.sync_read([idx], address, length)
sync_read_response_single = self._build_present_pos_send_fn(
[idx], [return_value], num_invalid_try
)
else:
raise NotImplementedError # TODO(aliberts): add ping?
self.stub(
name=f"SyncRead_{data_name}_{idx}",
receive_bytes=sync_read_request_single,
send_fn=sync_read_response_single,
)
def build_all_motors_stub(
self, data_name: str, return_values: list[int] | None = None, num_invalid_try: int | None = None
) -> None:
address, length = self.ctrl_table[data_name]
if data_name == "Present_Position":
sync_read_request_all = MockInstructionPacket.sync_read(self._ids, address, length)
sync_read_response_all = self._build_present_pos_send_fn(
self._ids, return_values, num_invalid_try
)
else:
raise NotImplementedError # TODO(aliberts): add ping?
self.stub(
name=f"SyncRead_{data_name}_all",
receive_bytes=sync_read_request_all,
send_fn=sync_read_response_all,
def build_broadcast_ping_stub(
self, ids_models_firmwares: dict[int, list[int]] | None = None, num_invalid_try: int = 0
) -> str:
ping_request = MockInstructionPacket.ping(scs.BROADCAST_ID)
return_packets = b"".join(
MockStatusPacket.ping(idx, model, firm_ver)
for idx, (model, firm_ver) in ids_models_firmwares.items()
)
ping_response = self._build_send_fn(return_packets, num_invalid_try)
def _build_present_pos_send_fn(
self, scs_ids: list[int], return_pos: list[int] | None = None, num_invalid_try: int | None = None
) -> Callable[[int], bytes]:
return_pos = [None for _ in scs_ids] if return_pos is None else return_pos
assert len(return_pos) == len(scs_ids)
stub_name = "Ping_" + "_".join([str(idx) for idx in ids_models_firmwares])
self.stub(
name=stub_name,
receive_bytes=ping_request,
send_fn=ping_response,
)
return stub_name
def build_ping_stub(
self, scs_id: int, model_nb: int, firm_ver: int = 50, num_invalid_try: int = 0
) -> str:
ping_request = MockInstructionPacket.ping(scs_id)
return_packet = MockStatusPacket.ping(scs_id, model_nb, firm_ver)
ping_response = self._build_send_fn(return_packet, num_invalid_try)
stub_name = f"Ping_{scs_id}"
self.stub(
name=stub_name,
receive_bytes=ping_request,
send_fn=ping_response,
)
return stub_name
def build_sync_read_stub(
self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0
) -> str:
address, length = self.ctrl_table[data_name]
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
if data_name != "Present_Position":
raise NotImplementedError
return_packets = b"".join(
MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items()
)
sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(idx) for idx in ids_values])
self.stub(
name=stub_name,
receive_bytes=sync_read_request,
send_fn=sync_read_response,
)
return stub_name
@staticmethod
def _build_send_fn(packet: bytes, num_invalid_try: int = 0) -> Callable[[int], bytes]:
def send_fn(_call_count: int) -> bytes:
if num_invalid_try is not None and num_invalid_try >= _call_count:
if num_invalid_try >= _call_count:
return b""
packets = b"".join(
MockStatusPacket.present_position(idx, pos)
for idx, pos in zip(scs_ids, return_pos, strict=True)
)
return packets
return packet
return send_fn

View File

@ -4,6 +4,7 @@ from unittest.mock import patch
import dynamixel_sdk as dxl
import pytest
from lerobot.common.motors import Motor
from lerobot.common.motors.dynamixel import DynamixelMotorsBus
from tests.mocks.mock_dynamixel import MockMotors, MockPortHandler
@ -17,6 +18,15 @@ def patch_port_handler():
yield
@pytest.fixture
def dummy_motors() -> dict[str, Motor]:
return {
"dummy_1": Motor(id=1, model="xl430-w250"),
"dummy_2": Motor(id=2, model="xm540-w270"),
"dummy_3": Motor(id=3, model="xl330-m077"),
}
@pytest.mark.skipif(sys.platform != "darwin", reason=f"No patching needed on {sys.platform=}")
def test_autouse_patch():
"""Ensures that the autouse fixture correctly patches dxl.PortHandler with MockPortHandler."""
@ -68,9 +78,50 @@ def test_split_int_bytes_large_number():
DynamixelMotorsBus.split_int_bytes(2**32, 4) # 4-byte max is 0xFFFFFFFF
def test_abc_implementation():
def test_abc_implementation(dummy_motors):
"""Instantiation should raise an error if the class doesn't implement abstract methods/properties."""
DynamixelMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "xl330-m077")})
DynamixelMotorsBus(port="/dev/dummy-port", motors=dummy_motors)
@pytest.mark.parametrize(
"idx, model_nb",
[
[1, 1190],
[2, 1200],
[3, 1120],
],
)
def test_ping(idx, model_nb, dummy_motors):
mock_motors = MockMotors()
mock_motors.build_ping_stub(idx, model_nb)
motors_bus = DynamixelMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
ping_model_nb = motors_bus.ping(idx)
assert ping_model_nb == model_nb
def test_broadcast_ping(dummy_motors):
expected_pings = {
1: [1060, 50],
2: [1120, 30],
3: [1190, 10],
}
mock_motors = MockMotors()
mock_motors.build_broadcast_ping_stub(expected_pings)
motors_bus = DynamixelMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
ping_list = motors_bus.broadcast_ping()
assert ping_list == expected_pings
@pytest.mark.parametrize(
@ -83,26 +134,25 @@ def test_abc_implementation():
],
ids=["None", "by ids", "by names", "mixed"],
)
def test_read_all_motors(motors):
mock_motors = MockMotors([1, 2, 3])
positions = [1337, 42, 4016]
mock_motors.build_all_motors_stub("Present_Position", return_values=positions)
def test_read_all_motors(motors, dummy_motors):
mock_motors = MockMotors()
expected_positions = {
1: 1337,
2: 42,
3: 4016,
}
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_positions)
motors_bus = DynamixelMotorsBus(
port=mock_motors.port,
motors={
"dummy_1": (1, "xl330-m077"),
"dummy_2": (2, "xl330-m077"),
"dummy_3": (3, "xl330-m077"),
},
motors=dummy_motors,
)
motors_bus.connect()
pos_dict = motors_bus.read("Present_Position", motors=motors)
positions_read = motors_bus.read("Present_Position", motors=motors)
assert mock_motors.stubs["SyncRead_Present_Position_all"].called
assert all(returned_pos == pos for returned_pos, pos in zip(pos_dict.values(), positions, strict=True))
assert set(pos_dict) == {"dummy_1", "dummy_2", "dummy_3"}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
motors = ["dummy_1", "dummy_2", "dummy_3"] if motors is None else motors
assert mock_motors.stubs[stub_name].called
assert positions_read == dict(zip(motors, expected_positions.values(), strict=True))
@pytest.mark.parametrize(
@ -113,24 +163,20 @@ def test_read_all_motors(motors):
[3, 4016],
],
)
def test_read_single_motor_by_name(idx, pos):
mock_motors = MockMotors([1, 2, 3])
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
def test_read_single_motor_by_name(idx, pos, dummy_motors):
mock_motors = MockMotors()
expected_position = {idx: pos}
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_position)
motors_bus = DynamixelMotorsBus(
port=mock_motors.port,
motors={
"dummy_1": (1, "xl330-m077"),
"dummy_2": (2, "xl330-m077"),
"dummy_3": (3, "xl330-m077"),
},
motors=dummy_motors,
)
motors_bus.connect()
pos_dict = motors_bus.read("Present_Position", f"dummy_{idx}")
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
assert mock_motors.stubs[stub_name].called
assert pos_dict == {f"dummy_{idx}": pos}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
@pytest.mark.parametrize(
@ -141,56 +187,49 @@ def test_read_single_motor_by_name(idx, pos):
[3, 4016],
],
)
def test_read_single_motor_by_id(idx, pos):
mock_motors = MockMotors([1, 2, 3])
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
def test_read_single_motor_by_id(idx, pos, dummy_motors):
mock_motors = MockMotors()
expected_position = {idx: pos}
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_position)
motors_bus = DynamixelMotorsBus(
port=mock_motors.port,
motors={
"dummy_1": (1, "xl330-m077"),
"dummy_2": (2, "xl330-m077"),
"dummy_3": (3, "xl330-m077"),
},
motors=dummy_motors,
)
motors_bus.connect()
pos_dict = motors_bus.read("Present_Position", idx)
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
assert pos_dict == {f"dummy_{idx}": pos}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
assert mock_motors.stubs[stub_name].called
assert pos_dict == {idx: pos}
@pytest.mark.parametrize(
"num_retry, num_invalid_try, pos",
[
[1, 2, 1337],
[0, 2, 1337],
[2, 3, 42],
[3, 2, 4016],
[2, 1, 999],
],
)
def test_read_num_retry(num_retry, num_invalid_try, pos):
mock_motors = MockMotors([1, 2, 3])
mock_motors.build_single_motor_stubs(
"Present_Position", return_value=pos, num_invalid_try=num_invalid_try
def test_read_num_retry(num_retry, num_invalid_try, pos, dummy_motors):
mock_motors = MockMotors()
expected_position = {1: pos}
stub_name = mock_motors.build_sync_read_stub(
"Present_Position", expected_position, num_invalid_try=num_invalid_try
)
motors_bus = DynamixelMotorsBus(
port=mock_motors.port,
motors={
"dummy_1": (1, "xl330-m077"),
"dummy_2": (2, "xl330-m077"),
"dummy_3": (3, "xl330-m077"),
},
motors=dummy_motors,
)
motors_bus.connect()
if num_retry >= num_invalid_try:
pos_dict = motors_bus.read("Present_Position", 1, num_retry=num_retry)
assert pos_dict == {"dummy_1": pos}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
assert pos_dict == {1: pos}
else:
with pytest.raises(ConnectionError):
_ = motors_bus.read("Present_Position", 1, num_retry=num_retry)
assert mock_motors.stubs["SyncRead_Present_Position_1"].calls == num_retry
expected_calls = min(1 + num_retry, 1 + num_invalid_try)
assert mock_motors.stubs[stub_name].calls == expected_calls

View File

@ -4,6 +4,7 @@ from unittest.mock import patch
import pytest
import scservo_sdk as scs
from lerobot.common.motors import Motor
from lerobot.common.motors.feetech import FeetechMotorsBus
from tests.mocks.mock_feetech import MockMotors, MockPortHandler
@ -17,6 +18,15 @@ def patch_port_handler():
yield
@pytest.fixture
def dummy_motors() -> dict[str, Motor]:
return {
"dummy_1": Motor(id=1, model="sts3215"),
"dummy_2": Motor(id=2, model="sts3215"),
"dummy_3": Motor(id=3, model="sts3215"),
}
@pytest.mark.skipif(sys.platform != "darwin", reason=f"No patching needed on {sys.platform=}")
def test_autouse_patch():
"""Ensures that the autouse fixture correctly patches scs.PortHandler with MockPortHandler."""
@ -68,9 +78,52 @@ def test_split_int_bytes_large_number():
FeetechMotorsBus.split_int_bytes(2**32, 4) # 4-byte max is 0xFFFFFFFF
def test_abc_implementation():
def test_abc_implementation(dummy_motors):
"""Instantiation should raise an error if the class doesn't implement abstract methods/properties."""
FeetechMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "sts3215")})
FeetechMotorsBus(port="/dev/dummy-port", motors=dummy_motors)
@pytest.mark.skip("TODO")
@pytest.mark.parametrize(
"idx, model_nb",
[
[1, 1190],
[2, 1200],
[3, 1120],
],
)
def test_ping(idx, model_nb, dummy_motors):
mock_motors = MockMotors()
mock_motors.build_ping_stub(idx, model_nb)
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
ping_model_nb = motors_bus.ping(idx)
assert ping_model_nb == model_nb
@pytest.mark.skip("TODO")
def test_broadcast_ping(dummy_motors):
expected_pings = {
1: [1060, 50],
2: [1120, 30],
3: [1190, 10],
}
mock_motors = MockMotors()
mock_motors.build_broadcast_ping_stub(expected_pings)
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
ping_list = motors_bus.broadcast_ping()
assert ping_list == expected_pings
@pytest.mark.parametrize(
@ -83,26 +136,25 @@ def test_abc_implementation():
],
ids=["None", "by ids", "by names", "mixed"],
)
def test_read_all_motors(motors):
mock_motors = MockMotors([1, 2, 3])
positions = [1337, 42, 4016]
mock_motors.build_all_motors_stub("Present_Position", return_values=positions)
def test_read_all_motors(motors, dummy_motors):
mock_motors = MockMotors()
expected_positions = {
1: 1337,
2: 42,
3: 4016,
}
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_positions)
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors={
"dummy_1": (1, "sts3215"),
"dummy_2": (2, "sts3215"),
"dummy_3": (3, "sts3215"),
},
motors=dummy_motors,
)
motors_bus.connect()
pos_dict = motors_bus.read("Present_Position", motors=motors)
positions_read = motors_bus.read("Present_Position", motors=motors)
assert mock_motors.stubs["SyncRead_Present_Position_all"].called
assert all(returned_pos == pos for returned_pos, pos in zip(pos_dict.values(), positions, strict=True))
assert set(pos_dict) == {"dummy_1", "dummy_2", "dummy_3"}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
motors = ["dummy_1", "dummy_2", "dummy_3"] if motors is None else motors
assert mock_motors.stubs[stub_name].called
assert positions_read == dict(zip(motors, expected_positions.values(), strict=True))
@pytest.mark.parametrize(
@ -113,24 +165,20 @@ def test_read_all_motors(motors):
[3, 4016],
],
)
def test_read_single_motor_by_name(idx, pos):
mock_motors = MockMotors([1, 2, 3])
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
def test_read_single_motor_by_name(idx, pos, dummy_motors):
mock_motors = MockMotors()
expected_position = {idx: pos}
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_position)
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors={
"dummy_1": (1, "sts3215"),
"dummy_2": (2, "sts3215"),
"dummy_3": (3, "sts3215"),
},
motors=dummy_motors,
)
motors_bus.connect()
pos_dict = motors_bus.read("Present_Position", f"dummy_{idx}")
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
assert mock_motors.stubs[stub_name].called
assert pos_dict == {f"dummy_{idx}": pos}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
@pytest.mark.parametrize(
@ -141,56 +189,49 @@ def test_read_single_motor_by_name(idx, pos):
[3, 4016],
],
)
def test_read_single_motor_by_id(idx, pos):
mock_motors = MockMotors([1, 2, 3])
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
def test_read_single_motor_by_id(idx, pos, dummy_motors):
mock_motors = MockMotors()
expected_position = {idx: pos}
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_position)
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors={
"dummy_1": (1, "sts3215"),
"dummy_2": (2, "sts3215"),
"dummy_3": (3, "sts3215"),
},
motors=dummy_motors,
)
motors_bus.connect()
pos_dict = motors_bus.read("Present_Position", idx)
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
assert pos_dict == {f"dummy_{idx}": pos}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
assert mock_motors.stubs[stub_name].called
assert pos_dict == {idx: pos}
@pytest.mark.parametrize(
"num_retry, num_invalid_try, pos",
[
[1, 2, 1337],
[0, 2, 1337],
[2, 3, 42],
[3, 2, 4016],
[2, 1, 999],
],
)
def test_read_num_retry(num_retry, num_invalid_try, pos):
mock_motors = MockMotors([1, 2, 3])
mock_motors.build_single_motor_stubs(
"Present_Position", return_value=pos, num_invalid_try=num_invalid_try
def test_read_num_retry(num_retry, num_invalid_try, pos, dummy_motors):
mock_motors = MockMotors()
expected_position = {1: pos}
stub_name = mock_motors.build_sync_read_stub(
"Present_Position", expected_position, num_invalid_try=num_invalid_try
)
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors={
"dummy_1": (1, "sts3215"),
"dummy_2": (2, "sts3215"),
"dummy_3": (3, "sts3215"),
},
motors=dummy_motors,
)
motors_bus.connect()
if num_retry >= num_invalid_try:
pos_dict = motors_bus.read("Present_Position", 1, num_retry=num_retry)
assert pos_dict == {"dummy_1": pos}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
assert pos_dict == {1: pos}
else:
with pytest.raises(ConnectionError):
_ = motors_bus.read("Present_Position", 1, num_retry=num_retry)
assert mock_motors.stubs["SyncRead_Present_Position_1"].calls == num_retry
expected_calls = min(1 + num_retry, 1 + num_invalid_try)
assert mock_motors.stubs[stub_name].calls == expected_calls