diff --git a/lerobot/common/motors/dynamixel/dynamixel.py b/lerobot/common/motors/dynamixel/dynamixel.py index dc16ba6e..8f69b8b5 100644 --- a/lerobot/common/motors/dynamixel/dynamixel.py +++ b/lerobot/common/motors/dynamixel/dynamixel.py @@ -125,13 +125,13 @@ class DynamixelMotorsBus(MotorsBus): for id_ in self.ids: self.write("Return_Delay_Time", id_, 0) - def _disable_torque(self, motors: list[NameOrID]) -> None: - for motor in motors: - self.write("Torque_Enable", motor, TorqueMode.DISABLED.value) + def disable_torque(self, motors: str | list[str] | None = None) -> None: + for name in self._get_names_list(motors): + self.write("Torque_Enable", name, TorqueMode.DISABLED.value) - def _enable_torque(self, motors: list[NameOrID]) -> None: - for motor in motors: - self.write("Torque_Enable", motor, TorqueMode.ENABLED.value) + def enable_torque(self, motors: str | list[str] | None = None) -> None: + for name in self._get_names_list(motors): + self.write("Torque_Enable", name, TorqueMode.ENABLED.value) def _encode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]: for id_ in ids_values: @@ -167,22 +167,9 @@ class DynamixelMotorsBus(MotorsBus): return half_turn_homings @staticmethod - def _split_int_to_bytes(value: int, n_bytes: int) -> list[int]: - # Validate input - if value < 0: - raise ValueError(f"Negative values are not allowed: {value}") - - max_value = {1: 0xFF, 2: 0xFFFF, 4: 0xFFFFFFFF}.get(n_bytes) - if max_value is None: - raise NotImplementedError(f"Unsupported byte size: {n_bytes}. Expected [1, 2, 4].") - - if value > max_value: - raise ValueError(f"Value {value} exceeds the maximum for {n_bytes} bytes ({max_value}).") - + def _split_into_byte_chunks(value: int, n_bytes: int) -> list[int]: import dynamixel_sdk as dxl - # Note: No need to convert back into unsigned int, since this byte preprocessing - # already handles it for us. if n_bytes == 1: data = [value] elif n_bytes == 2: diff --git a/lerobot/common/motors/feetech/feetech.py b/lerobot/common/motors/feetech/feetech.py index 89c8ac9f..f7557c97 100644 --- a/lerobot/common/motors/feetech/feetech.py +++ b/lerobot/common/motors/feetech/feetech.py @@ -139,15 +139,15 @@ class FeetechMotorsBus(MotorsBus): return half_turn_homings - def _disable_torque(self, motors: list[NameOrID]) -> None: - for motor in motors: - self.write("Torque_Enable", motor, TorqueMode.DISABLED.value) - self.write("Lock", motor, 0) + def disable_torque(self, motors: str | list[str] | None = None) -> None: + for name in self._get_names_list(motors): + self.write("Torque_Enable", name, TorqueMode.DISABLED.value) + self.write("Lock", name, 0) - def _enable_torque(self, motors: list[NameOrID]) -> None: - for motor in motors: - self.write("Torque_Enable", motor, TorqueMode.ENABLED.value) - self.write("Lock", motor, 1) + def enable_torque(self, motors: str | list[str] | None = None) -> None: + for name in self._get_names_list(motors): + self.write("Torque_Enable", name, TorqueMode.ENABLED.value) + self.write("Lock", name, 1) def _encode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]: for id_ in ids_values: @@ -170,18 +170,7 @@ class FeetechMotorsBus(MotorsBus): return ids_values @staticmethod - def _split_int_to_bytes(value: int, n_bytes: int) -> list[int]: - # Validate input - if value < 0: - raise ValueError(f"Negative values are not allowed: {value}") - - max_value = {1: 0xFF, 2: 0xFFFF, 4: 0xFFFFFFFF}.get(n_bytes) - if max_value is None: - raise NotImplementedError(f"Unsupported byte size: {n_bytes}. Expected [1, 2, 4].") - - if value > max_value: - raise ValueError(f"Value {value} exceeds the maximum for {n_bytes} bytes ({max_value}).") - + def _split_into_byte_chunks(value: int, n_bytes: int) -> list[int]: import scservo_sdk as scs if n_bytes == 1: @@ -197,7 +186,23 @@ class FeetechMotorsBus(MotorsBus): ] return data - def _broadcast_ping(self) -> tuple[dict[int, int], int]: + def _broadcast_ping_p1(self, known_motors_only: bool = True, num_retry: int = 0) -> dict[int, int]: + if known_motors_only: + ids = self.ids + else: + import scservo_sdk as scs + + ids = range(scs.MAX_ID + 1) + + ids_models = {} + for id_ in ids: + model_number = self.ping(id_, num_retry) + if model_number is not None: + ids_models[id_] = model_number + + return ids_models + + def _broadcast_ping_p0(self) -> tuple[dict[int, int], int]: import scservo_sdk as scs data_list = {} @@ -251,7 +256,7 @@ class FeetechMotorsBus(MotorsBus): for idx in range(2, status_length - 1): # except header & checksum checksum += rxpacket[idx] - checksum = scs.SCS_LOBYTE(~checksum) + checksum = ~checksum & 0xFF if rxpacket[status_length - 1] == checksum: result = scs.COMM_SUCCESS data_list[rxpacket[scs.PKT_ID]] = rxpacket[scs.PKT_ERROR] @@ -272,24 +277,31 @@ class FeetechMotorsBus(MotorsBus): rx_length = rx_length - idx def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None: - for n_try in range(1 + num_retry): - ids_status, comm = self._broadcast_ping() - if self._is_comm_success(comm): - break - logger.debug(f"Broadcast ping failed on port '{self.port}' ({n_try=})") - logger.debug(self.packet_handler.getTxRxResult(comm)) + if self.protocol_version == 0: + for n_try in range(1 + num_retry): + ids_status, comm = self._broadcast_ping_p0() + if self._is_comm_success(comm): + break + logger.debug(f"Broadcast ping failed on port '{self.port}' ({n_try=})") + logger.debug(self.packet_handler.getTxRxResult(comm)) - if not self._is_comm_success(comm): - if raise_on_error: - raise ConnectionError(self.packet_handler.getTxRxResult(comm)) - return + if not self._is_comm_success(comm): + if raise_on_error: + raise ConnectionError(self.packet_handler.getTxRxResult(comm)) + return - ids_errors = {id_: status for id_, status in ids_status.items() if self._is_error(status)} - if ids_errors: - display_dict = {id_: self.packet_handler.getRxPacketError(err) for id_, err in ids_errors.items()} - logger.error(f"Some motors found returned an error status:\n{pformat(display_dict, indent=4)}") + ids_errors = {id_: status for id_, status in ids_status.items() if self._is_error(status)} + if ids_errors: + display_dict = { + id_: self.packet_handler.getRxPacketError(err) for id_, err in ids_errors.items() + } + logger.error( + f"Some motors found returned an error status:\n{pformat(display_dict, indent=4)}" + ) - return self._get_model_number(list(ids_status), raise_on_error) + return self._get_model_number(list(ids_status), raise_on_error) + else: + return self._broadcast_ping_p1(num_retry=num_retry) def _get_firmware_version(self, motor_ids: list[int], raise_on_error: bool = False) -> dict[int, int]: # comm, major = self._sync_read(*FIRMWARE_MAJOR_VERSION, motor_ids) @@ -328,11 +340,20 @@ class FeetechMotorsBus(MotorsBus): # return # return {id_: f"{major[id_]}.{minor[id_]}" for id_ in motor_ids} + if self.protocol_version == 1: + model_numbers = {} + for id_ in motor_ids: + model_nb, comm, error = self._read(*MODEL_NUMBER, id_) + if self._is_comm_success(comm) and not self._is_error(error): + model_numbers[id_] = model_nb + elif raise_on_error: + raise Exception # FIX - comm, model_numbers = self._sync_read(*MODEL_NUMBER, motor_ids) - if not self._is_comm_success(comm): - if raise_on_error: - raise ConnectionError(self.packet_handler.getTxRxResult(comm)) - return + else: + comm, model_numbers = self._sync_read(*MODEL_NUMBER, motor_ids) + if not self._is_comm_success(comm): + if raise_on_error: + raise ConnectionError(self.packet_handler.getTxRxResult(comm)) + return return model_numbers diff --git a/lerobot/common/motors/feetech/tables.py b/lerobot/common/motors/feetech/tables.py index 0fa2fa84..e6d08cf8 100644 --- a/lerobot/common/motors/feetech/tables.py +++ b/lerobot/common/motors/feetech/tables.py @@ -199,5 +199,5 @@ MODEL_NUMBER_TABLE = { "sts3215": 777, "sts3250": None, "sm8512bl": None, - "scs0009": None, + "scs0009": 1284, } diff --git a/lerobot/common/motors/motors_bus.py b/lerobot/common/motors/motors_bus.py index 4c2a836c..3c64be7b 100644 --- a/lerobot/common/motors/motors_bus.py +++ b/lerobot/common/motors/motors_bus.py @@ -445,34 +445,12 @@ class MotorsBus(abc.ABC): def configure_motors(self) -> None: pass - def disable_torque(self, motors: NameOrID | list[NameOrID] | None = None) -> None: - pass - if motors is None: - motors = self.names - elif isinstance(motors, (str, int)): - motors = [motors] - elif not isinstance(motors, list): - raise TypeError(motors) - - self._disable_torque(motors) - - def enable_torque(self, motors: NameOrID | list[NameOrID] | None = None) -> None: - pass - if motors is None: - motors = self.names - elif isinstance(motors, (str, int)): - motors = [motors] - elif not isinstance(motors, list): - raise TypeError(motors) - - self._enable_torque(motors) - @abc.abstractmethod - def _enable_torque(self, motors: list[NameOrID]) -> None: + def disable_torque(self, motors: str | list[str] | None = None) -> None: pass @abc.abstractmethod - def _disable_torque(self, motors: list[NameOrID]) -> None: + def enable_torque(self, motors: str | list[str] | None = None) -> None: pass def set_timeout(self, timeout_ms: int | None = None): @@ -620,6 +598,8 @@ class MotorsBus(abc.ABC): return mins, maxes def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]: + if not self.calibration: + raise RuntimeError(f"{self} has no calibration registered.") normalized_values = {} for id_, val in ids_values.items(): name = self._id_to_name(id_) @@ -662,11 +642,10 @@ class MotorsBus(abc.ABC): def _decode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]: pass - @staticmethod - @abc.abstractmethod - def _split_int_to_bytes(value: int, n_bytes: int) -> list[int]: + def _serialize_data(self, value: int, n_bytes: int) -> list[int]: """ - Splits an unsigned integer into a list of bytes in little-endian order. + Converts an unsigned integer value into a list of byte-sized integers to be sent via a communication + protocol. Depending on the protocol, split values can be in big-endian or little-endian order. This function extracts the individual bytes of an integer based on the specified number of bytes (`n_bytes`). The output is a list of integers, @@ -678,7 +657,8 @@ class MotorsBus(abc.ABC): Args: value (int): The unsigned integer to be converted into a byte list. Must be within the valid range for the specified `n_bytes`. - n_bytes (int): The number of bytes to use for conversion. Supported values: + n_bytes (int): The number of bytes to use for conversion. Supported values for both Feetech and + Dynamixel: - 1 (for values 0 to 255) - 2 (for values 0 to 65,535) - 4 (for values 0 to 4,294,967,295) @@ -690,7 +670,7 @@ class MotorsBus(abc.ABC): Returns: list[int]: A list of integers, each representing a byte in **little-endian order**. - Examples: + Examples (for a little-endian protocol): >>> split_int_bytes(0x12, 1) [18] >>> split_int_bytes(0x1234, 2) @@ -698,6 +678,22 @@ class MotorsBus(abc.ABC): >>> split_int_bytes(0x12345678, 4) [120, 86, 52, 18] # 0x12345678 → 0x78 0x56 0x34 0x12 """ + if value < 0: + raise ValueError(f"Negative values are not allowed: {value}") + + max_value = {1: 0xFF, 2: 0xFFFF, 4: 0xFFFFFFFF}.get(n_bytes) + if max_value is None: + raise NotImplementedError(f"Unsupported byte size: {n_bytes}. Expected [1, 2, 4].") + + if value > max_value: + raise ValueError(f"Value {value} exceeds the maximum for {n_bytes} bytes ({max_value}).") + + return self._split_into_byte_chunks(value, n_bytes) + + @staticmethod + @abc.abstractmethod + def _split_into_byte_chunks(value: int, n_bytes: int) -> list[int]: + """Convert an integer into a list of byte-sized integers.""" pass def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> int | None: @@ -814,7 +810,7 @@ class MotorsBus(abc.ABC): def _write( self, addr: int, n_bytes: int, motor_id: int, value: int, num_retry: int = 0 ) -> tuple[int, int]: - data = self._split_int_to_bytes(value, n_bytes) + data = self._serialize_data(value, n_bytes) for n_try in range(1 + num_retry): comm, error = self.packet_handler.writeTxRx(self.port_handler, motor_id, addr, n_bytes, data) if self._is_comm_success(comm): @@ -953,7 +949,7 @@ class MotorsBus(abc.ABC): self.sync_writer.start_address = addr self.sync_writer.data_length = n_bytes for id_, value in ids_values.items(): - data = self._split_int_to_bytes(value, n_bytes) + data = self._serialize_data(value, n_bytes) self.sync_writer.addParam(id_, data) def disconnect(self, disable_torque: bool = True) -> None: