From eeb8490016263eecf1ba1f9bfc38d2652f541463 Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Wed, 2 Apr 2025 22:33:48 +0200 Subject: [PATCH] Update Koch & SO-100 --- .../common/robots/koch/configuration_koch.py | 2 + lerobot/common/robots/koch/robot_koch.py | 146 +++++++++--------- lerobot/common/robots/robot.py | 11 +- .../robots/so100/configuration_so100.py | 2 + lerobot/common/robots/so100/robot_so100.py | 115 +++++++------- .../common/teleoperators/koch/teleop_koch.py | 112 +++++++------- .../teleoperators/so100/teleop_so100.py | 64 ++++---- lerobot/common/teleoperators/teleoperator.py | 11 +- 8 files changed, 226 insertions(+), 237 deletions(-) diff --git a/lerobot/common/robots/koch/configuration_koch.py b/lerobot/common/robots/koch/configuration_koch.py index 1f465e94..a72bd05f 100644 --- a/lerobot/common/robots/koch/configuration_koch.py +++ b/lerobot/common/robots/koch/configuration_koch.py @@ -11,6 +11,8 @@ class KochRobotConfig(RobotConfig): # Port to connect to the robot port: str + disable_torque_on_disconnect: bool = True + # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as # the number of motors in your follower arms. diff --git a/lerobot/common/robots/koch/robot_koch.py b/lerobot/common/robots/koch/robot_koch.py index e05280e6..72724daf 100644 --- a/lerobot/common/robots/koch/robot_koch.py +++ b/lerobot/common/robots/koch/robot_koch.py @@ -25,13 +25,14 @@ from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode from lerobot.common.motors.dynamixel import ( DynamixelMotorsBus, OperatingMode, - TorqueMode, ) from ..robot import Robot from ..utils import ensure_safe_goal_position from .configuration_koch import KochRobotConfig +logger = logging.getLogger(__name__) + class KochRobot(Robot): """ @@ -46,8 +47,6 @@ class KochRobot(Robot): def __init__(self, config: KochRobotConfig): super().__init__(config) self.config = config - self.robot_type = config.type - self.arm = DynamixelMotorsBus( port=self.config.port, motors={ @@ -58,11 +57,10 @@ class KochRobot(Robot): "wrist_roll": Motor(5, "xl330-m288", MotorNormMode.RANGE_M100_100), "gripper": Motor(6, "xl330-m288", MotorNormMode.RANGE_0_100), }, + calibration=self.calibration, ) self.cameras = make_cameras_from_configs(config.cameras) - self.logs = {} - @property def state_feature(self) -> dict: return { @@ -92,33 +90,70 @@ class KochRobot(Robot): return self.arm.is_connected def connect(self) -> None: + """ + We assume that at connection time, arm is in a rest position, + and torque can be safely disabled to run calibration. + """ if self.is_connected: - raise DeviceAlreadyConnectedError( - "ManipulatorRobot is already connected. Do not run `robot.connect()` twice." - ) + raise DeviceAlreadyConnectedError(f"{self} already connected") - logging.info("Connecting arm.") self.arm.connect() if not self.is_calibrated: self.calibrate() - self.configure() - - # Connect the cameras for cam in self.cameras.values(): cam.connect() - def configure(self) -> None: + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.arm.is_calibrated + + def calibrate(self) -> None: + logger.info(f"\nRunning calibration of {self}") + self.arm.disable_torque() + for name in self.arm.names: + self.arm.write("Operating_Mode", name, OperatingMode.EXTENDED_POSITION.value) + + input("Move robot to the middle of its range of motion and press ENTER....") + homing_offsets = self.arm.set_half_turn_homings() + + full_turn_motors = ["shoulder_pan", "wrist_roll"] + unknown_range_motors = [name for name in self.arm.names if name not in full_turn_motors] + logger.info( + f"Move all joints except {full_turn_motors} sequentially through their entire " + "ranges of motion.\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + for name in full_turn_motors: + range_mins[name] = 0 + range_maxes[name] = 4095 + + self.calibration = {} + for name, motor in self.arm.motors.items(): + self.calibration[name] = MotorCalibration( + id=motor.id, + drive_mode=0, + homing_offset=homing_offsets[name], + range_min=range_mins[name], + range_max=range_maxes[name], + ) + + self.arm.write_calibration(self.calibration) + self._save_calibration() + logger.info(f"Calibration saved to {self.calibration_fpath}") + + def configure(self) -> None: + self.arm.disable_torque() + self.arm.configure_motors() + # Use 'extended position mode' for all motors except gripper, because in joint mode the servos + # can't rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while + # assembling the arm, you could end up with a servo with a position 0 or 4095 at a crucial + # point for name in self.arm.names: - # Torque must be deactivated to change values in the motors' EEPROM area - # We assume that at configuration time, arm is in a rest position, - # and torque can be safely disabled to run calibration. - self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) if name != "gripper": - # Use 'extended position mode' for all motors except gripper, because in joint mode the servos - # can't rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while - # assembling the arm, you could end up with a servo with a position 0 or 4095 at a crucial - # point self.arm.write("Operating_Mode", name, OperatingMode.EXTENDED_POSITION.value) # Use 'position control current based' for gripper to be limited by the limit of the current. @@ -133,63 +168,26 @@ class KochRobot(Robot): self.arm.write("Position_P_Gain", "elbow_flex", 1500) self.arm.write("Position_I_Gain", "elbow_flex", 0) self.arm.write("Position_D_Gain", "elbow_flex", 600) - - for name in self.arm.names: - self.arm.write("Torque_Enable", name, TorqueMode.ENABLED.value) - - logging.info("Torque enabled.") - - @property - def is_calibrated(self) -> bool: - motors_calibration = self.arm.get_calibration() - return motors_calibration == self.calibration - - def calibrate(self) -> None: - print(f"\nRunning calibration of {self.id} Koch robot") - for name in self.arm.names: - self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) - self.arm.write("Operating_Mode", name, OperatingMode.EXTENDED_POSITION.value) - - input("Move robot to the middle of its range of motion and press ENTER....") - homing_offsets = self.arm.set_half_turn_homings() - - fixed_range = ["shoulder_pan", "wrist_roll"] - auto_range_motors = [name for name in self.arm.names if name not in fixed_range] - print( - "Move all joints except 'wrist_roll' (id=5) sequentially through their entire ranges of motion." - ) - print("Recording positions. Press ENTER to stop...") - ranges = self.arm.register_ranges_of_motion(auto_range_motors) - for name in fixed_range: - ranges[name] = {"min": 0, "max": 4095} - - self.calibration = {} - for name, motor in self.arm.motors.items(): - self.calibration[name] = MotorCalibration( - id=motor.id, - drive_mode=0, - homing_offset=homing_offsets[name], - range_min=ranges[name]["min"], - range_max=ranges[name]["max"], - ) - - self._save_calibration() - print("Calibration saved to", self.calibration_fpath) + self.arm.enable_torque() def get_observation(self) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + obs_dict = {} # Read arm position - before_read_t = time.perf_counter() + start = time.perf_counter() obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position") - self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") # Capture images from cameras for cam_key, cam in self.cameras.items(): - before_camread_t = time.perf_counter() + start = time.perf_counter() obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read() - self.logs[f"read_camera_{cam_key}_dt_s"] = cam.logs["delta_timestamp_s"] - self.logs[f"async_read_camera_{cam_key}_dt_s"] = time.perf_counter() - before_camread_t + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") return obs_dict @@ -206,6 +204,9 @@ class KochRobot(Robot): Returns: dict[str, float]: The action sent to the motors, potentially clipped. """ + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + goal_pos = action # Cap goal position when too far away from present position. @@ -217,19 +218,16 @@ class KochRobot(Robot): # Send goal position to the arm self.arm.sync_write("Goal_Position", goal_pos) - return goal_pos - def print_logs(self): - # TODO(aliberts): move robot-specific logs logic here - pass - def disconnect(self): if not self.is_connected: raise DeviceNotConnectedError( "ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting." ) - self.arm.disconnect() + self.arm.disconnect(self.config.disable_torque_on_disconnect) for cam in self.cameras.values(): cam.disconnect() + + logger.info(f"{self} disconnected.") diff --git a/lerobot/common/robots/robot.py b/lerobot/common/robots/robot.py index 7ae67ba5..a7ec4eda 100644 --- a/lerobot/common/robots/robot.py +++ b/lerobot/common/robots/robot.py @@ -31,6 +31,9 @@ class Robot(abc.ABC): if self.calibration_fpath.is_file(): self._load_calibration() + def __str__(self) -> str: + return f"{self.id} {self.__class__.__name__}" + # TODO(aliberts): create a proper Feature class for this that links with datasets @abc.abstractproperty def state_feature(self) -> dict: @@ -53,10 +56,6 @@ class Robot(abc.ABC): """Connects to the robot.""" pass - @abc.abstractmethod - def configure(self) -> None: - pass - @abc.abstractproperty def is_calibrated(self) -> bool: pass @@ -76,6 +75,10 @@ class Robot(abc.ABC): with open(fpath, "w") as f, draccus.config_type("json"): draccus.dump(self.calibration, f, indent=4) + @abc.abstractmethod + def configure(self) -> None: + pass + @abc.abstractmethod def get_observation(self) -> dict[str, Any]: """Gets observation from the robot.""" diff --git a/lerobot/common/robots/so100/configuration_so100.py b/lerobot/common/robots/so100/configuration_so100.py index 969f8060..e6cb9c75 100644 --- a/lerobot/common/robots/so100/configuration_so100.py +++ b/lerobot/common/robots/so100/configuration_so100.py @@ -11,6 +11,8 @@ class SO100RobotConfig(RobotConfig): # Port to connect to the robot port: str + disable_torque_on_disconnect: bool = True + # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as # the number of motors in your follower arms. diff --git a/lerobot/common/robots/so100/robot_so100.py b/lerobot/common/robots/so100/robot_so100.py index 78273723..e7222a9d 100644 --- a/lerobot/common/robots/so100/robot_so100.py +++ b/lerobot/common/robots/so100/robot_so100.py @@ -25,13 +25,14 @@ from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode from lerobot.common.motors.feetech import ( FeetechMotorsBus, OperatingMode, - TorqueMode, ) from ..robot import Robot from ..utils import ensure_safe_goal_position from .configuration_so100 import SO100RobotConfig +logger = logging.getLogger(__name__) + class SO100Robot(Robot): """ @@ -44,9 +45,6 @@ class SO100Robot(Robot): def __init__(self, config: SO100RobotConfig): super().__init__(config) self.config = config - self.robot_type = config.type - self.logs = {} - self.arm = FeetechMotorsBus( port=self.config.port, motors={ @@ -89,65 +87,46 @@ class SO100Robot(Robot): return self.arm.is_connected def connect(self) -> None: + """ + We assume that at connection time, arm is in a rest position, + and torque can be safely disabled to run calibration. + """ if self.is_connected: - raise DeviceAlreadyConnectedError( - "ManipulatorRobot is already connected. Do not run `robot.connect()` twice." - ) + raise DeviceAlreadyConnectedError(f"{self} already connected") - logging.info("Connecting arm.") self.arm.connect() if not self.is_calibrated: self.calibrate() - self.configure() - # Connect the cameras for cam in self.cameras.values(): cam.connect() - def configure(self) -> None: - for name in self.arm.names: - # We assume that at connection time, arm is in a rest position, - # and torque can be safely disabled to run calibration. - self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) - self.arm.write("Operating_Mode", name, OperatingMode.POSITION.value) - - # Set P_Coefficient to lower value to avoid shakiness (Default is 32) - self.arm.write("P_Coefficient", name, 16) - # Set I_Coefficient and D_Coefficient to default value 0 and 32 - self.arm.write("I_Coefficient", name, 0) - self.arm.write("D_Coefficient", name, 32) - # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of - # the motors. Note: this configuration is not in the official STS3215 Memory Table - self.arm.write("Maximum_Acceleration", name, 254) - self.arm.write("Acceleration", name, 254) - - for name in self.arm.names: - self.arm.write("Torque_Enable", name, TorqueMode.ENABLED.value) - - logging.info("Torque activated.") + self.configure() + logger.info(f"{self} connected.") @property def is_calibrated(self) -> bool: - motors_calibration = self.arm.get_calibration() - return motors_calibration == self.calibration + return self.arm.is_calibrated def calibrate(self) -> None: - print(f"\nRunning calibration of {self.id} SO-100 robot") + logger.info(f"\nRunning calibration of {self}") + self.arm.disable_torque() for name in self.arm.names: - self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) self.arm.write("Operating_Mode", name, OperatingMode.POSITION.value) input("Move robot to the middle of its range of motion and press ENTER....") homing_offsets = self.arm.set_half_turn_homings() - print( - "Move all joints except 'wrist_roll' (id=5) sequentially through their entire ranges of motion." + full_turn_motor = "wrist_roll" + unknown_range_motors = [name for name in self.arm.names if name != full_turn_motor] + logger.info( + f"Move all joints except '{full_turn_motor}' sequentially through their " + "entire ranges of motion.\nRecording positions. Press ENTER to stop..." ) - print("Recording positions. Press ENTER to stop...") - auto_range_motors = [name for name in self.arm.names if name != "wrist_roll"] - ranges = self.arm.register_ranges_of_motion(auto_range_motors) - ranges["wrist_roll"] = {"min": 0, "max": 4095} + range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + range_mins[full_turn_motor] = 0 + range_maxes[full_turn_motor] = 4095 self.calibration = {} for name, motor in self.arm.motors.items(): @@ -155,33 +134,49 @@ class SO100Robot(Robot): id=motor.id, drive_mode=0, homing_offset=homing_offsets[name], - range_min=ranges[name]["min"], - range_max=ranges[name]["max"], + range_min=range_mins[name], + range_max=range_maxes[name], ) + self.arm.write_calibration(self.calibration) self._save_calibration() print("Calibration saved to", self.calibration_fpath) + def configure(self) -> None: + self.arm.disable_torque() + self.arm.configure_motors() + for name in self.arm.names: + self.arm.write("Operating_Mode", name, OperatingMode.POSITION.value) + # Set P_Coefficient to lower value to avoid shakiness (Default is 32) + self.arm.write("P_Coefficient", name, 16) + # Set I_Coefficient and D_Coefficient to default value 0 and 32 + self.arm.write("I_Coefficient", name, 0) + self.arm.write("D_Coefficient", name, 32) + # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of + # the motors. Note: this address is not in the official STS3215 Memory Table + self.arm.write("Maximum_Acceleration", name, 254) + self.arm.write("Acceleration", name, 254) + + self.arm.enable_torque() + def get_observation(self) -> dict[str, Any]: - """The returned observations do not have a batch dimension.""" if not self.is_connected: - raise DeviceNotConnectedError( - "ManipulatorRobot is not connected. You need to run `robot.connect()`." - ) + raise DeviceNotConnectedError(f"{self} is not connected.") obs_dict = {} # Read arm position - before_read_t = time.perf_counter() + start = time.perf_counter() obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position") - self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") # Capture images from cameras for cam_key, cam in self.cameras.items(): - before_camread_t = time.perf_counter() + start = time.perf_counter() obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read() - self.logs[f"read_camera_{cam_key}_dt_s"] = cam.logs["delta_timestamp_s"] - self.logs[f"async_read_camera_{cam_key}_dt_s"] = time.perf_counter() - before_camread_t + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") return obs_dict @@ -199,9 +194,7 @@ class SO100Robot(Robot): the action sent to the motors, potentially clipped. """ if not self.is_connected: - raise DeviceNotConnectedError( - "ManipulatorRobot is not connected. You need to run `robot.connect()`." - ) + raise DeviceNotConnectedError(f"{self} is not connected.") goal_pos = action @@ -209,23 +202,21 @@ class SO100Robot(Robot): # /!\ Slower fps expected due to reading from the follower. if self.config.max_relative_target is not None: present_pos = self.arm.sync_read("Present_Position") - goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target) + goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} + goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) # Send goal position to the arm self.arm.sync_write("Goal_Position", goal_pos) - return goal_pos - def print_logs(self): - # TODO(aliberts): move robot-specific logs logic here - pass - def disconnect(self): if not self.is_connected: raise DeviceNotConnectedError( "ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting." ) - self.arm.disconnect() + self.arm.disconnect(self.config.disable_torque_on_disconnect) for cam in self.cameras.values(): cam.disconnect() + + logger.info(f"{self} disconnected.") diff --git a/lerobot/common/teleoperators/koch/teleop_koch.py b/lerobot/common/teleoperators/koch/teleop_koch.py index 1d49ea10..e0cfc89e 100644 --- a/lerobot/common/teleoperators/koch/teleop_koch.py +++ b/lerobot/common/teleoperators/koch/teleop_koch.py @@ -23,12 +23,13 @@ from lerobot.common.motors.dynamixel import ( DriveMode, DynamixelMotorsBus, OperatingMode, - TorqueMode, ) from ..teleoperator import Teleoperator from .configuration_koch import KochTeleopConfig +logger = logging.getLogger(__name__) + class KochTeleop(Teleoperator): """ @@ -43,8 +44,6 @@ class KochTeleop(Teleoperator): def __init__(self, config: KochTeleopConfig): super().__init__(config) self.config = config - self.robot_type = config.type - self.arm = DynamixelMotorsBus( port=self.config.port, motors={ @@ -55,10 +54,9 @@ class KochTeleop(Teleoperator): "wrist_roll": Motor(5, "xl330-m077", MotorNormMode.RANGE_M100_100), "gripper": Motor(6, "xl330-m077", MotorNormMode.RANGE_0_100), }, + calibration=self.calibration, ) - self.logs = {} - @property def action_feature(self) -> dict: return { @@ -77,23 +75,60 @@ class KochTeleop(Teleoperator): def connect(self) -> None: if self.is_connected: - raise DeviceAlreadyConnectedError( - "ManipulatorRobot is already connected. Do not run `robot.connect()` twice." - ) + raise DeviceAlreadyConnectedError(f"{self} already connected") - logging.info("Connecting arm.") self.arm.connect() if not self.is_calibrated: self.calibrate() self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.arm.is_calibrated + + def calibrate(self) -> None: + logger.info(f"\nRunning calibration of {self}") + self.arm.disable_torque() + for name in self.arm.names: + self.arm.write("Operating_Mode", name, OperatingMode.EXTENDED_POSITION.value) + + self.arm.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value) + drive_modes = {name: 1 if name == "elbow_flex" else 0 for name in self.arm.names} + + input("Move robot to the middle of its range of motion and press ENTER....") + homing_offsets = self.arm.set_half_turn_homings() + + full_turn_motors = ["shoulder_pan", "wrist_roll"] + unknown_range_motors = [name for name in self.arm.names if name not in full_turn_motors] + logger.info( + f"Move all joints except {full_turn_motors} sequentially through their " + "entire ranges of motion.\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + for name in full_turn_motors: + range_mins[name] = 0 + range_maxes[name] = 4095 + + self.calibration = {} + for name, motor in self.arm.motors.items(): + self.calibration[name] = MotorCalibration( + id=motor.id, + drive_mode=drive_modes[name], + homing_offset=homing_offsets[name], + range_min=range_mins[name], + range_max=range_maxes[name], + ) + + self.arm.write_calibration(self.calibration) + self._save_calibration() + logger.info(f"Calibration saved to {self.calibration_fpath}") def configure(self) -> None: + self.arm.disable_torque() + self.arm.configure_motors() for name in self.arm.names: - # Torque must be deactivated to change values in the motors' EEPROM area - # We assume that at configuration time, arm is in a rest position, - # and torque can be safely disabled to run calibration. - self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) if name != "gripper": # Use 'extended position mode' for all motors except gripper, because in joint mode the servos # can't rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while @@ -108,56 +143,14 @@ class KochTeleop(Teleoperator): # to make it move, and it will move back to its original target position when we release the force. self.arm.write("Operating_Mode", "gripper", OperatingMode.CURRENT_POSITION.value) # Set gripper's goal pos in current position mode so that we can use it as a trigger. - self.arm.write("Torque_Enable", "gripper", TorqueMode.ENABLED.value) + self.arm.enable_torque("gripper") self.arm.write("Goal_Position", "gripper", self.config.gripper_open_pos) - @property - def is_calibrated(self) -> bool: - motors_calibration = self.arm.get_calibration() - return motors_calibration == self.calibration - - def calibrate(self) -> None: - print(f"\nRunning calibration of {self.id} Koch teleop") - for name in self.arm.names: - self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) - self.arm.write("Operating_Mode", name, OperatingMode.EXTENDED_POSITION.value) - - self.arm.write("Drive_Mode", "elbow_flex", DriveMode.INVERTED.value) - drive_modes = {name: 1 if name == "elbow_flex" else 0 for name in self.arm.names} - - input("Move robot to the middle of its range of motion and press ENTER....") - homing_offsets = self.arm.set_half_turn_homings() - - fixed_range = ["shoulder_pan", "wrist_roll"] - auto_range_motors = [name for name in self.arm.names if name not in fixed_range] - print( - "Move all joints except 'wrist_roll' (id=5) sequentially through their entire ranges of motion." - ) - print("Recording positions. Press ENTER to stop...") - ranges = self.arm.register_ranges_of_motion(auto_range_motors) - for name in fixed_range: - ranges[name] = {"min": 0, "max": 4095} - - self.calibration = {} - for name, motor in self.arm.motors.items(): - self.calibration[name] = MotorCalibration( - id=motor.id, - drive_mode=drive_modes[name], - homing_offset=homing_offsets[name], - range_min=ranges[name]["min"], - range_max=ranges[name]["max"], - ) - - self._save_calibration() - print("Calibration saved to", self.calibration_fpath) - def get_action(self) -> dict[str, float]: - """The returned action does not have a batch dimension.""" - # Read arm position - start_time = time.perf_counter() + start = time.perf_counter() action = self.arm.sync_read("Present_Position") - self.logs["read_pos_dt_s"] = time.perf_counter() - start_time - + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read action: {dt_ms:.1f}ms") return action def send_feedback(self, feedback: dict[str, float]) -> None: @@ -171,3 +164,4 @@ class KochTeleop(Teleoperator): ) self.arm.disconnect() + logger.info(f"{self} disconnected.") diff --git a/lerobot/common/teleoperators/so100/teleop_so100.py b/lerobot/common/teleoperators/so100/teleop_so100.py index 75e56d15..e42c48d3 100644 --- a/lerobot/common/teleoperators/so100/teleop_so100.py +++ b/lerobot/common/teleoperators/so100/teleop_so100.py @@ -22,12 +22,13 @@ from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode from lerobot.common.motors.feetech import ( FeetechMotorsBus, OperatingMode, - TorqueMode, ) from ..teleoperator import Teleoperator from .configuration_so100 import SO100TeleopConfig +logger = logging.getLogger(__name__) + class SO100Teleop(Teleoperator): """ @@ -40,8 +41,6 @@ class SO100Teleop(Teleoperator): def __init__(self, config: SO100TeleopConfig): super().__init__(config) self.config = config - self.robot_type = config.type - self.arm = FeetechMotorsBus( port=self.config.port, motors={ @@ -54,8 +53,6 @@ class SO100Teleop(Teleoperator): }, ) - self.logs = {} - @property def action_feature(self) -> dict: return { @@ -74,44 +71,37 @@ class SO100Teleop(Teleoperator): def connect(self) -> None: if self.is_connected: - raise DeviceAlreadyConnectedError( - "ManipulatorRobot is already connected. Do not run `robot.connect()` twice." - ) + raise DeviceAlreadyConnectedError(f"{self} already connected") - logging.info("Connecting arm.") self.arm.connect() if not self.is_calibrated: self.calibrate() self.configure() - - def configure(self) -> None: - # We assume that at connection time, arm is in a rest position, - # and torque can be safely disabled to run calibration. - for name in self.arm.names: - self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) + logger.info(f"{self} connected.") @property def is_calibrated(self) -> bool: - motors_calibration = self.arm.get_calibration() - return motors_calibration == self.calibration + return self.arm.is_calibrated def calibrate(self) -> None: - print(f"\nRunning calibration of {self.id} SO-100 teleop") + logger.info(f"\nRunning calibration of {self}") + self.arm.disable_torque() for name in self.arm.names: - self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) self.arm.write("Operating_Mode", name, OperatingMode.POSITION.value) input("Move robot to the middle of its range of motion and press ENTER....") homing_offsets = self.arm.set_half_turn_homings() - print( - "Move all joints except 'wrist_roll' (id=5) sequentially through their entire ranges of motion." + full_turn_motor = "wrist_roll" + unknown_range_motors = [name for name in self.arm.names if name != full_turn_motor] + logger.info( + f"Move all joints except '{full_turn_motor}' sequentially through their " + "entire ranges of motion.\nRecording positions. Press ENTER to stop..." ) - print("Recording positions. Press ENTER to stop...") - auto_range_motors = [name for name in self.arm.names if name != "wrist_roll"] - ranges = self.arm.register_ranges_of_motion(auto_range_motors) - ranges["wrist_roll"] = {"min": 0, "max": 4095} + range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + range_mins[full_turn_motor] = 0 + range_maxes[full_turn_motor] = 4095 self.calibration = {} for name, motor in self.arm.motors.items(): @@ -119,25 +109,30 @@ class SO100Teleop(Teleoperator): id=motor.id, drive_mode=0, homing_offset=homing_offsets[name], - range_min=ranges[name]["min"], - range_max=ranges[name]["max"], + range_min=range_mins[name], + range_max=range_maxes[name], ) + self.arm.write_calibration(self.calibration) self._save_calibration() - print("Calibration saved to", self.calibration_fpath) + logger.info(f"Calibration saved to {self.calibration_fpath}") + + def configure(self) -> None: + self.arm.disable_torque() + self.arm.configure_motors() + for name in self.arm.names: + self.arm.write("Operating_Mode", name, OperatingMode.POSITION.value) def get_action(self) -> dict[str, float]: - """The returned action does not have a batch dimension.""" - # Read arm position - before_read_t = time.perf_counter() + start = time.perf_counter() action = self.arm.sync_read("Present_Position") - self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t - + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read action: {dt_ms:.1f}ms") return action def send_feedback(self, feedback: dict[str, float]) -> None: # TODO(rcadene, aliberts): Implement force feedback - pass + raise NotImplementedError def disconnect(self) -> None: if not self.is_connected: @@ -146,3 +141,4 @@ class SO100Teleop(Teleoperator): ) self.arm.disconnect() + logger.info(f"{self} disconnected.") diff --git a/lerobot/common/teleoperators/teleoperator.py b/lerobot/common/teleoperators/teleoperator.py index 7267f27a..d6285f5c 100644 --- a/lerobot/common/teleoperators/teleoperator.py +++ b/lerobot/common/teleoperators/teleoperator.py @@ -30,6 +30,9 @@ class Teleoperator(abc.ABC): if self.calibration_fpath.is_file(): self._load_calibration() + def __str__(self) -> str: + return f"{self.id} {self.__class__.__name__}" + @abc.abstractproperty def action_feature(self) -> dict: pass @@ -47,10 +50,6 @@ class Teleoperator(abc.ABC): """Connects to the teleoperator.""" pass - @abc.abstractmethod - def configure(self) -> None: - pass - @abc.abstractproperty def is_calibrated(self) -> bool: pass @@ -70,6 +69,10 @@ class Teleoperator(abc.ABC): with open(fpath, "w") as f, draccus.config_type("json"): draccus.dump(self.calibration, f, indent=4) + @abc.abstractmethod + def configure(self) -> None: + pass + @abc.abstractmethod def get_action(self) -> dict[str, Any]: """Gets the action to send to a teleoperator."""