From e4a6d035f990d7727f2e4acc9d7b93da52845981 Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Wed, 19 Mar 2025 19:02:25 +0100 Subject: [PATCH] Remove Dxl mock sdk & update tests --- tests/mocks/mock_dynamixel_sdk.py | 133 ----------------------- tests/motors/test_dynamixel.py | 168 +++++++++++++++++++++++++----- 2 files changed, 144 insertions(+), 157 deletions(-) delete mode 100644 tests/mocks/mock_dynamixel_sdk.py diff --git a/tests/mocks/mock_dynamixel_sdk.py b/tests/mocks/mock_dynamixel_sdk.py deleted file mode 100644 index 6212285e..00000000 --- a/tests/mocks/mock_dynamixel_sdk.py +++ /dev/null @@ -1,133 +0,0 @@ -# Copyright 2024 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# ruff: noqa: N802, E741 - -""" -Mocked classes and functions from dynamixel_sdk to allow for testing DynamixelMotorsBus code. - -Warning: These mocked versions are minimalist. They do not exactly mock every behaviors -from the original classes and functions (e.g. return types might be None instead of boolean). -""" - -DEFAULT_BAUDRATE = 9_600 -COMM_SUCCESS = 0 # tx or rx packet communication success - - -def convert_to_bytes(value, bytes): - # TODO(rcadene): remove need to mock `convert_to_bytes` by implemented the inverse transform - # `convert_bytes_to_value` - del bytes # unused - return value - - -def get_default_motor_values(motor_index): - return { - # Key (int) are from X_SERIES_CONTROL_TABLE - 7: motor_index, # ID - 8: DEFAULT_BAUDRATE, # Baud_rate - 10: 0, # Drive_Mode - 64: 0, # Torque_Enable - # Set 2560 since calibration values for Aloha gripper is between start_pos=2499 and end_pos=3144 - # For other joints, 2560 will be autocorrected to be in calibration range - 132: 2560, # Present_Position - } - - -# Macro for Control Table Value -def DXL_MAKEWORD(a, b): - return (a & 0xFF) | ((b & 0xFF) << 8) - - -def DXL_MAKEDWORD(a, b): - return (a & 0xFFFF) | (b & 0xFFFF) << 16 - - -def DXL_LOWORD(l): - return l & 0xFFFF - - -def DXL_HIWORD(l): - return (l >> 16) & 0xFFFF - - -def DXL_LOBYTE(w): - return w & 0xFF - - -def DXL_HIBYTE(w): - return (w >> 8) & 0xFF - - -class PortHandler: - def __init__(self, port): - self.port = port - # factory default baudrate - self.baudrate = DEFAULT_BAUDRATE - - def openPort(self): # noqa: N802 - return True - - def closePort(self): # noqa: N802 - pass - - def setPacketTimeoutMillis(self, timeout_ms): # noqa: N802 - del timeout_ms # unused - - def getBaudRate(self): # noqa: N802 - return self.baudrate - - def setBaudRate(self, baudrate): # noqa: N802 - self.baudrate = baudrate - - -class PacketHandler: - def __init__(self, protocol_version): - del protocol_version # unused - # Use packet_handler.data to communicate across Read and Write - self.data = {} - - -class GroupSyncRead: - def __init__(self, port_handler, packet_handler, address, bytes): - self.packet_handler = packet_handler - - def addParam(self, motor_index): # noqa: N802 - # Initialize motor default values - if motor_index not in self.packet_handler.data: - self.packet_handler.data[motor_index] = get_default_motor_values(motor_index) - - def txRxPacket(self): # noqa: N802 - return COMM_SUCCESS - - def getData(self, index, address, bytes): # noqa: N802 - return self.packet_handler.data[index][address] - - -class GroupSyncWrite: - def __init__(self, port_handler, packet_handler, address, bytes): - self.packet_handler = packet_handler - self.address = address - - def addParam(self, index, data): # noqa: N802 - # Initialize motor default values - if index not in self.packet_handler.data: - self.packet_handler.data[index] = get_default_motor_values(index) - self.changeParam(index, data) - - def txPacket(self): # noqa: N802 - return COMM_SUCCESS - - def changeParam(self, index, data): # noqa: N802 - self.packet_handler.data[index][self.address] = data diff --git a/tests/motors/test_dynamixel.py b/tests/motors/test_dynamixel.py index 22cdef23..37bb0b46 100644 --- a/tests/motors/test_dynamixel.py +++ b/tests/motors/test_dynamixel.py @@ -1,21 +1,55 @@ -import sys from unittest.mock import patch +import dynamixel_sdk as dxl import pytest -from lerobot.common.motors.dynamixel import DynamixelMotorsBus -from tests.mocks import mock_dynamixel_sdk +from lerobot.common.motors.dynamixel.dynamixel import DynamixelMotorsBus +from tests.mocks.mock_dynamixel import MockInstructionPacket, MockMotors, MockPortHandler @pytest.fixture(autouse=True) -def patch_dynamixel_sdk(): - with patch.dict(sys.modules, {"dynamixel_sdk": mock_dynamixel_sdk}): - yield +def patch_port_handler(): + with patch.object(dxl, "PortHandler", MockPortHandler): + yield # Patch is applied for the duration of each test -def test_patch_sdk(): - assert "dynamixel_sdk" in sys.modules # Should be patched - assert sys.modules["dynamixel_sdk"] is mock_dynamixel_sdk # Should match the mock +def test_autouse_patch(): + """Ensure that the autouse fixture correctly patches dxl.PortHandler with MockPortHandler.""" + assert dxl.PortHandler is MockPortHandler + + +@pytest.mark.parametrize( + "value, n_bytes, expected", + [ + (0x12, 1, [0x12]), # Single byte + (0x1234, 2, [0x34, 0x12]), # Two bytes + (0x12345678, 4, [0x78, 0x56, 0x34, 0x12]), # Four bytes + (0, 1, [0x00]), # Zero with 1 byte + (0, 2, [0x00, 0x00]), # Zero with 2 bytes + (0, 4, [0x00, 0x00, 0x00, 0x00]), # Zero with 4 bytes + (255, 1, [0xFF]), # Max single byte + (65535, 2, [0xFF, 0xFF]), # Max two bytes + (4294967295, 4, [0xFF, 0xFF, 0xFF, 0xFF]), # Max four bytes + ], +) # fmt: skip +def test_split_int_bytes(value, n_bytes, expected): + assert DynamixelMotorsBus.split_int_bytes(value, n_bytes) == expected + + +def test_split_int_bytes_invalid_n_bytes(): + with pytest.raises(NotImplementedError): + DynamixelMotorsBus.split_int_bytes(100, 3) + + +def test_split_int_bytes_negative_numbers(): + with pytest.raises(ValueError): + neg = DynamixelMotorsBus.split_int_bytes(-1, 1) + print(neg) + + +def test_split_int_bytes_large_number(): + with pytest.raises(ValueError): + DynamixelMotorsBus.split_int_bytes(2**32, 4) # 4-byte max is 0xFFFFFFFF def test_abc_implementation(): @@ -23,22 +57,108 @@ def test_abc_implementation(): DynamixelMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "xl330-m077")}) -def test_configure_motors_all_ids_1(): - # see X_SERIES_BAUDRATE_TABLE - smaller_baudrate = 9_600 - smaller_baudrate_value = 0 - - # This test expect the configuration was already correct. - motors_bus = DynamixelMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "xl330-m077")}) +@pytest.mark.parametrize( + "motors", + [ + None, + [1, 2, 3], + ["dummy_1", "dummy_2", "dummy_3"], + [1, "dummy_2", 3], # Mixed + ], +) +def test_read_all_motors(motors): + mock_motors = MockMotors([1, 2, 3]) + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors={ + "dummy_1": (1, "xl330-m077"), + "dummy_2": (2, "xl330-m077"), + "dummy_3": (3, "xl330-m077"), + }, + ) motors_bus.connect() - motors_bus.write("Baud_Rate", [smaller_baudrate_value] * len(motors_bus)) - motors_bus.set_baudrate(smaller_baudrate) - motors_bus.write("ID", [1] * len(motors_bus)) - del motors_bus + pos_dict = motors_bus.read("Present_Position", motors=motors) - # Test configure - motors_bus = DynamixelMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "xl330-m077")}) + assert mock_motors.stubs["SyncRead_Present_Position_all"].called + assert len(pos_dict) == 3 + assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values()) + + +@pytest.mark.parametrize("idx", [1, 2, 3]) +def test_read_single_motor_name(idx): + mock_motors = MockMotors([1, 2, 3]) + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors={ + "dummy_1": (1, "xl330-m077"), + "dummy_2": (2, "xl330-m077"), + "dummy_3": (3, "xl330-m077"), + }, + ) motors_bus.connect() - assert motors_bus.are_motors_configured() - del motors_bus + + pos_dict = motors_bus.read("Present_Position", f"dummy_{idx}") + + assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called + assert len(pos_dict) == 1 + assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values()) + + +@pytest.mark.parametrize("idx", [1, 2, 3]) +def test_read_single_motor_id(idx): + mock_motors = MockMotors([1, 2, 3]) + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors={ + "dummy_1": (1, "xl330-m077"), + "dummy_2": (2, "xl330-m077"), + "dummy_3": (3, "xl330-m077"), + }, + ) + motors_bus.connect() + + pos_dict = motors_bus.read("Present_Position", idx) + + assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called + assert len(pos_dict) == 1 + assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values()) + + +@pytest.mark.parametrize( + "num_retry, num_invalid_try", + [ + [1, 2], + [2, 3], + [3, 2], + [2, 1], + ], +) +def test_read_num_retry(num_retry, num_invalid_try): + mock_motors = MockMotors([1, 2, 3], default_stubs=None) + address, length = mock_motors.ctrl_table["Present_Position"] + receive_bytes = MockInstructionPacket.sync_read([1], address, length) + send_fn = mock_motors._create_present_pos_send_fn( + [1], "Present_Position", num_invalid_try=num_invalid_try + ) + mock_motors.stub(name="num_retry", receive_bytes=receive_bytes, send_fn=send_fn) + + motors_bus = DynamixelMotorsBus( + port=mock_motors.port, + motors={ + "dummy_1": (1, "xl330-m077"), + "dummy_2": (2, "xl330-m077"), + "dummy_3": (3, "xl330-m077"), + }, + ) + motors_bus.connect() + + if num_retry >= num_invalid_try: + pos_dict = motors_bus.read("Present_Position", 1, num_retry=num_retry) + assert len(pos_dict) == 1 + assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values()) + else: + with pytest.raises(ConnectionError): + _ = motors_bus.read("Present_Position", 1, num_retry=num_retry) + + assert mock_motors.stubs["num_retry"].calls == num_retry