diff --git a/lerobot/common/robots/lekiwi/configuration_lekiwi.py b/lerobot/common/robots/lekiwi/configuration_lekiwi.py index a112ae8b..df163109 100644 --- a/lerobot/common/robots/lekiwi/configuration_lekiwi.py +++ b/lerobot/common/robots/lekiwi/configuration_lekiwi.py @@ -22,7 +22,14 @@ from lerobot.common.robots.config import RobotConfig @RobotConfig.register_subclass("lekiwi") @dataclass class LeKiwiRobotConfig(RobotConfig): - id = "lekiwi" + port = "/dev/ttyACM0" # port to connect to the bus + + disable_torque_on_disconnect: bool = True + + # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. + # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as + # the number of motors in your follower arms. + max_relative_target: int | None = None cameras: dict[str, CameraConfig] = field( default_factory=lambda: { @@ -34,25 +41,3 @@ class LeKiwiRobotConfig(RobotConfig): ), } ) - - calibration_dir: str = ".cache/calibration/lekiwi" - - port_motor_bus = "/dev/ttyACM0" - - # TODO(Steven): consider split this into arm and base - # TODO(Steven): Consider also removing this entirely as we can say that - # LeKiwiRobot will always have (and needs) such - shoulder_pan: tuple = (1, "sts3215") - shoulder_lift: tuple = (2, "sts3215") - elbow_flex: tuple = (3, "sts3215") - wrist_flex: tuple = (4, "sts3215") - wrist_roll: tuple = (5, "sts3215") - gripper: tuple = (6, "sts3215") - left_wheel: tuple = (7, "sts3215") - back_wheel: tuple = (8, "sts3215") - right_wheel: tuple = (9, "sts3215") - - # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. - # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as - # the number of motors in your follower arms. - max_relative_target: int | None = None diff --git a/lerobot/common/robots/lekiwi/lekiwi_robot.py b/lerobot/common/robots/lekiwi/lekiwi_robot.py index 139478be..512f3d21 100644 --- a/lerobot/common/robots/lekiwi/lekiwi_robot.py +++ b/lerobot/common/robots/lekiwi/lekiwi_robot.py @@ -15,31 +15,28 @@ # limitations under the License. import base64 -import json import logging import time +from typing import Any import cv2 -import numpy as np from lerobot.common.cameras.utils import make_cameras_from_configs from lerobot.common.constants import OBS_IMAGES, OBS_STATE from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError -from lerobot.common.motors import TorqueMode +from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode from lerobot.common.motors.feetech import ( FeetechMotorsBus, - apply_feetech_offsets_from_calibration, - run_full_arm_calibration, + OperatingMode, ) from ..robot import Robot from ..utils import ensure_safe_goal_position from .configuration_lekiwi import LeKiwiRobotConfig +logger = logging.getLogger(__name__) + -# TODO(Steven): Everything in here is pretty much single-threaded and synchronous. The assumption is that we can run the loop at a -# high enough frequency to not need to worry about threading. This is a good assumption for now, but we should consider -# making this more robust in the future. class LeKiwiRobot(Robot): """ The robot includes a three omniwheel mobile base and a remote follower arm. @@ -55,43 +52,33 @@ class LeKiwiRobot(Robot): super().__init__(config) self.config = config self.id = config.id - - # TODO(Steven): Consider in the future using S100 robot class - # TODO(Steven): Another option is to use the motorbus factory, but in this case we assume that - # what we consider 'lekiwi robot' always uses the FeetechMotorsBus - # TODO(Steven): Order and dimension are generally assumed to be 6 first for arm, 3 last for base - self.actuators_bus = FeetechMotorsBus( - port=self.config.port_motor_bus, + self.bus = FeetechMotorsBus( + port=self.config.port, motors={ - "arm_shoulder_pan": config.shoulder_pan, - "arm_shoulder_lift": config.shoulder_lift, - "arm_elbow_flex": config.elbow_flex, - "arm_wrist_flex": config.wrist_flex, - "arm_wrist_roll": config.wrist_roll, - "arm_gripper": config.gripper, - "base_left_wheel": config.left_wheel, - "base_right_wheel": config.right_wheel, - "base_back_wheel": config.back_wheel, + # arm + "arm_shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100), + "arm_shoulder_lift": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100), + "arm_elbow_flex": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100), + "arm_wrist_flex": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100), + "arm_wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100), + "arm_gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100), + # base + "base_left_wheel": Motor(7, "sts3215", MotorNormMode.RANGE_M100_100), + "base_back_wheel": Motor(8, "sts3215", MotorNormMode.RANGE_M100_100), + "base_right_wheel": Motor(9, "sts3215", MotorNormMode.RANGE_M100_100), }, + calibration=self.calibration, ) - - self.arm_actuators = [m for m in self.actuators_bus.motor_names if m.startswith("arm")] - self.base_actuators = [m for m in self.actuators_bus.motor_names if m.startswith("base")] - - self.max_relative_target = config.max_relative_target - - # TODO(Steven): Consider removing cameras from configs + self.arm_motors = [m for m in self.bus.names if m.startswith("arm")] + self.base_motors = [m for m in self.bus.names if m.startswith("base")] self.cameras = make_cameras_from_configs(config.cameras) - self.is_connected = False - self.logs = {} - @property def state_feature(self) -> dict: return { "dtype": "float32", - "shape": (len(self.actuators_bus),), - "names": {"motors": list(self.actuators_bus.motors)}, + "shape": (len(self.bus),), + "names": {"motors": list(self.bus.motors)}, } @property @@ -109,111 +96,114 @@ class LeKiwiRobot(Robot): } return cam_ft - def setup_actuators(self): - # Set-up arm actuators (position mode) - # We assume that at connection time, arm is in a rest position, - # and torque can be safely disabled to run calibration. - self.actuators_bus.write("Torque_Enable", TorqueMode.DISABLED.value, self.arm_actuators) - self.calibrate() # TODO(Steven): This should be only for the arm, but apparently it doesn't harm the base - - # Mode=0 for Position Control - self.actuators_bus.write("Mode", 0, self.arm_actuators) - # Set P_Coefficient to lower value to avoid shakiness (Default is 32) - self.actuators_bus.write("P_Coefficient", 16, self.arm_actuators) - # Set I_Coefficient and D_Coefficient to default value 0 and 32 - self.actuators_bus.write("I_Coefficient", 0, self.arm_actuators) - self.actuators_bus.write("D_Coefficient", 32, self.arm_actuators) - # Close the write lock so that Maximum_Acceleration gets written to EPROM address, - # which is mandatory for Maximum_Acceleration to take effect after rebooting. - self.actuators_bus.write("Lock", 0, self.arm_actuators) - # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of - # the motors. Note: this configuration is not in the official STS3215 Memory Table - self.actuators_bus.write("Maximum_Acceleration", 254, self.arm_actuators) - self.actuators_bus.write("Acceleration", 254, self.arm_actuators) - - logging.info("Activating torque.") - self.actuators_bus.write("Torque_Enable", TorqueMode.ENABLED.value, self.arm_actuators) - - # Check arm can be read - self.actuators_bus.read("Present_Position", self.arm_actuators) - - # Set-up base actuators (velocity mode) - self.actuators_bus.write("Lock", 0, self.base_actuators) - self.actuators_bus.write("Mode", [1, 1, 1], self.base_actuators) - self.actuators_bus.write("Lock", 1, self.base_actuators) + @property + def is_connected(self) -> bool: + # TODO(aliberts): add cam.is_connected for cam in self.cameras + return self.bus.is_connected def connect(self) -> None: if self.is_connected: - raise DeviceAlreadyConnectedError( - "LeKiwi Robot is already connected. Do not run `robot.connect()` twice." - ) + raise DeviceAlreadyConnectedError(f"{self} already connected") - logging.info("Connecting actuators.") - self.actuators_bus.connect() - self.setup_actuators() + self.bus.connect() + if not self.is_calibrated: + self.calibrate() - logging.info("Connecting cameras.") for cam in self.cameras.values(): cam.connect() - self.is_connected = True + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.bus.is_calibrated def calibrate(self) -> None: - # Copied from S100 robot - """After calibration all motors function in human interpretable ranges. - Rotations are expressed in degrees in nominal range of [-180, 180], - and linear motions (like gripper of Aloha) in nominal range of [0, 100]. - """ - actuators_calib_path = self.calibration_dir / f"{self.config.id}.json" + logger.info(f"\nRunning calibration of {self}") + self.bus.disable_torque(self.arm_motors) + for name in self.arm_motors: + self.bus.write("Operating_Mode", name, OperatingMode.POSITION.value) - if actuators_calib_path.exists(): - with open(actuators_calib_path, encoding="utf-8") as f: - calibration = json.load(f) - else: - logging.info("Missing calibration file '%s'", actuators_calib_path) - calibration = run_full_arm_calibration(self.actuators_bus, self.robot_type, self.name, "follower") + input("Move robot to the middle of its range of motion and press ENTER....") + homing_offsets = self.bus.set_half_turn_homings(self.arm_motors) - logging.info("Calibration is done! Saving calibration file '%s'", actuators_calib_path) - actuators_calib_path.parent.mkdir(parents=True, exist_ok=True) - with open(actuators_calib_path, "w", encoding="utf-8") as f: - json.dump(calibration, f) + full_turn_motor = "arm_wrist_roll" + unknown_range_motors = [name for name in self.arm_motors if name != full_turn_motor] + logger.info( + f"Move all arm joints except '{full_turn_motor}' sequentially through their " + "entire ranges of motion.\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) + for name in [*self.base_motors, full_turn_motor]: + range_mins[name] = 0 + range_maxes[name] = 4095 - self.actuators_bus.set_calibration(calibration) - apply_feetech_offsets_from_calibration(self.actuators_bus, calibration) + self.calibration = {} + for name, motor in self.bus.motors.items(): + self.calibration[name] = MotorCalibration( + id=motor.id, + drive_mode=0, + homing_offset=homing_offsets[name], + range_min=range_mins[name], + range_max=range_maxes[name], + ) - # TODO(Steven): Should this be dict[str, Any] ? - def get_observation(self) -> dict[str, np.ndarray]: - """The returned observations do not have a batch dimension.""" + self.bus.write_calibration(self.calibration) + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def configure(self): + # Set-up arm actuators (position mode) + # We assume that at connection time, arm is in a rest position, + # and torque can be safely disabled to run calibration. + self.bus.disable_torque(self.arm_motors) + for name in self.arm_motors: + self.bus.write("Mode", name, OperatingMode.POSITION.value) + # Set P_Coefficient to lower value to avoid shakiness (Default is 32) + self.bus.write("P_Coefficient", name, 16) + # Set I_Coefficient and D_Coefficient to default value 0 and 32 + self.bus.write("I_Coefficient", name, 0) + self.bus.write("D_Coefficient", name, 32) + # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of + # the motors. Note: this configuration is not in the official STS3215 Memory Table + self.bus.write("Maximum_Acceleration", name, 254) + self.bus.write("Acceleration", name, 254) + + for name in self.base_motors: + self.bus.write("Mode", name, OperatingMode.VELOCITY.value) + + self.bus.enable_torque() + + def get_observation(self) -> dict[str, Any]: if not self.is_connected: - raise DeviceNotConnectedError("LeKiwiRobot is not connected. You need to run `robot.connect()`.") + raise DeviceNotConnectedError(f"{self} is not connected.") obs_dict = {} # Read actuators position for arm and vel for base - before_read_t = time.perf_counter() - obs_dict[OBS_STATE] = np.concatenate( - ( - self.actuators_bus.read("Present_Position", self.arm_actuators), - self.actuators_bus.read("Present_Speed", self.base_actuators), - ) - ) - self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t + start = time.perf_counter() + arm_pos = self.bus.sync_read("Present_Position", self.arm_motors) + base_vel = self.bus.sync_read("Present_Speed", self.base_motors) + obs_dict[OBS_STATE] = {**arm_pos, **base_vel} + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") # Capture images from cameras for cam_key, cam in self.cameras.items(): - before_camread_t = time.perf_counter() + start = time.perf_counter() frame = cam.async_read() ret, buffer = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90]) if ret: obs_dict[f"{OBS_IMAGES}.{cam_key}"] = base64.b64encode(buffer).decode("utf-8") else: obs_dict[f"{OBS_IMAGES}.{cam_key}"] = "" - self.logs[f"read_camera_{cam_key}_dt_s"] = cam.logs["delta_timestamp_s"] - self.logs[f"async_read_camera_{cam_key}_dt_s"] = time.perf_counter() - before_camread_t + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") return obs_dict - def send_action(self, action: np.ndarray) -> np.ndarray: + def send_action(self, action: dict[str, Any]) -> dict[str, Any]: # Copied from S100 robot """Command lekiwi to move to a target joint configuration. @@ -221,9 +211,6 @@ class LeKiwiRobot(Robot): `max_relative_target`. In this case, the action sent differs from original action. Thus, this function always returns the action actually sent. - Args: - action (np.ndarray): array containing the goal positions for the motors. - Raises: RobotDeviceNotConnectedError: if robot is not connected. @@ -231,48 +218,35 @@ class LeKiwiRobot(Robot): np.ndarray: the action sent to the motors, potentially clipped. """ if not self.is_connected: - raise DeviceNotConnectedError("LeKiwiRobot is not connected. You need to run `robot.connect()`.") + raise DeviceNotConnectedError(f"{self} is not connected.") + + arm_goal_pos = {k: v for k, v in action.items() if k in self.arm_motors} + base_goal_vel = {k: v for k, v in action.items() if k in self.base_motors} - # Input action is in motor space - goal_pos = action # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. if self.config.max_relative_target is not None: - present_pos = self.actuators_bus.read("Present_Position", self.arm_actuators) - goal_pos[:6] = ensure_safe_goal_position( - goal_pos[:6], present_pos, self.config.max_relative_target - ) + present_pos = self.bus.sync_read("Present_Position", self.arm_motors) + goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in arm_goal_pos.items()} + arm_safe_goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) # Send goal position to the actuators - # TODO(Steven): This happens synchronously - self.actuators_bus.write("Goal_Position", goal_pos[:6].astype(np.int32), self.arm_actuators) - self.actuators_bus.write("Goal_Speed", goal_pos[6:].astype(np.int32), self.base_actuators) + self.bus.sync_write("Goal_Position", arm_safe_goal_pos) + self.bus.sync_write("Goal_Speed", base_goal_vel) - return goal_pos + return {**arm_safe_goal_pos, **base_goal_vel} def stop_base(self): - # TODO(Steven): Assumes there's only 3 motors for base - logging.info("Stopping base") - # TODO(Steven): Check if these operations are thread safe! - self.actuators_bus.write("Goal_Speed", [0, 0, 0], self.base_actuators) - logging.info("Base motors stopped") - - def print_logs(self): - # TODO(Steven): Refactor logger - pass + self.bus.sync_write("Goal_Speed", {name: 0 for name in self.base_motors}, num_retry=5) + logger.info("Base motors stopped") def disconnect(self): if not self.is_connected: - raise DeviceNotConnectedError( - "LeKiwi is not connected. You need to run `robot.connect()` before disconnecting." - ) + raise DeviceNotConnectedError(f"{self} is not connected.") self.stop_base() - self.actuators_bus.disconnect() + self.bus.disconnect(self.config.disable_torque_on_disconnect) for cam in self.cameras.values(): cam.disconnect() - self.is_connected = False - def __del__(self): - if getattr(self, "is_connected", False): - self.disconnect() + logger.info(f"{self} disconnected.")