592 lines
26 KiB
Python
592 lines
26 KiB
Python
import logging
|
|
import pickle
|
|
import time
|
|
from dataclasses import dataclass, field, replace
|
|
from pathlib import Path
|
|
from typing import Sequence
|
|
|
|
import numpy as np
|
|
import torch
|
|
|
|
from lerobot.common.robot_devices.cameras.utils import Camera
|
|
from lerobot.common.robot_devices.motors.dynamixel import (
|
|
OperatingMode,
|
|
TorqueMode,
|
|
convert_degrees_to_steps,
|
|
)
|
|
from lerobot.common.robot_devices.motors.utils import MotorsBus
|
|
from lerobot.common.robot_devices.utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
|
|
|
|
########################################################################
|
|
# Calibration logic
|
|
########################################################################
|
|
|
|
URL_TEMPLATE = (
|
|
"https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"
|
|
)
|
|
|
|
# In nominal degree range ]-180, +180[
|
|
ZERO_POSITION_DEGREE = 0
|
|
ROTATED_POSITION_DEGREE = 90
|
|
|
|
|
|
def assert_drive_mode(drive_mode):
|
|
# `drive_mode` is in [0,1] with 0 means original rotation direction for the motor, and 1 means inverted.
|
|
if not np.all(np.isin(drive_mode, [0, 1])):
|
|
raise ValueError(f"`drive_mode` contains values other than 0 or 1: ({drive_mode})")
|
|
|
|
|
|
def apply_drive_mode(position, drive_mode):
|
|
assert_drive_mode(drive_mode)
|
|
# Convert `drive_mode` from [0, 1] with 0 indicates original rotation direction and 1 inverted,
|
|
# to [-1, 1] with 1 indicates original rotation direction and -1 inverted.
|
|
signed_drive_mode = -(drive_mode * 2 - 1)
|
|
position *= signed_drive_mode
|
|
return position
|
|
|
|
|
|
def reset_torque_mode(arm: MotorsBus):
|
|
# To be configured, all servos must be in "torque disable" mode
|
|
arm.write("Torque_Enable", TorqueMode.DISABLED.value)
|
|
|
|
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't
|
|
# rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm,
|
|
# you could end up with a servo with a position 0 or 4095 at a crucial point See [
|
|
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11]
|
|
all_motors_except_gripper = [name for name in arm.motor_names if name != "gripper"]
|
|
if len(all_motors_except_gripper) > 0:
|
|
arm.write("Operating_Mode", OperatingMode.EXTENDED_POSITION.value, all_motors_except_gripper)
|
|
|
|
# Use 'position control current based' for gripper to be limited by the limit of the current.
|
|
# For the follower gripper, it means it can grasp an object without forcing too much even tho,
|
|
# it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
|
|
# For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger
|
|
# to make it move, and it will move back to its original target position when we release the force.
|
|
arm.write("Operating_Mode", OperatingMode.CURRENT_CONTROLLED_POSITION.value, "gripper")
|
|
|
|
|
|
def run_arm_calibration(arm: MotorsBus, name: str, arm_type: str):
|
|
"""This function ensures that a neural network trained on data collected on a given robot
|
|
can work on another robot. For instance before calibration, setting a same goal position
|
|
for each motor of two different robots will get two very different positions. But after calibration,
|
|
the two robots will move to the same position.To this end, this function computes the homing offset
|
|
and the drive mode for each motor of a given robot.
|
|
|
|
Homing offset is used to shift the motor position to a ]-2048, +2048[ nominal range (when the motor uses 2048 steps
|
|
to complete a half a turn). This range is set around an arbitrary "zero position" corresponding to all motor positions
|
|
being 0. During the calibration process, you will need to manually move the robot to this "zero position".
|
|
|
|
Drive mode is used to invert the rotation direction of the motor. This is useful when some motors have been assembled
|
|
in the opposite orientation for some robots. During the calibration process, you will need to manually move the robot
|
|
to the "rotated position".
|
|
|
|
After calibration, the homing offsets and drive modes are stored in a cache.
|
|
|
|
Example of usage:
|
|
```python
|
|
run_arm_calibration(arm, "left", "follower")
|
|
```
|
|
"""
|
|
reset_torque_mode(arm)
|
|
|
|
print(f"\nRunning calibration of {name} {arm_type}...")
|
|
|
|
print("\nMove arm to zero position")
|
|
print("See: " + URL_TEMPLATE.format(robot="koch", arm=arm_type, position="zero"))
|
|
input("Press Enter to continue...")
|
|
|
|
# We arbitrarely choosed our zero target position to be a straight horizontal position with gripper upwards and closed.
|
|
# It is easy to identify and all motors are in a "quarter turn" position. Once calibration is done, this position will
|
|
# corresponds to every motor angle being 0. If you set all 0 as Goal Position, the arm will move in this position.
|
|
zero_position = convert_degrees_to_steps(ZERO_POSITION_DEGREE, arm.motor_models)
|
|
|
|
def _compute_nearest_rounded_position(position, models):
|
|
# TODO(rcadene): Rework this function since some motors cant physically rotate a quarter turn
|
|
# (e.g. the gripper of Aloha arms can only rotate ~50 degree)
|
|
quarter_turn_degree = 90
|
|
quarter_turn = convert_degrees_to_steps(quarter_turn_degree, models)
|
|
nearest_pos = np.round(position.astype(float) / quarter_turn) * quarter_turn
|
|
return nearest_pos.astype(position.dtype)
|
|
|
|
# Compute homing offset so that `present_position + homing_offset ~= target_position`.
|
|
position = arm.read("Present_Position")
|
|
position = _compute_nearest_rounded_position(position, arm.motor_models)
|
|
homing_offset = zero_position - position
|
|
|
|
print("\nMove arm to rotated target position")
|
|
print("See: " + URL_TEMPLATE.format(robot="koch", arm=arm_type, position="rotated"))
|
|
input("Press Enter to continue...")
|
|
|
|
# The rotated target position corresponds to a rotation of a quarter turn from the zero position.
|
|
# This allows to identify the rotation direction of each motor.
|
|
# For instance, if the motor rotates 90 degree, and its value is -90 after applying the homing offset, then we know its rotation direction
|
|
# is inverted. However, for the calibration being successful, we need everyone to follow the same target position.
|
|
# Sometimes, there is only one possible rotation direction. For instance, if the gripper is closed, there is only one direction which
|
|
# corresponds to opening the gripper. When the rotation direction is ambiguous, we arbitrarely rotate clockwise from the point of view
|
|
# of the previous motor in the kinetic chain.
|
|
rotated_position = convert_degrees_to_steps(ROTATED_POSITION_DEGREE, arm.motor_models)
|
|
|
|
# Find drive mode by rotating each motor by a quarter of a turn.
|
|
# Drive mode indicates if the motor rotation direction should be inverted (=1) or not (=0).
|
|
position = arm.read("Present_Position")
|
|
position += homing_offset
|
|
position = _compute_nearest_rounded_position(position, arm.motor_models)
|
|
drive_mode = (position != rotated_position).astype(np.int32)
|
|
|
|
# Re-compute homing offset to take into account drive mode
|
|
position = arm.read("Present_Position")
|
|
position = apply_drive_mode(position, drive_mode)
|
|
position = _compute_nearest_rounded_position(position, arm.motor_models)
|
|
homing_offset = rotated_position - position
|
|
|
|
print("\nMove arm to rest position")
|
|
print("See: " + URL_TEMPLATE.format(robot="koch", arm=arm_type, position="rest"))
|
|
input("Press Enter to continue...")
|
|
print()
|
|
|
|
return homing_offset, drive_mode
|
|
|
|
|
|
########################################################################
|
|
# Alexander Koch robot arm
|
|
########################################################################
|
|
|
|
|
|
@dataclass
|
|
class KochRobotConfig:
|
|
"""
|
|
Example of usage:
|
|
```python
|
|
KochRobotConfig()
|
|
```
|
|
"""
|
|
|
|
# Define all components of the robot
|
|
leader_arms: dict[str, MotorsBus] = field(default_factory=lambda: {})
|
|
follower_arms: dict[str, MotorsBus] = field(default_factory=lambda: {})
|
|
cameras: dict[str, Camera] = field(default_factory=lambda: {})
|
|
|
|
# Optionally limit the magnitude of the relative positional target vector for safety purposes.
|
|
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length
|
|
# as the number of motors in your follower arms (assumes all follower arms have the same number of
|
|
# motors).
|
|
max_relative_target: list[float] | float | None = None
|
|
|
|
# Optionally set the leader arm in torque mode with the gripper motor set to this angle. This makes it
|
|
# possible to squeeze the gripper and have it spring back to an open position on its own. If None, the
|
|
# gripper is not put in torque mode.
|
|
gripper_open_degree: float | None = None
|
|
|
|
def __setattr__(self, prop: str, val):
|
|
if prop == "max_relative_target" and val is not None and isinstance(val, Sequence):
|
|
for name in self.follower_arms:
|
|
if len(self.follower_arms[name].motors) != len(val):
|
|
raise ValueError(
|
|
f"len(max_relative_target)={len(val)} but the follower arm with name {name} has "
|
|
f"{len(self.follower_arms[name].motors)} motors. Please make sure that the "
|
|
f"`max_relative_target` list has as many parameters as there are motors per arm. "
|
|
"Note: This feature does not yet work with robots where different follower arms have "
|
|
"different numbers of motors."
|
|
)
|
|
super().__setattr__(prop, val)
|
|
|
|
|
|
class KochRobot:
|
|
# TODO(rcadene): Implement force feedback
|
|
"""This class allows to control any Koch robot of various number of motors.
|
|
|
|
A few versions are available:
|
|
- [Koch v1.0](https://github.com/AlexanderKoch-Koch/low_cost_robot), with and without the wrist-to-elbow expansion, which was developed
|
|
by Alexander Koch from [Tau Robotics](https://tau-robotics.com): [Github for sourcing and assembly](
|
|
- [Koch v1.1])https://github.com/jess-moss/koch-v1-1), which was developed by Jess Moss.
|
|
|
|
Example of highest frequency teleoperation without camera:
|
|
```python
|
|
# Defines how to communicate with the motors of the leader and follower arms
|
|
leader_arms = {
|
|
"main": DynamixelMotorsBus(
|
|
port="/dev/tty.usbmodem575E0031751",
|
|
motors={
|
|
# name: (index, model)
|
|
"shoulder_pan": (1, "xl330-m077"),
|
|
"shoulder_lift": (2, "xl330-m077"),
|
|
"elbow_flex": (3, "xl330-m077"),
|
|
"wrist_flex": (4, "xl330-m077"),
|
|
"wrist_roll": (5, "xl330-m077"),
|
|
"gripper": (6, "xl330-m077"),
|
|
},
|
|
),
|
|
}
|
|
follower_arms = {
|
|
"main": DynamixelMotorsBus(
|
|
port="/dev/tty.usbmodem575E0032081",
|
|
motors={
|
|
# name: (index, model)
|
|
"shoulder_pan": (1, "xl430-w250"),
|
|
"shoulder_lift": (2, "xl430-w250"),
|
|
"elbow_flex": (3, "xl330-m288"),
|
|
"wrist_flex": (4, "xl330-m288"),
|
|
"wrist_roll": (5, "xl330-m288"),
|
|
"gripper": (6, "xl330-m288"),
|
|
},
|
|
),
|
|
}
|
|
robot = KochRobot(
|
|
leader_arms=leader_arms,
|
|
follower_arms=follower_arms,
|
|
)
|
|
|
|
# Connect motors buses and cameras if any (Required)
|
|
robot.connect()
|
|
|
|
while True:
|
|
robot.teleop_step()
|
|
```
|
|
|
|
Example of highest frequency data collection without camera:
|
|
```python
|
|
# Assumes leader and follower arms have been instantiated already (see first example)
|
|
robot = KochRobot(
|
|
leader_arms=leader_arms,
|
|
follower_arms=follower_arms,
|
|
)
|
|
robot.connect()
|
|
while True:
|
|
observation, action = robot.teleop_step(record_data=True)
|
|
```
|
|
|
|
Example of highest frequency data collection with cameras:
|
|
```python
|
|
# Defines how to communicate with 2 cameras connected to the computer.
|
|
# Here, the webcam of the laptop and the phone (connected in USB to the laptop)
|
|
# can be reached respectively using the camera indices 0 and 1. These indices can be
|
|
# arbitrary. See the documentation of `OpenCVCamera` to find your own camera indices.
|
|
cameras = {
|
|
"laptop": OpenCVCamera(camera_index=0, fps=30, width=640, height=480),
|
|
"phone": OpenCVCamera(camera_index=1, fps=30, width=640, height=480),
|
|
}
|
|
|
|
# Assumes leader and follower arms have been instantiated already (see first example)
|
|
robot = KochRobot(
|
|
leader_arms=leader_arms,
|
|
follower_arms=follower_arms,
|
|
cameras=cameras,
|
|
)
|
|
robot.connect()
|
|
while True:
|
|
observation, action = robot.teleop_step(record_data=True)
|
|
```
|
|
|
|
Example of controlling the robot with a policy (without running multiple policies in parallel to ensure highest frequency):
|
|
```python
|
|
# Assumes leader and follower arms + cameras have been instantiated already (see previous example)
|
|
robot = KochRobot(
|
|
leader_arms=leader_arms,
|
|
follower_arms=follower_arms,
|
|
cameras=cameras,
|
|
)
|
|
robot.connect()
|
|
while True:
|
|
# Uses the follower arms and cameras to capture an observation
|
|
observation = robot.capture_observation()
|
|
|
|
# Assumes a policy has been instantiated
|
|
with torch.inference_mode():
|
|
action = policy.select_action(observation)
|
|
|
|
# Orders the robot to move
|
|
robot.send_action(action)
|
|
```
|
|
|
|
Example of disconnecting which is not mandatory since we disconnect when the object is deleted:
|
|
```python
|
|
robot.disconnect()
|
|
```
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
config: KochRobotConfig | None = None,
|
|
calibration_path: Path = ".cache/calibration/koch.pkl",
|
|
**kwargs,
|
|
):
|
|
if config is None:
|
|
config = KochRobotConfig()
|
|
# Overwrite config arguments using kwargs
|
|
self.config = replace(config, **kwargs)
|
|
self.calibration_path = Path(calibration_path)
|
|
|
|
self.leader_arms = self.config.leader_arms
|
|
self.follower_arms = self.config.follower_arms
|
|
self.cameras = self.config.cameras
|
|
self.is_connected = False
|
|
self.logs = {}
|
|
|
|
def connect(self):
|
|
if self.is_connected:
|
|
raise RobotDeviceAlreadyConnectedError(
|
|
"KochRobot is already connected. Do not run `robot.connect()` twice."
|
|
)
|
|
|
|
if not self.leader_arms and not self.follower_arms and not self.cameras:
|
|
raise ValueError(
|
|
"KochRobot doesn't have any device to connect. See example of usage in docstring of the class."
|
|
)
|
|
|
|
# Connect the arms
|
|
for name in self.follower_arms:
|
|
print(f"Connecting {name} follower arm.")
|
|
self.follower_arms[name].connect()
|
|
print(f"Connecting {name} leader arm.")
|
|
self.leader_arms[name].connect()
|
|
|
|
# Reset the arms and load or run calibration
|
|
if self.calibration_path.exists():
|
|
# Reset all arms before setting calibration
|
|
for name in self.follower_arms:
|
|
reset_torque_mode(self.follower_arms[name])
|
|
for name in self.leader_arms:
|
|
reset_torque_mode(self.leader_arms[name])
|
|
|
|
with open(self.calibration_path, "rb") as f:
|
|
calibration = pickle.load(f)
|
|
else:
|
|
print(f"Missing calibration file '{self.calibration_path}'. Starting calibration precedure.")
|
|
# Run calibration process which begins by reseting all arms
|
|
calibration = self.run_calibration()
|
|
|
|
print(f"Calibration is done! Saving calibration file '{self.calibration_path}'")
|
|
self.calibration_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(self.calibration_path, "wb") as f:
|
|
pickle.dump(calibration, f)
|
|
|
|
# Set calibration
|
|
for name in self.follower_arms:
|
|
self.follower_arms[name].set_calibration(calibration[f"follower_{name}"])
|
|
for name in self.leader_arms:
|
|
self.leader_arms[name].set_calibration(calibration[f"leader_{name}"])
|
|
|
|
# Set better PID values to close the gap between recored states and actions
|
|
# TODO(rcadene): Implement an automatic procedure to set optimial PID values for each motor
|
|
for name in self.follower_arms:
|
|
self.follower_arms[name].write("Position_P_Gain", 1500, "elbow_flex")
|
|
self.follower_arms[name].write("Position_I_Gain", 0, "elbow_flex")
|
|
self.follower_arms[name].write("Position_D_Gain", 600, "elbow_flex")
|
|
|
|
# Enable torque on all motors of the follower arms
|
|
for name in self.follower_arms:
|
|
print(f"Activating torque on {name} follower arm.")
|
|
self.follower_arms[name].write("Torque_Enable", 1)
|
|
|
|
if self.config.gripper_open_degree is not None:
|
|
# Set the leader arm in torque mode with the gripper motor set to an angle. This makes it possible
|
|
# to squeeze the gripper and have it spring back to an open position on its own.
|
|
for name in self.leader_arms:
|
|
self.leader_arms[name].write("Torque_Enable", 1, "gripper")
|
|
self.leader_arms[name].write("Goal_Position", self.config.gripper_open_degree, "gripper")
|
|
|
|
# Connect the cameras
|
|
for name in self.cameras:
|
|
self.cameras[name].connect()
|
|
|
|
self.is_connected = True
|
|
|
|
def run_calibration(self):
|
|
calibration = {}
|
|
|
|
for name in self.follower_arms:
|
|
homing_offset, drive_mode = run_arm_calibration(self.follower_arms[name], name, "follower")
|
|
|
|
calibration[f"follower_{name}"] = {}
|
|
for idx, motor_name in enumerate(self.follower_arms[name].motor_names):
|
|
calibration[f"follower_{name}"][motor_name] = (homing_offset[idx], drive_mode[idx])
|
|
|
|
for name in self.leader_arms:
|
|
homing_offset, drive_mode = run_arm_calibration(self.leader_arms[name], name, "leader")
|
|
|
|
calibration[f"leader_{name}"] = {}
|
|
for idx, motor_name in enumerate(self.leader_arms[name].motor_names):
|
|
calibration[f"leader_{name}"][motor_name] = (homing_offset[idx], drive_mode[idx])
|
|
|
|
return calibration
|
|
|
|
def teleop_step(
|
|
self, record_data=False
|
|
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
|
|
if not self.is_connected:
|
|
raise RobotDeviceNotConnectedError(
|
|
"KochRobot is not connected. You need to run `robot.connect()`."
|
|
)
|
|
|
|
# Prepare to assign the position of the leader to the follower
|
|
leader_pos = {}
|
|
for name in self.leader_arms:
|
|
before_lread_t = time.perf_counter()
|
|
leader_pos[name] = self.leader_arms[name].read("Present_Position")
|
|
self.logs[f"read_leader_{name}_pos_dt_s"] = time.perf_counter() - before_lread_t
|
|
|
|
follower_goal_pos = {}
|
|
for name in self.leader_arms:
|
|
follower_goal_pos[name] = leader_pos[name]
|
|
|
|
# Send action
|
|
for name in self.follower_arms:
|
|
before_fwrite_t = time.perf_counter()
|
|
self.send_action(torch.tensor(follower_goal_pos[name]), [name])
|
|
self.logs[f"write_follower_{name}_goal_pos_dt_s"] = time.perf_counter() - before_fwrite_t
|
|
|
|
# Early exit when recording data is not requested
|
|
if not record_data:
|
|
return
|
|
|
|
# TODO(rcadene): Add velocity and other info
|
|
# Read follower position
|
|
follower_pos = {}
|
|
for name in self.follower_arms:
|
|
before_fread_t = time.perf_counter()
|
|
follower_pos[name] = self.follower_arms[name].read("Present_Position")
|
|
self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - before_fread_t
|
|
|
|
# Create state by concatenating follower current position
|
|
state = []
|
|
for name in self.follower_arms:
|
|
if name in follower_pos:
|
|
state.append(follower_pos[name])
|
|
state = np.concatenate(state)
|
|
|
|
# Create action by concatenating follower goal position
|
|
action = []
|
|
for name in self.follower_arms:
|
|
if name in follower_goal_pos:
|
|
action.append(follower_goal_pos[name])
|
|
action = np.concatenate(action)
|
|
|
|
# Capture images from cameras
|
|
images = {}
|
|
for name in self.cameras:
|
|
before_camread_t = time.perf_counter()
|
|
images[name] = self.cameras[name].async_read()
|
|
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
|
|
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
|
|
|
|
# Populate output dictionnaries and format to pytorch
|
|
obs_dict, action_dict = {}, {}
|
|
obs_dict["observation.state"] = torch.from_numpy(state)
|
|
action_dict["action"] = torch.from_numpy(action)
|
|
for name in self.cameras:
|
|
obs_dict[f"observation.images.{name}"] = torch.from_numpy(images[name])
|
|
|
|
return obs_dict, action_dict
|
|
|
|
def capture_observation(self):
|
|
"""The returned observations do not have a batch dimension."""
|
|
if not self.is_connected:
|
|
raise RobotDeviceNotConnectedError(
|
|
"KochRobot is not connected. You need to run `robot.connect()`."
|
|
)
|
|
|
|
# Read follower position
|
|
follower_pos = {}
|
|
for name in self.follower_arms:
|
|
before_fread_t = time.perf_counter()
|
|
follower_pos[name] = self.follower_arms[name].read("Present_Position")
|
|
self.logs[f"read_follower_{name}_pos_dt_s"] = time.perf_counter() - before_fread_t
|
|
|
|
# Create state by concatenating follower current position
|
|
state = []
|
|
for name in self.follower_arms:
|
|
if name in follower_pos:
|
|
state.append(follower_pos[name])
|
|
state = np.concatenate(state)
|
|
|
|
# Capture images from cameras
|
|
images = {}
|
|
for name in self.cameras:
|
|
before_camread_t = time.perf_counter()
|
|
images[name] = self.cameras[name].async_read()
|
|
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
|
|
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
|
|
|
|
# Populate output dictionnaries and format to pytorch
|
|
obs_dict = {}
|
|
obs_dict["observation.state"] = torch.from_numpy(state)
|
|
for name in self.cameras:
|
|
obs_dict[f"observation.images.{name}"] = torch.from_numpy(images[name])
|
|
return obs_dict
|
|
|
|
def send_action(self, action: torch.Tensor, follower_names: list[str] | None = None):
|
|
"""Command the follower arms to move to a target joint configuration.
|
|
|
|
The relative action magnitude may be clipped depending on the configuration parameter
|
|
`max_relative_target`.
|
|
|
|
Args:
|
|
action: tensor containing the concatenated joint positions for the follower arms.
|
|
follower_names: Pass follower arm names to only control a subset of all the follower arms.
|
|
"""
|
|
if not self.is_connected:
|
|
raise RobotDeviceNotConnectedError(
|
|
"KochRobot is not connected. You need to run `robot.connect()`."
|
|
)
|
|
|
|
if follower_names is None:
|
|
follower_names = list(self.follower_arms)
|
|
elif not set(follower_names).issubset(self.follower_arms):
|
|
raise ValueError(
|
|
f"You provided {follower_names=} but only the following arms are registered: "
|
|
f"{list(self.follower_arms)}"
|
|
)
|
|
|
|
from_idx = 0
|
|
to_idx = 0
|
|
follower_goal_pos = {}
|
|
for name in follower_names:
|
|
to_idx += len(self.follower_arms[name].motor_names)
|
|
this_action = action[from_idx:to_idx]
|
|
|
|
if self.config.max_relative_target is not None:
|
|
if not isinstance(self.config.max_relative_target, list):
|
|
max_relative_target = [self.config.max_relative_target for _ in range(from_idx, to_idx)]
|
|
max_relative_target = torch.tensor(self.config.max_relative_target)
|
|
# Cap relative action target magnitude for safety.
|
|
current_pos = torch.tensor(self.follower_arms[name].read("Present_Position"))
|
|
diff = this_action - current_pos
|
|
safe_diff = torch.minimum(diff, max_relative_target)
|
|
safe_diff = torch.maximum(safe_diff, -max_relative_target)
|
|
safe_action = current_pos + safe_diff
|
|
if not torch.allclose(safe_action, this_action):
|
|
logging.warning(
|
|
"Relative action magnitude had to be clamped to be safe.\n"
|
|
f" requested relative action target: {diff}\n"
|
|
f" clamped relative action target: {safe_diff}"
|
|
)
|
|
follower_goal_pos[name] = safe_action.numpy()
|
|
else:
|
|
follower_goal_pos[name] = this_action.numpy()
|
|
|
|
from_idx = to_idx
|
|
|
|
for name in self.follower_arms:
|
|
self.follower_arms[name].write("Goal_Position", follower_goal_pos[name].astype(np.int32))
|
|
|
|
def disconnect(self):
|
|
if not self.is_connected:
|
|
raise RobotDeviceNotConnectedError(
|
|
"KochRobot is not connected. You need to run `robot.connect()` before disconnecting."
|
|
)
|
|
|
|
for name in self.follower_arms:
|
|
self.follower_arms[name].disconnect()
|
|
|
|
for name in self.leader_arms:
|
|
self.leader_arms[name].disconnect()
|
|
|
|
for name in self.cameras:
|
|
self.cameras[name].disconnect()
|
|
|
|
self.is_connected = False
|
|
|
|
def __del__(self):
|
|
if getattr(self, "is_connected", False):
|
|
self.disconnect()
|