Update Lekiwi with new MotorsBus

This commit is contained in:
Simon Alibert 2025-04-04 11:26:11 +02:00 committed by Steven Palma
parent d206b94d74
commit 2cfb897d5f
No known key found for this signature in database
2 changed files with 123 additions and 164 deletions

View File

@ -22,7 +22,14 @@ from lerobot.common.robots.config import RobotConfig
@RobotConfig.register_subclass("lekiwi") @RobotConfig.register_subclass("lekiwi")
@dataclass @dataclass
class LeKiwiRobotConfig(RobotConfig): class LeKiwiRobotConfig(RobotConfig):
id = "lekiwi" port = "/dev/ttyACM0" # port to connect to the bus
disable_torque_on_disconnect: bool = True
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
# the number of motors in your follower arms.
max_relative_target: int | None = None
cameras: dict[str, CameraConfig] = field( cameras: dict[str, CameraConfig] = field(
default_factory=lambda: { default_factory=lambda: {
@ -34,25 +41,3 @@ class LeKiwiRobotConfig(RobotConfig):
), ),
} }
) )
calibration_dir: str = ".cache/calibration/lekiwi"
port_motor_bus = "/dev/ttyACM0"
# TODO(Steven): consider split this into arm and base
# TODO(Steven): Consider also removing this entirely as we can say that
# LeKiwiRobot will always have (and needs) such
shoulder_pan: tuple = (1, "sts3215")
shoulder_lift: tuple = (2, "sts3215")
elbow_flex: tuple = (3, "sts3215")
wrist_flex: tuple = (4, "sts3215")
wrist_roll: tuple = (5, "sts3215")
gripper: tuple = (6, "sts3215")
left_wheel: tuple = (7, "sts3215")
back_wheel: tuple = (8, "sts3215")
right_wheel: tuple = (9, "sts3215")
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
# the number of motors in your follower arms.
max_relative_target: int | None = None

View File

@ -15,31 +15,28 @@
# limitations under the License. # limitations under the License.
import base64 import base64
import json
import logging import logging
import time import time
from typing import Any
import cv2 import cv2
import numpy as np
from lerobot.common.cameras.utils import make_cameras_from_configs from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.constants import OBS_IMAGES, OBS_STATE from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import TorqueMode from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.feetech import ( from lerobot.common.motors.feetech import (
FeetechMotorsBus, FeetechMotorsBus,
apply_feetech_offsets_from_calibration, OperatingMode,
run_full_arm_calibration,
) )
from ..robot import Robot from ..robot import Robot
from ..utils import ensure_safe_goal_position from ..utils import ensure_safe_goal_position
from .configuration_lekiwi import LeKiwiRobotConfig from .configuration_lekiwi import LeKiwiRobotConfig
logger = logging.getLogger(__name__)
# TODO(Steven): Everything in here is pretty much single-threaded and synchronous. The assumption is that we can run the loop at a
# high enough frequency to not need to worry about threading. This is a good assumption for now, but we should consider
# making this more robust in the future.
class LeKiwiRobot(Robot): class LeKiwiRobot(Robot):
""" """
The robot includes a three omniwheel mobile base and a remote follower arm. The robot includes a three omniwheel mobile base and a remote follower arm.
@ -55,43 +52,33 @@ class LeKiwiRobot(Robot):
super().__init__(config) super().__init__(config)
self.config = config self.config = config
self.id = config.id self.id = config.id
self.bus = FeetechMotorsBus(
# TODO(Steven): Consider in the future using S100 robot class port=self.config.port,
# TODO(Steven): Another option is to use the motorbus factory, but in this case we assume that
# what we consider 'lekiwi robot' always uses the FeetechMotorsBus
# TODO(Steven): Order and dimension are generally assumed to be 6 first for arm, 3 last for base
self.actuators_bus = FeetechMotorsBus(
port=self.config.port_motor_bus,
motors={ motors={
"arm_shoulder_pan": config.shoulder_pan, # arm
"arm_shoulder_lift": config.shoulder_lift, "arm_shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100),
"arm_elbow_flex": config.elbow_flex, "arm_shoulder_lift": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100),
"arm_wrist_flex": config.wrist_flex, "arm_elbow_flex": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100),
"arm_wrist_roll": config.wrist_roll, "arm_wrist_flex": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100),
"arm_gripper": config.gripper, "arm_wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100),
"base_left_wheel": config.left_wheel, "arm_gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
"base_right_wheel": config.right_wheel, # base
"base_back_wheel": config.back_wheel, "base_left_wheel": Motor(7, "sts3215", MotorNormMode.RANGE_M100_100),
"base_back_wheel": Motor(8, "sts3215", MotorNormMode.RANGE_M100_100),
"base_right_wheel": Motor(9, "sts3215", MotorNormMode.RANGE_M100_100),
}, },
calibration=self.calibration,
) )
self.arm_motors = [m for m in self.bus.names if m.startswith("arm")]
self.arm_actuators = [m for m in self.actuators_bus.motor_names if m.startswith("arm")] self.base_motors = [m for m in self.bus.names if m.startswith("base")]
self.base_actuators = [m for m in self.actuators_bus.motor_names if m.startswith("base")]
self.max_relative_target = config.max_relative_target
# TODO(Steven): Consider removing cameras from configs
self.cameras = make_cameras_from_configs(config.cameras) self.cameras = make_cameras_from_configs(config.cameras)
self.is_connected = False
self.logs = {}
@property @property
def state_feature(self) -> dict: def state_feature(self) -> dict:
return { return {
"dtype": "float32", "dtype": "float32",
"shape": (len(self.actuators_bus),), "shape": (len(self.bus),),
"names": {"motors": list(self.actuators_bus.motors)}, "names": {"motors": list(self.bus.motors)},
} }
@property @property
@ -109,111 +96,114 @@ class LeKiwiRobot(Robot):
} }
return cam_ft return cam_ft
def setup_actuators(self): @property
# Set-up arm actuators (position mode) def is_connected(self) -> bool:
# We assume that at connection time, arm is in a rest position, # TODO(aliberts): add cam.is_connected for cam in self.cameras
# and torque can be safely disabled to run calibration. return self.bus.is_connected
self.actuators_bus.write("Torque_Enable", TorqueMode.DISABLED.value, self.arm_actuators)
self.calibrate() # TODO(Steven): This should be only for the arm, but apparently it doesn't harm the base
# Mode=0 for Position Control
self.actuators_bus.write("Mode", 0, self.arm_actuators)
# Set P_Coefficient to lower value to avoid shakiness (Default is 32)
self.actuators_bus.write("P_Coefficient", 16, self.arm_actuators)
# Set I_Coefficient and D_Coefficient to default value 0 and 32
self.actuators_bus.write("I_Coefficient", 0, self.arm_actuators)
self.actuators_bus.write("D_Coefficient", 32, self.arm_actuators)
# Close the write lock so that Maximum_Acceleration gets written to EPROM address,
# which is mandatory for Maximum_Acceleration to take effect after rebooting.
self.actuators_bus.write("Lock", 0, self.arm_actuators)
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
# the motors. Note: this configuration is not in the official STS3215 Memory Table
self.actuators_bus.write("Maximum_Acceleration", 254, self.arm_actuators)
self.actuators_bus.write("Acceleration", 254, self.arm_actuators)
logging.info("Activating torque.")
self.actuators_bus.write("Torque_Enable", TorqueMode.ENABLED.value, self.arm_actuators)
# Check arm can be read
self.actuators_bus.read("Present_Position", self.arm_actuators)
# Set-up base actuators (velocity mode)
self.actuators_bus.write("Lock", 0, self.base_actuators)
self.actuators_bus.write("Mode", [1, 1, 1], self.base_actuators)
self.actuators_bus.write("Lock", 1, self.base_actuators)
def connect(self) -> None: def connect(self) -> None:
if self.is_connected: if self.is_connected:
raise DeviceAlreadyConnectedError( raise DeviceAlreadyConnectedError(f"{self} already connected")
"LeKiwi Robot is already connected. Do not run `robot.connect()` twice."
)
logging.info("Connecting actuators.") self.bus.connect()
self.actuators_bus.connect() if not self.is_calibrated:
self.setup_actuators() self.calibrate()
logging.info("Connecting cameras.")
for cam in self.cameras.values(): for cam in self.cameras.values():
cam.connect() cam.connect()
self.is_connected = True self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.bus.is_calibrated
def calibrate(self) -> None: def calibrate(self) -> None:
# Copied from S100 robot logger.info(f"\nRunning calibration of {self}")
"""After calibration all motors function in human interpretable ranges. self.bus.disable_torque(self.arm_motors)
Rotations are expressed in degrees in nominal range of [-180, 180], for name in self.arm_motors:
and linear motions (like gripper of Aloha) in nominal range of [0, 100]. self.bus.write("Operating_Mode", name, OperatingMode.POSITION.value)
"""
actuators_calib_path = self.calibration_dir / f"{self.config.id}.json"
if actuators_calib_path.exists(): input("Move robot to the middle of its range of motion and press ENTER....")
with open(actuators_calib_path, encoding="utf-8") as f: homing_offsets = self.bus.set_half_turn_homings(self.arm_motors)
calibration = json.load(f)
else:
logging.info("Missing calibration file '%s'", actuators_calib_path)
calibration = run_full_arm_calibration(self.actuators_bus, self.robot_type, self.name, "follower")
logging.info("Calibration is done! Saving calibration file '%s'", actuators_calib_path) full_turn_motor = "arm_wrist_roll"
actuators_calib_path.parent.mkdir(parents=True, exist_ok=True) unknown_range_motors = [name for name in self.arm_motors if name != full_turn_motor]
with open(actuators_calib_path, "w", encoding="utf-8") as f: logger.info(
json.dump(calibration, f) f"Move all arm joints except '{full_turn_motor}' sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
for name in [*self.base_motors, full_turn_motor]:
range_mins[name] = 0
range_maxes[name] = 4095
self.actuators_bus.set_calibration(calibration) self.calibration = {}
apply_feetech_offsets_from_calibration(self.actuators_bus, calibration) for name, motor in self.bus.motors.items():
self.calibration[name] = MotorCalibration(
id=motor.id,
drive_mode=0,
homing_offset=homing_offsets[name],
range_min=range_mins[name],
range_max=range_maxes[name],
)
# TODO(Steven): Should this be dict[str, Any] ? self.bus.write_calibration(self.calibration)
def get_observation(self) -> dict[str, np.ndarray]: self._save_calibration()
"""The returned observations do not have a batch dimension.""" print("Calibration saved to", self.calibration_fpath)
def configure(self):
# Set-up arm actuators (position mode)
# We assume that at connection time, arm is in a rest position,
# and torque can be safely disabled to run calibration.
self.bus.disable_torque(self.arm_motors)
for name in self.arm_motors:
self.bus.write("Mode", name, OperatingMode.POSITION.value)
# Set P_Coefficient to lower value to avoid shakiness (Default is 32)
self.bus.write("P_Coefficient", name, 16)
# Set I_Coefficient and D_Coefficient to default value 0 and 32
self.bus.write("I_Coefficient", name, 0)
self.bus.write("D_Coefficient", name, 32)
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
# the motors. Note: this configuration is not in the official STS3215 Memory Table
self.bus.write("Maximum_Acceleration", name, 254)
self.bus.write("Acceleration", name, 254)
for name in self.base_motors:
self.bus.write("Mode", name, OperatingMode.VELOCITY.value)
self.bus.enable_torque()
def get_observation(self) -> dict[str, Any]:
if not self.is_connected: if not self.is_connected:
raise DeviceNotConnectedError("LeKiwiRobot is not connected. You need to run `robot.connect()`.") raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict = {} obs_dict = {}
# Read actuators position for arm and vel for base # Read actuators position for arm and vel for base
before_read_t = time.perf_counter() start = time.perf_counter()
obs_dict[OBS_STATE] = np.concatenate( arm_pos = self.bus.sync_read("Present_Position", self.arm_motors)
( base_vel = self.bus.sync_read("Present_Speed", self.base_motors)
self.actuators_bus.read("Present_Position", self.arm_actuators), obs_dict[OBS_STATE] = {**arm_pos, **base_vel}
self.actuators_bus.read("Present_Speed", self.base_actuators), dt_ms = (time.perf_counter() - start) * 1e3
) logger.debug(f"{self} read state: {dt_ms:.1f}ms")
)
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
# Capture images from cameras # Capture images from cameras
for cam_key, cam in self.cameras.items(): for cam_key, cam in self.cameras.items():
before_camread_t = time.perf_counter() start = time.perf_counter()
frame = cam.async_read() frame = cam.async_read()
ret, buffer = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90]) ret, buffer = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
if ret: if ret:
obs_dict[f"{OBS_IMAGES}.{cam_key}"] = base64.b64encode(buffer).decode("utf-8") obs_dict[f"{OBS_IMAGES}.{cam_key}"] = base64.b64encode(buffer).decode("utf-8")
else: else:
obs_dict[f"{OBS_IMAGES}.{cam_key}"] = "" obs_dict[f"{OBS_IMAGES}.{cam_key}"] = ""
self.logs[f"read_camera_{cam_key}_dt_s"] = cam.logs["delta_timestamp_s"] dt_ms = (time.perf_counter() - start) * 1e3
self.logs[f"async_read_camera_{cam_key}_dt_s"] = time.perf_counter() - before_camread_t logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict return obs_dict
def send_action(self, action: np.ndarray) -> np.ndarray: def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
# Copied from S100 robot # Copied from S100 robot
"""Command lekiwi to move to a target joint configuration. """Command lekiwi to move to a target joint configuration.
@ -221,9 +211,6 @@ class LeKiwiRobot(Robot):
`max_relative_target`. In this case, the action sent differs from original action. `max_relative_target`. In this case, the action sent differs from original action.
Thus, this function always returns the action actually sent. Thus, this function always returns the action actually sent.
Args:
action (np.ndarray): array containing the goal positions for the motors.
Raises: Raises:
RobotDeviceNotConnectedError: if robot is not connected. RobotDeviceNotConnectedError: if robot is not connected.
@ -231,48 +218,35 @@ class LeKiwiRobot(Robot):
np.ndarray: the action sent to the motors, potentially clipped. np.ndarray: the action sent to the motors, potentially clipped.
""" """
if not self.is_connected: if not self.is_connected:
raise DeviceNotConnectedError("LeKiwiRobot is not connected. You need to run `robot.connect()`.") raise DeviceNotConnectedError(f"{self} is not connected.")
arm_goal_pos = {k: v for k, v in action.items() if k in self.arm_motors}
base_goal_vel = {k: v for k, v in action.items() if k in self.base_motors}
# Input action is in motor space
goal_pos = action
# Cap goal position when too far away from present position. # Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower. # /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None: if self.config.max_relative_target is not None:
present_pos = self.actuators_bus.read("Present_Position", self.arm_actuators) present_pos = self.bus.sync_read("Present_Position", self.arm_motors)
goal_pos[:6] = ensure_safe_goal_position( goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in arm_goal_pos.items()}
goal_pos[:6], present_pos, self.config.max_relative_target arm_safe_goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
)
# Send goal position to the actuators # Send goal position to the actuators
# TODO(Steven): This happens synchronously self.bus.sync_write("Goal_Position", arm_safe_goal_pos)
self.actuators_bus.write("Goal_Position", goal_pos[:6].astype(np.int32), self.arm_actuators) self.bus.sync_write("Goal_Speed", base_goal_vel)
self.actuators_bus.write("Goal_Speed", goal_pos[6:].astype(np.int32), self.base_actuators)
return goal_pos return {**arm_safe_goal_pos, **base_goal_vel}
def stop_base(self): def stop_base(self):
# TODO(Steven): Assumes there's only 3 motors for base self.bus.sync_write("Goal_Speed", {name: 0 for name in self.base_motors}, num_retry=5)
logging.info("Stopping base") logger.info("Base motors stopped")
# TODO(Steven): Check if these operations are thread safe!
self.actuators_bus.write("Goal_Speed", [0, 0, 0], self.base_actuators)
logging.info("Base motors stopped")
def print_logs(self):
# TODO(Steven): Refactor logger
pass
def disconnect(self): def disconnect(self):
if not self.is_connected: if not self.is_connected:
raise DeviceNotConnectedError( raise DeviceNotConnectedError(f"{self} is not connected.")
"LeKiwi is not connected. You need to run `robot.connect()` before disconnecting."
)
self.stop_base() self.stop_base()
self.actuators_bus.disconnect() self.bus.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values(): for cam in self.cameras.values():
cam.disconnect() cam.disconnect()
self.is_connected = False
def __del__(self): logger.info(f"{self} disconnected.")
if getattr(self, "is_connected", False):
self.disconnect()