diff --git a/lerobot/common/motors/feetech/feetech.py b/lerobot/common/motors/feetech/feetech.py index 849136a9..45fcff89 100644 --- a/lerobot/common/motors/feetech/feetech.py +++ b/lerobot/common/motors/feetech/feetech.py @@ -163,15 +163,15 @@ class FeetechMotorsBus(MotorsBus): return data_list, scs.COMM_RX_CORRUPT # find packet header - for idx in range(0, (rx_length - 1)): - if (rxpacket[idx] == 0xFF) and (rxpacket[idx + 1] == 0xFF): + for id_ in range(0, (rx_length - 1)): + if (rxpacket[id_] == 0xFF) and (rxpacket[id_ + 1] == 0xFF): break - if idx == 0: # found at the beginning of the packet + if id_ == 0: # found at the beginning of the packet # calculate checksum checksum = 0 - for idx in range(2, status_length - 1): # except header & checksum - checksum += rxpacket[idx] + for id_ in range(2, status_length - 1): # except header & checksum + checksum += rxpacket[id_] checksum = scs.SCS_LOBYTE(~checksum) if rxpacket[status_length - 1] == checksum: @@ -190,8 +190,8 @@ class FeetechMotorsBus(MotorsBus): rx_length = rx_length - 2 else: # remove unnecessary packets - del rxpacket[0:idx] - rx_length = rx_length - idx + del rxpacket[0:id_] + rx_length = rx_length - id_ def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, str] | None: for n_try in range(1 + num_retry): diff --git a/lerobot/common/motors/motors_bus.py b/lerobot/common/motors/motors_bus.py index 19af9a5d..7946ac44 100644 --- a/lerobot/common/motors/motors_bus.py +++ b/lerobot/common/motors/motors_bus.py @@ -393,7 +393,7 @@ class MotorsBus(abc.ABC): with open(calibration_fpath) as f: calibration = json.load(f) - self.calibration = {int(idx): val for idx, val in calibration.items()} + self.calibration = {int(id_): val for id_, val in calibration.items()} @abc.abstractmethod def _calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]: @@ -442,12 +442,12 @@ class MotorsBus(abc.ABC): pass def ping(self, motor: NameOrID, num_retry: int = 0, raise_on_error: bool = False) -> str | None: - idx = self._get_motor_id(motor) + id_ = self._get_motor_id(motor) for n_try in range(1 + num_retry): - model_number, comm, error = self.packet_handler.ping(self.port_handler, idx) + model_number, comm, error = self.packet_handler.ping(self.port_handler, id_) if self._is_comm_success(comm): break - logger.debug(f"ping failed for {idx=}: {n_try=} got {comm=} {error=}") + logger.debug(f"ping failed for {id_=}: {n_try=} got {comm=} {error=}") if not self._is_comm_success(comm): if raise_on_error: @@ -512,13 +512,13 @@ class MotorsBus(abc.ABC): if not raw_values and data_name in self.calibration_required and self.calibration is not None: ids_values = self._calibrate_values(ids_values) - return {id_key_map[idx]: val for idx, val in ids_values.items()} + return {id_key_map[id_]: val for id_, val in ids_values.items()} def _sync_read( self, data_name: str, motor_ids: list[str], num_retry: int = 0 ) -> tuple[int, dict[int, int]]: if self._has_different_ctrl_tables: - models = [self._id_to_model(idx) for idx in motor_ids] + models = [self._id_to_model(id_) for id_ in motor_ids] assert_same_address(self.model_ctrl_table, models, data_name) model = self._id_to_model(next(iter(motor_ids))) @@ -537,15 +537,15 @@ class MotorsBus(abc.ABC): logger.debug(f"Failed to sync read '{data_name}' ({addr=} {n_bytes=}) on {motor_ids=} ({n_try=})") logger.debug(self.packet_handler.getRxPacketError(comm)) - values = {idx: self.sync_reader.getData(idx, addr, n_bytes) for idx in motor_ids} + values = {id_: self.sync_reader.getData(id_, addr, n_bytes) for id_ in motor_ids} return comm, values def _setup_sync_reader(self, motor_ids: list[str], addr: int, n_bytes: int) -> None: self.sync_reader.clearParam() self.sync_reader.start_address = addr self.sync_reader.data_length = n_bytes - for idx in motor_ids: - self.sync_reader.addParam(idx) + for id_ in motor_ids: + self.sync_reader.addParam(id_) # TODO(aliberts, pkooij): Implementing something like this could get even much faster read times if need be. # Would have to handle the logic of checking if a packet has been sent previously though but doable. @@ -554,8 +554,8 @@ class MotorsBus(abc.ABC): # def _async_read(self, motor_ids: list[str], address: int, n_bytes: int): # self.reader.rxPacket() # self.reader.txPacket() - # for idx in motor_ids: - # value = self.reader.getData(idx, address, n_bytes) + # for id_ in motor_ids: + # value = self.reader.getData(id_, address, n_bytes) def sync_write( self, @@ -588,7 +588,7 @@ class MotorsBus(abc.ABC): def _sync_write(self, data_name: str, ids_values: dict[int, int], num_retry: int = 0) -> int: if self._has_different_ctrl_tables: - models = [self._id_to_model(idx) for idx in ids_values] + models = [self._id_to_model(id_) for id_ in ids_values] assert_same_address(self.model_ctrl_table, models, data_name) model = self._id_to_model(next(iter(ids_values))) @@ -610,9 +610,9 @@ class MotorsBus(abc.ABC): self.sync_writer.clearParam() self.sync_writer.start_address = addr self.sync_writer.data_length = n_bytes - for idx, value in ids_values.items(): + for id_, value in ids_values.items(): data = self._split_int_to_bytes(value, n_bytes) - self.sync_writer.addParam(idx, data) + self.sync_writer.addParam(id_, data) def write( self, data_name: str, motor: NameOrID, value: Value, raw_value: bool = False, num_retry: int = 0 @@ -622,21 +622,21 @@ class MotorsBus(abc.ABC): f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`." ) - idx = self._get_motor_id(motor) + id_ = self._get_motor_id(motor) if not raw_value and data_name in self.calibration_required and self.calibration is not None: - id_value = self._uncalibrate_values({idx: value}) - value = id_value[idx] + id_value = self._uncalibrate_values({id_: value}) + value = id_value[id_] - comm, error = self._write(data_name, idx, value, num_retry) + comm, error = self._write(data_name, id_, value, num_retry) if not self._is_comm_success(comm): raise ConnectionError( - f"Failed to write '{data_name}' on {idx=} with '{value}' after {num_retry + 1} tries." + f"Failed to write '{data_name}' on {id_=} with '{value}' after {num_retry + 1} tries." f"\n{self.packet_handler.getTxRxResult(comm)}" ) elif self._is_error(error): raise RuntimeError( - f"Failed to write '{data_name}' on {idx=} with '{value}' after {num_retry + 1} tries." + f"Failed to write '{data_name}' on {id_=} with '{value}' after {num_retry + 1} tries." f"\n{self.packet_handler.getRxPacketError(error)}" )