diff --git a/lerobot/common/motors/dynamixel/dynamixel.py b/lerobot/common/motors/dynamixel/dynamixel.py index 5797a493..413ed173 100644 --- a/lerobot/common/motors/dynamixel/dynamixel.py +++ b/lerobot/common/motors/dynamixel/dynamixel.py @@ -12,24 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging -import math +# TODO(aliberts): Should we implement FastSyncRead/Write? +# https://github.com/ROBOTIS-GIT/DynamixelSDK/pull/643 +# https://github.com/ROBOTIS-GIT/DynamixelSDK/releases/tag/3.8.2 +# https://emanual.robotis.com/docs/en/dxl/protocol2/#fast-sync-read-0x8a +# -> Need to check compatibility across models + from copy import deepcopy import numpy as np -from ..motors_bus import ( - CalibrationMode, - JointOutOfRangeError, - MotorsBus, - TorqueMode, - assert_same_address, - get_group_sync_key, -) +from ..motors_bus import MotorsBus PROTOCOL_VERSION = 2.0 BAUDRATE = 1_000_000 -TIMEOUT_MS = 1000 +DEFAULT_TIMEOUT_MS = 1000 MAX_ID_RANGE = 252 @@ -171,35 +168,6 @@ def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str return steps -def convert_to_bytes(value, n_bytes: int): - import dynamixel_sdk as dxl - - # Note: No need to convert back into unsigned int, since this byte preprocessing - # already handles it for us. - if n_bytes == 1: - data = [ - dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)), - ] - elif n_bytes == 2: - data = [ - dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)), - dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)), - ] - elif n_bytes == 4: - data = [ - dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)), - dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)), - dxl.DXL_LOBYTE(dxl.DXL_HIWORD(value)), - dxl.DXL_HIBYTE(dxl.DXL_HIWORD(value)), - ] - else: - raise NotImplementedError( - f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but " - f"{n_bytes} is provided instead." - ) - return data - - class DynamixelMotorsBus(MotorsBus): """ The Dynamixel implementation for a MotorsBus. It relies on the python dynamixel sdk to communicate with @@ -210,6 +178,8 @@ class DynamixelMotorsBus(MotorsBus): model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE) model_resolution_table = deepcopy(MODEL_RESOLUTION) model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE) + calibration_required = deepcopy(CALIBRATION_REQUIRED) + default_timeout = DEFAULT_TIMEOUT_MS def __init__( self, @@ -217,405 +187,62 @@ class DynamixelMotorsBus(MotorsBus): motors: dict[str, tuple[int, str]], ): super().__init__(port, motors) - - def _set_handlers(self): import dynamixel_sdk as dxl self.port_handler = dxl.PortHandler(self.port) self.packet_handler = dxl.PacketHandler(PROTOCOL_VERSION) + self.reader = dxl.GroupSyncRead(self.port_handler, self.packet_handler, 0, 0) + self.writer = dxl.GroupSyncWrite(self.port_handler, self.packet_handler, 0, 0) - def _set_timeout(self, timeout: int = TIMEOUT_MS): - self.port_handler.setPacketTimeoutMillis(timeout) + def broadcast_ping(self) -> tuple[list, int]: + data_list, comm = self.packet_handler.broadcastPing(self.port_handler) + # TODO(aliberts): translate data_list into {id: model} + return data_list, comm - def _apply_calibration_autocorrect(self, values: np.ndarray | list, motor_names: list[str] | None): - """This function applies the calibration, automatically detects out of range errors for motors values and attempts to correct. + def calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]: + # TODO + return ids_values - For more info, see docstring of `apply_calibration` and `autocorrect_calibration`. - """ - try: - values = self.apply_calibration(values, motor_names) - except JointOutOfRangeError as e: - print(e) - self._autocorrect_calibration(values, motor_names) - values = self.apply_calibration(values, motor_names) - return values + def uncalibrate_values(self, ids_values: dict[int, float]) -> dict[int, int]: + # TODO + return ids_values - def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): - """Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with - a "zero position" at 0 degree. - - Note: We say "nominal degree range" since the motors can take values outside this range. For instance, 190 degrees, if the motor - rotate more than a half a turn from the zero position. However, most motors can't rotate more than 180 degrees and will stay in this range. - - Joints values are original in [0, 2**32[ (unsigned int32). Each motor are expected to complete a full rotation - when given a goal position that is + or - their resolution. For instance, dynamixel xl330-m077 have a resolution of 4096, and - at any position in their original range, let's say the position 56734, they complete a full rotation clockwise by moving to 60830, - or anticlockwise by moving to 52638. The position in the original range is arbitrary and might change a lot between each motor. - To harmonize between motors of the same model, different robots, or even models of different brands, we propose to work - in the centered nominal degree range ]-180, 180[. - """ - if motor_names is None: - motor_names = self.motor_names - - # Convert from unsigned int32 original range [0, 2**32] to signed float32 range - values = values.astype(np.float32) - - for i, name in enumerate(motor_names): - calib_idx = self.calibration["motor_names"].index(name) - calib_mode = self.calibration["calib_mode"][calib_idx] - - if CalibrationMode[calib_mode] == CalibrationMode.DEGREE: - drive_mode = self.calibration["drive_mode"][calib_idx] - homing_offset = self.calibration["homing_offset"][calib_idx] - _, model = self.motors[name] - resolution = self.model_resolution[model] - - # Update direction of rotation of the motor to match between leader and follower. - # In fact, the motor of the leader for a given joint can be assembled in an - # opposite direction in term of rotation than the motor of the follower on the same joint. - if drive_mode: - values[i] *= -1 - - # Convert from range [-2**31, 2**31] to - # nominal range [-resolution//2, resolution//2] (e.g. [-2048, 2048]) - values[i] += homing_offset - - # Convert from range [-resolution//2, resolution//2] to - # universal float32 centered degree range [-180, 180] - # (e.g. 2048 / (4096 // 2) * 180 = 180) - values[i] = values[i] / (resolution // 2) * HALF_TURN_DEGREE - - if (values[i] < LOWER_BOUND_DEGREE) or (values[i] > UPPER_BOUND_DEGREE): - raise JointOutOfRangeError( - f"Wrong motor position range detected for {name}. " - f"Expected to be in nominal range of [-{HALF_TURN_DEGREE}, {HALF_TURN_DEGREE}] degrees (a full rotation), " - f"with a maximum range of [{LOWER_BOUND_DEGREE}, {UPPER_BOUND_DEGREE}] degrees to account for joints that can rotate a bit more, " - f"but present value is {values[i]} degree. " - "This might be due to a cable connection issue creating an artificial 360 degrees jump in motor values. " - "You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`" - ) - - elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR: - start_pos = self.calibration["start_pos"][calib_idx] - end_pos = self.calibration["end_pos"][calib_idx] - - # Rescale the present position to a nominal range [0, 100] %, - # useful for joints with linear motions like Aloha gripper - values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100 - - if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR): - raise JointOutOfRangeError( - f"Wrong motor position range detected for {name}. " - f"Expected to be in nominal range of [0, 100] % (a full linear translation), " - f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, " - f"but present value is {values[i]} %. " - "This might be due to a cable connection issue creating an artificial jump in motor values. " - "You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`" - ) - - return values - - def _autocorrect_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): - """This function automatically detects issues with values of motors after calibration, and correct for these issues. - - Some motors might have values outside of expected maximum bounds after calibration. - For instance, for a joint in degree, its value can be outside [-270, 270] degrees, which is totally unexpected given - a nominal range of [-180, 180] degrees, which represents half a turn to the left or right starting from zero position. - - Known issues: - #1: Motor value randomly shifts of a full turn, caused by hardware/connection errors. - #2: Motor internal homing offset is shifted by a full turn, caused by using default calibration (e.g Aloha). - #3: motor internal homing offset is shifted by less or more than a full turn, caused by using default calibration - or by human error during manual calibration. - - Issues #1 and #2 can be solved by shifting the calibration homing offset by a full turn. - Issue #3 will be visually detected by user and potentially captured by the safety feature `max_relative_target`, - that will slow down the motor, raise an error asking to recalibrate. Manual recalibrating will solve the issue. - - Note: A full turn corresponds to 360 degrees but also to 4096 steps for a motor resolution of 4096. - """ - if motor_names is None: - motor_names = self.motor_names - - # Convert from unsigned int32 original range [0, 2**32] to signed float32 range - values = values.astype(np.float32) - - for i, name in enumerate(motor_names): - calib_idx = self.calibration["motor_names"].index(name) - calib_mode = self.calibration["calib_mode"][calib_idx] - - if CalibrationMode[calib_mode] == CalibrationMode.DEGREE: - drive_mode = self.calibration["drive_mode"][calib_idx] - homing_offset = self.calibration["homing_offset"][calib_idx] - _, model = self.motors[name] - resolution = self.model_resolution[model] - - # Update direction of rotation of the motor to match between leader and follower. - # In fact, the motor of the leader for a given joint can be assembled in an - # opposite direction in term of rotation than the motor of the follower on the same joint. - if drive_mode: - values[i] *= -1 - - # Convert from initial range to range [-180, 180] degrees - calib_val = (values[i] + homing_offset) / (resolution // 2) * HALF_TURN_DEGREE - in_range = (calib_val > LOWER_BOUND_DEGREE) and (calib_val < UPPER_BOUND_DEGREE) - - # Solve this inequality to find the factor to shift the range into [-180, 180] degrees - # values[i] = (values[i] + homing_offset + resolution * factor) / (resolution // 2) * HALF_TURN_DEGREE - # - HALF_TURN_DEGREE <= (values[i] + homing_offset + resolution * factor) / (resolution // 2) * HALF_TURN_DEGREE <= HALF_TURN_DEGREE - # (- (resolution // 2) - values[i] - homing_offset) / resolution <= factor <= ((resolution // 2) - values[i] - homing_offset) / resolution - low_factor = (-(resolution // 2) - values[i] - homing_offset) / resolution - upp_factor = ((resolution // 2) - values[i] - homing_offset) / resolution - - elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR: - start_pos = self.calibration["start_pos"][calib_idx] - end_pos = self.calibration["end_pos"][calib_idx] - - # Convert from initial range to range [0, 100] in % - calib_val = (values[i] - start_pos) / (end_pos - start_pos) * 100 - in_range = (calib_val > LOWER_BOUND_LINEAR) and (calib_val < UPPER_BOUND_LINEAR) - - # Solve this inequality to find the factor to shift the range into [0, 100] % - # values[i] = (values[i] - start_pos + resolution * factor) / (end_pos + resolution * factor - start_pos - resolution * factor) * 100 - # values[i] = (values[i] - start_pos + resolution * factor) / (end_pos - start_pos) * 100 - # 0 <= (values[i] - start_pos + resolution * factor) / (end_pos - start_pos) * 100 <= 100 - # (start_pos - values[i]) / resolution <= factor <= (end_pos - values[i]) / resolution - low_factor = (start_pos - values[i]) / resolution - upp_factor = (end_pos - values[i]) / resolution - - if not in_range: - # Get first integer between the two bounds - if low_factor < upp_factor: - factor = math.ceil(low_factor) - - if factor > upp_factor: - raise ValueError(f"No integer found between bounds [{low_factor=}, {upp_factor=}]") - else: - factor = math.ceil(upp_factor) - - if factor > low_factor: - raise ValueError(f"No integer found between bounds [{low_factor=}, {upp_factor=}]") - - if CalibrationMode[calib_mode] == CalibrationMode.DEGREE: - out_of_range_str = f"{LOWER_BOUND_DEGREE} < {calib_val} < {UPPER_BOUND_DEGREE} degrees" - in_range_str = f"{LOWER_BOUND_DEGREE} < {calib_val} < {UPPER_BOUND_DEGREE} degrees" - elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR: - out_of_range_str = f"{LOWER_BOUND_LINEAR} < {calib_val} < {UPPER_BOUND_LINEAR} %" - in_range_str = f"{LOWER_BOUND_LINEAR} < {calib_val} < {UPPER_BOUND_LINEAR} %" - - logging.warning( - f"Auto-correct calibration of motor '{name}' by shifting value by {abs(factor)} full turns, " - f"from '{out_of_range_str}' to '{in_range_str}'." - ) - - # A full turn corresponds to 360 degrees but also to 4096 steps for a motor resolution of 4096. - self.calibration["homing_offset"][calib_idx] += resolution * factor - - def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): - # TODO(aliberts): remove np - """Inverse of `apply_calibration`.""" - if motor_names is None: - motor_names = self.motor_names - - for i, name in enumerate(motor_names): - calib_idx = self.calibration["motor_names"].index(name) - calib_mode = self.calibration["calib_mode"][calib_idx] - - if CalibrationMode[calib_mode] == CalibrationMode.DEGREE: - drive_mode = self.calibration["drive_mode"][calib_idx] - homing_offset = self.calibration["homing_offset"][calib_idx] - _, model = self.motors[name] - resolution = self.model_resolution[model] - - # Convert from nominal 0-centered degree range [-180, 180] to - # 0-centered resolution range (e.g. [-2048, 2048] for resolution=4096) - values[i] = values[i] / HALF_TURN_DEGREE * (resolution // 2) - - # Subtract the homing offsets to come back to actual motor range of values - # which can be arbitrary. - values[i] -= homing_offset - - # Remove drive mode, which is the rotation direction of the motor, to come back to - # actual motor rotation direction which can be arbitrary. - if drive_mode: - values[i] *= -1 - - elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR: - start_pos = self.calibration["start_pos"][calib_idx] - end_pos = self.calibration["end_pos"][calib_idx] - - # Convert from nominal lnear range of [0, 100] % to - # actual motor range of values which can be arbitrary. - values[i] = values[i] / 100 * (end_pos - start_pos) + start_pos - - values = np.round(values).astype(np.int32) - return values - - def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY): + def _is_comm_success(self, comm: int) -> bool: import dynamixel_sdk as dxl - return_list = True - if not isinstance(motor_ids, list): - return_list = False - motor_ids = [motor_ids] + return comm == dxl.COMM_SUCCESS - assert_same_address(self.model_ctrl_table, self.motor_models, data_name) - addr, bytes = self.model_ctrl_table[motor_models[0]][data_name] - group = dxl.GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes) - for idx in motor_ids: - group.addParam(idx) + @staticmethod + def split_int_bytes(value: int, n_bytes: int) -> list[int]: + # Validate input + if value < 0: + raise ValueError(f"Negative values are not allowed: {value}") - for _ in range(num_retry): - comm = group.txRxPacket() - if comm == dxl.COMM_SUCCESS: - break + max_value = {1: 0xFF, 2: 0xFFFF, 4: 0xFFFFFFFF}.get(n_bytes) + if max_value is None: + raise NotImplementedError(f"Unsupported byte size: {n_bytes}. Expected [1, 2, 4].") - if comm != dxl.COMM_SUCCESS: - raise ConnectionError( - f"Read failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: " - f"{self.packet_handler.getTxRxResult(comm)}" - ) + if value > max_value: + raise ValueError(f"Value {value} exceeds the maximum for {n_bytes} bytes ({max_value}).") - values = [] - for idx in motor_ids: - value = group.getData(idx, addr, bytes) - values.append(value) - - if return_list: - return values - else: - return values[0] - - def _read(self, data_name: str, motor_names: list[str]): import dynamixel_sdk as dxl - motor_ids = [] - models = [] - for name in motor_names: - motor_idx, model = self.motors[name] - motor_ids.append(motor_idx) - models.append(model) - - assert_same_address(self.model_ctrl_table, models, data_name) - addr, bytes = self.model_ctrl_table[model][data_name] - group_key = get_group_sync_key(data_name, motor_names) - - if data_name not in self.group_readers: - # create new group reader - self.group_readers[group_key] = dxl.GroupSyncRead( - self.port_handler, self.packet_handler, addr, bytes - ) - for idx in motor_ids: - self.group_readers[group_key].addParam(idx) - - for _ in range(NUM_READ_RETRY): - comm = self.group_readers[group_key].txRxPacket() - if comm == dxl.COMM_SUCCESS: - break - - if comm != dxl.COMM_SUCCESS: - raise ConnectionError( - f"Read failed due to communication error on port {self.port} for group_key {group_key}: " - f"{self.packet_handler.getTxRxResult(comm)}" - ) - - values = [] - for idx in motor_ids: - value = self.group_readers[group_key].getData(idx, addr, bytes) - values.append(value) - - values = np.array(values) - - # Convert to signed int to use range [-2048, 2048] for our motor positions. - if data_name in CONVERT_UINT32_TO_INT32_REQUIRED: - values = values.astype(np.int32) - - if data_name in CALIBRATION_REQUIRED and self.calibration is not None: - values = self._apply_calibration_autocorrect(values, motor_names) - - return values - - def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY): - import dynamixel_sdk as dxl - - if not isinstance(motor_ids, list): - motor_ids = [motor_ids] - if not isinstance(values, list): - values = [values] - - assert_same_address(self.model_ctrl_table, motor_models, data_name) - addr, bytes = self.model_ctrl_table[motor_models[0]][data_name] - group = dxl.GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes) - for idx, value in zip(motor_ids, values, strict=True): - data = convert_to_bytes(value, bytes) - group.addParam(idx, data) - - for _ in range(num_retry): - comm = group.txPacket() - if comm == dxl.COMM_SUCCESS: - break - - if comm != dxl.COMM_SUCCESS: - raise ConnectionError( - f"Write failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: " - f"{self.packet_handler.getTxRxResult(comm)}" - ) - - def _write(self, data_name: str, values: list[int], motor_names: list[str]) -> None: - import dynamixel_sdk as dxl - - motor_ids = [] - models = [] - for name in motor_names: - motor_idx, model = self.motors[name] - motor_ids.append(motor_idx) - models.append(model) - - if data_name in CALIBRATION_REQUIRED and self.calibration is not None: - values = self.revert_calibration(values, motor_names) - - assert_same_address(self.model_ctrl_table, models, data_name) - addr, bytes = self.model_ctrl_table[model][data_name] - group_key = get_group_sync_key(data_name, motor_names) - - init_group = data_name not in self.group_readers - if init_group: - self.group_writers[group_key] = dxl.GroupSyncWrite( - self.port_handler, self.packet_handler, addr, bytes - ) - - for idx, value in zip(motor_ids, values, strict=True): - data = convert_to_bytes(value, bytes) - if init_group: - self.group_writers[group_key].addParam(idx, data) - else: - self.group_writers[group_key].changeParam(idx, data) - - comm = self.group_writers[group_key].txPacket() - if comm != dxl.COMM_SUCCESS: - raise ConnectionError( - f"Write failed due to communication error on port {self.port} for group_key {group_key}: " - f"{self.packet_handler.getTxRxResult(comm)}" - ) - - -def set_operating_mode(bus: DynamixelMotorsBus): - if (bus.read("Torque_Enable") != TorqueMode.DISABLED.value).any(): - raise ValueError("To run set robot preset, the torque must be disabled on all motors.") - - # Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't - # rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm, - # you could end up with a servo with a position 0 or 4095 at a crucial point See [ - # https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11] - all_motors_except_gripper = [name for name in bus.motor_names if name != "gripper"] - if len(all_motors_except_gripper) > 0: - # 4 corresponds to Extended Position on Koch motors - bus.write("Operating_Mode", 4, all_motors_except_gripper) - - # Use 'position control current based' for gripper to be limited by the limit of the current. - # For the follower gripper, it means it can grasp an object without forcing too much even tho, - # it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch). - # For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger - # to make it move, and it will move back to its original target position when we release the force. - # 5 corresponds to Current Controlled Position on Koch gripper motors "xl330-m077, xl330-m288" - bus.write("Operating_Mode", 5, "gripper") + # Note: No need to convert back into unsigned int, since this byte preprocessing + # already handles it for us. + if n_bytes == 1: + data = [ + dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)), + ] + elif n_bytes == 2: + data = [ + dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)), + dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)), + ] + elif n_bytes == 4: + data = [ + dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)), + dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)), + dxl.DXL_LOBYTE(dxl.DXL_HIWORD(value)), + dxl.DXL_HIBYTE(dxl.DXL_HIWORD(value)), + ] + return data