Update so100

This commit is contained in:
Simon Alibert 2025-03-23 20:15:47 +01:00
parent 039b437ef0
commit 30fcd3d417
2 changed files with 81 additions and 93 deletions

View File

@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json
import logging import logging
import time import time
@ -23,11 +22,11 @@ import numpy as np
from lerobot.common.cameras.utils import make_cameras_from_configs from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.constants import OBS_IMAGES, OBS_STATE from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import TorqueMode from lerobot.common.motors import CalibrationMode, Motor, TorqueMode
from lerobot.common.motors.feetech import ( from lerobot.common.motors.feetech import (
FeetechMotorsBus, FeetechMotorsBus,
OperatingMode,
apply_feetech_offsets_from_calibration, apply_feetech_offsets_from_calibration,
run_full_arm_calibration,
) )
from ..robot import Robot from ..robot import Robot
@ -47,23 +46,21 @@ class SO100Robot(Robot):
super().__init__(config) super().__init__(config)
self.config = config self.config = config
self.robot_type = config.type self.robot_type = config.type
self.logs = {}
self.arm = FeetechMotorsBus( self.arm = FeetechMotorsBus(
port=self.config.port, port=self.config.port,
motors={ motors={
"shoulder_pan": (1, "sts3215"), "shoulder_pan": Motor(1, "sts3215", CalibrationMode.RANGE_M100_100),
"shoulder_lift": (2, "sts3215"), "shoulder_lift": Motor(2, "sts3215", CalibrationMode.RANGE_M100_100),
"elbow_flex": (3, "sts3215"), "elbow_flex": Motor(3, "sts3215", CalibrationMode.RANGE_M100_100),
"wrist_flex": (4, "sts3215"), "wrist_flex": Motor(4, "sts3215", CalibrationMode.RANGE_M100_100),
"wrist_roll": (5, "sts3215"), "wrist_roll": Motor(5, "sts3215", CalibrationMode.RANGE_M100_100),
"gripper": (6, "sts3215"), "gripper": Motor(6, "sts3215", CalibrationMode.RANGE_0_100),
}, },
) )
self.cameras = make_cameras_from_configs(config.cameras) self.cameras = make_cameras_from_configs(config.cameras)
self.is_connected = False
self.logs = {}
@property @property
def state_feature(self) -> dict: def state_feature(self) -> dict:
return { return {
@ -87,6 +84,38 @@ class SO100Robot(Robot):
} }
return cam_ft return cam_ft
def configure(self) -> None:
for name in self.arm.names:
# We assume that at connection time, arm is in a rest position,
# and torque can be safely disabled to run calibration.
self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value)
self.arm.write("Mode", name, OperatingMode.POSITION.value)
# Set P_Coefficient to lower value to avoid shakiness (Default is 32)
self.arm.write("P_Coefficient", name, 16)
# Set I_Coefficient and D_Coefficient to default value 0 and 32
self.arm.write("I_Coefficient", name, 0)
self.arm.write("D_Coefficient", name, 32)
# Close the write lock so that Maximum_Acceleration gets written to EPROM address,
# which is mandatory for Maximum_Acceleration to take effect after rebooting.
self.arm.write("Lock", name, 0)
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
# the motors. Note: this configuration is not in the official STS3215 Memory Table
self.arm.write("Maximum_Acceleration", name, 254)
self.arm.write("Acceleration", name, 254)
self.calibrate()
for name in self.arm.names:
self.arm.write("Torque_Enable", name, TorqueMode.ENABLED.value)
logging.info("Torque activated.")
@property
def is_connected(self) -> bool:
# TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.arm.is_connected
def connect(self) -> None: def connect(self) -> None:
if self.is_connected: if self.is_connected:
raise DeviceAlreadyConnectedError( raise DeviceAlreadyConnectedError(
@ -95,59 +124,25 @@ class SO100Robot(Robot):
logging.info("Connecting arm.") logging.info("Connecting arm.")
self.arm.connect() self.arm.connect()
self.configure()
# We assume that at connection time, arm is in a rest position,
# and torque can be safely disabled to run calibration.
self.arm.write("Torque_Enable", TorqueMode.DISABLED.value)
self.calibrate()
# Mode=0 for Position Control
self.arm.write("Mode", 0)
# Set P_Coefficient to lower value to avoid shakiness (Default is 32)
self.arm.write("P_Coefficient", 16)
# Set I_Coefficient and D_Coefficient to default value 0 and 32
self.arm.write("I_Coefficient", 0)
self.arm.write("D_Coefficient", 32)
# Close the write lock so that Maximum_Acceleration gets written to EPROM address,
# which is mandatory for Maximum_Acceleration to take effect after rebooting.
self.arm.write("Lock", 0)
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
# the motors. Note: this configuration is not in the official STS3215 Memory Table
self.arm.write("Maximum_Acceleration", 254)
self.arm.write("Acceleration", 254)
logging.info("Activating torque.")
self.arm.write("Torque_Enable", TorqueMode.ENABLED.value)
# Check arm can be read # Check arm can be read
self.arm.read("Present_Position") self.arm.sync_read("Present_Position")
# Connect the cameras # Connect the cameras
for cam in self.cameras.values(): for cam in self.cameras.values():
cam.connect() cam.connect()
self.is_connected = True
def calibrate(self) -> None: def calibrate(self) -> None:
"""After calibration all motors function in human interpretable ranges. raise NotImplementedError
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
"""
if self.calibration_fpath.exists():
with open(self.calibration_fpath) as f:
calibration = json.load(f)
else:
# TODO(rcadene): display a warning in __init__ if calibration file not available
logging.info(f"Missing calibration file '{self.calibration_fpath}'")
calibration = run_full_arm_calibration(self.arm, self.robot_type, self.name, "follower")
logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'") def set_calibration(self) -> None:
self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True) if not self.calibration_fpath.exists():
with open(self.calibration_fpath, "w") as f: logging.error("Calibration file not found. Please run calibration first")
json.dump(calibration, f) raise FileNotFoundError(self.calibration_fpath)
self.arm.set_calibration(calibration) self.arm.set_calibration(self.calibration_fpath)
apply_feetech_offsets_from_calibration(self.arm, calibration) apply_feetech_offsets_from_calibration(self.arm, self.arm.calibration)
def get_observation(self) -> dict[str, np.ndarray]: def get_observation(self) -> dict[str, np.ndarray]:
"""The returned observations do not have a batch dimension.""" """The returned observations do not have a batch dimension."""
@ -160,7 +155,7 @@ class SO100Robot(Robot):
# Read arm position # Read arm position
before_read_t = time.perf_counter() before_read_t = time.perf_counter()
obs_dict[OBS_STATE] = self.arm.read("Present_Position") obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position")
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
# Capture images from cameras # Capture images from cameras
@ -198,11 +193,11 @@ class SO100Robot(Robot):
# Cap goal position when too far away from present position. # Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower. # /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None: if self.config.max_relative_target is not None:
present_pos = self.arm.read("Present_Position") present_pos = self.arm.sync_read("Present_Position")
goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target) goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target)
# Send goal position to the arm # Send goal position to the arm
self.arm.write("Goal_Position", goal_pos.astype(np.int32)) self.arm.sync_write("Goal_Position", goal_pos.astype(np.int32))
return goal_pos return goal_pos
@ -219,5 +214,3 @@ class SO100Robot(Robot):
self.arm.disconnect() self.arm.disconnect()
for cam in self.cameras.values(): for cam in self.cameras.values():
cam.disconnect() cam.disconnect()
self.is_connected = False

