129 lines
4.6 KiB
Python
129 lines
4.6 KiB
Python
|
import pickle
|
||
|
from pathlib import Path
|
||
|
|
||
|
import pytest
|
||
|
import torch
|
||
|
|
||
|
from lerobot.common.robot_devices.robots.factory import make_robot
|
||
|
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
|
||
|
from tests.utils import require_koch
|
||
|
|
||
|
|
||
|
@require_koch
|
||
|
def test_robot(tmpdir, request):
|
||
|
# TODO(rcadene): measure fps in nightly?
|
||
|
# TODO(rcadene): test logs
|
||
|
# TODO(rcadene): add compatibility with other robots
|
||
|
from lerobot.common.robot_devices.robots.koch import KochRobot
|
||
|
|
||
|
# Save calibration preset
|
||
|
calibration = {
|
||
|
"follower_main": {
|
||
|
"shoulder_pan": (-2048, False),
|
||
|
"shoulder_lift": (2048, True),
|
||
|
"elbow_flex": (-1024, False),
|
||
|
"wrist_flex": (2048, True),
|
||
|
"wrist_roll": (2048, True),
|
||
|
"gripper": (2048, True),
|
||
|
},
|
||
|
"leader_main": {
|
||
|
"shoulder_pan": (-2048, False),
|
||
|
"shoulder_lift": (1024, True),
|
||
|
"elbow_flex": (2048, True),
|
||
|
"wrist_flex": (-2048, False),
|
||
|
"wrist_roll": (2048, True),
|
||
|
"gripper": (2048, True),
|
||
|
},
|
||
|
}
|
||
|
tmpdir = Path(tmpdir)
|
||
|
calibration_path = tmpdir / "calibration.pkl"
|
||
|
calibration_path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
with open(calibration_path, "wb") as f:
|
||
|
pickle.dump(calibration, f)
|
||
|
|
||
|
# Test connecting without devices raises an error
|
||
|
robot = KochRobot()
|
||
|
with pytest.raises(ValueError):
|
||
|
robot.connect()
|
||
|
del robot
|
||
|
|
||
|
# Test using robot before connecting raises an error
|
||
|
robot = KochRobot()
|
||
|
with pytest.raises(RobotDeviceNotConnectedError):
|
||
|
robot.teleop_step()
|
||
|
with pytest.raises(RobotDeviceNotConnectedError):
|
||
|
robot.teleop_step(record_data=True)
|
||
|
with pytest.raises(RobotDeviceNotConnectedError):
|
||
|
robot.capture_observation()
|
||
|
with pytest.raises(RobotDeviceNotConnectedError):
|
||
|
robot.send_action(None)
|
||
|
with pytest.raises(RobotDeviceNotConnectedError):
|
||
|
robot.disconnect()
|
||
|
|
||
|
# Test deleting the object without connecting first
|
||
|
del robot
|
||
|
|
||
|
# Test connecting
|
||
|
robot = make_robot("koch")
|
||
|
# TODO(rcadene): proper monkey patch
|
||
|
robot.calibration_path = calibration_path
|
||
|
robot.connect() # run the manual calibration precedure
|
||
|
assert robot.is_connected
|
||
|
|
||
|
# Test connecting twice raises an error
|
||
|
with pytest.raises(RobotDeviceAlreadyConnectedError):
|
||
|
robot.connect()
|
||
|
|
||
|
# Test disconnecting with `__del__`
|
||
|
del robot
|
||
|
|
||
|
# Test teleop can run
|
||
|
robot = make_robot("koch")
|
||
|
robot.calibration_path = calibration_path
|
||
|
robot.connect()
|
||
|
robot.teleop_step()
|
||
|
|
||
|
# Test data recorded during teleop are well formated
|
||
|
observation, action = robot.teleop_step(record_data=True)
|
||
|
# State
|
||
|
assert "observation.state" in observation
|
||
|
assert isinstance(observation["observation.state"], torch.Tensor)
|
||
|
assert observation["observation.state"].ndim == 1
|
||
|
dim_state = sum(len(robot.follower_arms[name].motors) for name in robot.follower_arms)
|
||
|
assert observation["observation.state"].shape[0] == dim_state
|
||
|
# Cameras
|
||
|
for name in robot.cameras:
|
||
|
assert f"observation.images.{name}" in observation
|
||
|
assert isinstance(observation[f"observation.images.{name}"], torch.Tensor)
|
||
|
assert observation[f"observation.images.{name}"].ndim == 3
|
||
|
# Action
|
||
|
assert "action" in action
|
||
|
assert isinstance(action["action"], torch.Tensor)
|
||
|
assert action["action"].ndim == 1
|
||
|
dim_action = sum(len(robot.follower_arms[name].motors) for name in robot.follower_arms)
|
||
|
assert action["action"].shape[0] == dim_action
|
||
|
# TODO(rcadene): test if observation and action data are returned as expected
|
||
|
|
||
|
# Test capture_observation can run and observation returned are the same (since the arm didnt move)
|
||
|
captured_observation = robot.capture_observation()
|
||
|
assert set(captured_observation.keys()) == set(observation.keys())
|
||
|
for name in captured_observation:
|
||
|
if "image" in name:
|
||
|
# TODO(rcadene): skipping image for now as it's challenging to assess equality between two consecutive frames
|
||
|
continue
|
||
|
assert torch.allclose(captured_observation[name], observation[name], atol=1)
|
||
|
|
||
|
# Test send_action can run
|
||
|
robot.send_action(action["action"])
|
||
|
|
||
|
# Test disconnecting
|
||
|
robot.disconnect()
|
||
|
assert not robot.is_connected
|
||
|
for name in robot.follower_arms:
|
||
|
assert not robot.follower_arms[name].is_connected
|
||
|
for name in robot.leader_arms:
|
||
|
assert not robot.leader_arms[name].is_connected
|
||
|
for name in robot.cameras:
|
||
|
assert not robot.cameras[name].is_connected
|
||
|
del robot
|