Move mock_serial patch to dedicated file

This commit is contained in:
Simon Alibert 2025-03-23 17:03:04 +01:00
parent 57c97762e1
commit ff7cfdaf40
3 changed files with 41 additions and 62 deletions

View File

@ -1,15 +1,15 @@
import abc import abc
import random import random
import threading
import time
from typing import Callable from typing import Callable
import dynamixel_sdk as dxl import dynamixel_sdk as dxl
import serial import serial
from mock_serial.mock_serial import MockSerial, Stub from mock_serial.mock_serial import MockSerial
from lerobot.common.motors.dynamixel import X_SERIES_CONTROL_TABLE, DynamixelMotorsBus from lerobot.common.motors.dynamixel import X_SERIES_CONTROL_TABLE, DynamixelMotorsBus
from .mock_serial_patch import WaitableStub
# https://emanual.robotis.com/docs/en/dxl/crc/ # https://emanual.robotis.com/docs/en/dxl/crc/
DXL_CRC_TABLE = [ DXL_CRC_TABLE = [
0x0000, 0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014, 0x8011, 0x0000, 0x8005, 0x800F, 0x000A, 0x801B, 0x001E, 0x0014, 0x8011,
@ -420,34 +420,6 @@ class MockPortHandler(dxl.PortHandler):
return True return True
class WaitableStub(Stub):
"""
In some situations, a test might be checking if a stub has been called before `MockSerial` thread had time
to read, match, and call the stub. In these situations, the test can fail randomly.
Use `wait_called()` or `wait_calls()` to block until the stub is called, avoiding race conditions.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._event = threading.Event()
def call(self):
self._event.set()
return super().call()
def wait_called(self, timeout: float = 1.0):
return self._event.wait(timeout)
def wait_calls(self, min_calls: int = 1, timeout: float = 1.0):
start = time.perf_counter()
while time.perf_counter() - start < timeout:
if self.calls >= min_calls:
return self.calls
time.sleep(0.005)
raise TimeoutError(f"Stub not called {min_calls} times within {timeout} seconds.")
class MockMotors(MockSerial): class MockMotors(MockSerial):
""" """
This class will simulate physical motors by responding with valid status packets upon receiving some This class will simulate physical motors by responding with valid status packets upon receiving some

View File

@ -1,15 +1,15 @@
import abc import abc
import random import random
import threading
import time
from typing import Callable from typing import Callable
import scservo_sdk as scs import scservo_sdk as scs
import serial import serial
from mock_serial.mock_serial import MockSerial, Stub from mock_serial import MockSerial
from lerobot.common.motors.feetech import SCS_SERIES_CONTROL_TABLE, FeetechMotorsBus from lerobot.common.motors.feetech import SCS_SERIES_CONTROL_TABLE, FeetechMotorsBus
from .mock_serial_patch import WaitableStub
# https://files.waveshare.com/upload/2/27/Communication_Protocol_User_Manual-EN%28191218-0923%29.pdf # https://files.waveshare.com/upload/2/27/Communication_Protocol_User_Manual-EN%28191218-0923%29.pdf
INSTRUCTION_TYPES = { INSTRUCTION_TYPES = {
"Ping": 0x01, # Checks whether the Packet has arrived at a device with the same ID as the specified packet ID "Ping": 0x01, # Checks whether the Packet has arrived at a device with the same ID as the specified packet ID
@ -274,34 +274,6 @@ class MockPortHandler(scs.PortHandler):
return True return True
class WaitableStub(Stub):
"""
In some situations, a test might be checking if a stub has been called before `MockSerial` thread had time
to read, match, and call the stub. In these situations, the test can fail randomly.
Use `wait_called()` or `wait_calls()` to block until the stub is called, avoiding race conditions.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._event = threading.Event()
def call(self):
self._event.set()
return super().call()
def wait_called(self, timeout: float = 1.0):
return self._event.wait(timeout)
def wait_calls(self, min_calls: int = 1, timeout: float = 1.0):
start = time.perf_counter()
while time.perf_counter() - start < timeout:
if self.calls >= min_calls:
return self.calls
time.sleep(0.005)
raise TimeoutError(f"Stub not called {min_calls} times within {timeout} seconds.")
class MockMotors(MockSerial): class MockMotors(MockSerial):
""" """
This class will simulate physical motors by responding with valid status packets upon receiving some This class will simulate physical motors by responding with valid status packets upon receiving some

View File

@ -0,0 +1,35 @@
import threading
import time
from mock_serial.mock_serial import Stub
class WaitableStub(Stub):
"""
In some situations, a test might be checking if a stub has been called before `MockSerial` thread had time
to read, match, and call the stub. In these situations, the test can fail randomly.
Use `wait_called()` or `wait_calls()` to block until the stub is called, avoiding race conditions.
Proposed fix:
https://github.com/benthorner/mock_serial/pull/3
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._event = threading.Event()
def call(self):
self._event.set()
return super().call()
def wait_called(self, timeout: float = 1.0):
return self._event.wait(timeout)
def wait_calls(self, min_calls: int = 1, timeout: float = 1.0):
start = time.perf_counter()
while time.perf_counter() - start < timeout:
if self.calls >= min_calls:
return self.calls
time.sleep(0.005)
raise TimeoutError(f"Stub not called {min_calls} times within {timeout} seconds.")