View File

@ -14,18 +14,16 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import json
import logging import logging
import time import time
import numpy as np import numpy as np
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import TorqueMode from lerobot.common.motors import CalibrationMode, Motor, TorqueMode
from lerobot.common.motors.feetech import ( from lerobot.common.motors.feetech import (
FeetechMotorsBus, FeetechMotorsBus,
apply_feetech_offsets_from_calibration, apply_feetech_offsets_from_calibration,
run_full_arm_calibration,
) )
from ..teleoperator import Teleoperator from ..teleoperator import Teleoperator
@ -48,16 +46,15 @@ class SO100Teleop(Teleoperator):
self.arm = FeetechMotorsBus( self.arm = FeetechMotorsBus(
port=self.config.port, port=self.config.port,
motors={ motors={
"shoulder_pan": (1, "sts3215"), "shoulder_pan": Motor(1, "sts3215", CalibrationMode.RANGE_M100_100),
"shoulder_lift": (2, "sts3215"), "shoulder_lift": Motor(2, "sts3215", CalibrationMode.RANGE_M100_100),
"elbow_flex": (3, "sts3215"), "elbow_flex": Motor(3, "sts3215", CalibrationMode.RANGE_M100_100),
"wrist_flex": (4, "sts3215"), "wrist_flex": Motor(4, "sts3215", CalibrationMode.RANGE_M100_100),
"wrist_roll": (5, "sts3215"), "wrist_roll": Motor(5, "sts3215", CalibrationMode.RANGE_M100_100),
"gripper": (6, "sts3215"), "gripper": Motor(6, "sts3215", CalibrationMode.RANGE_0_100),
}, },
) )
self.is_connected = False
self.logs = {} self.logs = {}
@property @property
@ -72,6 +69,16 @@ class SO100Teleop(Teleoperator):
def feedback_feature(self) -> dict: def feedback_feature(self) -> dict:
return {} return {}
def configure(self) -> None:
# We assume that at connection time, arm is in a rest position,
# and torque can be safely disabled to run calibration.
for name in self.arm.names:
self.arm.write("Torque_Enable", name, TorqueMode.DISABLED.value)
@property
def is_connected(self) -> bool:
return self.arm.is_connected
def connect(self) -> None: def connect(self) -> None:
if self.is_connected: if self.is_connected:
raise DeviceAlreadyConnectedError( raise DeviceAlreadyConnectedError(
@ -80,43 +87,32 @@ class SO100Teleop(Teleoperator):
logging.info("Connecting arm.") logging.info("Connecting arm.")
self.arm.connect() self.arm.connect()
self.configure()
# We assume that at connection time, arm is in a rest position,
# and torque can be safely disabled to run calibration.
self.arm.write("Torque_Enable", TorqueMode.DISABLED.value)
self.calibrate() self.calibrate()
# Check arm can be read # Check arm can be read
self.arm.read("Present_Position") self.arm.sync_read("Present_Position")
self.is_connected = True
def calibrate(self) -> None: def calibrate(self) -> None:
raise NotImplementedError
def set_calibration(self) -> None:
"""After calibration all motors function in human interpretable ranges. """After calibration all motors function in human interpretable ranges.
Rotations are expressed in degrees in nominal range of [-180, 180], Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100]. and linear motions (like gripper of Aloha) in nominal range of [0, 100].
""" """
if self.calibration_fpath.exists(): if not self.calibration_fpath.exists():
with open(self.calibration_fpath) as f: logging.error("Calibration file not found. Please run calibration first")
calibration = json.load(f) raise FileNotFoundError(self.calibration_fpath)
else:
# TODO(rcadene): display a warning in __init__ if calibration file not available
logging.info(f"Missing calibration file '{self.calibration_fpath}'")
calibration = run_full_arm_calibration(self.arm, self.robot_type, self.name, "leader")
logging.info(f"Calibration is done! Saving calibration file '{self.calibration_fpath}'") self.arm.set_calibration(self.calibration_fpath)
self.calibration_fpath.parent.mkdir(parents=True, exist_ok=True) apply_feetech_offsets_from_calibration(self.arm, self.arm.calibration)
with open(self.calibration_fpath, "w") as f:
json.dump(calibration, f)
self.arm.set_calibration(calibration)
apply_feetech_offsets_from_calibration(self.arm, calibration)
def get_action(self) -> np.ndarray: def get_action(self) -> np.ndarray:
"""The returned action does not have a batch dimension.""" """The returned action does not have a batch dimension."""
# Read arm position # Read arm position
before_read_t = time.perf_counter() before_read_t = time.perf_counter()
action = self.arm.read("Present_Position") action = self.arm.sync_read("Present_Position")
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
return action return action
@ -132,4 +128,3 @@ class SO100Teleop(Teleoperator):
) )
self.arm.disconnect() self.arm.disconnect()
self.is_connected = False