Update tests

This commit is contained in:
Simon Alibert 2025-03-25 11:12:52 +01:00
parent cf963eb1b0
commit c237d1379e
2 changed files with 45 additions and 47 deletions

View File

@ -94,34 +94,33 @@ def test_abc_implementation(dummy_motors):
@pytest.mark.parametrize("idx", [1, 2, 3])
def test_ping(idx, mock_motors, dummy_motors):
expected_model = dummy_motors[f"dummy_{idx}"].model
model_nb = MODEL_NUMBER[expected_model]
stub_name = mock_motors.build_ping_stub(idx, model_nb)
expected_model_nb = MODEL_NUMBER[dummy_motors[f"dummy_{idx}"].model]
stub_name = mock_motors.build_ping_stub(idx, expected_model_nb)
motors_bus = DynamixelMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
ping_model_nb = motors_bus.ping(idx)
assert ping_model_nb == expected_model
assert ping_model_nb == expected_model_nb
assert mock_motors.stubs[stub_name].called
def test_broadcast_ping(mock_motors, dummy_motors):
expected_models = {m.id: m.model for m in dummy_motors.values()}
model_nbs = {id_: MODEL_NUMBER[model] for id_, model in expected_models.items()}
stub_name = mock_motors.build_broadcast_ping_stub(model_nbs)
models = {m.id: m.model for m in dummy_motors.values()}
expected_model_nbs = {id_: MODEL_NUMBER[model] for id_, model in models.items()}
stub_name = mock_motors.build_broadcast_ping_stub(expected_model_nbs)
motors_bus = DynamixelMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
ping_model_nbs = motors_bus.broadcast_ping()
assert ping_model_nbs == expected_models
assert ping_model_nbs == expected_model_nbs
assert mock_motors.stubs[stub_name].called
@ -137,7 +136,7 @@ def test_sync_read_none(mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
read_positions = motors_bus.sync_read("Present_Position")
@ -160,7 +159,7 @@ def test_sync_read_by_id(id_, position, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
read_position = motors_bus.sync_read("Present_Position", id_)
@ -185,7 +184,7 @@ def test_sync_read_by_ids(ids, positions, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
read_positions = motors_bus.sync_read("Present_Position", ids)
@ -208,7 +207,7 @@ def test_sync_read_by_name(id_, position, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
read_position = motors_bus.sync_read("Present_Position", f"dummy_{id_}")
@ -235,7 +234,7 @@ def test_sync_read_by_names(ids, positions, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
read_positions = motors_bus.sync_read("Present_Position", names)
@ -261,7 +260,7 @@ def test_sync_read_num_retry(num_retry, num_invalid_try, pos, mock_motors, dummy
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
if num_retry >= num_invalid_try:
pos_dict = motors_bus.sync_read("Present_Position", 1, num_retry=num_retry)
@ -290,7 +289,7 @@ def test_sync_write_single_value(data_name, value, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
motors_bus.sync_write(data_name, value)
@ -312,7 +311,7 @@ def test_sync_write_by_id(id_, position, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
motors_bus.sync_write("Goal_Position", value)
@ -336,7 +335,7 @@ def test_sync_write_by_ids(ids, positions, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
motors_bus.sync_write("Goal_Position", values)
@ -358,7 +357,7 @@ def test_sync_write_by_name(id_, position, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
write_value = {f"dummy_{id_}": position}
motors_bus.sync_write("Goal_Position", write_value)
@ -383,7 +382,7 @@ def test_sync_write_by_names(ids, positions, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
write_values = {f"dummy_{id_}": pos for id_, pos in ids_values.items()}
motors_bus.sync_write("Goal_Position", write_values)
@ -406,7 +405,7 @@ def test_write_by_id(data_name, dxl_id, value, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
motors_bus.write(data_name, dxl_id, value)
@ -428,7 +427,7 @@ def test_write_by_name(data_name, dxl_id, value, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
motors_bus.write(data_name, f"dummy_{dxl_id}", value)

View File

@ -95,37 +95,36 @@ def test_abc_implementation(dummy_motors):
# @pytest.mark.skip("TODO")
@pytest.mark.parametrize("idx", [1, 2, 3])
def test_ping(idx, mock_motors, dummy_motors):
expected_model = dummy_motors[f"dummy_{idx}"].model
model_nb = MODEL_NUMBER[expected_model]
expected_model_nb = MODEL_NUMBER[dummy_motors[f"dummy_{idx}"].model]
ping_stub = mock_motors.build_ping_stub(idx)
mobel_nb_stub = mock_motors.build_read_stub("Model_Number", idx, model_nb)
mobel_nb_stub = mock_motors.build_read_stub("Model_Number", idx, expected_model_nb)
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
ping_model_nb = motors_bus.ping(idx)
assert ping_model_nb == expected_model
assert ping_model_nb == expected_model_nb
assert mock_motors.stubs[ping_stub].called
assert mock_motors.stubs[mobel_nb_stub].called
def test_broadcast_ping(mock_motors, dummy_motors):
expected_models = {m.id: m.model for m in dummy_motors.values()}
model_nbs = {id_: MODEL_NUMBER[model] for id_, model in expected_models.items()}
ping_stub = mock_motors.build_broadcast_ping_stub(list(model_nbs))
mobel_nb_stub = mock_motors.build_sync_read_stub("Model_Number", model_nbs)
models = {m.id: m.model for m in dummy_motors.values()}
expected_model_nbs = {id_: MODEL_NUMBER[model] for id_, model in models.items()}
ping_stub = mock_motors.build_broadcast_ping_stub(list(models))
mobel_nb_stub = mock_motors.build_sync_read_stub("Model_Number", expected_model_nbs)
motors_bus = FeetechMotorsBus(
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
ping_model_nbs = motors_bus.broadcast_ping()
assert ping_model_nbs == expected_models
assert ping_model_nbs == expected_model_nbs
assert mock_motors.stubs[ping_stub].called
assert mock_motors.stubs[mobel_nb_stub].called
@ -142,7 +141,7 @@ def test_sync_read_none(mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
read_positions = motors_bus.sync_read("Present_Position")
@ -165,7 +164,7 @@ def test_sync_read_by_id(id_, position, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
read_position = motors_bus.sync_read("Present_Position", id_)
@ -190,7 +189,7 @@ def test_sync_read_by_ids(ids, positions, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
read_positions = motors_bus.sync_read("Present_Position", ids)
@ -213,7 +212,7 @@ def test_sync_read_by_name(id_, position, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
read_position = motors_bus.sync_read("Present_Position", f"dummy_{id_}")
@ -240,7 +239,7 @@ def test_sync_read_by_names(ids, positions, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
read_positions = motors_bus.sync_read("Present_Position", names)
@ -266,7 +265,7 @@ def test_sync_read_num_retry(num_retry, num_invalid_try, pos, mock_motors, dummy
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
if num_retry >= num_invalid_try:
pos_dict = motors_bus.sync_read("Present_Position", 1, num_retry=num_retry)
@ -295,7 +294,7 @@ def test_sync_write_single_value(data_name, value, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
motors_bus.sync_write(data_name, value)
@ -317,7 +316,7 @@ def test_sync_write_by_id(id_, position, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
motors_bus.sync_write("Goal_Position", value)
@ -341,7 +340,7 @@ def test_sync_write_by_ids(ids, positions, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
motors_bus.sync_write("Goal_Position", values)
@ -363,7 +362,7 @@ def test_sync_write_by_name(id_, position, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
write_value = {f"dummy_{id_}": position}
motors_bus.sync_write("Goal_Position", write_value)
@ -388,7 +387,7 @@ def test_sync_write_by_names(ids, positions, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
write_values = {f"dummy_{id_}": pos for id_, pos in ids_values.items()}
motors_bus.sync_write("Goal_Position", write_values)
@ -411,7 +410,7 @@ def test_write_by_id(data_name, dxl_id, value, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
motors_bus.write(data_name, dxl_id, value)
@ -433,7 +432,7 @@ def test_write_by_name(data_name, dxl_id, value, mock_motors, dummy_motors):
port=mock_motors.port,
motors=dummy_motors,
)
motors_bus.connect()
motors_bus.connect(assert_motors_exist=False)
motors_bus.write(data_name, f"dummy_{dxl_id}", value)