diff --git a/lerobot/common/robots/viperx/configuration_viperx.py b/lerobot/common/robots/viperx/configuration_viperx.py index 186f0ad6..e6c68b18 100644 --- a/lerobot/common/robots/viperx/configuration_viperx.py +++ b/lerobot/common/robots/viperx/configuration_viperx.py @@ -8,6 +8,10 @@ from ..config import RobotConfig @RobotConfig.register_subclass("viperx") @dataclass class ViperXRobotConfig(RobotConfig): + port: str # Port to connect to the arm + + disable_torque_on_disconnect: bool = True + # /!\ FOR SAFETY, READ THIS /!\ # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as diff --git a/lerobot/common/robots/viperx/robot_viperx.py b/lerobot/common/robots/viperx/robot_viperx.py index 5471a312..fc11b603 100644 --- a/lerobot/common/robots/viperx/robot_viperx.py +++ b/lerobot/common/robots/viperx/robot_viperx.py @@ -4,25 +4,25 @@ and send orders to its motors. # TODO(rcadene, aliberts): reorganize the codebase into one file per robot, with the associated # calibration procedure, to make it easy for people to add their own robot. -import json import logging import time - -import numpy as np +from typing import Any from lerobot.common.cameras.utils import make_cameras_from_configs from lerobot.common.constants import OBS_IMAGES, OBS_STATE from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError -from lerobot.common.motors import Motor, MotorNormMode, TorqueMode +from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode from lerobot.common.motors.dynamixel import ( DynamixelMotorsBus, - run_arm_calibration, + OperatingMode, ) from ..robot import Robot from ..utils import ensure_safe_goal_position from .configuration_viperx import ViperXRobotConfig +logger = logging.getLogger(__name__) + class ViperXRobot(Robot): """ @@ -38,8 +38,6 @@ class ViperXRobot(Robot): ): super().__init__(config) self.config = config - self.robot_type = config.type - self.arm = DynamixelMotorsBus( port=self.config.port, motors={ @@ -56,9 +54,6 @@ class ViperXRobot(Robot): ) self.cameras = make_cameras_from_configs(config.cameras) - self.is_connected = False - self.logs = {} - @property def state_feature(self) -> dict: return { @@ -83,111 +78,119 @@ class ViperXRobot(Robot): } return cam_ft - def _set_shadow_motors(self): - """ - Set secondary/shadow ID for shoulder and elbow. These joints have two motors. - As a result, if only one of them is required to move to a certain position, - the other will follow. This is to avoid breaking the motors. - """ - shoulder_idx = self.config.shoulder[0] - self.arm.write("Secondary_ID", shoulder_idx, "shoulder_shadow") + @property + def is_connected(self) -> bool: + # TODO(aliberts): add cam.is_connected for cam in self.cameras + return self.arm.is_connected - elbow_idx = self.config.elbow[0] - self.arm.write("Secondary_ID", elbow_idx, "elbow_shadow") - - def connect(self): + def connect(self) -> None: + """ + We assume that at connection time, arm is in a rest position, + and torque can be safely disabled to run calibration. + """ if self.is_connected: - raise DeviceAlreadyConnectedError( - "ManipulatorRobot is already connected. Do not run `robot.connect()` twice." + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.arm.connect() + if not self.is_calibrated: + self.calibrate() + + for cam in self.cameras.values(): + cam.connect() + + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.arm.is_calibrated + + def calibrate(self) -> None: + raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch + logger.info(f"\nRunning calibration of {self}") + self.arm.disable_torque() + for name in self.arm.names: + self.arm.write("Operating_Mode", name, OperatingMode.EXTENDED_POSITION.value) + + input("Move robot to the middle of its range of motion and press ENTER....") + homing_offsets = self.arm.set_half_turn_homings() + + full_turn_motors = ["shoulder_pan", "wrist_roll"] + unknown_range_motors = [name for name in self.arm.names if name not in full_turn_motors] + logger.info( + f"Move all joints except {full_turn_motors} sequentially through their entire " + "ranges of motion.\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + for name in full_turn_motors: + range_mins[name] = 0 + range_maxes[name] = 4095 + + self.calibration = {} + for name, motor in self.arm.motors.items(): + self.calibration[name] = MotorCalibration( + id=motor.id, + drive_mode=0, + homing_offset=homing_offsets[name], + range_min=range_mins[name], + range_max=range_maxes[name], ) - logging.info("Connecting arm.") - self.arm.connect() + self.arm.write_calibration(self.calibration) + self._save_calibration() + logger.info(f"Calibration saved to {self.calibration_fpath}") - # We assume that at connection time, arm is in a rest position, - # and torque can be safely disabled to run calibration. - self.arm.write("Torque_Enable", TorqueMode.DISABLED.value) - self.calibrate() + def configure(self) -> None: + self.arm.disable_torque() + self.arm.configure_motors() - self._set_shadow_motors() + # Set secondary/shadow ID for shoulder and elbow. These joints have two motors. + # As a result, if only one of them is required to move to a certain position, + # the other will follow. This is to avoid breaking the motors. + self.arm.write("Secondary_ID", "shoulder_shadow", 2) + self.arm.write("Secondary_ID", "elbow_shadow", 4) # Set a velocity limit of 131 as advised by Trossen Robotics + # TODO(aliberts): remove as it's actually useless in position control self.arm.write("Velocity_Limit", 131) # Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't # rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm, - # you could end up with a servo with a position 0 or 4095 at a crucial point See [ - # https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11] - all_motors_except_gripper = [name for name in self.arm.motor_names if name != "gripper"] - if len(all_motors_except_gripper) > 0: - # 4 corresponds to Extended Position on Aloha motors - self.arm.write("Operating_Mode", 4, all_motors_except_gripper) + # you could end up with a servo with a position 0 or 4095 at a crucial point. See: + # https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11 + for name in self.arm.names: + if name != "gripper": + self.arm.write("Operating_Mode", name, OperatingMode.EXTENDED_POSITION.value) # Use 'position control current based' for follower gripper to be limited by the limit of the current. - # It can grasp an object without forcing too much even tho, - # it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch). - # 5 corresponds to Current Controlled Position on Aloha gripper follower "xm430-w350" - self.arm.write("Operating_Mode", 5, "gripper") + # It can grasp an object without forcing too much even tho, it's goal position is a complete grasp + # (both gripper fingers are ordered to join and reach a touch). + self.arm.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value) + self.arm.enable_torque() - # Note: We can't enable torque on the leader gripper since "xc430-w150" doesn't have - # a Current Controlled Position mode. - - logging.info("Activating torque.") - self.arm.write("Torque_Enable", TorqueMode.ENABLED.value) - - # Check arm can be read - self.arm.read("Present_Position") - - # Connect the cameras - for cam in self.cameras.values(): - cam.connect() - - self.is_connected = True - - def calibrate(self): - """After calibration all motors function in human interpretable ranges. - Rotations are expressed in degrees in nominal range of [-180, 180], - and linear motions (like gripper of Aloha) in nominal range of [0, 100]. - """ - if self.calibration_fpath.exists(): - with open(self.calibration_fpath) as f: - calibration = json.load(f) - else: - # TODO(rcadene): display a warning in __init__ if calibration file not available - logging.info(f"Missing calibration file '{self.calibration_fpath}'") - calibration = run_arm_calibration(self.arm, self.robot_type, self.name, "follower") - - logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'") - self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True) - with open(self.calibration_fpath, "w") as f: - json.dump(calibration, f) - - self.arm.set_calibration(calibration) - - def get_observation(self) -> dict[str, np.ndarray]: + def get_observation(self) -> dict[str, Any]: """The returned observations do not have a batch dimension.""" if not self.is_connected: - raise DeviceNotConnectedError( - "ManipulatorRobot is not connected. You need to run `robot.connect()`." - ) + raise DeviceNotConnectedError(f"{self} is not connected.") obs_dict = {} # Read arm position - before_read_t = time.perf_counter() - obs_dict[OBS_STATE] = self.arm.read("Present_Position") - self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t + start = time.perf_counter() + obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position") + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") # Capture images from cameras for cam_key, cam in self.cameras.items(): - before_camread_t = time.perf_counter() + start = time.perf_counter() obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read() - self.logs[f"read_camera_{cam_key}_dt_s"] = cam.logs["delta_timestamp_s"] - self.logs[f"async_read_camera_{cam_key}_dt_s"] = time.perf_counter() - before_camread_t + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") return obs_dict - def send_action(self, action: np.ndarray) -> np.ndarray: + def send_action(self, action: dict[str, float]) -> dict[str, float]: """Command arm to move to a target joint configuration. The relative action magnitude may be clipped depending on the configuration parameter @@ -195,44 +198,33 @@ class ViperXRobot(Robot): Thus, this function always returns the action actually sent. Args: - action (np.ndarray): array containing the goal positions for the motors. - - Raises: - RobotDeviceNotConnectedError: if robot is not connected. + action (dict[str, float]): The goal positions for the motors. Returns: - np.ndarray: the action sent to the motors, potentially clipped. + dict[str, float]: The action sent to the motors, potentially clipped. """ if not self.is_connected: - raise DeviceNotConnectedError( - "ManipulatorRobot is not connected. You need to run `robot.connect()`." - ) + raise DeviceNotConnectedError(f"{self} is not connected.") goal_pos = action # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. if self.config.max_relative_target is not None: - present_pos = self.arm.read("Present_Position") - goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target) + present_pos = self.arm.sync_read("Present_Position") + goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} + goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) # Send goal position to the arm - self.arm.write("Goal_Position", goal_pos.astype(np.int32)) - + self.arm.sync_write("Goal_Position", goal_pos) return goal_pos - def print_logs(self): - # TODO(aliberts): move robot-specific logs logic here - pass - def disconnect(self): if not self.is_connected: - raise DeviceNotConnectedError( - "ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting." - ) + raise DeviceNotConnectedError(f"{self} is not connected.") - self.arm.disconnect() + self.arm.disconnect(self.config.disable_torque_on_disconnect) for cam in self.cameras.values(): cam.disconnect() - self.is_connected = False + logger.info(f"{self} disconnected.") diff --git a/lerobot/common/teleoperators/widowx/configuration_widowx.py b/lerobot/common/teleoperators/widowx/configuration_widowx.py index 896405e9..6aa5810b 100644 --- a/lerobot/common/teleoperators/widowx/configuration_widowx.py +++ b/lerobot/common/teleoperators/widowx/configuration_widowx.py @@ -22,17 +22,4 @@ from ..config import TeleoperatorConfig @TeleoperatorConfig.register_subclass("widowx") @dataclass class WidowXTeleopConfig(TeleoperatorConfig): - port: str # Port to connect to the teloperator - mock: bool = False - - # /!\ FOR SAFETY, READ THIS /!\ - # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. - # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as - # the number of motors in your follower arms. - # For Aloha, for every goal position request, motor rotations are capped at 5 degrees by default. - # When you feel more confident with teleoperation or running the policy, you can extend - # this safety limit and even removing it by setting it to `null`. - # Also, everything is expected to work safely out-of-the-box, but we highly advise to - # first try to teleoperate the grippers only (by commenting out the rest of the motors in this yaml), - # then to gradually add more motors (by uncommenting), until you can teleoperate both arms fully - max_relative_target: int | None = 5 + port: str # Port to connect to the arm diff --git a/lerobot/common/teleoperators/widowx/teleop_widowx.py b/lerobot/common/teleoperators/widowx/teleop_widowx.py index c1aca4a9..b9997abb 100644 --- a/lerobot/common/teleoperators/widowx/teleop_widowx.py +++ b/lerobot/common/teleoperators/widowx/teleop_widowx.py @@ -14,22 +14,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import logging import time -import numpy as np - from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError -from lerobot.common.motors import Motor, MotorNormMode, TorqueMode +from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode from lerobot.common.motors.dynamixel import ( + DriveMode, DynamixelMotorsBus, - run_arm_calibration, + OperatingMode, ) from ..teleoperator import Teleoperator from .configuration_widowx import WidowXTeleopConfig +logger = logging.getLogger(__name__) + class WidowXTeleop(Teleoperator): """ @@ -42,8 +42,6 @@ class WidowXTeleop(Teleoperator): def __init__(self, config: WidowXTeleopConfig): super().__init__(config) self.config = config - self.robot_type = config.type - self.arm = DynamixelMotorsBus( port=self.config.port, motors={ @@ -59,9 +57,6 @@ class WidowXTeleop(Teleoperator): }, ) - self.is_connected = False - self.logs = {} - @property def action_feature(self) -> dict: return { @@ -74,84 +69,85 @@ class WidowXTeleop(Teleoperator): def feedback_feature(self) -> dict: return {} - def _set_shadow_motors(self): - """ - Set secondary/shadow ID for shoulder and elbow. These joints have two motors. - As a result, if only one of them is required to move to a certain position, - the other will follow. This is to avoid breaking the motors. - """ - shoulder_idx = self.config.shoulder[0] - self.arm.write("Secondary_ID", shoulder_idx, "shoulder_shadow") - - elbow_idx = self.config.elbow[0] - self.arm.write("Secondary_ID", elbow_idx, "elbow_shadow") + @property + def is_connected(self) -> bool: + return self.arm.is_connected def connect(self): if self.is_connected: - raise DeviceAlreadyConnectedError( - "ManipulatorRobot is already connected. Do not run `robot.connect()` twice." - ) + raise DeviceAlreadyConnectedError(f"{self} already connected") - logging.info("Connecting arm.") self.arm.connect() + if not self.is_calibrated: + self.calibrate() - # We assume that at connection time, arm is in a rest position, - # and torque can be safely disabled to run calibration. - self.arm.write("Torque_Enable", TorqueMode.DISABLED.value) - self.calibrate() - - self._set_shadow_motors() - - logging.info("Activating torque.") - self.arm.write("Torque_Enable", TorqueMode.ENABLED.value) - - # Check arm can be read - self.arm.read("Present_Position") - - # Connect the cameras - for cam in self.cameras.values(): - cam.connect() - - self.is_connected = True + self.configure() + logger.info(f"{self} connected.") def calibrate(self) -> None: - """After calibration all motors function in human interpretable ranges. - Rotations are expressed in degrees in nominal range of [-180, 180], - and linear motions (like gripper of Aloha) in nominal range of [0, 100]. - """ - if self.calibration_fpath.exists(): - with open(self.calibration_fpath) as f: - calibration = json.load(f) - else: - # TODO(rcadene): display a warning in __init__ if calibration file not available - logging.info(f"Missing calibration file '{self.calibration_fpath}'") - calibration = run_arm_calibration(self.arm, self.robot_type, self.name, "leader") + raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch) + logger.info(f"\nRunning calibration of {self}") + self.arm.disable_torque() + for name in self.arm.names: + self.arm.write("Operating_Mode", name, OperatingMode.EXTENDED_POSITION.value) - logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'") - self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True) - with open(self.calibration_fpath, "w") as f: - json.dump(calibration, f) + self.arm.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value) + drive_modes = {name: 1 if name == "elbow_flex" else 0 for name in self.arm.names} - self.arm.set_calibration(calibration) + input("Move robot to the middle of its range of motion and press ENTER....") + homing_offsets = self.arm.set_half_turn_homings() - def get_action(self) -> np.ndarray: - """The returned action does not have a batch dimension.""" - # Read arm position - before_read_t = time.perf_counter() + full_turn_motors = ["shoulder_pan", "wrist_roll"] + unknown_range_motors = [name for name in self.arm.names if name not in full_turn_motors] + logger.info( + f"Move all joints except {full_turn_motors} sequentially through their " + "entire ranges of motion.\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + for name in full_turn_motors: + range_mins[name] = 0 + range_maxes[name] = 4095 + + self.calibration = {} + for name, motor in self.arm.motors.items(): + self.calibration[name] = MotorCalibration( + id=motor.id, + drive_mode=drive_modes[name], + homing_offset=homing_offsets[name], + range_min=range_mins[name], + range_max=range_maxes[name], + ) + + self.arm.write_calibration(self.calibration) + self._save_calibration() + logger.info(f"Calibration saved to {self.calibration_fpath}") + + def configure(self) -> None: + self.arm.disable_torque() + self.arm.configure_motors() + + # Set secondary/shadow ID for shoulder and elbow. These joints have two motors. + # As a result, if only one of them is required to move to a certain position, + # the other will follow. This is to avoid breaking the motors. + self.arm.write("Secondary_ID", "shoulder_shadow", 2) + self.arm.write("Secondary_ID", "elbow_shadow", 4) + + def get_action(self) -> dict[str, float]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + start = time.perf_counter() action = self.arm.read("Present_Position") - self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t - + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read action: {dt_ms:.1f}ms") return action - def send_feedback(self, feedback: np.ndarray) -> None: - # TODO(rcadene, aliberts): Implement force feedback - pass + def send_feedback(self, feedback: dict[str, float]) -> None: + raise NotImplementedError def disconnect(self) -> None: if not self.is_connected: - raise DeviceNotConnectedError( - "ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting." - ) + raise DeviceNotConnectedError(f"{self} is not connected.") self.arm.disconnect() - self.is_connected = False + logger.info(f"{self} disconnected.")