From 2c68c6ca4097e3755da11b2b0e1ba05b6ec88617 Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Thu, 20 Mar 2025 10:22:47 +0100 Subject: [PATCH] Implement FeetechMotorsBus & move functions to calibration --- lerobot/common/motors/feetech/feetech.py | 348 +++--------------- .../motors/feetech/feetech_calibration.py | 76 +++- 2 files changed, 113 insertions(+), 311 deletions(-) diff --git a/lerobot/common/motors/feetech/feetech.py b/lerobot/common/motors/feetech/feetech.py index 1179a4e5..396388f8 100644 --- a/lerobot/common/motors/feetech/feetech.py +++ b/lerobot/common/motors/feetech/feetech.py @@ -14,19 +14,11 @@ from copy import deepcopy -import numpy as np - -from ..motors_bus import ( - CalibrationMode, - JointOutOfRangeError, - MotorsBus, - assert_same_address, - get_group_sync_key, -) +from ..motors_bus import MotorsBus PROTOCOL_VERSION = 0 BAUDRATE = 1_000_000 -TIMEOUT_MS = 1000 +DEFAULT_TIMEOUT_MS = 1000 MAX_ID_RANGE = 252 @@ -125,94 +117,6 @@ NUM_READ_RETRY = 20 NUM_WRITE_RETRY = 20 -def convert_ticks_to_degrees(ticks, model): - resolutions = MODEL_RESOLUTION[model] - # Convert the ticks to degrees - return ticks * (360.0 / resolutions) - - -def convert_degrees_to_ticks(degrees, model): - resolutions = MODEL_RESOLUTION[model] - # Convert degrees to motor ticks - return int(degrees * (resolutions / 360.0)) - - -def adjusted_to_homing_ticks(raw_motor_ticks: int, model: str, motorbus, motor_id: int) -> int: - """ - Takes a raw reading [0..(res-1)] (e.g. 0..4095) and shifts it so that '2048' - becomes 0 in the homed coordinate system ([-2048..+2047] for 4096 resolution). - """ - resolutions = MODEL_RESOLUTION[model] - - # Shift raw ticks by half-resolution so 2048 -> 0, then wrap [0..res-1]. - ticks = (raw_motor_ticks - (resolutions // 2)) % resolutions - - # If above halfway, fold it into negative territory => [-2048..+2047]. - if ticks > (resolutions // 2): - ticks -= resolutions - - # Flip sign if drive_mode is set. - drive_mode = 0 - if motorbus.calibration is not None: - drive_mode = motorbus.calibration["drive_mode"][motor_id - 1] - - if drive_mode: - ticks *= -1 - - return ticks - - -def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motorbus, motor_id: int) -> int: - """ - Inverse of adjusted_to_homing_ticks(). Takes a 'homed' position in [-2048..+2047] - and recovers the raw [0..(res-1)] ticks with 2048 as midpoint. - """ - # Flip sign if drive_mode was set. - drive_mode = 0 - if motorbus.calibration is not None: - drive_mode = motorbus.calibration["drive_mode"][motor_id - 1] - - if drive_mode: - adjusted_pos *= -1 - - resolutions = MODEL_RESOLUTION[model] - - # Shift by +half-resolution and wrap into [0..res-1]. - # This undoes the earlier shift by -half-resolution. - ticks = (adjusted_pos + (resolutions // 2)) % resolutions - - return ticks - - -def convert_to_bytes(value, n_bytes: int): - import scservo_sdk as scs - - # Note: No need to convert back into unsigned int, since this byte preprocessing - # already handles it for us. - if n_bytes == 1: - data = [ - scs.SCS_LOBYTE(scs.SCS_LOWORD(value)), - ] - elif n_bytes == 2: - data = [ - scs.SCS_LOBYTE(scs.SCS_LOWORD(value)), - scs.SCS_HIBYTE(scs.SCS_LOWORD(value)), - ] - elif n_bytes == 4: - data = [ - scs.SCS_LOBYTE(scs.SCS_LOWORD(value)), - scs.SCS_HIBYTE(scs.SCS_LOWORD(value)), - scs.SCS_LOBYTE(scs.SCS_HIWORD(value)), - scs.SCS_HIBYTE(scs.SCS_HIWORD(value)), - ] - else: - raise NotImplementedError( - f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but " - f"{n_bytes} is provided instead." - ) - return data - - class FeetechMotorsBus(MotorsBus): """ The FeetechMotorsBus class allows to efficiently read and write to the attached motors. It relies on the @@ -222,6 +126,8 @@ class FeetechMotorsBus(MotorsBus): model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE) model_resolution_table = deepcopy(MODEL_RESOLUTION) model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE) + calibration_required = deepcopy(CALIBRATION_REQUIRED) + default_timeout = DEFAULT_TIMEOUT_MS def __init__( self, @@ -229,224 +135,60 @@ class FeetechMotorsBus(MotorsBus): motors: dict[str, tuple[int, str]], ): super().__init__(port, motors) - - def _set_handlers(self): import scservo_sdk as scs self.port_handler = scs.PortHandler(self.port) self.packet_handler = scs.PacketHandler(PROTOCOL_VERSION) + self.reader = scs.GroupSyncRead(self.packet_handler, self.packet_handler, 0, 0) + self.writer = scs.GroupSyncWrite(self.packet_handler, self.packet_handler, 0, 0) - def _set_timeout(self, timeout: int = TIMEOUT_MS): - self.port_handler.setPacketTimeoutMillis(timeout) + def broadcast_ping(self, num_retry: int | None = None): + raise NotImplementedError # TODO - def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): - if motor_names is None: - motor_names = self.motor_names + def calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]: + # TODO + return ids_values - # Convert from unsigned int32 original range [0, 2**32] to signed float32 range - values = values.astype(np.float32) + def uncalibrate_values(self, ids_values: dict[int, float]) -> dict[int, int]: + # TODO + return ids_values - for i, name in enumerate(motor_names): - calib_idx = self.calibration["motor_names"].index(name) - calib_mode = self.calibration["calib_mode"][calib_idx] - - if CalibrationMode[calib_mode] == CalibrationMode.DEGREE: - motor_idx, model = self.motors[name] - - # Convert raw motor ticks to homed ticks, then convert the homed ticks to degrees - values[i] = adjusted_to_homing_ticks(values[i], model, self, motor_idx) - values[i] = convert_ticks_to_degrees(values[i], model) - - elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR: - start_pos = self.calibration["start_pos"][calib_idx] - end_pos = self.calibration["end_pos"][calib_idx] - - # Rescale the present position to a nominal range [0, 100] %, - # useful for joints with linear motions like Aloha gripper - values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100 - - if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR): - raise JointOutOfRangeError( - f"Wrong motor position range detected for {name}. " - f"Expected to be in nominal range of [0, 100] % (a full linear translation), " - f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, " - f"but present value is {values[i]} %. " - "This might be due to a cable connection issue creating an artificial jump in motor values. " - "You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`" - ) - - return values - - def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): - """Inverse of `apply_calibration`.""" - if motor_names is None: - motor_names = self.motor_names - - for i, name in enumerate(motor_names): - calib_idx = self.calibration["motor_names"].index(name) - calib_mode = self.calibration["calib_mode"][calib_idx] - - if CalibrationMode[calib_mode] == CalibrationMode.DEGREE: - motor_idx, model = self.motors[name] - - # Convert degrees to homed ticks, then convert the homed ticks to raw ticks - values[i] = convert_degrees_to_ticks(values[i], model) - values[i] = adjusted_to_motor_ticks(values[i], model, self, motor_idx) - - elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR: - start_pos = self.calibration["start_pos"][calib_idx] - end_pos = self.calibration["end_pos"][calib_idx] - - # Convert from nominal lnear range of [0, 100] % to - # actual motor range of values which can be arbitrary. - values[i] = values[i] / 100 * (end_pos - start_pos) + start_pos - - values = np.round(values).astype(np.int32) - return values - - def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY): + def _is_comm_success(self, comm: int) -> bool: import scservo_sdk as scs - return_list = True - if not isinstance(motor_ids, list): - return_list = False - motor_ids = [motor_ids] + return comm == scs.COMM_SUCCESS - assert_same_address(self.model_ctrl_table, self.motor_models, data_name) - addr, bytes = self.model_ctrl_table[motor_models[0]][data_name] - group = scs.GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes) - for idx in motor_ids: - group.addParam(idx) + @staticmethod + def split_int_bytes(value: int, n_bytes: int) -> list[int]: + # Validate input + if value < 0: + raise ValueError(f"Negative values are not allowed: {value}") - for _ in range(num_retry): - comm = group.txRxPacket() - if comm == scs.COMM_SUCCESS: - break + max_value = {1: 0xFF, 2: 0xFFFF, 4: 0xFFFFFFFF}.get(n_bytes) + if max_value is None: + raise NotImplementedError(f"Unsupported byte size: {n_bytes}. Expected [1, 2, 4].") - if comm != scs.COMM_SUCCESS: - raise ConnectionError( - f"Read failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: " - f"{self.packet_handler.getTxRxResult(comm)}" - ) + if value > max_value: + raise ValueError(f"Value {value} exceeds the maximum for {n_bytes} bytes ({max_value}).") - values = [] - for idx in motor_ids: - value = group.getData(idx, addr, bytes) - values.append(value) - - if return_list: - return values - else: - return values[0] - - def _read(self, data_name: str, motor_names: list[str]): import scservo_sdk as scs - motor_ids = [] - models = [] - for name in motor_names: - motor_idx, model = self.motors[name] - motor_ids.append(motor_idx) - models.append(model) - - assert_same_address(self.model_ctrl_table, models, data_name) - addr, bytes = self.model_ctrl_table[model][data_name] - group_key = get_group_sync_key(data_name, motor_names) - - if data_name not in self.group_readers: - # Very Important to flush the buffer! - self.port_handler.ser.reset_output_buffer() - self.port_handler.ser.reset_input_buffer() - - # Create new group reader - self.group_readers[group_key] = scs.GroupSyncRead( - self.port_handler, self.packet_handler, addr, bytes - ) - for idx in motor_ids: - self.group_readers[group_key].addParam(idx) - - for _ in range(NUM_READ_RETRY): - comm = self.group_readers[group_key].txRxPacket() - if comm == scs.COMM_SUCCESS: - break - - if comm != scs.COMM_SUCCESS: - raise ConnectionError( - f"Read failed due to communication error on port {self.port} for group_key {group_key}: " - f"{self.packet_handler.getTxRxResult(comm)}" - ) - - values = [] - for idx in motor_ids: - value = self.group_readers[group_key].getData(idx, addr, bytes) - values.append(value) - - values = np.array(values) - - if data_name in CALIBRATION_REQUIRED and self.calibration is not None: - values = self.apply_calibration(values, motor_names) - - return values - - def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY): - import scservo_sdk as scs - - if not isinstance(motor_ids, list): - motor_ids = [motor_ids] - if not isinstance(values, list): - values = [values] - - assert_same_address(self.model_ctrl_table, motor_models, data_name) - addr, bytes = self.model_ctrl_table[motor_models[0]][data_name] - group = scs.GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes) - for idx, value in zip(motor_ids, values, strict=True): - data = convert_to_bytes(value, bytes) - group.addParam(idx, data) - - for _ in range(num_retry): - comm = group.txPacket() - if comm == scs.COMM_SUCCESS: - break - - if comm != scs.COMM_SUCCESS: - raise ConnectionError( - f"Write failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: " - f"{self.packet_handler.getTxRxResult(comm)}" - ) - - def _write(self, data_name: str, values: list[int], motor_names: list[str]) -> None: - import scservo_sdk as scs - - motor_ids = [] - models = [] - for name in motor_names: - motor_idx, model = self.motors[name] - motor_ids.append(motor_idx) - models.append(model) - - if data_name in CALIBRATION_REQUIRED and self.calibration is not None: - values = self.revert_calibration(values, motor_names) - - assert_same_address(self.model_ctrl_table, models, data_name) - addr, bytes = self.model_ctrl_table[model][data_name] - group_key = get_group_sync_key(data_name, motor_names) - - init_group = data_name not in self.group_readers - if init_group: - self.group_writers[group_key] = scs.GroupSyncWrite( - self.port_handler, self.packet_handler, addr, bytes - ) - - for idx, value in zip(motor_ids, values, strict=True): - data = convert_to_bytes(value, bytes) - if init_group: - self.group_writers[group_key].addParam(idx, data) - else: - self.group_writers[group_key].changeParam(idx, data) - - comm = self.group_writers[group_key].txPacket() - if comm != scs.COMM_SUCCESS: - raise ConnectionError( - f"Write failed due to communication error on port {self.port} for group_key {group_key}: " - f"{self.packet_handler.getTxRxResult(comm)}" - ) + # Note: No need to convert back into unsigned int, since this byte preprocessing + # already handles it for us. + if n_bytes == 1: + data = [ + scs.SCS_LOBYTE(scs.SCS_LOWORD(value)), + ] + elif n_bytes == 2: + data = [ + scs.SCS_LOBYTE(scs.SCS_LOWORD(value)), + scs.SCS_HIBYTE(scs.SCS_LOWORD(value)), + ] + elif n_bytes == 4: + data = [ + scs.SCS_LOBYTE(scs.SCS_LOWORD(value)), + scs.SCS_HIBYTE(scs.SCS_LOWORD(value)), + scs.SCS_LOBYTE(scs.SCS_HIWORD(value)), + scs.SCS_HIBYTE(scs.SCS_HIWORD(value)), + ] + return data diff --git a/lerobot/common/motors/feetech/feetech_calibration.py b/lerobot/common/motors/feetech/feetech_calibration.py index de4ed83f..f967f82d 100644 --- a/lerobot/common/motors/feetech/feetech_calibration.py +++ b/lerobot/common/motors/feetech/feetech_calibration.py @@ -13,11 +13,12 @@ # limitations under the License. import numpy as np -from ..motors_bus import MotorsBus, TorqueMode -from .feetech import ( +from ..motors_bus import ( CalibrationMode, - FeetechMotorsBus, + MotorsBus, + TorqueMode, ) +from .feetech import MODEL_RESOLUTION, FeetechMotorsBus URL_TEMPLATE = ( "https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp" @@ -140,19 +141,19 @@ def run_full_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm ) # TODO(pepijn): replace with new instruction homing pos (all motors in middle) in tutorial input("Press Enter to continue...") - start_positions = np.zeros(len(arm.motor_indices)) - end_positions = np.zeros(len(arm.motor_indices)) - encoder_offsets = np.zeros(len(arm.motor_indices)) + start_positions = np.zeros(len(arm.motor_ids)) + end_positions = np.zeros(len(arm.motor_ids)) + encoder_offsets = np.zeros(len(arm.motor_ids)) modes = get_calibration_modes(arm) - for i, motor_id in enumerate(arm.motor_indices): + for i, motor_id in enumerate(arm.motor_ids): if modes[i] == CalibrationMode.DEGREE.name: encoder_offsets[i] = calibrate_homing_motor(motor_id, arm) start_positions[i] = 0 end_positions[i] = 0 - for i, motor_id in enumerate(arm.motor_indices): + for i, motor_id in enumerate(arm.motor_ids): if modes[i] == CalibrationMode.LINEAR.name: start_positions[i], end_positions[i] = calibrate_linear_motor(motor_id, arm) encoder_offsets[i] = 0 @@ -251,3 +252,62 @@ def apply_feetech_offsets_from_calibration(motorsbus: FeetechMotorsBus, calibrat motorsbus.write("Lock", 0) print("Offsets have been saved to EEPROM successfully.") + + +def convert_ticks_to_degrees(ticks, model): + resolutions = MODEL_RESOLUTION[model] + # Convert the ticks to degrees + return ticks * (360.0 / resolutions) + + +def convert_degrees_to_ticks(degrees, model): + resolutions = MODEL_RESOLUTION[model] + # Convert degrees to motor ticks + return int(degrees * (resolutions / 360.0)) + + +def adjusted_to_homing_ticks(raw_motor_ticks: int, model: str, motorbus, motor_id: int) -> int: + """ + Takes a raw reading [0..(res-1)] (e.g. 0..4095) and shifts it so that '2048' + becomes 0 in the homed coordinate system ([-2048..+2047] for 4096 resolution). + """ + resolutions = MODEL_RESOLUTION[model] + + # Shift raw ticks by half-resolution so 2048 -> 0, then wrap [0..res-1]. + ticks = (raw_motor_ticks - (resolutions // 2)) % resolutions + + # If above halfway, fold it into negative territory => [-2048..+2047]. + if ticks > (resolutions // 2): + ticks -= resolutions + + # Flip sign if drive_mode is set. + drive_mode = 0 + if motorbus.calibration is not None: + drive_mode = motorbus.calibration["drive_mode"][motor_id - 1] + + if drive_mode: + ticks *= -1 + + return ticks + + +def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motorbus, motor_id: int) -> int: + """ + Inverse of adjusted_to_homing_ticks(). Takes a 'homed' position in [-2048..+2047] + and recovers the raw [0..(res-1)] ticks with 2048 as midpoint. + """ + # Flip sign if drive_mode was set. + drive_mode = 0 + if motorbus.calibration is not None: + drive_mode = motorbus.calibration["drive_mode"][motor_id - 1] + + if drive_mode: + adjusted_pos *= -1 + + resolutions = MODEL_RESOLUTION[model] + + # Shift by +half-resolution and wrap into [0..res-1]. + # This undoes the earlier shift by -half-resolution. + ticks = (adjusted_pos + (resolutions // 2)) % resolutions + + return ticks