Improve dynamixel mocking

This commit is contained in:
Simon Alibert 2025-03-22 00:39:41 +01:00
parent 2abfa5838d
commit 6fa859fa19
2 changed files with 152 additions and 111 deletions

View File

@ -10,43 +10,38 @@ from lerobot.common.motors.dynamixel import X_SERIES_CONTROL_TABLE
# https://emanual.robotis.com/docs/en/dxl/crc/ # https://emanual.robotis.com/docs/en/dxl/crc/
DXL_CRC_TABLE = [ DXL_CRC_TABLE = [
0x0000, 0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014, 0x0000, 0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014, 0x8011,
0x8011, 0x8033, 0x0036, 0x003C, 0x8039, 0x0028, 0x802D, 0x8033, 0x0036, 0x003C, 0x8039, 0x0028, 0x802D, 0x8027, 0x0022,
0x8027, 0x0022, 0x8063, 0x0066, 0x006C, 0x8069, 0x0078, 0x8063, 0x0066, 0x006C, 0x8069, 0x0078, 0x807D, 0x8077, 0x0072,
0x807D, 0x8077, 0x0072, 0x0050, 0x8055, 0x805F, 0x005A, 0x0050, 0x8055, 0x805F, 0x005A, 0x804B, 0x004E, 0x0044, 0x8041,
0x804B, 0x004E, 0x0044, 0x8041, 0x80C3, 0x00C6, 0x00CC, 0x80C3, 0x00C6, 0x00CC, 0x80C9, 0x00D8, 0x80DD, 0x80D7, 0x00D2,
0x80C9, 0x00D8, 0x80DD, 0x80D7, 0x00D2, 0x00F0, 0x80F5, 0x00F0, 0x80F5, 0x80FF, 0x00FA, 0x80EB, 0x00EE, 0x00E4, 0x80E1,
0x80FF, 0x00FA, 0x80EB, 0x00EE, 0x00E4, 0x80E1, 0x00A0, 0x00A0, 0x80A5, 0x80AF, 0x00AA, 0x80BB, 0x00BE, 0x00B4, 0x80B1,
0x80A5, 0x80AF, 0x00AA, 0x80BB, 0x00BE, 0x00B4, 0x80B1, 0x8093, 0x0096, 0x009C, 0x8099, 0x0088, 0x808D, 0x8087, 0x0082,
0x8093, 0x0096, 0x009C, 0x8099, 0x0088, 0x808D, 0x8087, 0x8183, 0x0186, 0x018C, 0x8189, 0x0198, 0x819D, 0x8197, 0x0192,
0x0082, 0x8183, 0x0186, 0x018C, 0x8189, 0x0198, 0x819D, 0x01B0, 0x81B5, 0x81BF, 0x01BA, 0x81AB, 0x01AE, 0x01A4, 0x81A1,
0x8197, 0x0192, 0x01B0, 0x81B5, 0x81BF, 0x01BA, 0x81AB, 0x01E0, 0x81E5, 0x81EF, 0x01EA, 0x81FB, 0x01FE, 0x01F4, 0x81F1,
0x01AE, 0x01A4, 0x81A1, 0x01E0, 0x81E5, 0x81EF, 0x01EA, 0x81D3, 0x01D6, 0x01DC, 0x81D9, 0x01C8, 0x81CD, 0x81C7, 0x01C2,
0x81FB, 0x01FE, 0x01F4, 0x81F1, 0x81D3, 0x01D6, 0x01DC, 0x0140, 0x8145, 0x814F, 0x014A, 0x815B, 0x015E, 0x0154, 0x8151,
0x81D9, 0x01C8, 0x81CD, 0x81C7, 0x01C2, 0x0140, 0x8145, 0x8173, 0x0176, 0x017C, 0x8179, 0x0168, 0x816D, 0x8167, 0x0162,
0x814F, 0x014A, 0x815B, 0x015E, 0x0154, 0x8151, 0x8173, 0x8123, 0x0126, 0x012C, 0x8129, 0x0138, 0x813D, 0x8137, 0x0132,
0x0176, 0x017C, 0x8179, 0x0168, 0x816D, 0x8167, 0x0162, 0x0110, 0x8115, 0x811F, 0x011A, 0x810B, 0x010E, 0x0104, 0x8101,
0x8123, 0x0126, 0x012C, 0x8129, 0x0138, 0x813D, 0x8137, 0x8303, 0x0306, 0x030C, 0x8309, 0x0318, 0x831D, 0x8317, 0x0312,
0x0132, 0x0110, 0x8115, 0x811F, 0x011A, 0x810B, 0x010E, 0x0330, 0x8335, 0x833F, 0x033A, 0x832B, 0x032E, 0x0324, 0x8321,
0x0104, 0x8101, 0x8303, 0x0306, 0x030C, 0x8309, 0x0318, 0x0360, 0x8365, 0x836F, 0x036A, 0x837B, 0x037E, 0x0374, 0x8371,
0x831D, 0x8317, 0x0312, 0x0330, 0x8335, 0x833F, 0x033A, 0x8353, 0x0356, 0x035C, 0x8359, 0x0348, 0x834D, 0x8347, 0x0342,
0x832B, 0x032E, 0x0324, 0x8321, 0x0360, 0x8365, 0x836F, 0x03C0, 0x83C5, 0x83CF, 0x03CA, 0x83DB, 0x03DE, 0x03D4, 0x83D1,
0x036A, 0x837B, 0x037E, 0x0374, 0x8371, 0x8353, 0x0356, 0x83F3, 0x03F6, 0x03FC, 0x83F9, 0x03E8, 0x83ED, 0x83E7, 0x03E2,
0x035C, 0x8359, 0x0348, 0x834D, 0x8347, 0x0342, 0x03C0, 0x83A3, 0x03A6, 0x03AC, 0x83A9, 0x03B8, 0x83BD, 0x83B7, 0x03B2,
0x83C5, 0x83CF, 0x03CA, 0x83DB, 0x03DE, 0x03D4, 0x83D1, 0x0390, 0x8395, 0x839F, 0x039A, 0x838B, 0x038E, 0x0384, 0x8381,
0x83F3, 0x03F6, 0x03FC, 0x83F9, 0x03E8, 0x83ED, 0x83E7, 0x0280, 0x8285, 0x828F, 0x028A, 0x829B, 0x029E, 0x0294, 0x8291,
0x03E2, 0x83A3, 0x03A6, 0x03AC, 0x83A9, 0x03B8, 0x83BD, 0x82B3, 0x02B6, 0x02BC, 0x82B9, 0x02A8, 0x82AD, 0x82A7, 0x02A2,
0x83B7, 0x03B2, 0x0390, 0x8395, 0x839F, 0x039A, 0x838B, 0x82E3, 0x02E6, 0x02EC, 0x82E9, 0x02F8, 0x82FD, 0x82F7, 0x02F2,
0x038E, 0x0384, 0x8381, 0x0280, 0x8285, 0x828F, 0x028A, 0x02D0, 0x82D5, 0x82DF, 0x02DA, 0x82CB, 0x02CE, 0x02C4, 0x82C1,
0x829B, 0x029E, 0x0294, 0x8291, 0x82B3, 0x02B6, 0x02BC, 0x8243, 0x0246, 0x024C, 0x8249, 0x0258, 0x825D, 0x8257, 0x0252,
0x82B9, 0x02A8, 0x82AD, 0x82A7, 0x02A2, 0x82E3, 0x02E6, 0x0270, 0x8275, 0x827F, 0x027A, 0x826B, 0x026E, 0x0264, 0x8261,
0x02EC, 0x82E9, 0x02F8, 0x82FD, 0x82F7, 0x02F2, 0x02D0, 0x0220, 0x8225, 0x822F, 0x022A, 0x823B, 0x023E, 0x0234, 0x8231,
0x82D5, 0x82DF, 0x02DA, 0x82CB, 0x02CE, 0x02C4, 0x82C1, 0x8213, 0x0216, 0x021C, 0x8219, 0x0208, 0x820D, 0x8207, 0x0202
0x8243, 0x0246, 0x024C, 0x8249, 0x0258, 0x825D, 0x8257,
0x0252, 0x0270, 0x8275, 0x827F, 0x027A, 0x826B, 0x026E,
0x0264, 0x8261, 0x0220, 0x8225, 0x822F, 0x022A, 0x823B,
0x023E, 0x0234, 0x8231, 0x8213, 0x0216, 0x021C, 0x8219,
0x0208, 0x820D, 0x8207, 0x0202
] # fmt: skip ] # fmt: skip
# https://emanual.robotis.com/docs/en/dxl/protocol2/#instruction # https://emanual.robotis.com/docs/en/dxl/protocol2/#instruction
@ -191,6 +186,20 @@ class MockInstructionPacket(MockDynamixelPacketv2):
0x00, 0x00 # placeholder for CRC 0x00, 0x00 # placeholder for CRC
] # fmt: skip ] # fmt: skip
@classmethod
def ping(
cls,
dxl_id: int,
) -> bytes:
"""
Builds a "Ping" broadcast instruction.
(from https://emanual.robotis.com/docs/en/dxl/protocol2/#ping-0x01)
No parameters required.
"""
params, length = [], 3
return cls.build(dxl_id=dxl_id, params=params, length=length, instruct_type="Ping")
@classmethod @classmethod
def sync_read( def sync_read(
cls, cls,
@ -252,12 +261,32 @@ class MockStatusPacket(MockDynamixelPacketv2):
0x00, 0x00 # placeholder for CRC 0x00, 0x00 # placeholder for CRC
] # fmt: skip ] # fmt: skip
@classmethod
def ping(cls, dxl_id: int, model_nb: int = 1190, firm_ver: int = 50) -> bytes:
"""Builds a 'Ping' status packet.
https://emanual.robotis.com/docs/en/dxl/protocol2/#ping-0x01
Args:
dxl_id (int): ID of the servo responding.
model_nb (int, optional): Desired 'model number' to be returned in the packet. Defaults to 1190
which corresponds to a XL330-M077-T.
firm_ver (int, optional): Desired 'firmware version' to be returned in the packet.
Defaults to 50.
Returns:
bytes: The raw 'Ping' status packet ready to be sent through serial.
"""
params = [dxl.DXL_LOBYTE(model_nb), dxl.DXL_HIBYTE(model_nb), firm_ver]
length = 7
return cls.build(dxl_id, params=params, length=length)
@classmethod @classmethod
def present_position(cls, dxl_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes: def present_position(cls, dxl_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes:
"""Builds a 'Present_Position' status packet. """Builds a 'Present_Position' status packet.
Args: Args:
dxl_id (int): List of the servos ids dxl_id (int): Servo id
pos (int | None, optional): Desired 'Present_Position' to be returned in the packet. If None, it pos (int | None, optional): Desired 'Present_Position' to be returned in the packet. If None, it
will use a random value in the min_max_range. Defaults to None. will use a random value in the min_max_range. Defaults to None.
min_max_range (tuple, optional): Min/max range to generate the position values used for when 'pos' min_max_range (tuple, optional): Min/max range to generate the position values used for when 'pos'
@ -308,62 +337,69 @@ class MockMotors(MockSerial):
ctrl_table = X_SERIES_CONTROL_TABLE ctrl_table = X_SERIES_CONTROL_TABLE
def __init__(self, dlx_ids: list[int]): def __init__(self):
super().__init__() super().__init__()
self._ids = dlx_ids
self.open() self.open()
def build_single_motor_stubs( def build_broadcast_ping_stub(
self, data_name: str, return_value: int | None = None, num_invalid_try: int | None = None self, ids_models_firmwares: dict[int, list[int]] | None = None, num_invalid_try: int = 0
) -> None: ) -> str:
address, length = self.ctrl_table[data_name] ping_request = MockInstructionPacket.ping(dxl.BROADCAST_ID)
for idx in self._ids: return_packets = b"".join(
if data_name == "Present_Position": MockStatusPacket.ping(idx, model, firm_ver)
sync_read_request_single = MockInstructionPacket.sync_read([idx], address, length) for idx, (model, firm_ver) in ids_models_firmwares.items()
sync_read_response_single = self._build_present_pos_send_fn(
[idx], [return_value], num_invalid_try
) )
else: ping_response = self._build_send_fn(return_packets, num_invalid_try)
raise NotImplementedError # TODO(aliberts): add ping?
stub_name = "Ping_" + "_".join([str(idx) for idx in ids_models_firmwares])
self.stub( self.stub(
name=f"SyncRead_{data_name}_{idx}", name=stub_name,
receive_bytes=sync_read_request_single, receive_bytes=ping_request,
send_fn=sync_read_response_single, send_fn=ping_response,
) )
return stub_name
def build_all_motors_stub( def build_ping_stub(
self, data_name: str, return_values: list[int] | None = None, num_invalid_try: int | None = None self, dxl_id: int, model_nb: int, firm_ver: int = 50, num_invalid_try: int = 0
) -> None: ) -> str:
address, length = self.ctrl_table[data_name] ping_request = MockInstructionPacket.ping(dxl_id)
if data_name == "Present_Position": return_packet = MockStatusPacket.ping(dxl_id, model_nb, firm_ver)
sync_read_request_all = MockInstructionPacket.sync_read(self._ids, address, length) ping_response = self._build_send_fn(return_packet, num_invalid_try)
sync_read_response_all = self._build_present_pos_send_fn(
self._ids, return_values, num_invalid_try
)
else:
raise NotImplementedError # TODO(aliberts): add ping?
stub_name = f"Ping_{dxl_id}"
self.stub( self.stub(
name=f"SyncRead_{data_name}_all", name=stub_name,
receive_bytes=sync_read_request_all, receive_bytes=ping_request,
send_fn=sync_read_response_all, send_fn=ping_response,
) )
return stub_name
def _build_present_pos_send_fn( def build_sync_read_stub(
self, dxl_ids: list[int], return_pos: list[int] | None = None, num_invalid_try: int | None = None self, data_name: str, ids_values: dict[int, int] | None = None, num_invalid_try: int = 0
) -> Callable[[int], bytes]: ) -> str:
return_pos = [None for _ in dxl_ids] if return_pos is None else return_pos address, length = self.ctrl_table[data_name]
assert len(return_pos) == len(dxl_ids) sync_read_request = MockInstructionPacket.sync_read(list(ids_values), address, length)
if data_name != "Present_Position":
raise NotImplementedError
return_packets = b"".join(
MockStatusPacket.present_position(idx, pos) for idx, pos in ids_values.items()
)
sync_read_response = self._build_send_fn(return_packets, num_invalid_try)
stub_name = f"Sync_Read_{data_name}_" + "_".join([str(idx) for idx in ids_values])
self.stub(
name=stub_name,
receive_bytes=sync_read_request,
send_fn=sync_read_response,
)
return stub_name
@staticmethod
def _build_send_fn(packet: bytes, num_invalid_try: int = 0) -> Callable[[int], bytes]:
def send_fn(_call_count: int) -> bytes: def send_fn(_call_count: int) -> bytes:
if num_invalid_try is not None and num_invalid_try >= _call_count: if num_invalid_try >= _call_count:
return b"" return b""
return packet
packets = b"".join(
MockStatusPacket.present_position(idx, pos)
for idx, pos in zip(dxl_ids, return_pos, strict=True)
)
return packets
return send_fn return send_fn

View File

@ -81,6 +81,7 @@ def test_split_int_bytes_large_number():
def test_abc_implementation(dummy_motors): def test_abc_implementation(dummy_motors):
"""Instantiation should raise an error if the class doesn't implement abstract methods/properties.""" """Instantiation should raise an error if the class doesn't implement abstract methods/properties."""
DynamixelMotorsBus(port="/dev/dummy-port", motors=dummy_motors) DynamixelMotorsBus(port="/dev/dummy-port", motors=dummy_motors)
DynamixelMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "xl330-m077")}) DynamixelMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "xl330-m077")})
@ -95,21 +96,24 @@ def test_abc_implementation(dummy_motors):
ids=["None", "by ids", "by names", "mixed"], ids=["None", "by ids", "by names", "mixed"],
) )
def test_read_all_motors(motors, dummy_motors): def test_read_all_motors(motors, dummy_motors):
mock_motors = MockMotors([1, 2, 3]) mock_motors = MockMotors()
positions = [1337, 42, 4016] expected_positions = {
mock_motors.build_sync_read_all_motors_stub("Present_Position", return_values=positions) 1: 1337,
2: 42,
3: 4016,
}
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_positions)
motors_bus = DynamixelMotorsBus( motors_bus = DynamixelMotorsBus(
port=mock_motors.port, port=mock_motors.port,
motors=dummy_motors, motors=dummy_motors,
) )
motors_bus.connect() motors_bus.connect()
pos_dict = motors_bus.read("Present_Position", motors=motors) positions_read = motors_bus.read("Present_Position", motors=motors)
assert mock_motors.stubs["SyncRead_Present_Position_all"].called motors = ["dummy_1", "dummy_2", "dummy_3"] if motors is None else motors
assert all(returned_pos == pos for returned_pos, pos in zip(pos_dict.values(), positions, strict=True)) assert mock_motors.stubs[stub_name].called
assert set(pos_dict) == {"dummy_1", "dummy_2", "dummy_3"} assert positions_read == dict(zip(motors, expected_positions.values(), strict=True))
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -121,8 +125,9 @@ def test_read_all_motors(motors, dummy_motors):
], ],
) )
def test_read_single_motor_by_name(idx, pos, dummy_motors): def test_read_single_motor_by_name(idx, pos, dummy_motors):
mock_motors = MockMotors([1, 2, 3]) mock_motors = MockMotors()
mock_motors.build_sync_read_single_motor_stubs("Present_Position", return_value=pos) expected_position = {idx: pos}
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_position)
motors_bus = DynamixelMotorsBus( motors_bus = DynamixelMotorsBus(
port=mock_motors.port, port=mock_motors.port,
motors=dummy_motors, motors=dummy_motors,
@ -131,9 +136,8 @@ def test_read_single_motor_by_name(idx, pos, dummy_motors):
pos_dict = motors_bus.read("Present_Position", f"dummy_{idx}") pos_dict = motors_bus.read("Present_Position", f"dummy_{idx}")
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called assert mock_motors.stubs[stub_name].called
assert pos_dict == {f"dummy_{idx}": pos} assert pos_dict == {f"dummy_{idx}": pos}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -145,8 +149,9 @@ def test_read_single_motor_by_name(idx, pos, dummy_motors):
], ],
) )
def test_read_single_motor_by_id(idx, pos, dummy_motors): def test_read_single_motor_by_id(idx, pos, dummy_motors):
mock_motors = MockMotors([1, 2, 3]) mock_motors = MockMotors()
mock_motors.build_sync_read_single_motor_stubs("Present_Position", return_value=pos) expected_position = {idx: pos}
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_position)
motors_bus = DynamixelMotorsBus( motors_bus = DynamixelMotorsBus(
port=mock_motors.port, port=mock_motors.port,
motors=dummy_motors, motors=dummy_motors,
@ -155,24 +160,24 @@ def test_read_single_motor_by_id(idx, pos, dummy_motors):
pos_dict = motors_bus.read("Present_Position", idx) pos_dict = motors_bus.read("Present_Position", idx)
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called assert mock_motors.stubs[stub_name].called
assert pos_dict == {f"dummy_{idx}": pos} assert pos_dict == {idx: pos}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
@pytest.mark.parametrize( @pytest.mark.parametrize(
"num_retry, num_invalid_try, pos", "num_retry, num_invalid_try, pos",
[ [
[1, 2, 1337], [0, 2, 1337],
[2, 3, 42], [2, 3, 42],
[3, 2, 4016], [3, 2, 4016],
[2, 1, 999], [2, 1, 999],
], ],
) )
def test_read_num_retry(num_retry, num_invalid_try, pos, dummy_motors): def test_read_num_retry(num_retry, num_invalid_try, pos, dummy_motors):
mock_motors = MockMotors([1, 2, 3]) mock_motors = MockMotors()
mock_motors.build_sync_read_single_motor_stubs( expected_position = {1: pos}
"Present_Position", return_value=pos, num_invalid_try=num_invalid_try stub_name = mock_motors.build_sync_read_stub(
"Present_Position", expected_position, num_invalid_try=num_invalid_try
) )
motors_bus = DynamixelMotorsBus( motors_bus = DynamixelMotorsBus(
port=mock_motors.port, port=mock_motors.port,
@ -182,10 +187,10 @@ def test_read_num_retry(num_retry, num_invalid_try, pos, dummy_motors):
if num_retry >= num_invalid_try: if num_retry >= num_invalid_try:
pos_dict = motors_bus.read("Present_Position", 1, num_retry=num_retry) pos_dict = motors_bus.read("Present_Position", 1, num_retry=num_retry)
assert pos_dict == {"dummy_1": pos} assert pos_dict == {1: pos}
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
else: else:
with pytest.raises(ConnectionError): with pytest.raises(ConnectionError):
_ = motors_bus.read("Present_Position", 1, num_retry=num_retry) _ = motors_bus.read("Present_Position", 1, num_retry=num_retry)
assert mock_motors.stubs["SyncRead_Present_Position_1"].calls == num_retry expected_calls = min(1 + num_retry, 1 + num_invalid_try)
assert mock_motors.stubs[stub_name].calls == expected_calls