Add setup_motor

This commit is contained in:
Simon Alibert 2025-04-17 13:14:06 +02:00
parent 57e5e4cc07
commit d07c7347f8
6 changed files with 141 additions and 59 deletions

View File

@ -35,7 +35,7 @@ from .tables import (
) )
PROTOCOL_VERSION = 2.0 PROTOCOL_VERSION = 2.0
BAUDRATE = 1_000_000 DEFAULT_BAUDRATE = 1_000_000
DEFAULT_TIMEOUT_MS = 1000 DEFAULT_TIMEOUT_MS = 1000
NORMALIZED_DATA = ["Goal_Position", "Present_Position"] NORMALIZED_DATA = ["Goal_Position", "Present_Position"]
@ -109,6 +109,7 @@ class DynamixelMotorsBus(MotorsBus):
""" """
available_baudrates = deepcopy(AVAILABLE_BAUDRATES) available_baudrates = deepcopy(AVAILABLE_BAUDRATES)
default_baudrate = DEFAULT_BAUDRATE
default_timeout = DEFAULT_TIMEOUT_MS default_timeout = DEFAULT_TIMEOUT_MS
model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE) model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE)
model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE) model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
@ -139,6 +140,28 @@ class DynamixelMotorsBus(MotorsBus):
def _handshake(self) -> None: def _handshake(self) -> None:
self._assert_motors_exist() self._assert_motors_exist()
def _find_single_motor(self, motor: str, initial_baudrate: int | None) -> tuple[int, int]:
model = self.motors[motor].model
search_baudrates = (
[initial_baudrate] if initial_baudrate is not None else self.model_baudrate_table[model]
)
for baudrate in search_baudrates:
self.set_baudrate(baudrate)
id_model = self.broadcast_ping()
if id_model:
found_id, found_model = next(iter(id_model.items()))
expected_model_nb = self.model_number_table[model]
if found_model != expected_model_nb:
raise RuntimeError(
f"Found one motor on {baudrate=} with id={found_id} but it has a "
f"model number '{found_model}' different than the one expected: '{expected_model_nb}' "
f"Make sure you are connected only connected to the '{motor}' motor (model '{model}')."
)
return baudrate, found_id
raise RuntimeError(f"Motor '{motor}' (model '{model}') was not found. Make sure it is connected.")
def configure_motors(self) -> None: def configure_motors(self) -> None:
# By default, Dynamixel motors have a 500µs delay response time (corresponding to a value of 250 on # By default, Dynamixel motors have a 500µs delay response time (corresponding to a value of 250 on
# the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0). # the 'Return_Delay_Time' address). We ensure this is reduced to the minimum of 2µs (value of 0).

View File

