Rename idx -> id_

This commit is contained in:
Simon Alibert 2025-03-24 20:58:56 +01:00
parent c6212d585d
commit 0c1d4cb323
2 changed files with 27 additions and 27 deletions

View File

@ -163,15 +163,15 @@ class FeetechMotorsBus(MotorsBus):
return data_list, scs.COMM_RX_CORRUPT return data_list, scs.COMM_RX_CORRUPT
# find packet header # find packet header
for idx in range(0, (rx_length - 1)): for id_ in range(0, (rx_length - 1)):
if (rxpacket[idx] == 0xFF) and (rxpacket[idx + 1] == 0xFF): if (rxpacket[id_] == 0xFF) and (rxpacket[id_ + 1] == 0xFF):
break break
if idx == 0: # found at the beginning of the packet if id_ == 0: # found at the beginning of the packet
# calculate checksum # calculate checksum
checksum = 0 checksum = 0
for idx in range(2, status_length - 1): # except header & checksum for id_ in range(2, status_length - 1): # except header & checksum
checksum += rxpacket[idx] checksum += rxpacket[id_]
checksum = scs.SCS_LOBYTE(~checksum) checksum = scs.SCS_LOBYTE(~checksum)
if rxpacket[status_length - 1] == checksum: if rxpacket[status_length - 1] == checksum:
@ -190,8 +190,8 @@ class FeetechMotorsBus(MotorsBus):
rx_length = rx_length - 2 rx_length = rx_length - 2
else: else:
# remove unnecessary packets # remove unnecessary packets
del rxpacket[0:idx] del rxpacket[0:id_]
rx_length = rx_length - idx rx_length = rx_length - id_
def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, str] | None: def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, str] | None:
for n_try in range(1 + num_retry): for n_try in range(1 + num_retry):

View File

@ -393,7 +393,7 @@ class MotorsBus(abc.ABC):
with open(calibration_fpath) as f: with open(calibration_fpath) as f:
calibration = json.load(f) calibration = json.load(f)
self.calibration = {int(idx): val for idx, val in calibration.items()} self.calibration = {int(id_): val for id_, val in calibration.items()}
@abc.abstractmethod @abc.abstractmethod
def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]: def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
@ -442,12 +442,12 @@ class MotorsBus(abc.ABC):
pass pass
def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> str | None: def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> str | None:
idx = self._get_motor_id(motor) id_ = self._get_motor_id(motor)
for n_try in range(1 + num_retry): for n_try in range(1 + num_retry):
model_number, comm, error = self.packet_handler.ping(self.port_handler, idx) model_number, comm, error = self.packet_handler.ping(self.port_handler, id_)
if self._is_comm_success(comm): if self._is_comm_success(comm):
break break
logger.debug(f"ping failed for {idx=}: {n_try=} got {comm=} {error=}") logger.debug(f"ping failed for {id_=}: {n_try=} got {comm=} {error=}")
if not self._is_comm_success(comm): if not self._is_comm_success(comm):
if raise_on_error: if raise_on_error:
@ -512,13 +512,13 @@ class MotorsBus(abc.ABC):
if not raw_values and data_name in self.calibration_required and self.calibration is not None: if not raw_values and data_name in self.calibration_required and self.calibration is not None:
ids_values = self._calibrate_values(ids_values) ids_values = self._calibrate_values(ids_values)
return {id_key_map[idx]: val for idx, val in ids_values.items()} return {id_key_map[id_]: val for id_, val in ids_values.items()}
def _sync_read( def _sync_read(
self, data_name: str, motor_ids: list[str], num_retry: int = 0 self, data_name: str, motor_ids: list[str], num_retry: int = 0
) -> tuple[int, dict[int, int]]: ) -> tuple[int, dict[int, int]]:
if self._has_different_ctrl_tables: if self._has_different_ctrl_tables:
models = [self._id_to_model(idx) for idx in motor_ids] models = [self._id_to_model(id_) for id_ in motor_ids]
assert_same_address(self.model_ctrl_table, models, data_name) assert_same_address(self.model_ctrl_table, models, data_name)
model = self._id_to_model(next(iter(motor_ids))) model = self._id_to_model(next(iter(motor_ids)))
@ -537,15 +537,15 @@ class MotorsBus(abc.ABC):
logger.debug(f"Failed to sync read '{data_name}' ({addr=} {n_bytes=}) on {motor_ids=} ({n_try=})") logger.debug(f"Failed to sync read '{data_name}' ({addr=} {n_bytes=}) on {motor_ids=} ({n_try=})")
logger.debug(self.packet_handler.getRxPacketError(comm)) logger.debug(self.packet_handler.getRxPacketError(comm))
values = {idx: self.sync_reader.getData(idx, addr, n_bytes) for idx in motor_ids} values = {id_: self.sync_reader.getData(id_, addr, n_bytes) for id_ in motor_ids}
return comm, values return comm, values
def _setup_sync_reader(self, motor_ids: list[str], addr: int, n_bytes: int) -> None: def _setup_sync_reader(self, motor_ids: list[str], addr: int, n_bytes: int) -> None:
self.sync_reader.clearParam() self.sync_reader.clearParam()
self.sync_reader.start_address = addr self.sync_reader.start_address = addr
self.sync_reader.data_length = n_bytes self.sync_reader.data_length = n_bytes
for idx in motor_ids: for id_ in motor_ids:
self.sync_reader.addParam(idx) self.sync_reader.addParam(id_)
# TODO(aliberts, pkooij): Implementing something like this could get even much faster read times if need be. # TODO(aliberts, pkooij): Implementing something like this could get even much faster read times if need be.
# Would have to handle the logic of checking if a packet has been sent previously though but doable. # Would have to handle the logic of checking if a packet has been sent previously though but doable.
@ -554,8 +554,8 @@ class MotorsBus(abc.ABC):
# def _async_read(self, motor_ids: list[str], address: int, n_bytes: int): # def _async_read(self, motor_ids: list[str], address: int, n_bytes: int):
# self.reader.rxPacket() # self.reader.rxPacket()
# self.reader.txPacket() # self.reader.txPacket()
# for idx in motor_ids: # for id_ in motor_ids:
# value = self.reader.getData(idx, address, n_bytes) # value = self.reader.getData(id_, address, n_bytes)
def sync_write( def sync_write(
self, self,
@ -588,7 +588,7 @@ class MotorsBus(abc.ABC):
def _sync_write(self, data_name: str, ids_values: dict[int, int], num_retry: int = 0) -> int: def _sync_write(self, data_name: str, ids_values: dict[int, int], num_retry: int = 0) -> int:
if self._has_different_ctrl_tables: if self._has_different_ctrl_tables:
models = [self._id_to_model(idx) for idx in ids_values] models = [self._id_to_model(id_) for id_ in ids_values]
assert_same_address(self.model_ctrl_table, models, data_name) assert_same_address(self.model_ctrl_table, models, data_name)
model = self._id_to_model(next(iter(ids_values))) model = self._id_to_model(next(iter(ids_values)))
@ -610,9 +610,9 @@ class MotorsBus(abc.ABC):
self.sync_writer.clearParam() self.sync_writer.clearParam()
self.sync_writer.start_address = addr self.sync_writer.start_address = addr
self.sync_writer.data_length = n_bytes self.sync_writer.data_length = n_bytes
for idx, value in ids_values.items(): for id_, value in ids_values.items():
data = self._split_int_to_bytes(value, n_bytes) data = self._split_int_to_bytes(value, n_bytes)
self.sync_writer.addParam(idx, data) self.sync_writer.addParam(id_, data)
def write( def write(
self, data_name: str, motor: NameOrID, value: Value, raw_value: bool = False, num_retry: int = 0 self, data_name: str, motor: NameOrID, value: Value, raw_value: bool = False, num_retry: int = 0
@ -622,21 +622,21 @@ class MotorsBus(abc.ABC):
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`." f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
) )
idx = self._get_motor_id(motor) id_ = self._get_motor_id(motor)
if not raw_value and data_name in self.calibration_required and self.calibration is not None: if not raw_value and data_name in self.calibration_required and self.calibration is not None:
id_value = self._uncalibrate_values({idx: value}) id_value = self._uncalibrate_values({id_: value})
value = id_value[idx] value = id_value[id_]
comm, error = self._write(data_name, idx, value, num_retry) comm, error = self._write(data_name, id_, value, num_retry)
if not self._is_comm_success(comm): if not self._is_comm_success(comm):
raise ConnectionError( raise ConnectionError(
f"Failed to write '{data_name}' on {idx=} with '{value}' after {num_retry + 1} tries." f"Failed to write '{data_name}' on {id_=} with '{value}' after {num_retry + 1} tries."
f"\n{self.packet_handler.getTxRxResult(comm)}" f"\n{self.packet_handler.getTxRxResult(comm)}"
) )
elif self._is_error(error): elif self._is_error(error):
raise RuntimeError( raise RuntimeError(
f"Failed to write '{data_name}' on {idx=} with '{value}' after {num_retry + 1} tries." f"Failed to write '{data_name}' on {id_=} with '{value}' after {num_retry + 1} tries."
f"\n{self.packet_handler.getRxPacketError(error)}" f"\n{self.packet_handler.getRxPacketError(error)}"
) )