diff --git a/tests/mocks/mock_dynamixel.py b/tests/mocks/mock_dynamixel.py index 21668e17..2f12283c 100644 --- a/tests/mocks/mock_dynamixel.py +++ b/tests/mocks/mock_dynamixel.py @@ -1,15 +1,15 @@ import abc import random -import threading -import time from typing import Callable import dynamixel_sdk as dxl import serial -from mock_serial.mock_serial import MockSerial, Stub +from mock_serial.mock_serial import MockSerial from lerobot.common.motors.dynamixel import X_SERIES_CONTROL_TABLE, DynamixelMotorsBus +from .mock_serial_patch import WaitableStub + # https://emanual.robotis.com/docs/en/dxl/crc/ DXL_CRC_TABLE = [ 0x0000, 0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014, 0x8011, @@ -420,34 +420,6 @@ class MockPortHandler(dxl.PortHandler): return True -class WaitableStub(Stub): - """ - In some situations, a test might be checking if a stub has been called before `MockSerial` thread had time - to read, match, and call the stub. In these situations, the test can fail randomly. - - Use `wait_called()` or `wait_calls()` to block until the stub is called, avoiding race conditions. - """ - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self._event = threading.Event() - - def call(self): - self._event.set() - return super().call() - - def wait_called(self, timeout: float = 1.0): - return self._event.wait(timeout) - - def wait_calls(self, min_calls: int = 1, timeout: float = 1.0): - start = time.perf_counter() - while time.perf_counter() - start < timeout: - if self.calls >= min_calls: - return self.calls - time.sleep(0.005) - raise TimeoutError(f"Stub not called {min_calls} times within {timeout} seconds.") - - class MockMotors(MockSerial): """ This class will simulate physical motors by responding with valid status packets upon receiving some diff --git a/tests/mocks/mock_feetech.py b/tests/mocks/mock_feetech.py index f23dd6ee..d685c970 100644 --- a/tests/mocks/mock_feetech.py +++ b/tests/mocks/mock_feetech.py @@ -1,15 +1,15 @@ import abc import random -import threading -import time from typing import Callable import scservo_sdk as scs import serial -from mock_serial.mock_serial import MockSerial, Stub +from mock_serial import MockSerial from lerobot.common.motors.feetech import SCS_SERIES_CONTROL_TABLE, FeetechMotorsBus +from .mock_serial_patch import WaitableStub + # https://files.waveshare.com/upload/2/27/Communication_Protocol_User_Manual-EN%28191218-0923%29.pdf INSTRUCTION_TYPES = { "Ping": 0x01, # Checks whether the Packet has arrived at a device with the same ID as the specified packet ID @@ -274,34 +274,6 @@ class MockPortHandler(scs.PortHandler): return True -class WaitableStub(Stub): - """ - In some situations, a test might be checking if a stub has been called before `MockSerial` thread had time - to read, match, and call the stub. In these situations, the test can fail randomly. - - Use `wait_called()` or `wait_calls()` to block until the stub is called, avoiding race conditions. - """ - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self._event = threading.Event() - - def call(self): - self._event.set() - return super().call() - - def wait_called(self, timeout: float = 1.0): - return self._event.wait(timeout) - - def wait_calls(self, min_calls: int = 1, timeout: float = 1.0): - start = time.perf_counter() - while time.perf_counter() - start < timeout: - if self.calls >= min_calls: - return self.calls - time.sleep(0.005) - raise TimeoutError(f"Stub not called {min_calls} times within {timeout} seconds.") - - class MockMotors(MockSerial): """ This class will simulate physical motors by responding with valid status packets upon receiving some diff --git a/tests/mocks/mock_serial_patch.py b/tests/mocks/mock_serial_patch.py new file mode 100644 index 00000000..e3992318 --- /dev/null +++ b/tests/mocks/mock_serial_patch.py @@ -0,0 +1,35 @@ +import threading +import time + +from mock_serial.mock_serial import Stub + + +class WaitableStub(Stub): + """ + In some situations, a test might be checking if a stub has been called before `MockSerial` thread had time + to read, match, and call the stub. In these situations, the test can fail randomly. + + Use `wait_called()` or `wait_calls()` to block until the stub is called, avoiding race conditions. + + Proposed fix: + https://github.com/benthorner/mock_serial/pull/3 + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._event = threading.Event() + + def call(self): + self._event.set() + return super().call() + + def wait_called(self, timeout: float = 1.0): + return self._event.wait(timeout) + + def wait_calls(self, min_calls: int = 1, timeout: float = 1.0): + start = time.perf_counter() + while time.perf_counter() - start < timeout: + if self.calls >= min_calls: + return self.calls + time.sleep(0.005) + raise TimeoutError(f"Stub not called {min_calls} times within {timeout} seconds.")