Implement read

This commit is contained in:
Simon Alibert 2025-04-10 00:35:14 +02:00
parent 034171a89a
commit 42a87e7211
3 changed files with 73 additions and 0 deletions

View File

@ -116,6 +116,9 @@ class DynamixelMotorsBus(MotorsBus):
self._comm_success = dxl.COMM_SUCCESS
self._no_error = 0x00
def _assert_protocol_is_compatible(self, instruction_name: str) -> None:
pass
def configure_motors(self) -> None:
# By default, Dynamixel motors have a 500µs delay response time (corresponding to a value of 250 on
# the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0).

View File

@ -102,6 +102,7 @@ class FeetechMotorsBus(MotorsBus):
super().__init__(port, motors, calibration)
import scservo_sdk as scs
self.protocol_version = protocol_version
self.port_handler = scs.PortHandler(self.port)
# HACK: monkeypatch
self.port_handler.setPacketTimeout = patch_setPacketTimeout.__get__(
@ -113,6 +114,12 @@ class FeetechMotorsBus(MotorsBus):
self._comm_success = scs.COMM_SUCCESS
self._no_error = 0x00
def _assert_protocol_is_compatible(self, instruction_name: str) -> None:
if instruction_name == "sync_read" and self.protocol_version == 1:
raise NotImplementedError(
"'Sync Read' is not available with Feetech motors using Protocol 1. Use 'Read' instead."
)
def configure_motors(self) -> None:
# By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on the
# 'Return_Delay' address). We ensure this is reduced to the minimum of 2µs (value of 0).

View File

@ -393,6 +393,10 @@ class MotorsBus(abc.ABC):
"was found instead for that id."
)
@abc.abstractmethod
def _assert_protocol_is_compatible(self, instruction_name: str) -> None:
pass
@property
def is_connected(self) -> bool:
return self.port_handler.is_open
@ -723,6 +727,63 @@ class MotorsBus(abc.ABC):
) -> dict[int, list[int, str]] | None:
pass
def read(
self,
data_name: str,
motor: str,
*,
normalize: bool = True,
num_retry: int = 0,
) -> Value:
if not self.is_connected:
raise DeviceNotConnectedError(
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
)
id_ = self.motors[motor].id
model = self.motors[motor].model
addr, n_bytes = get_address(self.model_ctrl_table, model, data_name)
value, comm, error = self._read(addr, n_bytes, id_, num_retry=num_retry)
if not self._is_comm_success(comm):
raise ConnectionError(
f"Failed to read '{data_name}' on {id_=} after {num_retry + 1} tries."
f"{self.packet_handler.getTxRxResult(comm)}"
)
elif self._is_error(error):
raise RuntimeError(
f"Failed to read '{data_name}' on {id_=} after {num_retry + 1} tries."
f"\n{self.packet_handler.getRxPacketError(error)}"
)
id_value = self._decode_sign(data_name, {id_: value})
if normalize and data_name in self.normalized_data:
id_value = self._normalize(data_name, id_value)
return id_value[id_]
def _read(self, addr: int, n_bytes: int, motor_id: int, num_retry: int = 0) -> tuple[int, int]:
if n_bytes == 1:
read_fn = self.packet_handler.read1ByteTxRx
elif n_bytes == 2:
read_fn = self.packet_handler.read2ByteTxRx
elif n_bytes == 4:
read_fn = self.packet_handler.read4ByteTxRx
else:
raise ValueError(n_bytes)
for n_try in range(1 + num_retry):
value, comm, error = read_fn(self.port_handler, motor_id, addr)
if self._is_comm_success(comm):
break
logger.debug(
f"Failed to read @{addr=} ({n_bytes=}) on {motor_id=} ({n_try=}): "
+ self.packet_handler.getTxRxResult(comm)
)
return value, comm, error
def sync_read(
self,
data_name: str,
@ -736,6 +797,8 @@ class MotorsBus(abc.ABC):
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
)
self._assert_protocol_is_compatible("sync_read")
names = self._get_names_list(motors)
ids = [self.motors[name].id for name in names]
models = [self.motors[name].model for name in names]