diff --git a/lerobot/common/motors/dynamixel/dynamixel.py b/lerobot/common/motors/dynamixel/dynamixel.py index b3e11053..b0f9556f 100644 --- a/lerobot/common/motors/dynamixel/dynamixel.py +++ b/lerobot/common/motors/dynamixel/dynamixel.py @@ -16,7 +16,6 @@ import logging import math from copy import deepcopy -import dynamixel_sdk as dxl import numpy as np from ..motors_bus import ( @@ -172,24 +171,21 @@ def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str return steps -def convert_to_bytes(value, bytes, mock=False): - if mock: - return value - +def convert_to_bytes(value, n_bytes: int): import dynamixel_sdk as dxl # Note: No need to convert back into unsigned int, since this byte preprocessing # already handles it for us. - if bytes == 1: + if n_bytes == 1: data = [ dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)), ] - elif bytes == 2: + elif n_bytes == 2: data = [ dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)), dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)), ] - elif bytes == 4: + elif n_bytes == 4: data = [ dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)), dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)), @@ -199,7 +195,7 @@ def convert_to_bytes(value, bytes, mock=False): else: raise NotImplementedError( f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but " - f"{bytes} is provided instead." + f"{n_bytes} is provided instead." ) return data @@ -255,6 +251,8 @@ class DynamixelMotorsBus(MotorsBus): super().__init__(port, motors) def _set_handlers(self): + import dynamixel_sdk as dxl + self.port_handler = dxl.PortHandler(self.port) self.packet_handler = dxl.PacketHandler(PROTOCOL_VERSION) @@ -486,6 +484,8 @@ class DynamixelMotorsBus(MotorsBus): return values def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY): + import dynamixel_sdk as dxl + return_list = True if not isinstance(motor_ids, list): return_list = False @@ -519,6 +519,8 @@ class DynamixelMotorsBus(MotorsBus): return values[0] def _read(self, data_name, motor_names: str | list[str] | None = None): + import dynamixel_sdk as dxl + motor_ids = [] models = [] for name in motor_names: @@ -566,6 +568,8 @@ class DynamixelMotorsBus(MotorsBus): return values def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY): + import dynamixel_sdk as dxl + if not isinstance(motor_ids, list): motor_ids = [motor_ids] if not isinstance(values, list): @@ -575,7 +579,7 @@ class DynamixelMotorsBus(MotorsBus): addr, bytes = self.model_ctrl_table[motor_models[0]][data_name] group = dxl.GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes) for idx, value in zip(motor_ids, values, strict=True): - data = convert_to_bytes(value, bytes, self.mock) + data = convert_to_bytes(value, bytes) group.addParam(idx, data) for _ in range(num_retry): @@ -590,6 +594,8 @@ class DynamixelMotorsBus(MotorsBus): ) def _write(self, data_name: str, values: list[int], motor_names: list[str]) -> None: + import dynamixel_sdk as dxl + motor_ids = [] models = [] for name in motor_names: @@ -611,7 +617,7 @@ class DynamixelMotorsBus(MotorsBus): ) for idx, value in zip(motor_ids, values, strict=True): - data = convert_to_bytes(value, bytes, self.mock) + data = convert_to_bytes(value, bytes) if init_group: self.group_writers[group_key].addParam(idx, data) else: diff --git a/lerobot/common/motors/feetech/feetech.py b/lerobot/common/motors/feetech/feetech.py index c62f63d6..a0bcc083 100644 --- a/lerobot/common/motors/feetech/feetech.py +++ b/lerobot/common/motors/feetech/feetech.py @@ -15,7 +15,6 @@ from copy import deepcopy import numpy as np -import scservo_sdk as scs from ..motors_bus import ( CalibrationMode, @@ -185,24 +184,21 @@ def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motorbus, motor_id: i return ticks -def convert_to_bytes(value, bytes, mock=False): - if mock: - return value - +def convert_to_bytes(value, n_bytes: int): import scservo_sdk as scs # Note: No need to convert back into unsigned int, since this byte preprocessing # already handles it for us. - if bytes == 1: + if n_bytes == 1: data = [ scs.SCS_LOBYTE(scs.SCS_LOWORD(value)), ] - elif bytes == 2: + elif n_bytes == 2: data = [ scs.SCS_LOBYTE(scs.SCS_LOWORD(value)), scs.SCS_HIBYTE(scs.SCS_LOWORD(value)), ] - elif bytes == 4: + elif n_bytes == 4: data = [ scs.SCS_LOBYTE(scs.SCS_LOWORD(value)), scs.SCS_HIBYTE(scs.SCS_LOWORD(value)), @@ -212,7 +208,7 @@ def convert_to_bytes(value, bytes, mock=False): else: raise NotImplementedError( f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but " - f"{bytes} is provided instead." + f"{n_bytes} is provided instead." ) return data @@ -268,6 +264,8 @@ class FeetechMotorsBus(MotorsBus): super().__init__(port, motors) def _set_handlers(self): + import scservo_sdk as scs + self.port_handler = scs.PortHandler(self.port) self.packet_handler = scs.PacketHandler(PROTOCOL_VERSION) @@ -340,6 +338,8 @@ class FeetechMotorsBus(MotorsBus): return values def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY): + import scservo_sdk as scs + return_list = True if not isinstance(motor_ids, list): return_list = False @@ -373,6 +373,8 @@ class FeetechMotorsBus(MotorsBus): return values[0] def _read(self, data_name, motor_names: str | list[str] | None = None): + import scservo_sdk as scs + motor_ids = [] models = [] for name in motor_names: @@ -420,6 +422,8 @@ class FeetechMotorsBus(MotorsBus): return values def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY): + import scservo_sdk as scs + if not isinstance(motor_ids, list): motor_ids = [motor_ids] if not isinstance(values, list): @@ -429,7 +433,7 @@ class FeetechMotorsBus(MotorsBus): addr, bytes = self.model_ctrl_table[motor_models[0]][data_name] group = scs.GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes) for idx, value in zip(motor_ids, values, strict=True): - data = convert_to_bytes(value, bytes, self.mock) + data = convert_to_bytes(value, bytes) group.addParam(idx, data) for _ in range(num_retry): @@ -444,6 +448,8 @@ class FeetechMotorsBus(MotorsBus): ) def _write(self, data_name: str, values: list[int], motor_names: list[str]) -> None: + import scservo_sdk as scs + motor_ids = [] models = [] for name in motor_names: @@ -465,7 +471,7 @@ class FeetechMotorsBus(MotorsBus): ) for idx, value in zip(motor_ids, values, strict=True): - data = convert_to_bytes(value, bytes, self.mock) + data = convert_to_bytes(value, bytes) if init_group: self.group_writers[group_key].addParam(idx, data) else: