From 9f46a3d8f9b5d6660ed217eb9427e4c81e99cfdd Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Sun, 23 Mar 2025 16:11:24 +0100 Subject: [PATCH] Refactor dxl tests by functionality --- tests/motors/test_dynamixel.py | 262 +++++++++++++++++++++++---------- 1 file changed, 186 insertions(+), 76 deletions(-) diff --git a/tests/motors/test_dynamixel.py b/tests/motors/test_dynamixel.py index 271e494a..5b58f570 100644 --- a/tests/motors/test_dynamixel.py +++ b/tests/motors/test_dynamixel.py @@ -95,9 +95,9 @@ def test_abc_implementation(dummy_motors): @pytest.mark.parametrize( "idx, model_nb", [ - [1, 1190], - [2, 1200], - [3, 1120], + (1, 1190), + (2, 1200), + (3, 1120), ], ) def test_ping(idx, model_nb, mock_motors, dummy_motors): @@ -133,22 +133,61 @@ def test_broadcast_ping(mock_motors, dummy_motors): assert mock_motors.stubs[stub_name].called -@pytest.mark.parametrize( - "motors", - [ - None, - [1, 2, 3], - ["dummy_1", "dummy_2", "dummy_3"], - [1, "dummy_2", 3], - ], - ids=["None", "by ids", "by names", "mixed"], -) -def test_sync_read_all_motors(motors, mock_motors, dummy_motors): +def test_sync_read_none(mock_motors, dummy_motors): expected_positions = { - 1: 1337, - 2: 42, - 3: 4016, + "dummy_1": 1337, + "dummy_2": 42, + "dummy_3": 4016, } + ids_values = dict(zip([1, 2, 3], expected_positions.values(), strict=True)) + stub_name = mock_motors.build_sync_read_stub("Present_Position", ids_values) + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect() + + positions_read = motors_bus.sync_read("Present_Position") + + assert mock_motors.stubs[stub_name].called + assert positions_read == expected_positions + + +@pytest.mark.parametrize( + "dxl_id, position", + [ + (1, 1337), + (2, 42), + (3, 4016), + ], +) +def test_sync_read_by_id(dxl_id, position, mock_motors, dummy_motors): + expected_position = {dxl_id: position} + stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_position) + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect() + + positions_read = motors_bus.sync_read("Present_Position", dxl_id) + + assert mock_motors.stubs[stub_name].called + assert positions_read == expected_position + + +@pytest.mark.parametrize( + "ids, positions", + [ + ([1], [1337]), + ([1, 2], [1337, 42]), + ([1, 2, 3], [1337, 42, 4016]), + ], + ids=["1 motor", "2 motors", "3 motors"], +) # fmt: skip +def test_sync_read_by_ids(ids, positions, mock_motors, dummy_motors): + assert len(ids) == len(positions) + expected_positions = dict(zip(ids, positions, strict=True)) stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_positions) motors_bus = DynamixelMotorsBus( port=mock_motors.port, @@ -156,66 +195,69 @@ def test_sync_read_all_motors(motors, mock_motors, dummy_motors): ) motors_bus.connect() - positions_read = motors_bus.sync_read("Present_Position", motors=motors) + positions_read = motors_bus.sync_read("Present_Position", ids) - motors = ["dummy_1", "dummy_2", "dummy_3"] if motors is None else motors assert mock_motors.stubs[stub_name].called - assert positions_read == dict(zip(motors, expected_positions.values(), strict=True)) + assert positions_read == expected_positions @pytest.mark.parametrize( - "idx, pos", + "id_, position", [ - [1, 1337], - [2, 42], - [3, 4016], + (1, 1337), + (2, 42), + (3, 4016), ], ) -def test_sync_read_single_motor_by_name(idx, pos, mock_motors, dummy_motors): - expected_position = {idx: pos} - stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_position) +def test_sync_read_by_name(id_, position, mock_motors, dummy_motors): + expected_position = {f"dummy_{id_}": position} + stub_name = mock_motors.build_sync_read_stub("Present_Position", {id_: position}) motors_bus = DynamixelMotorsBus( port=mock_motors.port, motors=dummy_motors, ) motors_bus.connect() - pos_dict = motors_bus.sync_read("Present_Position", f"dummy_{idx}") + positions_read = motors_bus.sync_read("Present_Position", f"dummy_{id_}") assert mock_motors.stubs[stub_name].called - assert pos_dict == {f"dummy_{idx}": pos} + assert positions_read == expected_position @pytest.mark.parametrize( - "idx, pos", + "ids, positions", [ - [1, 1337], - [2, 42], - [3, 4016], + ([1], [1337]), + ([1, 2], [1337, 42]), + ([1, 2, 3], [1337, 42, 4016]), ], -) -def test_sync_read_single_motor_by_id(idx, pos, mock_motors, dummy_motors): - expected_position = {idx: pos} - stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_position) + ids=["1 motor", "2 motors", "3 motors"], +) # fmt: skip +def test_sync_read_by_names(ids, positions, mock_motors, dummy_motors): + assert len(ids) == len(positions) + names = [f"dummy_{dxl_id}" for dxl_id in ids] + expected_positions = dict(zip(names, positions, strict=True)) + ids_values = dict(zip(ids, positions, strict=True)) + stub_name = mock_motors.build_sync_read_stub("Present_Position", ids_values) motors_bus = DynamixelMotorsBus( port=mock_motors.port, motors=dummy_motors, ) motors_bus.connect() - pos_dict = motors_bus.sync_read("Present_Position", idx) + positions_read = motors_bus.sync_read("Present_Position", names) assert mock_motors.stubs[stub_name].called - assert pos_dict == {idx: pos} + assert positions_read == expected_positions @pytest.mark.parametrize( "num_retry, num_invalid_try, pos", [ - [0, 2, 1337], - [2, 3, 42], - [3, 2, 4016], - [2, 1, 999], + (0, 2, 1337), + (2, 3, 42), + (3, 2, 4016), + (2, 1, 999), ], ) def test_sync_read_num_retry(num_retry, num_invalid_try, pos, mock_motors, dummy_motors): @@ -240,44 +282,18 @@ def test_sync_read_num_retry(num_retry, num_invalid_try, pos, mock_motors, dummy assert mock_motors.stubs[stub_name].calls == expected_calls -@pytest.mark.parametrize( - "motors", - [ - [1, 2, 3], - ["dummy_1", "dummy_2", "dummy_3"], - [1, "dummy_2", 3], - ], - ids=["by ids", "by names", "mixed"], -) -def test_sync_write_all_motors(motors, mock_motors, dummy_motors): - goal_positions = { - 1: 1337, - 2: 42, - 3: 4016, - } - stub_name = mock_motors.build_sync_write_stub("Goal_Position", goal_positions) - motors_bus = DynamixelMotorsBus( - port=mock_motors.port, - motors=dummy_motors, - ) - motors_bus.connect() - - values = dict(zip(motors, goal_positions.values(), strict=True)) - motors_bus.sync_write("Goal_Position", values) - - assert mock_motors.stubs[stub_name].wait_called() - - @pytest.mark.parametrize( "data_name, value", [ - ["Torque_Enable", 0], - ["Torque_Enable", 1], + ("Torque_Enable", 0), + ("Torque_Enable", 1), + ("Goal_Position", 1337), + ("Goal_Position", 42), ], ) -def test_sync_write_all_motors_single_value(data_name, value, mock_motors, dummy_motors): - values = {m.id: value for m in dummy_motors.values()} - stub_name = mock_motors.build_sync_write_stub(data_name, values) +def test_sync_write_single_value(data_name, value, mock_motors, dummy_motors): + ids_values = {m.id: value for m in dummy_motors.values()} + stub_name = mock_motors.build_sync_write_stub(data_name, ids_values) motors_bus = DynamixelMotorsBus( port=mock_motors.port, motors=dummy_motors, @@ -287,3 +303,97 @@ def test_sync_write_all_motors_single_value(data_name, value, mock_motors, dummy motors_bus.sync_write(data_name, value) assert mock_motors.stubs[stub_name].wait_called() + + +@pytest.mark.parametrize( + "id_, position", + [ + (1, 1337), + (2, 42), + (3, 4016), + ], +) +def test_sync_write_by_id(id_, position, mock_motors, dummy_motors): + value = {id_: position} + stub_name = mock_motors.build_sync_write_stub("Goal_Position", value) + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect() + + motors_bus.sync_write("Goal_Position", value) + + assert mock_motors.stubs[stub_name].wait_called() + + +@pytest.mark.parametrize( + "ids, positions", + [ + ([1], [1337]), + ([1, 2], [1337, 42]), + ([1, 2, 3], [1337, 42, 4016]), + ], + ids=["1 motor", "2 motors", "3 motors"], +) # fmt: skip +def test_sync_write_by_ids(ids, positions, mock_motors, dummy_motors): + assert len(ids) == len(positions) + values = dict(zip(ids, positions, strict=True)) + stub_name = mock_motors.build_sync_write_stub("Goal_Position", values) + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect() + + motors_bus.sync_write("Goal_Position", values) + + assert mock_motors.stubs[stub_name].wait_called() + + +@pytest.mark.parametrize( + "id_, position", + [ + (1, 1337), + (2, 42), + (3, 4016), + ], +) +def test_sync_write_by_name(id_, position, mock_motors, dummy_motors): + id_value = {id_: position} + stub_name = mock_motors.build_sync_write_stub("Goal_Position", id_value) + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect() + + write_value = {f"dummy_{id_}": position} + motors_bus.sync_write("Goal_Position", write_value) + + assert mock_motors.stubs[stub_name].wait_called() + + +@pytest.mark.parametrize( + "ids, positions", + [ + ([1], [1337]), + ([1, 2], [1337, 42]), + ([1, 2, 3], [1337, 42, 4016]), + ], + ids=["1 motor", "2 motors", "3 motors"], +) # fmt: skip +def test_sync_write_by_names(ids, positions, mock_motors, dummy_motors): + assert len(ids) == len(positions) + ids_values = dict(zip(ids, positions, strict=True)) + stub_name = mock_motors.build_sync_write_stub("Goal_Position", ids_values) + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors=dummy_motors, + ) + motors_bus.connect() + + write_values = {f"dummy_{id_}": pos for id_, pos in ids_values.items()} + motors_bus.sync_write("Goal_Position", write_values) + + assert mock_motors.stubs[stub_name].wait_called()