Refactor tests

This commit is contained in:
Simon Alibert 2025-04-14 15:26:29 +02:00
parent d70bc4bde9
commit 1f210bc8a3
4 changed files with 124 additions and 213 deletions

View File

@ -5,7 +5,6 @@ import dynamixel_sdk as dxl
import serial import serial
from mock_serial.mock_serial import MockSerial from mock_serial.mock_serial import MockSerial
from lerobot.common.motors.dynamixel import X_SERIES_CONTROL_TABLE
from lerobot.common.motors.dynamixel.dynamixel import _split_into_byte_chunks from lerobot.common.motors.dynamixel.dynamixel import _split_into_byte_chunks
from .mock_serial_patch import WaitableStub from .mock_serial_patch import WaitableStub
@ -425,8 +424,6 @@ class MockMotors(MockSerial):
instruction packets. It is meant to test MotorsBus classes. instruction packets. It is meant to test MotorsBus classes.
""" """
ctrl_table = X_SERIES_CONTROL_TABLE
def __init__(self): def __init__(self):
super().__init__() super().__init__()

View File

@ -5,7 +5,6 @@ import scservo_sdk as scs
import serial import serial
from mock_serial import MockSerial from mock_serial import MockSerial
from lerobot.common.motors.feetech import STS_SMS_SERIES_CONTROL_TABLE
from lerobot.common.motors.feetech.feetech import _split_into_byte_chunks, patch_setPacketTimeout from lerobot.common.motors.feetech.feetech import _split_into_byte_chunks, patch_setPacketTimeout
from .mock_serial_patch import WaitableStub from .mock_serial_patch import WaitableStub
@ -278,8 +277,6 @@ class MockMotors(MockSerial):
instruction packets. It is meant to test MotorsBus classes. instruction packets. It is meant to test MotorsBus classes.
""" """
ctrl_table = STS_SMS_SERIES_CONTROL_TABLE
def __init__(self): def __init__(self):
super().__init__() super().__init__()

View File

