From a38bb15e79ff116537e5c14e46c20d9069ccbfaa Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Sun, 23 Mar 2025 16:48:32 +0100 Subject: [PATCH] Add feetech write test --- tests/mocks/mock_feetech.py | 49 +++++++++++++++++++++++++++++++++--- tests/motors/test_feetech.py | 46 +++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 4 deletions(-) diff --git a/tests/mocks/mock_feetech.py b/tests/mocks/mock_feetech.py index 44061094..f23dd6ee 100644 --- a/tests/mocks/mock_feetech.py +++ b/tests/mocks/mock_feetech.py @@ -155,6 +155,35 @@ class MockInstructionPacket(MockFeetechPacket): length = len(ids_values) * (1 + data_length) + 4 return cls.build(scs_id=scs.BROADCAST_ID, params=params, length=length, instruct_type="Sync_Write") + @classmethod + def write( + cls, + scs_id: int, + value: int, + start_address: int, + data_length: int, + ) -> bytes: + """ + Builds a "Write" instruction. + + The parameters for Write (Protocol 2.0) are: + param[0] = start_address L + param[1] = start_address H + param[2] = 1st Byte + param[3] = 2nd Byte + ... + param[1+X] = X-th Byte + + And 'length' = data_length + 5, where: + +1 is for instruction byte, + +2 is for the length bytes, + +2 is for the CRC at the end. + """ + data = FeetechMotorsBus.split_int_bytes(value, data_length) + params = [start_address, *data] + length = data_length + 3 + return cls.build(scs_id=scs_id, params=params, length=length, instruct_type="Write") + class MockStatusPacket(MockFeetechPacket): """ @@ -317,7 +346,6 @@ class MockMotors(MockSerial): ping_request = MockInstructionPacket.ping(scs_id) return_packet = MockStatusPacket.ping(scs_id, model_nb, firm_ver) ping_response = self._build_send_fn(return_packet, num_invalid_try) - stub_name = f"Ping_{scs_id}" self.stub( name=stub_name, @@ -333,16 +361,15 @@ class MockMotors(MockSerial): 'data_name' supported: - Present_Position """ - address, length = self.ctrl_table[data_name] - sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length) if data_name != "Present_Position": raise NotImplementedError + address, length = self.ctrl_table[data_name] + sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length) return_packets = b"".join( MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items() ) sync_read_response = self._build_send_fn(return_packets, num_invalid_try) - stub_name = f"Sync_Read_{data_name}_" + "_".join([str(idx) for idx in ids_values]) self.stub( name=stub_name, @@ -364,6 +391,20 @@ class MockMotors(MockSerial): ) return stub_name + def build_write_stub( + self, data_name: str, scs_id: int, value: int, error: str = "Success", num_invalid_try: int = 0 + ) -> str: + address, length = self.ctrl_table[data_name] + sync_read_request = MockInstructionPacket.write(scs_id, value, address, length) + return_packet = MockStatusPacket.build(scs_id, params=[], length=2, error=error) + stub_name = f"Write_{data_name}_{scs_id}" + self.stub( + name=stub_name, + receive_bytes=sync_read_request, + send_fn=self._build_send_fn(return_packet, num_invalid_try), + ) + return stub_name + @staticmethod def _build_send_fn(packet: bytes, num_invalid_try: int = 0) -> Callable[[int], bytes]: def send_fn(_call_count: int) -> bytes: diff --git a/tests/motors/test_feetech.py b/tests/motors/test_feetech.py index 7f917e92..8f3b4d04 100644 --- a/tests/motors/test_feetech.py +++ b/tests/motors/test_feetech.py @@ -399,3 +399,49 @@ def test_sync_write_by_names(ids, positions, mock_motors, dummy_motors): motors_bus.sync_write("Goal_Position", write_values) assert mock_motors.stubs[stub_name].wait_called() + + +@pytest.mark.skip("TODO") +@pytest.mark.parametrize( + "data_name, dxl_id, value", + [ + ("Torque_Enable", 1, 0), + ("Torque_Enable", 1, 1), + ("Goal_Position", 2, 1337), + ("Goal_Position", 3, 42), + ], +) +def test_write_by_id(data_name, dxl_id, value, mock_motors, dummy_motors): + stub_name = mock_motors.build_write_stub(data_name, dxl_id, value) + motors_bus = FeetechMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect() + + motors_bus.write(data_name, dxl_id, value) + + assert mock_motors.stubs[stub_name].called + + +@pytest.mark.skip("TODO") +@pytest.mark.parametrize( + "data_name, dxl_id, value", + [ + ("Torque_Enable", 1, 0), + ("Torque_Enable", 1, 1), + ("Goal_Position", 2, 1337), + ("Goal_Position", 3, 42), + ], +) +def test_write_by_name(data_name, dxl_id, value, mock_motors, dummy_motors): + stub_name = mock_motors.build_write_stub(data_name, dxl_id, value) + motors_bus = FeetechMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect() + + motors_bus.write(data_name, f"dummy_{dxl_id}", value) + + assert mock_motors.stubs[stub_name].called