diff --git a/lerobot/common/teleoperators/widowx/configuration_widowx.py b/lerobot/common/teleoperators/widowx/configuration_widowx.py new file mode 100644 index 00000000..133237e2 --- /dev/null +++ b/lerobot/common/teleoperators/widowx/configuration_widowx.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass + +from ..config import TeleoperatorConfig + + +@TeleoperatorConfig.register_subclass("widowx") +@dataclass +class WidowXTeleopConfig(TeleoperatorConfig): + port: str # Port to connect to the teloperator + mock: bool = False + + # /!\ FOR SAFETY, READ THIS /!\ + # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. + # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as + # the number of motors in your follower arms. + # For Aloha, for every goal position request, motor rotations are capped at 5 degrees by default. + # When you feel more confident with teleoperation or running the policy, you can extend + # this safety limit and even removing it by setting it to `null`. + # Also, everything is expected to work safely out-of-the-box, but we highly advise to + # first try to teleoperate the grippers only (by commenting out the rest of the motors in this yaml), + # then to gradually add more motors (by uncommenting), until you can teleoperate both arms fully + max_relative_target: int | None = 5 + + # motors + waist: tuple = (1, "xm430-w350") + shoulder: tuple = (2, "xm430-w350") + shoulder_shadow: tuple = (3, "xm430-w350") + elbow: tuple = (4, "xm430-w350") + elbow_shadow: tuple = (5, "xm430-w350") + forearm_roll: tuple = (6, "xm430-w350") + wrist_angle: tuple = (7, "xm430-w350") + wrist_rotate: tuple = (8, "xl430-w250") + gripper: tuple = (9, "xc430-w150") diff --git a/lerobot/common/teleoperators/widowx/teleop_widowx.py b/lerobot/common/teleoperators/widowx/teleop_widowx.py new file mode 100644 index 00000000..c7bcc18b --- /dev/null +++ b/lerobot/common/teleoperators/widowx/teleop_widowx.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +import time + +import numpy as np + +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.common.motors.dynamixel import ( + DynamixelMotorsBus, + TorqueMode, + run_arm_calibration, +) + +from ..teleoperator import Teleoperator +from .configuration_widowx import WidowXTeleopConfig + + +class WidowXTeleop(Teleoperator): + """ + [WidowX](https://www.trossenrobotics.com/widowx-250) developed by Trossen Robotics + """ + + config_class = WidowXTeleopConfig + name = "widowx" + + def __init__(self, config: WidowXTeleopConfig): + super().__init__(config) + self.config = config + self.robot_type = config.type + self.id = config.id + + self.arm = DynamixelMotorsBus( + port=self.config.port, + motors={ + "waist": config.waist, + "shoulder": config.shoulder, + "shoulder_shadow": config.shoulder_shadow, + "elbow": config.elbow, + "elbow_shadow": config.elbow_shadow, + "forearm_roll": config.forearm_roll, + "wrist_angle": config.wrist_angle, + "wrist_rotate": config.wrist_rotate, + "gripper": config.gripper, + }, + ) + + self.is_connected = False + self.logs = {} + + @property + def action_feature(self) -> dict: + return { + "dtype": "float32", + "shape": (len(self.arm),), + "names": {"motors": list(self.arm.motors)}, + } + + @property + def feedback_feature(self) -> dict: + return {} + + def _set_shadow_motors(self): + """ + Set secondary/shadow ID for shoulder and elbow. These joints have two motors. + As a result, if only one of them is required to move to a certain position, + the other will follow. This is to avoid breaking the motors. + """ + shoulder_idx = self.config.shoulder[0] + self.arm.write("Secondary_ID", shoulder_idx, "shoulder_shadow") + + elbow_idx = self.config.elbow[0] + self.arm.write("Secondary_ID", elbow_idx, "elbow_shadow") + + def connect(self): + if self.is_connected: + raise DeviceAlreadyConnectedError( + "ManipulatorRobot is already connected. Do not run `robot.connect()` twice." + ) + + logging.info("Connecting arm.") + self.arm.connect() + + # We assume that at connection time, arm is in a rest position, + # and torque can be safely disabled to run calibration. + self.arm.write("Torque_Enable", TorqueMode.DISABLED.value) + self.calibrate() + + self._set_shadow_motors() + + logging.info("Activating torque.") + self.arm.write("Torque_Enable", TorqueMode.ENABLED.value) + + # Check arm can be read + self.arm.read("Present_Position") + + # Connect the cameras + for cam in self.cameras.values(): + cam.connect() + + self.is_connected = True + + def calibrate(self) -> None: + """After calibration all motors function in human interpretable ranges. + Rotations are expressed in degrees in nominal range of [-180, 180], + and linear motions (like gripper of Aloha) in nominal range of [0, 100]. + """ + arm_calib_path = self.calibration_dir / f"{self.id}.json" + + if arm_calib_path.exists(): + with open(arm_calib_path) as f: + calibration = json.load(f) + else: + # TODO(rcadene): display a warning in __init__ if calibration file not available + logging.info(f"Missing calibration file '{arm_calib_path}'") + calibration = run_arm_calibration(self.arm, self.robot_type, self.name, "leader") + + logging.info(f"Calibration is done! Saving calibration file '{arm_calib_path}'") + arm_calib_path.parent.mkdir(parents=True, exist_ok=True) + with open(arm_calib_path, "w") as f: + json.dump(calibration, f) + + self.arm.set_calibration(calibration) + + def get_action(self) -> np.ndarray: + """The returned action does not have a batch dimension.""" + # Read arm position + before_read_t = time.perf_counter() + action = self.arm.read("Present_Position") + self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t + + return action + + def send_feedback(self, feedback: np.ndarray) -> None: + # TODO(rcadene, aliberts): Implement force feedback + pass + + def disconnect(self) -> None: + if not self.is_connected: + raise DeviceNotConnectedError( + "ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting." + ) + + self.arm.disconnect() + self.is_connected = False