Add configuration of motors

This commit is contained in:
Remi Cadene 2024-07-26 16:46:43 +02:00
parent 875c1fbb2a
commit 5a16163790
6 changed files with 377 additions and 84 deletions

View File

@ -5,6 +5,7 @@ from copy import deepcopy
from pathlib import Path from pathlib import Path
import numpy as np import numpy as np
import tqdm
from dynamixel_sdk import ( from dynamixel_sdk import (
COMM_SUCCESS, COMM_SUCCESS,
DXL_HIBYTE, DXL_HIBYTE,
@ -21,9 +22,11 @@ from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError,
from lerobot.common.utils.utils import capture_timestamp_utc from lerobot.common.utils.utils import capture_timestamp_utc
PROTOCOL_VERSION = 2.0 PROTOCOL_VERSION = 2.0
BAUD_RATE = 1_000_000 BAUDRATE = 1_000_000
TIMEOUT_MS = 1000 TIMEOUT_MS = 1000
MAX_ID_RANGE = 252
# https://emanual.robotis.com/docs/en/dxl/x/xl330-m077 # https://emanual.robotis.com/docs/en/dxl/x/xl330-m077
# https://emanual.robotis.com/docs/en/dxl/x/xl330-m288 # https://emanual.robotis.com/docs/en/dxl/x/xl330-m288
# https://emanual.robotis.com/docs/en/dxl/x/xl430-w250 # https://emanual.robotis.com/docs/en/dxl/x/xl430-w250
@ -86,6 +89,16 @@ X_SERIES_CONTROL_TABLE = {
"Present_Temperature": (146, 1), "Present_Temperature": (146, 1),
} }
X_SERIES_BAUDRATE_TABLE = {
0: 9_600,
1: 57_600,
2: 115_200,
3: 1_000_000,
4: 2_000_000,
5: 3_000_000,
6: 4_000_000,
}
CALIBRATION_REQUIRED = ["Goal_Position", "Present_Position"] CALIBRATION_REQUIRED = ["Goal_Position", "Present_Position"]
CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"] CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"]
@ -107,7 +120,64 @@ MODEL_RESOLUTION = {
"xm540-w270": 4096, "xm540-w270": 4096,
} }
MODEL_BAUDRATE_TABLE = {
"x_series": X_SERIES_BAUDRATE_TABLE,
"xl330-m077": X_SERIES_BAUDRATE_TABLE,
"xl330-m288": X_SERIES_BAUDRATE_TABLE,
"xl430-w250": X_SERIES_BAUDRATE_TABLE,
"xm430-w350": X_SERIES_BAUDRATE_TABLE,
"xm540-w270": X_SERIES_BAUDRATE_TABLE,
}
NUM_READ_RETRY = 10 NUM_READ_RETRY = 10
NUM_WRITE_RETRY = 10
def convert_indices_to_baudrates(values: np.ndarray | list[int], models: list[str]):
assert len(values) == len(models)
for i in range(len(values)):
model = models[i]
index = values[i]
values[i] = MODEL_BAUDRATE_TABLE[model][index]
return values
def convert_baudrates_to_indices(values: np.ndarray | list[int], models: list[str]):
assert len(values) == len(models)
for i in range(len(values)):
model = models[i]
brate = values[i]
table_values = list(MODEL_BAUDRATE_TABLE[model].values())
table_keys = list(MODEL_BAUDRATE_TABLE[model].keys())
values[i] = table_keys[table_values.index(brate)]
return values
def convert_to_bytes(value, bytes):
# Note: No need to convert back into unsigned int, since this byte preprocessing
# already handles it for us.
if bytes == 1:
data = [
DXL_LOBYTE(DXL_LOWORD(value)),
]
elif bytes == 2:
data = [
DXL_LOBYTE(DXL_LOWORD(value)),
DXL_HIBYTE(DXL_LOWORD(value)),
]
elif bytes == 4:
data = [
DXL_LOBYTE(DXL_LOWORD(value)),
DXL_HIBYTE(DXL_LOWORD(value)),
DXL_LOBYTE(DXL_HIWORD(value)),
DXL_HIBYTE(DXL_HIWORD(value)),
]
else:
raise NotImplementedError(
f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but "
f"{bytes} is provided instead."
)
return data
def get_group_sync_key(data_name, motor_names): def get_group_sync_key(data_name, motor_names):
@ -282,15 +352,184 @@ class DynamixelMotorsBus:
) )
raise raise
self.port_handler.setBaudRate(BAUD_RATE) # Allow to read and write
self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS)
self.is_connected = True self.is_connected = True
self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS)
# Set expected baud rate for the bus
self.set_bus_baudrate(BAUDRATE)
if not self.are_motors_configured():
print(
r"/!\ First, verify that all the cables are connected the proper way. If you detect an issue, before making any modification, unplug the power cord to not damage the motors. Rewire correctly. Then plug the power again and relaunch the script."
)
print(
r"/!\ Secondly, if the cables connection look correct and it is the first time that you use these motors, follow these manual steps to configure them."
)
input("Press Enter to configure your motors...")
print()
self.configure_motors()
def reconnect(self):
self.port_handler = PortHandler(self.port)
self.packet_handler = PacketHandler(PROTOCOL_VERSION)
if not self.port_handler.openPort():
raise OSError(f"Failed to open port '{self.port}'.")
self.is_connected = True
def are_motors_configured(self):
try:
return (self.motor_indices == self.read("ID")).all()
except ConnectionError as e:
print(e)
return False
def configure_motors(self):
# TODO(rcadene): This script assumes motors follow the X_SERIES baudrates
print("Scanning all baudrates and motor indices")
all_baudrates = set(X_SERIES_BAUDRATE_TABLE.values())
ids_per_baudrate = {}
for baudrate in all_baudrates:
self.set_bus_baudrate(baudrate)
present_ids = self.find_motor_indices()
if len(present_ids) > 0:
ids_per_baudrate[baudrate] = present_ids
print(f"Motor indices detected: {ids_per_baudrate}")
print()
possible_baudrates = list(ids_per_baudrate.keys())
possible_ids = list({idx for sublist in ids_per_baudrate.values() for idx in sublist})
untaken_ids = list(set(range(MAX_ID_RANGE)) - set(possible_ids) - set(self.motor_indices))
# Connect successively one motor to the chain and write a unique random index for each
for i in range(len(self.motors)):
self.disconnect()
print("1. Unplug the power cord")
print(
f"2. Plug/unplug minimal number of cables to only have the first {i+1} motor(s) ({self.motor_names[:i+1]}) connected."
)
print("3. Re-plug the power cord.")
input("Press Enter to continue...")
print()
self.reconnect()
if i > 0:
try:
self._read_with_motor_ids(self.motor_models, untaken_ids[:i], "ID")
except ConnectionError:
print(f"Failed to read from {untaken_ids[:i+1]}. Make sure the power cord is plugged in.")
input("Press Enter to continue...")
print()
self.reconnect()
print("Scanning possible baudrates and motor indices")
motor_found = False
for baudrate in possible_baudrates:
self.set_bus_baudrate(baudrate)
present_ids = self.find_motor_indices(possible_ids)
if len(present_ids) == 1:
present_idx = present_ids[0]
print(f"Detected motor with index {present_idx}")
if baudrate != BAUDRATE:
print(f"Setting its baudrate to {BAUDRATE}")
baudrate_idx = list(X_SERIES_BAUDRATE_TABLE.values()).index(BAUDRATE)
# The write can fail, so we allow retries
for _ in range(NUM_WRITE_RETRY):
self._write_with_motor_ids(
self.motor_models, present_idx, "Baud_Rate", baudrate_idx
)
time.sleep(0.5)
self.set_bus_baudrate(BAUDRATE)
try:
present_baudrate_idx = self._read_with_motor_ids(
self.motor_models, present_idx, "Baud_Rate"
)
except ConnectionError:
print("Failed to write baudrate. Retrying.")
self.set_bus_baudrate(baudrate)
continue
break
else:
raise
if present_baudrate_idx != baudrate_idx:
raise OSError("Failed to write baudrate.")
print(f"Setting its index to a temporary untaken index ({untaken_ids[i]})")
self._write_with_motor_ids(self.motor_models, present_idx, "ID", untaken_ids[i])
present_idx = self._read_with_motor_ids(self.motor_models, untaken_ids[i], "ID")
if present_idx != untaken_ids[i]:
raise OSError("Failed to write index.")
motor_found = True
break
elif len(present_ids) > 1:
raise OSError(f"More than one motor detected ({present_ids}), but only one was expected.")
if not motor_found:
raise OSError(
"No motor found, but one new motor expected. Verify power cord is plugged in and retry."
)
print()
print(f"Setting expected motor indices: {self.motor_indices}")
self.set_bus_baudrate(BAUDRATE)
self._write_with_motor_ids(
self.motor_models, untaken_ids[: len(self.motors)], "ID", self.motor_indices
)
print()
if (self.read("ID") != self.motor_indices).any():
raise OSError("Failed to write motors indices.")
print("Configuration is done!")
def find_motor_indices(self, possible_ids=None):
if possible_ids is None:
possible_ids = range(MAX_ID_RANGE)
indices = []
for idx in tqdm.tqdm(possible_ids):
try:
present_idx = self._read_with_motor_ids(self.motor_models, [idx], "ID")[0]
except ConnectionError:
continue
if idx != present_idx:
# sanity check
raise OSError(
"Motor index used to communicate through the bus is not the same as the one present in the motor memory. The motor memory might be damaged."
)
indices.append(idx)
return indices
def set_bus_baudrate(self, baudrate):
present_bus_baudrate = self.port_handler.getBaudRate()
if present_bus_baudrate != baudrate:
print(f"Setting bus baud rate to {baudrate}. Previously {present_bus_baudrate}.")
self.port_handler.setBaudRate(baudrate)
if self.port_handler.getBaudRate() != baudrate:
raise OSError("Failed to write bus baud rate.")
@property @property
def motor_names(self) -> list[int]: def motor_names(self) -> list[str]:
return list(self.motors.keys()) return list(self.motors.keys())
@property
def motor_models(self) -> list[str]:
return [model for _, model in self.motors.values()]
@property
def motor_indices(self) -> list[int]:
return [idx for idx, _ in self.motors.values()]
def set_calibration(self, calibration: dict[str, tuple[int, bool]]): def set_calibration(self, calibration: dict[str, tuple[int, bool]]):
self.calibration = calibration self.calibration = calibration
@ -329,27 +568,12 @@ class DynamixelMotorsBus:
resolution = self.model_resolution[model] resolution = self.model_resolution[model]
values[i] = values[i] / (resolution // 2) * 180 values[i] = values[i] / (resolution // 2) * 180
if (values < -180).any() or (values >= 180).any():
raise ValueError(
f"At least one of the motor has a joint value outside of its centered degree range of [-180, 180[."
'This "jump of range" can be caused by a hardware issue, or you might have unexpectedly completed a full rotation of the motor '
"during manipulation or transportation of your robot. Try to recalibrate all motors by setting a different "
"`calibration_path` during the instatiation of your robot. "
f"The values and motors: {values} {motor_names}"
)
return values return values
def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
if motor_names is None: if motor_names is None:
motor_names = self.motor_names motor_names = self.motor_names
if (values < -180).any() or (values >= 180).any():
raise ValueError(
f"At least one of the motor has a joint value outside of its centered degree range of [-180, 180[. "
f"The values and motors: {values} {motor_names}"
)
# Convert from the universal float32 centered degree range [-180, 180[ to centered resolution range [-resolution, resolution[ # Convert from the universal float32 centered degree range [-180, 180[ to centered resolution range [-resolution, resolution[
for i, name in enumerate(motor_names): for i, name in enumerate(motor_names):
_, model = self.motors[name] _, model = self.motors[name]
@ -372,6 +596,35 @@ class DynamixelMotorsBus:
return values return values
def _read_with_motor_ids(self, motor_models, motor_ids, data_name):
return_list = True
if not isinstance(motor_ids, list):
return_list = False
motor_ids = [motor_ids]
assert_same_address(self.model_ctrl_table, self.motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes)
for idx in motor_ids:
group.addParam(idx)
comm = group.txRxPacket()
if comm != COMM_SUCCESS:
raise ConnectionError(
f"Read failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
values = []
for idx in motor_ids:
value = group.getData(idx, addr, bytes)
values.append(value)
if return_list:
return values
else:
return values[0]
def read(self, data_name, motor_names: str | list[str] | None = None): def read(self, data_name, motor_names: str | list[str] | None = None):
if not self.is_connected: if not self.is_connected:
raise RobotDeviceNotConnectedError( raise RobotDeviceNotConnectedError(
@ -438,6 +691,26 @@ class DynamixelMotorsBus:
return values return values
def _write_with_motor_ids(self, motor_models, motor_ids, data_name, values):
if not isinstance(motor_ids, list):
motor_ids = [motor_ids]
if not isinstance(values, list):
values = [values]
assert_same_address(self.model_ctrl_table, motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes)
group.addParam(idx, data)
comm = group.txPacket()
if comm != COMM_SUCCESS:
raise ConnectionError(
f"Write failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
def write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None): def write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None):
if not self.is_connected: if not self.is_connected:
raise RobotDeviceNotConnectedError( raise RobotDeviceNotConnectedError(
@ -480,30 +753,7 @@ class DynamixelMotorsBus:
) )
for idx, value in zip(motor_ids, values, strict=True): for idx, value in zip(motor_ids, values, strict=True):
# Note: No need to convert back into unsigned int, since this byte preprocessing data = convert_to_bytes(value, bytes)
# already handles it for us.
if bytes == 1:
data = [
DXL_LOBYTE(DXL_LOWORD(value)),
]
elif bytes == 2:
data = [
DXL_LOBYTE(DXL_LOWORD(value)),
DXL_HIBYTE(DXL_LOWORD(value)),
]
elif bytes == 4:
data = [
DXL_LOBYTE(DXL_LOWORD(value)),
DXL_HIBYTE(DXL_LOWORD(value)),
DXL_LOBYTE(DXL_HIWORD(value)),
DXL_HIBYTE(DXL_HIWORD(value)),
]
else:
raise NotImplementedError(
f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but "
f"{bytes} is provided instead."
)
if init_group: if init_group:
self.group_writers[group_key].addParam(idx, data) self.group_writers[group_key].addParam(idx, data)
else: else:

View File

@ -333,11 +333,13 @@ class KochRobot:
# Connect the arms # Connect the arms
for name in self.follower_arms: for name in self.follower_arms:
print(f"Connecting {name} follower arm.")
self.follower_arms[name].connect() self.follower_arms[name].connect()
print(f"Connecting {name} leader arm.")
self.leader_arms[name].connect() self.leader_arms[name].connect()
# Reset the arms and load or run calibration # Reset the arms and load or run calibration
if self.calibration_path.exists(): if not self.calibration_path.exists():
# Reset all arms before setting calibration # Reset all arms before setting calibration
for name in self.follower_arms: for name in self.follower_arms:
reset_arm(self.follower_arms[name]) reset_arm(self.follower_arms[name])
@ -360,6 +362,18 @@ class KochRobot:
for name in self.leader_arms: for name in self.leader_arms:
self.leader_arms[name].set_calibration(calibration[f"leader_{name}"]) self.leader_arms[name].set_calibration(calibration[f"leader_{name}"])
for name in self.leader_arms:
values = self.leader_arms[name].read("Present_Position")
if (values < -180).any() or (values >= 180).any():
raise ValueError(
f"At least one of the motor of the {name} leader arm has a joint value outside of its centered degree range of ]-180, 180[."
'This "jump of range" can be caused by a hardware issue, or you might have unexpectedly completed a full rotation of the motor '
"during manipulation or transportation of your robot. "
f"The values and motors: {values} {self.leader_arms[name].motor_names}.\n"
"Rotate the arm to fit the range ]-180, 180[ and relaunch the script, or recalibrate all motors by setting a different "
"`calibration_path` during the instatiation of your robot."
)
# Set better PID values to close the gap between recored states and actions # Set better PID values to close the gap between recored states and actions
# TODO(rcadene): Implement an automatic procedure to set optimial PID values for each motor # TODO(rcadene): Implement an automatic procedure to set optimial PID values for each motor
for name in self.follower_arms: for name in self.follower_arms:

View File

@ -4,6 +4,9 @@ Examples of usage:
- Unlimited teleoperation at highest frequency (~200 Hz is expected), to exit with CTRL+C: - Unlimited teleoperation at highest frequency (~200 Hz is expected), to exit with CTRL+C:
```bash ```bash
python lerobot/scripts/control_robot.py teleoperate python lerobot/scripts/control_robot.py teleoperate
# Remove the cameras from the robot definition. They are not used in 'teleoperate' anyway.
python lerobot/scripts/control_robot.py teleoperate '~cameras'
``` ```
- Unlimited teleoperation at a limited frequency of 30 Hz, to simulate data recording frequency: - Unlimited teleoperation at a limited frequency of 30 Hz, to simulate data recording frequency:
@ -14,7 +17,7 @@ python lerobot/scripts/control_robot.py teleoperate \
- Record one episode in order to test replay: - Record one episode in order to test replay:
```bash ```bash
python lerobot/scripts/control_robot.py record_dataset \ python lerobot/scripts/control_robot.py record \
--fps 30 \ --fps 30 \
--root tmp/data \ --root tmp/data \
--repo-id $USER/koch_test \ --repo-id $USER/koch_test \
@ -32,7 +35,7 @@ python lerobot/scripts/visualize_dataset.py \
- Replay this test episode: - Replay this test episode:
```bash ```bash
python lerobot/scripts/control_robot.py replay_episode \ python lerobot/scripts/control_robot.py replay \
--fps 30 \ --fps 30 \
--root tmp/data \ --root tmp/data \
--repo-id $USER/koch_test \ --repo-id $USER/koch_test \
@ -42,12 +45,11 @@ python lerobot/scripts/control_robot.py replay_episode \
- Record a full dataset in order to train a policy, with 2 seconds of warmup, - Record a full dataset in order to train a policy, with 2 seconds of warmup,
30 seconds of recording for each episode, and 10 seconds to reset the environment in between episodes: 30 seconds of recording for each episode, and 10 seconds to reset the environment in between episodes:
```bash ```bash
python lerobot/scripts/control_robot.py record_dataset \ python lerobot/scripts/control_robot.py record \
--fps 30 \ --fps 30 \
--root data \ --root data \
--repo-id $USER/koch_pick_place_lego \ --repo-id $USER/koch_pick_place_lego \
--num-episodes 50 \ --num-episodes 50 \
--run-compute-stats 1 \
--warmup-time-s 2 \ --warmup-time-s 2 \
--episode-time-s 30 \ --episode-time-s 30 \
--reset-time-s 10 --reset-time-s 10
@ -74,7 +76,14 @@ DATA_DIR=data python lerobot/scripts/train.py \
- Run the pretrained policy on the robot: - Run the pretrained policy on the robot:
```bash ```bash
python lerobot/scripts/control_robot.py run_policy \ python lerobot/scripts/control_robot.py record \
--fps 30 \
--root data \
--repo-id $USER/eval_act_koch_real \
--num-episodes 10 \
--warmup-time-s 2 \
--episode-time-s 30 \
--reset-time-s 10
-p outputs/train/act_koch_real/checkpoints/080000/pretrained_model -p outputs/train/act_koch_real/checkpoints/080000/pretrained_model
``` ```
""" """
@ -758,8 +767,9 @@ if __name__ == "__main__":
pretrained_policy_path = get_pretrained_policy_path(pretrained_policy_name_or_path) pretrained_policy_path = get_pretrained_policy_path(pretrained_policy_name_or_path)
policy_cfg = init_hydra_config(pretrained_policy_path / "config.yaml", overrides) policy_cfg = init_hydra_config(pretrained_policy_path / "config.yaml", overrides)
policy = make_policy(hydra_cfg=policy_cfg, pretrained_policy_name_or_path=pretrained_policy_path) policy = make_policy(hydra_cfg=policy_cfg, pretrained_policy_name_or_path=pretrained_policy_path)
record(robot, policy, policy_cfg, **kwargs)
record(robot, policy, policy_cfg, **kwargs) else:
record(robot, **kwargs)
elif control_mode == "replay": elif control_mode == "replay":
replay(robot, **kwargs) replay(robot, **kwargs)

View File

@ -15,7 +15,9 @@
# limitations under the License. # limitations under the License.
import pytest import pytest
from .utils import DEVICE from lerobot.common.utils.utils import init_hydra_config
from .utils import DEVICE, KOCH_ROBOT_CONFIG_PATH
def pytest_collection_finish(): def pytest_collection_finish():
@ -27,11 +29,12 @@ def is_koch_available():
try: try:
from lerobot.common.robot_devices.robots.factory import make_robot from lerobot.common.robot_devices.robots.factory import make_robot
robot = make_robot("koch") robot_cfg = init_hydra_config(KOCH_ROBOT_CONFIG_PATH)
robot = make_robot(robot_cfg)
robot.connect() robot.connect()
del robot del robot
return True return True
except Exception as e: except Exception as e:
print("An alexander koch robot is not available.") print("A koch robot is not available.")
print(e) print(e)
return False return False

View File

@ -1,33 +1,54 @@
# TODO(rcadene): measure fps in nightly?
# TODO(rcadene): test logs
# TODO(rcadene): test calibration
# TODO(rcadene): add compatibility with other motors bus
import time import time
import hydra
import numpy as np import numpy as np
import pytest import pytest
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from tests.utils import require_koch from lerobot.common.utils.utils import init_hydra_config
from tests.utils import KOCH_ROBOT_CONFIG_PATH, require_koch
def make_motors_bus():
robot_cfg = init_hydra_config(KOCH_ROBOT_CONFIG_PATH)
# Instantiating a common motors structure.
# Here the one from Alexander Koch follower arm.
motors_bus = hydra.utils.instantiate(robot_cfg.leader_arms.main)
return motors_bus
@require_koch
def test_find_port(request):
from lerobot.common.robot_devices.motors.dynamixel import find_port
find_port()
@require_koch
def test_configure_motors_all_ids_1(request):
# This test expect the configuration was already correct.
motors_bus = make_motors_bus()
motors_bus.connect()
motors_bus.write("Baud_Rate", [0] * len(motors_bus.motors))
motors_bus.set_bus_baudrate(9_600)
motors_bus.write("ID", [1] * len(motors_bus.motors))
del motors_bus
# Test configure
motors_bus = make_motors_bus()
motors_bus.connect()
assert motors_bus.are_motors_configured()
del motors_bus
@require_koch @require_koch
def test_motors_bus(request): def test_motors_bus(request):
# TODO(rcadene): measure fps in nightly? motors_bus = make_motors_bus()
# TODO(rcadene): test logs
# TODO(rcadene): test calibration
# TODO(rcadene): add compatibility with other motors bus
from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus
# Test instantiating a common motors structure.
# Here the one from Alexander Koch follower arm.
port = "/dev/tty.usbmodem575E0032081"
motors = {
# name: (index, model)
"shoulder_pan": (1, "xl430-w250"),
"shoulder_lift": (2, "xl430-w250"),
"elbow_flex": (3, "xl330-m288"),
"wrist_flex": (4, "xl330-m288"),
"wrist_roll": (5, "xl330-m288"),
"gripper": (6, "xl330-m288"),
}
motors_bus = DynamixelMotorsBus(port, motors)
# Test reading and writting before connecting raises an error # Test reading and writting before connecting raises an error
with pytest.raises(RobotDeviceNotConnectedError): with pytest.raises(RobotDeviceNotConnectedError):
@ -41,7 +62,7 @@ def test_motors_bus(request):
del motors_bus del motors_bus
# Test connecting # Test connecting
motors_bus = DynamixelMotorsBus(port, motors) motors_bus = make_motors_bus()
motors_bus.connect() motors_bus.connect()
# Test connecting twice raises an error # Test connecting twice raises an error
@ -52,7 +73,7 @@ def test_motors_bus(request):
motors_bus.write("Torque_Enable", 0) motors_bus.write("Torque_Enable", 0)
values = motors_bus.read("Torque_Enable") values = motors_bus.read("Torque_Enable")
assert isinstance(values, np.ndarray) assert isinstance(values, np.ndarray)
assert len(values) == len(motors) assert len(values) == len(motors_bus.motors)
assert (values == 0).all() assert (values == 0).all()
# Test writing torque on a specific motor # Test writing torque on a specific motor
@ -83,10 +104,3 @@ def test_motors_bus(request):
time.sleep(1) time.sleep(1)
new_values = motors_bus.read("Present_Position") new_values = motors_bus.read("Present_Position")
assert (new_values == values).all() assert (new_values == values).all()
@require_koch
def test_find_port(request):
from lerobot.common.robot_devices.motors.dynamixel import find_port
find_port()

View File

@ -23,6 +23,7 @@ from lerobot.common.utils.import_utils import is_package_available
# Pass this as the first argument to init_hydra_config. # Pass this as the first argument to init_hydra_config.
DEFAULT_CONFIG_PATH = "lerobot/configs/default.yaml" DEFAULT_CONFIG_PATH = "lerobot/configs/default.yaml"
KOCH_ROBOT_CONFIG_PATH = "lerobot/configs/robot/koch.yaml"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu" DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
@ -161,6 +162,7 @@ def require_koch(func):
if request is None: if request is None:
raise ValueError("The 'request' fixture must be passed to the test function as a parameter.") raise ValueError("The 'request' fixture must be passed to the test function as a parameter.")
# The function `is_koch_available` is defined in `tests/conftest.py`
if not request.getfixturevalue("is_koch_available"): if not request.getfixturevalue("is_koch_available"):
pytest.skip("An alexander koch robot is not available.") pytest.skip("An alexander koch robot is not available.")
return func(*args, **kwargs) return func(*args, **kwargs)