Compare commits
10 Commits
6541982dff
...
9e34c1d731
Author | SHA1 | Date |
---|---|---|
|
9e34c1d731 | |
|
857f335be9 | |
|
fc4a95f187 | |
|
4fe1880887 | |
|
6fa859fa19 | |
|
2abfa5838d | |
|
3d119c0ccb | |
|
a32081757d | |
|
56c04ffc53 | |
|
715d4557af |
|
@ -1,3 +1 @@
|
|||
from .motors_bus import CalibrationMode, DriveMode, MotorsBus, TorqueMode
|
||||
|
||||
__all__ = ["CalibrationMode", "DriveMode", "MotorsBus", "TorqueMode"]
|
||||
from .motors_bus import CalibrationMode, DriveMode, Motor, MotorsBus, TorqueMode
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
from .dynamixel import DynamixelMotorsBus
|
||||
from .dynamixel_calibration import run_arm_calibration
|
||||
|
||||
__all__ = ["DynamixelMotorsBus", "run_arm_calibration"]
|
||||
from .tables import *
|
||||
|
|
|
@ -20,153 +20,17 @@
|
|||
|
||||
from copy import deepcopy
|
||||
|
||||
import numpy as np
|
||||
|
||||
from ..motors_bus import MotorsBus
|
||||
from .tables import MODEL_BAUDRATE_TABLE, MODEL_CONTROL_TABLE, MODEL_RESOLUTION
|
||||
|
||||
PROTOCOL_VERSION = 2.0
|
||||
BAUDRATE = 1_000_000
|
||||
DEFAULT_TIMEOUT_MS = 1000
|
||||
|
||||
MAX_ID_RANGE = 252
|
||||
|
||||
# The following bounds define the lower and upper joints range (after calibration).
|
||||
# For joints in degree (i.e. revolute joints), their nominal range is [-180, 180] degrees
|
||||
# which corresponds to a half rotation on the left and half rotation on the right.
|
||||
# Some joints might require higher range, so we allow up to [-270, 270] degrees until
|
||||
# an error is raised.
|
||||
LOWER_BOUND_DEGREE = -270
|
||||
UPPER_BOUND_DEGREE = 270
|
||||
|
||||
# For joints in percentage (i.e. joints that move linearly like the prismatic joint of a gripper),
|
||||
# their nominal range is [0, 100] %. For instance, for Aloha gripper, 0% is fully
|
||||
# closed, and 100% is fully open. To account for slight calibration issue, we allow up to
|
||||
# [-10, 110] until an error is raised.
|
||||
LOWER_BOUND_LINEAR = -10
|
||||
UPPER_BOUND_LINEAR = 110
|
||||
|
||||
HALF_TURN_DEGREE = 180
|
||||
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/xl330-m077
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/xl330-m288
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/xl430-w250
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/xm430-w350
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/xm540-w270
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/xc430-w150
|
||||
|
||||
# data_name: (address, size_byte)
|
||||
X_SERIES_CONTROL_TABLE = {
|
||||
"Model_Number": (0, 2),
|
||||
"Model_Information": (2, 4),
|
||||
"Firmware_Version": (6, 1),
|
||||
"ID": (7, 1),
|
||||
"Baud_Rate": (8, 1),
|
||||
"Return_Delay_Time": (9, 1),
|
||||
"Drive_Mode": (10, 1),
|
||||
"Operating_Mode": (11, 1),
|
||||
"Secondary_ID": (12, 1),
|
||||
"Protocol_Type": (13, 1),
|
||||
"Homing_Offset": (20, 4),
|
||||
"Moving_Threshold": (24, 4),
|
||||
"Temperature_Limit": (31, 1),
|
||||
"Max_Voltage_Limit": (32, 2),
|
||||
"Min_Voltage_Limit": (34, 2),
|
||||
"PWM_Limit": (36, 2),
|
||||
"Current_Limit": (38, 2),
|
||||
"Acceleration_Limit": (40, 4),
|
||||
"Velocity_Limit": (44, 4),
|
||||
"Max_Position_Limit": (48, 4),
|
||||
"Min_Position_Limit": (52, 4),
|
||||
"Shutdown": (63, 1),
|
||||
"Torque_Enable": (64, 1),
|
||||
"LED": (65, 1),
|
||||
"Status_Return_Level": (68, 1),
|
||||
"Registered_Instruction": (69, 1),
|
||||
"Hardware_Error_Status": (70, 1),
|
||||
"Velocity_I_Gain": (76, 2),
|
||||
"Velocity_P_Gain": (78, 2),
|
||||
"Position_D_Gain": (80, 2),
|
||||
"Position_I_Gain": (82, 2),
|
||||
"Position_P_Gain": (84, 2),
|
||||
"Feedforward_2nd_Gain": (88, 2),
|
||||
"Feedforward_1st_Gain": (90, 2),
|
||||
"Bus_Watchdog": (98, 1),
|
||||
"Goal_PWM": (100, 2),
|
||||
"Goal_Current": (102, 2),
|
||||
"Goal_Velocity": (104, 4),
|
||||
"Profile_Acceleration": (108, 4),
|
||||
"Profile_Velocity": (112, 4),
|
||||
"Goal_Position": (116, 4),
|
||||
"Realtime_Tick": (120, 2),
|
||||
"Moving": (122, 1),
|
||||
"Moving_Status": (123, 1),
|
||||
"Present_PWM": (124, 2),
|
||||
"Present_Current": (126, 2),
|
||||
"Present_Velocity": (128, 4),
|
||||
"Present_Position": (132, 4),
|
||||
"Velocity_Trajectory": (136, 4),
|
||||
"Position_Trajectory": (140, 4),
|
||||
"Present_Input_Voltage": (144, 2),
|
||||
"Present_Temperature": (146, 1),
|
||||
}
|
||||
|
||||
X_SERIES_BAUDRATE_TABLE = {
|
||||
0: 9_600,
|
||||
1: 57_600,
|
||||
2: 115_200,
|
||||
3: 1_000_000,
|
||||
4: 2_000_000,
|
||||
5: 3_000_000,
|
||||
6: 4_000_000,
|
||||
}
|
||||
|
||||
CALIBRATION_REQUIRED = ["Goal_Position", "Present_Position"]
|
||||
CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"]
|
||||
|
||||
MODEL_CONTROL_TABLE = {
|
||||
"x_series": X_SERIES_CONTROL_TABLE,
|
||||
"xl330-m077": X_SERIES_CONTROL_TABLE,
|
||||
"xl330-m288": X_SERIES_CONTROL_TABLE,
|
||||
"xl430-w250": X_SERIES_CONTROL_TABLE,
|
||||
"xm430-w350": X_SERIES_CONTROL_TABLE,
|
||||
"xm540-w270": X_SERIES_CONTROL_TABLE,
|
||||
"xc430-w150": X_SERIES_CONTROL_TABLE,
|
||||
}
|
||||
|
||||
MODEL_RESOLUTION = {
|
||||
"x_series": 4096,
|
||||
"xl330-m077": 4096,
|
||||
"xl330-m288": 4096,
|
||||
"xl430-w250": 4096,
|
||||
"xm430-w350": 4096,
|
||||
"xm540-w270": 4096,
|
||||
"xc430-w150": 4096,
|
||||
}
|
||||
|
||||
MODEL_BAUDRATE_TABLE = {
|
||||
"x_series": X_SERIES_BAUDRATE_TABLE,
|
||||
"xl330-m077": X_SERIES_BAUDRATE_TABLE,
|
||||
"xl330-m288": X_SERIES_BAUDRATE_TABLE,
|
||||
"xl430-w250": X_SERIES_BAUDRATE_TABLE,
|
||||
"xm430-w350": X_SERIES_BAUDRATE_TABLE,
|
||||
"xm540-w270": X_SERIES_BAUDRATE_TABLE,
|
||||
"xc430-w150": X_SERIES_BAUDRATE_TABLE,
|
||||
}
|
||||
|
||||
NUM_READ_RETRY = 10
|
||||
NUM_WRITE_RETRY = 10
|
||||
|
||||
|
||||
def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str]) -> np.ndarray:
|
||||
"""This function converts the degree range to the step range for indicating motors rotation.
|
||||
It assumes a motor achieves a full rotation by going from -180 degree position to +180.
|
||||
The motor resolution (e.g. 4096) corresponds to the number of steps needed to achieve a full rotation.
|
||||
"""
|
||||
resolutions = [MODEL_RESOLUTION[model] for model in models]
|
||||
steps = degrees / 180 * np.array(resolutions) / 2
|
||||
steps = steps.astype(int)
|
||||
return steps
|
||||
|
||||
|
||||
class DynamixelMotorsBus(MotorsBus):
|
||||
"""
|
||||
|
@ -194,10 +58,16 @@ class DynamixelMotorsBus(MotorsBus):
|
|||
self.reader = dxl.GroupSyncRead(self.port_handler, self.packet_handler, 0, 0)
|
||||
self.writer = dxl.GroupSyncWrite(self.port_handler, self.packet_handler, 0, 0)
|
||||
|
||||
def broadcast_ping(self) -> tuple[list, int]:
|
||||
data_list, comm = self.packet_handler.broadcastPing(self.port_handler)
|
||||
# TODO(aliberts): translate data_list into {id: model}
|
||||
return data_list, comm
|
||||
def broadcast_ping(
|
||||
self, num_retry: int = 0, raise_on_error: bool = False
|
||||
) -> dict[int, list[int, int]] | None:
|
||||
for _ in range(1 + num_retry):
|
||||
data_list, comm = self.packet_handler.broadcastPing(self.port_handler)
|
||||
if self._is_comm_success(comm):
|
||||
return data_list
|
||||
|
||||
if raise_on_error:
|
||||
raise ConnectionError(f"Broadcast ping returned a {comm} comm error.")
|
||||
|
||||
def calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
|
||||
# TODO
|
||||
|
|
|
@ -0,0 +1,110 @@
|
|||
# data_name: (address, size_byte)
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#control-table
|
||||
X_SERIES_CONTROL_TABLE = {
|
||||
"Model_Number": (0, 2),
|
||||
"Model_Information": (2, 4),
|
||||
"Firmware_Version": (6, 1),
|
||||
"ID": (7, 1),
|
||||
"Baud_Rate": (8, 1),
|
||||
"Return_Delay_Time": (9, 1),
|
||||
"Drive_Mode": (10, 1),
|
||||
"Operating_Mode": (11, 1),
|
||||
"Secondary_ID": (12, 1),
|
||||
"Protocol_Type": (13, 1),
|
||||
"Homing_Offset": (20, 4),
|
||||
"Moving_Threshold": (24, 4),
|
||||
"Temperature_Limit": (31, 1),
|
||||
"Max_Voltage_Limit": (32, 2),
|
||||
"Min_Voltage_Limit": (34, 2),
|
||||
"PWM_Limit": (36, 2),
|
||||
"Current_Limit": (38, 2),
|
||||
"Acceleration_Limit": (40, 4),
|
||||
"Velocity_Limit": (44, 4),
|
||||
"Max_Position_Limit": (48, 4),
|
||||
"Min_Position_Limit": (52, 4),
|
||||
"Shutdown": (63, 1),
|
||||
"Torque_Enable": (64, 1),
|
||||
"LED": (65, 1),
|
||||
"Status_Return_Level": (68, 1),
|
||||
"Registered_Instruction": (69, 1),
|
||||
"Hardware_Error_Status": (70, 1),
|
||||
"Velocity_I_Gain": (76, 2),
|
||||
"Velocity_P_Gain": (78, 2),
|
||||
"Position_D_Gain": (80, 2),
|
||||
"Position_I_Gain": (82, 2),
|
||||
"Position_P_Gain": (84, 2),
|
||||
"Feedforward_2nd_Gain": (88, 2),
|
||||
"Feedforward_1st_Gain": (90, 2),
|
||||
"Bus_Watchdog": (98, 1),
|
||||
"Goal_PWM": (100, 2),
|
||||
"Goal_Current": (102, 2),
|
||||
"Goal_Velocity": (104, 4),
|
||||
"Profile_Acceleration": (108, 4),
|
||||
"Profile_Velocity": (112, 4),
|
||||
"Goal_Position": (116, 4),
|
||||
"Realtime_Tick": (120, 2),
|
||||
"Moving": (122, 1),
|
||||
"Moving_Status": (123, 1),
|
||||
"Present_PWM": (124, 2),
|
||||
"Present_Current": (126, 2),
|
||||
"Present_Velocity": (128, 4),
|
||||
"Present_Position": (132, 4),
|
||||
"Velocity_Trajectory": (136, 4),
|
||||
"Position_Trajectory": (140, 4),
|
||||
"Present_Input_Voltage": (144, 2),
|
||||
"Present_Temperature": (146, 1),
|
||||
}
|
||||
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#baud-rate8
|
||||
X_SERIES_BAUDRATE_TABLE = {
|
||||
0: 9_600,
|
||||
1: 57_600,
|
||||
2: 115_200,
|
||||
3: 1_000_000,
|
||||
4: 2_000_000,
|
||||
5: 3_000_000,
|
||||
6: 4_000_000,
|
||||
}
|
||||
|
||||
# {model: model_resolution}
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#specifications
|
||||
MODEL_RESOLUTION = {
|
||||
"x_series": 4096,
|
||||
"xl330-m077": 4096,
|
||||
"xl330-m288": 4096,
|
||||
"xl430-w250": 4096,
|
||||
"xm430-w350": 4096,
|
||||
"xm540-w270": 4096,
|
||||
"xc430-w150": 4096,
|
||||
}
|
||||
|
||||
# {model: model_number}
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#control-table-of-eeprom-area
|
||||
MODELS_TABLE = {
|
||||
"xl330-m077": 1190,
|
||||
"xl330-m288": 1200,
|
||||
"xl430-w250": 1060,
|
||||
"xm430-w350": 1020,
|
||||
"xm540-w270": 1120,
|
||||
"xc430-w150": 1070,
|
||||
}
|
||||
|
||||
MODEL_CONTROL_TABLE = {
|
||||
"x_series": X_SERIES_CONTROL_TABLE,
|
||||
"xl330-m077": X_SERIES_CONTROL_TABLE,
|
||||
"xl330-m288": X_SERIES_CONTROL_TABLE,
|
||||
"xl430-w250": X_SERIES_CONTROL_TABLE,
|
||||
"xm430-w350": X_SERIES_CONTROL_TABLE,
|
||||
"xm540-w270": X_SERIES_CONTROL_TABLE,
|
||||
"xc430-w150": X_SERIES_CONTROL_TABLE,
|
||||
}
|
||||
|
||||
MODEL_BAUDRATE_TABLE = {
|
||||
"x_series": X_SERIES_BAUDRATE_TABLE,
|
||||
"xl330-m077": X_SERIES_BAUDRATE_TABLE,
|
||||
"xl330-m288": X_SERIES_BAUDRATE_TABLE,
|
||||
"xl430-w250": X_SERIES_BAUDRATE_TABLE,
|
||||
"xm430-w350": X_SERIES_BAUDRATE_TABLE,
|
||||
"xm540-w270": X_SERIES_BAUDRATE_TABLE,
|
||||
"xc430-w150": X_SERIES_BAUDRATE_TABLE,
|
||||
}
|
|
@ -1,8 +1,3 @@
|
|||
from .feetech import FeetechMotorsBus
|
||||
from .feetech_calibration import apply_feetech_offsets_from_calibration, run_full_arm_calibration
|
||||
|
||||
__all__ = [
|
||||
"FeetechMotorsBus",
|
||||
"apply_feetech_offsets_from_calibration",
|
||||
"run_full_arm_calibration",
|
||||
]
|
||||
from .tables import *
|
||||
|
|
|
@ -15,6 +15,12 @@
|
|||
from copy import deepcopy
|
||||
|
||||
from ..motors_bus import MotorsBus
|
||||
from .tables import (
|
||||
CALIBRATION_REQUIRED,
|
||||
MODEL_BAUDRATE_TABLE,
|
||||
MODEL_CONTROL_TABLE,
|
||||
MODEL_RESOLUTION,
|
||||
)
|
||||
|
||||
PROTOCOL_VERSION = 0
|
||||
BAUDRATE = 1_000_000
|
||||
|
@ -22,100 +28,6 @@ DEFAULT_TIMEOUT_MS = 1000
|
|||
|
||||
MAX_ID_RANGE = 252
|
||||
|
||||
# For joints in percentage (i.e. joints that move linearly like the prismatic joint of a gripper),
|
||||
# their nominal range is [0, 100] %. For instance, for Aloha gripper, 0% is fully
|
||||
# closed, and 100% is fully open. To account for slight calibration issue, we allow up to
|
||||
# [-10, 110] until an error is raised.
|
||||
LOWER_BOUND_LINEAR = -10
|
||||
UPPER_BOUND_LINEAR = 110
|
||||
|
||||
HALF_TURN_DEGREE = 180
|
||||
|
||||
# See this link for STS3215 Memory Table:
|
||||
# https://docs.google.com/spreadsheets/d/1GVs7W1VS1PqdhA1nW-abeyAHhTUxKUdR/edit?usp=sharing&ouid=116566590112741600240&rtpof=true&sd=true
|
||||
# data_name: (address, size_byte)
|
||||
SCS_SERIES_CONTROL_TABLE = {
|
||||
"Model": (3, 2),
|
||||
"ID": (5, 1),
|
||||
"Baud_Rate": (6, 1),
|
||||
"Return_Delay": (7, 1),
|
||||
"Response_Status_Level": (8, 1),
|
||||
"Min_Angle_Limit": (9, 2),
|
||||
"Max_Angle_Limit": (11, 2),
|
||||
"Max_Temperature_Limit": (13, 1),
|
||||
"Max_Voltage_Limit": (14, 1),
|
||||
"Min_Voltage_Limit": (15, 1),
|
||||
"Max_Torque_Limit": (16, 2),
|
||||
"Phase": (18, 1),
|
||||
"Unloading_Condition": (19, 1),
|
||||
"LED_Alarm_Condition": (20, 1),
|
||||
"P_Coefficient": (21, 1),
|
||||
"D_Coefficient": (22, 1),
|
||||
"I_Coefficient": (23, 1),
|
||||
"Minimum_Startup_Force": (24, 2),
|
||||
"CW_Dead_Zone": (26, 1),
|
||||
"CCW_Dead_Zone": (27, 1),
|
||||
"Protection_Current": (28, 2),
|
||||
"Angular_Resolution": (30, 1),
|
||||
"Offset": (31, 2),
|
||||
"Mode": (33, 1),
|
||||
"Protective_Torque": (34, 1),
|
||||
"Protection_Time": (35, 1),
|
||||
"Overload_Torque": (36, 1),
|
||||
"Speed_closed_loop_P_proportional_coefficient": (37, 1),
|
||||
"Over_Current_Protection_Time": (38, 1),
|
||||
"Velocity_closed_loop_I_integral_coefficient": (39, 1),
|
||||
"Torque_Enable": (40, 1),
|
||||
"Acceleration": (41, 1),
|
||||
"Goal_Position": (42, 2),
|
||||
"Goal_Time": (44, 2),
|
||||
"Goal_Speed": (46, 2),
|
||||
"Torque_Limit": (48, 2),
|
||||
"Lock": (55, 1),
|
||||
"Present_Position": (56, 2),
|
||||
"Present_Speed": (58, 2),
|
||||
"Present_Load": (60, 2),
|
||||
"Present_Voltage": (62, 1),
|
||||
"Present_Temperature": (63, 1),
|
||||
"Status": (65, 1),
|
||||
"Moving": (66, 1),
|
||||
"Present_Current": (69, 2),
|
||||
# Not in the Memory Table
|
||||
"Maximum_Acceleration": (85, 2),
|
||||
}
|
||||
|
||||
SCS_SERIES_BAUDRATE_TABLE = {
|
||||
0: 1_000_000,
|
||||
1: 500_000,
|
||||
2: 250_000,
|
||||
3: 128_000,
|
||||
4: 115_200,
|
||||
5: 57_600,
|
||||
6: 38_400,
|
||||
7: 19_200,
|
||||
}
|
||||
|
||||
CALIBRATION_REQUIRED = ["Goal_Position", "Present_Position"]
|
||||
|
||||
MODEL_CONTROL_TABLE = {
|
||||
"scs_series": SCS_SERIES_CONTROL_TABLE,
|
||||
"sts3215": SCS_SERIES_CONTROL_TABLE,
|
||||
}
|
||||
|
||||
MODEL_RESOLUTION = {
|
||||
"scs_series": 4096,
|
||||
"sts3215": 4096,
|
||||
}
|
||||
|
||||
MODEL_BAUDRATE_TABLE = {
|
||||
"scs_series": SCS_SERIES_BAUDRATE_TABLE,
|
||||
"sts3215": SCS_SERIES_BAUDRATE_TABLE,
|
||||
}
|
||||
|
||||
# High number of retries is needed for feetech compared to dynamixel motors.
|
||||
NUM_READ_RETRY = 20
|
||||
NUM_WRITE_RETRY = 20
|
||||
|
||||
|
||||
class FeetechMotorsBus(MotorsBus):
|
||||
"""
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
# See this link for STS3215 Memory Table:
|
||||
# https://docs.google.com/spreadsheets/d/1GVs7W1VS1PqdhA1nW-abeyAHhTUxKUdR/edit?usp=sharing&ouid=116566590112741600240&rtpof=true&sd=true
|
||||
# data_name: (address, size_byte)
|
||||
SCS_SERIES_CONTROL_TABLE = {
|
||||
"Model": (3, 2),
|
||||
"ID": (5, 1),
|
||||
"Baud_Rate": (6, 1),
|
||||
"Return_Delay": (7, 1),
|
||||
"Response_Status_Level": (8, 1),
|
||||
"Min_Angle_Limit": (9, 2),
|
||||
"Max_Angle_Limit": (11, 2),
|
||||
"Max_Temperature_Limit": (13, 1),
|
||||
"Max_Voltage_Limit": (14, 1),
|
||||
"Min_Voltage_Limit": (15, 1),
|
||||
"Max_Torque_Limit": (16, 2),
|
||||
"Phase": (18, 1),
|
||||
"Unloading_Condition": (19, 1),
|
||||
"LED_Alarm_Condition": (20, 1),
|
||||
"P_Coefficient": (21, 1),
|
||||
"D_Coefficient": (22, 1),
|
||||
"I_Coefficient": (23, 1),
|
||||
"Minimum_Startup_Force": (24, 2),
|
||||
"CW_Dead_Zone": (26, 1),
|
||||
"CCW_Dead_Zone": (27, 1),
|
||||
"Protection_Current": (28, 2),
|
||||
"Angular_Resolution": (30, 1),
|
||||
"Offset": (31, 2),
|
||||
"Mode": (33, 1),
|
||||
"Protective_Torque": (34, 1),
|
||||
"Protection_Time": (35, 1),
|
||||
"Overload_Torque": (36, 1),
|
||||
"Speed_closed_loop_P_proportional_coefficient": (37, 1),
|
||||
"Over_Current_Protection_Time": (38, 1),
|
||||
"Velocity_closed_loop_I_integral_coefficient": (39, 1),
|
||||
"Torque_Enable": (40, 1),
|
||||
"Acceleration": (41, 1),
|
||||
"Goal_Position": (42, 2),
|
||||
"Goal_Time": (44, 2),
|
||||
"Goal_Speed": (46, 2),
|
||||
"Torque_Limit": (48, 2),
|
||||
"Lock": (55, 1),
|
||||
"Present_Position": (56, 2),
|
||||
"Present_Speed": (58, 2),
|
||||
"Present_Load": (60, 2),
|
||||
"Present_Voltage": (62, 1),
|
||||
"Present_Temperature": (63, 1),
|
||||
"Status": (65, 1),
|
||||
"Moving": (66, 1),
|
||||
"Present_Current": (69, 2),
|
||||
# Not in the Memory Table
|
||||
"Maximum_Acceleration": (85, 2),
|
||||
}
|
||||
|
||||
SCS_SERIES_BAUDRATE_TABLE = {
|
||||
0: 1_000_000,
|
||||
1: 500_000,
|
||||
2: 250_000,
|
||||
3: 128_000,
|
||||
4: 115_200,
|
||||
5: 57_600,
|
||||
6: 38_400,
|
||||
7: 19_200,
|
||||
}
|
||||
|
||||
MODEL_CONTROL_TABLE = {
|
||||
"scs_series": SCS_SERIES_CONTROL_TABLE,
|
||||
"sts3215": SCS_SERIES_CONTROL_TABLE,
|
||||
}
|
||||
|
||||
MODEL_RESOLUTION = {
|
||||
"scs_series": 4096,
|
||||
"sts3215": 4096,
|
||||
}
|
||||
|
||||
MODEL_BAUDRATE_TABLE = {
|
||||
"scs_series": SCS_SERIES_BAUDRATE_TABLE,
|
||||
"sts3215": SCS_SERIES_BAUDRATE_TABLE,
|
||||
}
|
||||
|
||||
CALIBRATION_REQUIRED = ["Goal_Position", "Present_Position"]
|
|
@ -22,19 +22,22 @@
|
|||
import abc
|
||||
import json
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from functools import cached_property
|
||||
from pathlib import Path
|
||||
from pprint import pformat
|
||||
from typing import Protocol
|
||||
from typing import Protocol, TypeAlias, overload
|
||||
|
||||
import serial
|
||||
import tqdm
|
||||
from deepdiff import DeepDiff
|
||||
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.common.utils.utils import capture_timestamp_utc
|
||||
|
||||
NameOrID: TypeAlias = str | int
|
||||
Value: TypeAlias = int | float
|
||||
|
||||
MAX_ID_RANGE = 252
|
||||
|
||||
|
||||
|
@ -197,6 +200,12 @@ class GroupSyncWrite(Protocol):
|
|||
def txPacket(self): ...
|
||||
|
||||
|
||||
@dataclass
|
||||
class Motor:
|
||||
id: int
|
||||
model: str
|
||||
|
||||
|
||||
class MotorsBus(abc.ABC):
|
||||
"""The main LeRobot class for implementing motors buses.
|
||||
|
||||
|
@ -248,7 +257,7 @@ class MotorsBus(abc.ABC):
|
|||
def __init__(
|
||||
self,
|
||||
port: str,
|
||||
motors: dict[str, tuple[int, str]],
|
||||
motors: dict[str, Motor],
|
||||
):
|
||||
self.port = port
|
||||
self.motors = motors
|
||||
|
@ -262,8 +271,8 @@ class MotorsBus(abc.ABC):
|
|||
self.logs = {} # TODO(aliberts): use subclass logger
|
||||
self.calibration = None
|
||||
|
||||
self._id_to_model = dict(self.motors.values())
|
||||
self._id_to_name = {idx: name for name, (idx, _) in self.motors.items()}
|
||||
self._id_to_model = {m.id: m.model for m in self.motors.values()}
|
||||
self._id_to_name = {m.id: name for name, m in self.motors.items()}
|
||||
|
||||
def __len__(self):
|
||||
return len(self.motors)
|
||||
|
@ -278,39 +287,39 @@ class MotorsBus(abc.ABC):
|
|||
|
||||
@cached_property
|
||||
def _has_different_ctrl_tables(self) -> bool:
|
||||
if len(self.motor_models) < 2:
|
||||
if len(self.models) < 2:
|
||||
return False
|
||||
|
||||
first_table = self.model_ctrl_table[self.motor_models[0]]
|
||||
return any(DeepDiff(first_table, self.model_ctrl_table[model]) for model in self.motor_models[1:])
|
||||
first_table = self.model_ctrl_table[self.models[0]]
|
||||
return any(DeepDiff(first_table, self.model_ctrl_table[model]) for model in self.models[1:])
|
||||
|
||||
def idx_to_model(self, idx: int) -> str:
|
||||
return self._id_to_model[idx]
|
||||
def id_to_model(self, motor_id: int) -> str:
|
||||
return self._id_to_model[motor_id]
|
||||
|
||||
def idx_to_name(self, idx: int) -> str:
|
||||
return self._id_to_name[idx]
|
||||
def id_to_name(self, motor_id: int) -> str:
|
||||
return self._id_to_name[motor_id]
|
||||
|
||||
@cached_property
|
||||
def motor_names(self) -> list[str]:
|
||||
def names(self) -> list[str]:
|
||||
return list(self.motors)
|
||||
|
||||
@cached_property
|
||||
def motor_models(self) -> list[str]:
|
||||
return [model for _, model in self.motors.values()]
|
||||
def models(self) -> list[str]:
|
||||
return [m.model for m in self.motors.values()]
|
||||
|
||||
@cached_property
|
||||
def motor_ids(self) -> list[int]:
|
||||
return [idx for idx, _ in self.motors.values()]
|
||||
def ids(self) -> list[int]:
|
||||
return [m.id for m in self.motors.values()]
|
||||
|
||||
def _validate_motors(self) -> None:
|
||||
# TODO(aliberts): improve error messages for this (display problematics values)
|
||||
if len(self.motor_ids) != len(set(self.motor_ids)):
|
||||
if len(self.ids) != len(set(self.ids)):
|
||||
raise ValueError("Some motors have the same id.")
|
||||
|
||||
if len(self.motor_names) != len(set(self.motor_names)):
|
||||
if len(self.names) != len(set(self.names)):
|
||||
raise ValueError("Some motors have the same name.")
|
||||
|
||||
if any(model not in self.model_resolution_table for model in self.motor_models):
|
||||
if any(model not in self.model_resolution_table for model in self.models):
|
||||
raise ValueError("Some motors models are not available.")
|
||||
|
||||
@property
|
||||
|
@ -347,46 +356,25 @@ class MotorsBus(abc.ABC):
|
|||
"""
|
||||
try:
|
||||
# TODO(aliberts): use ping instead
|
||||
return (self.motor_ids == self.read("ID")).all()
|
||||
return (self.ids == self.read("ID")).all()
|
||||
except ConnectionError as e:
|
||||
print(e)
|
||||
return False
|
||||
|
||||
def ping(self, motor: str | int, num_retry: int | None = None) -> int:
|
||||
idx = self.get_safe_id(motor)
|
||||
for _ in range(num_retry):
|
||||
model_number, comm, _ = self.packet_handler.ping(self.port, idx)
|
||||
def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> int | None:
|
||||
idx = self.get_motor_id(motor)
|
||||
for _ in range(1 + num_retry):
|
||||
model_number, comm, error = self.packet_handler.ping(self.port_handler, idx)
|
||||
if self._is_comm_success(comm):
|
||||
return model_number
|
||||
|
||||
# TODO(aliberts): Should we?
|
||||
return comm
|
||||
if raise_on_error:
|
||||
raise ConnectionError(f"Ping motor {motor} returned a {error} error code.")
|
||||
|
||||
@abc.abstractmethod
|
||||
def broadcast_ping(self, num_retry: int | None = None):
|
||||
...
|
||||
# TODO(aliberts): this will replace 'find_motor_indices'
|
||||
|
||||
def find_motor_indices(self, possible_ids: list[str] = None, num_retry: int = 2):
|
||||
if possible_ids is None:
|
||||
possible_ids = range(MAX_ID_RANGE)
|
||||
|
||||
indices = []
|
||||
for idx in tqdm.tqdm(possible_ids):
|
||||
try:
|
||||
present_idx = self.read("ID", idx, num_retry=num_retry)[0]
|
||||
except ConnectionError:
|
||||
continue
|
||||
|
||||
if idx != present_idx:
|
||||
# sanity check
|
||||
raise OSError(
|
||||
"Motor index used to communicate through the bus is not the same as the one present in the motor "
|
||||
"memory. The motor memory might be damaged."
|
||||
)
|
||||
indices.append(idx)
|
||||
|
||||
return indices
|
||||
def broadcast_ping(
|
||||
self, num_retry: int = 0, raise_on_error: bool = False
|
||||
) -> dict[int, list[int, int]] | None: ...
|
||||
|
||||
def set_baudrate(self, baudrate) -> None:
|
||||
present_bus_baudrate = self.port_handler.getBaudRate()
|
||||
|
@ -453,17 +441,23 @@ class MotorsBus(abc.ABC):
|
|||
"""
|
||||
pass
|
||||
|
||||
def get_safe_id(self, motor: str | int) -> int:
|
||||
def get_motor_id(self, motor: NameOrID) -> int:
|
||||
if isinstance(motor, str):
|
||||
return self.motors[motor][0]
|
||||
return self.motors[motor].id
|
||||
elif isinstance(motor, int):
|
||||
return motor
|
||||
else:
|
||||
raise ValueError(f"{motor} should be int or str.")
|
||||
raise TypeError(f"'{motor}' should be int, str.")
|
||||
|
||||
@overload
|
||||
def read(self, data_name: str, motors: None = ...) -> dict[str, Value]: ...
|
||||
@overload
|
||||
def read(self, data_name: str, motors: NameOrID) -> dict[NameOrID, Value]: ...
|
||||
@overload
|
||||
def read(self, data_name: str, motors: list[NameOrID]) -> dict[NameOrID, Value]: ...
|
||||
def read(
|
||||
self, data_name: str, motors: str | int | list[str | int] | None = None, num_retry: int = 1
|
||||
) -> dict[str, float]:
|
||||
self, data_name: str, motors: NameOrID | list[NameOrID] | None = None, num_retry: int = 0
|
||||
) -> dict[NameOrID, Value]:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(
|
||||
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
||||
|
@ -471,18 +465,22 @@ class MotorsBus(abc.ABC):
|
|||
|
||||
start_time = time.perf_counter()
|
||||
|
||||
id_key_map: dict[int, NameOrID] = {}
|
||||
if motors is None:
|
||||
motors = self.motor_ids
|
||||
id_key_map = {m.id: name for name, m in self.motors.items()}
|
||||
elif isinstance(motors, (str, int)):
|
||||
id_key_map = {self.get_motor_id(motors): motors}
|
||||
elif isinstance(motors, list):
|
||||
id_key_map = {self.get_motor_id(m): m for m in motors}
|
||||
else:
|
||||
raise TypeError(motors)
|
||||
|
||||
if isinstance(motors, (str, int)):
|
||||
motors = [motors]
|
||||
|
||||
motor_ids = [self.get_safe_id(motor) for motor in motors]
|
||||
motor_ids = list(id_key_map)
|
||||
if self._has_different_ctrl_tables:
|
||||
models = [self.idx_to_model(idx) for idx in motor_ids]
|
||||
models = [self.id_to_model(idx) for idx in motor_ids]
|
||||
assert_same_address(self.model_ctrl_table, models, data_name)
|
||||
|
||||
model = self.idx_to_model(next(iter(motor_ids)))
|
||||
model = self.id_to_model(next(iter(motor_ids)))
|
||||
addr, n_bytes = self.model_ctrl_table[model][data_name]
|
||||
|
||||
comm, ids_values = self._read(motor_ids, addr, n_bytes, num_retry)
|
||||
|
@ -495,8 +493,7 @@ class MotorsBus(abc.ABC):
|
|||
if data_name in self.calibration_required and self.calibration is not None:
|
||||
ids_values = self.calibrate_values(ids_values)
|
||||
|
||||
# TODO(aliberts): return keys in the same format we got them?
|
||||
ids_values = {self.idx_to_name(idx): val for idx, val in ids_values.items()}
|
||||
keys_values = {id_key_map[idx]: val for idx, val in ids_values.items()}
|
||||
|
||||
# log the number of seconds it took to read the data from the motors
|
||||
delta_ts_name = get_log_name("delta_timestamp_s", "read", data_name, motor_ids)
|
||||
|
@ -506,10 +503,10 @@ class MotorsBus(abc.ABC):
|
|||
ts_utc_name = get_log_name("timestamp_utc", "read", data_name, motor_ids)
|
||||
self.logs[ts_utc_name] = capture_timestamp_utc()
|
||||
|
||||
return ids_values
|
||||
return keys_values
|
||||
|
||||
def _read(
|
||||
self, motor_ids: list[str], address: int, n_bytes: int, num_retry: int = 1
|
||||
self, motor_ids: list[str], address: int, n_bytes: int, num_retry: int = 0
|
||||
) -> tuple[int, dict[int, int]]:
|
||||
self.reader.clearParam()
|
||||
self.reader.start_address = address
|
||||
|
@ -523,7 +520,7 @@ class MotorsBus(abc.ABC):
|
|||
for idx in motor_ids:
|
||||
self.reader.addParam(idx)
|
||||
|
||||
for _ in range(num_retry):
|
||||
for _ in range(1 + num_retry):
|
||||
comm = self.reader.txRxPacket()
|
||||
if self._is_comm_success(comm):
|
||||
break
|
||||
|
@ -540,7 +537,7 @@ class MotorsBus(abc.ABC):
|
|||
# for idx in motor_ids:
|
||||
# value = self.reader.getData(idx, address, n_bytes)
|
||||
|
||||
def write(self, data_name: str, values_dict: dict[str | int, int], num_retry: int = 1) -> None:
|
||||
def write(self, data_name: str, values: int | dict[NameOrID, int], num_retry: int = 0) -> None:
|
||||
if not self.is_connected:
|
||||
raise DeviceNotConnectedError(
|
||||
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
||||
|
@ -548,16 +545,21 @@ class MotorsBus(abc.ABC):
|
|||
|
||||
start_time = time.perf_counter()
|
||||
|
||||
ids_values = {self.get_safe_id(motor): val for motor, val in values_dict.items()}
|
||||
if isinstance(values, int):
|
||||
ids_values = {id_: values for id_ in self.ids}
|
||||
elif isinstance(values, dict):
|
||||
ids_values = {self.get_motor_id(motor): val for motor, val in values.items()}
|
||||
else:
|
||||
raise ValueError(f"'values' is expected to be a single value or a dict. Got {values}")
|
||||
|
||||
if self._has_different_ctrl_tables:
|
||||
models = [self.idx_to_model(idx) for idx in ids_values]
|
||||
models = [self.id_to_model(idx) for idx in ids_values]
|
||||
assert_same_address(self.model_ctrl_table, models, data_name)
|
||||
|
||||
if data_name in self.calibration_required and self.calibration is not None:
|
||||
ids_values = self.uncalibrate_values(ids_values)
|
||||
|
||||
model = self.idx_to_model(next(iter(ids_values)))
|
||||
model = self.id_to_model(next(iter(ids_values)))
|
||||
addr, n_bytes = self.model_ctrl_table[model][data_name]
|
||||
|
||||
comm = self._write(ids_values, addr, n_bytes, num_retry)
|
||||
|
@ -575,7 +577,7 @@ class MotorsBus(abc.ABC):
|
|||
ts_utc_name = get_log_name("timestamp_utc", "write", data_name, list(ids_values))
|
||||
self.logs[ts_utc_name] = capture_timestamp_utc()
|
||||
|
||||
def _write(self, ids_values: dict[int, int], address: int, n_bytes: int, num_retry: int = 1) -> int:
|
||||
def _write(self, ids_values: dict[int, int], address: int, n_bytes: int, num_retry: int = 0) -> int:
|
||||
self.writer.clearParam()
|
||||
self.writer.start_address = address
|
||||
self.writer.data_length = n_bytes
|
||||
|
@ -584,7 +586,7 @@ class MotorsBus(abc.ABC):
|
|||
data = self.split_int_bytes(value, n_bytes)
|
||||
self.writer.addParam(idx, data)
|
||||
|
||||
for _ in range(num_retry):
|
||||
for _ in range(1 + num_retry):
|
||||
comm = self.writer.txPacket()
|
||||
if self._is_comm_success(comm):
|
||||
break
|
||||
|
|
|
@ -108,6 +108,9 @@ exclude = ["tests/artifacts/**/*.safetensors"]
|
|||
[tool.ruff.lint]
|
||||
select = ["E4", "E7", "E9", "F", "I", "N", "B", "C4", "SIM"]
|
||||
|
||||
[tool.ruff.lint.per-file-ignores]
|
||||
"__init__.py" = ["F401", "F403"]
|
||||
|
||||
[tool.bandit]
|
||||
exclude_dirs = [
|
||||
"tests",
|
||||
|
|
|
@ -6,47 +6,42 @@ import dynamixel_sdk as dxl
|
|||
import serial
|
||||
from mock_serial import MockSerial
|
||||
|
||||
from lerobot.common.motors.dynamixel.dynamixel import X_SERIES_CONTROL_TABLE
|
||||
from lerobot.common.motors.dynamixel import X_SERIES_CONTROL_TABLE
|
||||
|
||||
# https://emanual.robotis.com/docs/en/dxl/crc/
|
||||
DXL_CRC_TABLE = [
|
||||
0x0000, 0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014,
|
||||
0x8011, 0x8033, 0x0036, 0x003C, 0x8039, 0x0028, 0x802D,
|
||||
0x8027, 0x0022, 0x8063, 0x0066, 0x006C, 0x8069, 0x0078,
|
||||
0x807D, 0x8077, 0x0072, 0x0050, 0x8055, 0x805F, 0x005A,
|
||||
0x804B, 0x004E, 0x0044, 0x8041, 0x80C3, 0x00C6, 0x00CC,
|
||||
0x80C9, 0x00D8, 0x80DD, 0x80D7, 0x00D2, 0x00F0, 0x80F5,
|
||||
0x80FF, 0x00FA, 0x80EB, 0x00EE, 0x00E4, 0x80E1, 0x00A0,
|
||||
0x80A5, 0x80AF, 0x00AA, 0x80BB, 0x00BE, 0x00B4, 0x80B1,
|
||||
0x8093, 0x0096, 0x009C, 0x8099, 0x0088, 0x808D, 0x8087,
|
||||
0x0082, 0x8183, 0x0186, 0x018C, 0x8189, 0x0198, 0x819D,
|
||||
0x8197, 0x0192, 0x01B0, 0x81B5, 0x81BF, 0x01BA, 0x81AB,
|
||||
0x01AE, 0x01A4, 0x81A1, 0x01E0, 0x81E5, 0x81EF, 0x01EA,
|
||||
0x81FB, 0x01FE, 0x01F4, 0x81F1, 0x81D3, 0x01D6, 0x01DC,
|
||||
0x81D9, 0x01C8, 0x81CD, 0x81C7, 0x01C2, 0x0140, 0x8145,
|
||||
0x814F, 0x014A, 0x815B, 0x015E, 0x0154, 0x8151, 0x8173,
|
||||
0x0176, 0x017C, 0x8179, 0x0168, 0x816D, 0x8167, 0x0162,
|
||||
0x8123, 0x0126, 0x012C, 0x8129, 0x0138, 0x813D, 0x8137,
|
||||
0x0132, 0x0110, 0x8115, 0x811F, 0x011A, 0x810B, 0x010E,
|
||||
0x0104, 0x8101, 0x8303, 0x0306, 0x030C, 0x8309, 0x0318,
|
||||
0x831D, 0x8317, 0x0312, 0x0330, 0x8335, 0x833F, 0x033A,
|
||||
0x832B, 0x032E, 0x0324, 0x8321, 0x0360, 0x8365, 0x836F,
|
||||
0x036A, 0x837B, 0x037E, 0x0374, 0x8371, 0x8353, 0x0356,
|
||||
0x035C, 0x8359, 0x0348, 0x834D, 0x8347, 0x0342, 0x03C0,
|
||||
0x83C5, 0x83CF, 0x03CA, 0x83DB, 0x03DE, 0x03D4, 0x83D1,
|
||||
0x83F3, 0x03F6, 0x03FC, 0x83F9, 0x03E8, 0x83ED, 0x83E7,
|
||||
0x03E2, 0x83A3, 0x03A6, 0x03AC, 0x83A9, 0x03B8, 0x83BD,
|
||||
0x83B7, 0x03B2, 0x0390, 0x8395, 0x839F, 0x039A, 0x838B,
|
||||
0x038E, 0x0384, 0x8381, 0x0280, 0x8285, 0x828F, 0x028A,
|
||||
0x829B, 0x029E, 0x0294, 0x8291, 0x82B3, 0x02B6, 0x02BC,
|
||||
0x82B9, 0x02A8, 0x82AD, 0x82A7, 0x02A2, 0x82E3, 0x02E6,
|
||||
0x02EC, 0x82E9, 0x02F8, 0x82FD, 0x82F7, 0x02F2, 0x02D0,
|
||||
0x82D5, 0x82DF, 0x02DA, 0x82CB, 0x02CE, 0x02C4, 0x82C1,
|
||||
0x8243, 0x0246, 0x024C, 0x8249, 0x0258, 0x825D, 0x8257,
|
||||
0x0252, 0x0270, 0x8275, 0x827F, 0x027A, 0x826B, 0x026E,
|
||||
0x0264, 0x8261, 0x0220, 0x8225, 0x822F, 0x022A, 0x823B,
|
||||
0x023E, 0x0234, 0x8231, 0x8213, 0x0216, 0x021C, 0x8219,
|
||||
0x0208, 0x820D, 0x8207, 0x0202
|
||||
0x0000, 0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014, 0x8011,
|
||||
0x8033, 0x0036, 0x003C, 0x8039, 0x0028, 0x802D, 0x8027, 0x0022,
|
||||
0x8063, 0x0066, 0x006C, 0x8069, 0x0078, 0x807D, 0x8077, 0x0072,
|
||||
0x0050, 0x8055, 0x805F, 0x005A, 0x804B, 0x004E, 0x0044, 0x8041,
|
||||
0x80C3, 0x00C6, 0x00CC, 0x80C9, 0x00D8, 0x80DD, 0x80D7, 0x00D2,
|
||||
0x00F0, 0x80F5, 0x80FF, 0x00FA, 0x80EB, 0x00EE, 0x00E4, 0x80E1,
|
||||
0x00A0, 0x80A5, 0x80AF, 0x00AA, 0x80BB, 0x00BE, 0x00B4, 0x80B1,
|
||||
0x8093, 0x0096, 0x009C, 0x8099, 0x0088, 0x808D, 0x8087, 0x0082,
|
||||
0x8183, 0x0186, 0x018C, 0x8189, 0x0198, 0x819D, 0x8197, 0x0192,
|
||||
0x01B0, 0x81B5, 0x81BF, 0x01BA, 0x81AB, 0x01AE, 0x01A4, 0x81A1,
|
||||
0x01E0, 0x81E5, 0x81EF, 0x01EA, 0x81FB, 0x01FE, 0x01F4, 0x81F1,
|
||||
0x81D3, 0x01D6, 0x01DC, 0x81D9, 0x01C8, 0x81CD, 0x81C7, 0x01C2,
|
||||
0x0140, 0x8145, 0x814F, 0x014A, 0x815B, 0x015E, 0x0154, 0x8151,
|
||||
0x8173, 0x0176, 0x017C, 0x8179, 0x0168, 0x816D, 0x8167, 0x0162,
|
||||
0x8123, 0x0126, 0x012C, 0x8129, 0x0138, 0x813D, 0x8137, 0x0132,
|
||||
0x0110, 0x8115, 0x811F, 0x011A, 0x810B, 0x010E, 0x0104, 0x8101,
|
||||
0x8303, 0x0306, 0x030C, 0x8309, 0x0318, 0x831D, 0x8317, 0x0312,
|
||||
0x0330, 0x8335, 0x833F, 0x033A, 0x832B, 0x032E, 0x0324, 0x8321,
|
||||
0x0360, 0x8365, 0x836F, 0x036A, 0x837B, 0x037E, 0x0374, 0x8371,
|
||||
0x8353, 0x0356, 0x035C, 0x8359, 0x0348, 0x834D, 0x8347, 0x0342,
|
||||
0x03C0, 0x83C5, 0x83CF, 0x03CA, 0x83DB, 0x03DE, 0x03D4, 0x83D1,
|
||||
0x83F3, 0x03F6, 0x03FC, 0x83F9, 0x03E8, 0x83ED, 0x83E7, 0x03E2,
|
||||
0x83A3, 0x03A6, 0x03AC, 0x83A9, 0x03B8, 0x83BD, 0x83B7, 0x03B2,
|
||||
0x0390, 0x8395, 0x839F, 0x039A, 0x838B, 0x038E, 0x0384, 0x8381,
|
||||
0x0280, 0x8285, 0x828F, 0x028A, 0x829B, 0x029E, 0x0294, 0x8291,
|
||||
0x82B3, 0x02B6, 0x02BC, 0x82B9, 0x02A8, 0x82AD, 0x82A7, 0x02A2,
|
||||
0x82E3, 0x02E6, 0x02EC, 0x82E9, 0x02F8, 0x82FD, 0x82F7, 0x02F2,
|
||||
0x02D0, 0x82D5, 0x82DF, 0x02DA, 0x82CB, 0x02CE, 0x02C4, 0x82C1,
|
||||
0x8243, 0x0246, 0x024C, 0x8249, 0x0258, 0x825D, 0x8257, 0x0252,
|
||||
0x0270, 0x8275, 0x827F, 0x027A, 0x826B, 0x026E, 0x0264, 0x8261,
|
||||
0x0220, 0x8225, 0x822F, 0x022A, 0x823B, 0x023E, 0x0234, 0x8231,
|
||||
0x8213, 0x0216, 0x021C, 0x8219, 0x0208, 0x820D, 0x8207, 0x0202
|
||||
] # fmt: skip
|
||||
|
||||
# https://emanual.robotis.com/docs/en/dxl/protocol2/#instruction
|
||||
|
@ -154,6 +149,17 @@ class MockDynamixelPacketv2(abc.ABC):
|
|||
|
||||
@staticmethod
|
||||
def _add_crc(packet: list[int]) -> list[int]:
|
||||
"""Computes and add CRC to the packet.
|
||||
|
||||
https://emanual.robotis.com/docs/en/dxl/crc/
|
||||
https://en.wikipedia.org/wiki/Cyclic_redundancy_check
|
||||
|
||||
Args:
|
||||
packet (list[int]): The raw packet without CRC (but with placeholders for it).
|
||||
|
||||
Returns:
|
||||
list[int]: The raw packet with a valid CRC.
|
||||
"""
|
||||
crc = 0
|
||||
for j in range(len(packet) - 2):
|
||||
i = ((crc >> 8) ^ packet[j]) & 0xFF
|
||||
|
@ -191,6 +197,20 @@ class MockInstructionPacket(MockDynamixelPacketv2):
|
|||
0x00, 0x00 # placeholder for CRC
|
||||
] # fmt: skip
|
||||
|
||||
@classmethod
|
||||
def ping(
|
||||
cls,
|
||||
dxl_id: int,
|
||||
) -> bytes:
|
||||
"""
|
||||
Builds a "Ping" broadcast instruction.
|
||||
(from https://emanual.robotis.com/docs/en/dxl/protocol2/#ping-0x01)
|
||||
|
||||
No parameters required.
|
||||
"""
|
||||
params, length = [], 3
|
||||
return cls.build(dxl_id=dxl_id, params=params, length=length, instruct_type="Ping")
|
||||
|
||||
@classmethod
|
||||
def sync_read(
|
||||
cls,
|
||||
|
@ -252,12 +272,32 @@ class MockStatusPacket(MockDynamixelPacketv2):
|
|||
0x00, 0x00 # placeholder for CRC
|
||||
] # fmt: skip
|
||||
|
||||
@classmethod
|
||||
def ping(cls, dxl_id: int, model_nb: int = 1190, firm_ver: int = 50) -> bytes:
|
||||
"""Builds a 'Ping' status packet.
|
||||
|
||||
https://emanual.robotis.com/docs/en/dxl/protocol2/#ping-0x01
|
||||
|
||||
Args:
|
||||
dxl_id (int): ID of the servo responding.
|
||||
model_nb (int, optional): Desired 'model number' to be returned in the packet. Defaults to 1190
|
||||
which corresponds to a XL330-M077-T.
|
||||
firm_ver (int, optional): Desired 'firmware version' to be returned in the packet.
|
||||
Defaults to 50.
|
||||
|
||||
Returns:
|
||||
bytes: The raw 'Ping' status packet ready to be sent through serial.
|
||||
"""
|
||||
params = [dxl.DXL_LOBYTE(model_nb), dxl.DXL_HIBYTE(model_nb), firm_ver]
|
||||
length = 7
|
||||
return cls.build(dxl_id, params=params, length=length)
|
||||
|
||||
@classmethod
|
||||
def present_position(cls, dxl_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes:
|
||||
"""Builds a 'Present_Position' status packet.
|
||||
|
||||
Args:
|
||||
dxl_id (int): List of the servos ids
|
||||
dxl_id (int): Servo id
|
||||
pos (int | None, optional): Desired 'Present_Position' to be returned in the packet. If None, it
|
||||
will use a random value in the min_max_range. Defaults to None.
|
||||
min_max_range (tuple, optional): Min/max range to generate the position values used for when 'pos'
|
||||
|
@ -308,62 +348,69 @@ class MockMotors(MockSerial):
|
|||
|
||||
ctrl_table = X_SERIES_CONTROL_TABLE
|
||||
|
||||
def __init__(self, dlx_ids: list[int]):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._ids = dlx_ids
|
||||
self.open()
|
||||
|
||||
def build_single_motor_stubs(
|
||||
self, data_name: str, return_value: int | None = None, num_invalid_try: int | None = None
|
||||
) -> None:
|
||||
address, length = self.ctrl_table[data_name]
|
||||
for idx in self._ids:
|
||||
if data_name == "Present_Position":
|
||||
sync_read_request_single = MockInstructionPacket.sync_read([idx], address, length)
|
||||
sync_read_response_single = self._build_present_pos_send_fn(
|
||||
[idx], [return_value], num_invalid_try
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError # TODO(aliberts): add ping?
|
||||
|
||||
self.stub(
|
||||
name=f"SyncRead_{data_name}_{idx}",
|
||||
receive_bytes=sync_read_request_single,
|
||||
send_fn=sync_read_response_single,
|
||||
)
|
||||
|
||||
def build_all_motors_stub(
|
||||
self, data_name: str, return_values: list[int] | None = None, num_invalid_try: int | None = None
|
||||
) -> None:
|
||||
address, length = self.ctrl_table[data_name]
|
||||
if data_name == "Present_Position":
|
||||
sync_read_request_all = MockInstructionPacket.sync_read(self._ids, address, length)
|
||||
sync_read_response_all = self._build_present_pos_send_fn(
|
||||
self._ids, return_values, num_invalid_try
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError # TODO(aliberts): add ping?
|
||||
|
||||
self.stub(
|
||||
name=f"SyncRead_{data_name}_all",
|
||||
receive_bytes=sync_read_request_all,
|
||||
send_fn=sync_read_response_all,
|
||||
def build_broadcast_ping_stub(
|
||||
self, ids_models_firmwares: dict[int, list[int]] | None = None, num_invalid_try: int = 0
|
||||
) -> str:
|
||||
ping_request = MockInstructionPacket.ping(dxl.BROADCAST_ID)
|
||||
return_packets = b"".join(
|
||||
MockStatusPacket.ping(idx, model, firm_ver)
|
||||
for idx, (model, firm_ver) in ids_models_firmwares.items()
|
||||
)
|
||||
ping_response = self._build_send_fn(return_packets, num_invalid_try)
|
||||
|
||||
def _build_present_pos_send_fn(
|
||||
self, dxl_ids: list[int], return_pos: list[int] | None = None, num_invalid_try: int | None = None
|
||||
) -> Callable[[int], bytes]:
|
||||
return_pos = [None for _ in dxl_ids] if return_pos is None else return_pos
|
||||
assert len(return_pos) == len(dxl_ids)
|
||||
stub_name = "Ping_" + "_".join([str(idx) for idx in ids_models_firmwares])
|
||||
self.stub(
|
||||
name=stub_name,
|
||||
receive_bytes=ping_request,
|
||||
send_fn=ping_response,
|
||||
)
|
||||
return stub_name
|
||||
|
||||
def build_ping_stub(
|
||||
self, dxl_id: int, model_nb: int, firm_ver: int = 50, num_invalid_try: int = 0
|
||||
) -> str:
|
||||
ping_request = MockInstructionPacket.ping(dxl_id)
|
||||
return_packet = MockStatusPacket.ping(dxl_id, model_nb, firm_ver)
|
||||
ping_response = self._build_send_fn(return_packet, num_invalid_try)
|
||||
|
||||
stub_name = f"Ping_{dxl_id}"
|
||||
self.stub(
|
||||
name=stub_name,
|
||||
receive_bytes=ping_request,
|
||||
send_fn=ping_response,
|
||||
)
|
||||
return stub_name
|
||||
|
||||
def build_sync_read_stub(
|
||||
self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0
|
||||
) -> str:
|
||||
address, length = self.ctrl_table[data_name]
|
||||
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
|
||||
if data_name != "Present_Position":
|
||||
raise NotImplementedError
|
||||
|
||||
return_packets = b"".join(
|
||||
MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items()
|
||||
)
|
||||
sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
|
||||
|
||||
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(idx) for idx in ids_values])
|
||||
self.stub(
|
||||
name=stub_name,
|
||||
receive_bytes=sync_read_request,
|
||||
send_fn=sync_read_response,
|
||||
)
|
||||
return stub_name
|
||||
|
||||
@staticmethod
|
||||
def _build_send_fn(packet: bytes, num_invalid_try: int = 0) -> Callable[[int], bytes]:
|
||||
def send_fn(_call_count: int) -> bytes:
|
||||
if num_invalid_try is not None and num_invalid_try >= _call_count:
|
||||
if num_invalid_try >= _call_count:
|
||||
return b""
|
||||
|
||||
packets = b"".join(
|
||||
MockStatusPacket.present_position(idx, pos)
|
||||
for idx, pos in zip(dxl_ids, return_pos, strict=True)
|
||||
)
|
||||
return packets
|
||||
return packet
|
||||
|
||||
return send_fn
|
||||
|
|
|
@ -6,7 +6,7 @@ import scservo_sdk as scs
|
|||
import serial
|
||||
from mock_serial import MockSerial
|
||||
|
||||
from lerobot.common.motors.feetech.feetech import SCS_SERIES_CONTROL_TABLE
|
||||
from lerobot.common.motors.feetech.tables import SCS_SERIES_CONTROL_TABLE
|
||||
|
||||
# https://files.waveshare.com/upload/2/27/Communication_Protocol_User_Manual-EN%28191218-0923%29.pdf
|
||||
INSTRUCTION_TYPES = {
|
||||
|
@ -77,6 +77,19 @@ class MockInstructionPacket(MockFeetechPacket):
|
|||
0x00, # placeholder for checksum
|
||||
] # fmt: skip
|
||||
|
||||
@classmethod
|
||||
def ping(
|
||||
cls,
|
||||
scs_id: int,
|
||||
) -> bytes:
|
||||
"""
|
||||
Builds a "Ping" broadcast instruction.
|
||||
|
||||
No parameters required.
|
||||
"""
|
||||
params, length = [], 2
|
||||
return cls.build(scs_id=scs_id, params=params, length=length, instruct_type="Ping")
|
||||
|
||||
@classmethod
|
||||
def sync_read(
|
||||
cls,
|
||||
|
@ -128,6 +141,25 @@ class MockStatusPacket(MockFeetechPacket):
|
|||
0x00, # placeholder for checksum
|
||||
] # fmt: skip
|
||||
|
||||
@classmethod
|
||||
def ping(cls, scs_id: int, model_nb: int = 1190, firm_ver: int = 50) -> bytes:
|
||||
"""Builds a 'Ping' status packet.
|
||||
|
||||
Args:
|
||||
scs_id (int): ID of the servo responding.
|
||||
model_nb (int, optional): Desired 'model number' to be returned in the packet. Defaults to 1190
|
||||
which corresponds to a XL330-M077-T.
|
||||
firm_ver (int, optional): Desired 'firmware version' to be returned in the packet.
|
||||
Defaults to 50.
|
||||
|
||||
Returns:
|
||||
bytes: The raw 'Ping' status packet ready to be sent through serial.
|
||||
"""
|
||||
# raise NotImplementedError
|
||||
params = [scs.SCS_LOBYTE(model_nb), scs.SCS_HIBYTE(model_nb), firm_ver]
|
||||
length = 2
|
||||
return cls.build(scs_id, params=params, length=length)
|
||||
|
||||
@classmethod
|
||||
def present_position(cls, scs_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes:
|
||||
"""Builds a 'Present_Position' status packet.
|
||||
|
@ -184,62 +216,69 @@ class MockMotors(MockSerial):
|
|||
|
||||
ctrl_table = SCS_SERIES_CONTROL_TABLE
|
||||
|
||||
def __init__(self, scs_ids: list[int]):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._ids = scs_ids
|
||||
self.open()
|
||||
|
||||
def build_single_motor_stubs(
|
||||
self, data_name: str, return_value: int | None = None, num_invalid_try: int | None = None
|
||||
) -> None:
|
||||
address, length = self.ctrl_table[data_name]
|
||||
for idx in self._ids:
|
||||
if data_name == "Present_Position":
|
||||
sync_read_request_single = MockInstructionPacket.sync_read([idx], address, length)
|
||||
sync_read_response_single = self._build_present_pos_send_fn(
|
||||
[idx], [return_value], num_invalid_try
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError # TODO(aliberts): add ping?
|
||||
|
||||
self.stub(
|
||||
name=f"SyncRead_{data_name}_{idx}",
|
||||
receive_bytes=sync_read_request_single,
|
||||
send_fn=sync_read_response_single,
|
||||
)
|
||||
|
||||
def build_all_motors_stub(
|
||||
self, data_name: str, return_values: list[int] | None = None, num_invalid_try: int | None = None
|
||||
) -> None:
|
||||
address, length = self.ctrl_table[data_name]
|
||||
if data_name == "Present_Position":
|
||||
sync_read_request_all = MockInstructionPacket.sync_read(self._ids, address, length)
|
||||
sync_read_response_all = self._build_present_pos_send_fn(
|
||||
self._ids, return_values, num_invalid_try
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError # TODO(aliberts): add ping?
|
||||
|
||||
self.stub(
|
||||
name=f"SyncRead_{data_name}_all",
|
||||
receive_bytes=sync_read_request_all,
|
||||
send_fn=sync_read_response_all,
|
||||
def build_broadcast_ping_stub(
|
||||
self, ids_models_firmwares: dict[int, list[int]] | None = None, num_invalid_try: int = 0
|
||||
) -> str:
|
||||
ping_request = MockInstructionPacket.ping(scs.BROADCAST_ID)
|
||||
return_packets = b"".join(
|
||||
MockStatusPacket.ping(idx, model, firm_ver)
|
||||
for idx, (model, firm_ver) in ids_models_firmwares.items()
|
||||
)
|
||||
ping_response = self._build_send_fn(return_packets, num_invalid_try)
|
||||
|
||||
def _build_present_pos_send_fn(
|
||||
self, scs_ids: list[int], return_pos: list[int] | None = None, num_invalid_try: int | None = None
|
||||
) -> Callable[[int], bytes]:
|
||||
return_pos = [None for _ in scs_ids] if return_pos is None else return_pos
|
||||
assert len(return_pos) == len(scs_ids)
|
||||
stub_name = "Ping_" + "_".join([str(idx) for idx in ids_models_firmwares])
|
||||
self.stub(
|
||||
name=stub_name,
|
||||
receive_bytes=ping_request,
|
||||
send_fn=ping_response,
|
||||
)
|
||||
return stub_name
|
||||
|
||||
def build_ping_stub(
|
||||
self, scs_id: int, model_nb: int, firm_ver: int = 50, num_invalid_try: int = 0
|
||||
) -> str:
|
||||
ping_request = MockInstructionPacket.ping(scs_id)
|
||||
return_packet = MockStatusPacket.ping(scs_id, model_nb, firm_ver)
|
||||
ping_response = self._build_send_fn(return_packet, num_invalid_try)
|
||||
|
||||
stub_name = f"Ping_{scs_id}"
|
||||
self.stub(
|
||||
name=stub_name,
|
||||
receive_bytes=ping_request,
|
||||
send_fn=ping_response,
|
||||
)
|
||||
return stub_name
|
||||
|
||||
def build_sync_read_stub(
|
||||
self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0
|
||||
) -> str:
|
||||
address, length = self.ctrl_table[data_name]
|
||||
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
|
||||
if data_name != "Present_Position":
|
||||
raise NotImplementedError
|
||||
|
||||
return_packets = b"".join(
|
||||
MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items()
|
||||
)
|
||||
sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
|
||||
|
||||
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(idx) for idx in ids_values])
|
||||
self.stub(
|
||||
name=stub_name,
|
||||
receive_bytes=sync_read_request,
|
||||
send_fn=sync_read_response,
|
||||
)
|
||||
return stub_name
|
||||
|
||||
@staticmethod
|
||||
def _build_send_fn(packet: bytes, num_invalid_try: int = 0) -> Callable[[int], bytes]:
|
||||
def send_fn(_call_count: int) -> bytes:
|
||||
if num_invalid_try is not None and num_invalid_try >= _call_count:
|
||||
if num_invalid_try >= _call_count:
|
||||
return b""
|
||||
|
||||
packets = b"".join(
|
||||
MockStatusPacket.present_position(idx, pos)
|
||||
for idx, pos in zip(scs_ids, return_pos, strict=True)
|
||||
)
|
||||
return packets
|
||||
return packet
|
||||
|
||||
return send_fn
|
||||
|
|
|
@ -4,6 +4,7 @@ from unittest.mock import patch
|
|||
import dynamixel_sdk as dxl
|
||||
import pytest
|
||||
|
||||
from lerobot.common.motors import Motor
|
||||
from lerobot.common.motors.dynamixel import DynamixelMotorsBus
|
||||
from tests.mocks.mock_dynamixel import MockMotors, MockPortHandler
|
||||
|
||||
|
@ -17,6 +18,15 @@ def patch_port_handler():
|
|||
yield
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def dummy_motors() -> dict[str, Motor]:
|
||||
return {
|
||||
"dummy_1": Motor(id=1, model="xl430-w250"),
|
||||
"dummy_2": Motor(id=2, model="xm540-w270"),
|
||||
"dummy_3": Motor(id=3, model="xl330-m077"),
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform != "darwin", reason=f"No patching needed on {sys.platform=}")
|
||||
def test_autouse_patch():
|
||||
"""Ensures that the autouse fixture correctly patches dxl.PortHandler with MockPortHandler."""
|
||||
|
@ -68,9 +78,50 @@ def test_split_int_bytes_large_number():
|
|||
DynamixelMotorsBus.split_int_bytes(2**32, 4) # 4-byte max is 0xFFFFFFFF
|
||||
|
||||
|
||||
def test_abc_implementation():
|
||||
def test_abc_implementation(dummy_motors):
|
||||
"""Instantiation should raise an error if the class doesn't implement abstract methods/properties."""
|
||||
DynamixelMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "xl330-m077")})
|
||||
DynamixelMotorsBus(port="/dev/dummy-port", motors=dummy_motors)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"idx, model_nb",
|
||||
[
|
||||
[1, 1190],
|
||||
[2, 1200],
|
||||
[3, 1120],
|
||||
],
|
||||
)
|
||||
def test_ping(idx, model_nb, dummy_motors):
|
||||
mock_motors = MockMotors()
|
||||
mock_motors.build_ping_stub(idx, model_nb)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
ping_model_nb = motors_bus.ping(idx)
|
||||
|
||||
assert ping_model_nb == model_nb
|
||||
|
||||
|
||||
def test_broadcast_ping(dummy_motors):
|
||||
expected_pings = {
|
||||
1: [1060, 50],
|
||||
2: [1120, 30],
|
||||
3: [1190, 10],
|
||||
}
|
||||
mock_motors = MockMotors()
|
||||
mock_motors.build_broadcast_ping_stub(expected_pings)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
ping_list = motors_bus.broadcast_ping()
|
||||
|
||||
assert ping_list == expected_pings
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -83,26 +134,25 @@ def test_abc_implementation():
|
|||
],
|
||||
ids=["None", "by ids", "by names", "mixed"],
|
||||
)
|
||||
def test_read_all_motors(motors):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
positions = [1337, 42, 4016]
|
||||
mock_motors.build_all_motors_stub("Present_Position", return_values=positions)
|
||||
def test_read_all_motors(motors, dummy_motors):
|
||||
mock_motors = MockMotors()
|
||||
expected_positions = {
|
||||
1: 1337,
|
||||
2: 42,
|
||||
3: 4016,
|
||||
}
|
||||
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_positions)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
"dummy_1": (1, "xl330-m077"),
|
||||
"dummy_2": (2, "xl330-m077"),
|
||||
"dummy_3": (3, "xl330-m077"),
|
||||
},
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
pos_dict = motors_bus.read("Present_Position", motors=motors)
|
||||
positions_read = motors_bus.read("Present_Position", motors=motors)
|
||||
|
||||
assert mock_motors.stubs["SyncRead_Present_Position_all"].called
|
||||
assert all(returned_pos == pos for returned_pos, pos in zip(pos_dict.values(), positions, strict=True))
|
||||
assert set(pos_dict) == {"dummy_1", "dummy_2", "dummy_3"}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
motors = ["dummy_1", "dummy_2", "dummy_3"] if motors is None else motors
|
||||
assert mock_motors.stubs[stub_name].called
|
||||
assert positions_read == dict(zip(motors, expected_positions.values(), strict=True))
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -113,24 +163,20 @@ def test_read_all_motors(motors):
|
|||
[3, 4016],
|
||||
],
|
||||
)
|
||||
def test_read_single_motor_by_name(idx, pos):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
|
||||
def test_read_single_motor_by_name(idx, pos, dummy_motors):
|
||||
mock_motors = MockMotors()
|
||||
expected_position = {idx: pos}
|
||||
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_position)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
"dummy_1": (1, "xl330-m077"),
|
||||
"dummy_2": (2, "xl330-m077"),
|
||||
"dummy_3": (3, "xl330-m077"),
|
||||
},
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
pos_dict = motors_bus.read("Present_Position", f"dummy_{idx}")
|
||||
|
||||
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
|
||||
assert mock_motors.stubs[stub_name].called
|
||||
assert pos_dict == {f"dummy_{idx}": pos}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -141,56 +187,49 @@ def test_read_single_motor_by_name(idx, pos):
|
|||
[3, 4016],
|
||||
],
|
||||
)
|
||||
def test_read_single_motor_by_id(idx, pos):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
|
||||
def test_read_single_motor_by_id(idx, pos, dummy_motors):
|
||||
mock_motors = MockMotors()
|
||||
expected_position = {idx: pos}
|
||||
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_position)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
"dummy_1": (1, "xl330-m077"),
|
||||
"dummy_2": (2, "xl330-m077"),
|
||||
"dummy_3": (3, "xl330-m077"),
|
||||
},
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
pos_dict = motors_bus.read("Present_Position", idx)
|
||||
|
||||
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
|
||||
assert pos_dict == {f"dummy_{idx}": pos}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
assert mock_motors.stubs[stub_name].called
|
||||
assert pos_dict == {idx: pos}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"num_retry, num_invalid_try, pos",
|
||||
[
|
||||
[1, 2, 1337],
|
||||
[0, 2, 1337],
|
||||
[2, 3, 42],
|
||||
[3, 2, 4016],
|
||||
[2, 1, 999],
|
||||
],
|
||||
)
|
||||
def test_read_num_retry(num_retry, num_invalid_try, pos):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
mock_motors.build_single_motor_stubs(
|
||||
"Present_Position", return_value=pos, num_invalid_try=num_invalid_try
|
||||
def test_read_num_retry(num_retry, num_invalid_try, pos, dummy_motors):
|
||||
mock_motors = MockMotors()
|
||||
expected_position = {1: pos}
|
||||
stub_name = mock_motors.build_sync_read_stub(
|
||||
"Present_Position", expected_position, num_invalid_try=num_invalid_try
|
||||
)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
"dummy_1": (1, "xl330-m077"),
|
||||
"dummy_2": (2, "xl330-m077"),
|
||||
"dummy_3": (3, "xl330-m077"),
|
||||
},
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
if num_retry >= num_invalid_try:
|
||||
pos_dict = motors_bus.read("Present_Position", 1, num_retry=num_retry)
|
||||
assert pos_dict == {"dummy_1": pos}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
assert pos_dict == {1: pos}
|
||||
else:
|
||||
with pytest.raises(ConnectionError):
|
||||
_ = motors_bus.read("Present_Position", 1, num_retry=num_retry)
|
||||
|
||||
assert mock_motors.stubs["SyncRead_Present_Position_1"].calls == num_retry
|
||||
expected_calls = min(1 + num_retry, 1 + num_invalid_try)
|
||||
assert mock_motors.stubs[stub_name].calls == expected_calls
|
||||
|
|
|
@ -4,6 +4,7 @@ from unittest.mock import patch
|
|||
import pytest
|
||||
import scservo_sdk as scs
|
||||
|
||||
from lerobot.common.motors import Motor
|
||||
from lerobot.common.motors.feetech import FeetechMotorsBus
|
||||
from tests.mocks.mock_feetech import MockMotors, MockPortHandler
|
||||
|
||||
|
@ -17,6 +18,15 @@ def patch_port_handler():
|
|||
yield
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def dummy_motors() -> dict[str, Motor]:
|
||||
return {
|
||||
"dummy_1": Motor(id=1, model="sts3215"),
|
||||
"dummy_2": Motor(id=2, model="sts3215"),
|
||||
"dummy_3": Motor(id=3, model="sts3215"),
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform != "darwin", reason=f"No patching needed on {sys.platform=}")
|
||||
def test_autouse_patch():
|
||||
"""Ensures that the autouse fixture correctly patches scs.PortHandler with MockPortHandler."""
|
||||
|
@ -68,9 +78,52 @@ def test_split_int_bytes_large_number():
|
|||
FeetechMotorsBus.split_int_bytes(2**32, 4) # 4-byte max is 0xFFFFFFFF
|
||||
|
||||
|
||||
def test_abc_implementation():
|
||||
def test_abc_implementation(dummy_motors):
|
||||
"""Instantiation should raise an error if the class doesn't implement abstract methods/properties."""
|
||||
FeetechMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "sts3215")})
|
||||
FeetechMotorsBus(port="/dev/dummy-port", motors=dummy_motors)
|
||||
|
||||
|
||||
@pytest.mark.skip("TODO")
|
||||
@pytest.mark.parametrize(
|
||||
"idx, model_nb",
|
||||
[
|
||||
[1, 1190],
|
||||
[2, 1200],
|
||||
[3, 1120],
|
||||
],
|
||||
)
|
||||
def test_ping(idx, model_nb, dummy_motors):
|
||||
mock_motors = MockMotors()
|
||||
mock_motors.build_ping_stub(idx, model_nb)
|
||||
motors_bus = FeetechMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
ping_model_nb = motors_bus.ping(idx)
|
||||
|
||||
assert ping_model_nb == model_nb
|
||||
|
||||
|
||||
@pytest.mark.skip("TODO")
|
||||
def test_broadcast_ping(dummy_motors):
|
||||
expected_pings = {
|
||||
1: [1060, 50],
|
||||
2: [1120, 30],
|
||||
3: [1190, 10],
|
||||
}
|
||||
mock_motors = MockMotors()
|
||||
mock_motors.build_broadcast_ping_stub(expected_pings)
|
||||
motors_bus = FeetechMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
ping_list = motors_bus.broadcast_ping()
|
||||
|
||||
assert ping_list == expected_pings
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -83,26 +136,25 @@ def test_abc_implementation():
|
|||
],
|
||||
ids=["None", "by ids", "by names", "mixed"],
|
||||
)
|
||||
def test_read_all_motors(motors):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
positions = [1337, 42, 4016]
|
||||
mock_motors.build_all_motors_stub("Present_Position", return_values=positions)
|
||||
def test_read_all_motors(motors, dummy_motors):
|
||||
mock_motors = MockMotors()
|
||||
expected_positions = {
|
||||
1: 1337,
|
||||
2: 42,
|
||||
3: 4016,
|
||||
}
|
||||
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_positions)
|
||||
motors_bus = FeetechMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
"dummy_1": (1, "sts3215"),
|
||||
"dummy_2": (2, "sts3215"),
|
||||
"dummy_3": (3, "sts3215"),
|
||||
},
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
pos_dict = motors_bus.read("Present_Position", motors=motors)
|
||||
positions_read = motors_bus.read("Present_Position", motors=motors)
|
||||
|
||||
assert mock_motors.stubs["SyncRead_Present_Position_all"].called
|
||||
assert all(returned_pos == pos for returned_pos, pos in zip(pos_dict.values(), positions, strict=True))
|
||||
assert set(pos_dict) == {"dummy_1", "dummy_2", "dummy_3"}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
motors = ["dummy_1", "dummy_2", "dummy_3"] if motors is None else motors
|
||||
assert mock_motors.stubs[stub_name].called
|
||||
assert positions_read == dict(zip(motors, expected_positions.values(), strict=True))
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -113,24 +165,20 @@ def test_read_all_motors(motors):
|
|||
[3, 4016],
|
||||
],
|
||||
)
|
||||
def test_read_single_motor_by_name(idx, pos):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
|
||||
def test_read_single_motor_by_name(idx, pos, dummy_motors):
|
||||
mock_motors = MockMotors()
|
||||
expected_position = {idx: pos}
|
||||
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_position)
|
||||
motors_bus = FeetechMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
"dummy_1": (1, "sts3215"),
|
||||
"dummy_2": (2, "sts3215"),
|
||||
"dummy_3": (3, "sts3215"),
|
||||
},
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
pos_dict = motors_bus.read("Present_Position", f"dummy_{idx}")
|
||||
|
||||
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
|
||||
assert mock_motors.stubs[stub_name].called
|
||||
assert pos_dict == {f"dummy_{idx}": pos}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -141,56 +189,49 @@ def test_read_single_motor_by_name(idx, pos):
|
|||
[3, 4016],
|
||||
],
|
||||
)
|
||||
def test_read_single_motor_by_id(idx, pos):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
|
||||
def test_read_single_motor_by_id(idx, pos, dummy_motors):
|
||||
mock_motors = MockMotors()
|
||||
expected_position = {idx: pos}
|
||||
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_position)
|
||||
motors_bus = FeetechMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
"dummy_1": (1, "sts3215"),
|
||||
"dummy_2": (2, "sts3215"),
|
||||
"dummy_3": (3, "sts3215"),
|
||||
},
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
pos_dict = motors_bus.read("Present_Position", idx)
|
||||
|
||||
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
|
||||
assert pos_dict == {f"dummy_{idx}": pos}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
assert mock_motors.stubs[stub_name].called
|
||||
assert pos_dict == {idx: pos}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"num_retry, num_invalid_try, pos",
|
||||
[
|
||||
[1, 2, 1337],
|
||||
[0, 2, 1337],
|
||||
[2, 3, 42],
|
||||
[3, 2, 4016],
|
||||
[2, 1, 999],
|
||||
],
|
||||
)
|
||||
def test_read_num_retry(num_retry, num_invalid_try, pos):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
mock_motors.build_single_motor_stubs(
|
||||
"Present_Position", return_value=pos, num_invalid_try=num_invalid_try
|
||||
def test_read_num_retry(num_retry, num_invalid_try, pos, dummy_motors):
|
||||
mock_motors = MockMotors()
|
||||
expected_position = {1: pos}
|
||||
stub_name = mock_motors.build_sync_read_stub(
|
||||
"Present_Position", expected_position, num_invalid_try=num_invalid_try
|
||||
)
|
||||
motors_bus = FeetechMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
"dummy_1": (1, "sts3215"),
|
||||
"dummy_2": (2, "sts3215"),
|
||||
"dummy_3": (3, "sts3215"),
|
||||
},
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
if num_retry >= num_invalid_try:
|
||||
pos_dict = motors_bus.read("Present_Position", 1, num_retry=num_retry)
|
||||
assert pos_dict == {"dummy_1": pos}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
assert pos_dict == {1: pos}
|
||||
else:
|
||||
with pytest.raises(ConnectionError):
|
||||
_ = motors_bus.read("Present_Position", 1, num_retry=num_retry)
|
||||
|
||||
assert mock_motors.stubs["SyncRead_Present_Position_1"].calls == num_retry
|
||||
expected_calls = min(1 + num_retry, 1 + num_invalid_try)
|
||||
assert mock_motors.stubs[stub_name].calls == expected_calls
|
||||
|
|
Loading…
Reference in New Issue