From 1f210bc8a37c15c213878690e64e59783ee93999 Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Mon, 14 Apr 2025 15:26:29 +0200 Subject: [PATCH] Refactor tests --- tests/mocks/mock_dynamixel.py | 3 - tests/mocks/mock_feetech.py | 3 - tests/motors/test_dynamixel.py | 146 +++++++++----------------- tests/motors/test_feetech.py | 185 +++++++++++++-------------------- 4 files changed, 124 insertions(+), 213 deletions(-) diff --git a/tests/mocks/mock_dynamixel.py b/tests/mocks/mock_dynamixel.py index a9d434e9..6f78400d 100644 --- a/tests/mocks/mock_dynamixel.py +++ b/tests/mocks/mock_dynamixel.py @@ -5,7 +5,6 @@ import dynamixel_sdk as dxl import serial from mock_serial.mock_serial import MockSerial -from lerobot.common.motors.dynamixel import X_SERIES_CONTROL_TABLE from lerobot.common.motors.dynamixel.dynamixel import _split_into_byte_chunks from .mock_serial_patch import WaitableStub @@ -425,8 +424,6 @@ class MockMotors(MockSerial): instruction packets. It is meant to test MotorsBus classes. """ - ctrl_table = X_SERIES_CONTROL_TABLE - def __init__(self): super().__init__() diff --git a/tests/mocks/mock_feetech.py b/tests/mocks/mock_feetech.py index f4bb1c68..5948bd5e 100644 --- a/tests/mocks/mock_feetech.py +++ b/tests/mocks/mock_feetech.py @@ -5,7 +5,6 @@ import scservo_sdk as scs import serial from mock_serial import MockSerial -from lerobot.common.motors.feetech import STS_SMS_SERIES_CONTROL_TABLE from lerobot.common.motors.feetech.feetech import _split_into_byte_chunks, patch_setPacketTimeout from .mock_serial_patch import WaitableStub @@ -278,8 +277,6 @@ class MockMotors(MockSerial): instruction packets. It is meant to test MotorsBus classes. """ - ctrl_table = STS_SMS_SERIES_CONTROL_TABLE - def __init__(self): super().__init__() diff --git a/tests/motors/test_dynamixel.py b/tests/motors/test_dynamixel.py index 163af2d1..cb2c11e6 100644 --- a/tests/motors/test_dynamixel.py +++ b/tests/motors/test_dynamixel.py @@ -90,13 +90,10 @@ def test_abc_implementation(dummy_motors): def test_ping(id_, mock_motors, dummy_motors): expected_model_nb = MODEL_NUMBER_TABLE[dummy_motors[f"dummy_{id_}"].model] stub = mock_motors.build_ping_stub(id_, expected_model_nb) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) - ping_model_nb = motors_bus.ping(id_) + ping_model_nb = bus.ping(id_) assert ping_model_nb == expected_model_nb assert mock_motors.stubs[stub].called @@ -106,13 +103,10 @@ def test_broadcast_ping(mock_motors, dummy_motors): models = {m.id: m.model for m in dummy_motors.values()} expected_model_nbs = {id_: MODEL_NUMBER_TABLE[model] for id_, model in models.items()} stub = mock_motors.build_broadcast_ping_stub(expected_model_nbs) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) - ping_model_nbs = motors_bus.broadcast_ping() + ping_model_nbs = bus.broadcast_ping() assert ping_model_nbs == expected_model_nbs assert mock_motors.stubs[stub].called @@ -128,13 +122,10 @@ def test_broadcast_ping(mock_motors, dummy_motors): ) def test__read(addr, length, id_, value, mock_motors, dummy_motors): stub = mock_motors.build_read_stub(addr, length, id_, value) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) - read_value, _, _ = motors_bus._read(addr, length, id_) + read_value, _, _ = bus._read(addr, length, id_) assert mock_motors.stubs[stub].called assert read_value == value @@ -144,19 +135,16 @@ def test__read(addr, length, id_, value, mock_motors, dummy_motors): def test__read_error(raise_on_error, mock_motors, dummy_motors): addr, length, id_, value, error = (10, 4, 1, 1337, dxl.ERRNUM_DATA_LIMIT) stub = mock_motors.build_read_stub(addr, length, id_, value, error=error) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) if raise_on_error: with pytest.raises( RuntimeError, match=re.escape("[RxPacketError] The data value exceeds the limit value!") ): - motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) + bus._read(addr, length, id_, raise_on_error=raise_on_error) else: - _, _, read_error = motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) + _, _, read_error = bus._read(addr, length, id_, raise_on_error=raise_on_error) assert read_error == error assert mock_motors.stubs[stub].called @@ -166,17 +154,14 @@ def test__read_error(raise_on_error, mock_motors, dummy_motors): def test__read_comm(raise_on_error, mock_motors, dummy_motors): addr, length, id_, value = (10, 4, 1, 1337) stub = mock_motors.build_read_stub(addr, length, id_, value, reply=False) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) if raise_on_error: with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")): - motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) + bus._read(addr, length, id_, raise_on_error=raise_on_error) else: - _, read_comm, _ = motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) + _, read_comm, _ = bus._read(addr, length, id_, raise_on_error=raise_on_error) assert read_comm == dxl.COMM_RX_TIMEOUT assert mock_motors.stubs[stub].called @@ -192,13 +177,10 @@ def test__read_comm(raise_on_error, mock_motors, dummy_motors): ) def test__write(addr, length, id_, value, mock_motors, dummy_motors): stub = mock_motors.build_write_stub(addr, length, id_, value) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) - comm, error = motors_bus._write(addr, length, id_, value) + comm, error = bus._write(addr, length, id_, value) assert mock_motors.stubs[stub].called assert comm == dxl.COMM_SUCCESS @@ -209,19 +191,16 @@ def test__write(addr, length, id_, value, mock_motors, dummy_motors): def test__write_error(raise_on_error, mock_motors, dummy_motors): addr, length, id_, value, error = (10, 4, 1, 1337, dxl.ERRNUM_DATA_LIMIT) stub = mock_motors.build_write_stub(addr, length, id_, value, error=error) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) if raise_on_error: with pytest.raises( RuntimeError, match=re.escape("[RxPacketError] The data value exceeds the limit value!") ): - motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) + bus._write(addr, length, id_, value, raise_on_error=raise_on_error) else: - _, write_error = motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) + _, write_error = bus._write(addr, length, id_, value, raise_on_error=raise_on_error) assert write_error == error assert mock_motors.stubs[stub].called @@ -231,17 +210,14 @@ def test__write_error(raise_on_error, mock_motors, dummy_motors): def test__write_comm(raise_on_error, mock_motors, dummy_motors): addr, length, id_, value = (10, 4, 1, 1337) stub = mock_motors.build_write_stub(addr, length, id_, value, reply=False) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) if raise_on_error: with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")): - motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) + bus._write(addr, length, id_, value, raise_on_error=raise_on_error) else: - write_comm, _ = motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) + write_comm, _ = bus._write(addr, length, id_, value, raise_on_error=raise_on_error) assert write_comm == dxl.COMM_RX_TIMEOUT assert mock_motors.stubs[stub].called @@ -258,13 +234,10 @@ def test__write_comm(raise_on_error, mock_motors, dummy_motors): ) def test__sync_read(addr, length, ids_values, mock_motors, dummy_motors): stub = mock_motors.build_sync_read_stub(addr, length, ids_values) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) - read_values, _ = motors_bus._sync_read(addr, length, list(ids_values)) + read_values, _ = bus._sync_read(addr, length, list(ids_values)) assert mock_motors.stubs[stub].called assert read_values == ids_values @@ -274,17 +247,14 @@ def test__sync_read(addr, length, ids_values, mock_motors, dummy_motors): def test__sync_read_comm(raise_on_error, mock_motors, dummy_motors): addr, length, ids_values = (10, 4, {1: 1337}) stub = mock_motors.build_sync_read_stub(addr, length, ids_values, reply=False) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) if raise_on_error: with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")): - motors_bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error) + bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error) else: - _, read_comm = motors_bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error) + _, read_comm = bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error) assert read_comm == dxl.COMM_RX_TIMEOUT assert mock_motors.stubs[stub].called @@ -301,13 +271,10 @@ def test__sync_read_comm(raise_on_error, mock_motors, dummy_motors): ) def test__sync_write(addr, length, ids_values, mock_motors, dummy_motors): stub = mock_motors.build_sync_write_stub(addr, length, ids_values) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) - comm = motors_bus._sync_write(addr, length, ids_values) + comm = bus._sync_write(addr, length, ids_values) assert mock_motors.stubs[stub].wait_called() assert comm == dxl.COMM_SUCCESS @@ -322,14 +289,14 @@ def test_is_calibrated(mock_motors, dummy_motors, dummy_calibration): offsets_stub = mock_motors.build_sync_read_stub(*X_SERIES_CONTROL_TABLE["Homing_Offset"], encoded_homings) mins_stub = mock_motors.build_sync_read_stub(*X_SERIES_CONTROL_TABLE["Min_Position_Limit"], mins) maxes_stub = mock_motors.build_sync_read_stub(*X_SERIES_CONTROL_TABLE["Max_Position_Limit"], maxes) - motors_bus = DynamixelMotorsBus( + bus = DynamixelMotorsBus( port=mock_motors.port, motors=dummy_motors, calibration=dummy_calibration, ) - motors_bus.connect(assert_motors_exist=False) + bus.connect(assert_motors_exist=False) - is_calibrated = motors_bus.is_calibrated + is_calibrated = bus.is_calibrated assert is_calibrated assert mock_motors.stubs[drive_modes_stub].called @@ -353,13 +320,10 @@ def test_reset_calibration(mock_motors, dummy_motors): mock_motors.build_write_stub(*X_SERIES_CONTROL_TABLE["Max_Position_Limit"], motor.id, 4095) ) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) - motors_bus.reset_calibration() + bus.reset_calibration() assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs) assert all(mock_motors.stubs[stub].called for stub in write_mins_stubs) @@ -390,16 +354,13 @@ def test_set_half_turn_homings(mock_motors, dummy_motors): stub = mock_motors.build_write_stub(*X_SERIES_CONTROL_TABLE["Homing_Offset"], id_, encoded_homing) write_homing_stubs.append(stub) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) - motors_bus.reset_calibration = MagicMock() + bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) + bus.reset_calibration = MagicMock() - motors_bus.set_half_turn_homings() + bus.set_half_turn_homings() - motors_bus.reset_calibration.assert_called_once() + bus.reset_calibration.assert_called_once() assert mock_motors.stubs[read_pos_stub].called assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs) @@ -424,13 +385,10 @@ def test_record_ranges_of_motion(mock_motors, dummy_motors): *X_SERIES_CONTROL_TABLE["Present_Position"], positions ) with patch("lerobot.common.motors.motors_bus.enter_pressed", side_effect=[False, True]): - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + bus = DynamixelMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) - mins, maxes = motors_bus.record_ranges_of_motion(display_values=False) + mins, maxes = bus.record_ranges_of_motion(display_values=False) assert mock_motors.stubs[read_pos_stub].calls == 3 assert mins == expected_mins diff --git a/tests/motors/test_feetech.py b/tests/motors/test_feetech.py index d25b98bc..baf6d340 100644 --- a/tests/motors/test_feetech.py +++ b/tests/motors/test_feetech.py @@ -91,36 +91,19 @@ def test_abc_implementation(dummy_motors): FeetechMotorsBus(port="/dev/dummy-port", motors=dummy_motors) -@pytest.mark.skip("TODO") -def test_scan_port(mock_motors): - expected = { - 9_600: {1: 777}, - 57_600: {2: 777}, - 500_000: {237: 777}, - } - expected_model_nbs = {id_: model for d in expected.values() for id_, model in d.items()} - ping_stub = mock_motors.build_broadcast_ping_stub(list(expected_model_nbs)) - mobel_nb_stub = mock_motors.build_sync_read_stub("Model_Number", expected_model_nbs) - found = FeetechMotorsBus.scan_port(mock_motors.port) - - assert found == expected - assert mock_motors.stubs[ping_stub].called - assert mock_motors.stubs[mobel_nb_stub].called - - @pytest.mark.parametrize("id_", [1, 2, 3]) def test_ping(id_, mock_motors, dummy_motors): expected_model_nb = MODEL_NUMBER_TABLE[dummy_motors[f"dummy_{id_}"].model] addr, length = MODEL_NUMBER ping_stub = mock_motors.build_ping_stub(id_) mobel_nb_stub = mock_motors.build_read_stub(addr, length, id_, expected_model_nb) - motors_bus = FeetechMotorsBus( + bus = FeetechMotorsBus( port=mock_motors.port, motors=dummy_motors, ) - motors_bus.connect(assert_motors_exist=False) + bus.connect(assert_motors_exist=False) - ping_model_nb = motors_bus.ping(id_) + ping_model_nb = bus.ping(id_) assert ping_model_nb == expected_model_nb assert mock_motors.stubs[ping_stub].called @@ -138,13 +121,13 @@ def test_broadcast_ping(mock_motors, dummy_motors): stub = mock_motors.build_read_stub(addr, length, id_, model_nb) expected_model_nbs[id_] = model_nb mobel_nb_stubs.append(stub) - motors_bus = FeetechMotorsBus( + bus = FeetechMotorsBus( port=mock_motors.port, motors=dummy_motors, ) - motors_bus.connect(assert_motors_exist=False) + bus.connect(assert_motors_exist=False) - ping_model_nbs = motors_bus.broadcast_ping() + ping_model_nbs = bus.broadcast_ping() assert ping_model_nbs == expected_model_nbs assert mock_motors.stubs[ping_stub].called @@ -160,57 +143,57 @@ def test_broadcast_ping(mock_motors, dummy_motors): ], ) def test__read(addr, length, id_, value, mock_motors, dummy_motors): - stub_name = mock_motors.build_read_stub(addr, length, id_, value) - motors_bus = FeetechMotorsBus( + stub = mock_motors.build_read_stub(addr, length, id_, value) + bus = FeetechMotorsBus( port=mock_motors.port, motors=dummy_motors, ) - motors_bus.connect(assert_motors_exist=False) + bus.connect(assert_motors_exist=False) - read_value, _, _ = motors_bus._read(addr, length, id_) + read_value, _, _ = bus._read(addr, length, id_) - assert mock_motors.stubs[stub_name].called + assert mock_motors.stubs[stub].called assert read_value == value @pytest.mark.parametrize("raise_on_error", (True, False)) def test__read_error(raise_on_error, mock_motors, dummy_motors): addr, length, id_, value, error = (10, 4, 1, 1337, scs.ERRBIT_VOLTAGE) - stub_name = mock_motors.build_read_stub(addr, length, id_, value, error=error) - motors_bus = FeetechMotorsBus( + stub = mock_motors.build_read_stub(addr, length, id_, value, error=error) + bus = FeetechMotorsBus( port=mock_motors.port, motors=dummy_motors, ) - motors_bus.connect(assert_motors_exist=False) + bus.connect(assert_motors_exist=False) if raise_on_error: with pytest.raises(RuntimeError, match=re.escape("[RxPacketError] Input voltage error!")): - motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) + bus._read(addr, length, id_, raise_on_error=raise_on_error) else: - _, _, read_error = motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) + _, _, read_error = bus._read(addr, length, id_, raise_on_error=raise_on_error) assert read_error == error - assert mock_motors.stubs[stub_name].called + assert mock_motors.stubs[stub].called @pytest.mark.parametrize("raise_on_error", (True, False)) def test__read_comm(raise_on_error, mock_motors, dummy_motors): addr, length, id_, value = (10, 4, 1, 1337) - stub_name = mock_motors.build_read_stub(addr, length, id_, value, reply=False) - motors_bus = FeetechMotorsBus( + stub = mock_motors.build_read_stub(addr, length, id_, value, reply=False) + bus = FeetechMotorsBus( port=mock_motors.port, motors=dummy_motors, ) - motors_bus.connect(assert_motors_exist=False) + bus.connect(assert_motors_exist=False) if raise_on_error: with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")): - motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) + bus._read(addr, length, id_, raise_on_error=raise_on_error) else: - _, read_comm, _ = motors_bus._read(addr, length, id_, raise_on_error=raise_on_error) + _, read_comm, _ = bus._read(addr, length, id_, raise_on_error=raise_on_error) assert read_comm == scs.COMM_RX_TIMEOUT - assert mock_motors.stubs[stub_name].called + assert mock_motors.stubs[stub].called @pytest.mark.parametrize( @@ -222,16 +205,16 @@ def test__read_comm(raise_on_error, mock_motors, dummy_motors): ], ) def test__write(addr, length, id_, value, mock_motors, dummy_motors): - stub_name = mock_motors.build_write_stub(addr, length, id_, value) - motors_bus = FeetechMotorsBus( + stub = mock_motors.build_write_stub(addr, length, id_, value) + bus = FeetechMotorsBus( port=mock_motors.port, motors=dummy_motors, ) - motors_bus.connect(assert_motors_exist=False) + bus.connect(assert_motors_exist=False) - comm, error = motors_bus._write(addr, length, id_, value) + comm, error = bus._write(addr, length, id_, value) - assert mock_motors.stubs[stub_name].called + assert mock_motors.stubs[stub].called assert comm == scs.COMM_SUCCESS assert error == 0 @@ -239,41 +222,35 @@ def test__write(addr, length, id_, value, mock_motors, dummy_motors): @pytest.mark.parametrize("raise_on_error", (True, False)) def test__write_error(raise_on_error, mock_motors, dummy_motors): addr, length, id_, value, error = (10, 4, 1, 1337, scs.ERRBIT_VOLTAGE) - stub_name = mock_motors.build_write_stub(addr, length, id_, value, error=error) - motors_bus = FeetechMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + stub = mock_motors.build_write_stub(addr, length, id_, value, error=error) + bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) if raise_on_error: with pytest.raises(RuntimeError, match=re.escape("[RxPacketError] Input voltage error!")): - motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) + bus._write(addr, length, id_, value, raise_on_error=raise_on_error) else: - _, write_error = motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) + _, write_error = bus._write(addr, length, id_, value, raise_on_error=raise_on_error) assert write_error == error - assert mock_motors.stubs[stub_name].called + assert mock_motors.stubs[stub].called @pytest.mark.parametrize("raise_on_error", (True, False)) def test__write_comm(raise_on_error, mock_motors, dummy_motors): addr, length, id_, value = (10, 4, 1, 1337) - stub_name = mock_motors.build_write_stub(addr, length, id_, value, reply=False) - motors_bus = FeetechMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + stub = mock_motors.build_write_stub(addr, length, id_, value, reply=False) + bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) if raise_on_error: with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")): - motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) + bus._write(addr, length, id_, value, raise_on_error=raise_on_error) else: - write_comm, _ = motors_bus._write(addr, length, id_, value, raise_on_error=raise_on_error) + write_comm, _ = bus._write(addr, length, id_, value, raise_on_error=raise_on_error) assert write_comm == scs.COMM_RX_TIMEOUT - assert mock_motors.stubs[stub_name].called + assert mock_motors.stubs[stub].called @pytest.mark.parametrize( @@ -286,37 +263,31 @@ def test__write_comm(raise_on_error, mock_motors, dummy_motors): ids=["1 motor", "2 motors", "3 motors"], ) def test__sync_read(addr, length, ids_values, mock_motors, dummy_motors): - stub_name = mock_motors.build_sync_read_stub(addr, length, ids_values) - motors_bus = FeetechMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + stub = mock_motors.build_sync_read_stub(addr, length, ids_values) + bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) - read_values, _ = motors_bus._sync_read(addr, length, list(ids_values)) + read_values, _ = bus._sync_read(addr, length, list(ids_values)) - assert mock_motors.stubs[stub_name].called + assert mock_motors.stubs[stub].called assert read_values == ids_values @pytest.mark.parametrize("raise_on_error", (True, False)) def test__sync_read_comm(raise_on_error, mock_motors, dummy_motors): addr, length, ids_values = (10, 4, {1: 1337}) - stub_name = mock_motors.build_sync_read_stub(addr, length, ids_values, reply=False) - motors_bus = FeetechMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + stub = mock_motors.build_sync_read_stub(addr, length, ids_values, reply=False) + bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) if raise_on_error: with pytest.raises(ConnectionError, match=re.escape("[TxRxResult] There is no status packet!")): - motors_bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error) + bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error) else: - _, read_comm = motors_bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error) + _, read_comm = bus._sync_read(addr, length, list(ids_values), raise_on_error=raise_on_error) assert read_comm == scs.COMM_RX_TIMEOUT - assert mock_motors.stubs[stub_name].called + assert mock_motors.stubs[stub].called @pytest.mark.parametrize( @@ -329,16 +300,13 @@ def test__sync_read_comm(raise_on_error, mock_motors, dummy_motors): ids=["1 motor", "2 motors", "3 motors"], ) def test__sync_write(addr, length, ids_values, mock_motors, dummy_motors): - stub_name = mock_motors.build_sync_write_stub(addr, length, ids_values) - motors_bus = FeetechMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + stub = mock_motors.build_sync_write_stub(addr, length, ids_values) + bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) - comm = motors_bus._sync_write(addr, length, ids_values) + comm = bus._sync_write(addr, length, ids_values) - assert mock_motors.stubs[stub_name].wait_called() + assert mock_motors.stubs[stub].wait_called() assert comm == scs.COMM_SUCCESS @@ -351,14 +319,14 @@ def test_is_calibrated(mock_motors, dummy_motors, dummy_calibration): ) mins_stub = mock_motors.build_sync_read_stub(*STS_SMS_SERIES_CONTROL_TABLE["Min_Position_Limit"], mins) maxes_stub = mock_motors.build_sync_read_stub(*STS_SMS_SERIES_CONTROL_TABLE["Max_Position_Limit"], maxes) - motors_bus = FeetechMotorsBus( + bus = FeetechMotorsBus( port=mock_motors.port, motors=dummy_motors, calibration=dummy_calibration, ) - motors_bus.connect(assert_motors_exist=False) + bus.connect(assert_motors_exist=False) - is_calibrated = motors_bus.is_calibrated + is_calibrated = bus.is_calibrated assert is_calibrated assert mock_motors.stubs[offsets_stub].called @@ -381,13 +349,10 @@ def test_reset_calibration(mock_motors, dummy_motors): mock_motors.build_write_stub(*STS_SMS_SERIES_CONTROL_TABLE["Max_Position_Limit"], motor.id, 4095) ) - motors_bus = FeetechMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) - motors_bus.reset_calibration() + bus.reset_calibration() assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs) assert all(mock_motors.stubs[stub].called for stub in write_mins_stubs) @@ -420,16 +385,13 @@ def test_set_half_turn_homings(mock_motors, dummy_motors): ) write_homing_stubs.append(stub) - motors_bus = FeetechMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) - motors_bus.reset_calibration = MagicMock() + bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) + bus.reset_calibration = MagicMock() - motors_bus.set_half_turn_homings() + bus.set_half_turn_homings() - motors_bus.reset_calibration.assert_called_once() + bus.reset_calibration.assert_called_once() assert mock_motors.stubs[read_pos_stub].called assert all(mock_motors.stubs[stub].called for stub in write_homing_stubs) @@ -450,18 +412,15 @@ def test_record_ranges_of_motion(mock_motors, dummy_motors): "dummy_2": 3600, "dummy_3": 4002, } - read_pos_stub = mock_motors.build_sequential_sync_read_stub( + stub = mock_motors.build_sequential_sync_read_stub( *STS_SMS_SERIES_CONTROL_TABLE["Present_Position"], positions ) with patch("lerobot.common.motors.motors_bus.enter_pressed", side_effect=[False, True]): - motors_bus = FeetechMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect(assert_motors_exist=False) + bus = FeetechMotorsBus(port=mock_motors.port, motors=dummy_motors) + bus.connect(assert_motors_exist=False) - mins, maxes = motors_bus.record_ranges_of_motion(display_values=False) + mins, maxes = bus.record_ranges_of_motion(display_values=False) - assert mock_motors.stubs[read_pos_stub].calls == 3 + assert mock_motors.stubs[stub].calls == 3 assert mins == expected_mins assert maxes == expected_maxes