@ -57,13 +57,13 @@ X_SERIES_CONTROL_TABLE = {
# https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#baud-rate8 # https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#baud-rate8
X_SERIES_BAUDRATE_TABLE = { X_SERIES_BAUDRATE_TABLE = {
0: 9_600, 9_600: 0,
1: 57_600, 57_600: 1,
2: 115_200, 115_200: 2,
3: 1_000_000, 1_000_000: 3,
4: 2_000_000, 2_000_000: 4,
5: 3_000_000, 3_000_000: 5,
6: 4_000_000, 4_000_000: 6,
} }
# {data_name: size_byte} # {data_name: size_byte}

View File

@ -34,7 +34,7 @@ from .tables import (
) )
DEFAULT_PROTOCOL_VERSION = 0 DEFAULT_PROTOCOL_VERSION = 0
BAUDRATE = 1_000_000 DEFAULT_BAUDRATE = 1_000_000
DEFAULT_TIMEOUT_MS = 1000 DEFAULT_TIMEOUT_MS = 1000
NORMALIZED_DATA = ["Goal_Position", "Present_Position"] NORMALIZED_DATA = ["Goal_Position", "Present_Position"]
@ -103,6 +103,7 @@ class FeetechMotorsBus(MotorsBus):
""" """
available_baudrates = deepcopy(SCAN_BAUDRATES) available_baudrates = deepcopy(SCAN_BAUDRATES)
default_baudrate = DEFAULT_BAUDRATE
default_timeout = DEFAULT_TIMEOUT_MS default_timeout = DEFAULT_TIMEOUT_MS
model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE) model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE)
model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE) model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
@ -163,6 +164,57 @@ class FeetechMotorsBus(MotorsBus):
self._assert_motors_exist() self._assert_motors_exist()
self._assert_same_firmware() self._assert_same_firmware()
def _find_single_motor(self, motor: str, initial_baudrate: int | None = None) -> tuple[int, int]:
if self.protocol_version == 0:
return self._find_single_motor_p0(motor, initial_baudrate)
else:
return self._find_single_motor_p1(motor, initial_baudrate)
def _find_single_motor_p0(self, motor: str, initial_baudrate: int | None = None) -> tuple[int, int]:
model = self.motors[motor].model
search_baudrates = (
[initial_baudrate] if initial_baudrate is not None else self.model_baudrate_table[model]
)
expected_model_nb = self.model_number_table[model]
for baudrate in search_baudrates:
self.set_baudrate(baudrate)
id_model = self.broadcast_ping()
if id_model:
found_id, found_model = next(iter(id_model.items()))
if found_model != expected_model_nb:
raise RuntimeError(
f"Found one motor on {baudrate=} with id={found_id} but it has a "
f"model number '{found_model}' different than the one expected: '{expected_model_nb}' "
f"Make sure you are connected only connected to the '{motor}' motor (model '{model}')."
)
return baudrate, found_id
raise RuntimeError(f"Motor '{motor}' (model '{model}') was not found. Make sure it is connected.")
def _find_single_motor_p1(self, motor: str, initial_baudrate: int | None = None) -> tuple[int, int]:
import scservo_sdk as scs
model = self.motors[motor].model
search_baudrates = (
[initial_baudrate] if initial_baudrate is not None else self.model_baudrate_table[model]
)
expected_model_nb = self.model_number_table[model]
for baudrate in search_baudrates:
self.set_baudrate(baudrate)
for id_ in range(scs.MAX_ID + 1):
found_model = self.ping(id_)
if found_model is not None and found_model != expected_model_nb:
raise RuntimeError(
f"Found one motor on {baudrate=} with id={id_} but it has a "
f"model number '{found_model}' different than the one expected: '{expected_model_nb}' "
f"Make sure you are connected only connected to the '{motor}' motor (model '{model}')."
)
return baudrate, id_
raise RuntimeError(f"Motor '{motor}' (model '{model}') was not found. Make sure it is connected.")
def configure_motors(self) -> None: def configure_motors(self) -> None:
for motor in self.motors: for motor in self.motors:
# By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on # By default, Feetech motors have a 500µs delay response time (corresponding to a value of 250 on
@ -256,29 +308,7 @@ class FeetechMotorsBus(MotorsBus):
def _split_into_byte_chunks(self, value: int, length: int) -> list[int]: def _split_into_byte_chunks(self, value: int, length: int) -> list[int]:
return _split_into_byte_chunks(value, length) return _split_into_byte_chunks(value, length)
def _broadcast_ping_p1( def _broadcast_ping(self) -> tuple[dict[int, int], int]:
self, known_motors_only: bool = True, n_motors: int | None = None, num_retry: int = 0
) -> dict[int, int]:
if known_motors_only:
ids = self.ids
else:
import scservo_sdk as scs
ids = range(scs.MAX_ID + 1)
ids_models = {}
motors_found = 0
for id_ in ids:
model_number = self.ping(id_, num_retry)
if model_number is not None:
ids_models[id_] = model_number
motors_found += 1
if motors_found >= n_motors:
break
return ids_models
def _broadcast_ping_p0(self) -> tuple[dict[int, int], int]:
import scservo_sdk as scs import scservo_sdk as scs
data_list = {} data_list = {}
@ -355,7 +385,7 @@ class FeetechMotorsBus(MotorsBus):
def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None: def broadcast_ping(self, num_retry: int = 0, raise_on_error: bool = False) -> dict[int, int] | None:
self._assert_protocol_is_compatible("broadcast_ping") self._assert_protocol_is_compatible("broadcast_ping")
for n_try in range(1 + num_retry): for n_try in range(1 + num_retry):
ids_status, comm = self._broadcast_ping_p0() ids_status, comm = self._broadcast_ping()
if self._is_comm_success(comm): if self._is_comm_success(comm):
break break
logger.debug(f"Broadcast ping failed on port '{self.port}' ({n_try=})") logger.debug(f"Broadcast ping failed on port '{self.port}' ({n_try=})")

View File

