diff --git a/lerobot/common/motors/dynamixel/dynamixel.py b/lerobot/common/motors/dynamixel/dynamixel.py index 2abce818..cb2d2829 100644 --- a/lerobot/common/motors/dynamixel/dynamixel.py +++ b/lerobot/common/motors/dynamixel/dynamixel.py @@ -29,7 +29,8 @@ from .tables import ( AVAILABLE_BAUDRATES, MODEL_BAUDRATE_TABLE, MODEL_CONTROL_TABLE, - MODEL_NUMBER, + MODEL_ENCODING_TABLE, + MODEL_NUMBER_TABLE, MODEL_RESOLUTION, ) @@ -37,7 +38,7 @@ PROTOCOL_VERSION = 2.0 BAUDRATE = 1_000_000 DEFAULT_TIMEOUT_MS = 1000 -NORMALIZATION_REQUIRED = ["Goal_Position", "Present_Position"] +NORMALIZED_DATA = ["Goal_Position", "Present_Position"] CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"] logger = logging.getLogger(__name__) @@ -94,9 +95,10 @@ class DynamixelMotorsBus(MotorsBus): default_timeout = DEFAULT_TIMEOUT_MS model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE) model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE) - model_number_table = deepcopy(MODEL_NUMBER) + model_encoding_table = deepcopy(MODEL_ENCODING_TABLE) + model_number_table = deepcopy(MODEL_NUMBER_TABLE) model_resolution_table = deepcopy(MODEL_RESOLUTION) - normalization_required = deepcopy(NORMALIZATION_REQUIRED) + normalized_data = deepcopy(NORMALIZED_DATA) def __init__( self, @@ -128,11 +130,25 @@ class DynamixelMotorsBus(MotorsBus): for motor in motors: self.write("Torque_Enable", motor, TorqueMode.ENABLED.value) - def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int: - return encode_twos_complement(value, n_bytes) + def _encode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]: + for id_ in ids_values: + model = self._id_to_model(id_) + encoding_table = self.model_encoding_table.get(model) + if encoding_table and data_name in encoding_table: + n_bytes = encoding_table[data_name] + ids_values[id_] = encode_twos_complement(ids_values[id_], n_bytes) - def _decode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int: - return decode_twos_complement(value, n_bytes) + return ids_values + + def _decode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]: + for id_ in ids_values: + model = self._id_to_model(id_) + encoding_table = self.model_encoding_table.get(model) + if encoding_table and data_name in encoding_table: + n_bytes = encoding_table[data_name] + ids_values[id_] = decode_twos_complement(ids_values[id_], n_bytes) + + return ids_values def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]: """ @@ -182,13 +198,13 @@ class DynamixelMotorsBus(MotorsBus): data_list, comm = self.packet_handler.broadcastPing(self.port_handler) if self._is_comm_success(comm): break - logger.debug(f"Broadcast failed on port '{self.port}' ({n_try=})") + logger.debug(f"Broadcast ping failed on port '{self.port}' ({n_try=})") logger.debug(self.packet_handler.getTxRxResult(comm)) if not self._is_comm_success(comm): if raise_on_error: raise ConnectionError(self.packet_handler.getTxRxResult(comm)) - return data_list if data_list else None + return return {id_: data[0] for id_, data in data_list.items()} diff --git a/lerobot/common/motors/dynamixel/tables.py b/lerobot/common/motors/dynamixel/tables.py index 525404bf..a9f6d9e7 100644 --- a/lerobot/common/motors/dynamixel/tables.py +++ b/lerobot/common/motors/dynamixel/tables.py @@ -1,4 +1,4 @@ -# data_name: (address, size_byte) +# {data_name: (address, size_byte)} # https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#control-table X_SERIES_CONTROL_TABLE = { "Model_Number": (0, 2), @@ -66,6 +66,27 @@ X_SERIES_BAUDRATE_TABLE = { 6: 4_000_000, } +# {data_name: size_byte} +X_SERIES_ENCODINGS_TABLE = { + "Homing_Offset": X_SERIES_CONTROL_TABLE["Homing_Offset"][1], + "Goal_PWM": X_SERIES_CONTROL_TABLE["Goal_PWM"][1], + "Goal_Current": X_SERIES_CONTROL_TABLE["Goal_Current"][1], + "Goal_Velocity": X_SERIES_CONTROL_TABLE["Goal_Velocity"][1], + "Present_PWM": X_SERIES_CONTROL_TABLE["Present_PWM"][1], + "Present_Current": X_SERIES_CONTROL_TABLE["Present_Current"][1], + "Present_Velocity": X_SERIES_CONTROL_TABLE["Present_Velocity"][1], +} + +MODEL_ENCODING_TABLE = { + "x_series": X_SERIES_ENCODINGS_TABLE, + "xl330-m077": X_SERIES_ENCODINGS_TABLE, + "xl330-m288": X_SERIES_ENCODINGS_TABLE, + "xl430-w250": X_SERIES_ENCODINGS_TABLE, + "xm430-w350": X_SERIES_ENCODINGS_TABLE, + "xm540-w270": X_SERIES_ENCODINGS_TABLE, + "xc430-w150": X_SERIES_ENCODINGS_TABLE, +} + # {model: model_resolution} # https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#specifications MODEL_RESOLUTION = { @@ -80,7 +101,7 @@ MODEL_RESOLUTION = { # {model: model_number} # https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#control-table-of-eeprom-area -MODEL_NUMBER = { +MODEL_NUMBER_TABLE = { "xl330-m077": 1190, "xl330-m288": 1200, "xl430-w250": 1060, diff --git a/tests/motors/test_dynamixel.py b/tests/motors/test_dynamixel.py index 9ebbbacb..6fd0e3a7 100644 --- a/tests/motors/test_dynamixel.py +++ b/tests/motors/test_dynamixel.py @@ -6,7 +6,7 @@ import dynamixel_sdk as dxl import pytest from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode -from lerobot.common.motors.dynamixel import MODEL_NUMBER, DynamixelMotorsBus +from lerobot.common.motors.dynamixel import MODEL_NUMBER_TABLE, DynamixelMotorsBus from lerobot.common.utils.encoding_utils import encode_twos_complement from tests.mocks.mock_dynamixel import MockMotors, MockPortHandler @@ -113,7 +113,7 @@ def test_abc_implementation(dummy_motors): @pytest.mark.parametrize("id_", [1, 2, 3]) def test_ping(id_, mock_motors, dummy_motors): - expected_model_nb = MODEL_NUMBER[dummy_motors[f"dummy_{id_}"].model] + expected_model_nb = MODEL_NUMBER_TABLE[dummy_motors[f"dummy_{id_}"].model] stub_name = mock_motors.build_ping_stub(id_, expected_model_nb) motors_bus = DynamixelMotorsBus( port=mock_motors.port, @@ -129,7 +129,7 @@ def test_ping(id_, mock_motors, dummy_motors): def test_broadcast_ping(mock_motors, dummy_motors): models = {m.id: m.model for m in dummy_motors.values()} - expected_model_nbs = {id_: MODEL_NUMBER[model] for id_, model in models.items()} + expected_model_nbs = {id_: MODEL_NUMBER_TABLE[model] for id_, model in models.items()} stub_name = mock_motors.build_broadcast_ping_stub(expected_model_nbs) motors_bus = DynamixelMotorsBus( port=mock_motors.port, @@ -171,55 +171,7 @@ def test_sync_read_none(mock_motors, dummy_motors): (3, 4016), ], ) -def test_sync_read_by_id(id_, position, mock_motors, dummy_motors): - expected_position = {id_: position} - stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_position) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) - - read_position = motors_bus.sync_read("Present_Position", id_, normalize=False) - - assert mock_motors.stubs[stub_name].called - assert read_position == expected_position - - -@pytest.mark.parametrize( - "ids, positions", - [ - ([1], [1337]), - ([1, 2], [1337, 42]), - ([1, 2, 3], [1337, 42, 4016]), - ], - ids=["1 motor", "2 motors", "3 motors"], -) # fmt: skip -def test_sync_read_by_ids(ids, positions, mock_motors, dummy_motors): - assert len(ids) == len(positions) - expected_positions = dict(zip(ids, positions, strict=True)) - stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_positions) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) - - read_positions = motors_bus.sync_read("Present_Position", ids, normalize=False) - - assert mock_motors.stubs[stub_name].called - assert read_positions == expected_positions - - -@pytest.mark.parametrize( - "id_, position", - [ - (1, 1337), - (2, 42), - (3, 4016), - ], -) -def test_sync_read_by_name(id_, position, mock_motors, dummy_motors): +def test_sync_read_single_value(id_, position, mock_motors, dummy_motors): expected_position = {f"dummy_{id_}": position} stub_name = mock_motors.build_sync_read_stub("Present_Position", {id_: position}) motors_bus = DynamixelMotorsBus( @@ -243,7 +195,7 @@ def test_sync_read_by_name(id_, position, mock_motors, dummy_motors): ], ids=["1 motor", "2 motors", "3 motors"], ) # fmt: skip -def test_sync_read_by_names(ids, positions, mock_motors, dummy_motors): +def test_sync_read(ids, positions, mock_motors, dummy_motors): assert len(ids) == len(positions) names = [f"dummy_{dxl_id}" for dxl_id in ids] expected_positions = dict(zip(names, positions, strict=True)) @@ -271,9 +223,9 @@ def test_sync_read_by_names(ids, positions, mock_motors, dummy_motors): ], ) def test_sync_read_num_retry(num_retry, num_invalid_try, pos, mock_motors, dummy_motors): - expected_position = {1: pos} + expected_position = {"dummy_1": pos} stub_name = mock_motors.build_sync_read_stub( - "Present_Position", expected_position, num_invalid_try=num_invalid_try + "Present_Position", {1: pos}, num_invalid_try=num_invalid_try ) motors_bus = DynamixelMotorsBus( port=mock_motors.port, @@ -282,11 +234,11 @@ def test_sync_read_num_retry(num_retry, num_invalid_try, pos, mock_motors, dummy motors_bus.connect(assert_motors_exist=False) if num_retry >= num_invalid_try: - pos_dict = motors_bus.sync_read("Present_Position", 1, normalize=False, num_retry=num_retry) - assert pos_dict == {1: pos} + pos_dict = motors_bus.sync_read("Present_Position", "dummy_1", normalize=False, num_retry=num_retry) + assert pos_dict == expected_position else: with pytest.raises(ConnectionError): - _ = motors_bus.sync_read("Present_Position", 1, normalize=False, num_retry=num_retry) + _ = motors_bus.sync_read("Present_Position", "dummy_1", normalize=False, num_retry=num_retry) expected_calls = min(1 + num_retry, 1 + num_invalid_try) assert mock_motors.stubs[stub_name].calls == expected_calls @@ -315,28 +267,6 @@ def test_sync_write_single_value(data_name, value, mock_motors, dummy_motors): assert mock_motors.stubs[stub_name].wait_called() -@pytest.mark.parametrize( - "id_, position", - [ - (1, 1337), - (2, 42), - (3, 4016), - ], -) -def test_sync_write_by_id(id_, position, mock_motors, dummy_motors): - value = {id_: position} - stub_name = mock_motors.build_sync_write_stub("Goal_Position", value) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) - - motors_bus.sync_write("Goal_Position", value, normalize=False) - - assert mock_motors.stubs[stub_name].wait_called() - - @pytest.mark.parametrize( "ids, positions", [ @@ -346,54 +276,7 @@ def test_sync_write_by_id(id_, position, mock_motors, dummy_motors): ], ids=["1 motor", "2 motors", "3 motors"], ) # fmt: skip -def test_sync_write_by_ids(ids, positions, mock_motors, dummy_motors): - assert len(ids) == len(positions) - values = dict(zip(ids, positions, strict=True)) - stub_name = mock_motors.build_sync_write_stub("Goal_Position", values) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) - - motors_bus.sync_write("Goal_Position", values, normalize=False) - - assert mock_motors.stubs[stub_name].wait_called() - - -@pytest.mark.parametrize( - "id_, position", - [ - (1, 1337), - (2, 42), - (3, 4016), - ], -) -def test_sync_write_by_name(id_, position, mock_motors, dummy_motors): - id_value = {id_: position} - stub_name = mock_motors.build_sync_write_stub("Goal_Position", id_value) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) - - write_value = {f"dummy_{id_}": position} - motors_bus.sync_write("Goal_Position", write_value, normalize=False) - - assert mock_motors.stubs[stub_name].wait_called() - - -@pytest.mark.parametrize( - "ids, positions", - [ - ([1], [1337]), - ([1, 2], [1337, 42]), - ([1, 2, 3], [1337, 42, 4016]), - ], - ids=["1 motor", "2 motors", "3 motors"], -) # fmt: skip -def test_sync_write_by_names(ids, positions, mock_motors, dummy_motors): +def test_sync_write(ids, positions, mock_motors, dummy_motors): assert len(ids) == len(positions) ids_values = dict(zip(ids, positions, strict=True)) stub_name = mock_motors.build_sync_write_stub("Goal_Position", ids_values) @@ -418,29 +301,7 @@ def test_sync_write_by_names(ids, positions, mock_motors, dummy_motors): ("Goal_Position", 3, 42), ], ) -def test_write_by_id(data_name, dxl_id, value, mock_motors, dummy_motors): - stub_name = mock_motors.build_write_stub(data_name, dxl_id, value) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) - - motors_bus.write(data_name, dxl_id, value, normalize=False) - - assert mock_motors.stubs[stub_name].called - - -@pytest.mark.parametrize( - "data_name, dxl_id, value", - [ - ("Torque_Enable", 1, 0), - ("Torque_Enable", 1, 1), - ("Goal_Position", 2, 1337), - ("Goal_Position", 3, 42), - ], -) -def test_write_by_name(data_name, dxl_id, value, mock_motors, dummy_motors): +def test_write(data_name, dxl_id, value, mock_motors, dummy_motors): stub_name = mock_motors.build_write_stub(data_name, dxl_id, value) motors_bus = DynamixelMotorsBus( port=mock_motors.port,