@ -90,13 +90,10 @@ def test_abc_implementation(dummy_motors):
def test_ping(id_, mock_motors, dummy_motors): def test_ping(id_, mock_motors, dummy_motors):
expected_model_nb = MODEL_NUMBER_TABLE[dummy_motors[f"dummy_{id_}"].model] expected_model_nb = MODEL_NUMBER_TABLE[dummy_motors[f"dummy_{id_}"].model]
stub = mock_motors.build_ping_stub(id_, expected_model_nb) stub = mock_motors.build_ping_stub(id_, expected_model_nb)
motors_bus = DynamixelMotorsBus( bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
ping_model_nb = motors_bus.ping(id_) ping_model_nb = bus.ping(id_)
assert ping_model_nb == expected_model_nb assert ping_model_nb == expected_model_nb
assert mock_motors.stubs[stub].called assert mock_motors.stubs[stub].called
@ -106,13 +103,10 @@ def test_broadcast_ping(mock_motors, dummy_motors):
models = {m.id: m.model for m in dummy_motors.values()} models = {m.id: m.model for m in dummy_motors.values()}
expected_model_nbs = {id_: MODEL_NUMBER_TABLE[model] for id_, model in models.items()} expected_model_nbs = {id_: MODEL_NUMBER_TABLE[model] for id_, model in models.items()}
stub = mock_motors.build_broadcast_ping_stub(expected_model_nbs) stub = mock_motors.build_broadcast_ping_stub(expected_model_nbs)
motors_bus = DynamixelMotorsBus( bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
ping_model_nbs = motors_bus.broadcast_ping() ping_model_nbs = bus.broadcast_ping()
assert ping_model_nbs == expected_model_nbs assert ping_model_nbs == expected_model_nbs
assert mock_motors.stubs[stub].called assert mock_motors.stubs[stub].called
@ -128,13 +122,10 @@ def test_broadcast_ping(mock_motors, dummy_motors):
) )
def test__read(addr, length, id_, value, mock_motors, dummy_motors): def test__read(addr, length, id_, value, mock_motors, dummy_motors):
stub = mock_motors.build_read_stub(addr, length, id_, value) stub = mock_motors.build_read_stub(addr, length, id_, value)
motors_bus = DynamixelMotorsBus( bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
read_value, _, _ = motors_bus._read(addr, length, id_) read_value, _, _ = bus._read(addr, length, id_)
assert mock_motors.stubs[stub].called assert mock_motors.stubs[stub].called
assert read_value == value assert read_value == value
@ -144,19 +135,16 @@ def test__read(addr, length, id_, value, mock_motors, dummy_motors):
def test__read_error(raise_on_error, mock_motors, dummy_motors): def test__read_error(raise_on_error, mock_motors, dummy_motors):
addr, length, id_, value, error = (10, 4, 1, 1337, dxl.ERRNUM_DATA_LIMIT) addr, length, id_, value, error = (10, 4, 1, 1337, dxl.ERRNUM_DATA_LIMIT)
stub = mock_motors.build_read_stub(addr, length, id_, value, error=error) stub = mock_motors.build_read_stub(addr, length, id_, value, error=error)
motors_bus = DynamixelMotorsBus( bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
if raise_on_error: if raise_on_error:
with pytest.raises( with pytest.raises(
RuntimeError, match=re.escape("[RxPacketError] The data value exceeds the limit value!") RuntimeError, match=re.escape("[RxPacketError] The data value exceeds the limit value!")
): ):
motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) bus._read(addr, length, id_, raise_on_error=raise_on_error)
else: else:
_, _, read_error = motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) _, _, read_error = bus._read(addr, length, id_, raise_on_error=raise_on_error)
assert read_error == error assert read_error == error
assert mock_motors.stubs[stub].called assert mock_motors.stubs[stub].called
@ -166,17 +154,14 @@ def test__read_error(raise_on_error, mock_motors, dummy_motors):
def test__read_comm(raise_on_error, mock_motors, dummy_motors): def test__read_comm(raise_on_error, mock_motors, dummy_motors):
addr, length, id_, value = (10, 4, 1, 1337) addr, length, id_, value = (10, 4, 1, 1337)
stub = mock_motors.build_read_stub(addr, length, id_, value, reply=False) stub = mock_motors.build_read_stub(addr, length, id_, value, reply=False)
motors_bus = DynamixelMotorsBus( bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
if raise_on_error: if raise_on_error:
with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")): with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")):
motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) bus._read(addr, length, id_, raise_on_error=raise_on_error)
else: else:
_, read_comm, _ = motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) _, read_comm, _ = bus._read(addr, length, id_, raise_on_error=raise_on_error)
assert read_comm == dxl.COMM_RX_TIMEOUT assert read_comm == dxl.COMM_RX_TIMEOUT
assert mock_motors.stubs[stub].called assert mock_motors.stubs[stub].called
@ -192,13 +177,10 @@ def test__read_comm(raise_on_error, mock_motors, dummy_motors):
) )
def test__write(addr, length, id_, value, mock_motors, dummy_motors): def test__write(addr, length, id_, value, mock_motors, dummy_motors):
stub = mock_motors.build_write_stub(addr, length, id_, value) stub = mock_motors.build_write_stub(addr, length, id_, value)
motors_bus = DynamixelMotorsBus( bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
comm, error = motors_bus._write(addr, length, id_, value) comm, error = bus._write(addr, length, id_, value)
assert mock_motors.stubs[stub].called assert mock_motors.stubs[stub].called
assert comm == dxl.COMM_SUCCESS assert comm == dxl.COMM_SUCCESS
@ -209,19 +191,16 @@ def test__write(addr, length, id_, value, mock_motors, dummy_motors):
def test__write_error(raise_on_error, mock_motors, dummy_motors): def test__write_error(raise_on_error, mock_motors, dummy_motors):
addr, length, id_, value, error = (10, 4, 1, 1337, dxl.ERRNUM_DATA_LIMIT) addr, length, id_, value, error = (10, 4, 1, 1337, dxl.ERRNUM_DATA_LIMIT)
stub = mock_motors.build_write_stub(addr, length, id_, value, error=error) stub = mock_motors.build_write_stub(addr, length, id_, value, error=error)
motors_bus = DynamixelMotorsBus( bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
if raise_on_error: if raise_on_error:
with pytest.raises( with pytest.raises(
RuntimeError, match=re.escape("[RxPacketError] The data value exceeds the limit value!") RuntimeError, match=re.escape("[RxPacketError] The data value exceeds the limit value!")
): ):
motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) bus._write(addr, length, id_, value, raise_on_error=raise_on_error)
else: else:
_, write_error = motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) _, write_error = bus._write(addr, length, id_, value, raise_on_error=raise_on_error)
assert write_error == error assert write_error == error
assert mock_motors.stubs[stub].called assert mock_motors.stubs[stub].called
@ -231,17 +210,14 @@ def test__write_error(raise_on_error, mock_motors, dummy_motors):
def test__write_comm(raise_on_error, mock_motors, dummy_motors): def test__write_comm(raise_on_error, mock_motors, dummy_motors):
addr, length, id_, value = (10, 4, 1, 1337) addr, length, id_, value = (10, 4, 1, 1337)
stub = mock_motors.build_write_stub(addr, length, id_, value, reply=False) stub = mock_motors.build_write_stub(addr, length, id_, value, reply=False)
motors_bus = DynamixelMotorsBus( bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
if raise_on_error: if raise_on_error:
with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")): with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")):
motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) bus._write(addr, length, id_, value, raise_on_error=raise_on_error)
else: else:
write_comm, _ = motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) write_comm, _ = bus._write(addr, length, id_, value, raise_on_error=raise_on_error)
assert write_comm == dxl.COMM_RX_TIMEOUT assert write_comm == dxl.COMM_RX_TIMEOUT
assert mock_motors.stubs[stub].called assert mock_motors.stubs[stub].called
@ -258,13 +234,10 @@ def test__write_comm(raise_on_error, mock_motors, dummy_motors):
) )
def test__sync_read(addr, length, ids_values, mock_motors, dummy_motors): def test__sync_read(addr, length, ids_values, mock_motors, dummy_motors):
stub = mock_motors.build_sync_read_stub(addr, length, ids_values) stub = mock_motors.build_sync_read_stub(addr, length, ids_values)
motors_bus = DynamixelMotorsBus( bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
read_values, _ = motors_bus._sync_read(addr, length, list(ids_values)) read_values, _ = bus._sync_read(addr, length, list(ids_values))
assert mock_motors.stubs[stub].called assert mock_motors.stubs[stub].called
assert read_values == ids_values assert read_values == ids_values
@ -274,17 +247,14 @@ def test__sync_read(addr, length, ids_values, mock_motors, dummy_motors):
def test__sync_read_comm(raise_on_error, mock_motors, dummy_motors): def test__sync_read_comm(raise_on_error, mock_motors, dummy_motors):
addr, length, ids_values = (10, 4, {1: 1337}) addr, length, ids_values = (10, 4, {1: 1337})
stub = mock_motors.build_sync_read_stub(addr, length, ids_values, reply=False) stub = mock_motors.build_sync_read_stub(addr, length, ids_values, reply=False)
motors_bus = DynamixelMotorsBus( bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
if raise_on_error: if raise_on_error:
with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")): with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")):
motors_bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error) bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error)
else: else:
_, read_comm = motors_bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error) _, read_comm = bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error)
assert read_comm == dxl.COMM_RX_TIMEOUT assert read_comm == dxl.COMM_RX_TIMEOUT
assert mock_motors.stubs[stub].called assert mock_motors.stubs[stub].called
@ -301,13 +271,10 @@ def test__sync_read_comm(raise_on_error, mock_motors, dummy_motors):
) )
def test__sync_write(addr, length, ids_values, mock_motors, dummy_motors): def test__sync_write(addr, length, ids_values, mock_motors, dummy_motors):
stub = mock_motors.build_sync_write_stub(addr, length, ids_values) stub = mock_motors.build_sync_write_stub(addr, length, ids_values)
motors_bus = DynamixelMotorsBus( bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
comm = motors_bus._sync_write(addr, length, ids_values) comm = bus._sync_write(addr, length, ids_values)
assert mock_motors.stubs[stub].wait_called() assert mock_motors.stubs[stub].wait_called()
assert comm == dxl.COMM_SUCCESS assert comm == dxl.COMM_SUCCESS
@ -322,14 +289,14 @@ def test_is_calibrated(mock_motors, dummy_motors, dummy_calibration):
offsets_stub = mock_motors.build_sync_read_stub(*X_SERIES_CONTROL_TABLE["Homing_Offset"], encoded_homings) offsets_stub = mock_motors.build_sync_read_stub(*X_SERIES_CONTROL_TABLE["Homing_Offset"], encoded_homings)
mins_stub = mock_motors.build_sync_read_stub(*X_SERIES_CONTROL_TABLE["Min_Position_Limit"], mins) mins_stub = mock_motors.build_sync_read_stub(*X_SERIES_CONTROL_TABLE["Min_Position_Limit"], mins)
maxes_stub = mock_motors.build_sync_read_stub(*X_SERIES_CONTROL_TABLE["Max_Position_Limit"], maxes) maxes_stub = mock_motors.build_sync_read_stub(*X_SERIES_CONTROL_TABLE["Max_Position_Limit"], maxes)
motors_bus = DynamixelMotorsBus( bus = DynamixelMotorsBus(
port=mock_motors.port, port=mock_motors.port,
motors=dummy_motors, motors=dummy_motors,
calibration=dummy_calibration, calibration=dummy_calibration,
) )
motors_bus.connect(assert_motors_exist=False) bus.connect(assert_motors_exist=False)
is_calibrated = motors_bus.is_calibrated is_calibrated = bus.is_calibrated
assert is_calibrated assert is_calibrated
assert mock_motors.stubs[drive_modes_stub].called assert mock_motors.stubs[drive_modes_stub].called
@ -353,13 +320,10 @@ def test_reset_calibration(mock_motors, dummy_motors):
mock_motors.build_write_stub(*X_SERIES_CONTROL_TABLE["Max_Position_Limit"], motor.id, 4095) mock_motors.build_write_stub(*X_SERIES_CONTROL_TABLE["Max_Position_Limit"], motor.id, 4095)
) )
motors_bus = DynamixelMotorsBus( bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
motors_bus.reset_calibration() bus.reset_calibration()
assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs) assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs)
assert all(mock_motors.stubs[stub].called for stub in write_mins_stubs) assert all(mock_motors.stubs[stub].called for stub in write_mins_stubs)
@ -390,16 +354,13 @@ def test_set_half_turn_homings(mock_motors, dummy_motors):
stub = mock_motors.build_write_stub(*X_SERIES_CONTROL_TABLE["Homing_Offset"], id_, encoded_homing) stub = mock_motors.build_write_stub(*X_SERIES_CONTROL_TABLE["Homing_Offset"], id_, encoded_homing)
write_homing_stubs.append(stub) write_homing_stubs.append(stub)
motors_bus = DynamixelMotorsBus( bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors, bus.reset_calibration = MagicMock()
)
motors_bus.connect(assert_motors_exist=False)
motors_bus.reset_calibration = MagicMock()
motors_bus.set_half_turn_homings() bus.set_half_turn_homings()
motors_bus.reset_calibration.assert_called_once() bus.reset_calibration.assert_called_once()
assert mock_motors.stubs[read_pos_stub].called assert mock_motors.stubs[read_pos_stub].called
assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs) assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs)
@ -424,13 +385,10 @@ def test_record_ranges_of_motion(mock_motors, dummy_motors):
*X_SERIES_CONTROL_TABLE["Present_Position"], positions *X_SERIES_CONTROL_TABLE["Present_Position"], positions
) )
with patch("lerobot.common.motors.motors_bus.enter_pressed", side_effect=[False, True]): with patch("lerobot.common.motors.motors_bus.enter_pressed", side_effect=[False, True]):
motors_bus = DynamixelMotorsBus( bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
mins, maxes = motors_bus.record_ranges_of_motion(display_values=False) mins, maxes = bus.record_ranges_of_motion(display_values=False)
assert mock_motors.stubs[read_pos_stub].calls == 3 assert mock_motors.stubs[read_pos_stub].calls == 3
assert mins == expected_mins assert mins == expected_mins

View File

@ -91,36 +91,19 @@ def test_abc_implementation(dummy_motors):
FeetechMotorsBus(port="/dev/dummy-port", motors=dummy_motors) FeetechMotorsBus(port="/dev/dummy-port", motors=dummy_motors)
@pytest.mark.skip("TODO")
def test_scan_port(mock_motors):
expected = {
9_600: {1: 777},
57_600: {2: 777},
500_000: {237: 777},
}
expected_model_nbs = {id_: model for d in expected.values() for id_, model in d.items()}
ping_stub = mock_motors.build_broadcast_ping_stub(list(expected_model_nbs))
mobel_nb_stub = mock_motors.build_sync_read_stub("Model_Number", expected_model_nbs)
found = FeetechMotorsBus.scan_port(mock_motors.port)
assert found == expected
assert mock_motors.stubs[ping_stub].called
assert mock_motors.stubs[mobel_nb_stub].called
@pytest.mark.parametrize("id_", [1, 2, 3]) @pytest.mark.parametrize("id_", [1, 2, 3])
def test_ping(id_, mock_motors, dummy_motors): def test_ping(id_, mock_motors, dummy_motors):
expected_model_nb = MODEL_NUMBER_TABLE[dummy_motors[f"dummy_{id_}"].model] expected_model_nb = MODEL_NUMBER_TABLE[dummy_motors[f"dummy_{id_}"].model]
addr, length = MODEL_NUMBER addr, length = MODEL_NUMBER
ping_stub = mock_motors.build_ping_stub(id_) ping_stub = mock_motors.build_ping_stub(id_)
mobel_nb_stub = mock_motors.build_read_stub(addr, length, id_, expected_model_nb) mobel_nb_stub = mock_motors.build_read_stub(addr, length, id_, expected_model_nb)
motors_bus = FeetechMotorsBus( bus = FeetechMotorsBus(
port=mock_motors.port, port=mock_motors.port,
motors=dummy_motors, motors=dummy_motors,
) )
motors_bus.connect(assert_motors_exist=False) bus.connect(assert_motors_exist=False)
ping_model_nb = motors_bus.ping(id_) ping_model_nb = bus.ping(id_)
assert ping_model_nb == expected_model_nb assert ping_model_nb == expected_model_nb
assert mock_motors.stubs[ping_stub].called assert mock_motors.stubs[ping_stub].called
@ -138,13 +121,13 @@ def test_broadcast_ping(mock_motors, dummy_motors):
stub = mock_motors.build_read_stub(addr, length, id_, model_nb) stub = mock_motors.build_read_stub(addr, length, id_, model_nb)
expected_model_nbs[id_] = model_nb expected_model_nbs[id_] = model_nb
mobel_nb_stubs.append(stub) mobel_nb_stubs.append(stub)
motors_bus = FeetechMotorsBus( bus = FeetechMotorsBus(
port=mock_motors.port, port=mock_motors.port,
motors=dummy_motors, motors=dummy_motors,
) )
motors_bus.connect(assert_motors_exist=False) bus.connect(assert_motors_exist=False)
ping_model_nbs = motors_bus.broadcast_ping() ping_model_nbs = bus.broadcast_ping()
assert ping_model_nbs == expected_model_nbs assert ping_model_nbs == expected_model_nbs
assert mock_motors.stubs[ping_stub].called assert mock_motors.stubs[ping_stub].called
@ -160,57 +143,57 @@ def test_broadcast_ping(mock_motors, dummy_motors):
], ],
) )
def test__read(addr, length, id_, value, mock_motors, dummy_motors): def test__read(addr, length, id_, value, mock_motors, dummy_motors):
stub_name = mock_motors.build_read_stub(addr, length, id_, value) stub = mock_motors.build_read_stub(addr, length, id_, value)
motors_bus = FeetechMotorsBus( bus = FeetechMotorsBus(
port=mock_motors.port, port=mock_motors.port,
motors=dummy_motors, motors=dummy_motors,
) )
motors_bus.connect(assert_motors_exist=False) bus.connect(assert_motors_exist=False)
read_value, _, _ = motors_bus._read(addr, length, id_) read_value, _, _ = bus._read(addr, length, id_)
assert mock_motors.stubs[stub_name].called assert mock_motors.stubs[stub].called
assert read_value == value assert read_value == value
@pytest.mark.parametrize("raise_on_error", (True, False)) @pytest.mark.parametrize("raise_on_error", (True, False))
def test__read_error(raise_on_error, mock_motors, dummy_motors): def test__read_error(raise_on_error, mock_motors, dummy_motors):
addr, length, id_, value, error = (10, 4, 1, 1337, scs.ERRBIT_VOLTAGE) addr, length, id_, value, error = (10, 4, 1, 1337, scs.ERRBIT_VOLTAGE)
stub_name = mock_motors.build_read_stub(addr, length, id_, value, error=error) stub = mock_motors.build_read_stub(addr, length, id_, value, error=error)
motors_bus = FeetechMotorsBus( bus = FeetechMotorsBus(
port=mock_motors.port, port=mock_motors.port,
motors=dummy_motors, motors=dummy_motors,
) )
motors_bus.connect(assert_motors_exist=False) bus.connect(assert_motors_exist=False)
if raise_on_error: if raise_on_error:
with pytest.raises(RuntimeError, match=re.escape("[RxPacketError] Input voltage error!")): with pytest.raises(RuntimeError, match=re.escape("[RxPacketError] Input voltage error!")):
motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) bus._read(addr, length, id_, raise_on_error=raise_on_error)
else: else:
_, _, read_error = motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) _, _, read_error = bus._read(addr, length, id_, raise_on_error=raise_on_error)
assert read_error == error assert read_error == error
assert mock_motors.stubs[stub_name].called assert mock_motors.stubs[stub].called
@pytest.mark.parametrize("raise_on_error", (True, False)) @pytest.mark.parametrize("raise_on_error", (True, False))
def test__read_comm(raise_on_error, mock_motors, dummy_motors): def test__read_comm(raise_on_error, mock_motors, dummy_motors):
addr, length, id_, value = (10, 4, 1, 1337) addr, length, id_, value = (10, 4, 1, 1337)
stub_name = mock_motors.build_read_stub(addr, length, id_, value, reply=False) stub = mock_motors.build_read_stub(addr, length, id_, value, reply=False)
motors_bus = FeetechMotorsBus( bus = FeetechMotorsBus(
port=mock_motors.port, port=mock_motors.port,
motors=dummy_motors, motors=dummy_motors,
) )
motors_bus.connect(assert_motors_exist=False) bus.connect(assert_motors_exist=False)
if raise_on_error: if raise_on_error:
with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")): with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")):
motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) bus._read(addr, length, id_, raise_on_error=raise_on_error)
else: else:
_, read_comm, _ = motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) _, read_comm, _ = bus._read(addr, length, id_, raise_on_error=raise_on_error)
assert read_comm == scs.COMM_RX_TIMEOUT assert read_comm == scs.COMM_RX_TIMEOUT
assert mock_motors.stubs[stub_name].called assert mock_motors.stubs[stub].called
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -222,16 +205,16 @@ def test__read_comm(raise_on_error, mock_motors, dummy_motors):
], ],
) )
def test__write(addr, length, id_, value, mock_motors, dummy_motors): def test__write(addr, length, id_, value, mock_motors, dummy_motors):
stub_name = mock_motors.build_write_stub(addr, length, id_, value) stub = mock_motors.build_write_stub(addr, length, id_, value)
motors_bus = FeetechMotorsBus( bus = FeetechMotorsBus(
port=mock_motors.port, port=mock_motors.port,
motors=dummy_motors, motors=dummy_motors,
) )
motors_bus.connect(assert_motors_exist=False) bus.connect(assert_motors_exist=False)
comm, error = motors_bus._write(addr, length, id_, value) comm, error = bus._write(addr, length, id_, value)
assert mock_motors.stubs[stub_name].called assert mock_motors.stubs[stub].called
assert comm == scs.COMM_SUCCESS assert comm == scs.COMM_SUCCESS
assert error == 0 assert error == 0
@ -239,41 +222,35 @@ def test__write(addr, length, id_, value, mock_motors, dummy_motors):
@pytest.mark.parametrize("raise_on_error", (True, False)) @pytest.mark.parametrize("raise_on_error", (True, False))
def test__write_error(raise_on_error, mock_motors, dummy_motors): def test__write_error(raise_on_error, mock_motors, dummy_motors):
addr, length, id_, value, error = (10, 4, 1, 1337, scs.ERRBIT_VOLTAGE) addr, length, id_, value, error = (10, 4, 1, 1337, scs.ERRBIT_VOLTAGE)
stub_name = mock_motors.build_write_stub(addr, length, id_, value, error=error) stub = mock_motors.build_write_stub(addr, length, id_, value, error=error)
motors_bus = FeetechMotorsBus( bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
if raise_on_error: if raise_on_error:
with pytest.raises(RuntimeError, match=re.escape("[RxPacketError] Input voltage error!")): with pytest.raises(RuntimeError, match=re.escape("[RxPacketError] Input voltage error!")):
motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) bus._write(addr, length, id_, value, raise_on_error=raise_on_error)
else: else:
_, write_error = motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) _, write_error = bus._write(addr, length, id_, value, raise_on_error=raise_on_error)
assert write_error == error assert write_error == error
assert mock_motors.stubs[stub_name].called assert mock_motors.stubs[stub].called
@pytest.mark.parametrize("raise_on_error", (True, False)) @pytest.mark.parametrize("raise_on_error", (True, False))
def test__write_comm(raise_on_error, mock_motors, dummy_motors): def test__write_comm(raise_on_error, mock_motors, dummy_motors):
addr, length, id_, value = (10, 4, 1, 1337) addr, length, id_, value = (10, 4, 1, 1337)
stub_name = mock_motors.build_write_stub(addr, length, id_, value, reply=False) stub = mock_motors.build_write_stub(addr, length, id_, value, reply=False)
motors_bus = FeetechMotorsBus( bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
if raise_on_error: if raise_on_error:
with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")): with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")):
motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) bus._write(addr, length, id_, value, raise_on_error=raise_on_error)
else: else:
write_comm, _ = motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) write_comm, _ = bus._write(addr, length, id_, value, raise_on_error=raise_on_error)
assert write_comm == scs.COMM_RX_TIMEOUT assert write_comm == scs.COMM_RX_TIMEOUT
assert mock_motors.stubs[stub_name].called assert mock_motors.stubs[stub].called
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -286,37 +263,31 @@ def test__write_comm(raise_on_error, mock_motors, dummy_motors):
ids=["1 motor", "2 motors", "3 motors"], ids=["1 motor", "2 motors", "3 motors"],
) )
def test__sync_read(addr, length, ids_values, mock_motors, dummy_motors): def test__sync_read(addr, length, ids_values, mock_motors, dummy_motors):
stub_name = mock_motors.build_sync_read_stub(addr, length, ids_values) stub = mock_motors.build_sync_read_stub(addr, length, ids_values)
motors_bus = FeetechMotorsBus( bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
read_values, _ = motors_bus._sync_read(addr, length, list(ids_values)) read_values, _ = bus._sync_read(addr, length, list(ids_values))
assert mock_motors.stubs[stub_name].called assert mock_motors.stubs[stub].called
assert read_values == ids_values assert read_values == ids_values
@pytest.mark.parametrize("raise_on_error", (True, False)) @pytest.mark.parametrize("raise_on_error", (True, False))
def test__sync_read_comm(raise_on_error, mock_motors, dummy_motors): def test__sync_read_comm(raise_on_error, mock_motors, dummy_motors):
addr, length, ids_values = (10, 4, {1: 1337}) addr, length, ids_values = (10, 4, {1: 1337})
stub_name = mock_motors.build_sync_read_stub(addr, length, ids_values, reply=False) stub = mock_motors.build_sync_read_stub(addr, length, ids_values, reply=False)
motors_bus = FeetechMotorsBus( bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
if raise_on_error: if raise_on_error:
with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")): with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")):
motors_bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error) bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error)
else: else:
_, read_comm = motors_bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error) _, read_comm = bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error)
assert read_comm == scs.COMM_RX_TIMEOUT assert read_comm == scs.COMM_RX_TIMEOUT
assert mock_motors.stubs[stub_name].called assert mock_motors.stubs[stub].called
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -329,16 +300,13 @@ def test__sync_read_comm(raise_on_error, mock_motors, dummy_motors):
ids=["1 motor", "2 motors", "3 motors"], ids=["1 motor", "2 motors", "3 motors"],
) )
def test__sync_write(addr, length, ids_values, mock_motors, dummy_motors): def test__sync_write(addr, length, ids_values, mock_motors, dummy_motors):
stub_name = mock_motors.build_sync_write_stub(addr, length, ids_values) stub = mock_motors.build_sync_write_stub(addr, length, ids_values)
motors_bus = FeetechMotorsBus( bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
comm = motors_bus._sync_write(addr, length, ids_values) comm = bus._sync_write(addr, length, ids_values)
assert mock_motors.stubs[stub_name].wait_called() assert mock_motors.stubs[stub].wait_called()
assert comm == scs.COMM_SUCCESS assert comm == scs.COMM_SUCCESS
@ -351,14 +319,14 @@ def test_is_calibrated(mock_motors, dummy_motors, dummy_calibration):
) )
mins_stub = mock_motors.build_sync_read_stub(*STS_SMS_SERIES_CONTROL_TABLE["Min_Position_Limit"], mins) mins_stub = mock_motors.build_sync_read_stub(*STS_SMS_SERIES_CONTROL_TABLE["Min_Position_Limit"], mins)
maxes_stub = mock_motors.build_sync_read_stub(*STS_SMS_SERIES_CONTROL_TABLE["Max_Position_Limit"], maxes) maxes_stub = mock_motors.build_sync_read_stub(*STS_SMS_SERIES_CONTROL_TABLE["Max_Position_Limit"], maxes)
motors_bus = FeetechMotorsBus( bus = FeetechMotorsBus(
port=mock_motors.port, port=mock_motors.port,
motors=dummy_motors, motors=dummy_motors,
calibration=dummy_calibration, calibration=dummy_calibration,
) )
motors_bus.connect(assert_motors_exist=False) bus.connect(assert_motors_exist=False)
is_calibrated = motors_bus.is_calibrated is_calibrated = bus.is_calibrated
assert is_calibrated assert is_calibrated
assert mock_motors.stubs[offsets_stub].called assert mock_motors.stubs[offsets_stub].called
@ -381,13 +349,10 @@ def test_reset_calibration(mock_motors, dummy_motors):
mock_motors.build_write_stub(*STS_SMS_SERIES_CONTROL_TABLE["Max_Position_Limit"], motor.id, 4095) mock_motors.build_write_stub(*STS_SMS_SERIES_CONTROL_TABLE["Max_Position_Limit"], motor.id, 4095)
) )
motors_bus = FeetechMotorsBus( bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
motors_bus.reset_calibration() bus.reset_calibration()
assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs) assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs)
assert all(mock_motors.stubs[stub].called for stub in write_mins_stubs) assert all(mock_motors.stubs[stub].called for stub in write_mins_stubs)
@ -420,16 +385,13 @@ def test_set_half_turn_homings(mock_motors, dummy_motors):
) )
write_homing_stubs.append(stub) write_homing_stubs.append(stub)
motors_bus = FeetechMotorsBus( bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors, bus.reset_calibration = MagicMock()
)
motors_bus.connect(assert_motors_exist=False)
motors_bus.reset_calibration = MagicMock()
motors_bus.set_half_turn_homings() bus.set_half_turn_homings()
motors_bus.reset_calibration.assert_called_once() bus.reset_calibration.assert_called_once()
assert mock_motors.stubs[read_pos_stub].called assert mock_motors.stubs[read_pos_stub].called
assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs) assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs)
@ -450,18 +412,15 @@ def test_record_ranges_of_motion(mock_motors, dummy_motors):
"dummy_2": 3600, "dummy_2": 3600,
"dummy_3": 4002, "dummy_3": 4002,
} }
read_pos_stub = mock_motors.build_sequential_sync_read_stub( stub = mock_motors.build_sequential_sync_read_stub(
*STS_SMS_SERIES_CONTROL_TABLE["Present_Position"], positions *STS_SMS_SERIES_CONTROL_TABLE["Present_Position"], positions
) )
with patch("lerobot.common.motors.motors_bus.enter_pressed", side_effect=[False, True]): with patch("lerobot.common.motors.motors_bus.enter_pressed", side_effect=[False, True]):
motors_bus = FeetechMotorsBus( bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors)
port=mock_motors.port, bus.connect(assert_motors_exist=False)
motors=dummy_motors,
)
motors_bus.connect(assert_motors_exist=False)
mins, maxes = motors_bus.record_ranges_of_motion(display_values=False) mins, maxes = bus.record_ranges_of_motion(display_values=False)
assert mock_motors.stubs[read_pos_stub].calls == 3 assert mock_motors.stubs[stub].calls == 3
assert mins == expected_mins assert mins == expected_mins
assert maxes == expected_maxes assert maxes == expected_maxes