Add unit tests (WIP)
This commit is contained in:
parent
3ba05d53b9
commit
a0432f1608
|
@ -6,6 +6,8 @@ from pathlib import Path
|
|||
from threading import Thread
|
||||
|
||||
import cv2
|
||||
|
||||
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
|
||||
# Using 1 thread to avoid blocking the main thread.
|
||||
# Especially useful during data collection when other threads are used
|
||||
# to save the images.
|
||||
|
@ -97,9 +99,11 @@ class OpenCVCamera:
|
|||
Example of uage:
|
||||
|
||||
```python
|
||||
camera = OpenCVCamera(2)
|
||||
camera = OpenCVCamera(camera_index=0)
|
||||
camera.connect()
|
||||
color_image = camera.read()
|
||||
# when done using the camera, consider disconnecting
|
||||
camera.disconnect()
|
||||
```
|
||||
"""
|
||||
|
||||
|
@ -115,14 +119,12 @@ class OpenCVCamera:
|
|||
self.height = config.height
|
||||
self.color = config.color
|
||||
|
||||
if not isinstance(self.camera_index, int):
|
||||
raise ValueError(f"Camera index must be provided as an int, but {self.camera_index} was given instead.")
|
||||
|
||||
if self.color not in ["rgb", "bgr"]:
|
||||
raise ValueError(f"Expected color values are 'rgb' or 'bgr', but {self.color} is provided.")
|
||||
|
||||
if self.camera_index is None:
|
||||
raise ValueError(
|
||||
f"`camera_index` is expected to be one of these available cameras {OpenCVCamera.AVAILABLE_CAMERAS_INDICES}, but {camera_index} is provided instead."
|
||||
)
|
||||
|
||||
self.camera = None
|
||||
self.is_connected = False
|
||||
self.thread = None
|
||||
|
@ -131,7 +133,7 @@ class OpenCVCamera:
|
|||
|
||||
def connect(self):
|
||||
if self.is_connected:
|
||||
raise ValueError(f"Camera {self.camera_index} is already connected.")
|
||||
raise RobotDeviceAlreadyConnectedError(f"Camera {self.camera_index} is already connected.")
|
||||
|
||||
# First create a temporary camera trying to access `camera_index`,
|
||||
# and verify it is a valid camera by calling `isOpened`.
|
||||
|
@ -144,9 +146,10 @@ class OpenCVCamera:
|
|||
# valid cameras.
|
||||
if not is_camera_open:
|
||||
# Verify that the provided `camera_index` is valid before printing the traceback
|
||||
if self.camera_index not in find_camera_indices():
|
||||
available_cam_ids = find_camera_indices()
|
||||
if self.camera_index not in available_cam_ids:
|
||||
raise ValueError(
|
||||
f"`camera_index` is expected to be one of these available cameras {OpenCVCamera.AVAILABLE_CAMERAS_INDICES}, but {self.camera_index} is provided instead."
|
||||
f"`camera_index` is expected to be one of these available cameras {available_cam_ids}, but {self.camera_index} is provided instead."
|
||||
)
|
||||
|
||||
raise OSError(f"Can't access camera {self.camera_index}.")
|
||||
|
@ -179,12 +182,22 @@ class OpenCVCamera:
|
|||
raise OSError(
|
||||
f"Can't set {self.height=} for camera {self.camera_index}. Actual value is {actual_height}."
|
||||
)
|
||||
|
||||
self.fps = actual_fps
|
||||
self.width = actual_width
|
||||
self.height = actual_height
|
||||
|
||||
self.is_connected = True
|
||||
|
||||
def read(self, temporary_color: str | None = None) -> np.ndarray:
|
||||
"""Read a frame from the camera returned in the format (height, width, channels)
|
||||
(e.g. (640, 480, 3)), contrarily to the pytorch format which is channel first.
|
||||
|
||||
Note: Reading a frame is done every `camera.fps` times per second, and it is blocking.
|
||||
If you are reading data from other sensors, we advise to use `camera.async_read()` which is non blocking version of `camera.read()`.
|
||||
"""
|
||||
if not self.is_connected:
|
||||
self.connect()
|
||||
raise RobotDeviceNotConnectedError(f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first.")
|
||||
|
||||
start_time = time.perf_counter()
|
||||
|
||||
|
@ -216,6 +229,9 @@ class OpenCVCamera:
|
|||
self.color_image = self.read()
|
||||
|
||||
def async_read(self):
|
||||
if not self.is_connected:
|
||||
raise RobotDeviceNotConnectedError(f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first.")
|
||||
|
||||
if self.thread is None:
|
||||
self.thread = Thread(target=self.read_loop, args=())
|
||||
self.thread.daemon = True
|
||||
|
@ -232,15 +248,22 @@ class OpenCVCamera:
|
|||
return self.color_image
|
||||
|
||||
def disconnect(self):
|
||||
if getattr(self, "camera", None):
|
||||
self.camera.release()
|
||||
if self.thread is not None:
|
||||
if self.thread.is_alive():
|
||||
# wait for the thread to finish
|
||||
self.thread.join()
|
||||
if not self.is_connected:
|
||||
raise RobotDeviceNotConnectedError(f"OpenCVCamera({self.camera_index}) is not connected. Try running `camera.connect()` first.")
|
||||
|
||||
self.camera.release()
|
||||
self.camera = None
|
||||
|
||||
if self.thread is not None and self.thread.is_alive():
|
||||
# wait for the thread to finish
|
||||
self.thread.join()
|
||||
self.thread = None
|
||||
|
||||
self.is_connected = False
|
||||
|
||||
def __del__(self):
|
||||
self.disconnect()
|
||||
if self.is_connected:
|
||||
self.disconnect()
|
||||
|
||||
|
||||
def save_images_config(config: OpenCVCameraConfig, out_dir: Path):
|
||||
|
|
|
@ -43,4 +43,5 @@ def save_depth_image(depth, path, write_shape=False):
|
|||
class Camera(Protocol):
|
||||
def connect(self): ...
|
||||
def read(self, temporary_color: str | None = None) -> np.ndarray: ...
|
||||
def async_read(self) -> np.ndarray: ...
|
||||
def disconnect(self): ...
|
||||
|
|
|
@ -17,6 +17,7 @@ from dynamixel_sdk import (
|
|||
PortHandler,
|
||||
)
|
||||
|
||||
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
|
||||
from lerobot.common.utils.utils import capture_timestamp_utc
|
||||
|
||||
PROTOCOL_VERSION = 2.0
|
||||
|
@ -196,14 +197,15 @@ class DynamixelMotorsBus:
|
|||
motor_index = 6
|
||||
motor_model = "xl330-m077"
|
||||
|
||||
robot = DynamixelMotorsBus(
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port="/dev/tty.usbmodem575E0031751",
|
||||
motors={motor_name: (motor_index, motor_model)},
|
||||
)
|
||||
robot.connect()
|
||||
motors_bus.connect()
|
||||
|
||||
while True:
|
||||
robot.teleop_step()
|
||||
motors_bus.teleop_step()
|
||||
|
||||
motors_bus.disconnect()
|
||||
```
|
||||
"""
|
||||
def __init__(
|
||||
|
@ -229,7 +231,7 @@ class DynamixelMotorsBus:
|
|||
|
||||
def connect(self):
|
||||
if self.is_connected:
|
||||
raise ValueError(f"KochRobot is already connected.")
|
||||
raise RobotDeviceAlreadyConnectedError(f"DynamixelMotorsBus({self.port}) is already connected. Do not call `motors_bus.connect()` twice.")
|
||||
|
||||
self.port_handler = PortHandler(self.port)
|
||||
self.packet_handler = PacketHandler(PROTOCOL_VERSION)
|
||||
|
@ -240,12 +242,6 @@ class DynamixelMotorsBus:
|
|||
self.port_handler.setBaudRate(BAUD_RATE)
|
||||
self.port_handler.setPacketTimeoutMillis(TIMEOUT_MS)
|
||||
|
||||
# for async_read and async_write
|
||||
self.thread = None
|
||||
self.async_read_args = {}
|
||||
self.write_queue = Queue()
|
||||
self.results = {}
|
||||
|
||||
self.is_connected = True
|
||||
|
||||
@property
|
||||
|
@ -444,87 +440,16 @@ class DynamixelMotorsBus:
|
|||
ts_utc_name = get_log_name("timestamp_utc", "write", data_name, motor_names)
|
||||
self.logs[ts_utc_name] = capture_timestamp_utc()
|
||||
|
||||
def read_write_loop(self, async_read_args, write_queue):
|
||||
while True:
|
||||
for group_name, read_args in async_read_args.items():
|
||||
self.results[group_name] = self.read(*read_args)
|
||||
def disconnect(self):
|
||||
if not self.is_connected:
|
||||
raise RobotDeviceNotConnectedError(f"DynamixelMotorsBus({self.port}) is not connected. Try running `motors_bus.connect()` first.")
|
||||
|
||||
if write_queue.empty():
|
||||
continue
|
||||
|
||||
write_args = write_queue.get()
|
||||
if write_args is None: # A way to terminate the thread
|
||||
break
|
||||
self.write(*write_args)
|
||||
|
||||
write_queue.task_done()
|
||||
|
||||
def async_read(self, data_name, motor_names: list[str] | None = None):
|
||||
if motor_names is None:
|
||||
motor_names = self.motor_names
|
||||
|
||||
if isinstance(motor_names, str):
|
||||
motor_names = [motor_names]
|
||||
|
||||
if self.thread is None:
|
||||
self.thread = Thread(target=self.read_write_loop, args=(self.async_read_args, self.write_queue))
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
|
||||
group_name = get_group_sync_key(data_name, motor_names)
|
||||
self.async_read_args[group_name] = (data_name, motor_names)
|
||||
|
||||
FPS = 200
|
||||
num_tries = 0
|
||||
while group_name not in self.results:
|
||||
num_tries += 1
|
||||
time.sleep(1 / FPS)
|
||||
if num_tries > FPS:
|
||||
if self.thread.ident is None and not self.thread.is_alive():
|
||||
raise Exception(f"The thread responsible for `self.async_read({data_name}, {motor_names})` took too much time to start. There might be an issue. Verify that `self.threads[thread_name].start()` has been called.")
|
||||
|
||||
# ts_utc_name = get_log_name("timestamp_utc", "read", data_name, motor_names)
|
||||
return self.results[group_name] #, self.logs[ts_utc_name]
|
||||
|
||||
def async_write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None):
|
||||
if motor_names is None:
|
||||
motor_names = self.motor_names
|
||||
|
||||
if isinstance(motor_names, str):
|
||||
motor_names = [motor_names]
|
||||
|
||||
if isinstance(values, (int, float, np.integer)):
|
||||
values = [int(values)] * len(motor_names)
|
||||
|
||||
values = np.array(values)
|
||||
|
||||
if self.thread is None:
|
||||
self.thread = Thread(target=self.read_write_loop, args=(self.async_read_args, self.write_queue))
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
|
||||
self.write_queue.put((data_name, values, motor_names))
|
||||
|
||||
FPS = 200
|
||||
num_tries = 0
|
||||
ts_utc_name = get_log_name("timestamp_utc", "write", data_name, motor_names)
|
||||
while ts_utc_name not in self.logs:
|
||||
num_tries += 1
|
||||
time.sleep(1 / FPS)
|
||||
if num_tries > FPS:
|
||||
if self.thread.ident is None and not self.thread.is_alive():
|
||||
raise Exception(f"The thread responsible for `self.async_write({data_name}, {values}, {motor_names})` took too much time to start. There might be an issue. Verify that `self.threads[thread_name].start()` has been called.")
|
||||
|
||||
return self.logs[ts_utc_name]
|
||||
closePort
|
||||
|
||||
def __del__(self):
|
||||
# Send value that corresponds to `break` logic
|
||||
# if self.queue is not None:
|
||||
# self.queue.put(None)
|
||||
# self.queue.join()
|
||||
|
||||
if self.thread is not None:
|
||||
self.thread.join()
|
||||
if self.is_connected:
|
||||
self.disconnect()
|
||||
|
||||
|
||||
# def read(self, data_name, motor_name: str):
|
||||
# motor_idx, model = self.motors[motor_name]
|
||||
|
|
|
@ -223,13 +223,89 @@ class KochRobotConfig:
|
|||
class KochRobot:
|
||||
"""Tau Robotics: https://tau-robotics.com
|
||||
|
||||
Example of usage:
|
||||
Example of highest frequency teleoperation without camera:
|
||||
```python
|
||||
robot = KochRobot()
|
||||
# Defines how to communicate with the motors of the leader and follower arms
|
||||
leader_arms = {
|
||||
"main": DynamixelMotorsBus(
|
||||
port="/dev/tty.usbmodem575E0031751",
|
||||
motors={
|
||||
# name: (index, model)
|
||||
"shoulder_pan": (1, "xl330-m077"),
|
||||
"shoulder_lift": (2, "xl330-m077"),
|
||||
"elbow_flex": (3, "xl330-m077"),
|
||||
"wrist_flex": (4, "xl330-m077"),
|
||||
"wrist_roll": (5, "xl330-m077"),
|
||||
"gripper": (6, "xl330-m077"),
|
||||
},
|
||||
),
|
||||
}
|
||||
follower_arms = {
|
||||
"main": DynamixelMotorsBus(
|
||||
port="/dev/tty.usbmodem575E0032081",
|
||||
motors={
|
||||
# name: (index, model)
|
||||
"shoulder_pan": (1, "xl430-w250"),
|
||||
"shoulder_lift": (2, "xl430-w250"),
|
||||
"elbow_flex": (3, "xl330-m288"),
|
||||
"wrist_flex": (4, "xl330-m288"),
|
||||
"wrist_roll": (5, "xl330-m288"),
|
||||
"gripper": (6, "xl330-m288"),
|
||||
},
|
||||
),
|
||||
}
|
||||
robot = KochRobot(leader_arms, follower_arms)
|
||||
|
||||
# Connect motors buses and cameras if any (Required)
|
||||
robot.connect()
|
||||
|
||||
while True:
|
||||
robot.teleop_step()
|
||||
```
|
||||
|
||||
Example of highest frequency data collection without camera:
|
||||
```python
|
||||
# Assumes leader and follower arms have been instantiated already (see first example)
|
||||
robot = KochRobot(leader_arms, follower_arms)
|
||||
robot.connect()
|
||||
while True:
|
||||
observation, action = robot.teleop_step(record_data=True)
|
||||
```
|
||||
|
||||
Example of highest frequency data collection with cameras:
|
||||
```python
|
||||
# Defines how to communicate with 2 cameras connected to the computer.
|
||||
# Here, the webcam of the mackbookpro and the iphone (connected in USB to the macbookpro)
|
||||
# can be reached respectively using the camera indices 0 and 1. These indices can be
|
||||
# arbitrary. See the documentation of `OpenCVCamera` to find your own camera indices.
|
||||
cameras = {
|
||||
"macbookpro": OpenCVCamera(camera_index=0, fps=30, width=640, height=480),
|
||||
"iphone": OpenCVCamera(camera_index=1, fps=30, width=640, height=480),
|
||||
}
|
||||
|
||||
# Assumes leader and follower arms have been instantiated already (see first example)
|
||||
robot = KochRobot(leader_arms, follower_arms, cameras)
|
||||
robot.connect()
|
||||
while True:
|
||||
observation, action = robot.teleop_step(record_data=True)
|
||||
```
|
||||
|
||||
Example of controlling the robot with a policy (without running multiple policies in parallel to ensure highest frequency):
|
||||
```python
|
||||
# Assumes leader and follower arms + cameras have been instantiated already (see previous example)
|
||||
robot = KochRobot(leader_arms, follower_arms, cameras)
|
||||
robot.connect()
|
||||
while True:
|
||||
# Uses the follower arms and cameras to capture an observation
|
||||
observation = robot.capture_observation()
|
||||
|
||||
# Assumes a policy has been instantiated
|
||||
with torch.inference_mode():
|
||||
action = policy.select_action(observation)
|
||||
|
||||
# Orders the robot to move
|
||||
robot.send_action(action)
|
||||
```
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
|
@ -278,7 +354,6 @@ class KochRobot:
|
|||
for name in self.follower_arms:
|
||||
self.follower_arms[name].set_calibration(calibration[f"follower_{name}"])
|
||||
self.follower_arms[name].write("Torque_Enable", 1)
|
||||
self.follower_arms[name].write("Torque_Enable", 1)
|
||||
|
||||
for name in self.leader_arms:
|
||||
self.leader_arms[name].set_calibration(calibration[f"leader_{name}"])
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
class RobotDeviceNotConnectedError(Exception):
|
||||
"""Exception raised when the robot device is not connected."""
|
||||
def __init__(self, message="This robot device is not connected. Try calling `robot_device.connect()` first."):
|
||||
self.message = message
|
||||
super().__init__(self.message)
|
||||
|
||||
|
||||
class RobotDeviceAlreadyConnectedError(Exception):
|
||||
"""Exception raised when the robot device is already connected."""
|
||||
def __init__(self, message="This robot device is already connected. Try not calling `robot_device.connect()` twice."):
|
||||
self.message = message
|
||||
super().__init__(self.message)
|
|
@ -148,7 +148,8 @@ def log_control_info(robot, dt_s, episode_index=None, frame_index=None):
|
|||
|
||||
|
||||
def teleoperate(robot: Robot, fps: int | None = None):
|
||||
robot.init_teleop()
|
||||
if not robot.is_connected:
|
||||
robot.connect()
|
||||
|
||||
while True:
|
||||
now = time.perf_counter()
|
||||
|
@ -176,7 +177,8 @@ def record_dataset(
|
|||
if not video:
|
||||
raise NotImplementedError()
|
||||
|
||||
robot.init_teleop()
|
||||
if not robot.is_connected:
|
||||
robot.connect()
|
||||
|
||||
local_dir = Path(root) / repo_id
|
||||
if local_dir.exists():
|
||||
|
@ -336,7 +338,8 @@ def replay_episode(robot: Robot, episode: int, fps: int | None = None, root="dat
|
|||
from_idx = dataset.episode_data_index["from"][episode].item()
|
||||
to_idx = dataset.episode_data_index["to"][episode].item()
|
||||
|
||||
robot.init_teleop()
|
||||
if not robot.is_connected:
|
||||
robot.connect()
|
||||
|
||||
logging.info("Replaying episode")
|
||||
os.system('say "Replaying episode"')
|
||||
|
@ -366,6 +369,9 @@ def run_policy(robot: Robot, policy: torch.nn.Module, hydra_cfg: DictConfig):
|
|||
|
||||
fps = hydra_cfg.env.fps
|
||||
|
||||
if not robot.is_connected:
|
||||
robot.connect()
|
||||
|
||||
while True:
|
||||
now = time.perf_counter()
|
||||
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from lerobot.common.robot_devices.cameras.opencv import OpenCVCamera
|
||||
from lerobot.common.robot_devices.utils import RobotDeviceNotConnectedError, RobotDeviceAlreadyConnectedError
|
||||
|
||||
|
||||
def test_camera():
|
||||
# Test instantiating with missing camera index raises an error
|
||||
with pytest.raises(ValueError):
|
||||
camera = OpenCVCamera()
|
||||
|
||||
# Test instantiating with a wrong camera index raises an error
|
||||
with pytest.raises(ValueError):
|
||||
camera = OpenCVCamera(-1)
|
||||
|
||||
# Test instantiating
|
||||
camera = OpenCVCamera(0)
|
||||
|
||||
# Test reading, async reading, disconnecting before connecting raises an error
|
||||
with pytest.raises(RobotDeviceNotConnectedError):
|
||||
camera.read()
|
||||
with pytest.raises(RobotDeviceNotConnectedError):
|
||||
camera.async_read()
|
||||
with pytest.raises(RobotDeviceNotConnectedError):
|
||||
camera.disconnect()
|
||||
|
||||
# Test deleting the object without connecting first
|
||||
del camera
|
||||
|
||||
# Test connecting
|
||||
camera = OpenCVCamera(0)
|
||||
camera.connect()
|
||||
assert camera.is_connected
|
||||
assert camera.fps is not None
|
||||
assert camera.width is not None
|
||||
assert camera.height is not None
|
||||
|
||||
# Test connecting twice raises an error
|
||||
with pytest.raises(RobotDeviceAlreadyConnectedError):
|
||||
camera.connect()
|
||||
|
||||
# Test reading from the camera
|
||||
color_image = camera.read()
|
||||
assert isinstance(color_image, np.ndarray)
|
||||
assert color_image.ndim == 3
|
||||
h, w, c = color_image.shape
|
||||
assert c == 3
|
||||
assert w > h
|
||||
|
||||
# Test reading asynchronously from the camera and image is similar
|
||||
async_color_image = camera.async_read()
|
||||
assert np.allclose(color_image, async_color_image)
|
||||
|
||||
# Test disconnecting
|
||||
camera.disconnect()
|
||||
assert camera.camera is None
|
||||
assert camera.thread is None
|
||||
|
||||
# Test disconnecting with `__del__`
|
||||
camera = OpenCVCamera(0)
|
||||
camera.connect()
|
||||
del camera
|
||||
|
||||
# Test acquiring a bgr image
|
||||
camera = OpenCVCamera(0, color="bgr")
|
||||
camera.connect()
|
||||
assert camera.color == "bgr"
|
||||
bgr_color_image = camera.read()
|
||||
assert np.allclose(color_image, bgr_color_image[[2,1,0]])
|
||||
del camera
|
||||
|
||||
# Test fps can be set
|
||||
camera = OpenCVCamera(0, fps=60)
|
||||
camera.connect()
|
||||
assert camera.fps == 60
|
||||
# TODO(rcadene): measure fps in nightly?
|
||||
del camera
|
||||
|
||||
# Test width and height can be set
|
||||
camera = OpenCVCamera(0, fps=30, width=1280, height=720)
|
||||
camera.connect()
|
||||
assert camera.fps == 30
|
||||
assert camera.width == 1280
|
||||
assert camera.height == 720
|
||||
color_image = camera.read()
|
||||
h, w, c = color_image.shape
|
||||
assert h == 720
|
||||
assert w == 1280
|
||||
assert c == 3
|
||||
del camera
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,13 @@
|
|||
|
||||
|
||||
def test_teleoperate():
|
||||
pass
|
||||
|
||||
def test_record_dataset():
|
||||
pass
|
||||
|
||||
def test_replay_episode():
|
||||
pass
|
||||
|
||||
def test_run_policy():
|
||||
pass
|
|
@ -0,0 +1,73 @@
|
|||
|
||||
import time
|
||||
import numpy as np
|
||||
import pytest
|
||||
from lerobot.common.robot_devices.motors.dynamixel import DynamixelMotorsBus
|
||||
|
||||
|
||||
def test_motors_bus():
|
||||
# Test instantiating a common motors structure.
|
||||
# Here the one from Alexander Koch follower arm.
|
||||
motors = {
|
||||
# name: (index, model)
|
||||
"shoulder_pan": (1, "xl430-w250"),
|
||||
"shoulder_lift": (2, "xl430-w250"),
|
||||
"elbow_flex": (3, "xl330-m288"),
|
||||
"wrist_flex": (4, "xl330-m288"),
|
||||
"wrist_roll": (5, "xl330-m288"),
|
||||
"gripper": (6, "xl330-m288"),
|
||||
}
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port="/dev/tty.usbmodem575E0032081",
|
||||
motors=motors,
|
||||
)
|
||||
|
||||
# Test reading and writting before connecting raises an error
|
||||
with pytest.raises(ValueError):
|
||||
motors_bus.read("Torque_Enable")
|
||||
with pytest.raises(ValueError):
|
||||
motors_bus.write("Torque_Enable")
|
||||
|
||||
motors_bus.connect()
|
||||
|
||||
# Test connecting twice raises an error
|
||||
with pytest.raises(ValueError):
|
||||
motors_bus.connect()
|
||||
|
||||
# Test reading torque on all motors and its 0 after first connection
|
||||
values = motors_bus.read("Torque_Enable")
|
||||
assert isinstance(values, np.ndarray)
|
||||
assert len(values) == len(motors)
|
||||
assert (values == 0).all()
|
||||
|
||||
# Test writing torque on a specific motor
|
||||
motors_bus.write("Torque_Enable", 1, "gripper")
|
||||
|
||||
# Test reading torque from this specific motor. It is now 1
|
||||
values = motors_bus.read("Torque_Enable", "gripper")
|
||||
assert len(values) == 1
|
||||
assert values[0] == 1
|
||||
|
||||
# Test reading torque from all motors. It is 1 for the specific motor,
|
||||
# and 0 on the others.
|
||||
values = motors_bus.read("Torque_Enable")
|
||||
gripper_index = motors_bus.motor_names.index("gripper")
|
||||
assert values[gripper_index] == 1
|
||||
assert values.sum() == 1 # gripper is the only motor to have torque 1
|
||||
|
||||
# Test writing torque on all motors and it is 1 for all.
|
||||
motors_bus.write("Torque_Enable", 1)
|
||||
values = motors_bus.read("Torque_Enable")
|
||||
assert (values == 1).all()
|
||||
|
||||
# Test ordering the motors to move slightly (+1 value among 4096) and this move
|
||||
# can be executed and seen by the motor position sensor
|
||||
values = motors_bus.read("Present_Position")
|
||||
motors_bus.write("Goal_Position", values + 1)
|
||||
# Give time for the motors to move to the goal position
|
||||
time.sleep(1)
|
||||
new_values = motors_bus.read("Present_Position")
|
||||
assert new_values == values
|
||||
|
||||
# TODO(rcadene): test calibration
|
||||
# TODO(rcadene): test logs?
|
Loading…
Reference in New Issue