From 329d103453e3fe36d9cdbdc7e717cdd256f0429b Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Sun, 23 Mar 2025 16:12:24 +0100 Subject: [PATCH] Add dxl write test --- tests/mocks/mock_dynamixel.py | 54 +++++++++++++++++++++++++++++++--- tests/motors/test_dynamixel.py | 44 +++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 4 deletions(-) diff --git a/tests/mocks/mock_dynamixel.py b/tests/mocks/mock_dynamixel.py index b75479d3..21668e17 100644 --- a/tests/mocks/mock_dynamixel.py +++ b/tests/mocks/mock_dynamixel.py @@ -294,6 +294,40 @@ class MockInstructionPacket(MockDynamixelPacketv2): length = len(ids_values) * (1 + data_length) + 7 return cls.build(dxl_id=dxl.BROADCAST_ID, params=params, length=length, instruct_type="Sync_Write") + @classmethod + def write( + cls, + dxl_id: int, + value: int, + start_address: int, + data_length: int, + ) -> bytes: + """ + Builds a "Write" instruction. + (from https://emanual.robotis.com/docs/en/dxl/protocol2/#write-0x03) + + The parameters for Write (Protocol 2.0) are: + param[0] = start_address L + param[1] = start_address H + param[2] = 1st Byte + param[3] = 2nd Byte + ... + param[1+X] = X-th Byte + + And 'length' = data_length + 5, where: + +1 is for instruction byte, + +2 is for the length bytes, + +2 is for the CRC at the end. + """ + data = DynamixelMotorsBus.split_int_bytes(value, data_length) + params = [ + dxl.DXL_LOBYTE(start_address), + dxl.DXL_HIBYTE(start_address), + *data, + ] + length = data_length + 5 + return cls.build(dxl_id=dxl_id, params=params, length=length, instruct_type="Write") + class MockStatusPacket(MockDynamixelPacketv2): """ @@ -458,7 +492,6 @@ class MockMotors(MockSerial): ping_request = MockInstructionPacket.ping(dxl_id) return_packet = MockStatusPacket.ping(dxl_id, model_nb, firm_ver) ping_response = self._build_send_fn(return_packet, num_invalid_try) - stub_name = f"Ping_{dxl_id}" self.stub( name=stub_name, @@ -474,16 +507,15 @@ class MockMotors(MockSerial): 'data_name' supported: - Present_Position """ - address, length = self.ctrl_table[data_name] - sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length) if data_name != "Present_Position": raise NotImplementedError + address, length = self.ctrl_table[data_name] + sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length) return_packets = b"".join( MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items() ) sync_read_response = self._build_send_fn(return_packets, num_invalid_try) - stub_name = f"Sync_Read_{data_name}_" + "_".join([str(idx) for idx in ids_values]) self.stub( name=stub_name, @@ -505,6 +537,20 @@ class MockMotors(MockSerial): ) return stub_name + def build_write_stub( + self, data_name: str, dxl_id: int, value: int, error: str = "Success", num_invalid_try: int = 0 + ) -> str: + address, length = self.ctrl_table[data_name] + sync_read_request = MockInstructionPacket.write(dxl_id, value, address, length) + return_packet = MockStatusPacket.build(dxl_id, params=[], length=4, error=error) + stub_name = f"Write_{data_name}_{dxl_id}" + self.stub( + name=stub_name, + receive_bytes=sync_read_request, + send_fn=self._build_send_fn(return_packet, num_invalid_try), + ) + return stub_name + @staticmethod def _build_send_fn(packet: bytes, num_invalid_try: int = 0) -> Callable[[int], bytes]: def send_fn(_call_count: int) -> bytes: diff --git a/tests/motors/test_dynamixel.py b/tests/motors/test_dynamixel.py index 5b58f570..fee37b28 100644 --- a/tests/motors/test_dynamixel.py +++ b/tests/motors/test_dynamixel.py @@ -397,3 +397,47 @@ def test_sync_write_by_names(ids, positions, mock_motors, dummy_motors): motors_bus.sync_write("Goal_Position", write_values) assert mock_motors.stubs[stub_name].wait_called() + + +@pytest.mark.parametrize( + "data_name, dxl_id, value", + [ + ("Torque_Enable", 1, 0), + ("Torque_Enable", 1, 1), + ("Goal_Position", 2, 1337), + ("Goal_Position", 3, 42), + ], +) +def test_write_by_id(data_name, dxl_id, value, mock_motors, dummy_motors): + stub_name = mock_motors.build_write_stub(data_name, dxl_id, value) + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect() + + motors_bus.write(data_name, dxl_id, value) + + assert mock_motors.stubs[stub_name].called + + +@pytest.mark.parametrize( + "data_name, dxl_id, value", + [ + ("Torque_Enable", 1, 0), + ("Torque_Enable", 1, 1), + ("Goal_Position", 2, 1337), + ("Goal_Position", 3, 42), + ], +) +def test_write_by_name(data_name, dxl_id, value, mock_motors, dummy_motors): + stub_name = mock_motors.build_write_stub(data_name, dxl_id, value) + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect() + + motors_bus.write(data_name, f"dummy_{dxl_id}", value) + + assert mock_motors.stubs[stub_name].called