diff --git a/examples/robots/lekiwi_client_app.py b/examples/robots/lekiwi_client_app.py new file mode 100755 index 00000000..4daca962 --- /dev/null +++ b/examples/robots/lekiwi_client_app.py @@ -0,0 +1,143 @@ +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from lerobot.common.robots.config import RobotMode +from lerobot.common.robots.lekiwi.config_lekiwi import LeKiwiClientConfig +from lerobot.common.robots.lekiwi.lekiwi_client import LeKiwiClient +from lerobot.common.teleoperators.keyboard import KeyboardTeleop, KeyboardTeleopConfig +from lerobot.common.teleoperators.so100 import SO100Leader, SO100LeaderConfig + +DUMMY_FEATURES = { + "observation.state": { + "dtype": "float64", + "shape": (9,), + "names": { + "motors": [ + "arm_shoulder_pan", + "arm_shoulder_lift", + "arm_elbow_flex", + "arm_wrist_flex", + "arm_wrist_roll", + "arm_gripper", + "base_left_wheel", + "base_right_wheel", + "base_back_wheel", + ] + }, + }, + "action": { + "dtype": "float64", + "shape": (9,), + "names": { + "motors": [ + "arm_shoulder_pan", + "arm_shoulder_lift", + "arm_elbow_flex", + "arm_wrist_flex", + "arm_wrist_roll", + "arm_gripper", + "base_left_wheel", + "base_right_wheel", + "base_back_wheel", + ] + }, + }, + "observation.images.front": { + "dtype": "image", + "shape": (640, 480, 3), + "names": [ + "width", + "height", + "channels", + ], + }, + "observation.images.wrist": { + "dtype": "image", + "shape": (480, 640, 3), + "names": [ + "width", + "height", + "channels", + ], + }, +} + + +def main(): + logging.info("Configuring Teleop Devices") + leader_arm_config = SO100LeaderConfig(port="/dev/tty.usbmodem58760434171") + leader_arm = SO100Leader(leader_arm_config) + + keyboard_config = KeyboardTeleopConfig() + keyboard = KeyboardTeleop(keyboard_config) + + logging.info("Configuring LeKiwi Client") + robot_config = LeKiwiClientConfig(id="lekiwi", robot_mode=RobotMode.TELEOP) + robot = LeKiwiClient(robot_config) + + logging.info("Creating LeRobot Dataset") + + # # TODO(Steven): Check this creation + # dataset = LeRobotDataset.create( + # repo_id="user/lekiwi2", + # fps=10, + # features=DUMMY_FEATURES, + # ) + + logging.info("Connecting Teleop Devices") + leader_arm.connect() + keyboard.connect() + + logging.info("Connecting remote LeKiwi") + robot.connect() + + if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected: + logging.error("Failed to connect to all devices") + return + + logging.info("Starting LeKiwi teleoperation") + i = 0 + while i < 1000: + arm_action = leader_arm.get_action() + base_action = keyboard.get_action() + action = {**arm_action, **base_action} if len(base_action) > 0 else arm_action + + # TODO(Steven): Deal with policy action space + # robot.set_mode(RobotMode.AUTO) + # policy_action = policy.get_action() # This might be in body frame, key space or smt else + # robot.send_action(policy_action) + action_sent = robot.send_action(action) + observation = 
robot.get_observation()
+
+        frame = {**action_sent, **observation}
+        frame.update({"task": "Dummy Task Dataset"})
+
+        logging.info("Prepared a frame for the dataset")
+        # dataset.add_frame(frame)
+        i += 1
+
+    # dataset.save_episode()
+    # dataset.push_to_hub()
+
+    logging.info("Disconnecting Teleop Devices and LeKiwi Client")
+    robot.disconnect()
+    leader_arm.disconnect()
+    keyboard.disconnect()
+    logging.info("Finished LeKiwi cleanly")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/lerobot/common/teleoperators/keyboard/teleop_keyboard_app.py b/examples/teleoperators/teleop_keyboard_app.py
similarity index 92%
rename from lerobot/common/teleoperators/keyboard/teleop_keyboard_app.py
rename to examples/teleoperators/teleop_keyboard_app.py
index 4f463814..6ecd78bd 100755
--- a/lerobot/common/teleoperators/keyboard/teleop_keyboard_app.py
+++ b/examples/teleoperators/teleop_keyboard_app.py
@@ -21,7 +21,7 @@ def main():
         i += 1

     keyboard.disconnect()
-    logging.info("Finished LeKiwiRobot cleanly")
+    logging.info("Finished LeKiwi cleanly")


 if __name__ == "__main__":
diff --git a/lerobot/common/constants.py b/lerobot/common/constants.py
index 22d7568e..e78e748b 100644
--- a/lerobot/common/constants.py
+++ b/lerobot/common/constants.py
@@ -48,5 +48,5 @@ default_cache_path = Path(HF_HOME) / "lerobot"
 HF_LEROBOT_HOME = Path(os.getenv("HF_LEROBOT_HOME", default_cache_path)).expanduser()

 # calibration dir
-default_calibration_path = HF_LEROBOT_HOME / ".calibration"
+default_calibration_path = HF_LEROBOT_HOME / "calibration"
 HF_LEROBOT_CALIBRATION = Path(os.getenv("HF_LEROBOT_CALIBRATION", default_calibration_path)).expanduser()
diff --git a/lerobot/common/errors.py b/lerobot/common/errors.py
index 4f506b61..6398bc6f 100644
--- a/lerobot/common/errors.py
+++ b/lerobot/common/errors.py
@@ -15,3 +15,14 @@ class DeviceAlreadyConnectedError(ConnectionError):
     ):
         self.message = message
         super().__init__(self.message)
+
+
+class InvalidActionError(ConnectionError):
+    """Exception raised when an action is invalid."""
+
+    def __init__(
+        self,
+        message="The action is invalid. Check that the value follows what is expected from the action space.",
+    ):
+        self.message = message
+        super().__init__(self.message)
diff --git a/lerobot/common/motors/motors_bus.py b/lerobot/common/motors/motors_bus.py
index d0f8ff3e..9bc6ed4e 100644
--- a/lerobot/common/motors/motors_bus.py
+++ b/lerobot/common/motors/motors_bus.py
@@ -546,7 +546,7 @@ class MotorsBus(abc.ABC):
             motors = self.names
         elif isinstance(motors, (str, int)):
             motors = [motors]
-        else:
+        elif not isinstance(motors, list):
             raise TypeError(motors)

         self.reset_calibration(motors)
@@ -609,6 +609,7 @@ class MotorsBus(abc.ABC):
             min_ = self.calibration[name].range_min
             max_ = self.calibration[name].range_max
             bounded_val = min(max_, max(min_, val))
+            # TODO(Steven): normalization divides by zero when max_ == min_; add a check, probably in record_ranges_of_motion (hitting this case most likely means the user forgot to move a motor during calibration)
             if self.motors[name].norm_mode is MotorNormMode.RANGE_M100_100:
                 normalized_values[id_] = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
             elif self.motors[name].norm_mode is MotorNormMode.RANGE_0_100:
diff --git a/lerobot/common/robots/config.py b/lerobot/common/robots/config.py
index 83a13ca9..1ab0b004 100644
--- a/lerobot/common/robots/config.py
+++ b/lerobot/common/robots/config.py
@@ -1,16 +1,23 @@
 import abc
+import enum
 from dataclasses import dataclass
 from pathlib import Path

 import draccus


+class RobotMode(enum.Enum):
+    TELEOP = 0
+    AUTO = 1
+
+
 @dataclass(kw_only=True)
 class RobotConfig(draccus.ChoiceRegistry, abc.ABC):
     # Allows to distinguish between different robots of the same type
     id: str | None = None
     # Directory to store calibration file
     calibration_dir: Path | None = None
+    robot_mode: RobotMode | None = None

     @property
     def type(self) -> str:
diff --git a/lerobot/common/robots/lekiwi/README.md b/lerobot/common/robots/lekiwi/README.md
index 1be7cbc4..50667add 100644
--- a/lerobot/common/robots/lekiwi/README.md
+++ b/lerobot/common/robots/lekiwi/README.md
@@ -1,3 +1,5 @@
+# TODO(Steven): Update README
+
 # Using the [LeKiwi](https://github.com/SIGRobotics-UIUC/LeKiwi) Robot with LeRobot

 ## Table of Contents
@@ -194,11 +196,11 @@ sudo chmod 666 /dev/ttyACM1

 #### d. Update config file

-IMPORTANTLY: Now that you have your ports of leader and follower arm and ip address of the mobile-so100, update the **ip** in Network configuration, **port** in leader_arms and **port** in lekiwi. In the [`LeKiwiRobotConfig`](../lerobot/common/robot_devices/robots/configs.py) file. Where you will find something like:
+IMPORTANT: Now that you have the ports of the leader and follower arms and the IP address of the mobile SO-100, update the **ip** in the network configuration, the **port** in leader_arms, and the **port** in lekiwi in the [`LeKiwiConfig`](../lerobot/common/robot_devices/robots/configs.py) file, where you will find something like:
 ```python
 @RobotConfig.register_subclass("lekiwi")
 @dataclass
-class LeKiwiRobotConfig(RobotConfig):
+class LeKiwiConfig(RobotConfig):
 # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
 # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
 # the number of motors in your follower arms.
@@ -281,7 +283,7 @@ For the wired LeKiwi version your configured IP address should refer to your own ```python @RobotConfig.register_subclass("lekiwi") @dataclass -class LeKiwiRobotConfig(RobotConfig): +class LeKiwiConfig(RobotConfig): # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as # the number of motors in your follower arms. @@ -446,7 +448,7 @@ You should see on your laptop something like this: ```[INFO] Connected to remote | F | Decrease speed | > [!TIP] -> If you use a different keyboard you can change the keys for each command in the [`LeKiwiRobotConfig`](../lerobot/common/robot_devices/robots/configs.py). +> If you use a different keyboard you can change the keys for each command in the [`LeKiwiConfig`](../lerobot/common/robot_devices/robots/configs.py). ### Wired version If you have the **wired** LeKiwi version please run all commands including both these teleoperation commands on your laptop. diff --git a/lerobot/common/robots/lekiwi/__init__.py b/lerobot/common/robots/lekiwi/__init__.py new file mode 100644 index 00000000..e3d10c5c --- /dev/null +++ b/lerobot/common/robots/lekiwi/__init__.py @@ -0,0 +1,3 @@ +from .config_lekiwi import LeKiwiClientConfig, LeKiwiConfig +from .lekiwi import LeKiwi +from .lekiwi_client import LeKiwiClient diff --git a/lerobot/common/robots/lekiwi/config_lekiwi.py b/lerobot/common/robots/lekiwi/config_lekiwi.py new file mode 100644 index 00000000..2d6c0493 --- /dev/null +++ b/lerobot/common/robots/lekiwi/config_lekiwi.py @@ -0,0 +1,70 @@ +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass, field + +from lerobot.common.cameras.configs import CameraConfig +from lerobot.common.cameras.opencv.configuration_opencv import OpenCVCameraConfig + +from ..config import RobotConfig + + +@RobotConfig.register_subclass("lekiwi") +@dataclass +class LeKiwiConfig(RobotConfig): + port = "/dev/ttyACM0" # port to connect to the bus + + disable_torque_on_disconnect: bool = True + + # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. + # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as + # the number of motors in your follower arms. 
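+    # For example (illustrative value, not a tuned default): `max_relative_target=30` caps every
+    # commanded arm step to 30 position units per `send_action()` call, while `None` disables the clamp.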
+ max_relative_target: int | None = None + + cameras: dict[str, CameraConfig] = field( + default_factory=lambda: { + "front": OpenCVCameraConfig( + camera_index="/dev/video1", fps=30, width=640, height=480, rotation=90 + ), + "wrist": OpenCVCameraConfig( + camera_index="/dev/video4", fps=30, width=640, height=480, rotation=180 + ), + } + ) + + +@RobotConfig.register_subclass("lekiwi_client") +@dataclass +class LeKiwiClientConfig(RobotConfig): + # Network Configuration + remote_ip: str = "172.18.129.208" + port_zmq_cmd: int = 5555 + port_zmq_observations: int = 5556 + + teleop_keys: dict[str, str] = field( + default_factory=lambda: { + # Movement + "forward": "w", + "backward": "s", + "left": "a", + "right": "d", + "rotate_left": "z", + "rotate_right": "x", + # Speed control + "speed_up": "r", + "speed_down": "f", + # quit teleop + "quit": "q", + } + ) diff --git a/lerobot/common/robots/lekiwi/configuration_lekiwi.py b/lerobot/common/robots/lekiwi/configuration_lekiwi.py deleted file mode 100644 index 076dbbc2..00000000 --- a/lerobot/common/robots/lekiwi/configuration_lekiwi.py +++ /dev/null @@ -1,89 +0,0 @@ -from dataclasses import dataclass, field - -from lerobot.common.cameras.configs import CameraConfig -from lerobot.common.cameras.opencv.configuration_opencv import OpenCVCameraConfig -from lerobot.common.motors.configs import FeetechMotorsBusConfig, MotorsBusConfig -from lerobot.common.robots.config import RobotConfig - - -@RobotConfig.register_subclass("lekiwi") -@dataclass -class LeKiwiRobotConfig(RobotConfig): - # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. - # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as - # the number of motors in your follower arms. 
- max_relative_target: int | None = None - - # Network Configuration - ip: str = "192.168.0.193" - port: int = 5555 - video_port: int = 5556 - - cameras: dict[str, CameraConfig] = field( - default_factory=lambda: { - "front": OpenCVCameraConfig( - camera_index="/dev/video0", fps=30, width=640, height=480, rotation=90 - ), - "wrist": OpenCVCameraConfig( - camera_index="/dev/video2", fps=30, width=640, height=480, rotation=180 - ), - } - ) - - calibration_dir: str = ".cache/calibration/lekiwi" - - leader_arms: dict[str, MotorsBusConfig] = field( - default_factory=lambda: { - "main": FeetechMotorsBusConfig( - port="/dev/tty.usbmodem585A0077581", - motors={ - # name: (index, model) - "shoulder_pan": [1, "sts3215"], - "shoulder_lift": [2, "sts3215"], - "elbow_flex": [3, "sts3215"], - "wrist_flex": [4, "sts3215"], - "wrist_roll": [5, "sts3215"], - "gripper": [6, "sts3215"], - }, - ), - } - ) - - follower_arms: dict[str, MotorsBusConfig] = field( - default_factory=lambda: { - "main": FeetechMotorsBusConfig( - port="/dev/ttyACM0", - motors={ - # name: (index, model) - "shoulder_pan": [1, "sts3215"], - "shoulder_lift": [2, "sts3215"], - "elbow_flex": [3, "sts3215"], - "wrist_flex": [4, "sts3215"], - "wrist_roll": [5, "sts3215"], - "gripper": [6, "sts3215"], - "left_wheel": (7, "sts3215"), - "back_wheel": (8, "sts3215"), - "right_wheel": (9, "sts3215"), - }, - ), - } - ) - - teleop_keys: dict[str, str] = field( - default_factory=lambda: { - # Movement - "forward": "w", - "backward": "s", - "left": "a", - "right": "d", - "rotate_left": "z", - "rotate_right": "x", - # Speed control - "speed_up": "r", - "speed_down": "f", - # quit teleop - "quit": "q", - } - ) - - mock: bool = False diff --git a/lerobot/common/robots/lekiwi/lekiwi.py b/lerobot/common/robots/lekiwi/lekiwi.py new file mode 100644 index 00000000..7ca0fc07 --- /dev/null +++ b/lerobot/common/robots/lekiwi/lekiwi.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from typing import Any + +from lerobot.common.cameras.utils import make_cameras_from_configs +from lerobot.common.constants import OBS_IMAGES, OBS_STATE +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode +from lerobot.common.motors.feetech import ( + FeetechMotorsBus, + OperatingMode, +) + +from ..robot import Robot +from ..utils import ensure_safe_goal_position +from .config_lekiwi import LeKiwiConfig + +logger = logging.getLogger(__name__) + + +class LeKiwi(Robot): + """ + The robot includes a three omniwheel mobile base and a remote follower arm. + The leader arm is connected locally (on the laptop) and its joint positions are recorded and then + forwarded to the remote follower arm (after applying a safety clamp). + In parallel, keyboard teleoperation is used to generate raw velocity commands for the wheels. 
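+
+    This class is the on-robot ("host") side of that setup: it only owns the Feetech bus driving the
+    six arm joints and the three wheel motors, plus the onboard cameras. The teleoperation devices and
+    LeKiwiClient run on the operator's machine and reach this class through lekiwi_host.py over ZMQ.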
+ """ + + config_class = LeKiwiConfig + name = "lekiwi" + + def __init__(self, config: LeKiwiConfig): + super().__init__(config) + self.config = config + self.id = config.id + self.bus = FeetechMotorsBus( + port=self.config.port, + motors={ + # arm + "arm_shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100), + "arm_shoulder_lift": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100), + "arm_elbow_flex": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100), + "arm_wrist_flex": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100), + "arm_wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100), + "arm_gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100), + # base + "base_left_wheel": Motor(7, "sts3215", MotorNormMode.RANGE_M100_100), + "base_right_wheel": Motor(8, "sts3215", MotorNormMode.RANGE_M100_100), + "base_back_wheel": Motor(9, "sts3215", MotorNormMode.RANGE_M100_100), + }, + calibration=self.calibration, + ) + self.arm_motors = [m for m in self.bus.names if m.startswith("arm")] + self.base_motors = [m for m in self.bus.names if m.startswith("base")] + self.cameras = make_cameras_from_configs(config.cameras) + + @property + def state_feature(self) -> dict: + return { + "dtype": "float32", + "shape": (len(self.bus),), + "names": {"motors": list(self.bus.motors)}, + } + + @property + def action_feature(self) -> dict: + return self.state_feature + + @property + def camera_features(self) -> dict[str, dict]: + cam_ft = {} + for cam_key, cam in self.cameras.items(): + cam_ft[cam_key] = { + "shape": (cam.height, cam.width, cam.channels), + "names": ["height", "width", "channels"], + "info": None, + } + return cam_ft + + @property + def is_connected(self) -> bool: + # TODO(aliberts): add cam.is_connected for cam in self.cameras + return self.bus.is_connected + + def connect(self) -> None: + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.bus.connect() + if not self.is_calibrated: + self.calibrate() + + for cam in self.cameras.values(): + cam.connect() + + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.bus.is_calibrated + + # TODO(Steven): I think we should extend this to give the user the option of re-calibrate + # calibrate(recalibrate: bool = False) -> None: + # If true, then we overwrite the previous calibration file with new values + def calibrate(self) -> None: + logger.info(f"\nRunning calibration of {self}") + + motors = self.arm_motors + self.base_motors + + self.bus.disable_torque(self.arm_motors) + for name in self.arm_motors: + self.bus.write("Operating_Mode", name, OperatingMode.POSITION.value) + + input("Move robot to the middle of its range of motion and press ENTER....") + homing_offsets = self.bus.set_half_turn_homings(motors) + + # TODO(Steven): Might be worth to do this also in other robots but it should be added in the docs + full_turn_motor = [ + motor for motor in motors if any(keyword in motor for keyword in ["wheel", "gripper"]) + ] + unknown_range_motors = [motor for motor in motors if motor not in full_turn_motor] + + print( + f"Move all arm joints except '{full_turn_motor}' sequentially through their " + "entire ranges of motion.\nRecording positions. Press ENTER to stop..." 
+ ) + range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) + for name in full_turn_motor: + range_mins[name] = 0 + range_maxes[name] = 4095 + + self.calibration = {} + for name, motor in self.bus.motors.items(): + self.calibration[name] = MotorCalibration( + id=motor.id, + drive_mode=0, + homing_offset=homing_offsets[name], + range_min=range_mins[name], + range_max=range_maxes[name], + ) + + self.bus.write_calibration(self.calibration) + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def configure(self): + # Set-up arm actuators (position mode) + # We assume that at connection time, arm is in a rest position, + # and torque can be safely disabled to run calibration. + self.bus.disable_torque(self.arm_motors) + for name in self.arm_motors: + self.bus.write("Operating_Mode", name, OperatingMode.POSITION.value) + # Set P_Coefficient to lower value to avoid shakiness (Default is 32) + self.bus.write("P_Coefficient", name, 16) + # Set I_Coefficient and D_Coefficient to default value 0 and 32 + self.bus.write("I_Coefficient", name, 0) + self.bus.write("D_Coefficient", name, 32) + # Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of + # the motors. Note: this configuration is not in the official STS3215 Memory Table + self.bus.write("Maximum_Acceleration", name, 254) + self.bus.write("Acceleration", name, 254) + + for name in self.base_motors: + self.bus.write("Operating_Mode", name, OperatingMode.VELOCITY.value) + + self.bus.enable_torque() # TODO(Steven): Operation has failed with: ConnectionError: Failed to write 'Lock' on id_=6 with '1' after 1 tries. [TxRxResult] Incorrect status packet! + + def get_observation(self) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + obs_dict = {OBS_IMAGES: {}} + + # Read actuators position for arm and vel for base + start = time.perf_counter() + arm_pos = self.bus.sync_read("Present_Position", self.arm_motors) + base_vel = self.bus.sync_read("Present_Speed", self.base_motors) + obs_dict[OBS_STATE] = {**arm_pos, **base_vel} + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") + + # Capture images from cameras + for cam_key, cam in self.cameras.items(): + start = time.perf_counter() + obs_dict[OBS_IMAGES][cam_key] = cam.async_read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + + return obs_dict + + def send_action(self, action: dict[str, Any]) -> dict[str, Any]: + # Copied from S100 robot + """Command lekiwi to move to a target joint configuration. + + The relative action magnitude may be clipped depending on the configuration parameter + `max_relative_target`. In this case, the action sent differs from original action. + Thus, this function always returns the action actually sent. + + Raises: + RobotDeviceNotConnectedError: if robot is not connected. + + Returns: + np.ndarray: the action sent to the motors, potentially clipped. + """ + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + arm_goal_pos = {k: v for k, v in action.items() if k in self.arm_motors} + base_goal_vel = {k: v for k, v in action.items() if k in self.base_motors} + + # Cap goal position when too far away from present position. + # /!\ Slower fps expected due to reading from the follower. 
+ if self.config.max_relative_target is not None: + present_pos = self.bus.sync_read("Present_Position", self.arm_motors) + goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in arm_goal_pos.items()} + arm_safe_goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) + arm_goal_pos = arm_safe_goal_pos + + # TODO(Steven): Message fetching failed: Magnitude 34072 exceeds 32767 (max for sign_bit_index=15) + # TODO(Steven): IMO, this should be a check in the motor bus + + # Send goal position to the actuators + self.bus.sync_write("Goal_Position", arm_goal_pos) + self.bus.sync_write("Goal_Speed", base_goal_vel) + + return {**arm_goal_pos, **base_goal_vel} + + def stop_base(self): + self.bus.sync_write("Goal_Speed", dict.fromkeys(self.base_motors, 0), num_retry=5) + logger.info("Base motors stopped") + + def disconnect(self): + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + self.stop_base() + self.bus.disconnect(self.config.disable_torque_on_disconnect) + for cam in self.cameras.values(): + cam.disconnect() + + logger.info(f"{self} disconnected.") diff --git a/lerobot/common/robots/lekiwi/lekiwi_client.py b/lerobot/common/robots/lekiwi/lekiwi_client.py new file mode 100644 index 00000000..b9e3a5cd --- /dev/null +++ b/lerobot/common/robots/lekiwi/lekiwi_client.py @@ -0,0 +1,504 @@ +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import json +import logging +from typing import Any + +import cv2 +import numpy as np +import torch +import zmq + +from lerobot.common.constants import OBS_IMAGES, OBS_STATE +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError, InvalidActionError +from lerobot.common.robots.config import RobotMode + +from ..robot import Robot +from .config_lekiwi import LeKiwiClientConfig + + +# TODO(Steven): This doesn't need to inherit from Robot +# But we do it for now to offer a familiar API +# TODO(Steven): This doesn't need to take care of the +# mapping from teleop to motor commands, but given that +# we already have a middle-man (this class) we add it here +# Other options include: +# 1. Adding it to the Telop implementation for lekiwi +# (meaning each robot will need a teleop imple) or +# 2. 
Adding it into the robot implementation +# (meaning the policy might be needed to be train +# over the teleop action space) +# TODO(Steven): Check if we can move everything to 32 instead +class LeKiwiClient(Robot): + config_class = LeKiwiClientConfig + name = "lekiwi_client" + + def __init__(self, config: LeKiwiClientConfig): + super().__init__(config) + self.config = config + self.id = config.id + self.robot_type = config.type + self.robot_mode = config.robot_mode + + self.remote_ip = config.remote_ip + self.port_zmq_cmd = config.port_zmq_cmd + self.port_zmq_observations = config.port_zmq_observations + + self.teleop_keys = config.teleop_keys + + self.zmq_context = None + self.zmq_cmd_socket = None + self.zmq_observation_socket = None + + self.last_frames = {} + self.last_present_speed = {"x_cmd": 0.0, "y_cmd": 0.0, "theta_cmd": 0.0} + + self.last_remote_arm_state = {} + + # Define three speed levels and a current index + self.speed_levels = [ + {"xy": 0.1, "theta": 30}, # slow + {"xy": 0.2, "theta": 60}, # medium + {"xy": 0.3, "theta": 90}, # fast + ] + self.speed_index = 0 # Start at slow + + self._is_connected = False + self.logs = {} + + @property + def state_feature(self) -> dict: + # TODO(Steven): Get this from the data fetched? + # TODO(Steven): Motor names are unknown for the Daemon + # Or assume its size/metadata? + return { + "dtype": "float64", + "shape": (9,), + "names": { + "motors": [ + "arm_shoulder_pan", + "arm_shoulder_lift", + "arm_elbow_flex", + "arm_wrist_flex", + "arm_wrist_roll", + "arm_gripper", + "base_left_wheel", + "base_right_wheel", + "base_back_wheel", + ] + }, + } + + @property + def action_feature(self) -> dict: + return self.state_feature + + @property + def camera_features(self) -> dict[str, dict]: + # TODO(Steven): Get this from the data fetched? + # TODO(Steven): camera names are unknown for the Daemon + # Or assume its size/metadata? + # TODO(Steven): Check consistency of image sizes + cam_ft = { + "front": { + "shape": (480, 640, 3), + "names": ["height", "width", "channels"], + "info": None, + }, + "wrist": { + "shape": (480, 640, 3), + "names": ["height", "width", "channels"], + "info": None, + }, + } + return cam_ft + + @property + def is_connected(self) -> bool: + return self._is_connected + + @property + def is_calibrated(self) -> bool: + pass + + def connect(self) -> None: + """Establishes ZMQ sockets with the remote mobile robot""" + + if self._is_connected: + raise DeviceAlreadyConnectedError( + "LeKiwi Daemon is already connected. Do not run `robot.connect()` twice." 
+ ) + + self.zmq_context = zmq.Context() + self.zmq_cmd_socket = self.zmq_context.socket(zmq.PUSH) + zmq_cmd_locator = f"tcp://{self.remote_ip}:{self.port_zmq_cmd}" + self.zmq_cmd_socket.connect(zmq_cmd_locator) + self.zmq_cmd_socket.setsockopt(zmq.CONFLATE, 1) + + self.zmq_observation_socket = self.zmq_context.socket(zmq.PULL) + zmq_observations_locator = f"tcp://{self.remote_ip}:{self.port_zmq_observations}" + self.zmq_observation_socket.connect(zmq_observations_locator) + self.zmq_observation_socket.setsockopt(zmq.CONFLATE, 1) + + self._is_connected = True + + def calibrate(self) -> None: + logging.warning("LeKiwiClient has nothing to calibrate.") + return + + # Consider moving these static functions out of the class + # Copied from robot_lekiwi MobileManipulator class* (before the refactor) + @staticmethod + def _degps_to_raw(degps: float) -> int: + steps_per_deg = 4096.0 / 360.0 + speed_in_steps = degps * steps_per_deg + speed_int = int(round(speed_in_steps)) + # Cap the value to fit within signed 16-bit range (-32768 to 32767) + if speed_int > 0x7FFF: + speed_int = 0x7FFF # 32767 -> maximum positive value + elif speed_int < -0x8000: + speed_int = -0x8000 # -32768 -> minimum negative value + return speed_int + + # Copied from robot_lekiwi MobileManipulator class + @staticmethod + def _raw_to_degps(raw_speed: int) -> float: + steps_per_deg = 4096.0 / 360.0 + magnitude = raw_speed + degps = magnitude / steps_per_deg + return degps + + # Copied from robot_lekiwi MobileManipulator class + def _body_to_wheel_raw( + self, + x_cmd: float, + y_cmd: float, + theta_cmd: float, + wheel_radius: float = 0.05, + base_radius: float = 0.125, + max_raw: int = 3000, + ) -> dict: + """ + Convert desired body-frame velocities into wheel raw commands. + + Parameters: + x_cmd : Linear velocity in x (m/s). + y_cmd : Linear velocity in y (m/s). + theta_cmd : Rotational velocity (deg/s). + wheel_radius: Radius of each wheel (meters). + base_radius : Distance from the center of rotation to each wheel (meters). + max_raw : Maximum allowed raw command (ticks) per wheel. + + Returns: + A dictionary with wheel raw commands: + {"left_wheel": value, "back_wheel": value, "right_wheel": value}. + + Notes: + - Internally, the method converts theta_cmd to rad/s for the kinematics. + - The raw command is computed from the wheels angular speed in deg/s + using _degps_to_raw(). If any command exceeds max_raw, all commands + are scaled down proportionally. + """ + # Convert rotational velocity from deg/s to rad/s. + theta_rad = theta_cmd * (np.pi / 180.0) + # Create the body velocity vector [x, y, theta_rad]. + velocity_vector = np.array([x_cmd, y_cmd, theta_rad]) + + # Define the wheel mounting angles with a -90° offset. + angles = np.radians(np.array([240, 120, 0]) - 90) + # Build the kinematic matrix: each row maps body velocities to a wheel’s linear speed. + # The third column (base_radius) accounts for the effect of rotation. + m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles]) + + # Compute each wheel’s linear speed (m/s) and then its angular speed (rad/s). + wheel_linear_speeds = m.dot(velocity_vector) + wheel_angular_speeds = wheel_linear_speeds / wheel_radius + + # Convert wheel angular speeds from rad/s to deg/s. 
+ wheel_degps = wheel_angular_speeds * (180.0 / np.pi) + + # Scaling + steps_per_deg = 4096.0 / 360.0 + raw_floats = [abs(degps) * steps_per_deg for degps in wheel_degps] + max_raw_computed = max(raw_floats) + if max_raw_computed > max_raw: + scale = max_raw / max_raw_computed + wheel_degps = wheel_degps * scale + + # Convert each wheel’s angular speed (deg/s) to a raw integer. + wheel_raw = [LeKiwiClient._degps_to_raw(deg) for deg in wheel_degps] + + # TODO(Steven): remove hard-coded names + return {"left_wheel": wheel_raw[0], "back_wheel": wheel_raw[1], "right_wheel": wheel_raw[2]} + + # Copied from robot_lekiwi MobileManipulator class + def _wheel_raw_to_body( + self, wheel_raw: dict[str, Any], wheel_radius: float = 0.05, base_radius: float = 0.125 + ) -> dict[str, Any]: + """ + Convert wheel raw command feedback back into body-frame velocities. + + Parameters: + wheel_raw : Vector with raw wheel commands ("left_wheel", "back_wheel", "right_wheel"). + wheel_radius: Radius of each wheel (meters). + base_radius : Distance from the robot center to each wheel (meters). + + Returns: + A tuple (x_cmd, y_cmd, theta_cmd) where: + x_cmd : Linear velocity in x (m/s). + y_cmd : Linear velocity in y (m/s). + theta_cmd : Rotational velocity in deg/s. + """ + + # TODO(Steven): No check is done for dict keys + # Convert each raw command back to an angular speed in deg/s. + wheel_degps = np.array([LeKiwiClient._raw_to_degps(int(v)) for _, v in wheel_raw.items()]) + # Convert from deg/s to rad/s. + wheel_radps = wheel_degps * (np.pi / 180.0) + # Compute each wheel’s linear speed (m/s) from its angular speed. + wheel_linear_speeds = wheel_radps * wheel_radius + + # Define the wheel mounting angles with a -90° offset. + angles = np.radians(np.array([240, 120, 0]) - 90) + m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles]) + + # Solve the inverse kinematics: body_velocity = M⁻¹ · wheel_linear_speeds. + m_inv = np.linalg.inv(m) + velocity_vector = m_inv.dot(wheel_linear_speeds) + x_cmd, y_cmd, theta_rad = velocity_vector + theta_cmd = theta_rad * (180.0 / np.pi) + return {"x_cmd": x_cmd, "y_cmd": y_cmd, "theta_cmd": theta_cmd} + + # TODO(Steven): This is flaky, for example, if we received a state but failed decoding the image, we will not update any value + # TODO(Steven): All this function needs to be refactored + def _get_data(self): + # Copied from robot_lekiwi.py + """Polls the video socket for up to 15 ms. If data arrives, decode only + the *latest* message, returning frames, speed, and arm state. 
If + nothing arrives for any field, use the last known values.""" + + frames = {} + present_speed = {} + + remote_arm_state_tensor = {} + + # Poll up to 15 ms + poller = zmq.Poller() + poller.register(self.zmq_observation_socket, zmq.POLLIN) + socks = dict(poller.poll(15)) + if self.zmq_observation_socket not in socks or socks[self.zmq_observation_socket] != zmq.POLLIN: + # No new data arrived → reuse ALL old data + # TODO(Steven): This might return empty variables at init + return (self.last_frames, self.last_present_speed, self.last_remote_arm_state) + + # Drain all messages, keep only the last + last_msg = None + # TODO(Steven): There's probably a way to do this without while True + # TODO(Steven): Even consider changing to PUB/SUB + while True: + try: + obs_string = self.zmq_observation_socket.recv_string(zmq.NOBLOCK) + last_msg = obs_string + except zmq.Again: + break + + if not last_msg: + # No new message → also reuse old + return (self.last_frames, self.last_present_speed, self.last_remote_arm_state) + + # Decode only the final message + try: + observation = json.loads(last_msg) + + state_observation = observation[OBS_STATE] + image_observation = observation[OBS_IMAGES] + + # Convert images + for cam_name, image_b64 in image_observation.items(): + if image_b64: + jpg_data = base64.b64decode(image_b64) + np_arr = np.frombuffer(jpg_data, dtype=np.uint8) + frame_candidate = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) + if frame_candidate is not None: + frames[cam_name] = frame_candidate + + # TODO(Steven): Should we really ignore the arm state if the image is None? + # If remote_arm_state is None and frames is None there is no message then use the previous message + if state_observation is not None and frames is not None: + self.last_frames = frames + + # TODO(Steven): hard-coded name of expected keys, not good + remote_arm_state_tensor = {k: v for k, v in state_observation.items() if k.startswith("arm")} + self.last_remote_arm_state = remote_arm_state_tensor + + present_speed = {k: v for k, v in state_observation.items() if k.startswith("base")} + self.last_present_speed = present_speed + else: + frames = self.last_frames + remote_arm_state_tensor = self.last_remote_arm_state + present_speed = self.last_present_speed + + except Exception as e: + print(f"[DEBUG] Error decoding video message: {e}") + # If decode fails, fall back to old data + return (self.last_frames, self.last_present_speed, self.last_remote_arm_state) + return frames, present_speed, remote_arm_state_tensor + + # TODO(Steven): The returned space is different from the get_observation of LeKiwi + # This returns body-frames velocities instead of wheel pos/speeds + def get_observation(self) -> dict[str, Any]: + """ + Capture observations from the remote robot: current follower arm positions, + present wheel speeds (converted to body-frame velocities: x, y, theta), + and a camera frame. Receives over ZMQ, translate to body-frame vel + """ + if not self._is_connected: + raise DeviceNotConnectedError("LeKiwiClient is not connected. You need to run `robot.connect()`.") + + # TODO(Steven): remove hard-coded cam name + # This is needed at init for when there's no comms + obs_dict = { + OBS_IMAGES: {"wrist": np.zeros(shape=(480, 640, 3)), "front": np.zeros(shape=(640, 480, 3))} + } + + frames, present_speed, remote_arm_state_tensor = self._get_data() + body_state = self._wheel_raw_to_body(present_speed) + # TODO(Steven): output isdict[str,Any] and we multiply by 1000.0. 
This should be more explicit and specify the expected type instead of Any
+        body_state_mm = {k: v * 1000.0 for k, v in body_state.items()}  # Convert x,y to mm/s
+
+        obs_dict[OBS_STATE] = {**remote_arm_state_tensor, **body_state_mm}
+
+        # Loop over each configured camera
+        for cam_name, frame in frames.items():
+            if frame is None:
+                # TODO(Steven): Daemon doesn't know camera dimensions (hard-coded for now), consider at least getting them from state features
+                logging.warning("Frame is None")
+                frame = np.zeros((480, 640, 3), dtype=np.uint8)
+            obs_dict[OBS_IMAGES][cam_name] = torch.from_numpy(frame)
+
+        return obs_dict
+
+    def _from_keyboard_to_wheel_action(self, pressed_keys: np.ndarray):
+        # Speed control
+        if self.teleop_keys["speed_up"] in pressed_keys:
+            self.speed_index = min(self.speed_index + 1, 2)
+        if self.teleop_keys["speed_down"] in pressed_keys:
+            self.speed_index = max(self.speed_index - 1, 0)
+        speed_setting = self.speed_levels[self.speed_index]
+        xy_speed = speed_setting["xy"]  # e.g. 0.1, 0.2, or 0.3
+        theta_speed = speed_setting["theta"]  # e.g. 30, 60, or 90
+
+        x_cmd = 0.0  # m/s forward/backward
+        y_cmd = 0.0  # m/s lateral
+        theta_cmd = 0.0  # deg/s rotation
+
+        if self.teleop_keys["forward"] in pressed_keys:
+            x_cmd += xy_speed
+        if self.teleop_keys["backward"] in pressed_keys:
+            x_cmd -= xy_speed
+        if self.teleop_keys["left"] in pressed_keys:
+            y_cmd += xy_speed
+        if self.teleop_keys["right"] in pressed_keys:
+            y_cmd -= xy_speed
+        if self.teleop_keys["rotate_left"] in pressed_keys:
+            theta_cmd += theta_speed
+        if self.teleop_keys["rotate_right"] in pressed_keys:
+            theta_cmd -= theta_speed
+        return self._body_to_wheel_raw(x_cmd, y_cmd, theta_cmd)
+
+    def configure(self):
+        pass
+
+    # TODO(Steven): This assumes send_action() is always called with a keyboard teleop command
+    # TODO(Steven): Doing this mapping in here adds latency between send_action and movement from the user perspective:
+    # t0: get teleop_cmd
+    # t1: send_action(teleop_cmd)
+    # t2: mapping teleop_cmd -> motor_cmd
+    # t3: execute motor_cmd
+    # For other robots/teleop devices this mapping might be even slower. Doing it in the teleop device would make the cost explicit:
+    # t0': get teleop_cmd
+    # t1': mapping teleop_cmd -> motor_cmd
+    # t2': send_action(motor_cmd)
+    # t3': execute motor_cmd
+    # t3'-t2' << t3-t1
+    def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
+        """Command LeKiwi to move to a target joint configuration. Translates the teleop action to motor space and sends it over ZMQ.
+
+        Args:
+            action (dict[str, Any]): goal positions for the arm motors and, optionally, pressed keyboard keys for the base.
+
+        Raises:
+            DeviceNotConnectedError: if robot is not connected.
+
+        Returns:
+            dict[str, Any]: the action sent to the motors, potentially clipped.
+        """
+        if not self._is_connected:
+            raise DeviceNotConnectedError(
+                "LeKiwiClient is not connected. You need to run `robot.connect()`."
+            )
+
+        if self.robot_mode is RobotMode.AUTO:
+            # TODO(Steven): Not yet implemented. The policy outputs might need a different conversion
+            raise Exception("RobotMode.AUTO is not yet implemented for LeKiwiClient.")
+
+        goal_pos = {}
+        # TODO(Steven): This assumes teleop mode is always driven by a keyboard; another teleop device could be used tomorrow.
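+        # Expected TELEOP action keys (based on the example apps, not enforced here): the SO-100 leader
+        # provides joint positions keyed without the "arm_" prefix ("shoulder_pan", ..., "gripper"),
+        # while the keyboard teleop contributes the raw pressed keys (e.g. "w", "a", "s", "d").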
+ if self.robot_mode is RobotMode.TELEOP: + motors_name = self.state_feature.get("names").get("motors") + + common_keys = [ + key for key in action if key in (motor.replace("arm_", "") for motor in motors_name) + ] + + # TODO(Steven): I don't like this + if len(common_keys) < 6: + logging.error("Action should include at least the states of the leader arm") + raise InvalidActionError + + arm_actions = {"arm_" + arm_motor: action[arm_motor] for arm_motor in common_keys} + goal_pos = arm_actions + + if len(action) > 6: + keyboard_keys = np.array(list(set(action.keys()) - set(common_keys))) + wheel_actions = { + "base_" + k: v for k, v in self._from_keyboard_to_wheel_action(keyboard_keys).items() + } + goal_pos = {**arm_actions, **wheel_actions} + + self.zmq_cmd_socket.send_string(json.dumps(goal_pos)) # action is in motor space + + return goal_pos + + def print_logs(self): + # TODO(Steven): Refactor logger + pass + + def disconnect(self): + """Cleans ZMQ comms""" + + if not self._is_connected: + raise DeviceNotConnectedError( + "LeKiwi is not connected. You need to run `robot.connect()` before disconnecting." + ) + self.zmq_observation_socket.close() + self.zmq_cmd_socket.close() + self.zmq_context.term() + self._is_connected = False + + def __del__(self): + if getattr(self, "is_connected", False): + self.disconnect() diff --git a/lerobot/common/robots/lekiwi/lekiwi_host.py b/lerobot/common/robots/lekiwi/lekiwi_host.py new file mode 100644 index 00000000..56228ac9 --- /dev/null +++ b/lerobot/common/robots/lekiwi/lekiwi_host.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
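+
+# Host-side entry point (intended for LeKiwi's onboard computer): it binds a ZMQ PULL socket on port
+# 5555 for incoming action dictionaries and a PUSH socket on port 5556 on which it publishes JSON
+# observations with base64-encoded JPEG camera frames, looping at roughly 30 Hz.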
+ +import base64 +import json +import logging +import time + +import cv2 +import zmq + +from lerobot.common.constants import OBS_IMAGES + +from .config_lekiwi import LeKiwiConfig +from .lekiwi import LeKiwi + +# Network Configuration +PORT_ZMQ_CMD: int = 5555 +PORT_ZMQ_OBSERVATIONS: int = 5556 + + +class HostAgent: + def __init__(self): + self.zmq_context = zmq.Context() + self.zmq_cmd_socket = self.zmq_context.socket(zmq.PULL) + self.zmq_cmd_socket.setsockopt(zmq.CONFLATE, 1) + self.zmq_cmd_socket.bind(f"tcp://*:{PORT_ZMQ_CMD}") + + self.zmq_observation_socket = self.zmq_context.socket(zmq.PUSH) + self.zmq_observation_socket.setsockopt(zmq.CONFLATE, 1) + self.zmq_observation_socket.bind(f"tcp://*:{PORT_ZMQ_OBSERVATIONS}") + + def disconnect(self): + self.zmq_observation_socket.close() + self.zmq_cmd_socket.close() + self.zmq_context.term() + + +def main(): + logging.info("Configuring LeKiwi") + robot_config = LeKiwiConfig() + robot = LeKiwi(robot_config) + + logging.info("Connecting LeKiwi") + robot.connect() + + logging.info("Starting HostAgent") + remote_agent = HostAgent() + + last_cmd_time = time.time() + logging.info("Waiting for commands...") + try: + # Business logic + start = time.perf_counter() + duration = 0 + while duration < 100: + loop_start_time = time.time() + try: + msg = remote_agent.zmq_cmd_socket.recv_string(zmq.NOBLOCK) + data = dict(json.loads(msg)) + _action_sent = robot.send_action(data) + last_cmd_time = time.time() + except zmq.Again: + logging.warning("No command available") + except Exception as e: + logging.error("Message fetching failed: %s", e) + + # TODO(Steven): Check this value + # Watchdog: stop the robot if no command is received for over 0.5 seconds. + now = time.time() + if now - last_cmd_time > 0.5: + robot.stop_base() + + last_observation = robot.get_observation() + + # Encode ndarrays to base64 strings + for cam_key, _ in robot.cameras.items(): + ret, buffer = cv2.imencode( + ".jpg", last_observation[OBS_IMAGES][cam_key], [int(cv2.IMWRITE_JPEG_QUALITY), 90] + ) + if ret: + last_observation[OBS_IMAGES][cam_key] = base64.b64encode(buffer).decode("utf-8") + else: + last_observation[OBS_IMAGES][cam_key] = "" + + # Send the observation to the remote agent + remote_agent.zmq_observation_socket.send_string(json.dumps(last_observation)) + + # Ensure a short sleep to avoid overloading the CPU. + elapsed = time.time() - loop_start_time + + # TODO(Steven): Check this value + time.sleep( + max(0.033 - elapsed, 0) + ) # If robot jitters increase the sleep and monitor cpu load with `top` in cmd + duration = time.perf_counter() - start + + except KeyboardInterrupt: + print("Shutting down LeKiwi server.") + finally: + robot.disconnect() + remote_agent.disconnect() + + logging.info("Finished LeKiwi cleanly") + + +if __name__ == "__main__": + main() diff --git a/lerobot/common/robots/lekiwi/lekiwi_remote.py b/lerobot/common/robots/lekiwi/lekiwi_remote.py deleted file mode 100644 index 1a3af5cf..00000000 --- a/lerobot/common/robots/lekiwi/lekiwi_remote.py +++ /dev/null @@ -1,224 +0,0 @@ -# Copyright 2024 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import base64 -import json -import threading -import time -from pathlib import Path - -import cv2 -import zmq - -from lerobot.common.robots.mobile_manipulator import LeKiwi - - -def setup_zmq_sockets(config): - context = zmq.Context() - cmd_socket = context.socket(zmq.PULL) - cmd_socket.setsockopt(zmq.CONFLATE, 1) - cmd_socket.bind(f"tcp://*:{config.port}") - - video_socket = context.socket(zmq.PUSH) - video_socket.setsockopt(zmq.CONFLATE, 1) - video_socket.bind(f"tcp://*:{config.video_port}") - - return context, cmd_socket, video_socket - - -def run_camera_capture(cameras, images_lock, latest_images_dict, stop_event): - while not stop_event.is_set(): - local_dict = {} - for name, cam in cameras.items(): - frame = cam.async_read() - ret, buffer = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90]) - if ret: - local_dict[name] = base64.b64encode(buffer).decode("utf-8") - else: - local_dict[name] = "" - with images_lock: - latest_images_dict.update(local_dict) - time.sleep(0.01) - - -def calibrate_follower_arm(motors_bus, calib_dir_str): - """ - Calibrates the follower arm. Attempts to load an existing calibration file; - if not found, runs manual calibration and saves the result. - """ - calib_dir = Path(calib_dir_str) - calib_dir.mkdir(parents=True, exist_ok=True) - calib_file = calib_dir / "main_follower.json" - try: - from lerobot.common.motors.feetech.feetech_calibration import run_full_arm_calibration - except ImportError: - print("[WARNING] Calibration function not available. Skipping calibration.") - return - - if calib_file.exists(): - with open(calib_file) as f: - calibration = json.load(f) - print(f"[INFO] Loaded calibration from {calib_file}") - else: - print("[INFO] Calibration file not found. Running manual calibration...") - calibration = run_full_arm_calibration(motors_bus, "lekiwi", "follower_arm", "follower") - print(f"[INFO] Calibration complete. Saving to {calib_file}") - with open(calib_file, "w") as f: - json.dump(calibration, f) - try: - motors_bus.set_calibration(calibration) - print("[INFO] Applied calibration for follower arm.") - except Exception as e: - print(f"[WARNING] Could not apply calibration: {e}") - - -def run_lekiwi(robot_config): - """ - Runs the LeKiwi robot: - - Sets up cameras and connects them. - - Initializes the follower arm motors. - - Calibrates the follower arm if necessary. - - Creates ZeroMQ sockets for receiving commands and streaming observations. - - Processes incoming commands (arm and wheel commands) and sends back sensor and camera data. - """ - # Import helper functions and classes - from lerobot.common.cameras.utils import make_cameras_from_configs - from lerobot.common.motors.feetech.feetech import FeetechMotorsBus, TorqueMode - - # Initialize cameras from the robot configuration. - cameras = make_cameras_from_configs(robot_config.cameras) - for cam in cameras.values(): - cam.connect() - - # Initialize the motors bus using the follower arm configuration. 
- motor_config = robot_config.follower_arms.get("main") - if motor_config is None: - print("[ERROR] Follower arm 'main' configuration not found.") - return - motors_bus = FeetechMotorsBus(motor_config) - motors_bus.connect() - - # Calibrate the follower arm. - calibrate_follower_arm(motors_bus, robot_config.calibration_dir) - - # Create the LeKiwi robot instance. - robot = LeKiwi(motors_bus) - - # Define the expected arm motor IDs. - arm_motor_ids = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll", "gripper"] - - # Disable torque for each arm motor. - for motor in arm_motor_ids: - motors_bus.write("Torque_Enable", TorqueMode.DISABLED.value, motor) - - # Set up ZeroMQ sockets. - context, cmd_socket, video_socket = setup_zmq_sockets(robot_config) - - # Start the camera capture thread. - latest_images_dict = {} - images_lock = threading.Lock() - stop_event = threading.Event() - cam_thread = threading.Thread( - target=run_camera_capture, args=(cameras, images_lock, latest_images_dict, stop_event), daemon=True - ) - cam_thread.start() - - last_cmd_time = time.time() - print("LeKiwi robot server started. Waiting for commands...") - - try: - while True: - loop_start_time = time.time() - - # Process incoming commands (non-blocking). - while True: - try: - msg = cmd_socket.recv_string(zmq.NOBLOCK) - except zmq.Again: - break - try: - data = json.loads(msg) - # Process arm position commands. - if "arm_positions" in data: - arm_positions = data["arm_positions"] - if not isinstance(arm_positions, list): - print(f"[ERROR] Invalid arm_positions: {arm_positions}") - elif len(arm_positions) < len(arm_motor_ids): - print( - f"[WARNING] Received {len(arm_positions)} arm positions, expected {len(arm_motor_ids)}" - ) - else: - for motor, pos in zip(arm_motor_ids, arm_positions, strict=False): - motors_bus.write("Goal_Position", pos, motor) - # Process wheel (base) commands. - if "raw_velocity" in data: - raw_command = data["raw_velocity"] - # Expect keys: "left_wheel", "back_wheel", "right_wheel". - command_speeds = [ - int(raw_command.get("left_wheel", 0)), - int(raw_command.get("back_wheel", 0)), - int(raw_command.get("right_wheel", 0)), - ] - robot.set_velocity(command_speeds) - last_cmd_time = time.time() - except Exception as e: - print(f"[ERROR] Parsing message failed: {e}") - - # Watchdog: stop the robot if no command is received for over 0.5 seconds. - now = time.time() - if now - last_cmd_time > 0.5: - robot.stop() - last_cmd_time = now - - # Read current wheel speeds from the robot. - current_velocity = robot.read_velocity() - - # Read the follower arm state from the motors bus. - follower_arm_state = [] - for motor in arm_motor_ids: - try: - pos = motors_bus.read("Present_Position", motor) - # Convert the position to a float (or use as is if already numeric). - follower_arm_state.append(float(pos) if not isinstance(pos, (int, float)) else pos) - except Exception as e: - print(f"[ERROR] Reading motor {motor} failed: {e}") - - # Get the latest camera images. - with images_lock: - images_dict_copy = dict(latest_images_dict) - - # Build the observation dictionary. - observation = { - "images": images_dict_copy, - "present_speed": current_velocity, - "follower_arm_state": follower_arm_state, - } - # Send the observation over the video socket. - video_socket.send_string(json.dumps(observation)) - - # Ensure a short sleep to avoid overloading the CPU. 
- elapsed = time.time() - loop_start_time - time.sleep( - max(0.033 - elapsed, 0) - ) # If robot jitters increase the sleep and monitor cpu load with `top` in cmd - except KeyboardInterrupt: - print("Shutting down LeKiwi server.") - finally: - stop_event.set() - cam_thread.join() - robot.stop() - motors_bus.disconnect() - cmd_socket.close() - video_socket.close() - context.term() diff --git a/lerobot/common/robots/lekiwi/robot_lekiwi.py b/lerobot/common/robots/lekiwi/robot_lekiwi.py deleted file mode 100644 index 8f2f9037..00000000 --- a/lerobot/common/robots/lekiwi/robot_lekiwi.py +++ /dev/null @@ -1,692 +0,0 @@ -import base64 -import json -import os -import sys -from pathlib import Path - -import cv2 -import numpy as np -import torch -import zmq - -from lerobot.common.cameras.utils import make_cameras_from_configs -from lerobot.common.errors import DeviceNotConnectedError -from lerobot.common.motors.feetech.feetech import TorqueMode -from lerobot.common.motors.feetech.feetech_calibration import run_full_arm_calibration -from lerobot.common.motors.motors_bus import MotorsBus -from lerobot.common.motors.utils import make_motors_buses_from_configs -from lerobot.common.robots.lekiwi.configuration_lekiwi import LeKiwiRobotConfig -from lerobot.common.robots.utils import get_arm_id - -PYNPUT_AVAILABLE = True -try: - # Only import if there's a valid X server or if we're not on a Pi - if ("DISPLAY" not in os.environ) and ("linux" in sys.platform): - print("No DISPLAY set. Skipping pynput import.") - raise ImportError("pynput blocked intentionally due to no display.") - - from pynput import keyboard -except ImportError: - keyboard = None - PYNPUT_AVAILABLE = False -except Exception as e: - keyboard = None - PYNPUT_AVAILABLE = False - print(f"Could not import pynput: {e}") - - -class MobileManipulator: - """ - MobileManipulator is a class for connecting to and controlling a remote mobile manipulator robot. - The robot includes a three omniwheel mobile base and a remote follower arm. - The leader arm is connected locally (on the laptop) and its joint positions are recorded and then - forwarded to the remote follower arm (after applying a safety clamp). - In parallel, keyboard teleoperation is used to generate raw velocity commands for the wheels. - """ - - def __init__(self, config: LeKiwiRobotConfig): - """ - Expected keys in config: - - ip, port, video_port for the remote connection. - - calibration_dir, leader_arms, follower_arms, max_relative_target, etc. - """ - self.robot_type = config.type - self.config = config - self.remote_ip = config.ip - self.remote_port = config.port - self.remote_port_video = config.video_port - self.calibration_dir = Path(self.config.calibration_dir) - self.logs = {} - - self.teleop_keys = self.config.teleop_keys - - # For teleoperation, the leader arm (local) is used to record the desired arm pose. - self.leader_arms = make_motors_buses_from_configs(self.config.leader_arms) - - self.follower_arms = make_motors_buses_from_configs(self.config.follower_arms) - - self.cameras = make_cameras_from_configs(self.config.cameras) - - self.is_connected = False - - self.last_frames = {} - self.last_present_speed = {} - self.last_remote_arm_state = torch.zeros(6, dtype=torch.float32) - - # Define three speed levels and a current index - self.speed_levels = [ - {"xy": 0.1, "theta": 30}, # slow - {"xy": 0.2, "theta": 60}, # medium - {"xy": 0.3, "theta": 90}, # fast - ] - self.speed_index = 0 # Start at slow - - # ZeroMQ context and sockets. 
- self.context = None - self.cmd_socket = None - self.video_socket = None - - # Keyboard state for base teleoperation. - self.running = True - self.pressed_keys = { - "forward": False, - "backward": False, - "left": False, - "right": False, - "rotate_left": False, - "rotate_right": False, - } - - if PYNPUT_AVAILABLE: - print("pynput is available - enabling local keyboard listener.") - self.listener = keyboard.Listener( - on_press=self.on_press, - on_release=self.on_release, - ) - self.listener.start() - else: - print("pynput not available - skipping local keyboard listener.") - self.listener = None - - def get_motor_names(self, arms: dict[str, MotorsBus]) -> list: - return [f"{arm}_{motor}" for arm, bus in arms.items() for motor in bus.motors] - - @property - def camera_features(self) -> dict: - cam_ft = {} - for cam_key, cam in self.cameras.items(): - key = f"observation.images.{cam_key}" - cam_ft[key] = { - "shape": (cam.height, cam.width, cam.channels), - "names": ["height", "width", "channels"], - "info": None, - } - return cam_ft - - @property - def motor_features(self) -> dict: - follower_arm_names = [ - "shoulder_pan", - "shoulder_lift", - "elbow_flex", - "wrist_flex", - "wrist_roll", - "gripper", - ] - observations = ["x_mm", "y_mm", "theta"] - combined_names = follower_arm_names + observations - return { - "action": { - "dtype": "float32", - "shape": (len(combined_names),), - "names": combined_names, - }, - "observation.state": { - "dtype": "float32", - "shape": (len(combined_names),), - "names": combined_names, - }, - } - - @property - def features(self): - return {**self.motor_features, **self.camera_features} - - @property - def has_camera(self): - return len(self.cameras) > 0 - - @property - def num_cameras(self): - return len(self.cameras) - - @property - def available_arms(self): - available = [] - for name in self.leader_arms: - available.append(get_arm_id(name, "leader")) - for name in self.follower_arms: - available.append(get_arm_id(name, "follower")) - return available - - def on_press(self, key): - try: - # Movement - if key.char == self.teleop_keys["forward"]: - self.pressed_keys["forward"] = True - elif key.char == self.teleop_keys["backward"]: - self.pressed_keys["backward"] = True - elif key.char == self.teleop_keys["left"]: - self.pressed_keys["left"] = True - elif key.char == self.teleop_keys["right"]: - self.pressed_keys["right"] = True - elif key.char == self.teleop_keys["rotate_left"]: - self.pressed_keys["rotate_left"] = True - elif key.char == self.teleop_keys["rotate_right"]: - self.pressed_keys["rotate_right"] = True - - # Quit teleoperation - elif key.char == self.teleop_keys["quit"]: - self.running = False - return False - - # Speed control - elif key.char == self.teleop_keys["speed_up"]: - self.speed_index = min(self.speed_index + 1, 2) - print(f"Speed index increased to {self.speed_index}") - elif key.char == self.teleop_keys["speed_down"]: - self.speed_index = max(self.speed_index - 1, 0) - print(f"Speed index decreased to {self.speed_index}") - - except AttributeError: - # e.g., if key is special like Key.esc - if key == keyboard.Key.esc: - self.running = False - return False - - def on_release(self, key): - try: - if hasattr(key, "char"): - if key.char == self.teleop_keys["forward"]: - self.pressed_keys["forward"] = False - elif key.char == self.teleop_keys["backward"]: - self.pressed_keys["backward"] = False - elif key.char == self.teleop_keys["left"]: - self.pressed_keys["left"] = False - elif key.char == self.teleop_keys["right"]: - 
self.pressed_keys["right"] = False - elif key.char == self.teleop_keys["rotate_left"]: - self.pressed_keys["rotate_left"] = False - elif key.char == self.teleop_keys["rotate_right"]: - self.pressed_keys["rotate_right"] = False - except AttributeError: - pass - - def connect(self): - if not self.leader_arms: - raise ValueError("MobileManipulator has no leader arm to connect.") - for name in self.leader_arms: - print(f"Connecting {name} leader arm.") - self.calibrate_leader() - - # Set up ZeroMQ sockets to communicate with the remote mobile robot. - self.context = zmq.Context() - self.cmd_socket = self.context.socket(zmq.PUSH) - connection_string = f"tcp://{self.remote_ip}:{self.remote_port}" - self.cmd_socket.connect(connection_string) - self.cmd_socket.setsockopt(zmq.CONFLATE, 1) - self.video_socket = self.context.socket(zmq.PULL) - video_connection = f"tcp://{self.remote_ip}:{self.remote_port_video}" - self.video_socket.connect(video_connection) - self.video_socket.setsockopt(zmq.CONFLATE, 1) - print( - f"[INFO] Connected to remote robot at {connection_string} and video stream at {video_connection}." - ) - self.is_connected = True - - def load_or_run_calibration_(self, name, arm, arm_type): - arm_id = get_arm_id(name, arm_type) - arm_calib_path = self.calibration_dir / f"{arm_id}.json" - - if arm_calib_path.exists(): - with open(arm_calib_path) as f: - calibration = json.load(f) - else: - print(f"Missing calibration file '{arm_calib_path}'") - calibration = run_full_arm_calibration(arm, self.robot_type, name, arm_type) - print(f"Calibration is done! Saving calibration file '{arm_calib_path}'") - arm_calib_path.parent.mkdir(parents=True, exist_ok=True) - with open(arm_calib_path, "w") as f: - json.dump(calibration, f) - - return calibration - - def calibrate_leader(self): - for name, arm in self.leader_arms.items(): - # Connect the bus - arm.connect() - - # Disable torque on all motors - for motor_id in arm.motors: - arm.write("Torque_Enable", TorqueMode.DISABLED.value, motor_id) - - # Now run calibration - calibration = self.load_or_run_calibration_(name, arm, "leader") - arm.set_calibration(calibration) - - def calibrate_follower(self): - for name, bus in self.follower_arms.items(): - bus.connect() - - # Disable torque on all motors - for motor_id in bus.motors: - bus.write("Torque_Enable", 0, motor_id) - - # Then filter out wheels - arm_only_dict = {k: v for k, v in bus.motors.items() if not k.startswith("wheel_")} - if not arm_only_dict: - continue - - original_motors = bus.motors - bus.motors = arm_only_dict - - calibration = self.load_or_run_calibration_(name, bus, "follower") - bus.set_calibration(calibration) - - bus.motors = original_motors - - def _get_data(self): - """ - Polls the video socket for up to 15 ms. If data arrives, decode only - the *latest* message, returning frames, speed, and arm state. If - nothing arrives for any field, use the last known values. 
- """ - frames = {} - present_speed = {} - remote_arm_state_tensor = torch.zeros(6, dtype=torch.float32) - - # Poll up to 15 ms - poller = zmq.Poller() - poller.register(self.video_socket, zmq.POLLIN) - socks = dict(poller.poll(15)) - if self.video_socket not in socks or socks[self.video_socket] != zmq.POLLIN: - # No new data arrived → reuse ALL old data - return (self.last_frames, self.last_present_speed, self.last_remote_arm_state) - - # Drain all messages, keep only the last - last_msg = None - while True: - try: - obs_string = self.video_socket.recv_string(zmq.NOBLOCK) - last_msg = obs_string - except zmq.Again: - break - - if not last_msg: - # No new message → also reuse old - return (self.last_frames, self.last_present_speed, self.last_remote_arm_state) - - # Decode only the final message - try: - observation = json.loads(last_msg) - - images_dict = observation.get("images", {}) - new_speed = observation.get("present_speed", {}) - new_arm_state = observation.get("follower_arm_state", None) - - # Convert images - for cam_name, image_b64 in images_dict.items(): - if image_b64: - jpg_data = base64.b64decode(image_b64) - np_arr = np.frombuffer(jpg_data, dtype=np.uint8) - frame_candidate = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) - if frame_candidate is not None: - frames[cam_name] = frame_candidate - - # If remote_arm_state is None and frames is None there is no message then use the previous message - if new_arm_state is not None and frames is not None: - self.last_frames = frames - - remote_arm_state_tensor = torch.tensor(new_arm_state, dtype=torch.float32) - self.last_remote_arm_state = remote_arm_state_tensor - - present_speed = new_speed - self.last_present_speed = new_speed - else: - frames = self.last_frames - - remote_arm_state_tensor = self.last_remote_arm_state - - present_speed = self.last_present_speed - - except Exception as e: - print(f"[DEBUG] Error decoding video message: {e}") - # If decode fails, fall back to old data - return (self.last_frames, self.last_present_speed, self.last_remote_arm_state) - - return frames, present_speed, remote_arm_state_tensor - - def _process_present_speed(self, present_speed: dict) -> torch.Tensor: - state_tensor = torch.zeros(3, dtype=torch.int32) - if present_speed: - decoded = {key: MobileManipulator.raw_to_degps(value) for key, value in present_speed.items()} - if "1" in decoded: - state_tensor[0] = decoded["1"] - if "2" in decoded: - state_tensor[1] = decoded["2"] - if "3" in decoded: - state_tensor[2] = decoded["3"] - return state_tensor - - def teleop_step( - self, record_data: bool = False - ) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]: - if not self.is_connected: - raise DeviceNotConnectedError("MobileManipulator is not connected. Run `connect()` first.") - - speed_setting = self.speed_levels[self.speed_index] - xy_speed = speed_setting["xy"] # e.g. 0.1, 0.25, or 0.4 - theta_speed = speed_setting["theta"] # e.g. 
30, 60, or 90 - - # Prepare to assign the position of the leader to the follower - arm_positions = [] - for name in self.leader_arms: - pos = self.leader_arms[name].read("Present_Position") - pos_tensor = torch.from_numpy(pos).float() - # Instead of pos_tensor.item(), use tolist() to convert the entire tensor to a list - arm_positions.extend(pos_tensor.tolist()) - - # (The rest of your code for generating wheel commands remains unchanged) - x_cmd = 0.0 # m/s forward/backward - y_cmd = 0.0 # m/s lateral - theta_cmd = 0.0 # deg/s rotation - if self.pressed_keys["forward"]: - x_cmd += xy_speed - if self.pressed_keys["backward"]: - x_cmd -= xy_speed - if self.pressed_keys["left"]: - y_cmd += xy_speed - if self.pressed_keys["right"]: - y_cmd -= xy_speed - if self.pressed_keys["rotate_left"]: - theta_cmd += theta_speed - if self.pressed_keys["rotate_right"]: - theta_cmd -= theta_speed - - wheel_commands = self.body_to_wheel_raw(x_cmd, y_cmd, theta_cmd) - - message = {"raw_velocity": wheel_commands, "arm_positions": arm_positions} - self.cmd_socket.send_string(json.dumps(message)) - - if not record_data: - return - - obs_dict = self.capture_observation() - - arm_state_tensor = torch.tensor(arm_positions, dtype=torch.float32) - - wheel_velocity_tuple = self.wheel_raw_to_body(wheel_commands) - wheel_velocity_mm = ( - wheel_velocity_tuple[0] * 1000.0, - wheel_velocity_tuple[1] * 1000.0, - wheel_velocity_tuple[2], - ) - wheel_tensor = torch.tensor(wheel_velocity_mm, dtype=torch.float32) - action_tensor = torch.cat([arm_state_tensor, wheel_tensor]) - action_dict = {"action": action_tensor} - - return obs_dict, action_dict - - def capture_observation(self) -> dict: - """ - Capture observations from the remote robot: current follower arm positions, - present wheel speeds (converted to body-frame velocities: x, y, theta), - and a camera frame. - """ - if not self.is_connected: - raise DeviceNotConnectedError("Not connected. Run `connect()` first.") - - frames, present_speed, remote_arm_state_tensor = self._get_data() - - body_state = self.wheel_raw_to_body(present_speed) - - body_state_mm = (body_state[0] * 1000.0, body_state[1] * 1000.0, body_state[2]) # Convert x,y to mm/s - wheel_state_tensor = torch.tensor(body_state_mm, dtype=torch.float32) - combined_state_tensor = torch.cat((remote_arm_state_tensor, wheel_state_tensor), dim=0) - - obs_dict = {"observation.state": combined_state_tensor} - - # Loop over each configured camera - for cam_name, cam in self.cameras.items(): - frame = frames.get(cam_name, None) - if frame is None: - # Create a black image using the camera's configured width, height, and channels - frame = np.zeros((cam.height, cam.width, cam.channels), dtype=np.uint8) - obs_dict[f"observation.images.{cam_name}"] = torch.from_numpy(frame) - - return obs_dict - - def send_action(self, action: torch.Tensor) -> torch.Tensor: - if not self.is_connected: - raise DeviceNotConnectedError("Not connected. Run `connect()` first.") - - # Ensure the action tensor has at least 9 elements: - # - First 6: arm positions. - # - Last 3: base commands. - if action.numel() < 9: - # Pad with zeros if there are not enough elements. - padded = torch.zeros(9, dtype=action.dtype) - padded[: action.numel()] = action - action = padded - - # Extract arm and base actions. 
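# Layout of the 9-element action vector (matches motor_features above):
#   action[0:6] -> arm joint goals: shoulder_pan, shoulder_lift, elbow_flex,
#                  wrist_flex, wrist_roll, gripper
#   action[6:9] -> base command in the body frame: x_mm (mm/s), y_mm (mm/s), theta (deg/s)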
- arm_actions = action[:6].flatten() - base_actions = action[6:].flatten() - - x_cmd_mm = base_actions[0].item() # mm/s - y_cmd_mm = base_actions[1].item() # mm/s - theta_cmd = base_actions[2].item() # deg/s - - # Convert mm/s to m/s for the kinematics calculations. - x_cmd = x_cmd_mm / 1000.0 # m/s - y_cmd = y_cmd_mm / 1000.0 # m/s - - # Compute wheel commands from body commands. - wheel_commands = self.body_to_wheel_raw(x_cmd, y_cmd, theta_cmd) - - arm_positions_list = arm_actions.tolist() - - message = {"raw_velocity": wheel_commands, "arm_positions": arm_positions_list} - self.cmd_socket.send_string(json.dumps(message)) - - return action - - def print_logs(self): - pass - - def disconnect(self): - if not self.is_connected: - raise DeviceNotConnectedError("Not connected.") - if self.cmd_socket: - stop_cmd = { - "raw_velocity": {"left_wheel": 0, "back_wheel": 0, "right_wheel": 0}, - "arm_positions": {}, - } - self.cmd_socket.send_string(json.dumps(stop_cmd)) - self.cmd_socket.close() - if self.video_socket: - self.video_socket.close() - if self.context: - self.context.term() - if PYNPUT_AVAILABLE: - self.listener.stop() - self.is_connected = False - print("[INFO] Disconnected from remote robot.") - - def __del__(self): - if getattr(self, "is_connected", False): - self.disconnect() - if PYNPUT_AVAILABLE: - self.listener.stop() - - @staticmethod - def degps_to_raw(degps: float) -> int: - steps_per_deg = 4096.0 / 360.0 - speed_in_steps = abs(degps) * steps_per_deg - speed_int = int(round(speed_in_steps)) - if speed_int > 0x7FFF: - speed_int = 0x7FFF - if degps < 0: - return speed_int | 0x8000 - else: - return speed_int & 0x7FFF - - @staticmethod - def raw_to_degps(raw_speed: int) -> float: - steps_per_deg = 4096.0 / 360.0 - magnitude = raw_speed & 0x7FFF - degps = magnitude / steps_per_deg - if raw_speed & 0x8000: - degps = -degps - return degps - - def body_to_wheel_raw( - self, - x_cmd: float, - y_cmd: float, - theta_cmd: float, - wheel_radius: float = 0.05, - base_radius: float = 0.125, - max_raw: int = 3000, - ) -> dict: - """ - Convert desired body-frame velocities into wheel raw commands. - - Parameters: - x_cmd : Linear velocity in x (m/s). - y_cmd : Linear velocity in y (m/s). - theta_cmd : Rotational velocity (deg/s). - wheel_radius: Radius of each wheel (meters). - base_radius : Distance from the center of rotation to each wheel (meters). - max_raw : Maximum allowed raw command (ticks) per wheel. - - Returns: - A dictionary with wheel raw commands: - {"left_wheel": value, "back_wheel": value, "right_wheel": value}. - - Notes: - - Internally, the method converts theta_cmd to rad/s for the kinematics. - - The raw command is computed from the wheels angular speed in deg/s - using degps_to_raw(). If any command exceeds max_raw, all commands - are scaled down proportionally. - """ - # Convert rotational velocity from deg/s to rad/s. - theta_rad = theta_cmd * (np.pi / 180.0) - # Create the body velocity vector [x, y, theta_rad]. - velocity_vector = np.array([x_cmd, y_cmd, theta_rad]) - - # Define the wheel mounting angles with a -90° offset. - angles = np.radians(np.array([240, 120, 0]) - 90) - # Build the kinematic matrix: each row maps body velocities to a wheel’s linear speed. - # The third column (base_radius) accounts for the effect of rotation. - m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles]) - - # Compute each wheel’s linear speed (m/s) and then its angular speed (rad/s). 
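# Worked example with the defaults (wheel_radius=0.05 m, base_radius=0.125 m):
# a pure rotation of theta_cmd = 90 deg/s gives each wheel a linear speed of
# 0.125 * (90 * pi / 180) ≈ 0.196 m/s, i.e. ≈ 3.93 rad/s ≈ 225 deg/s, which
# degps_to_raw() encodes as 225 * 4096 / 360 = 2560 ticks for every wheel.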
- wheel_linear_speeds = m.dot(velocity_vector) - wheel_angular_speeds = wheel_linear_speeds / wheel_radius - - # Convert wheel angular speeds from rad/s to deg/s. - wheel_degps = wheel_angular_speeds * (180.0 / np.pi) - - # Scaling - steps_per_deg = 4096.0 / 360.0 - raw_floats = [abs(degps) * steps_per_deg for degps in wheel_degps] - max_raw_computed = max(raw_floats) - if max_raw_computed > max_raw: - scale = max_raw / max_raw_computed - wheel_degps = wheel_degps * scale - - # Convert each wheel’s angular speed (deg/s) to a raw integer. - wheel_raw = [MobileManipulator.degps_to_raw(deg) for deg in wheel_degps] - - return {"left_wheel": wheel_raw[0], "back_wheel": wheel_raw[1], "right_wheel": wheel_raw[2]} - - def wheel_raw_to_body( - self, wheel_raw: dict, wheel_radius: float = 0.05, base_radius: float = 0.125 - ) -> tuple: - """ - Convert wheel raw command feedback back into body-frame velocities. - - Parameters: - wheel_raw : Dictionary with raw wheel commands (keys: "left_wheel", "back_wheel", "right_wheel"). - wheel_radius: Radius of each wheel (meters). - base_radius : Distance from the robot center to each wheel (meters). - - Returns: - A tuple (x_cmd, y_cmd, theta_cmd) where: - x_cmd : Linear velocity in x (m/s). - y_cmd : Linear velocity in y (m/s). - theta_cmd : Rotational velocity in deg/s. - """ - # Extract the raw values in order. - raw_list = [ - int(wheel_raw.get("left_wheel", 0)), - int(wheel_raw.get("back_wheel", 0)), - int(wheel_raw.get("right_wheel", 0)), - ] - - # Convert each raw command back to an angular speed in deg/s. - wheel_degps = np.array([MobileManipulator.raw_to_degps(r) for r in raw_list]) - # Convert from deg/s to rad/s. - wheel_radps = wheel_degps * (np.pi / 180.0) - # Compute each wheel’s linear speed (m/s) from its angular speed. - wheel_linear_speeds = wheel_radps * wheel_radius - - # Define the wheel mounting angles with a -90° offset. - angles = np.radians(np.array([240, 120, 0]) - 90) - m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles]) - - # Solve the inverse kinematics: body_velocity = M⁻¹ · wheel_linear_speeds. - m_inv = np.linalg.inv(m) - velocity_vector = m_inv.dot(wheel_linear_speeds) - x_cmd, y_cmd, theta_rad = velocity_vector - theta_cmd = theta_rad * (180.0 / np.pi) - return (x_cmd, y_cmd, theta_cmd) - - -class LeKiwi: - def __init__(self, motor_bus): - """ - Initializes the LeKiwi with Feetech motors bus. - """ - self.motor_bus = motor_bus - self.motor_ids = ["left_wheel", "back_wheel", "right_wheel"] - - # Initialize motors in velocity mode. - self.motor_bus.write("Lock", 0) - self.motor_bus.write("Mode", [1, 1, 1], self.motor_ids) - self.motor_bus.write("Lock", 1) - print("Motors set to velocity mode.") - - def read_velocity(self): - """ - Reads the raw speeds for all wheels. Returns a dictionary with motor names: - """ - raw_speeds = self.motor_bus.read("Present_Speed", self.motor_ids) - return { - "left_wheel": int(raw_speeds[0]), - "back_wheel": int(raw_speeds[1]), - "right_wheel": int(raw_speeds[2]), - } - - def set_velocity(self, command_speeds): - """ - Sends raw velocity commands (16-bit encoded values) directly to the motor bus. - The order of speeds must correspond to self.motor_ids. 
- """ - self.motor_bus.write("Goal_Speed", command_speeds, self.motor_ids) - - def stop(self): - """Stops the robot by setting all motor speeds to zero.""" - self.motor_bus.write("Goal_Speed", [0, 0, 0], self.motor_ids) - print("Motors stopped.") diff --git a/lerobot/common/robots/mobile_manipulator.py b/lerobot/common/robots/mobile_manipulator.py deleted file mode 100644 index 33027791..00000000 --- a/lerobot/common/robots/mobile_manipulator.py +++ /dev/null @@ -1,704 +0,0 @@ -# Copyright 2024 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import base64 -import json -import os -import sys -from pathlib import Path - -import cv2 -import numpy as np -import torch -import zmq - -from lerobot.common.cameras.utils import make_cameras_from_configs -from lerobot.common.errors import DeviceNotConnectedError -from lerobot.common.motors.feetech.feetech import TorqueMode -from lerobot.common.motors.feetech.feetech_calibration import run_full_arm_calibration -from lerobot.common.motors.motors_bus import MotorsBus -from lerobot.common.motors.utils import make_motors_buses_from_configs -from lerobot.common.robots.lekiwi.configuration_lekiwi import LeKiwiRobotConfig -from lerobot.common.robots.utils import get_arm_id - -PYNPUT_AVAILABLE = True -try: - # Only import if there's a valid X server or if we're not on a Pi - if ("DISPLAY" not in os.environ) and ("linux" in sys.platform): - print("No DISPLAY set. Skipping pynput import.") - raise ImportError("pynput blocked intentionally due to no display.") - - from pynput import keyboard -except ImportError: - keyboard = None - PYNPUT_AVAILABLE = False -except Exception as e: - keyboard = None - PYNPUT_AVAILABLE = False - print(f"Could not import pynput: {e}") - - -class MobileManipulator: - """ - MobileManipulator is a class for connecting to and controlling a remote mobile manipulator robot. - The robot includes a three omniwheel mobile base and a remote follower arm. - The leader arm is connected locally (on the laptop) and its joint positions are recorded and then - forwarded to the remote follower arm (after applying a safety clamp). - In parallel, keyboard teleoperation is used to generate raw velocity commands for the wheels. - """ - - def __init__(self, config: LeKiwiRobotConfig): - """ - Expected keys in config: - - ip, port, video_port for the remote connection. - - calibration_dir, leader_arms, follower_arms, max_relative_target, etc. - """ - self.robot_type = config.type - self.config = config - self.remote_ip = config.ip - self.remote_port = config.port - self.remote_port_video = config.video_port - self.calibration_dir = Path(self.config.calibration_dir) - self.logs = {} - - self.teleop_keys = self.config.teleop_keys - - # For teleoperation, the leader arm (local) is used to record the desired arm pose. 
- self.leader_arms = make_motors_buses_from_configs(self.config.leader_arms) - - self.follower_arms = make_motors_buses_from_configs(self.config.follower_arms) - - self.cameras = make_cameras_from_configs(self.config.cameras) - - self.is_connected = False - - self.last_frames = {} - self.last_present_speed = {} - self.last_remote_arm_state = torch.zeros(6, dtype=torch.float32) - - # Define three speed levels and a current index - self.speed_levels = [ - {"xy": 0.1, "theta": 30}, # slow - {"xy": 0.2, "theta": 60}, # medium - {"xy": 0.3, "theta": 90}, # fast - ] - self.speed_index = 0 # Start at slow - - # ZeroMQ context and sockets. - self.context = None - self.cmd_socket = None - self.video_socket = None - - # Keyboard state for base teleoperation. - self.running = True - self.pressed_keys = { - "forward": False, - "backward": False, - "left": False, - "right": False, - "rotate_left": False, - "rotate_right": False, - } - - if PYNPUT_AVAILABLE: - print("pynput is available - enabling local keyboard listener.") - self.listener = keyboard.Listener( - on_press=self.on_press, - on_release=self.on_release, - ) - self.listener.start() - else: - print("pynput not available - skipping local keyboard listener.") - self.listener = None - - def get_motor_names(self, arms: dict[str, MotorsBus]) -> list: - return [f"{arm}_{motor}" for arm, bus in arms.items() for motor in bus.motors] - - @property - def camera_features(self) -> dict: - cam_ft = {} - for cam_key, cam in self.cameras.items(): - key = f"observation.images.{cam_key}" - cam_ft[key] = { - "shape": (cam.height, cam.width, cam.channels), - "names": ["height", "width", "channels"], - "info": None, - } - return cam_ft - - @property - def motor_features(self) -> dict: - follower_arm_names = [ - "shoulder_pan", - "shoulder_lift", - "elbow_flex", - "wrist_flex", - "wrist_roll", - "gripper", - ] - observations = ["x_mm", "y_mm", "theta"] - combined_names = follower_arm_names + observations - return { - "action": { - "dtype": "float32", - "shape": (len(combined_names),), - "names": combined_names, - }, - "observation.state": { - "dtype": "float32", - "shape": (len(combined_names),), - "names": combined_names, - }, - } - - @property - def features(self): - return {**self.motor_features, **self.camera_features} - - @property - def has_camera(self): - return len(self.cameras) > 0 - - @property - def num_cameras(self): - return len(self.cameras) - - @property - def available_arms(self): - available = [] - for name in self.leader_arms: - available.append(get_arm_id(name, "leader")) - for name in self.follower_arms: - available.append(get_arm_id(name, "follower")) - return available - - def on_press(self, key): - try: - # Movement - if key.char == self.teleop_keys["forward"]: - self.pressed_keys["forward"] = True - elif key.char == self.teleop_keys["backward"]: - self.pressed_keys["backward"] = True - elif key.char == self.teleop_keys["left"]: - self.pressed_keys["left"] = True - elif key.char == self.teleop_keys["right"]: - self.pressed_keys["right"] = True - elif key.char == self.teleop_keys["rotate_left"]: - self.pressed_keys["rotate_left"] = True - elif key.char == self.teleop_keys["rotate_right"]: - self.pressed_keys["rotate_right"] = True - - # Quit teleoperation - elif key.char == self.teleop_keys["quit"]: - self.running = False - return False - - # Speed control - elif key.char == self.teleop_keys["speed_up"]: - self.speed_index = min(self.speed_index + 1, 2) - print(f"Speed index increased to {self.speed_index}") - elif key.char == 
self.teleop_keys["speed_down"]: - self.speed_index = max(self.speed_index - 1, 0) - print(f"Speed index decreased to {self.speed_index}") - - except AttributeError: - # e.g., if key is special like Key.esc - if key == keyboard.Key.esc: - self.running = False - return False - - def on_release(self, key): - try: - if hasattr(key, "char"): - if key.char == self.teleop_keys["forward"]: - self.pressed_keys["forward"] = False - elif key.char == self.teleop_keys["backward"]: - self.pressed_keys["backward"] = False - elif key.char == self.teleop_keys["left"]: - self.pressed_keys["left"] = False - elif key.char == self.teleop_keys["right"]: - self.pressed_keys["right"] = False - elif key.char == self.teleop_keys["rotate_left"]: - self.pressed_keys["rotate_left"] = False - elif key.char == self.teleop_keys["rotate_right"]: - self.pressed_keys["rotate_right"] = False - except AttributeError: - pass - - def connect(self): - if not self.leader_arms: - raise ValueError("MobileManipulator has no leader arm to connect.") - for name in self.leader_arms: - print(f"Connecting {name} leader arm.") - self.calibrate_leader() - - # Set up ZeroMQ sockets to communicate with the remote mobile robot. - self.context = zmq.Context() - self.cmd_socket = self.context.socket(zmq.PUSH) - connection_string = f"tcp://{self.remote_ip}:{self.remote_port}" - self.cmd_socket.connect(connection_string) - self.cmd_socket.setsockopt(zmq.CONFLATE, 1) - self.video_socket = self.context.socket(zmq.PULL) - video_connection = f"tcp://{self.remote_ip}:{self.remote_port_video}" - self.video_socket.connect(video_connection) - self.video_socket.setsockopt(zmq.CONFLATE, 1) - print( - f"[INFO] Connected to remote robot at {connection_string} and video stream at {video_connection}." - ) - self.is_connected = True - - def load_or_run_calibration_(self, name, arm, arm_type): - arm_id = get_arm_id(name, arm_type) - arm_calib_path = self.calibration_dir / f"{arm_id}.json" - - if arm_calib_path.exists(): - with open(arm_calib_path) as f: - calibration = json.load(f) - else: - print(f"Missing calibration file '{arm_calib_path}'") - calibration = run_full_arm_calibration(arm, self.robot_type, name, arm_type) - print(f"Calibration is done! Saving calibration file '{arm_calib_path}'") - arm_calib_path.parent.mkdir(parents=True, exist_ok=True) - with open(arm_calib_path, "w") as f: - json.dump(calibration, f) - - return calibration - - def calibrate_leader(self): - for name, arm in self.leader_arms.items(): - # Connect the bus - arm.connect() - - # Disable torque on all motors - for motor_id in arm.motors: - arm.write("Torque_Enable", TorqueMode.DISABLED.value, motor_id) - - # Now run calibration - calibration = self.load_or_run_calibration_(name, arm, "leader") - arm.set_calibration(calibration) - - def calibrate_follower(self): - for name, bus in self.follower_arms.items(): - bus.connect() - - # Disable torque on all motors - for motor_id in bus.motors: - bus.write("Torque_Enable", 0, motor_id) - - # Then filter out wheels - arm_only_dict = {k: v for k, v in bus.motors.items() if not k.startswith("wheel_")} - if not arm_only_dict: - continue - - original_motors = bus.motors - bus.motors = arm_only_dict - - calibration = self.load_or_run_calibration_(name, bus, "follower") - bus.set_calibration(calibration) - - bus.motors = original_motors - - def _get_data(self): - """ - Polls the video socket for up to 15 ms. If data arrives, decode only - the *latest* message, returning frames, speed, and arm state. 
If - nothing arrives for any field, use the last known values. - """ - frames = {} - present_speed = {} - remote_arm_state_tensor = torch.zeros(6, dtype=torch.float32) - - # Poll up to 15 ms - poller = zmq.Poller() - poller.register(self.video_socket, zmq.POLLIN) - socks = dict(poller.poll(15)) - if self.video_socket not in socks or socks[self.video_socket] != zmq.POLLIN: - # No new data arrived → reuse ALL old data - return (self.last_frames, self.last_present_speed, self.last_remote_arm_state) - - # Drain all messages, keep only the last - last_msg = None - while True: - try: - obs_string = self.video_socket.recv_string(zmq.NOBLOCK) - last_msg = obs_string - except zmq.Again: - break - - if not last_msg: - # No new message → also reuse old - return (self.last_frames, self.last_present_speed, self.last_remote_arm_state) - - # Decode only the final message - try: - observation = json.loads(last_msg) - - images_dict = observation.get("images", {}) - new_speed = observation.get("present_speed", {}) - new_arm_state = observation.get("follower_arm_state", None) - - # Convert images - for cam_name, image_b64 in images_dict.items(): - if image_b64: - jpg_data = base64.b64decode(image_b64) - np_arr = np.frombuffer(jpg_data, dtype=np.uint8) - frame_candidate = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) - if frame_candidate is not None: - frames[cam_name] = frame_candidate - - # If remote_arm_state is None and frames is None there is no message then use the previous message - if new_arm_state is not None and frames is not None: - self.last_frames = frames - - remote_arm_state_tensor = torch.tensor(new_arm_state, dtype=torch.float32) - self.last_remote_arm_state = remote_arm_state_tensor - - present_speed = new_speed - self.last_present_speed = new_speed - else: - frames = self.last_frames - - remote_arm_state_tensor = self.last_remote_arm_state - - present_speed = self.last_present_speed - - except Exception as e: - print(f"[DEBUG] Error decoding video message: {e}") - # If decode fails, fall back to old data - return (self.last_frames, self.last_present_speed, self.last_remote_arm_state) - - return frames, present_speed, remote_arm_state_tensor - - def _process_present_speed(self, present_speed: dict) -> torch.Tensor: - state_tensor = torch.zeros(3, dtype=torch.int32) - if present_speed: - decoded = {key: MobileManipulator.raw_to_degps(value) for key, value in present_speed.items()} - if "1" in decoded: - state_tensor[0] = decoded["1"] - if "2" in decoded: - state_tensor[1] = decoded["2"] - if "3" in decoded: - state_tensor[2] = decoded["3"] - return state_tensor - - def teleop_step( - self, record_data: bool = False - ) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]: - if not self.is_connected: - raise DeviceNotConnectedError("MobileManipulator is not connected. Run `connect()` first.") - - speed_setting = self.speed_levels[self.speed_index] - xy_speed = speed_setting["xy"] # e.g. 0.1, 0.25, or 0.4 - theta_speed = speed_setting["theta"] # e.g. 
30, 60, or 90 - - # Prepare to assign the position of the leader to the follower - arm_positions = [] - for name in self.leader_arms: - pos = self.leader_arms[name].read("Present_Position") - pos_tensor = torch.from_numpy(pos).float() - arm_positions.extend(pos_tensor.tolist()) - - y_cmd = 0.0 # m/s forward/backward - x_cmd = 0.0 # m/s lateral - theta_cmd = 0.0 # deg/s rotation - if self.pressed_keys["forward"]: - y_cmd += xy_speed - if self.pressed_keys["backward"]: - y_cmd -= xy_speed - if self.pressed_keys["left"]: - x_cmd += xy_speed - if self.pressed_keys["right"]: - x_cmd -= xy_speed - if self.pressed_keys["rotate_left"]: - theta_cmd += theta_speed - if self.pressed_keys["rotate_right"]: - theta_cmd -= theta_speed - - wheel_commands = self.body_to_wheel_raw(x_cmd, y_cmd, theta_cmd) - - message = {"raw_velocity": wheel_commands, "arm_positions": arm_positions} - self.cmd_socket.send_string(json.dumps(message)) - - if not record_data: - return - - obs_dict = self.capture_observation() - - arm_state_tensor = torch.tensor(arm_positions, dtype=torch.float32) - - wheel_velocity_tuple = self.wheel_raw_to_body(wheel_commands) - wheel_velocity_mm = ( - wheel_velocity_tuple[0] * 1000.0, - wheel_velocity_tuple[1] * 1000.0, - wheel_velocity_tuple[2], - ) - wheel_tensor = torch.tensor(wheel_velocity_mm, dtype=torch.float32) - action_tensor = torch.cat([arm_state_tensor, wheel_tensor]) - action_dict = {"action": action_tensor} - - return obs_dict, action_dict - - def capture_observation(self) -> dict: - """ - Capture observations from the remote robot: current follower arm positions, - present wheel speeds (converted to body-frame velocities: x, y, theta), - and a camera frame. - """ - if not self.is_connected: - raise DeviceNotConnectedError("Not connected. Run `connect()` first.") - - frames, present_speed, remote_arm_state_tensor = self._get_data() - - body_state = self.wheel_raw_to_body(present_speed) - - body_state_mm = (body_state[0] * 1000.0, body_state[1] * 1000.0, body_state[2]) # Convert x,y to mm/s - wheel_state_tensor = torch.tensor(body_state_mm, dtype=torch.float32) - combined_state_tensor = torch.cat((remote_arm_state_tensor, wheel_state_tensor), dim=0) - - obs_dict = {"observation.state": combined_state_tensor} - - # Loop over each configured camera - for cam_name, cam in self.cameras.items(): - frame = frames.get(cam_name, None) - if frame is None: - # Create a black image using the camera's configured width, height, and channels - frame = np.zeros((cam.height, cam.width, cam.channels), dtype=np.uint8) - obs_dict[f"observation.images.{cam_name}"] = torch.from_numpy(frame) - - return obs_dict - - def send_action(self, action: torch.Tensor) -> torch.Tensor: - if not self.is_connected: - raise DeviceNotConnectedError("Not connected. Run `connect()` first.") - - # Ensure the action tensor has at least 9 elements: - # - First 6: arm positions. - # - Last 3: base commands. - if action.numel() < 9: - # Pad with zeros if there are not enough elements. - padded = torch.zeros(9, dtype=action.dtype) - padded[: action.numel()] = action - action = padded - - # Extract arm and base actions. - arm_actions = action[:6].flatten() - base_actions = action[6:].flatten() - - x_cmd_mm = base_actions[0].item() # mm/s - y_cmd_mm = base_actions[1].item() # mm/s - theta_cmd = base_actions[2].item() # deg/s - - # Convert mm/s to m/s for the kinematics calculations. - x_cmd = x_cmd_mm / 1000.0 # m/s - y_cmd = y_cmd_mm / 1000.0 # m/s - - # Compute wheel commands from body commands. 
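# Illustrative call (joint values p1..p6 are placeholders, not real targets):
#   robot.send_action(torch.tensor([p1, p2, p3, p4, p5, p6, 100.0, 0.0, 30.0]))
# would request a base velocity of 100 mm/s along x and a rotation of 30 deg/s.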
- wheel_commands = self.body_to_wheel_raw(x_cmd, y_cmd, theta_cmd) - - arm_positions_list = arm_actions.tolist() - - message = {"raw_velocity": wheel_commands, "arm_positions": arm_positions_list} - self.cmd_socket.send_string(json.dumps(message)) - - return action - - def print_logs(self): - pass - - def disconnect(self): - if not self.is_connected: - raise DeviceNotConnectedError("Not connected.") - if self.cmd_socket: - stop_cmd = { - "raw_velocity": {"left_wheel": 0, "back_wheel": 0, "right_wheel": 0}, - "arm_positions": {}, - } - self.cmd_socket.send_string(json.dumps(stop_cmd)) - self.cmd_socket.close() - if self.video_socket: - self.video_socket.close() - if self.context: - self.context.term() - if PYNPUT_AVAILABLE: - self.listener.stop() - self.is_connected = False - print("[INFO] Disconnected from remote robot.") - - def __del__(self): - if getattr(self, "is_connected", False): - self.disconnect() - if PYNPUT_AVAILABLE: - self.listener.stop() - - @staticmethod - def degps_to_raw(degps: float) -> int: - steps_per_deg = 4096.0 / 360.0 - speed_in_steps = abs(degps) * steps_per_deg - speed_int = int(round(speed_in_steps)) - if speed_int > 0x7FFF: - speed_int = 0x7FFF - if degps < 0: - return speed_int | 0x8000 - else: - return speed_int & 0x7FFF - - @staticmethod - def raw_to_degps(raw_speed: int) -> float: - steps_per_deg = 4096.0 / 360.0 - magnitude = raw_speed & 0x7FFF - degps = magnitude / steps_per_deg - if raw_speed & 0x8000: - degps = -degps - return degps - - def body_to_wheel_raw( - self, - x_cmd: float, - y_cmd: float, - theta_cmd: float, - wheel_radius: float = 0.05, - base_radius: float = 0.125, - max_raw: int = 3000, - ) -> dict: - """ - Convert desired body-frame velocities into wheel raw commands. - - Parameters: - x_cmd : Linear velocity in x (m/s). - y_cmd : Linear velocity in y (m/s). - theta_cmd : Rotational velocity (deg/s). - wheel_radius: Radius of each wheel (meters). - base_radius : Distance from the center of rotation to each wheel (meters). - max_raw : Maximum allowed raw command (ticks) per wheel. - - Returns: - A dictionary with wheel raw commands: - {"left_wheel": value, "back_wheel": value, "right_wheel": value}. - - Notes: - - Internally, the method converts theta_cmd to rad/s for the kinematics. - - The raw command is computed from the wheels angular speed in deg/s - using degps_to_raw(). If any command exceeds max_raw, all commands - are scaled down proportionally. - """ - # Convert rotational velocity from deg/s to rad/s. - theta_rad = theta_cmd * (np.pi / 180.0) - # Create the body velocity vector [x, y, theta_rad]. - velocity_vector = np.array([x_cmd, y_cmd, theta_rad]) - - # Define the wheel mounting angles (defined from y axis cw) - angles = np.radians(np.array([300, 180, 60])) - # Build the kinematic matrix: each row maps body velocities to a wheel’s linear speed. - # The third column (base_radius) accounts for the effect of rotation. - m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles]) - - # Compute each wheel’s linear speed (m/s) and then its angular speed (rad/s). - wheel_linear_speeds = m.dot(velocity_vector) - wheel_angular_speeds = wheel_linear_speeds / wheel_radius - - # Convert wheel angular speeds from rad/s to deg/s. 
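# If any wheel would exceed max_raw ticks, the scaling step below shrinks all three
# speeds by the same factor, so the commanded direction of motion is preserved and
# only its overall magnitude is capped.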
- wheel_degps = wheel_angular_speeds * (180.0 / np.pi) - - # Scaling - steps_per_deg = 4096.0 / 360.0 - raw_floats = [abs(degps) * steps_per_deg for degps in wheel_degps] - max_raw_computed = max(raw_floats) - if max_raw_computed > max_raw: - scale = max_raw / max_raw_computed - wheel_degps = wheel_degps * scale - - # Convert each wheel’s angular speed (deg/s) to a raw integer. - wheel_raw = [MobileManipulator.degps_to_raw(deg) for deg in wheel_degps] - - return {"left_wheel": wheel_raw[0], "back_wheel": wheel_raw[1], "right_wheel": wheel_raw[2]} - - def wheel_raw_to_body( - self, wheel_raw: dict, wheel_radius: float = 0.05, base_radius: float = 0.125 - ) -> tuple: - """ - Convert wheel raw command feedback back into body-frame velocities. - - Parameters: - wheel_raw : Dictionary with raw wheel commands (keys: "left_wheel", "back_wheel", "right_wheel"). - wheel_radius: Radius of each wheel (meters). - base_radius : Distance from the robot center to each wheel (meters). - - Returns: - A tuple (x_cmd, y_cmd, theta_cmd) where: - x_cmd : Linear velocity in x (m/s). - y_cmd : Linear velocity in y (m/s). - theta_cmd : Rotational velocity in deg/s. - """ - # Extract the raw values in order. - raw_list = [ - int(wheel_raw.get("left_wheel", 0)), - int(wheel_raw.get("back_wheel", 0)), - int(wheel_raw.get("right_wheel", 0)), - ] - - # Convert each raw command back to an angular speed in deg/s. - wheel_degps = np.array([MobileManipulator.raw_to_degps(r) for r in raw_list]) - # Convert from deg/s to rad/s. - wheel_radps = wheel_degps * (np.pi / 180.0) - # Compute each wheel’s linear speed (m/s) from its angular speed. - wheel_linear_speeds = wheel_radps * wheel_radius - - # Define the wheel mounting angles (defined from y axis cw) - angles = np.radians(np.array([300, 180, 60])) - m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles]) - - # Solve the inverse kinematics: body_velocity = M⁻¹ · wheel_linear_speeds. - m_inv = np.linalg.inv(m) - velocity_vector = m_inv.dot(wheel_linear_speeds) - x_cmd, y_cmd, theta_rad = velocity_vector - theta_cmd = theta_rad * (180.0 / np.pi) - return (x_cmd, y_cmd, theta_cmd) - - -class LeKiwi: - def __init__(self, motor_bus): - """ - Initializes the LeKiwi with Feetech motors bus. - """ - self.motor_bus = motor_bus - self.motor_ids = ["left_wheel", "back_wheel", "right_wheel"] - - # Initialize motors in velocity mode. - self.motor_bus.write("Lock", 0) - self.motor_bus.write("Mode", [1, 1, 1], self.motor_ids) - self.motor_bus.write("Lock", 1) - print("Motors set to velocity mode.") - - def read_velocity(self): - """ - Reads the raw speeds for all wheels. Returns a dictionary with motor names: - """ - raw_speeds = self.motor_bus.read("Present_Speed", self.motor_ids) - return { - "left_wheel": int(raw_speeds[0]), - "back_wheel": int(raw_speeds[1]), - "right_wheel": int(raw_speeds[2]), - } - - def set_velocity(self, command_speeds): - """ - Sends raw velocity commands (16-bit encoded values) directly to the motor bus. - The order of speeds must correspond to self.motor_ids. 
- """ - self.motor_bus.write("Goal_Speed", command_speeds, self.motor_ids) - - def stop(self): - """Stops the robot by setting all motor speeds to zero.""" - self.motor_bus.write("Goal_Speed", [0, 0, 0], self.motor_ids) - print("Motors stopped.") diff --git a/lerobot/common/robots/robot.py b/lerobot/common/robots/robot.py index a7ec4eda..d63026fc 100644 --- a/lerobot/common/robots/robot.py +++ b/lerobot/common/robots/robot.py @@ -22,8 +22,11 @@ class Robot(abc.ABC): def __init__(self, config: RobotConfig): self.robot_type = self.name self.id = config.id + self.robot_mode = config.robot_mode self.calibration_dir = ( - config.calibration_dir if config.calibration_dir else HF_LEROBOT_CALIBRATION / ROBOTS / self.name + Path(config.calibration_dir) + if config.calibration_dir + else HF_LEROBOT_CALIBRATION / ROBOTS / self.name ) self.calibration_dir.mkdir(parents=True, exist_ok=True) self.calibration_fpath = self.calibration_dir / f"{self.id}.json" diff --git a/lerobot/common/robots/utils.py b/lerobot/common/robots/utils.py index 19b7c1b9..86f67882 100644 --- a/lerobot/common/robots/utils.py +++ b/lerobot/common/robots/utils.py @@ -49,22 +49,22 @@ def make_robot_config(robot_type: str, **kwargs) -> RobotConfig: return Stretch3RobotConfig(**kwargs) elif robot_type == "lekiwi": - from .lekiwi.configuration_lekiwi import LeKiwiRobotConfig + from .lekiwi.config_lekiwi import LeKiwiConfig - return LeKiwiRobotConfig(**kwargs) + return LeKiwiConfig(**kwargs) else: raise ValueError(f"Robot type '{robot_type}' is not available.") def make_robot_from_config(config: RobotConfig): - from .lekiwi.configuration_lekiwi import LeKiwiRobotConfig + from .lekiwi.config_lekiwi import LeKiwiConfig from .manipulator import ManipulatorRobotConfig if isinstance(config, ManipulatorRobotConfig): from lerobot.common.robots.manipulator import ManipulatorRobot return ManipulatorRobot(config) - elif isinstance(config, LeKiwiRobotConfig): + elif isinstance(config, LeKiwiConfig): from lerobot.common.robots.mobile_manipulator import MobileManipulator return MobileManipulator(config) diff --git a/lerobot/common/teleoperators/keyboard/configuration_keyboard.py b/lerobot/common/teleoperators/keyboard/configuration_keyboard.py index 91b596bf..a1cfbbe7 100644 --- a/lerobot/common/teleoperators/keyboard/configuration_keyboard.py +++ b/lerobot/common/teleoperators/keyboard/configuration_keyboard.py @@ -22,4 +22,5 @@ from ..config import TeleoperatorConfig @TeleoperatorConfig.register_subclass("keyboard") @dataclass class KeyboardTeleopConfig(TeleoperatorConfig): + # TODO(Steven): Maybe set in here the keys that we want to capture/listen mock: bool = False diff --git a/lerobot/common/teleoperators/keyboard/teleop_keyboard.py b/lerobot/common/teleoperators/keyboard/teleop_keyboard.py index 65f1dff7..ccb00252 100644 --- a/lerobot/common/teleoperators/keyboard/teleop_keyboard.py +++ b/lerobot/common/teleoperators/keyboard/teleop_keyboard.py @@ -19,8 +19,7 @@ import os import sys import time from queue import Queue - -import numpy as np +from typing import Any from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError @@ -59,11 +58,12 @@ class KeyboardTeleop(Teleoperator): self.event_queue = Queue() self.current_pressed = {} self.listener = None - self.is_connected = False + self._is_connected = False self.logs = {} @property def action_feature(self) -> dict: + # TODO(Steven): Verify this is correct return { "dtype": "float32", "shape": (len(self.arm),), @@ -74,8 +74,22 @@ class KeyboardTeleop(Teleoperator): def 
feedback_feature(self) -> dict: return {} + @property + def is_connected(self) -> bool: + return self._is_connected + + @property + def is_calibrated(self) -> bool: + pass + def connect(self) -> None: - if self.is_connected: + # TODO(Steven): Consider instead of raising a warning and then returning the status + # if self._is_connected: + # logging.warning( + # "ManipulatorRobot is already connected. Do not run `robot.connect()` twice." + # ) + # return self._is_connected + if self._is_connected: raise DeviceAlreadyConnectedError( "ManipulatorRobot is already connected. Do not run `robot.connect()` twice." ) @@ -83,24 +97,24 @@ class KeyboardTeleop(Teleoperator): if PYNPUT_AVAILABLE: logging.info("pynput is available - enabling local keyboard listener.") self.listener = keyboard.Listener( - on_press=self.on_press, - on_release=self.on_release, + on_press=self._on_press, + on_release=self._on_release, ) self.listener.start() else: logging.info("pynput not available - skipping local keyboard listener.") self.listener = None - self.is_connected = True + self._is_connected = True def calibrate(self) -> None: pass - def on_press(self, key): + def _on_press(self, key): if hasattr(key, "char"): self.event_queue.put((key.char, True)) - def on_release(self, key): + def _on_release(self, key): if hasattr(key, "char"): self.event_queue.put((key.char, False)) if key == keyboard.Key.esc: @@ -112,10 +126,13 @@ class KeyboardTeleop(Teleoperator): key_char, is_pressed = self.event_queue.get_nowait() self.current_pressed[key_char] = is_pressed - def get_action(self) -> np.ndarray: + def configure(self): + pass + + def get_action(self) -> dict[str, Any]: before_read_t = time.perf_counter() - if not self.is_connected: + if not self._is_connected: raise DeviceNotConnectedError( "KeyboardTeleop is not connected. You need to run `connect()` before `get_action()`." ) @@ -126,17 +143,17 @@ class KeyboardTeleop(Teleoperator): action = {key for key, val in self.current_pressed.items() if val} self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t - return np.array(list(action)) + return dict.fromkeys(action, None) - def send_feedback(self, feedback: np.ndarray) -> None: + def send_feedback(self, feedback: dict[str, Any]) -> None: pass def disconnect(self) -> None: - if not self.is_connected: + if not self._is_connected: raise DeviceNotConnectedError( "KeyboardTeleop is not connected. You need to run `robot.connect()` before `disconnect()`." 
) if self.listener is not None: self.listener.stop() - self.is_connected = False + self._is_connected = False diff --git a/lerobot/common/teleoperators/so100/config_so100_leader.py b/lerobot/common/teleoperators/so100/config_so100_leader.py index a97949b7..aa2747b0 100644 --- a/lerobot/common/teleoperators/so100/config_so100_leader.py +++ b/lerobot/common/teleoperators/so100/config_so100_leader.py @@ -24,3 +24,4 @@ from ..config import TeleoperatorConfig class SO100LeaderConfig(TeleoperatorConfig): # Port to connect to the arm port: str + id = "so100" diff --git a/lerobot/common/teleoperators/so100/so100_leader.py b/lerobot/common/teleoperators/so100/so100_leader.py index f8f7239e..cdd16300 100644 --- a/lerobot/common/teleoperators/so100/so100_leader.py +++ b/lerobot/common/teleoperators/so100/so100_leader.py @@ -95,7 +95,7 @@ class SO100Leader(Teleoperator): full_turn_motor = "wrist_roll" unknown_range_motors = [name for name in self.arm.names if name != full_turn_motor] - logger.info( + print( f"Move all joints except '{full_turn_motor}' sequentially through their " "entire ranges of motion.\nRecording positions. Press ENTER to stop..." ) diff --git a/lerobot/common/teleoperators/teleoperator.py b/lerobot/common/teleoperators/teleoperator.py index d6285f5c..5da1aeb5 100644 --- a/lerobot/common/teleoperators/teleoperator.py +++ b/lerobot/common/teleoperators/teleoperator.py @@ -45,6 +45,9 @@ class Teleoperator(abc.ABC): def is_connected(self) -> bool: pass + # TODO(Steven): I think connect() should return a bool, such that the client/application code can check if the device connected successfully + # if not device.connect(): + # raise DeviceNotConnectedError(f"{device} failed to connect") @abc.abstractmethod def connect(self) -> None: """Connects to the teleoperator.""" diff --git a/lerobot/scripts/control_robot.py b/lerobot/scripts/control_robot.py index 18787b03..b744c167 100644 --- a/lerobot/scripts/control_robot.py +++ b/lerobot/scripts/control_robot.py @@ -422,7 +422,7 @@ def control_robot(cfg: ControlPipelineConfig): elif isinstance(cfg.control, ReplayControlConfig): replay(robot, cfg.control) elif isinstance(cfg.control, RemoteRobotConfig): - from lerobot.common.robots.lekiwi.lekiwi_remote import run_lekiwi + from lerobot.common.robots.lekiwi.old_lekiwi_remote import run_lekiwi _init_rerun(control_config=cfg.control, session_name="lerobot_control_loop_remote") run_lekiwi(cfg.robot)
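For reference, a minimal standalone sketch of the omniwheel math removed above (the sign-magnitude speed encoding plus the body-to-wheel kinematics of the deleted robot_lekiwi.py); it assumes only numpy, uses the same defaults (wheel_radius=0.05 m, base_radius=0.125 m), and omits the max_raw scaling for brevity:

import numpy as np

WHEEL_RADIUS = 0.05   # m, default in the deleted code
BASE_RADIUS = 0.125   # m, default in the deleted code
STEPS_PER_DEG = 4096.0 / 360.0

def degps_to_raw(degps: float) -> int:
    # 16-bit sign-magnitude encoding used for the Feetech wheel speeds.
    speed_int = min(int(round(abs(degps) * STEPS_PER_DEG)), 0x7FFF)
    return speed_int | 0x8000 if degps < 0 else speed_int

def raw_to_degps(raw: int) -> float:
    degps = (raw & 0x7FFF) / STEPS_PER_DEG
    return -degps if raw & 0x8000 else degps

def body_to_wheel_raw(x: float, y: float, theta_degps: float) -> list[int]:
    # Same kinematic matrix as robot_lekiwi.py: wheel mounting angles 240/120/0 with a -90 deg offset.
    angles = np.radians(np.array([240, 120, 0]) - 90)
    m = np.array([[np.cos(a), np.sin(a), BASE_RADIUS] for a in angles])
    wheel_degps = np.degrees(m @ np.array([x, y, np.radians(theta_degps)]) / WHEEL_RADIUS)
    return [degps_to_raw(d) for d in wheel_degps]

# A pure rotation of 90 deg/s spins every wheel at 225 deg/s -> 2560 raw ticks.
print(body_to_wheel_raw(0.0, 0.0, 90.0))                      # [2560, 2560, 2560]
# The encoding round-trips: 90 deg/s -> 1024 ticks, -90 deg/s -> 0x8400 -> -90.0 deg/s.
print(degps_to_raw(90.0), raw_to_degps(degps_to_raw(-90.0)))  # 1024 -90.0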