diff --git a/tests/mocks/mock_feetech.py b/tests/mocks/mock_feetech.py index f938522e..fc5422ed 100644 --- a/tests/mocks/mock_feetech.py +++ b/tests/mocks/mock_feetech.py @@ -89,8 +89,31 @@ class MockInstructionPacket(MockFeetechPacket): No parameters required. """ - params, length = [], 2 - return cls.build(scs_id=scs_id, params=params, length=length, instruct_type="Ping") + return cls.build(scs_id=scs_id, params=[], length=2, instruct_type="Ping") + + @classmethod + def read( + cls, + scs_id: int, + start_address: int, + data_length: int, + ) -> bytes: + """ + Builds a "Read" instruction. + + The parameters for Read are: + param[0] = start_address + param[1] = data_length + + And 'length' = 4, where: + +1 is for instruction byte, + +1 is for the address byte, + +1 is for the length bytes, + +1 is for the checksum at the end. + """ + params = [start_address, data_length] + length = 4 + return cls.build(scs_id=scs_id, params=params, length=length, instruct_type="Read") @classmethod def sync_read( @@ -211,23 +234,17 @@ class MockStatusPacket(MockFeetechPacket): ] # fmt: skip @classmethod - def ping(cls, scs_id: int, model_nb: int = 1190, firm_ver: int = 50) -> bytes: + def ping(cls, scs_id: int, error: str = "Success") -> bytes: """Builds a 'Ping' status packet. Args: scs_id (int): ID of the servo responding. - model_nb (int, optional): Desired 'model number' to be returned in the packet. Defaults to 1190 - which corresponds to a XL330-M077-T. - firm_ver (int, optional): Desired 'firmware version' to be returned in the packet. - Defaults to 50. + error (str, optional): Error to be returned. Defaults to "Success". Returns: bytes: The raw 'Ping' status packet ready to be sent through serial. """ - # raise NotImplementedError - params = [scs.SCS_LOBYTE(model_nb), scs.SCS_HIBYTE(model_nb), firm_ver] - length = 2 - return cls.build(scs_id, params=params, length=length) + return cls.build(scs_id, params=[], length=2, error=error) @classmethod def present_position(cls, scs_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes: @@ -248,6 +265,24 @@ class MockStatusPacket(MockFeetechPacket): length = 4 return cls.build(scs_id, params=params, length=length) + @classmethod + def model_number(cls, scs_id: int, model_nb: int | None = None) -> bytes: + """Builds a 'Present_Position' status packet. + + Args: + scs_id (int): List of the servos ids. + pos (int | None, optional): Desired 'Present_Position' to be returned in the packet. If None, it + will use a random value in the min_max_range. Defaults to None. + min_max_range (tuple, optional): Min/max range to generate the position values used for when 'pos' + is None. Note that the bounds are included in the range. Defaults to (0, 4095). + + Returns: + bytes: The raw 'Present_Position' status packet ready to be sent through serial. + """ + params = [scs.SCS_LOBYTE(model_nb), scs.SCS_HIBYTE(model_nb)] + length = 4 + return cls.build(scs_id, params=params, length=length) + class MockPortHandler(scs.PortHandler): """ @@ -294,14 +329,11 @@ class MockMotors(MockSerial): self._MockSerial__stubs[name or new_stub.receive_bytes] = new_stub return new_stub - def build_broadcast_ping_stub( - self, ids_models: dict[int, list[int]] | None = None, num_invalid_try: int = 0 - ) -> str: + def build_broadcast_ping_stub(self, ids: list[int] | None = None, num_invalid_try: int = 0) -> str: ping_request = MockInstructionPacket.ping(scs.BROADCAST_ID) - return_packets = b"".join(MockStatusPacket.ping(idx, model) for idx, model in ids_models.items()) + return_packets = b"".join(MockStatusPacket.ping(idx) for idx in ids) ping_response = self._build_send_fn(return_packets, num_invalid_try) - - stub_name = "Ping_" + "_".join([str(idx) for idx in ids_models]) + stub_name = "Ping_" + "_".join([str(idx) for idx in ids]) self.stub( name=stub_name, receive_bytes=ping_request, @@ -309,11 +341,9 @@ class MockMotors(MockSerial): ) return stub_name - def build_ping_stub( - self, scs_id: int, model_nb: int, firm_ver: int = 50, num_invalid_try: int = 0 - ) -> str: + def build_ping_stub(self, scs_id: int, num_invalid_try: int = 0) -> str: ping_request = MockInstructionPacket.ping(scs_id) - return_packet = MockStatusPacket.ping(scs_id, model_nb, firm_ver) + return_packet = MockStatusPacket.ping(scs_id) ping_response = self._build_send_fn(return_packet, num_invalid_try) stub_name = f"Ping_{scs_id}" self.stub( @@ -323,21 +353,48 @@ class MockMotors(MockSerial): ) return stub_name + def build_read_stub( + self, data_name: str, scs_id: int, value: int | None = None, num_invalid_try: int = 0 + ) -> str: + """ + 'data_name' supported: + - Model_Number + """ + if data_name != "Model_Number": + raise NotImplementedError + address, length = self.ctrl_table[data_name] + read_request = MockInstructionPacket.read(scs_id, address, length) + return_packet = MockStatusPacket.model_number(scs_id, value) + read_response = self._build_send_fn(return_packet, num_invalid_try) + stub_name = f"Read_{data_name}_{scs_id}" + self.stub( + name=stub_name, + receive_bytes=read_request, + send_fn=read_response, + ) + return stub_name + def build_sync_read_stub( self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0 ) -> str: """ 'data_name' supported: - Present_Position + - Model_Number """ - if data_name != "Present_Position": - raise NotImplementedError - address, length = self.ctrl_table[data_name] sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length) - return_packets = b"".join( - MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items() - ) + if data_name == "Present_Position": + return_packets = b"".join( + MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items() + ) + elif data_name == "Model_Number": + return_packets = b"".join( + MockStatusPacket.model_number(idx, model_nb) for idx, model_nb in ids_values.items() + ) + else: + raise NotImplementedError + sync_read_response = self._build_send_fn(return_packets, num_invalid_try) stub_name = f"Sync_Read_{data_name}_" + "_".join([str(idx) for idx in ids_values]) self.stub( diff --git a/tests/motors/test_feetech.py b/tests/motors/test_feetech.py index 994e6944..bed0957f 100644 --- a/tests/motors/test_feetech.py +++ b/tests/motors/test_feetech.py @@ -92,12 +92,13 @@ def test_abc_implementation(dummy_motors): FeetechMotorsBus(port="/dev/dummy-port", motors=dummy_motors) -@pytest.mark.skip("TODO") +# @pytest.mark.skip("TODO") @pytest.mark.parametrize("idx", [1, 2, 3]) def test_ping(idx, mock_motors, dummy_motors): expected_model = dummy_motors[f"dummy_{idx}"].model model_nb = MODEL_NUMBER[expected_model] - stub_name = mock_motors.build_ping_stub(idx, model_nb) + ping_stub = mock_motors.build_ping_stub(idx) + mobel_nb_stub = mock_motors.build_read_stub("Model_Number", idx, model_nb) motors_bus = FeetechMotorsBus( port=mock_motors.port, motors=dummy_motors, @@ -107,14 +108,15 @@ def test_ping(idx, mock_motors, dummy_motors): ping_model_nb = motors_bus.ping(idx) assert ping_model_nb == expected_model - assert mock_motors.stubs[stub_name].called + assert mock_motors.stubs[ping_stub].called + assert mock_motors.stubs[mobel_nb_stub].called -@pytest.mark.skip("TODO") def test_broadcast_ping(mock_motors, dummy_motors): expected_models = {m.id: m.model for m in dummy_motors.values()} model_nbs = {id_: MODEL_NUMBER[model] for id_, model in expected_models.items()} - stub_name = mock_motors.build_broadcast_ping_stub(model_nbs) + ping_stub = mock_motors.build_broadcast_ping_stub(list(model_nbs)) + mobel_nb_stub = mock_motors.build_sync_read_stub("Model_Number", model_nbs) motors_bus = FeetechMotorsBus( port=mock_motors.port, motors=dummy_motors, @@ -124,7 +126,8 @@ def test_broadcast_ping(mock_motors, dummy_motors): ping_model_nbs = motors_bus.broadcast_ping() assert ping_model_nbs == expected_models - assert mock_motors.stubs[stub_name].called + assert mock_motors.stubs[ping_stub].called + assert mock_motors.stubs[mobel_nb_stub].called def test_sync_read_none(mock_motors, dummy_motors):