From 79293800f184381f47f07946f5717c64660d7d07 Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Mon, 31 Mar 2025 18:18:27 +0200 Subject: [PATCH] Implement Koch calibration --- lerobot/common/robots/koch/robot_koch.py | 103 +++++++++--------- .../teleoperators/koch/configuration_koch.py | 6 +- .../common/teleoperators/koch/teleop_koch.py | 103 ++++++++++-------- 3 files changed, 114 insertions(+), 98 deletions(-) diff --git a/lerobot/common/robots/koch/robot_koch.py b/lerobot/common/robots/koch/robot_koch.py index 06e21b1c..e05280e6 100644 --- a/lerobot/common/robots/koch/robot_koch.py +++ b/lerobot/common/robots/koch/robot_koch.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import logging import time from typing import Any @@ -22,12 +21,11 @@ from typing import Any from lerobot.common.cameras.utils import make_cameras_from_configs from lerobot.common.constants import OBS_IMAGES, OBS_STATE from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError -from lerobot.common.motors import Motor, MotorNormMode +from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode from lerobot.common.motors.dynamixel import ( DynamixelMotorsBus, OperatingMode, TorqueMode, - run_arm_calibration, ) from ..robot import Robot @@ -63,7 +61,6 @@ class KochRobot(Robot): ) self.cameras = make_cameras_from_configs(config.cameras) - self.is_connected = False self.logs = {} @property @@ -89,6 +86,28 @@ class KochRobot(Robot): } return cam_ft + @property + def is_connected(self) -> bool: + # TODO(aliberts): add cam.is_connected for cam in self.cameras + return self.arm.is_connected + + def connect(self) -> None: + if self.is_connected: + raise DeviceAlreadyConnectedError( + "ManipulatorRobot is already connected. Do not run `robot.connect()` twice." + ) + + logging.info("Connecting arm.") + self.arm.connect() + if not self.is_calibrated: + self.calibrate() + + self.configure() + + # Connect the cameras + for cam in self.cameras.values(): + cam.connect() + def configure(self) -> None: for name in self.arm.names: # Torque must be deactivated to change values in the motors' EEPROM area @@ -120,54 +139,42 @@ class KochRobot(Robot): logging.info("Torque enabled.") - # Move gripper to 45 degrees so that we can use it as a trigger. - self.arm.write("Goal_Position", "gripper", self.config.gripper_open_degree) - @property - def is_connected(self) -> bool: - # TODO(aliberts): add cam.is_connected for cam in self.cameras - return self.arm.is_connected - - def connect(self) -> None: - if self.is_connected: - raise DeviceAlreadyConnectedError( - "ManipulatorRobot is already connected. Do not run `robot.connect()` twice." - ) - - logging.info("Connecting arm.") - self.arm.connect() - self.configure() - # self.calibrate() # TODO - - # Check arm can be read - self.arm.sync_read("Present_Position") - - # Connect the cameras - for cam in self.cameras.values(): - cam.connect() - - self.is_connected = True + def is_calibrated(self) -> bool: + motors_calibration = self.arm.get_calibration() + return motors_calibration == self.calibration def calibrate(self) -> None: - # TODO(pepijn): Do calibration in same way as so100 - """After calibration all motors function in human interpretable ranges. - Rotations are expressed in degrees in nominal range of [-180, 180], - and linear motions (like gripper of Aloha) in nominal range of [0, 100]. - """ - if self.calibration_fpath.exists(): - with open(self.calibration_fpath) as f: - calibration = json.load(f) - else: - # TODO(rcadene): display a warning in __init__ if calibration file not available - logging.info(f"Missing calibration file '{self.calibration_fpath}'") - calibration = run_arm_calibration(self.arm, self.robot_type, self.name, "follower") + print(f"\nRunning calibration of {self.id} Koch robot") + for name in self.arm.names: + self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) + self.arm.write("Operating_Mode", name, OperatingMode.EXTENDED_POSITION.value) - logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'") - self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True) - with open(self.calibration_fpath, "w") as f: - json.dump(calibration, f) + input("Move robot to the middle of its range of motion and press ENTER....") + homing_offsets = self.arm.set_half_turn_homings() - self.arm.set_calibration(calibration) + fixed_range = ["shoulder_pan", "wrist_roll"] + auto_range_motors = [name for name in self.arm.names if name not in fixed_range] + print( + "Move all joints except 'wrist_roll' (id=5) sequentially through their entire ranges of motion." + ) + print("Recording positions. Press ENTER to stop...") + ranges = self.arm.register_ranges_of_motion(auto_range_motors) + for name in fixed_range: + ranges[name] = {"min": 0, "max": 4095} + + self.calibration = {} + for name, motor in self.arm.motors.items(): + self.calibration[name] = MotorCalibration( + id=motor.id, + drive_mode=0, + homing_offset=homing_offsets[name], + range_min=ranges[name]["min"], + range_max=ranges[name]["max"], + ) + + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) def get_observation(self) -> dict[str, Any]: obs_dict = {} @@ -226,5 +233,3 @@ class KochRobot(Robot): self.arm.disconnect() for cam in self.cameras.values(): cam.disconnect() - - self.is_connected = False diff --git a/lerobot/common/teleoperators/koch/configuration_koch.py b/lerobot/common/teleoperators/koch/configuration_koch.py index 2c6a2969..cf95f547 100644 --- a/lerobot/common/teleoperators/koch/configuration_koch.py +++ b/lerobot/common/teleoperators/koch/configuration_koch.py @@ -25,6 +25,6 @@ class KochTeleopConfig(TeleoperatorConfig): # Port to connect to the teloperator port: str - # Sets the arm in torque mode with the gripper motor set to this angle. This makes it possible - # to squeeze the gripper and have it spring back to an open position on its own. - gripper_open_degree: float = 35.156 + # Sets the arm in torque mode with the gripper motor set to this value. This makes it possible to squeeze + # the gripper and have it spring back to an open position on its own. + gripper_open_pos: float = 50.0 diff --git a/lerobot/common/teleoperators/koch/teleop_koch.py b/lerobot/common/teleoperators/koch/teleop_koch.py index 1b1506bd..1d49ea10 100644 --- a/lerobot/common/teleoperators/koch/teleop_koch.py +++ b/lerobot/common/teleoperators/koch/teleop_koch.py @@ -14,19 +14,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import logging import time -import numpy as np - from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError -from lerobot.common.motors import Motor, MotorNormMode +from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode from lerobot.common.motors.dynamixel import ( + DriveMode, DynamixelMotorsBus, OperatingMode, TorqueMode, - run_arm_calibration, ) from ..teleoperator import Teleoperator @@ -74,6 +71,23 @@ class KochTeleop(Teleoperator): def feedback_feature(self) -> dict: return {} + @property + def is_connected(self) -> bool: + return self.arm.is_connected + + def connect(self) -> None: + if self.is_connected: + raise DeviceAlreadyConnectedError( + "ManipulatorRobot is already connected. Do not run `robot.connect()` twice." + ) + + logging.info("Connecting arm.") + self.arm.connect() + if not self.is_calibrated: + self.calibrate() + + self.configure() + def configure(self) -> None: for name in self.arm.names: # Torque must be deactivated to change values in the motors' EEPROM area @@ -93,52 +107,49 @@ class KochTeleop(Teleoperator): # For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger # to make it move, and it will move back to its original target position when we release the force. self.arm.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value) - - for name in self.arm.names: - self.arm.write("Torque_Enable", name, TorqueMode.ENABLED.value) - - logging.info("Torque enabled.") - - # Move gripper to 45 degrees so that we can use it as a trigger. - self.arm.write("Goal_Position", "gripper", self.config.gripper_open_degree) + # Set gripper's goal pos in current position mode so that we can use it as a trigger. + self.arm.write("Torque_Enable", "gripper", TorqueMode.ENABLED.value) + self.arm.write("Goal_Position", "gripper", self.config.gripper_open_pos) @property - def is_connected(self) -> bool: - return self.arm.is_connected - - def connect(self) -> None: - if self.is_connected: - raise DeviceAlreadyConnectedError( - "ManipulatorRobot is already connected. Do not run `robot.connect()` twice." - ) - - logging.info("Connecting arm.") - self.arm.connect() - self.configure() - # self.calibrate() # TODO - - # Check arm can be read - self.arm.sync_read("Present_Position") + def is_calibrated(self) -> bool: + motors_calibration = self.arm.get_calibration() + return motors_calibration == self.calibration def calibrate(self) -> None: - """After calibration all motors function in human interpretable ranges. - Rotations are expressed in degrees in nominal range of [-180, 180], - and linear motions (like gripper of Aloha) in nominal range of [0, 100]. - """ - if self.calibration_fpath.exists(): - with open(self.calibration_fpath) as f: - calibration = json.load(f) - else: - # TODO(rcadene): display a warning in __init__ if calibration file not available - logging.info(f"Missing calibration file '{self.calibration_fpath}'") - calibration = run_arm_calibration(self.arm, self.robot_type, self.name, "leader") + print(f"\nRunning calibration of {self.id} Koch teleop") + for name in self.arm.names: + self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) + self.arm.write("Operating_Mode", name, OperatingMode.EXTENDED_POSITION.value) - logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'") - self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True) - with open(self.calibration_fpath, "w") as f: - json.dump(calibration, f) + self.arm.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value) + drive_modes = {name: 1 if name == "elbow_flex" else 0 for name in self.arm.names} - self.arm.set_calibration(calibration) + input("Move robot to the middle of its range of motion and press ENTER....") + homing_offsets = self.arm.set_half_turn_homings() + + fixed_range = ["shoulder_pan", "wrist_roll"] + auto_range_motors = [name for name in self.arm.names if name not in fixed_range] + print( + "Move all joints except 'wrist_roll' (id=5) sequentially through their entire ranges of motion." + ) + print("Recording positions. Press ENTER to stop...") + ranges = self.arm.register_ranges_of_motion(auto_range_motors) + for name in fixed_range: + ranges[name] = {"min": 0, "max": 4095} + + self.calibration = {} + for name, motor in self.arm.motors.items(): + self.calibration[name] = MotorCalibration( + id=motor.id, + drive_mode=drive_modes[name], + homing_offset=homing_offsets[name], + range_min=ranges[name]["min"], + range_max=ranges[name]["max"], + ) + + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) def get_action(self) -> dict[str, float]: """The returned action does not have a batch dimension.""" @@ -149,7 +160,7 @@ class KochTeleop(Teleoperator): return action - def send_feedback(self, feedback: np.ndarray) -> None: + def send_feedback(self, feedback: dict[str, float]) -> None: # TODO(rcadene, aliberts): Implement force feedback raise NotImplementedError