@ -118,25 +118,25 @@ SCS_SERIES_CONTROL_TABLE = {
} }
STS_SMS_SERIES_BAUDRATE_TABLE = { STS_SMS_SERIES_BAUDRATE_TABLE = {
0: 1_000_000, 1_000_000: 0,
1: 500_000, 500_000: 1,
2: 250_000, 250_000: 2,
3: 128_000, 128_000: 3,
4: 115_200, 115_200: 4,
5: 57_600, 57_600: 5,
6: 38_400, 38_400: 6,
7: 19_200, 19_200: 7,
} }
SCS_SERIES_BAUDRATE_TABLE = { SCS_SERIES_BAUDRATE_TABLE = {
0: 1_000_000, 1_000_000: 0,
1: 500_000, 500_000: 1,
2: 250_000, 250_000: 2,
3: 128_000, 128_000: 3,
4: 115_200, 115_200: 4,
5: 57_600, 57_600: 5,
6: 38_400, 38_400: 6,
7: 19_200, 19_200: 7,
} }
MODEL_CONTROL_TABLE = { MODEL_CONTROL_TABLE = {

View File

@ -255,6 +255,7 @@ class MotorsBus(abc.ABC):
""" """
available_baudrates: list[int] available_baudrates: list[int]
default_baudrate: int
default_timeout: int default_timeout: int
model_baudrate_table: dict[str, dict] model_baudrate_table: dict[str, dict]
model_ctrl_table: dict[str, dict] model_ctrl_table: dict[str, dict]
@ -414,6 +415,11 @@ class MotorsBus(abc.ABC):
f"{self.__class__.__name__}('{self.port}') is already connected. Do not call `{self.__class__.__name__}.connect()` twice." f"{self.__class__.__name__}('{self.port}') is already connected. Do not call `{self.__class__.__name__}.connect()` twice."
) )
self._connect(handshake)
self.set_timeout()
logger.debug(f"{self.__class__.__name__} connected.")
def _connect(self, handshake: bool = True) -> None:
try: try:
if not self.port_handler.openPort(): if not self.port_handler.openPort():
raise OSError(f"Failed to open port '{self.port}'.") raise OSError(f"Failed to open port '{self.port}'.")
@ -425,9 +431,6 @@ class MotorsBus(abc.ABC):
"\nTry running `python lerobot/scripts/find_motors_bus_port.py`\n" "\nTry running `python lerobot/scripts/find_motors_bus_port.py`\n"
) from e ) from e
self.set_timeout()
logger.debug(f"{self.__class__.__name__} connected.")
@abc.abstractmethod @abc.abstractmethod
def _handshake(self) -> None: def _handshake(self) -> None:
pass pass
@ -435,13 +438,7 @@ class MotorsBus(abc.ABC):
@classmethod @classmethod
def scan_port(cls, port: str, *args, **kwargs) -> dict[int, list[int]]: def scan_port(cls, port: str, *args, **kwargs) -> dict[int, list[int]]:
bus = cls(port, {}, *args, **kwargs) bus = cls(port, {}, *args, **kwargs)
try: bus._connect(handshake=False)
bus.port_handler.openPort()
except (FileNotFoundError, OSError, serial.SerialException) as e:
raise ConnectionError(
f"Could not connect to port '{port}'. Make sure you are using the correct port."
"\nTry running `python lerobot/scripts/find_motors_bus_port.py`\n"
) from e
baudrate_ids = {} baudrate_ids = {}
for baudrate in tqdm(bus.available_baudrates, desc="Scanning port"): for baudrate in tqdm(bus.available_baudrates, desc="Scanning port"):
bus.set_baudrate(baudrate) bus.set_baudrate(baudrate)
@ -452,6 +449,37 @@ class MotorsBus(abc.ABC):
return baudrate_ids return baudrate_ids
def setup_motor(
self, motor: str, initial_baudrate: int | None = None, initial_id: int | None = None
) -> None:
if not self.is_connected:
self._connect(handshake=False)
if initial_baudrate is None:
initial_baudrate, initial_id = self._find_single_motor(motor)
if initial_id is None:
_, initial_id = self._find_single_motor(motor, initial_baudrate)
model = self.motors[motor].model
target_id = self.motors[motor].id
self.set_baudrate(initial_baudrate)
# Set ID
addr, length = get_address(self.model_ctrl_table, "ID", model)
self._write(addr, length, initial_id, target_id)
# Set Baudrate
addr, length = get_address(self.model_ctrl_table, "Baud_Rate", model)
baudrate_value = self.model_baudrate_table[model][self.default_baudrate]
self._write(addr, length, target_id, baudrate_value)
self.set_baudrate(self.default_baudrate)
@abc.abstractmethod
def _find_single_motor(self, motor: str, initial_baudrate: int | None) -> tuple[int, int]:
pass
@abc.abstractmethod @abc.abstractmethod
def configure_motors(self) -> None: def configure_motors(self) -> None:
pass pass

View File

@ -132,6 +132,7 @@ class MockMotorsBus(MotorsBus):
def _assert_protocol_is_compatible(self, instruction_name): ... def _assert_protocol_is_compatible(self, instruction_name): ...
def _handshake(self): ... def _handshake(self): ...
def _find_single_motor(self, motor, initial_baudrate): ...
def configure_motors(self): ... def configure_motors(self): ...
def read_calibration(self): ... def read_calibration(self): ...
def write_calibration(self, calibration_dict): ... def write_calibration(self, calibration_dict): ...