diff --git a/.gitignore b/.gitignore index d6c51c90..42f2e755 100644 --- a/.gitignore +++ b/.gitignore @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +.dev # Logging logs tmp diff --git a/lerobot/common/motors/dynamixel/dynamixel.py b/lerobot/common/motors/dynamixel/dynamixel.py index 1ebefac0..21f2524c 100644 --- a/lerobot/common/motors/dynamixel/dynamixel.py +++ b/lerobot/common/motors/dynamixel/dynamixel.py @@ -143,11 +143,11 @@ class DynamixelMotorsBus(MotorsBus): self.write("Return_Delay_Time", id_, 0) def disable_torque(self, motors: str | list[str] | None = None) -> None: - for name in self._get_names_list(motors): + for name in self._get_motors_list(motors): self.write("Torque_Enable", name, TorqueMode.DISABLED.value) def enable_torque(self, motors: str | list[str] | None = None) -> None: - for name in self._get_names_list(motors): + for name in self._get_motors_list(motors): self.write("Torque_Enable", name, TorqueMode.ENABLED.value) def _encode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]: diff --git a/tests/mocks/mock_dynamixel.py b/tests/mocks/mock_dynamixel.py index 1c1ab6fe..a9d434e9 100644 --- a/tests/mocks/mock_dynamixel.py +++ b/tests/mocks/mock_dynamixel.py @@ -46,41 +46,6 @@ DXL_CRC_TABLE = [ 0x8213, 0x0216, 0x021C, 0x8219, 0x0208, 0x820D, 0x8207, 0x0202 ] # fmt: skip -# https://emanual.robotis.com/docs/en/dxl/protocol2/#instruction -INSTRUCTION_TYPES = { - "Ping": dxl.INST_PING, # Checks whether the Packet has arrived at a device with the same ID as the specified packet ID - "Read": dxl.INST_READ, # Read data from the Device - "Write": dxl.INST_WRITE, # Write data to the Device - "Reg_Write": dxl.INST_REG_WRITE, # Register the Instruction Packet in standby status; Packet can later be executed using the Action command - "Action": dxl.INST_ACTION, # Executes a Packet that was registered beforehand using Reg Write - "Factory_Reset": dxl.INST_FACTORY_RESET, # Resets the Control Table to its initial factory default settings - "Reboot": dxl.INST_REBOOT, # Reboot the Device - "Clear": dxl.INST_CLEAR, # Reset certain information stored in memory - "Control_Table_Backup": 0x20, # Store current Control Table status data to a Backup or to restore backup EEPROM data. - "Status": dxl.INST_STATUS, # Return packet sent following the execution of an Instruction Packet - "Sync_Read": dxl.INST_SYNC_READ, # Read data from multiple devices with the same Address with the same length at once - "Sync_Write": dxl.INST_SYNC_WRITE, # Write data to multiple devices with the same Address with the same length at once - "Fast_Sync_Read": 0x8A, # Read data from multiple devices with the same Address with the same length at once - "Bulk_Read": dxl.INST_BULK_READ, # Read data from multiple devices with different Addresses with different lengths at once - "Bulk_Write": dxl.INST_BULK_WRITE, # Write data to multiple devices with different Addresses with different lengths at once - "Fast_Bulk_Read": 0x9A, # Read data from multiple devices with different Addresses with different lengths at once -} # fmt: skip - -# https://emanual.robotis.com/docs/en/dxl/protocol2/#error -ERROR_TYPE = { - "Success": 0x00, # No error - "Result_Fail": dxl.ERRNUM_RESULT_FAIL, # Failed to process the sent Instruction Packet - "Instruction_Error": dxl.ERRNUM_INSTRUCTION, # An undefined Instruction has been usedAction has been used without Reg Write - "CRC_Error": dxl.ERRNUM_CRC, # The CRC of the sent Packet does not match the expected value - "Data_Range_Error": dxl.ERRNUM_DATA_RANGE, # Data to be written to the specified Address is outside the range of the minimum/maximum value - "Data_Length_Error": dxl.ERRNUM_DATA_LENGTH, # Attempted to write Data that is shorter than the required data length of the specified Address - # (ex: when you attempt to only use 2 bytes of a register that has been defined as 4 bytes) - "Data_Limit_Error": dxl.ERRNUM_DATA_LIMIT, # Data to be written to the specified Address is outside of the configured Limit value - "Access_Error": dxl.ERRNUM_ACCESS, # Attempted to write a value to an Address that is Read Only or has not been defined - # Attempted to read a value from an Address that is Write Only or has not been defined - # Attempted to write a value to an EEPROM register while Torque was Enabled. -} # fmt: skip - class MockDynamixelPacketv2(abc.ABC): @classmethod @@ -187,14 +152,14 @@ class MockInstructionPacket(MockDynamixelPacketv2): """ @classmethod - def _build(cls, dxl_id: int, params: list[int], length: int, instruct_type: str) -> list[int]: - instruct_value = INSTRUCTION_TYPES[instruct_type] + def _build(cls, dxl_id: int, params: list[int], length: int, instruction: int) -> list[int]: + length = len(params) + 3 return [ 0xFF, 0xFF, 0xFD, 0x00, # header dxl_id, # servo id dxl.DXL_LOBYTE(length), # length_l dxl.DXL_HIBYTE(length), # length_h - instruct_value, # instruction type + instruction, # instruction type *params, # data bytes 0x00, 0x00 # placeholder for CRC ] # fmt: skip @@ -210,8 +175,39 @@ class MockInstructionPacket(MockDynamixelPacketv2): No parameters required. """ - params, length = [], 3 - return cls.build(dxl_id=dxl_id, params=params, length=length, instruct_type="Ping") + return cls.build(dxl_id=dxl_id, params=[], length=3, instruction=dxl.INST_PING) + + @classmethod + def read( + cls, + dxl_id: int, + start_address: int, + data_length: int, + ) -> bytes: + """ + Builds a "Read" instruction. + https://emanual.robotis.com/docs/en/dxl/protocol2/#read-0x02 + + The parameters for Read (Protocol 2.0) are: + param[0] = start_address L + param[1] = start_address H + param[2] = data_length L + param[3] = data_length H + + And 'length' = data_length + 5, where: + +1 is for instruction byte, + +2 is for the length bytes, + +2 is for the CRC at the end. + """ + params = [ + dxl.DXL_LOBYTE(start_address), + dxl.DXL_HIBYTE(start_address), + dxl.DXL_LOBYTE(data_length), + dxl.DXL_HIBYTE(data_length), + ] + length = len(params) + 3 + # length = data_length + 5 + return cls.build(dxl_id=dxl_id, params=params, length=length, instruction=dxl.INST_READ) @classmethod def write( @@ -245,7 +241,7 @@ class MockInstructionPacket(MockDynamixelPacketv2): *data, ] length = data_length + 5 - return cls.build(dxl_id=dxl_id, params=params, length=length, instruct_type="Write") + return cls.build(dxl_id=dxl_id, params=params, length=length, instruction=dxl.INST_WRITE) @classmethod def sync_read( @@ -279,7 +275,9 @@ class MockInstructionPacket(MockDynamixelPacketv2): *dxl_ids, ] length = len(dxl_ids) + 7 - return cls.build(dxl_id=dxl.BROADCAST_ID, params=params, length=length, instruct_type="Sync_Read") + return cls.build( + dxl_id=dxl.BROADCAST_ID, params=params, length=length, instruction=dxl.INST_SYNC_READ + ) @classmethod def sync_write( @@ -326,7 +324,9 @@ class MockInstructionPacket(MockDynamixelPacketv2): *data, ] length = len(ids_values) * (1 + data_length) + 7 - return cls.build(dxl_id=dxl.BROADCAST_ID, params=params, length=length, instruct_type="Sync_Write") + return cls.build( + dxl_id=dxl.BROADCAST_ID, params=params, length=length, instruction=dxl.INST_SYNC_WRITE + ) class MockStatusPacket(MockDynamixelPacketv2): @@ -342,21 +342,20 @@ class MockStatusPacket(MockDynamixelPacketv2): """ @classmethod - def _build(cls, dxl_id: int, params: list[int], length: int, error: str = "Success") -> list[int]: - err_byte = ERROR_TYPE[error] + def _build(cls, dxl_id: int, params: list[int], length: int, error: int = 0) -> list[int]: return [ 0xFF, 0xFF, 0xFD, 0x00, # header dxl_id, # servo id dxl.DXL_LOBYTE(length), # length_l dxl.DXL_HIBYTE(length), # length_h 0x55, # instruction = 'status' - err_byte, # error + error, # error *params, # data bytes 0x00, 0x00 # placeholder for CRC ] # fmt: skip @classmethod - def ping(cls, dxl_id: int, model_nb: int = 1190, firm_ver: int = 50) -> bytes: + def ping(cls, dxl_id: int, model_nb: int = 1190, firm_ver: int = 50, error: int = 0) -> bytes: """ Builds a 'Ping' status packet. https://emanual.robotis.com/docs/en/dxl/protocol2/#ping-0x01 @@ -373,10 +372,10 @@ class MockStatusPacket(MockDynamixelPacketv2): """ params = [dxl.DXL_LOBYTE(model_nb), dxl.DXL_HIBYTE(model_nb), firm_ver] length = 7 - return cls.build(dxl_id, params=params, length=length) + return cls.build(dxl_id, params=params, length=length, error=error) @classmethod - def read(cls, dxl_id: int, value: int, param_length: int) -> bytes: + def read(cls, dxl_id: int, value: int, param_length: int, error: int = 0) -> bytes: """ Builds a 'Read' status packet (also works for 'Sync Read') https://emanual.robotis.com/docs/en/dxl/protocol2/#read-0x02 @@ -392,7 +391,7 @@ class MockStatusPacket(MockDynamixelPacketv2): """ params = _split_into_byte_chunks(value, param_length) length = param_length + 4 - return cls.build(dxl_id, params=params, length=length) + return cls.build(dxl_id, params=params, length=length, error=error) class MockPortHandler(dxl.PortHandler): @@ -456,10 +455,10 @@ class MockMotors(MockSerial): return stub_name def build_ping_stub( - self, dxl_id: int, model_nb: int, firm_ver: int = 50, num_invalid_try: int = 0 + self, dxl_id: int, model_nb: int, firm_ver: int = 50, num_invalid_try: int = 0, error: int = 0 ) -> str: ping_request = MockInstructionPacket.ping(dxl_id) - return_packet = MockStatusPacket.ping(dxl_id, model_nb, firm_ver) + return_packet = MockStatusPacket.ping(dxl_id, model_nb, firm_ver, error) ping_response = self._build_send_fn(return_packet, num_invalid_try) stub_name = f"Ping_{dxl_id}" self.stub( @@ -469,14 +468,63 @@ class MockMotors(MockSerial): ) return stub_name - def build_sync_read_stub( - self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0 + def build_read_stub( + self, + address: int, + length: int, + dxl_id: int, + value: int, + reply: bool = True, + error: int = 0, + num_invalid_try: int = 0, + ) -> str: + read_request = MockInstructionPacket.read(dxl_id, address, length) + return_packet = MockStatusPacket.read(dxl_id, value, length, error) if reply else b"" + read_response = self._build_send_fn(return_packet, num_invalid_try) + stub_name = f"Read_{address}_{length}_{dxl_id}_{value}_{error}" + self.stub( + name=stub_name, + receive_bytes=read_request, + send_fn=read_response, + ) + return stub_name + + def build_write_stub( + self, + address: int, + length: int, + dxl_id: int, + value: int, + reply: bool = True, + error: int = 0, + num_invalid_try: int = 0, + ) -> str: + sync_read_request = MockInstructionPacket.write(dxl_id, value, address, length) + return_packet = MockStatusPacket.build(dxl_id, params=[], length=4, error=error) if reply else b"" + stub_name = f"Write_{address}_{length}_{dxl_id}" + self.stub( + name=stub_name, + receive_bytes=sync_read_request, + send_fn=self._build_send_fn(return_packet, num_invalid_try), + ) + return stub_name + + def build_sync_read_stub( + self, + address: int, + length: int, + ids_values: dict[int, int], + reply: bool = True, + num_invalid_try: int = 0, ) -> str: - address, length = self.ctrl_table[data_name] sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length) - return_packets = b"".join(MockStatusPacket.read(id_, pos, length) for id_, pos in ids_values.items()) + return_packets = ( + b"".join(MockStatusPacket.read(id_, pos, length) for id_, pos in ids_values.items()) + if reply + else b"" + ) sync_read_response = self._build_send_fn(return_packets, num_invalid_try) - stub_name = f"Sync_Read_{data_name}_" + "_".join([str(id_) for id_ in ids_values]) + stub_name = f"Sync_Read_{address}_{length}_" + "_".join([str(id_) for id_ in ids_values]) self.stub( name=stub_name, receive_bytes=sync_read_request, @@ -485,11 +533,10 @@ class MockMotors(MockSerial): return stub_name def build_sequential_sync_read_stub( - self, data_name: str, ids_values: dict[int, list[int]] | None = None + self, address: int, length: int, ids_values: dict[int, list[int]] | None = None ) -> str: sequence_length = len(next(iter(ids_values.values()))) assert all(len(positions) == sequence_length for positions in ids_values.values()) - address, length = self.ctrl_table[data_name] sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length) sequential_packets = [] for count in range(sequence_length): @@ -499,7 +546,7 @@ class MockMotors(MockSerial): sequential_packets.append(return_packets) sync_read_response = self._build_sequential_send_fn(sequential_packets) - stub_name = f"Seq_Sync_Read_{data_name}_" + "_".join([str(id_) for id_ in ids_values]) + stub_name = f"Seq_Sync_Read_{address}_{length}_" + "_".join([str(id_) for id_ in ids_values]) self.stub( name=stub_name, receive_bytes=sync_read_request, @@ -508,11 +555,10 @@ class MockMotors(MockSerial): return stub_name def build_sync_write_stub( - self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0 + self, address: int, length: int, ids_values: dict[int, int], num_invalid_try: int = 0 ) -> str: - address, length = self.ctrl_table[data_name] sync_read_request = MockInstructionPacket.sync_write(ids_values, address, length) - stub_name = f"Sync_Write_{data_name}_" + "_".join([str(id_) for id_ in ids_values]) + stub_name = f"Sync_Write_{address}_{length}_" + "_".join([str(id_) for id_ in ids_values]) self.stub( name=stub_name, receive_bytes=sync_read_request, @@ -520,20 +566,6 @@ class MockMotors(MockSerial): ) return stub_name - def build_write_stub( - self, data_name: str, dxl_id: int, value: int, error: str = "Success", num_invalid_try: int = 0 - ) -> str: - address, length = self.ctrl_table[data_name] - sync_read_request = MockInstructionPacket.write(dxl_id, value, address, length) - return_packet = MockStatusPacket.build(dxl_id, params=[], length=4, error=error) - stub_name = f"Write_{data_name}_{dxl_id}" - self.stub( - name=stub_name, - receive_bytes=sync_read_request, - send_fn=self._build_send_fn(return_packet, num_invalid_try), - ) - return stub_name - @staticmethod def _build_send_fn(packet: bytes, num_invalid_try: int = 0) -> Callable[[int], bytes]: def send_fn(_call_count: int) -> bytes: diff --git a/tests/motors/test_dynamixel.py b/tests/motors/test_dynamixel.py index 2b708836..163af2d1 100644 --- a/tests/motors/test_dynamixel.py +++ b/tests/motors/test_dynamixel.py @@ -1,3 +1,4 @@ +import re import sys from typing import Generator from unittest.mock import MagicMock, patch @@ -7,6 +8,7 @@ import pytest from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode from lerobot.common.motors.dynamixel import MODEL_NUMBER_TABLE, DynamixelMotorsBus +from lerobot.common.motors.dynamixel.tables import X_SERIES_CONTROL_TABLE from lerobot.common.utils.encoding_utils import encode_twos_complement from tests.mocks.mock_dynamixel import MockMotors, MockPortHandler @@ -87,7 +89,7 @@ def test_abc_implementation(dummy_motors): @pytest.mark.parametrize("id_", [1, 2, 3]) def test_ping(id_, mock_motors, dummy_motors): expected_model_nb = MODEL_NUMBER_TABLE[dummy_motors[f"dummy_{id_}"].model] - stub_name = mock_motors.build_ping_stub(id_, expected_model_nb) + stub = mock_motors.build_ping_stub(id_, expected_model_nb) motors_bus = DynamixelMotorsBus( port=mock_motors.port, motors=dummy_motors, @@ -97,13 +99,13 @@ def test_ping(id_, mock_motors, dummy_motors): ping_model_nb = motors_bus.ping(id_) assert ping_model_nb == expected_model_nb - assert mock_motors.stubs[stub_name].called + assert mock_motors.stubs[stub].called def test_broadcast_ping(mock_motors, dummy_motors): models = {m.id: m.model for m in dummy_motors.values()} expected_model_nbs = {id_: MODEL_NUMBER_TABLE[model] for id_, model in models.items()} - stub_name = mock_motors.build_broadcast_ping_stub(expected_model_nbs) + stub = mock_motors.build_broadcast_ping_stub(expected_model_nbs) motors_bus = DynamixelMotorsBus( port=mock_motors.port, motors=dummy_motors, @@ -113,178 +115,202 @@ def test_broadcast_ping(mock_motors, dummy_motors): ping_model_nbs = motors_bus.broadcast_ping() assert ping_model_nbs == expected_model_nbs - assert mock_motors.stubs[stub_name].called - - -def test_sync_read_none(mock_motors, dummy_motors): - expected_positions = { - "dummy_1": 1337, - "dummy_2": 42, - "dummy_3": 4016, - } - ids_values = dict(zip([1, 2, 3], expected_positions.values(), strict=True)) - stub_name = mock_motors.build_sync_read_stub("Present_Position", ids_values) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) - - read_positions = motors_bus.sync_read("Present_Position", normalize=False) - - assert mock_motors.stubs[stub_name].called - assert read_positions == expected_positions + assert mock_motors.stubs[stub].called @pytest.mark.parametrize( - "id_, position", + "addr, length, id_, value", [ - (1, 1337), - (2, 42), - (3, 4016), + (0, 1, 1, 2), + (10, 2, 2, 999), + (42, 4, 3, 1337), ], ) -def test_sync_read_single_value(id_, position, mock_motors, dummy_motors): - expected_position = {f"dummy_{id_}": position} - stub_name = mock_motors.build_sync_read_stub("Present_Position", {id_: position}) +def test__read(addr, length, id_, value, mock_motors, dummy_motors): + stub = mock_motors.build_read_stub(addr, length, id_, value) motors_bus = DynamixelMotorsBus( port=mock_motors.port, motors=dummy_motors, ) motors_bus.connect(assert_motors_exist=False) - read_position = motors_bus.sync_read("Present_Position", f"dummy_{id_}", normalize=False) + read_value, _, _ = motors_bus._read(addr, length, id_) - assert mock_motors.stubs[stub_name].called - assert read_position == expected_position + assert mock_motors.stubs[stub].called + assert read_value == value -@pytest.mark.parametrize( - "ids, positions", - [ - ([1], [1337]), - ([1, 2], [1337, 42]), - ([1, 2, 3], [1337, 42, 4016]), - ], - ids=["1 motor", "2 motors", "3 motors"], -) # fmt: skip -def test_sync_read(ids, positions, mock_motors, dummy_motors): - assert len(ids) == len(positions) - names = [f"dummy_{dxl_id}" for dxl_id in ids] - expected_positions = dict(zip(names, positions, strict=True)) - ids_values = dict(zip(ids, positions, strict=True)) - stub_name = mock_motors.build_sync_read_stub("Present_Position", ids_values) +@pytest.mark.parametrize("raise_on_error", (True, False)) +def test__read_error(raise_on_error, mock_motors, dummy_motors): + addr, length, id_, value, error = (10, 4, 1, 1337, dxl.ERRNUM_DATA_LIMIT) + stub = mock_motors.build_read_stub(addr, length, id_, value, error=error) motors_bus = DynamixelMotorsBus( port=mock_motors.port, motors=dummy_motors, ) motors_bus.connect(assert_motors_exist=False) - read_positions = motors_bus.sync_read("Present_Position", names, normalize=False) - - assert mock_motors.stubs[stub_name].called - assert read_positions == expected_positions - - -@pytest.mark.parametrize( - "num_retry, num_invalid_try, pos", - [ - (0, 2, 1337), - (2, 3, 42), - (3, 2, 4016), - (2, 1, 999), - ], -) -def test_sync_read_num_retry(num_retry, num_invalid_try, pos, mock_motors, dummy_motors): - expected_position = {"dummy_1": pos} - stub_name = mock_motors.build_sync_read_stub( - "Present_Position", {1: pos}, num_invalid_try=num_invalid_try - ) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) - - if num_retry >= num_invalid_try: - pos_dict = motors_bus.sync_read("Present_Position", "dummy_1", normalize=False, num_retry=num_retry) - assert pos_dict == expected_position + if raise_on_error: + with pytest.raises( + RuntimeError, match=re.escape("[RxPacketError] The data value exceeds the limit value!") + ): + motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) else: - with pytest.raises(ConnectionError): - _ = motors_bus.sync_read("Present_Position", "dummy_1", normalize=False, num_retry=num_retry) + _, _, read_error = motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) + assert read_error == error - expected_calls = min(1 + num_retry, 1 + num_invalid_try) - assert mock_motors.stubs[stub_name].calls == expected_calls + assert mock_motors.stubs[stub].called -@pytest.mark.parametrize( - "data_name, value", - [ - ("Torque_Enable", 0), - ("Torque_Enable", 1), - ("Goal_Position", 1337), - ("Goal_Position", 42), - ], -) -def test_sync_write_single_value(data_name, value, mock_motors, dummy_motors): - ids_values = {m.id: value for m in dummy_motors.values()} - stub_name = mock_motors.build_sync_write_stub(data_name, ids_values) +@pytest.mark.parametrize("raise_on_error", (True, False)) +def test__read_comm(raise_on_error, mock_motors, dummy_motors): + addr, length, id_, value = (10, 4, 1, 1337) + stub = mock_motors.build_read_stub(addr, length, id_, value, reply=False) motors_bus = DynamixelMotorsBus( port=mock_motors.port, motors=dummy_motors, ) motors_bus.connect(assert_motors_exist=False) - motors_bus.sync_write(data_name, value, normalize=False) + if raise_on_error: + with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")): + motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) + else: + _, read_comm, _ = motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) + assert read_comm == dxl.COMM_RX_TIMEOUT - assert mock_motors.stubs[stub_name].wait_called() + assert mock_motors.stubs[stub].called @pytest.mark.parametrize( - "ids, positions", + "addr, length, id_, value", [ - ([1], [1337]), - ([1, 2], [1337, 42]), - ([1, 2, 3], [1337, 42, 4016]), + (0, 1, 1, 2), + (10, 2, 2, 999), + (42, 4, 3, 1337), + ], +) +def test__write(addr, length, id_, value, mock_motors, dummy_motors): + stub = mock_motors.build_write_stub(addr, length, id_, value) + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect(assert_motors_exist=False) + + comm, error = motors_bus._write(addr, length, id_, value) + + assert mock_motors.stubs[stub].called + assert comm == dxl.COMM_SUCCESS + assert error == 0 + + +@pytest.mark.parametrize("raise_on_error", (True, False)) +def test__write_error(raise_on_error, mock_motors, dummy_motors): + addr, length, id_, value, error = (10, 4, 1, 1337, dxl.ERRNUM_DATA_LIMIT) + stub = mock_motors.build_write_stub(addr, length, id_, value, error=error) + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect(assert_motors_exist=False) + + if raise_on_error: + with pytest.raises( + RuntimeError, match=re.escape("[RxPacketError] The data value exceeds the limit value!") + ): + motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) + else: + _, write_error = motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) + assert write_error == error + + assert mock_motors.stubs[stub].called + + +@pytest.mark.parametrize("raise_on_error", (True, False)) +def test__write_comm(raise_on_error, mock_motors, dummy_motors): + addr, length, id_, value = (10, 4, 1, 1337) + stub = mock_motors.build_write_stub(addr, length, id_, value, reply=False) + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect(assert_motors_exist=False) + + if raise_on_error: + with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")): + motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) + else: + write_comm, _ = motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) + assert write_comm == dxl.COMM_RX_TIMEOUT + + assert mock_motors.stubs[stub].called + + +@pytest.mark.parametrize( + "addr, length, ids_values", + [ + (0, 1, {1: 4}), + (10, 2, {1: 1337, 2: 42}), + (42, 4, {1: 1337, 2: 42, 3: 4016}), ], ids=["1 motor", "2 motors", "3 motors"], -) # fmt: skip -def test_sync_write(ids, positions, mock_motors, dummy_motors): - assert len(ids) == len(positions) - ids_values = dict(zip(ids, positions, strict=True)) - stub_name = mock_motors.build_sync_write_stub("Goal_Position", ids_values) +) +def test__sync_read(addr, length, ids_values, mock_motors, dummy_motors): + stub = mock_motors.build_sync_read_stub(addr, length, ids_values) motors_bus = DynamixelMotorsBus( port=mock_motors.port, motors=dummy_motors, ) motors_bus.connect(assert_motors_exist=False) - write_values = {f"dummy_{id_}": pos for id_, pos in ids_values.items()} - motors_bus.sync_write("Goal_Position", write_values, normalize=False) + read_values, _ = motors_bus._sync_read(addr, length, list(ids_values)) - assert mock_motors.stubs[stub_name].wait_called() + assert mock_motors.stubs[stub].called + assert read_values == ids_values + + +@pytest.mark.parametrize("raise_on_error", (True, False)) +def test__sync_read_comm(raise_on_error, mock_motors, dummy_motors): + addr, length, ids_values = (10, 4, {1: 1337}) + stub = mock_motors.build_sync_read_stub(addr, length, ids_values, reply=False) + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect(assert_motors_exist=False) + + if raise_on_error: + with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")): + motors_bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error) + else: + _, read_comm = motors_bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error) + assert read_comm == dxl.COMM_RX_TIMEOUT + + assert mock_motors.stubs[stub].called @pytest.mark.parametrize( - "data_name, dxl_id, value", + "addr, length, ids_values", [ - ("Torque_Enable", 1, 0), - ("Torque_Enable", 1, 1), - ("Goal_Position", 2, 1337), - ("Goal_Position", 3, 42), + (0, 1, {1: 4}), + (10, 2, {1: 1337, 2: 42}), + (42, 4, {1: 1337, 2: 42, 3: 4016}), ], + ids=["1 motor", "2 motors", "3 motors"], ) -def test_write(data_name, dxl_id, value, mock_motors, dummy_motors): - stub_name = mock_motors.build_write_stub(data_name, dxl_id, value) +def test__sync_write(addr, length, ids_values, mock_motors, dummy_motors): + stub = mock_motors.build_sync_write_stub(addr, length, ids_values) motors_bus = DynamixelMotorsBus( port=mock_motors.port, motors=dummy_motors, ) motors_bus.connect(assert_motors_exist=False) - motors_bus.write(data_name, f"dummy_{dxl_id}", value, normalize=False) + comm = motors_bus._sync_write(addr, length, ids_values) - assert mock_motors.stubs[stub_name].called + assert mock_motors.stubs[stub].wait_called() + assert comm == dxl.COMM_SUCCESS def test_is_calibrated(mock_motors, dummy_motors, dummy_calibration): @@ -292,10 +318,10 @@ def test_is_calibrated(mock_motors, dummy_motors, dummy_calibration): encoded_homings = {m.id: encode_twos_complement(m.homing_offset, 4) for m in dummy_calibration.values()} mins = {m.id: m.range_min for m in dummy_calibration.values()} maxes = {m.id: m.range_max for m in dummy_calibration.values()} - drive_modes_stub = mock_motors.build_sync_read_stub("Drive_Mode", drive_modes) - offsets_stub = mock_motors.build_sync_read_stub("Homing_Offset", encoded_homings) - mins_stub = mock_motors.build_sync_read_stub("Min_Position_Limit", mins) - maxes_stub = mock_motors.build_sync_read_stub("Max_Position_Limit", maxes) + drive_modes_stub = mock_motors.build_sync_read_stub(*X_SERIES_CONTROL_TABLE["Drive_Mode"], drive_modes) + offsets_stub = mock_motors.build_sync_read_stub(*X_SERIES_CONTROL_TABLE["Homing_Offset"], encoded_homings) + mins_stub = mock_motors.build_sync_read_stub(*X_SERIES_CONTROL_TABLE["Min_Position_Limit"], mins) + maxes_stub = mock_motors.build_sync_read_stub(*X_SERIES_CONTROL_TABLE["Max_Position_Limit"], maxes) motors_bus = DynamixelMotorsBus( port=mock_motors.port, motors=dummy_motors, @@ -317,9 +343,15 @@ def test_reset_calibration(mock_motors, dummy_motors): write_mins_stubs = [] write_maxes_stubs = [] for motor in dummy_motors.values(): - write_homing_stubs.append(mock_motors.build_write_stub("Homing_Offset", motor.id, 0)) - write_mins_stubs.append(mock_motors.build_write_stub("Min_Position_Limit", motor.id, 0)) - write_maxes_stubs.append(mock_motors.build_write_stub("Max_Position_Limit", motor.id, 4095)) + write_homing_stubs.append( + mock_motors.build_write_stub(*X_SERIES_CONTROL_TABLE["Homing_Offset"], motor.id, 0) + ) + write_mins_stubs.append( + mock_motors.build_write_stub(*X_SERIES_CONTROL_TABLE["Min_Position_Limit"], motor.id, 0) + ) + write_maxes_stubs.append( + mock_motors.build_write_stub(*X_SERIES_CONTROL_TABLE["Max_Position_Limit"], motor.id, 4095) + ) motors_bus = DynamixelMotorsBus( port=mock_motors.port, @@ -349,11 +381,13 @@ def test_set_half_turn_homings(mock_motors, dummy_motors): 2: 2005, # 2047 - 42 3: -1625, # 2047 - 3672 } - read_pos_stub = mock_motors.build_sync_read_stub("Present_Position", current_positions) + read_pos_stub = mock_motors.build_sync_read_stub( + *X_SERIES_CONTROL_TABLE["Present_Position"], current_positions + ) write_homing_stubs = [] for id_, homing in expected_homings.items(): encoded_homing = encode_twos_complement(homing, 4) - stub = mock_motors.build_write_stub("Homing_Offset", id_, encoded_homing) + stub = mock_motors.build_write_stub(*X_SERIES_CONTROL_TABLE["Homing_Offset"], id_, encoded_homing) write_homing_stubs.append(stub) motors_bus = DynamixelMotorsBus( @@ -386,7 +420,9 @@ def test_record_ranges_of_motion(mock_motors, dummy_motors): "dummy_2": 3600, "dummy_3": 4002, } - read_pos_stub = mock_motors.build_sequential_sync_read_stub("Present_Position", positions) + read_pos_stub = mock_motors.build_sequential_sync_read_stub( + *X_SERIES_CONTROL_TABLE["Present_Position"], positions + ) with patch("lerobot.common.motors.motors_bus.enter_pressed", side_effect=[False, True]): motors_bus = DynamixelMotorsBus( port=mock_motors.port,