diff --git a/lerobot/common/robots/so100/robot_so100.py b/lerobot/common/robots/so100/robot_so100.py index 13229709..ccbe9dad 100644 --- a/lerobot/common/robots/so100/robot_so100.py +++ b/lerobot/common/robots/so100/robot_so100.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import logging import time @@ -23,11 +22,11 @@ import numpy as np from lerobot.common.cameras.utils import make_cameras_from_configs from lerobot.common.constants import OBS_IMAGES, OBS_STATE from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError -from lerobot.common.motors import TorqueMode +from lerobot.common.motors import CalibrationMode, Motor, TorqueMode from lerobot.common.motors.feetech import ( FeetechMotorsBus, + OperatingMode, apply_feetech_offsets_from_calibration, - run_full_arm_calibration, ) from ..robot import Robot @@ -47,23 +46,21 @@ class SO100Robot(Robot): super().__init__(config) self.config = config self.robot_type = config.type + self.logs = {} self.arm = FeetechMotorsBus( port=self.config.port, motors={ - "shoulder_pan": (1, "sts3215"), - "shoulder_lift": (2, "sts3215"), - "elbow_flex": (3, "sts3215"), - "wrist_flex": (4, "sts3215"), - "wrist_roll": (5, "sts3215"), - "gripper": (6, "sts3215"), + "shoulder_pan": Motor(1, "sts3215", CalibrationMode.RANGE_M100_100), + "shoulder_lift": Motor(2, "sts3215", CalibrationMode.RANGE_M100_100), + "elbow_flex": Motor(3, "sts3215", CalibrationMode.RANGE_M100_100), + "wrist_flex": Motor(4, "sts3215", CalibrationMode.RANGE_M100_100), + "wrist_roll": Motor(5, "sts3215", CalibrationMode.RANGE_M100_100), + "gripper": Motor(6, "sts3215", CalibrationMode.RANGE_0_100), }, ) self.cameras = make_cameras_from_configs(config.cameras) - self.is_connected = False - self.logs = {} - @property def state_feature(self) -> dict: return { @@ -87,6 +84,38 @@ class SO100Robot(Robot): } return cam_ft + def configure(self) -> None: + for name in self.arm.names: + # We assume that at connection time, arm is in a rest position, + # and torque can be safely disabled to run calibration. + self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) + self.arm.write("Mode", name, OperatingMode.POSITION.value) + + # Set P_Coefficient to lower value to avoid shakiness (Default is 32) + self.arm.write("P_Coefficient", name, 16) + # Set I_Coefficient and D_Coefficient to default value 0 and 32 + self.arm.write("I_Coefficient", name, 0) + self.arm.write("D_Coefficient", name, 32) + # Close the write lock so that Maximum_Acceleration gets written to EPROM address, + # which is mandatory for Maximum_Acceleration to take effect after rebooting. + self.arm.write("Lock", name, 0) + # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of + # the motors. Note: this configuration is not in the official STS3215 Memory Table + self.arm.write("Maximum_Acceleration", name, 254) + self.arm.write("Acceleration", name, 254) + + self.calibrate() + + for name in self.arm.names: + self.arm.write("Torque_Enable", name, TorqueMode.ENABLED.value) + + logging.info("Torque activated.") + + @property + def is_connected(self) -> bool: + # TODO(aliberts): add cam.is_connected for cam in self.cameras + return self.arm.is_connected + def connect(self) -> None: if self.is_connected: raise DeviceAlreadyConnectedError( @@ -95,59 +124,25 @@ class SO100Robot(Robot): logging.info("Connecting arm.") self.arm.connect() - - # We assume that at connection time, arm is in a rest position, - # and torque can be safely disabled to run calibration. - self.arm.write("Torque_Enable", TorqueMode.DISABLED.value) - self.calibrate() - - # Mode=0 for Position Control - self.arm.write("Mode", 0) - # Set P_Coefficient to lower value to avoid shakiness (Default is 32) - self.arm.write("P_Coefficient", 16) - # Set I_Coefficient and D_Coefficient to default value 0 and 32 - self.arm.write("I_Coefficient", 0) - self.arm.write("D_Coefficient", 32) - # Close the write lock so that Maximum_Acceleration gets written to EPROM address, - # which is mandatory for Maximum_Acceleration to take effect after rebooting. - self.arm.write("Lock", 0) - # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of - # the motors. Note: this configuration is not in the official STS3215 Memory Table - self.arm.write("Maximum_Acceleration", 254) - self.arm.write("Acceleration", 254) - - logging.info("Activating torque.") - self.arm.write("Torque_Enable", TorqueMode.ENABLED.value) + self.configure() # Check arm can be read - self.arm.read("Present_Position") + self.arm.sync_read("Present_Position") # Connect the cameras for cam in self.cameras.values(): cam.connect() - self.is_connected = True - def calibrate(self) -> None: - """After calibration all motors function in human interpretable ranges. - Rotations are expressed in degrees in nominal range of [-180, 180], - and linear motions (like gripper of Aloha) in nominal range of [0, 100]. - """ - if self.calibration_fpath.exists(): - with open(self.calibration_fpath) as f: - calibration = json.load(f) - else: - # TODO(rcadene): display a warning in __init__ if calibration file not available - logging.info(f"Missing calibration file '{self.calibration_fpath}'") - calibration = run_full_arm_calibration(self.arm, self.robot_type, self.name, "follower") + raise NotImplementedError - logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'") - self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True) - with open(self.calibration_fpath, "w") as f: - json.dump(calibration, f) + def set_calibration(self) -> None: + if not self.calibration_fpath.exists(): + logging.error("Calibration file not found. Please run calibration first") + raise FileNotFoundError(self.calibration_fpath) - self.arm.set_calibration(calibration) - apply_feetech_offsets_from_calibration(self.arm, calibration) + self.arm.set_calibration(self.calibration_fpath) + apply_feetech_offsets_from_calibration(self.arm, self.arm.calibration) def get_observation(self) -> dict[str, np.ndarray]: """The returned observations do not have a batch dimension.""" @@ -160,7 +155,7 @@ class SO100Robot(Robot): # Read arm position before_read_t = time.perf_counter() - obs_dict[OBS_STATE] = self.arm.read("Present_Position") + obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position") self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t # Capture images from cameras @@ -198,11 +193,11 @@ class SO100Robot(Robot): # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. if self.config.max_relative_target is not None: - present_pos = self.arm.read("Present_Position") + present_pos = self.arm.sync_read("Present_Position") goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target) # Send goal position to the arm - self.arm.write("Goal_Position", goal_pos.astype(np.int32)) + self.arm.sync_write("Goal_Position", goal_pos.astype(np.int32)) return goal_pos @@ -219,5 +214,3 @@ class SO100Robot(Robot): self.arm.disconnect() for cam in self.cameras.values(): cam.disconnect() - - self.is_connected = False diff --git a/lerobot/common/teleoperators/so100/teleop_so100.py b/lerobot/common/teleoperators/so100/teleop_so100.py index 90af62a8..0e02b313 100644 --- a/lerobot/common/teleoperators/so100/teleop_so100.py +++ b/lerobot/common/teleoperators/so100/teleop_so100.py @@ -14,18 +14,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import logging import time import numpy as np from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError -from lerobot.common.motors import TorqueMode +from lerobot.common.motors import CalibrationMode, Motor, TorqueMode from lerobot.common.motors.feetech import ( FeetechMotorsBus, apply_feetech_offsets_from_calibration, - run_full_arm_calibration, ) from ..teleoperator import Teleoperator @@ -48,16 +46,15 @@ class SO100Teleop(Teleoperator): self.arm = FeetechMotorsBus( port=self.config.port, motors={ - "shoulder_pan": (1, "sts3215"), - "shoulder_lift": (2, "sts3215"), - "elbow_flex": (3, "sts3215"), - "wrist_flex": (4, "sts3215"), - "wrist_roll": (5, "sts3215"), - "gripper": (6, "sts3215"), + "shoulder_pan": Motor(1, "sts3215", CalibrationMode.RANGE_M100_100), + "shoulder_lift": Motor(2, "sts3215", CalibrationMode.RANGE_M100_100), + "elbow_flex": Motor(3, "sts3215", CalibrationMode.RANGE_M100_100), + "wrist_flex": Motor(4, "sts3215", CalibrationMode.RANGE_M100_100), + "wrist_roll": Motor(5, "sts3215", CalibrationMode.RANGE_M100_100), + "gripper": Motor(6, "sts3215", CalibrationMode.RANGE_0_100), }, ) - self.is_connected = False self.logs = {} @property @@ -72,6 +69,16 @@ class SO100Teleop(Teleoperator): def feedback_feature(self) -> dict: return {} + def configure(self) -> None: + # We assume that at connection time, arm is in a rest position, + # and torque can be safely disabled to run calibration. + for name in self.arm.names: + self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value) + + @property + def is_connected(self) -> bool: + return self.arm.is_connected + def connect(self) -> None: if self.is_connected: raise DeviceAlreadyConnectedError( @@ -80,43 +87,32 @@ class SO100Teleop(Teleoperator): logging.info("Connecting arm.") self.arm.connect() - - # We assume that at connection time, arm is in a rest position, - # and torque can be safely disabled to run calibration. - self.arm.write("Torque_Enable", TorqueMode.DISABLED.value) + self.configure() self.calibrate() # Check arm can be read - self.arm.read("Present_Position") - - self.is_connected = True + self.arm.sync_read("Present_Position") def calibrate(self) -> None: + raise NotImplementedError + + def set_calibration(self) -> None: """After calibration all motors function in human interpretable ranges. Rotations are expressed in degrees in nominal range of [-180, 180], and linear motions (like gripper of Aloha) in nominal range of [0, 100]. """ - if self.calibration_fpath.exists(): - with open(self.calibration_fpath) as f: - calibration = json.load(f) - else: - # TODO(rcadene): display a warning in __init__ if calibration file not available - logging.info(f"Missing calibration file '{self.calibration_fpath}'") - calibration = run_full_arm_calibration(self.arm, self.robot_type, self.name, "leader") + if not self.calibration_fpath.exists(): + logging.error("Calibration file not found. Please run calibration first") + raise FileNotFoundError(self.calibration_fpath) - logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'") - self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True) - with open(self.calibration_fpath, "w") as f: - json.dump(calibration, f) - - self.arm.set_calibration(calibration) - apply_feetech_offsets_from_calibration(self.arm, calibration) + self.arm.set_calibration(self.calibration_fpath) + apply_feetech_offsets_from_calibration(self.arm, self.arm.calibration) def get_action(self) -> np.ndarray: """The returned action does not have a batch dimension.""" # Read arm position before_read_t = time.perf_counter() - action = self.arm.read("Present_Position") + action = self.arm.sync_read("Present_Position") self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t return action @@ -132,4 +128,3 @@ class SO100Teleop(Teleoperator): ) self.arm.disconnect() - self.is_connected = False