Fix feetech ping tests
This commit is contained in:
parent
7a75bb9f61
commit
4293b6a4fb
|
@ -89,8 +89,31 @@ class MockInstructionPacket(MockFeetechPacket):
|
||||||
|
|
||||||
No parameters required.
|
No parameters required.
|
||||||
"""
|
"""
|
||||||
params, length = [], 2
|
return cls.build(scs_id=scs_id, params=[], length=2, instruct_type="Ping")
|
||||||
return cls.build(scs_id=scs_id, params=params, length=length, instruct_type="Ping")
|
|
||||||
|
@classmethod
|
||||||
|
def read(
|
||||||
|
cls,
|
||||||
|
scs_id: int,
|
||||||
|
start_address: int,
|
||||||
|
data_length: int,
|
||||||
|
) -> bytes:
|
||||||
|
"""
|
||||||
|
Builds a "Read" instruction.
|
||||||
|
|
||||||
|
The parameters for Read are:
|
||||||
|
param[0] = start_address
|
||||||
|
param[1] = data_length
|
||||||
|
|
||||||
|
And 'length' = 4, where:
|
||||||
|
+1 is for instruction byte,
|
||||||
|
+1 is for the address byte,
|
||||||
|
+1 is for the length bytes,
|
||||||
|
+1 is for the checksum at the end.
|
||||||
|
"""
|
||||||
|
params = [start_address, data_length]
|
||||||
|
length = 4
|
||||||
|
return cls.build(scs_id=scs_id, params=params, length=length, instruct_type="Read")
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def sync_read(
|
def sync_read(
|
||||||
|
@ -211,23 +234,17 @@ class MockStatusPacket(MockFeetechPacket):
|
||||||
] # fmt: skip
|
] # fmt: skip
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def ping(cls, scs_id: int, model_nb: int = 1190, firm_ver: int = 50) -> bytes:
|
def ping(cls, scs_id: int, error: str = "Success") -> bytes:
|
||||||
"""Builds a 'Ping' status packet.
|
"""Builds a 'Ping' status packet.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
scs_id (int): ID of the servo responding.
|
scs_id (int): ID of the servo responding.
|
||||||
model_nb (int, optional): Desired 'model number' to be returned in the packet. Defaults to 1190
|
error (str, optional): Error to be returned. Defaults to "Success".
|
||||||
which corresponds to a XL330-M077-T.
|
|
||||||
firm_ver (int, optional): Desired 'firmware version' to be returned in the packet.
|
|
||||||
Defaults to 50.
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
bytes: The raw 'Ping' status packet ready to be sent through serial.
|
bytes: The raw 'Ping' status packet ready to be sent through serial.
|
||||||
"""
|
"""
|
||||||
# raise NotImplementedError
|
return cls.build(scs_id, params=[], length=2, error=error)
|
||||||
params = [scs.SCS_LOBYTE(model_nb), scs.SCS_HIBYTE(model_nb), firm_ver]
|
|
||||||
length = 2
|
|
||||||
return cls.build(scs_id, params=params, length=length)
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def present_position(cls, scs_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes:
|
def present_position(cls, scs_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes:
|
||||||
|
@ -248,6 +265,24 @@ class MockStatusPacket(MockFeetechPacket):
|
||||||
length = 4
|
length = 4
|
||||||
return cls.build(scs_id, params=params, length=length)
|
return cls.build(scs_id, params=params, length=length)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def model_number(cls, scs_id: int, model_nb: int | None = None) -> bytes:
|
||||||
|
"""Builds a 'Present_Position' status packet.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
scs_id (int): List of the servos ids.
|
||||||
|
pos (int | None, optional): Desired 'Present_Position' to be returned in the packet. If None, it
|
||||||
|
will use a random value in the min_max_range. Defaults to None.
|
||||||
|
min_max_range (tuple, optional): Min/max range to generate the position values used for when 'pos'
|
||||||
|
is None. Note that the bounds are included in the range. Defaults to (0, 4095).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bytes: The raw 'Present_Position' status packet ready to be sent through serial.
|
||||||
|
"""
|
||||||
|
params = [scs.SCS_LOBYTE(model_nb), scs.SCS_HIBYTE(model_nb)]
|
||||||
|
length = 4
|
||||||
|
return cls.build(scs_id, params=params, length=length)
|
||||||
|
|
||||||
|
|
||||||
class MockPortHandler(scs.PortHandler):
|
class MockPortHandler(scs.PortHandler):
|
||||||
"""
|
"""
|
||||||
|
@ -294,14 +329,11 @@ class MockMotors(MockSerial):
|
||||||
self._MockSerial__stubs[name or new_stub.receive_bytes] = new_stub
|
self._MockSerial__stubs[name or new_stub.receive_bytes] = new_stub
|
||||||
return new_stub
|
return new_stub
|
||||||
|
|
||||||
def build_broadcast_ping_stub(
|
def build_broadcast_ping_stub(self, ids: list[int] | None = None, num_invalid_try: int = 0) -> str:
|
||||||
self, ids_models: dict[int, list[int]] | None = None, num_invalid_try: int = 0
|
|
||||||
) -> str:
|
|
||||||
ping_request = MockInstructionPacket.ping(scs.BROADCAST_ID)
|
ping_request = MockInstructionPacket.ping(scs.BROADCAST_ID)
|
||||||
return_packets = b"".join(MockStatusPacket.ping(idx, model) for idx, model in ids_models.items())
|
return_packets = b"".join(MockStatusPacket.ping(idx) for idx in ids)
|
||||||
ping_response = self._build_send_fn(return_packets, num_invalid_try)
|
ping_response = self._build_send_fn(return_packets, num_invalid_try)
|
||||||
|
stub_name = "Ping_" + "_".join([str(idx) for idx in ids])
|
||||||
stub_name = "Ping_" + "_".join([str(idx) for idx in ids_models])
|
|
||||||
self.stub(
|
self.stub(
|
||||||
name=stub_name,
|
name=stub_name,
|
||||||
receive_bytes=ping_request,
|
receive_bytes=ping_request,
|
||||||
|
@ -309,11 +341,9 @@ class MockMotors(MockSerial):
|
||||||
)
|
)
|
||||||
return stub_name
|
return stub_name
|
||||||
|
|
||||||
def build_ping_stub(
|
def build_ping_stub(self, scs_id: int, num_invalid_try: int = 0) -> str:
|
||||||
self, scs_id: int, model_nb: int, firm_ver: int = 50, num_invalid_try: int = 0
|
|
||||||
) -> str:
|
|
||||||
ping_request = MockInstructionPacket.ping(scs_id)
|
ping_request = MockInstructionPacket.ping(scs_id)
|
||||||
return_packet = MockStatusPacket.ping(scs_id, model_nb, firm_ver)
|
return_packet = MockStatusPacket.ping(scs_id)
|
||||||
ping_response = self._build_send_fn(return_packet, num_invalid_try)
|
ping_response = self._build_send_fn(return_packet, num_invalid_try)
|
||||||
stub_name = f"Ping_{scs_id}"
|
stub_name = f"Ping_{scs_id}"
|
||||||
self.stub(
|
self.stub(
|
||||||
|
@ -323,21 +353,48 @@ class MockMotors(MockSerial):
|
||||||
)
|
)
|
||||||
return stub_name
|
return stub_name
|
||||||
|
|
||||||
|
def build_read_stub(
|
||||||
|
self, data_name: str, scs_id: int, value: int | None = None, num_invalid_try: int = 0
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
'data_name' supported:
|
||||||
|
- Model_Number
|
||||||
|
"""
|
||||||
|
if data_name != "Model_Number":
|
||||||
|
raise NotImplementedError
|
||||||
|
address, length = self.ctrl_table[data_name]
|
||||||
|
read_request = MockInstructionPacket.read(scs_id, address, length)
|
||||||
|
return_packet = MockStatusPacket.model_number(scs_id, value)
|
||||||
|
read_response = self._build_send_fn(return_packet, num_invalid_try)
|
||||||
|
stub_name = f"Read_{data_name}_{scs_id}"
|
||||||
|
self.stub(
|
||||||
|
name=stub_name,
|
||||||
|
receive_bytes=read_request,
|
||||||
|
send_fn=read_response,
|
||||||
|
)
|
||||||
|
return stub_name
|
||||||
|
|
||||||
def build_sync_read_stub(
|
def build_sync_read_stub(
|
||||||
self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0
|
self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0
|
||||||
) -> str:
|
) -> str:
|
||||||
"""
|
"""
|
||||||
'data_name' supported:
|
'data_name' supported:
|
||||||
- Present_Position
|
- Present_Position
|
||||||
|
- Model_Number
|
||||||
"""
|
"""
|
||||||
if data_name != "Present_Position":
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
address, length = self.ctrl_table[data_name]
|
address, length = self.ctrl_table[data_name]
|
||||||
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
|
sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
|
||||||
return_packets = b"".join(
|
if data_name == "Present_Position":
|
||||||
MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items()
|
return_packets = b"".join(
|
||||||
)
|
MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items()
|
||||||
|
)
|
||||||
|
elif data_name == "Model_Number":
|
||||||
|
return_packets = b"".join(
|
||||||
|
MockStatusPacket.model_number(idx, model_nb) for idx, model_nb in ids_values.items()
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
|
sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
|
||||||
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(idx) for idx in ids_values])
|
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(idx) for idx in ids_values])
|
||||||
self.stub(
|
self.stub(
|
||||||
|
|
|
@ -92,12 +92,13 @@ def test_abc_implementation(dummy_motors):
|
||||||
FeetechMotorsBus(port="/dev/dummy-port", motors=dummy_motors)
|
FeetechMotorsBus(port="/dev/dummy-port", motors=dummy_motors)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skip("TODO")
|
# @pytest.mark.skip("TODO")
|
||||||
@pytest.mark.parametrize("idx", [1, 2, 3])
|
@pytest.mark.parametrize("idx", [1, 2, 3])
|
||||||
def test_ping(idx, mock_motors, dummy_motors):
|
def test_ping(idx, mock_motors, dummy_motors):
|
||||||
expected_model = dummy_motors[f"dummy_{idx}"].model
|
expected_model = dummy_motors[f"dummy_{idx}"].model
|
||||||
model_nb = MODEL_NUMBER[expected_model]
|
model_nb = MODEL_NUMBER[expected_model]
|
||||||
stub_name = mock_motors.build_ping_stub(idx, model_nb)
|
ping_stub = mock_motors.build_ping_stub(idx)
|
||||||
|
mobel_nb_stub = mock_motors.build_read_stub("Model_Number", idx, model_nb)
|
||||||
motors_bus = FeetechMotorsBus(
|
motors_bus = FeetechMotorsBus(
|
||||||
port=mock_motors.port,
|
port=mock_motors.port,
|
||||||
motors=dummy_motors,
|
motors=dummy_motors,
|
||||||
|
@ -107,14 +108,15 @@ def test_ping(idx, mock_motors, dummy_motors):
|
||||||
ping_model_nb = motors_bus.ping(idx)
|
ping_model_nb = motors_bus.ping(idx)
|
||||||
|
|
||||||
assert ping_model_nb == expected_model
|
assert ping_model_nb == expected_model
|
||||||
assert mock_motors.stubs[stub_name].called
|
assert mock_motors.stubs[ping_stub].called
|
||||||
|
assert mock_motors.stubs[mobel_nb_stub].called
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skip("TODO")
|
|
||||||
def test_broadcast_ping(mock_motors, dummy_motors):
|
def test_broadcast_ping(mock_motors, dummy_motors):
|
||||||
expected_models = {m.id: m.model for m in dummy_motors.values()}
|
expected_models = {m.id: m.model for m in dummy_motors.values()}
|
||||||
model_nbs = {id_: MODEL_NUMBER[model] for id_, model in expected_models.items()}
|
model_nbs = {id_: MODEL_NUMBER[model] for id_, model in expected_models.items()}
|
||||||
stub_name = mock_motors.build_broadcast_ping_stub(model_nbs)
|
ping_stub = mock_motors.build_broadcast_ping_stub(list(model_nbs))
|
||||||
|
mobel_nb_stub = mock_motors.build_sync_read_stub("Model_Number", model_nbs)
|
||||||
motors_bus = FeetechMotorsBus(
|
motors_bus = FeetechMotorsBus(
|
||||||
port=mock_motors.port,
|
port=mock_motors.port,
|
||||||
motors=dummy_motors,
|
motors=dummy_motors,
|
||||||
|
@ -124,7 +126,8 @@ def test_broadcast_ping(mock_motors, dummy_motors):
|
||||||
ping_model_nbs = motors_bus.broadcast_ping()
|
ping_model_nbs = motors_bus.broadcast_ping()
|
||||||
|
|
||||||
assert ping_model_nbs == expected_models
|
assert ping_model_nbs == expected_models
|
||||||
assert mock_motors.stubs[stub_name].called
|
assert mock_motors.stubs[ping_stub].called
|
||||||
|
assert mock_motors.stubs[mobel_nb_stub].called
|
||||||
|
|
||||||
|
|
||||||
def test_sync_read_none(mock_motors, dummy_motors):
|
def test_sync_read_none(mock_motors, dummy_motors):
|
||||||
|
|
Loading…
Reference in New Issue