Add protocol 1 broadcast ping
This commit is contained in:
parent
12abc9ca86
commit
27cb0c40bd
|
@ -125,13 +125,13 @@ class DynamixelMotorsBus(MotorsBus):
|
||||||
for id_ in self.ids:
|
for id_ in self.ids:
|
||||||
self.write("Return_Delay_Time", id_, 0)
|
self.write("Return_Delay_Time", id_, 0)
|
||||||
|
|
||||||
def _disable_torque(self, motors: list[NameOrID]) -> None:
|
def disable_torque(self, motors: str | list[str] | None = None) -> None:
|
||||||
for motor in motors:
|
for name in self._get_names_list(motors):
|
||||||
self.write("Torque_Enable", motor, TorqueMode.DISABLED.value)
|
self.write("Torque_Enable", name, TorqueMode.DISABLED.value)
|
||||||
|
|
||||||
def _enable_torque(self, motors: list[NameOrID]) -> None:
|
def enable_torque(self, motors: str | list[str] | None = None) -> None:
|
||||||
for motor in motors:
|
for name in self._get_names_list(motors):
|
||||||
self.write("Torque_Enable", motor, TorqueMode.ENABLED.value)
|
self.write("Torque_Enable", name, TorqueMode.ENABLED.value)
|
||||||
|
|
||||||
def _encode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]:
|
def _encode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]:
|
||||||
for id_ in ids_values:
|
for id_ in ids_values:
|
||||||
|
@ -167,22 +167,9 @@ class DynamixelMotorsBus(MotorsBus):
|
||||||
return half_turn_homings
|
return half_turn_homings
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _split_int_to_bytes(value: int, n_bytes: int) -> list[int]:
|
def _split_into_byte_chunks(value: int, n_bytes: int) -> list[int]:
|
||||||
# Validate input
|
|
||||||
if value < 0:
|
|
||||||
raise ValueError(f"Negative values are not allowed: {value}")
|
|
||||||
|
|
||||||
max_value = {1: 0xFF, 2: 0xFFFF, 4: 0xFFFFFFFF}.get(n_bytes)
|
|
||||||
if max_value is None:
|
|
||||||
raise NotImplementedError(f"Unsupported byte size: {n_bytes}. Expected [1, 2, 4].")
|
|
||||||
|
|
||||||
if value > max_value:
|
|
||||||
raise ValueError(f"Value {value} exceeds the maximum for {n_bytes} bytes ({max_value}).")
|
|
||||||
|
|
||||||
import dynamixel_sdk as dxl
|
import dynamixel_sdk as dxl
|
||||||
|
|
||||||
# Note: No need to convert back into unsigned int, since this byte preprocessing
|
|
||||||
# already handles it for us.
|
|
||||||
if n_bytes == 1:
|
if n_bytes == 1:
|
||||||
data = [value]
|
data = [value]
|
||||||
elif n_bytes == 2:
|
elif n_bytes == 2:
|
||||||
|
|
|
@ -139,15 +139,15 @@ class FeetechMotorsBus(MotorsBus):
|
||||||
|
|
||||||
return half_turn_homings
|
return half_turn_homings
|
||||||
|
|
||||||
def _disable_torque(self, motors: list[NameOrID]) -> None:
|
def disable_torque(self, motors: str | list[str] | None = None) -> None:
|
||||||
for motor in motors:
|
for name in self._get_names_list(motors):
|
||||||
self.write("Torque_Enable", motor, TorqueMode.DISABLED.value)
|
self.write("Torque_Enable", name, TorqueMode.DISABLED.value)
|
||||||
self.write("Lock", motor, 0)
|
self.write("Lock", name, 0)
|
||||||
|
|
||||||
def _enable_torque(self, motors: list[NameOrID]) -> None:
|
def enable_torque(self, motors: str | list[str] | None = None) -> None:
|
||||||
for motor in motors:
|
for name in self._get_names_list(motors):
|
||||||
self.write("Torque_Enable", motor, TorqueMode.ENABLED.value)
|
self.write("Torque_Enable", name, TorqueMode.ENABLED.value)
|
||||||
self.write("Lock", motor, 1)
|
self.write("Lock", name, 1)
|
||||||
|
|
||||||
def _encode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]:
|
def _encode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]:
|
||||||
for id_ in ids_values:
|
for id_ in ids_values:
|
||||||
|
@ -170,18 +170,7 @@ class FeetechMotorsBus(MotorsBus):
|
||||||
return ids_values
|
return ids_values
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _split_int_to_bytes(value: int, n_bytes: int) -> list[int]:
|
def _split_into_byte_chunks(value: int, n_bytes: int) -> list[int]:
|
||||||
# Validate input
|
|
||||||
if value < 0:
|
|
||||||
raise ValueError(f"Negative values are not allowed: {value}")
|
|
||||||
|
|
||||||
max_value = {1: 0xFF, 2: 0xFFFF, 4: 0xFFFFFFFF}.get(n_bytes)
|
|
||||||
if max_value is None:
|
|
||||||
raise NotImplementedError(f"Unsupported byte size: {n_bytes}. Expected [1, 2, 4].")
|
|
||||||
|
|
||||||
if value > max_value:
|
|
||||||
raise ValueError(f"Value {value} exceeds the maximum for {n_bytes} bytes ({max_value}).")
|
|
||||||
|
|
||||||
import scservo_sdk as scs
|
import scservo_sdk as scs
|
||||||
|
|
||||||
if n_bytes == 1:
|
if n_bytes == 1:
|
||||||
|
@ -197,7 +186,23 @@ class FeetechMotorsBus(MotorsBus):
|
||||||
]
|
]
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def _broadcast_ping(self) -> tuple[dict[int, int], int]:
|
def _broadcast_ping_p1(self, known_motors_only: bool = True, num_retry: int = 0) -> dict[int, int]:
|
||||||
|
if known_motors_only:
|
||||||
|
ids = self.ids
|
||||||
|
else:
|
||||||
|
import scservo_sdk as scs
|
||||||
|
|
||||||
|
ids = range(scs.MAX_ID + 1)
|
||||||
|
|
||||||
|
ids_models = {}
|
||||||
|
for id_ in ids:
|
||||||
|
model_number = self.ping(id_, num_retry)
|
||||||
|
if model_number is not None:
|
||||||
|
ids_models[id_] = model_number
|
||||||
|
|
||||||
|
return ids_models
|
||||||
|
|
||||||
|
def _broadcast_ping_p0(self) -> tuple[dict[int, int], int]:
|
||||||
import scservo_sdk as scs
|
import scservo_sdk as scs
|
||||||
|
|
||||||
data_list = {}
|
data_list = {}
|
||||||
|
@ -251,7 +256,7 @@ class FeetechMotorsBus(MotorsBus):
|
||||||
for idx in range(2, status_length - 1): # except header & checksum
|
for idx in range(2, status_length - 1): # except header & checksum
|
||||||
checksum += rxpacket[idx]
|
checksum += rxpacket[idx]
|
||||||
|
|
||||||
checksum = scs.SCS_LOBYTE(~checksum)
|
checksum = ~checksum & 0xFF
|
||||||
if rxpacket[status_length - 1] == checksum:
|
if rxpacket[status_length - 1] == checksum:
|
||||||
result = scs.COMM_SUCCESS
|
result = scs.COMM_SUCCESS
|
||||||
data_list[rxpacket[scs.PKT_ID]] = rxpacket[scs.PKT_ERROR]
|
data_list[rxpacket[scs.PKT_ID]] = rxpacket[scs.PKT_ERROR]
|
||||||
|
@ -272,8 +277,9 @@ class FeetechMotorsBus(MotorsBus):
|
||||||
rx_length = rx_length - idx
|
rx_length = rx_length - idx
|
||||||
|
|
||||||
def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:
|
def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:
|
||||||
|
if self.protocol_version == 0:
|
||||||
for n_try in range(1 + num_retry):
|
for n_try in range(1 + num_retry):
|
||||||
ids_status, comm = self._broadcast_ping()
|
ids_status, comm = self._broadcast_ping_p0()
|
||||||
if self._is_comm_success(comm):
|
if self._is_comm_success(comm):
|
||||||
break
|
break
|
||||||
logger.debug(f"Broadcast ping failed on port '{self.port}' ({n_try=})")
|
logger.debug(f"Broadcast ping failed on port '{self.port}' ({n_try=})")
|
||||||
|
@ -286,10 +292,16 @@ class FeetechMotorsBus(MotorsBus):
|
||||||
|
|
||||||
ids_errors = {id_: status for id_, status in ids_status.items() if self._is_error(status)}
|
ids_errors = {id_: status for id_, status in ids_status.items() if self._is_error(status)}
|
||||||
if ids_errors:
|
if ids_errors:
|
||||||
display_dict = {id_: self.packet_handler.getRxPacketError(err) for id_, err in ids_errors.items()}
|
display_dict = {
|
||||||
logger.error(f"Some motors found returned an error status:\n{pformat(display_dict, indent=4)}")
|
id_: self.packet_handler.getRxPacketError(err) for id_, err in ids_errors.items()
|
||||||
|
}
|
||||||
|
logger.error(
|
||||||
|
f"Some motors found returned an error status:\n{pformat(display_dict, indent=4)}"
|
||||||
|
)
|
||||||
|
|
||||||
return self._get_model_number(list(ids_status), raise_on_error)
|
return self._get_model_number(list(ids_status), raise_on_error)
|
||||||
|
else:
|
||||||
|
return self._broadcast_ping_p1(num_retry=num_retry)
|
||||||
|
|
||||||
def _get_firmware_version(self, motor_ids: list[int], raise_on_error: bool = False) -> dict[int, int]:
|
def _get_firmware_version(self, motor_ids: list[int], raise_on_error: bool = False) -> dict[int, int]:
|
||||||
# comm, major = self._sync_read(*FIRMWARE_MAJOR_VERSION, motor_ids)
|
# comm, major = self._sync_read(*FIRMWARE_MAJOR_VERSION, motor_ids)
|
||||||
|
@ -328,7 +340,16 @@ class FeetechMotorsBus(MotorsBus):
|
||||||
# return
|
# return
|
||||||
|
|
||||||
# return {id_: f"{major[id_]}.{minor[id_]}" for id_ in motor_ids}
|
# return {id_: f"{major[id_]}.{minor[id_]}" for id_ in motor_ids}
|
||||||
|
if self.protocol_version == 1:
|
||||||
|
model_numbers = {}
|
||||||
|
for id_ in motor_ids:
|
||||||
|
model_nb, comm, error = self._read(*MODEL_NUMBER, id_)
|
||||||
|
if self._is_comm_success(comm) and not self._is_error(error):
|
||||||
|
model_numbers[id_] = model_nb
|
||||||
|
elif raise_on_error:
|
||||||
|
raise Exception # FIX
|
||||||
|
|
||||||
|
else:
|
||||||
comm, model_numbers = self._sync_read(*MODEL_NUMBER, motor_ids)
|
comm, model_numbers = self._sync_read(*MODEL_NUMBER, motor_ids)
|
||||||
if not self._is_comm_success(comm):
|
if not self._is_comm_success(comm):
|
||||||
if raise_on_error:
|
if raise_on_error:
|
||||||
|
|
|
@ -199,5 +199,5 @@ MODEL_NUMBER_TABLE = {
|
||||||
"sts3215": 777,
|
"sts3215": 777,
|
||||||
"sts3250": None,
|
"sts3250": None,
|
||||||
"sm8512bl": None,
|
"sm8512bl": None,
|
||||||
"scs0009": None,
|
"scs0009": 1284,
|
||||||
}
|
}
|
||||||
|
|
|
@ -445,34 +445,12 @@ class MotorsBus(abc.ABC):
|
||||||
def configure_motors(self) -> None:
|
def configure_motors(self) -> None:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def disable_torque(self, motors: NameOrID | list[NameOrID] | None = None) -> None:
|
|
||||||
pass
|
|
||||||
if motors is None:
|
|
||||||
motors = self.names
|
|
||||||
elif isinstance(motors, (str, int)):
|
|
||||||
motors = [motors]
|
|
||||||
elif not isinstance(motors, list):
|
|
||||||
raise TypeError(motors)
|
|
||||||
|
|
||||||
self._disable_torque(motors)
|
|
||||||
|
|
||||||
def enable_torque(self, motors: NameOrID | list[NameOrID] | None = None) -> None:
|
|
||||||
pass
|
|
||||||
if motors is None:
|
|
||||||
motors = self.names
|
|
||||||
elif isinstance(motors, (str, int)):
|
|
||||||
motors = [motors]
|
|
||||||
elif not isinstance(motors, list):
|
|
||||||
raise TypeError(motors)
|
|
||||||
|
|
||||||
self._enable_torque(motors)
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def _enable_torque(self, motors: list[NameOrID]) -> None:
|
def disable_torque(self, motors: str | list[str] | None = None) -> None:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def _disable_torque(self, motors: list[NameOrID]) -> None:
|
def enable_torque(self, motors: str | list[str] | None = None) -> None:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def set_timeout(self, timeout_ms: int | None = None):
|
def set_timeout(self, timeout_ms: int | None = None):
|
||||||
|
@ -620,6 +598,8 @@ class MotorsBus(abc.ABC):
|
||||||
return mins, maxes
|
return mins, maxes
|
||||||
|
|
||||||
def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]:
|
def _normalize(self, data_name: str, ids_values: dict[int, int]) -> dict[int, float]:
|
||||||
|
if not self.calibration:
|
||||||
|
raise RuntimeError(f"{self} has no calibration registered.")
|
||||||
normalized_values = {}
|
normalized_values = {}
|
||||||
for id_, val in ids_values.items():
|
for id_, val in ids_values.items():
|
||||||
name = self._id_to_name(id_)
|
name = self._id_to_name(id_)
|
||||||
|
@ -662,11 +642,10 @@ class MotorsBus(abc.ABC):
|
||||||
def _decode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]:
|
def _decode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@staticmethod
|
def _serialize_data(self, value: int, n_bytes: int) -> list[int]:
|
||||||
@abc.abstractmethod
|
|
||||||
def _split_int_to_bytes(value: int, n_bytes: int) -> list[int]:
|
|
||||||
"""
|
"""
|
||||||
Splits an unsigned integer into a list of bytes in little-endian order.
|
Converts an unsigned integer value into a list of byte-sized integers to be sent via a communication
|
||||||
|
protocol. Depending on the protocol, split values can be in big-endian or little-endian order.
|
||||||
|
|
||||||
This function extracts the individual bytes of an integer based on the
|
This function extracts the individual bytes of an integer based on the
|
||||||
specified number of bytes (`n_bytes`). The output is a list of integers,
|
specified number of bytes (`n_bytes`). The output is a list of integers,
|
||||||
|
@ -678,7 +657,8 @@ class MotorsBus(abc.ABC):
|
||||||
Args:
|
Args:
|
||||||
value (int): The unsigned integer to be converted into a byte list. Must be within
|
value (int): The unsigned integer to be converted into a byte list. Must be within
|
||||||
the valid range for the specified `n_bytes`.
|
the valid range for the specified `n_bytes`.
|
||||||
n_bytes (int): The number of bytes to use for conversion. Supported values:
|
n_bytes (int): The number of bytes to use for conversion. Supported values for both Feetech and
|
||||||
|
Dynamixel:
|
||||||
- 1 (for values 0 to 255)
|
- 1 (for values 0 to 255)
|
||||||
- 2 (for values 0 to 65,535)
|
- 2 (for values 0 to 65,535)
|
||||||
- 4 (for values 0 to 4,294,967,295)
|
- 4 (for values 0 to 4,294,967,295)
|
||||||
|
@ -690,7 +670,7 @@ class MotorsBus(abc.ABC):
|
||||||
Returns:
|
Returns:
|
||||||
list[int]: A list of integers, each representing a byte in **little-endian order**.
|
list[int]: A list of integers, each representing a byte in **little-endian order**.
|
||||||
|
|
||||||
Examples:
|
Examples (for a little-endian protocol):
|
||||||
>>> split_int_bytes(0x12, 1)
|
>>> split_int_bytes(0x12, 1)
|
||||||
[18]
|
[18]
|
||||||
>>> split_int_bytes(0x1234, 2)
|
>>> split_int_bytes(0x1234, 2)
|
||||||
|
@ -698,6 +678,22 @@ class MotorsBus(abc.ABC):
|
||||||
>>> split_int_bytes(0x12345678, 4)
|
>>> split_int_bytes(0x12345678, 4)
|
||||||
[120, 86, 52, 18] # 0x12345678 → 0x78 0x56 0x34 0x12
|
[120, 86, 52, 18] # 0x12345678 → 0x78 0x56 0x34 0x12
|
||||||
"""
|
"""
|
||||||
|
if value < 0:
|
||||||
|
raise ValueError(f"Negative values are not allowed: {value}")
|
||||||
|
|
||||||
|
max_value = {1: 0xFF, 2: 0xFFFF, 4: 0xFFFFFFFF}.get(n_bytes)
|
||||||
|
if max_value is None:
|
||||||
|
raise NotImplementedError(f"Unsupported byte size: {n_bytes}. Expected [1, 2, 4].")
|
||||||
|
|
||||||
|
if value > max_value:
|
||||||
|
raise ValueError(f"Value {value} exceeds the maximum for {n_bytes} bytes ({max_value}).")
|
||||||
|
|
||||||
|
return self._split_into_byte_chunks(value, n_bytes)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _split_into_byte_chunks(value: int, n_bytes: int) -> list[int]:
|
||||||
|
"""Convert an integer into a list of byte-sized integers."""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> int | None:
|
def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> int | None:
|
||||||
|
@ -814,7 +810,7 @@ class MotorsBus(abc.ABC):
|
||||||
def _write(
|
def _write(
|
||||||
self, addr: int, n_bytes: int, motor_id: int, value: int, num_retry: int = 0
|
self, addr: int, n_bytes: int, motor_id: int, value: int, num_retry: int = 0
|
||||||
) -> tuple[int, int]:
|
) -> tuple[int, int]:
|
||||||
data = self._split_int_to_bytes(value, n_bytes)
|
data = self._serialize_data(value, n_bytes)
|
||||||
for n_try in range(1 + num_retry):
|
for n_try in range(1 + num_retry):
|
||||||
comm, error = self.packet_handler.writeTxRx(self.port_handler, motor_id, addr, n_bytes, data)
|
comm, error = self.packet_handler.writeTxRx(self.port_handler, motor_id, addr, n_bytes, data)
|
||||||
if self._is_comm_success(comm):
|
if self._is_comm_success(comm):
|
||||||
|
@ -953,7 +949,7 @@ class MotorsBus(abc.ABC):
|
||||||
self.sync_writer.start_address = addr
|
self.sync_writer.start_address = addr
|
||||||
self.sync_writer.data_length = n_bytes
|
self.sync_writer.data_length = n_bytes
|
||||||
for id_, value in ids_values.items():
|
for id_, value in ids_values.items():
|
||||||
data = self._split_int_to_bytes(value, n_bytes)
|
data = self._serialize_data(value, n_bytes)
|
||||||
self.sync_writer.addParam(id_, data)
|
self.sync_writer.addParam(id_, data)
|
||||||
|
|
||||||
def disconnect(self, disable_torque: bool = True) -> None:
|
def disconnect(self, disable_torque: bool = True) -> None:
|
||||||
|
|
Loading…
Reference in New Issue