This commit is contained in:
Steven Palma 2025-04-14 15:47:14 +02:00 committed by GitHub
commit 76d7f359b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 1174 additions and 1738 deletions

View File

@ -0,0 +1,143 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from lerobot.common.robots.config import RobotMode
from lerobot.common.robots.lekiwi.config_lekiwi import LeKiwiClientConfig
from lerobot.common.robots.lekiwi.lekiwi_client import LeKiwiClient
from lerobot.common.teleoperators.keyboard import KeyboardTeleop, KeyboardTeleopConfig
from lerobot.common.teleoperators.so100 import SO100Leader, SO100LeaderConfig
DUMMY_FEATURES = {
"observation.state": {
"dtype": "float64",
"shape": (9,),
"names": {
"motors": [
"arm_shoulder_pan",
"arm_shoulder_lift",
"arm_elbow_flex",
"arm_wrist_flex",
"arm_wrist_roll",
"arm_gripper",
"base_left_wheel",
"base_right_wheel",
"base_back_wheel",
]
},
},
"action": {
"dtype": "float64",
"shape": (9,),
"names": {
"motors": [
"arm_shoulder_pan",
"arm_shoulder_lift",
"arm_elbow_flex",
"arm_wrist_flex",
"arm_wrist_roll",
"arm_gripper",
"base_left_wheel",
"base_right_wheel",
"base_back_wheel",
]
},
},
"observation.images.front": {
"dtype": "image",
"shape": (640, 480, 3),
"names": [
"width",
"height",
"channels",
],
},
"observation.images.wrist": {
"dtype": "image",
"shape": (480, 640, 3),
"names": [
"width",
"height",
"channels",
],
},
}
def main():
logging.info("Configuring Teleop Devices")
leader_arm_config = SO100LeaderConfig(port="/dev/tty.usbmodem58760434171")
leader_arm = SO100Leader(leader_arm_config)
keyboard_config = KeyboardTeleopConfig()
keyboard = KeyboardTeleop(keyboard_config)
logging.info("Configuring LeKiwi Client")
robot_config = LeKiwiClientConfig(id="lekiwi", robot_mode=RobotMode.TELEOP)
robot = LeKiwiClient(robot_config)
logging.info("Creating LeRobot Dataset")
# # TODO(Steven): Check this creation
# dataset = LeRobotDataset.create(
# repo_id="user/lekiwi2",
# fps=10,
# features=DUMMY_FEATURES,
# )
logging.info("Connecting Teleop Devices")
leader_arm.connect()
keyboard.connect()
logging.info("Connecting remote LeKiwi")
robot.connect()
if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected:
logging.error("Failed to connect to all devices")
return
logging.info("Starting LeKiwi teleoperation")
i = 0
while i < 1000:
arm_action = leader_arm.get_action()
base_action = keyboard.get_action()
action = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
# TODO(Steven): Deal with policy action space
# robot.set_mode(RobotMode.AUTO)
# policy_action = policy.get_action() # This might be in body frame, key space or smt else
# robot.send_action(policy_action)
action_sent = robot.send_action(action)
observation = robot.get_observation()
frame = {**action_sent, **observation}
frame.update({"task": "Dummy Task Dataset"})
logging.info("Saved a frame into the dataset")
# dataset.add_frame(frame)
i += 1
# dataset.save_episode()
# dataset.push_to_hub()
logging.info("Disconnecting Teleop Devices and LeKiwi Client")
robot.disconnect()
leader_arm.disconnect()
keyboard.disconnect()
logging.info("Finished LeKiwi cleanly")
if __name__ == "__main__":
main()

View File

@ -21,7 +21,7 @@ def main():
i += 1 i += 1
keyboard.disconnect() keyboard.disconnect()
logging.info("Finished LeKiwiRobot cleanly") logging.info("Finished LeKiwi cleanly")
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -48,5 +48,5 @@ default_cache_path = Path(HF_HOME) / "lerobot"
HF_LEROBOT_HOME = Path(os.getenv("HF_LEROBOT_HOME", default_cache_path)).expanduser() HF_LEROBOT_HOME = Path(os.getenv("HF_LEROBOT_HOME", default_cache_path)).expanduser()
# calibration dir # calibration dir
default_calibration_path = HF_LEROBOT_HOME / ".calibration" default_calibration_path = HF_LEROBOT_HOME / "calibration"
HF_LEROBOT_CALIBRATION = Path(os.getenv("HF_LEROBOT_CALIBRATION", default_calibration_path)).expanduser() HF_LEROBOT_CALIBRATION = Path(os.getenv("HF_LEROBOT_CALIBRATION", default_calibration_path)).expanduser()

View File

@ -15,3 +15,14 @@ class DeviceAlreadyConnectedError(ConnectionError):
): ):
self.message = message self.message = message
super().__init__(self.message) super().__init__(self.message)
class InvalidActionError(ConnectionError):
"""Exception raised when an action is already invalid."""
def __init__(
self,
message="The action is invalid. Check the value follows what it is expected from the action space.",
):
self.message = message
super().__init__(self.message)

View File

@ -546,7 +546,7 @@ class MotorsBus(abc.ABC):
motors = self.names motors = self.names
elif isinstance(motors, (str, int)): elif isinstance(motors, (str, int)):
motors = [motors] motors = [motors]
else: elif not isinstance(motors, list):
raise TypeError(motors) raise TypeError(motors)
self.reset_calibration(motors) self.reset_calibration(motors)
@ -609,6 +609,7 @@ class MotorsBus(abc.ABC):
min_ = self.calibration[name].range_min min_ = self.calibration[name].range_min
max_ = self.calibration[name].range_max max_ = self.calibration[name].range_max
bounded_val = min(max_, max(min_, val)) bounded_val = min(max_, max(min_, val))
# TODO(Steven): normalization can go boom if max_ == min_, we should add a check probably in record_ranges_of_motions (which probably indicates the user forgot to move a motor)
if self.motors[name].norm_mode is MotorNormMode.RANGE_M100_100: if self.motors[name].norm_mode is MotorNormMode.RANGE_M100_100:
normalized_values[id_] = (((bounded_val - min_) / (max_ - min_)) * 200) - 100 normalized_values[id_] = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
elif self.motors[name].norm_mode is MotorNormMode.RANGE_0_100: elif self.motors[name].norm_mode is MotorNormMode.RANGE_0_100:

View File

@ -1,16 +1,23 @@
import abc import abc
import enum
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
import draccus import draccus
class RobotMode(enum.Enum):
TELEOP = 0
AUTO = 1
@dataclass(kw_only=True) @dataclass(kw_only=True)
class RobotConfig(draccus.ChoiceRegistry, abc.ABC): class RobotConfig(draccus.ChoiceRegistry, abc.ABC):
# Allows to distinguish between different robots of the same type # Allows to distinguish between different robots of the same type
id: str | None = None id: str | None = None
# Directory to store calibration file # Directory to store calibration file
calibration_dir: Path | None = None calibration_dir: Path | None = None
robot_mode: RobotMode | None = None
@property @property
def type(self) -> str: def type(self) -> str:

View File

@ -1,3 +1,5 @@
# TODO(Steven): Update README
# Using the [LeKiwi](https://github.com/SIGRobotics-UIUC/LeKiwi) Robot with LeRobot # Using the [LeKiwi](https://github.com/SIGRobotics-UIUC/LeKiwi) Robot with LeRobot
## Table of Contents ## Table of Contents
@ -194,11 +196,11 @@ sudo chmod 666 /dev/ttyACM1
#### d. Update config file #### d. Update config file
IMPORTANTLY: Now that you have your ports of leader and follower arm and ip address of the mobile-so100, update the **ip** in Network configuration, **port** in leader_arms and **port** in lekiwi. In the [`LeKiwiRobotConfig`](../lerobot/common/robot_devices/robots/configs.py) file. Where you will find something like: IMPORTANTLY: Now that you have your ports of leader and follower arm and ip address of the mobile-so100, update the **ip** in Network configuration, **port** in leader_arms and **port** in lekiwi. In the [`LeKiwiConfig`](../lerobot/common/robot_devices/robots/configs.py) file. Where you will find something like:
```python ```python
@RobotConfig.register_subclass("lekiwi") @RobotConfig.register_subclass("lekiwi")
@dataclass @dataclass
class LeKiwiRobotConfig(RobotConfig): class LeKiwiConfig(RobotConfig):
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
# the number of motors in your follower arms. # the number of motors in your follower arms.
@ -281,7 +283,7 @@ For the wired LeKiwi version your configured IP address should refer to your own
```python ```python
@RobotConfig.register_subclass("lekiwi") @RobotConfig.register_subclass("lekiwi")
@dataclass @dataclass
class LeKiwiRobotConfig(RobotConfig): class LeKiwiConfig(RobotConfig):
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
# the number of motors in your follower arms. # the number of motors in your follower arms.
@ -446,7 +448,7 @@ You should see on your laptop something like this: ```[INFO] Connected to remote
| F | Decrease speed | | F | Decrease speed |
> [!TIP] > [!TIP]
> If you use a different keyboard you can change the keys for each command in the [`LeKiwiRobotConfig`](../lerobot/common/robot_devices/robots/configs.py). > If you use a different keyboard you can change the keys for each command in the [`LeKiwiConfig`](../lerobot/common/robot_devices/robots/configs.py).
### Wired version ### Wired version
If you have the **wired** LeKiwi version please run all commands including both these teleoperation commands on your laptop. If you have the **wired** LeKiwi version please run all commands including both these teleoperation commands on your laptop.

View File

@ -0,0 +1,3 @@
from .config_lekiwi import LeKiwiClientConfig, LeKiwiConfig
from .lekiwi import LeKiwi
from .lekiwi_client import LeKiwiClient

View File

@ -0,0 +1,70 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.common.cameras.configs import CameraConfig
from lerobot.common.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from ..config import RobotConfig
@RobotConfig.register_subclass("lekiwi")
@dataclass
class LeKiwiConfig(RobotConfig):
port = "/dev/ttyACM0" # port to connect to the bus
disable_torque_on_disconnect: bool = True
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
# the number of motors in your follower arms.
max_relative_target: int | None = None
cameras: dict[str, CameraConfig] = field(
default_factory=lambda: {
"front": OpenCVCameraConfig(
camera_index="/dev/video1", fps=30, width=640, height=480, rotation=90
),
"wrist": OpenCVCameraConfig(
camera_index="/dev/video4", fps=30, width=640, height=480, rotation=180
),
}
)
@RobotConfig.register_subclass("lekiwi_client")
@dataclass
class LeKiwiClientConfig(RobotConfig):
# Network Configuration
remote_ip: str = "172.18.129.208"
port_zmq_cmd: int = 5555
port_zmq_observations: int = 5556
teleop_keys: dict[str, str] = field(
default_factory=lambda: {
# Movement
"forward": "w",
"backward": "s",
"left": "a",
"right": "d",
"rotate_left": "z",
"rotate_right": "x",
# Speed control
"speed_up": "r",
"speed_down": "f",
# quit teleop
"quit": "q",
}
)

View File

@ -1,89 +0,0 @@
from dataclasses import dataclass, field
from lerobot.common.cameras.configs import CameraConfig
from lerobot.common.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.common.motors.configs import FeetechMotorsBusConfig, MotorsBusConfig
from lerobot.common.robots.config import RobotConfig
@RobotConfig.register_subclass("lekiwi")
@dataclass
class LeKiwiRobotConfig(RobotConfig):
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
# the number of motors in your follower arms.
max_relative_target: int | None = None
# Network Configuration
ip: str = "192.168.0.193"
port: int = 5555
video_port: int = 5556
cameras: dict[str, CameraConfig] = field(
default_factory=lambda: {
"front": OpenCVCameraConfig(
camera_index="/dev/video0", fps=30, width=640, height=480, rotation=90
),
"wrist": OpenCVCameraConfig(
camera_index="/dev/video2", fps=30, width=640, height=480, rotation=180
),
}
)
calibration_dir: str = ".cache/calibration/lekiwi"
leader_arms: dict[str, MotorsBusConfig] = field(
default_factory=lambda: {
"main": FeetechMotorsBusConfig(
port="/dev/tty.usbmodem585A0077581",
motors={
# name: (index, model)
"shoulder_pan": [1, "sts3215"],
"shoulder_lift": [2, "sts3215"],
"elbow_flex": [3, "sts3215"],
"wrist_flex": [4, "sts3215"],
"wrist_roll": [5, "sts3215"],
"gripper": [6, "sts3215"],
},
),
}
)
follower_arms: dict[str, MotorsBusConfig] = field(
default_factory=lambda: {
"main": FeetechMotorsBusConfig(
port="/dev/ttyACM0",
motors={
# name: (index, model)
"shoulder_pan": [1, "sts3215"],
"shoulder_lift": [2, "sts3215"],
"elbow_flex": [3, "sts3215"],
"wrist_flex": [4, "sts3215"],
"wrist_roll": [5, "sts3215"],
"gripper": [6, "sts3215"],
"left_wheel": (7, "sts3215"),
"back_wheel": (8, "sts3215"),
"right_wheel": (9, "sts3215"),
},
),
}
)
teleop_keys: dict[str, str] = field(
default_factory=lambda: {
# Movement
"forward": "w",
"backward": "s",
"left": "a",
"right": "d",
"rotate_left": "z",
"rotate_right": "x",
# Speed control
"speed_up": "r",
"speed_down": "f",
# quit teleop
"quit": "q",
}
)
mock: bool = False

View File

@ -0,0 +1,258 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.feetech import (
FeetechMotorsBus,
OperatingMode,
)
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .config_lekiwi import LeKiwiConfig
logger = logging.getLogger(__name__)
class LeKiwi(Robot):
"""
The robot includes a three omniwheel mobile base and a remote follower arm.
The leader arm is connected locally (on the laptop) and its joint positions are recorded and then
forwarded to the remote follower arm (after applying a safety clamp).
In parallel, keyboard teleoperation is used to generate raw velocity commands for the wheels.
"""
config_class = LeKiwiConfig
name = "lekiwi"
def __init__(self, config: LeKiwiConfig):
super().__init__(config)
self.config = config
self.id = config.id
self.bus = FeetechMotorsBus(
port=self.config.port,
motors={
# arm
"arm_shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100),
"arm_shoulder_lift": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100),
"arm_elbow_flex": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100),
"arm_wrist_flex": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100),
"arm_wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100),
"arm_gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
# base
"base_left_wheel": Motor(7, "sts3215", MotorNormMode.RANGE_M100_100),
"base_right_wheel": Motor(8, "sts3215", MotorNormMode.RANGE_M100_100),
"base_back_wheel": Motor(9, "sts3215", MotorNormMode.RANGE_M100_100),
},
calibration=self.calibration,
)
self.arm_motors = [m for m in self.bus.names if m.startswith("arm")]
self.base_motors = [m for m in self.bus.names if m.startswith("base")]
self.cameras = make_cameras_from_configs(config.cameras)
@property
def state_feature(self) -> dict:
return {
"dtype": "float32",
"shape": (len(self.bus),),
"names": {"motors": list(self.bus.motors)},
}
@property
def action_feature(self) -> dict:
return self.state_feature
@property
def camera_features(self) -> dict[str, dict]:
cam_ft = {}
for cam_key, cam in self.cameras.items():
cam_ft[cam_key] = {
"shape": (cam.height, cam.width, cam.channels),
"names": ["height", "width", "channels"],
"info": None,
}
return cam_ft
@property
def is_connected(self) -> bool:
# TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.bus.is_connected
def connect(self) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.bus.connect()
if not self.is_calibrated:
self.calibrate()
for cam in self.cameras.values():
cam.connect()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.bus.is_calibrated
# TODO(Steven): I think we should extend this to give the user the option of re-calibrate
# calibrate(recalibrate: bool = False) -> None:
# If true, then we overwrite the previous calibration file with new values
def calibrate(self) -> None:
logger.info(f"\nRunning calibration of {self}")
motors = self.arm_motors + self.base_motors
self.bus.disable_torque(self.arm_motors)
for name in self.arm_motors:
self.bus.write("Operating_Mode", name, OperatingMode.POSITION.value)
input("Move robot to the middle of its range of motion and press ENTER....")
homing_offsets = self.bus.set_half_turn_homings(motors)
# TODO(Steven): Might be worth to do this also in other robots but it should be added in the docs
full_turn_motor = [
motor for motor in motors if any(keyword in motor for keyword in ["wheel", "gripper"])
]
unknown_range_motors = [motor for motor in motors if motor not in full_turn_motor]
print(
f"Move all arm joints except '{full_turn_motor}' sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors)
for name in full_turn_motor:
range_mins[name] = 0
range_maxes[name] = 4095
self.calibration = {}
for name, motor in self.bus.motors.items():
self.calibration[name] = MotorCalibration(
id=motor.id,
drive_mode=0,
homing_offset=homing_offsets[name],
range_min=range_mins[name],
range_max=range_maxes[name],
)
self.bus.write_calibration(self.calibration)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def configure(self):
# Set-up arm actuators (position mode)
# We assume that at connection time, arm is in a rest position,
# and torque can be safely disabled to run calibration.
self.bus.disable_torque(self.arm_motors)
for name in self.arm_motors:
self.bus.write("Operating_Mode", name, OperatingMode.POSITION.value)
# Set P_Coefficient to lower value to avoid shakiness (Default is 32)
self.bus.write("P_Coefficient", name, 16)
# Set I_Coefficient and D_Coefficient to default value 0 and 32
self.bus.write("I_Coefficient", name, 0)
self.bus.write("D_Coefficient", name, 32)
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
# the motors. Note: this configuration is not in the official STS3215 Memory Table
self.bus.write("Maximum_Acceleration", name, 254)
self.bus.write("Acceleration", name, 254)
for name in self.base_motors:
self.bus.write("Operating_Mode", name, OperatingMode.VELOCITY.value)
self.bus.enable_torque() # TODO(Steven): Operation has failed with: ConnectionError: Failed to write 'Lock' on id_=6 with '1' after 1 tries. [TxRxResult] Incorrect status packet!
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict = {OBS_IMAGES: {}}
# Read actuators position for arm and vel for base
start = time.perf_counter()
arm_pos = self.bus.sync_read("Present_Position", self.arm_motors)
base_vel = self.bus.sync_read("Present_Speed", self.base_motors)
obs_dict[OBS_STATE] = {**arm_pos, **base_vel}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[OBS_IMAGES][cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
# Copied from S100 robot
"""Command lekiwi to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter
`max_relative_target`. In this case, the action sent differs from original action.
Thus, this function always returns the action actually sent.
Raises:
RobotDeviceNotConnectedError: if robot is not connected.
Returns:
np.ndarray: the action sent to the motors, potentially clipped.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
arm_goal_pos = {k: v for k, v in action.items() if k in self.arm_motors}
base_goal_vel = {k: v for k, v in action.items() if k in self.base_motors}
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.bus.sync_read("Present_Position", self.arm_motors)
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in arm_goal_pos.items()}
arm_safe_goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
arm_goal_pos = arm_safe_goal_pos
# TODO(Steven): Message fetching failed: Magnitude 34072 exceeds 32767 (max for sign_bit_index=15)
# TODO(Steven): IMO, this should be a check in the motor bus
# Send goal position to the actuators
self.bus.sync_write("Goal_Position", arm_goal_pos)
self.bus.sync_write("Goal_Speed", base_goal_vel)
return {**arm_goal_pos, **base_goal_vel}
def stop_base(self):
self.bus.sync_write("Goal_Speed", dict.fromkeys(self.base_motors, 0), num_retry=5)
logger.info("Base motors stopped")
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.stop_base()
self.bus.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")

View File

@ -0,0 +1,504 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import logging
from typing import Any
import cv2
import numpy as np
import torch
import zmq
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError, InvalidActionError
from lerobot.common.robots.config import RobotMode
from ..robot import Robot
from .config_lekiwi import LeKiwiClientConfig
# TODO(Steven): This doesn't need to inherit from Robot
# But we do it for now to offer a familiar API
# TODO(Steven): This doesn't need to take care of the
# mapping from teleop to motor commands, but given that
# we already have a middle-man (this class) we add it here
# Other options include:
# 1. Adding it to the Telop implementation for lekiwi
# (meaning each robot will need a teleop imple) or
# 2. Adding it into the robot implementation
# (meaning the policy might be needed to be train
# over the teleop action space)
# TODO(Steven): Check if we can move everything to 32 instead
class LeKiwiClient(Robot):
config_class = LeKiwiClientConfig
name = "lekiwi_client"
def __init__(self, config: LeKiwiClientConfig):
super().__init__(config)
self.config = config
self.id = config.id
self.robot_type = config.type
self.robot_mode = config.robot_mode
self.remote_ip = config.remote_ip
self.port_zmq_cmd = config.port_zmq_cmd
self.port_zmq_observations = config.port_zmq_observations
self.teleop_keys = config.teleop_keys
self.zmq_context = None
self.zmq_cmd_socket = None
self.zmq_observation_socket = None
self.last_frames = {}
self.last_present_speed = {"x_cmd": 0.0, "y_cmd": 0.0, "theta_cmd": 0.0}
self.last_remote_arm_state = {}
# Define three speed levels and a current index
self.speed_levels = [
{"xy": 0.1, "theta": 30}, # slow
{"xy": 0.2, "theta": 60}, # medium
{"xy": 0.3, "theta": 90}, # fast
]
self.speed_index = 0 # Start at slow
self._is_connected = False
self.logs = {}
@property
def state_feature(self) -> dict:
# TODO(Steven): Get this from the data fetched?
# TODO(Steven): Motor names are unknown for the Daemon
# Or assume its size/metadata?
return {
"dtype": "float64",
"shape": (9,),
"names": {
"motors": [
"arm_shoulder_pan",
"arm_shoulder_lift",
"arm_elbow_flex",
"arm_wrist_flex",
"arm_wrist_roll",
"arm_gripper",
"base_left_wheel",
"base_right_wheel",
"base_back_wheel",
]
},
}
@property
def action_feature(self) -> dict:
return self.state_feature
@property
def camera_features(self) -> dict[str, dict]:
# TODO(Steven): Get this from the data fetched?
# TODO(Steven): camera names are unknown for the Daemon
# Or assume its size/metadata?
# TODO(Steven): Check consistency of image sizes
cam_ft = {
"front": {
"shape": (480, 640, 3),
"names": ["height", "width", "channels"],
"info": None,
},
"wrist": {
"shape": (480, 640, 3),
"names": ["height", "width", "channels"],
"info": None,
},
}
return cam_ft
@property
def is_connected(self) -> bool:
return self._is_connected
@property
def is_calibrated(self) -> bool:
pass
def connect(self) -> None:
"""Establishes ZMQ sockets with the remote mobile robot"""
if self._is_connected:
raise DeviceAlreadyConnectedError(
"LeKiwi Daemon is already connected. Do not run `robot.connect()` twice."
)
self.zmq_context = zmq.Context()
self.zmq_cmd_socket = self.zmq_context.socket(zmq.PUSH)
zmq_cmd_locator = f"tcp://{self.remote_ip}:{self.port_zmq_cmd}"
self.zmq_cmd_socket.connect(zmq_cmd_locator)
self.zmq_cmd_socket.setsockopt(zmq.CONFLATE, 1)
self.zmq_observation_socket = self.zmq_context.socket(zmq.PULL)
zmq_observations_locator = f"tcp://{self.remote_ip}:{self.port_zmq_observations}"
self.zmq_observation_socket.connect(zmq_observations_locator)
self.zmq_observation_socket.setsockopt(zmq.CONFLATE, 1)
self._is_connected = True
def calibrate(self) -> None:
logging.warning("LeKiwiClient has nothing to calibrate.")
return
# Consider moving these static functions out of the class
# Copied from robot_lekiwi MobileManipulator class* (before the refactor)
@staticmethod
def _degps_to_raw(degps: float) -> int:
steps_per_deg = 4096.0 / 360.0
speed_in_steps = degps * steps_per_deg
speed_int = int(round(speed_in_steps))
# Cap the value to fit within signed 16-bit range (-32768 to 32767)
if speed_int > 0x7FFF:
speed_int = 0x7FFF # 32767 -> maximum positive value
elif speed_int < -0x8000:
speed_int = -0x8000 # -32768 -> minimum negative value
return speed_int
# Copied from robot_lekiwi MobileManipulator class
@staticmethod
def _raw_to_degps(raw_speed: int) -> float:
steps_per_deg = 4096.0 / 360.0
magnitude = raw_speed
degps = magnitude / steps_per_deg
return degps
# Copied from robot_lekiwi MobileManipulator class
def _body_to_wheel_raw(
self,
x_cmd: float,
y_cmd: float,
theta_cmd: float,
wheel_radius: float = 0.05,
base_radius: float = 0.125,
max_raw: int = 3000,
) -> dict:
"""
Convert desired body-frame velocities into wheel raw commands.
Parameters:
x_cmd : Linear velocity in x (m/s).
y_cmd : Linear velocity in y (m/s).
theta_cmd : Rotational velocity (deg/s).
wheel_radius: Radius of each wheel (meters).
base_radius : Distance from the center of rotation to each wheel (meters).
max_raw : Maximum allowed raw command (ticks) per wheel.
Returns:
A dictionary with wheel raw commands:
{"left_wheel": value, "back_wheel": value, "right_wheel": value}.
Notes:
- Internally, the method converts theta_cmd to rad/s for the kinematics.
- The raw command is computed from the wheels angular speed in deg/s
using _degps_to_raw(). If any command exceeds max_raw, all commands
are scaled down proportionally.
"""
# Convert rotational velocity from deg/s to rad/s.
theta_rad = theta_cmd * (np.pi / 180.0)
# Create the body velocity vector [x, y, theta_rad].
velocity_vector = np.array([x_cmd, y_cmd, theta_rad])
# Define the wheel mounting angles with a -90° offset.
angles = np.radians(np.array([240, 120, 0]) - 90)
# Build the kinematic matrix: each row maps body velocities to a wheels linear speed.
# The third column (base_radius) accounts for the effect of rotation.
m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles])
# Compute each wheels linear speed (m/s) and then its angular speed (rad/s).
wheel_linear_speeds = m.dot(velocity_vector)
wheel_angular_speeds = wheel_linear_speeds / wheel_radius
# Convert wheel angular speeds from rad/s to deg/s.
wheel_degps = wheel_angular_speeds * (180.0 / np.pi)
# Scaling
steps_per_deg = 4096.0 / 360.0
raw_floats = [abs(degps) * steps_per_deg for degps in wheel_degps]
max_raw_computed = max(raw_floats)
if max_raw_computed > max_raw:
scale = max_raw / max_raw_computed
wheel_degps = wheel_degps * scale
# Convert each wheels angular speed (deg/s) to a raw integer.
wheel_raw = [LeKiwiClient._degps_to_raw(deg) for deg in wheel_degps]
# TODO(Steven): remove hard-coded names
return {"left_wheel": wheel_raw[0], "back_wheel": wheel_raw[1], "right_wheel": wheel_raw[2]}
# Copied from robot_lekiwi MobileManipulator class
def _wheel_raw_to_body(
self, wheel_raw: dict[str, Any], wheel_radius: float = 0.05, base_radius: float = 0.125
) -> dict[str, Any]:
"""
Convert wheel raw command feedback back into body-frame velocities.
Parameters:
wheel_raw : Vector with raw wheel commands ("left_wheel", "back_wheel", "right_wheel").
wheel_radius: Radius of each wheel (meters).
base_radius : Distance from the robot center to each wheel (meters).
Returns:
A tuple (x_cmd, y_cmd, theta_cmd) where:
x_cmd : Linear velocity in x (m/s).
y_cmd : Linear velocity in y (m/s).
theta_cmd : Rotational velocity in deg/s.
"""
# TODO(Steven): No check is done for dict keys
# Convert each raw command back to an angular speed in deg/s.
wheel_degps = np.array([LeKiwiClient._raw_to_degps(int(v)) for _, v in wheel_raw.items()])
# Convert from deg/s to rad/s.
wheel_radps = wheel_degps * (np.pi / 180.0)
# Compute each wheels linear speed (m/s) from its angular speed.
wheel_linear_speeds = wheel_radps * wheel_radius
# Define the wheel mounting angles with a -90° offset.
angles = np.radians(np.array([240, 120, 0]) - 90)
m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles])
# Solve the inverse kinematics: body_velocity = M⁻¹ · wheel_linear_speeds.
m_inv = np.linalg.inv(m)
velocity_vector = m_inv.dot(wheel_linear_speeds)
x_cmd, y_cmd, theta_rad = velocity_vector
theta_cmd = theta_rad * (180.0 / np.pi)
return {"x_cmd": x_cmd, "y_cmd": y_cmd, "theta_cmd": theta_cmd}
# TODO(Steven): This is flaky, for example, if we received a state but failed decoding the image, we will not update any value
# TODO(Steven): All this function needs to be refactored
def _get_data(self):
# Copied from robot_lekiwi.py
"""Polls the video socket for up to 15 ms. If data arrives, decode only
the *latest* message, returning frames, speed, and arm state. If
nothing arrives for any field, use the last known values."""
frames = {}
present_speed = {}
remote_arm_state_tensor = {}
# Poll up to 15 ms
poller = zmq.Poller()
poller.register(self.zmq_observation_socket, zmq.POLLIN)
socks = dict(poller.poll(15))
if self.zmq_observation_socket not in socks or socks[self.zmq_observation_socket] != zmq.POLLIN:
# No new data arrived → reuse ALL old data
# TODO(Steven): This might return empty variables at init
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
# Drain all messages, keep only the last
last_msg = None
# TODO(Steven): There's probably a way to do this without while True
# TODO(Steven): Even consider changing to PUB/SUB
while True:
try:
obs_string = self.zmq_observation_socket.recv_string(zmq.NOBLOCK)
last_msg = obs_string
except zmq.Again:
break
if not last_msg:
# No new message → also reuse old
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
# Decode only the final message
try:
observation = json.loads(last_msg)
state_observation = observation[OBS_STATE]
image_observation = observation[OBS_IMAGES]
# Convert images
for cam_name, image_b64 in image_observation.items():
if image_b64:
jpg_data = base64.b64decode(image_b64)
np_arr = np.frombuffer(jpg_data, dtype=np.uint8)
frame_candidate = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
if frame_candidate is not None:
frames[cam_name] = frame_candidate
# TODO(Steven): Should we really ignore the arm state if the image is None?
# If remote_arm_state is None and frames is None there is no message then use the previous message
if state_observation is not None and frames is not None:
self.last_frames = frames
# TODO(Steven): hard-coded name of expected keys, not good
remote_arm_state_tensor = {k: v for k, v in state_observation.items() if k.startswith("arm")}
self.last_remote_arm_state = remote_arm_state_tensor
present_speed = {k: v for k, v in state_observation.items() if k.startswith("base")}
self.last_present_speed = present_speed
else:
frames = self.last_frames
remote_arm_state_tensor = self.last_remote_arm_state
present_speed = self.last_present_speed
except Exception as e:
print(f"[DEBUG] Error decoding video message: {e}")
# If decode fails, fall back to old data
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
return frames, present_speed, remote_arm_state_tensor
# TODO(Steven): The returned space is different from the get_observation of LeKiwi
# This returns body-frames velocities instead of wheel pos/speeds
def get_observation(self) -> dict[str, Any]:
"""
Capture observations from the remote robot: current follower arm positions,
present wheel speeds (converted to body-frame velocities: x, y, theta),
and a camera frame. Receives over ZMQ, translate to body-frame vel
"""
if not self._is_connected:
raise DeviceNotConnectedError("LeKiwiClient is not connected. You need to run `robot.connect()`.")
# TODO(Steven): remove hard-coded cam name
# This is needed at init for when there's no comms
obs_dict = {
OBS_IMAGES: {"wrist": np.zeros(shape=(480, 640, 3)), "front": np.zeros(shape=(640, 480, 3))}
}
frames, present_speed, remote_arm_state_tensor = self._get_data()
body_state = self._wheel_raw_to_body(present_speed)
# TODO(Steven): output isdict[str,Any] and we multiply by 1000.0. This should be more explicit and specify the expected type instead of Any
body_state_mm = {k: v * 1000.0 for k, v in body_state.items()} # Convert x,y to mm/s
obs_dict[OBS_STATE] = {**remote_arm_state_tensor, **body_state_mm}
# Loop over each configured camera
for cam_name, frame in frames.items():
if frame is None:
# TODO(Steven): Daemon doesn't know camera dimensions (hard-coded for now), consider at least getting them from state features
logging.warning("Frame is None")
frame = np.zeros((480, 640, 3), dtype=np.uint8)
obs_dict[OBS_IMAGES][cam_name] = torch.from_numpy(frame)
return obs_dict
def _from_keyboard_to_wheel_action(self, pressed_keys: np.ndarray):
# Speed control
if self.teleop_keys["speed_up"] in pressed_keys:
self.speed_index = min(self.speed_index + 1, 2)
if self.teleop_keys["speed_down"] in pressed_keys:
self.speed_index = max(self.speed_index - 1, 0)
speed_setting = self.speed_levels[self.speed_index]
xy_speed = speed_setting["xy"] # e.g. 0.1, 0.25, or 0.4
theta_speed = speed_setting["theta"] # e.g. 30, 60, or 90
x_cmd = 0.0 # m/s forward/backward
y_cmd = 0.0 # m/s lateral
theta_cmd = 0.0 # deg/s rotation
if self.teleop_keys["forward"] in pressed_keys:
x_cmd += xy_speed
if self.teleop_keys["backward"] in pressed_keys:
x_cmd -= xy_speed
if self.teleop_keys["left"] in pressed_keys:
y_cmd += xy_speed
if self.teleop_keys["right"] in pressed_keys:
y_cmd -= xy_speed
if self.teleop_keys["rotate_left"] in pressed_keys:
theta_cmd += theta_speed
if self.teleop_keys["rotate_right"] in pressed_keys:
theta_cmd -= theta_speed
return self._body_to_wheel_raw(x_cmd, y_cmd, theta_cmd)
def configure(self):
pass
# TODO(Steven): This assumes this call is always called from a keyboard teleop command
# TODO(Steven): Doing this mapping in here adds latecy between send_action and movement from the user perspective.
# t0: get teleop_cmd
# t1: send_action(teleop_cmd)
# t2: mapping teleop_cmd -> motor_cmd
# t3: execute motor_md
# This mapping for other robots/teleop devices might be slower. Doing this in the teleop will make this explicit
# t0': get teleop_cmd
# t1': mapping teleop_cmd -> motor_cmd
# t2': send_action(motor_cmd)
# t3': execute motor_cmd
# t3'-t2' << t3-t1
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
"""Command lekiwi to move to a target joint configuration. Translates to motor space + sends over ZMQ
Args:
action (np.ndarray): array containing the goal positions for the motors.
Raises:
RobotDeviceNotConnectedError: if robot is not connected.
Returns:
np.ndarray: the action sent to the motors, potentially clipped.
"""
if not self._is_connected:
raise DeviceNotConnectedError(
"ManipulatorRobot is not connected. You need to run `robot.connect()`."
)
if self.robot_mode is RobotMode.AUTO:
# TODO(Steven): Not yet implemented. The policy outputs might need a different conversion
raise Exception
goal_pos = {}
# TODO(Steven): This assumes teleop mode is always used with keyboard. Tomorrow we could teleop with another device ... ?
if self.robot_mode is RobotMode.TELEOP:
motors_name = self.state_feature.get("names").get("motors")
common_keys = [
key for key in action if key in (motor.replace("arm_", "") for motor in motors_name)
]
# TODO(Steven): I don't like this
if len(common_keys) < 6:
logging.error("Action should include at least the states of the leader arm")
raise InvalidActionError
arm_actions = {"arm_" + arm_motor: action[arm_motor] for arm_motor in common_keys}
goal_pos = arm_actions
if len(action) > 6:
keyboard_keys = np.array(list(set(action.keys()) - set(common_keys)))
wheel_actions = {
"base_" + k: v for k, v in self._from_keyboard_to_wheel_action(keyboard_keys).items()
}
goal_pos = {**arm_actions, **wheel_actions}
self.zmq_cmd_socket.send_string(json.dumps(goal_pos)) # action is in motor space
return goal_pos
def print_logs(self):
# TODO(Steven): Refactor logger
pass
def disconnect(self):
"""Cleans ZMQ comms"""
if not self._is_connected:
raise DeviceNotConnectedError(
"LeKiwi is not connected. You need to run `robot.connect()` before disconnecting."
)
self.zmq_observation_socket.close()
self.zmq_cmd_socket.close()
self.zmq_context.term()
self._is_connected = False
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()

View File

@ -0,0 +1,121 @@
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import logging
import time
import cv2
import zmq
from lerobot.common.constants import OBS_IMAGES
from .config_lekiwi import LeKiwiConfig
from .lekiwi import LeKiwi
# Network Configuration
PORT_ZMQ_CMD: int = 5555
PORT_ZMQ_OBSERVATIONS: int = 5556
class HostAgent:
def __init__(self):
self.zmq_context = zmq.Context()
self.zmq_cmd_socket = self.zmq_context.socket(zmq.PULL)
self.zmq_cmd_socket.setsockopt(zmq.CONFLATE, 1)
self.zmq_cmd_socket.bind(f"tcp://*:{PORT_ZMQ_CMD}")
self.zmq_observation_socket = self.zmq_context.socket(zmq.PUSH)
self.zmq_observation_socket.setsockopt(zmq.CONFLATE, 1)
self.zmq_observation_socket.bind(f"tcp://*:{PORT_ZMQ_OBSERVATIONS}")
def disconnect(self):
self.zmq_observation_socket.close()
self.zmq_cmd_socket.close()
self.zmq_context.term()
def main():
logging.info("Configuring LeKiwi")
robot_config = LeKiwiConfig()
robot = LeKiwi(robot_config)
logging.info("Connecting LeKiwi")
robot.connect()
logging.info("Starting HostAgent")
remote_agent = HostAgent()
last_cmd_time = time.time()
logging.info("Waiting for commands...")
try:
# Business logic
start = time.perf_counter()
duration = 0
while duration < 100:
loop_start_time = time.time()
try:
msg = remote_agent.zmq_cmd_socket.recv_string(zmq.NOBLOCK)
data = dict(json.loads(msg))
_action_sent = robot.send_action(data)
last_cmd_time = time.time()
except zmq.Again:
logging.warning("No command available")
except Exception as e:
logging.error("Message fetching failed: %s", e)
# TODO(Steven): Check this value
# Watchdog: stop the robot if no command is received for over 0.5 seconds.
now = time.time()
if now - last_cmd_time > 0.5:
robot.stop_base()
last_observation = robot.get_observation()
# Encode ndarrays to base64 strings
for cam_key, _ in robot.cameras.items():
ret, buffer = cv2.imencode(
".jpg", last_observation[OBS_IMAGES][cam_key], [int(cv2.IMWRITE_JPEG_QUALITY), 90]
)
if ret:
last_observation[OBS_IMAGES][cam_key] = base64.b64encode(buffer).decode("utf-8")
else:
last_observation[OBS_IMAGES][cam_key] = ""
# Send the observation to the remote agent
remote_agent.zmq_observation_socket.send_string(json.dumps(last_observation))
# Ensure a short sleep to avoid overloading the CPU.
elapsed = time.time() - loop_start_time
# TODO(Steven): Check this value
time.sleep(
max(0.033 - elapsed, 0)
) # If robot jitters increase the sleep and monitor cpu load with `top` in cmd
duration = time.perf_counter() - start
except KeyboardInterrupt:
print("Shutting down LeKiwi server.")
finally:
robot.disconnect()
remote_agent.disconnect()
logging.info("Finished LeKiwi cleanly")
if __name__ == "__main__":
main()

View File

@ -1,224 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import threading
import time
from pathlib import Path
import cv2
import zmq
from lerobot.common.robots.mobile_manipulator import LeKiwi
def setup_zmq_sockets(config):
context = zmq.Context()
cmd_socket = context.socket(zmq.PULL)
cmd_socket.setsockopt(zmq.CONFLATE, 1)
cmd_socket.bind(f"tcp://*:{config.port}")
video_socket = context.socket(zmq.PUSH)
video_socket.setsockopt(zmq.CONFLATE, 1)
video_socket.bind(f"tcp://*:{config.video_port}")
return context, cmd_socket, video_socket
def run_camera_capture(cameras, images_lock, latest_images_dict, stop_event):
while not stop_event.is_set():
local_dict = {}
for name, cam in cameras.items():
frame = cam.async_read()
ret, buffer = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
if ret:
local_dict[name] = base64.b64encode(buffer).decode("utf-8")
else:
local_dict[name] = ""
with images_lock:
latest_images_dict.update(local_dict)
time.sleep(0.01)
def calibrate_follower_arm(motors_bus, calib_dir_str):
"""
Calibrates the follower arm. Attempts to load an existing calibration file;
if not found, runs manual calibration and saves the result.
"""
calib_dir = Path(calib_dir_str)
calib_dir.mkdir(parents=True, exist_ok=True)
calib_file = calib_dir / "main_follower.json"
try:
from lerobot.common.motors.feetech.feetech_calibration import run_full_arm_calibration
except ImportError:
print("[WARNING] Calibration function not available. Skipping calibration.")
return
if calib_file.exists():
with open(calib_file) as f:
calibration = json.load(f)
print(f"[INFO] Loaded calibration from {calib_file}")
else:
print("[INFO] Calibration file not found. Running manual calibration...")
calibration = run_full_arm_calibration(motors_bus, "lekiwi", "follower_arm", "follower")
print(f"[INFO] Calibration complete. Saving to {calib_file}")
with open(calib_file, "w") as f:
json.dump(calibration, f)
try:
motors_bus.set_calibration(calibration)
print("[INFO] Applied calibration for follower arm.")
except Exception as e:
print(f"[WARNING] Could not apply calibration: {e}")
def run_lekiwi(robot_config):
"""
Runs the LeKiwi robot:
- Sets up cameras and connects them.
- Initializes the follower arm motors.
- Calibrates the follower arm if necessary.
- Creates ZeroMQ sockets for receiving commands and streaming observations.
- Processes incoming commands (arm and wheel commands) and sends back sensor and camera data.
"""
# Import helper functions and classes
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.motors.feetech.feetech import FeetechMotorsBus, TorqueMode
# Initialize cameras from the robot configuration.
cameras = make_cameras_from_configs(robot_config.cameras)
for cam in cameras.values():
cam.connect()
# Initialize the motors bus using the follower arm configuration.
motor_config = robot_config.follower_arms.get("main")
if motor_config is None:
print("[ERROR] Follower arm 'main' configuration not found.")
return
motors_bus = FeetechMotorsBus(motor_config)
motors_bus.connect()
# Calibrate the follower arm.
calibrate_follower_arm(motors_bus, robot_config.calibration_dir)
# Create the LeKiwi robot instance.
robot = LeKiwi(motors_bus)
# Define the expected arm motor IDs.
arm_motor_ids = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll", "gripper"]
# Disable torque for each arm motor.
for motor in arm_motor_ids:
motors_bus.write("Torque_Enable", TorqueMode.DISABLED.value, motor)
# Set up ZeroMQ sockets.
context, cmd_socket, video_socket = setup_zmq_sockets(robot_config)
# Start the camera capture thread.
latest_images_dict = {}
images_lock = threading.Lock()
stop_event = threading.Event()
cam_thread = threading.Thread(
target=run_camera_capture, args=(cameras, images_lock, latest_images_dict, stop_event), daemon=True
)
cam_thread.start()
last_cmd_time = time.time()
print("LeKiwi robot server started. Waiting for commands...")
try:
while True:
loop_start_time = time.time()
# Process incoming commands (non-blocking).
while True:
try:
msg = cmd_socket.recv_string(zmq.NOBLOCK)
except zmq.Again:
break
try:
data = json.loads(msg)
# Process arm position commands.
if "arm_positions" in data:
arm_positions = data["arm_positions"]
if not isinstance(arm_positions, list):
print(f"[ERROR] Invalid arm_positions: {arm_positions}")
elif len(arm_positions) < len(arm_motor_ids):
print(
f"[WARNING] Received {len(arm_positions)} arm positions, expected {len(arm_motor_ids)}"
)
else:
for motor, pos in zip(arm_motor_ids, arm_positions, strict=False):
motors_bus.write("Goal_Position", pos, motor)
# Process wheel (base) commands.
if "raw_velocity" in data:
raw_command = data["raw_velocity"]
# Expect keys: "left_wheel", "back_wheel", "right_wheel".
command_speeds = [
int(raw_command.get("left_wheel", 0)),
int(raw_command.get("back_wheel", 0)),
int(raw_command.get("right_wheel", 0)),
]
robot.set_velocity(command_speeds)
last_cmd_time = time.time()
except Exception as e:
print(f"[ERROR] Parsing message failed: {e}")
# Watchdog: stop the robot if no command is received for over 0.5 seconds.
now = time.time()
if now - last_cmd_time > 0.5:
robot.stop()
last_cmd_time = now
# Read current wheel speeds from the robot.
current_velocity = robot.read_velocity()
# Read the follower arm state from the motors bus.
follower_arm_state = []
for motor in arm_motor_ids:
try:
pos = motors_bus.read("Present_Position", motor)
# Convert the position to a float (or use as is if already numeric).
follower_arm_state.append(float(pos) if not isinstance(pos, (int, float)) else pos)
except Exception as e:
print(f"[ERROR] Reading motor {motor} failed: {e}")
# Get the latest camera images.
with images_lock:
images_dict_copy = dict(latest_images_dict)
# Build the observation dictionary.
observation = {
"images": images_dict_copy,
"present_speed": current_velocity,
"follower_arm_state": follower_arm_state,
}
# Send the observation over the video socket.
video_socket.send_string(json.dumps(observation))
# Ensure a short sleep to avoid overloading the CPU.
elapsed = time.time() - loop_start_time
time.sleep(
max(0.033 - elapsed, 0)
) # If robot jitters increase the sleep and monitor cpu load with `top` in cmd
except KeyboardInterrupt:
print("Shutting down LeKiwi server.")
finally:
stop_event.set()
cam_thread.join()
robot.stop()
motors_bus.disconnect()
cmd_socket.close()
video_socket.close()
context.term()

View File

@ -1,692 +0,0 @@
import base64
import json
import os
import sys
from pathlib import Path
import cv2
import numpy as np
import torch
import zmq
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.errors import DeviceNotConnectedError
from lerobot.common.motors.feetech.feetech import TorqueMode
from lerobot.common.motors.feetech.feetech_calibration import run_full_arm_calibration
from lerobot.common.motors.motors_bus import MotorsBus
from lerobot.common.motors.utils import make_motors_buses_from_configs
from lerobot.common.robots.lekiwi.configuration_lekiwi import LeKiwiRobotConfig
from lerobot.common.robots.utils import get_arm_id
PYNPUT_AVAILABLE = True
try:
# Only import if there's a valid X server or if we're not on a Pi
if ("DISPLAY" not in os.environ) and ("linux" in sys.platform):
print("No DISPLAY set. Skipping pynput import.")
raise ImportError("pynput blocked intentionally due to no display.")
from pynput import keyboard
except ImportError:
keyboard = None
PYNPUT_AVAILABLE = False
except Exception as e:
keyboard = None
PYNPUT_AVAILABLE = False
print(f"Could not import pynput: {e}")
class MobileManipulator:
"""
MobileManipulator is a class for connecting to and controlling a remote mobile manipulator robot.
The robot includes a three omniwheel mobile base and a remote follower arm.
The leader arm is connected locally (on the laptop) and its joint positions are recorded and then
forwarded to the remote follower arm (after applying a safety clamp).
In parallel, keyboard teleoperation is used to generate raw velocity commands for the wheels.
"""
def __init__(self, config: LeKiwiRobotConfig):
"""
Expected keys in config:
- ip, port, video_port for the remote connection.
- calibration_dir, leader_arms, follower_arms, max_relative_target, etc.
"""
self.robot_type = config.type
self.config = config
self.remote_ip = config.ip
self.remote_port = config.port
self.remote_port_video = config.video_port
self.calibration_dir = Path(self.config.calibration_dir)
self.logs = {}
self.teleop_keys = self.config.teleop_keys
# For teleoperation, the leader arm (local) is used to record the desired arm pose.
self.leader_arms = make_motors_buses_from_configs(self.config.leader_arms)
self.follower_arms = make_motors_buses_from_configs(self.config.follower_arms)
self.cameras = make_cameras_from_configs(self.config.cameras)
self.is_connected = False
self.last_frames = {}
self.last_present_speed = {}
self.last_remote_arm_state = torch.zeros(6, dtype=torch.float32)
# Define three speed levels and a current index
self.speed_levels = [
{"xy": 0.1, "theta": 30}, # slow
{"xy": 0.2, "theta": 60}, # medium
{"xy": 0.3, "theta": 90}, # fast
]
self.speed_index = 0 # Start at slow
# ZeroMQ context and sockets.
self.context = None
self.cmd_socket = None
self.video_socket = None
# Keyboard state for base teleoperation.
self.running = True
self.pressed_keys = {
"forward": False,
"backward": False,
"left": False,
"right": False,
"rotate_left": False,
"rotate_right": False,
}
if PYNPUT_AVAILABLE:
print("pynput is available - enabling local keyboard listener.")
self.listener = keyboard.Listener(
on_press=self.on_press,
on_release=self.on_release,
)
self.listener.start()
else:
print("pynput not available - skipping local keyboard listener.")
self.listener = None
def get_motor_names(self, arms: dict[str, MotorsBus]) -> list:
return [f"{arm}_{motor}" for arm, bus in arms.items() for motor in bus.motors]
@property
def camera_features(self) -> dict:
cam_ft = {}
for cam_key, cam in self.cameras.items():
key = f"observation.images.{cam_key}"
cam_ft[key] = {
"shape": (cam.height, cam.width, cam.channels),
"names": ["height", "width", "channels"],
"info": None,
}
return cam_ft
@property
def motor_features(self) -> dict:
follower_arm_names = [
"shoulder_pan",
"shoulder_lift",
"elbow_flex",
"wrist_flex",
"wrist_roll",
"gripper",
]
observations = ["x_mm", "y_mm", "theta"]
combined_names = follower_arm_names + observations
return {
"action": {
"dtype": "float32",
"shape": (len(combined_names),),
"names": combined_names,
},
"observation.state": {
"dtype": "float32",
"shape": (len(combined_names),),
"names": combined_names,
},
}
@property
def features(self):
return {**self.motor_features, **self.camera_features}
@property
def has_camera(self):
return len(self.cameras) > 0
@property
def num_cameras(self):
return len(self.cameras)
@property
def available_arms(self):
available = []
for name in self.leader_arms:
available.append(get_arm_id(name, "leader"))
for name in self.follower_arms:
available.append(get_arm_id(name, "follower"))
return available
def on_press(self, key):
try:
# Movement
if key.char == self.teleop_keys["forward"]:
self.pressed_keys["forward"] = True
elif key.char == self.teleop_keys["backward"]:
self.pressed_keys["backward"] = True
elif key.char == self.teleop_keys["left"]:
self.pressed_keys["left"] = True
elif key.char == self.teleop_keys["right"]:
self.pressed_keys["right"] = True
elif key.char == self.teleop_keys["rotate_left"]:
self.pressed_keys["rotate_left"] = True
elif key.char == self.teleop_keys["rotate_right"]:
self.pressed_keys["rotate_right"] = True
# Quit teleoperation
elif key.char == self.teleop_keys["quit"]:
self.running = False
return False
# Speed control
elif key.char == self.teleop_keys["speed_up"]:
self.speed_index = min(self.speed_index + 1, 2)
print(f"Speed index increased to {self.speed_index}")
elif key.char == self.teleop_keys["speed_down"]:
self.speed_index = max(self.speed_index - 1, 0)
print(f"Speed index decreased to {self.speed_index}")
except AttributeError:
# e.g., if key is special like Key.esc
if key == keyboard.Key.esc:
self.running = False
return False
def on_release(self, key):
try:
if hasattr(key, "char"):
if key.char == self.teleop_keys["forward"]:
self.pressed_keys["forward"] = False
elif key.char == self.teleop_keys["backward"]:
self.pressed_keys["backward"] = False
elif key.char == self.teleop_keys["left"]:
self.pressed_keys["left"] = False
elif key.char == self.teleop_keys["right"]:
self.pressed_keys["right"] = False
elif key.char == self.teleop_keys["rotate_left"]:
self.pressed_keys["rotate_left"] = False
elif key.char == self.teleop_keys["rotate_right"]:
self.pressed_keys["rotate_right"] = False
except AttributeError:
pass
def connect(self):
if not self.leader_arms:
raise ValueError("MobileManipulator has no leader arm to connect.")
for name in self.leader_arms:
print(f"Connecting {name} leader arm.")
self.calibrate_leader()
# Set up ZeroMQ sockets to communicate with the remote mobile robot.
self.context = zmq.Context()
self.cmd_socket = self.context.socket(zmq.PUSH)
connection_string = f"tcp://{self.remote_ip}:{self.remote_port}"
self.cmd_socket.connect(connection_string)
self.cmd_socket.setsockopt(zmq.CONFLATE, 1)
self.video_socket = self.context.socket(zmq.PULL)
video_connection = f"tcp://{self.remote_ip}:{self.remote_port_video}"
self.video_socket.connect(video_connection)
self.video_socket.setsockopt(zmq.CONFLATE, 1)
print(
f"[INFO] Connected to remote robot at {connection_string} and video stream at {video_connection}."
)
self.is_connected = True
def load_or_run_calibration_(self, name, arm, arm_type):
arm_id = get_arm_id(name, arm_type)
arm_calib_path = self.calibration_dir / f"{arm_id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
calibration = json.load(f)
else:
print(f"Missing calibration file '{arm_calib_path}'")
calibration = run_full_arm_calibration(arm, self.robot_type, name, arm_type)
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
json.dump(calibration, f)
return calibration
def calibrate_leader(self):
for name, arm in self.leader_arms.items():
# Connect the bus
arm.connect()
# Disable torque on all motors
for motor_id in arm.motors:
arm.write("Torque_Enable", TorqueMode.DISABLED.value, motor_id)
# Now run calibration
calibration = self.load_or_run_calibration_(name, arm, "leader")
arm.set_calibration(calibration)
def calibrate_follower(self):
for name, bus in self.follower_arms.items():
bus.connect()
# Disable torque on all motors
for motor_id in bus.motors:
bus.write("Torque_Enable", 0, motor_id)
# Then filter out wheels
arm_only_dict = {k: v for k, v in bus.motors.items() if not k.startswith("wheel_")}
if not arm_only_dict:
continue
original_motors = bus.motors
bus.motors = arm_only_dict
calibration = self.load_or_run_calibration_(name, bus, "follower")
bus.set_calibration(calibration)
bus.motors = original_motors
def _get_data(self):
"""
Polls the video socket for up to 15 ms. If data arrives, decode only
the *latest* message, returning frames, speed, and arm state. If
nothing arrives for any field, use the last known values.
"""
frames = {}
present_speed = {}
remote_arm_state_tensor = torch.zeros(6, dtype=torch.float32)
# Poll up to 15 ms
poller = zmq.Poller()
poller.register(self.video_socket, zmq.POLLIN)
socks = dict(poller.poll(15))
if self.video_socket not in socks or socks[self.video_socket] != zmq.POLLIN:
# No new data arrived → reuse ALL old data
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
# Drain all messages, keep only the last
last_msg = None
while True:
try:
obs_string = self.video_socket.recv_string(zmq.NOBLOCK)
last_msg = obs_string
except zmq.Again:
break
if not last_msg:
# No new message → also reuse old
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
# Decode only the final message
try:
observation = json.loads(last_msg)
images_dict = observation.get("images", {})
new_speed = observation.get("present_speed", {})
new_arm_state = observation.get("follower_arm_state", None)
# Convert images
for cam_name, image_b64 in images_dict.items():
if image_b64:
jpg_data = base64.b64decode(image_b64)
np_arr = np.frombuffer(jpg_data, dtype=np.uint8)
frame_candidate = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
if frame_candidate is not None:
frames[cam_name] = frame_candidate
# If remote_arm_state is None and frames is None there is no message then use the previous message
if new_arm_state is not None and frames is not None:
self.last_frames = frames
remote_arm_state_tensor = torch.tensor(new_arm_state, dtype=torch.float32)
self.last_remote_arm_state = remote_arm_state_tensor
present_speed = new_speed
self.last_present_speed = new_speed
else:
frames = self.last_frames
remote_arm_state_tensor = self.last_remote_arm_state
present_speed = self.last_present_speed
except Exception as e:
print(f"[DEBUG] Error decoding video message: {e}")
# If decode fails, fall back to old data
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
return frames, present_speed, remote_arm_state_tensor
def _process_present_speed(self, present_speed: dict) -> torch.Tensor:
state_tensor = torch.zeros(3, dtype=torch.int32)
if present_speed:
decoded = {key: MobileManipulator.raw_to_degps(value) for key, value in present_speed.items()}
if "1" in decoded:
state_tensor[0] = decoded["1"]
if "2" in decoded:
state_tensor[1] = decoded["2"]
if "3" in decoded:
state_tensor[2] = decoded["3"]
return state_tensor
def teleop_step(
self, record_data: bool = False
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
if not self.is_connected:
raise DeviceNotConnectedError("MobileManipulator is not connected. Run `connect()` first.")
speed_setting = self.speed_levels[self.speed_index]
xy_speed = speed_setting["xy"] # e.g. 0.1, 0.25, or 0.4
theta_speed = speed_setting["theta"] # e.g. 30, 60, or 90
# Prepare to assign the position of the leader to the follower
arm_positions = []
for name in self.leader_arms:
pos = self.leader_arms[name].read("Present_Position")
pos_tensor = torch.from_numpy(pos).float()
# Instead of pos_tensor.item(), use tolist() to convert the entire tensor to a list
arm_positions.extend(pos_tensor.tolist())
# (The rest of your code for generating wheel commands remains unchanged)
x_cmd = 0.0 # m/s forward/backward
y_cmd = 0.0 # m/s lateral
theta_cmd = 0.0 # deg/s rotation
if self.pressed_keys["forward"]:
x_cmd += xy_speed
if self.pressed_keys["backward"]:
x_cmd -= xy_speed
if self.pressed_keys["left"]:
y_cmd += xy_speed
if self.pressed_keys["right"]:
y_cmd -= xy_speed
if self.pressed_keys["rotate_left"]:
theta_cmd += theta_speed
if self.pressed_keys["rotate_right"]:
theta_cmd -= theta_speed
wheel_commands = self.body_to_wheel_raw(x_cmd, y_cmd, theta_cmd)
message = {"raw_velocity": wheel_commands, "arm_positions": arm_positions}
self.cmd_socket.send_string(json.dumps(message))
if not record_data:
return
obs_dict = self.capture_observation()
arm_state_tensor = torch.tensor(arm_positions, dtype=torch.float32)
wheel_velocity_tuple = self.wheel_raw_to_body(wheel_commands)
wheel_velocity_mm = (
wheel_velocity_tuple[0] * 1000.0,
wheel_velocity_tuple[1] * 1000.0,
wheel_velocity_tuple[2],
)
wheel_tensor = torch.tensor(wheel_velocity_mm, dtype=torch.float32)
action_tensor = torch.cat([arm_state_tensor, wheel_tensor])
action_dict = {"action": action_tensor}
return obs_dict, action_dict
def capture_observation(self) -> dict:
"""
Capture observations from the remote robot: current follower arm positions,
present wheel speeds (converted to body-frame velocities: x, y, theta),
and a camera frame.
"""
if not self.is_connected:
raise DeviceNotConnectedError("Not connected. Run `connect()` first.")
frames, present_speed, remote_arm_state_tensor = self._get_data()
body_state = self.wheel_raw_to_body(present_speed)
body_state_mm = (body_state[0] * 1000.0, body_state[1] * 1000.0, body_state[2]) # Convert x,y to mm/s
wheel_state_tensor = torch.tensor(body_state_mm, dtype=torch.float32)
combined_state_tensor = torch.cat((remote_arm_state_tensor, wheel_state_tensor), dim=0)
obs_dict = {"observation.state": combined_state_tensor}
# Loop over each configured camera
for cam_name, cam in self.cameras.items():
frame = frames.get(cam_name, None)
if frame is None:
# Create a black image using the camera's configured width, height, and channels
frame = np.zeros((cam.height, cam.width, cam.channels), dtype=np.uint8)
obs_dict[f"observation.images.{cam_name}"] = torch.from_numpy(frame)
return obs_dict
def send_action(self, action: torch.Tensor) -> torch.Tensor:
if not self.is_connected:
raise DeviceNotConnectedError("Not connected. Run `connect()` first.")
# Ensure the action tensor has at least 9 elements:
# - First 6: arm positions.
# - Last 3: base commands.
if action.numel() < 9:
# Pad with zeros if there are not enough elements.
padded = torch.zeros(9, dtype=action.dtype)
padded[: action.numel()] = action
action = padded
# Extract arm and base actions.
arm_actions = action[:6].flatten()
base_actions = action[6:].flatten()
x_cmd_mm = base_actions[0].item() # mm/s
y_cmd_mm = base_actions[1].item() # mm/s
theta_cmd = base_actions[2].item() # deg/s
# Convert mm/s to m/s for the kinematics calculations.
x_cmd = x_cmd_mm / 1000.0 # m/s
y_cmd = y_cmd_mm / 1000.0 # m/s
# Compute wheel commands from body commands.
wheel_commands = self.body_to_wheel_raw(x_cmd, y_cmd, theta_cmd)
arm_positions_list = arm_actions.tolist()
message = {"raw_velocity": wheel_commands, "arm_positions": arm_positions_list}
self.cmd_socket.send_string(json.dumps(message))
return action
def print_logs(self):
pass
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError("Not connected.")
if self.cmd_socket:
stop_cmd = {
"raw_velocity": {"left_wheel": 0, "back_wheel": 0, "right_wheel": 0},
"arm_positions": {},
}
self.cmd_socket.send_string(json.dumps(stop_cmd))
self.cmd_socket.close()
if self.video_socket:
self.video_socket.close()
if self.context:
self.context.term()
if PYNPUT_AVAILABLE:
self.listener.stop()
self.is_connected = False
print("[INFO] Disconnected from remote robot.")
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()
if PYNPUT_AVAILABLE:
self.listener.stop()
@staticmethod
def degps_to_raw(degps: float) -> int:
steps_per_deg = 4096.0 / 360.0
speed_in_steps = abs(degps) * steps_per_deg
speed_int = int(round(speed_in_steps))
if speed_int > 0x7FFF:
speed_int = 0x7FFF
if degps < 0:
return speed_int | 0x8000
else:
return speed_int & 0x7FFF
@staticmethod
def raw_to_degps(raw_speed: int) -> float:
steps_per_deg = 4096.0 / 360.0
magnitude = raw_speed & 0x7FFF
degps = magnitude / steps_per_deg
if raw_speed & 0x8000:
degps = -degps
return degps
def body_to_wheel_raw(
self,
x_cmd: float,
y_cmd: float,
theta_cmd: float,
wheel_radius: float = 0.05,
base_radius: float = 0.125,
max_raw: int = 3000,
) -> dict:
"""
Convert desired body-frame velocities into wheel raw commands.
Parameters:
x_cmd : Linear velocity in x (m/s).
y_cmd : Linear velocity in y (m/s).
theta_cmd : Rotational velocity (deg/s).
wheel_radius: Radius of each wheel (meters).
base_radius : Distance from the center of rotation to each wheel (meters).
max_raw : Maximum allowed raw command (ticks) per wheel.
Returns:
A dictionary with wheel raw commands:
{"left_wheel": value, "back_wheel": value, "right_wheel": value}.
Notes:
- Internally, the method converts theta_cmd to rad/s for the kinematics.
- The raw command is computed from the wheels angular speed in deg/s
using degps_to_raw(). If any command exceeds max_raw, all commands
are scaled down proportionally.
"""
# Convert rotational velocity from deg/s to rad/s.
theta_rad = theta_cmd * (np.pi / 180.0)
# Create the body velocity vector [x, y, theta_rad].
velocity_vector = np.array([x_cmd, y_cmd, theta_rad])
# Define the wheel mounting angles with a -90° offset.
angles = np.radians(np.array([240, 120, 0]) - 90)
# Build the kinematic matrix: each row maps body velocities to a wheels linear speed.
# The third column (base_radius) accounts for the effect of rotation.
m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles])
# Compute each wheels linear speed (m/s) and then its angular speed (rad/s).
wheel_linear_speeds = m.dot(velocity_vector)
wheel_angular_speeds = wheel_linear_speeds / wheel_radius
# Convert wheel angular speeds from rad/s to deg/s.
wheel_degps = wheel_angular_speeds * (180.0 / np.pi)
# Scaling
steps_per_deg = 4096.0 / 360.0
raw_floats = [abs(degps) * steps_per_deg for degps in wheel_degps]
max_raw_computed = max(raw_floats)
if max_raw_computed > max_raw:
scale = max_raw / max_raw_computed
wheel_degps = wheel_degps * scale
# Convert each wheels angular speed (deg/s) to a raw integer.
wheel_raw = [MobileManipulator.degps_to_raw(deg) for deg in wheel_degps]
return {"left_wheel": wheel_raw[0], "back_wheel": wheel_raw[1], "right_wheel": wheel_raw[2]}
def wheel_raw_to_body(
self, wheel_raw: dict, wheel_radius: float = 0.05, base_radius: float = 0.125
) -> tuple:
"""
Convert wheel raw command feedback back into body-frame velocities.
Parameters:
wheel_raw : Dictionary with raw wheel commands (keys: "left_wheel", "back_wheel", "right_wheel").
wheel_radius: Radius of each wheel (meters).
base_radius : Distance from the robot center to each wheel (meters).
Returns:
A tuple (x_cmd, y_cmd, theta_cmd) where:
x_cmd : Linear velocity in x (m/s).
y_cmd : Linear velocity in y (m/s).
theta_cmd : Rotational velocity in deg/s.
"""
# Extract the raw values in order.
raw_list = [
int(wheel_raw.get("left_wheel", 0)),
int(wheel_raw.get("back_wheel", 0)),
int(wheel_raw.get("right_wheel", 0)),
]
# Convert each raw command back to an angular speed in deg/s.
wheel_degps = np.array([MobileManipulator.raw_to_degps(r) for r in raw_list])
# Convert from deg/s to rad/s.
wheel_radps = wheel_degps * (np.pi / 180.0)
# Compute each wheels linear speed (m/s) from its angular speed.
wheel_linear_speeds = wheel_radps * wheel_radius
# Define the wheel mounting angles with a -90° offset.
angles = np.radians(np.array([240, 120, 0]) - 90)
m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles])
# Solve the inverse kinematics: body_velocity = M⁻¹ · wheel_linear_speeds.
m_inv = np.linalg.inv(m)
velocity_vector = m_inv.dot(wheel_linear_speeds)
x_cmd, y_cmd, theta_rad = velocity_vector
theta_cmd = theta_rad * (180.0 / np.pi)
return (x_cmd, y_cmd, theta_cmd)
class LeKiwi:
def __init__(self, motor_bus):
"""
Initializes the LeKiwi with Feetech motors bus.
"""
self.motor_bus = motor_bus
self.motor_ids = ["left_wheel", "back_wheel", "right_wheel"]
# Initialize motors in velocity mode.
self.motor_bus.write("Lock", 0)
self.motor_bus.write("Mode", [1, 1, 1], self.motor_ids)
self.motor_bus.write("Lock", 1)
print("Motors set to velocity mode.")
def read_velocity(self):
"""
Reads the raw speeds for all wheels. Returns a dictionary with motor names:
"""
raw_speeds = self.motor_bus.read("Present_Speed", self.motor_ids)
return {
"left_wheel": int(raw_speeds[0]),
"back_wheel": int(raw_speeds[1]),
"right_wheel": int(raw_speeds[2]),
}
def set_velocity(self, command_speeds):
"""
Sends raw velocity commands (16-bit encoded values) directly to the motor bus.
The order of speeds must correspond to self.motor_ids.
"""
self.motor_bus.write("Goal_Speed", command_speeds, self.motor_ids)
def stop(self):
"""Stops the robot by setting all motor speeds to zero."""
self.motor_bus.write("Goal_Speed", [0, 0, 0], self.motor_ids)
print("Motors stopped.")

View File

@ -1,704 +0,0 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import os
import sys
from pathlib import Path
import cv2
import numpy as np
import torch
import zmq
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.errors import DeviceNotConnectedError
from lerobot.common.motors.feetech.feetech import TorqueMode
from lerobot.common.motors.feetech.feetech_calibration import run_full_arm_calibration
from lerobot.common.motors.motors_bus import MotorsBus
from lerobot.common.motors.utils import make_motors_buses_from_configs
from lerobot.common.robots.lekiwi.configuration_lekiwi import LeKiwiRobotConfig
from lerobot.common.robots.utils import get_arm_id
PYNPUT_AVAILABLE = True
try:
# Only import if there's a valid X server or if we're not on a Pi
if ("DISPLAY" not in os.environ) and ("linux" in sys.platform):
print("No DISPLAY set. Skipping pynput import.")
raise ImportError("pynput blocked intentionally due to no display.")
from pynput import keyboard
except ImportError:
keyboard = None
PYNPUT_AVAILABLE = False
except Exception as e:
keyboard = None
PYNPUT_AVAILABLE = False
print(f"Could not import pynput: {e}")
class MobileManipulator:
"""
MobileManipulator is a class for connecting to and controlling a remote mobile manipulator robot.
The robot includes a three omniwheel mobile base and a remote follower arm.
The leader arm is connected locally (on the laptop) and its joint positions are recorded and then
forwarded to the remote follower arm (after applying a safety clamp).
In parallel, keyboard teleoperation is used to generate raw velocity commands for the wheels.
"""
def __init__(self, config: LeKiwiRobotConfig):
"""
Expected keys in config:
- ip, port, video_port for the remote connection.
- calibration_dir, leader_arms, follower_arms, max_relative_target, etc.
"""
self.robot_type = config.type
self.config = config
self.remote_ip = config.ip
self.remote_port = config.port
self.remote_port_video = config.video_port
self.calibration_dir = Path(self.config.calibration_dir)
self.logs = {}
self.teleop_keys = self.config.teleop_keys
# For teleoperation, the leader arm (local) is used to record the desired arm pose.
self.leader_arms = make_motors_buses_from_configs(self.config.leader_arms)
self.follower_arms = make_motors_buses_from_configs(self.config.follower_arms)
self.cameras = make_cameras_from_configs(self.config.cameras)
self.is_connected = False
self.last_frames = {}
self.last_present_speed = {}
self.last_remote_arm_state = torch.zeros(6, dtype=torch.float32)
# Define three speed levels and a current index
self.speed_levels = [
{"xy": 0.1, "theta": 30}, # slow
{"xy": 0.2, "theta": 60}, # medium
{"xy": 0.3, "theta": 90}, # fast
]
self.speed_index = 0 # Start at slow
# ZeroMQ context and sockets.
self.context = None
self.cmd_socket = None
self.video_socket = None
# Keyboard state for base teleoperation.
self.running = True
self.pressed_keys = {
"forward": False,
"backward": False,
"left": False,
"right": False,
"rotate_left": False,
"rotate_right": False,
}
if PYNPUT_AVAILABLE:
print("pynput is available - enabling local keyboard listener.")
self.listener = keyboard.Listener(
on_press=self.on_press,
on_release=self.on_release,
)
self.listener.start()
else:
print("pynput not available - skipping local keyboard listener.")
self.listener = None
def get_motor_names(self, arms: dict[str, MotorsBus]) -> list:
return [f"{arm}_{motor}" for arm, bus in arms.items() for motor in bus.motors]
@property
def camera_features(self) -> dict:
cam_ft = {}
for cam_key, cam in self.cameras.items():
key = f"observation.images.{cam_key}"
cam_ft[key] = {
"shape": (cam.height, cam.width, cam.channels),
"names": ["height", "width", "channels"],
"info": None,
}
return cam_ft
@property
def motor_features(self) -> dict:
follower_arm_names = [
"shoulder_pan",
"shoulder_lift",
"elbow_flex",
"wrist_flex",
"wrist_roll",
"gripper",
]
observations = ["x_mm", "y_mm", "theta"]
combined_names = follower_arm_names + observations
return {
"action": {
"dtype": "float32",
"shape": (len(combined_names),),
"names": combined_names,
},
"observation.state": {
"dtype": "float32",
"shape": (len(combined_names),),
"names": combined_names,
},
}
@property
def features(self):
return {**self.motor_features, **self.camera_features}
@property
def has_camera(self):
return len(self.cameras) > 0
@property
def num_cameras(self):
return len(self.cameras)
@property
def available_arms(self):
available = []
for name in self.leader_arms:
available.append(get_arm_id(name, "leader"))
for name in self.follower_arms:
available.append(get_arm_id(name, "follower"))
return available
def on_press(self, key):
try:
# Movement
if key.char == self.teleop_keys["forward"]:
self.pressed_keys["forward"] = True
elif key.char == self.teleop_keys["backward"]:
self.pressed_keys["backward"] = True
elif key.char == self.teleop_keys["left"]:
self.pressed_keys["left"] = True
elif key.char == self.teleop_keys["right"]:
self.pressed_keys["right"] = True
elif key.char == self.teleop_keys["rotate_left"]:
self.pressed_keys["rotate_left"] = True
elif key.char == self.teleop_keys["rotate_right"]:
self.pressed_keys["rotate_right"] = True
# Quit teleoperation
elif key.char == self.teleop_keys["quit"]:
self.running = False
return False
# Speed control
elif key.char == self.teleop_keys["speed_up"]:
self.speed_index = min(self.speed_index + 1, 2)
print(f"Speed index increased to {self.speed_index}")
elif key.char == self.teleop_keys["speed_down"]:
self.speed_index = max(self.speed_index - 1, 0)
print(f"Speed index decreased to {self.speed_index}")
except AttributeError:
# e.g., if key is special like Key.esc
if key == keyboard.Key.esc:
self.running = False
return False
def on_release(self, key):
try:
if hasattr(key, "char"):
if key.char == self.teleop_keys["forward"]:
self.pressed_keys["forward"] = False
elif key.char == self.teleop_keys["backward"]:
self.pressed_keys["backward"] = False
elif key.char == self.teleop_keys["left"]:
self.pressed_keys["left"] = False
elif key.char == self.teleop_keys["right"]:
self.pressed_keys["right"] = False
elif key.char == self.teleop_keys["rotate_left"]:
self.pressed_keys["rotate_left"] = False
elif key.char == self.teleop_keys["rotate_right"]:
self.pressed_keys["rotate_right"] = False
except AttributeError:
pass
def connect(self):
if not self.leader_arms:
raise ValueError("MobileManipulator has no leader arm to connect.")
for name in self.leader_arms:
print(f"Connecting {name} leader arm.")
self.calibrate_leader()
# Set up ZeroMQ sockets to communicate with the remote mobile robot.
self.context = zmq.Context()
self.cmd_socket = self.context.socket(zmq.PUSH)
connection_string = f"tcp://{self.remote_ip}:{self.remote_port}"
self.cmd_socket.connect(connection_string)
self.cmd_socket.setsockopt(zmq.CONFLATE, 1)
self.video_socket = self.context.socket(zmq.PULL)
video_connection = f"tcp://{self.remote_ip}:{self.remote_port_video}"
self.video_socket.connect(video_connection)
self.video_socket.setsockopt(zmq.CONFLATE, 1)
print(
f"[INFO] Connected to remote robot at {connection_string} and video stream at {video_connection}."
)
self.is_connected = True
def load_or_run_calibration_(self, name, arm, arm_type):
arm_id = get_arm_id(name, arm_type)
arm_calib_path = self.calibration_dir / f"{arm_id}.json"
if arm_calib_path.exists():
with open(arm_calib_path) as f:
calibration = json.load(f)
else:
print(f"Missing calibration file '{arm_calib_path}'")
calibration = run_full_arm_calibration(arm, self.robot_type, name, arm_type)
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f:
json.dump(calibration, f)
return calibration
def calibrate_leader(self):
for name, arm in self.leader_arms.items():
# Connect the bus
arm.connect()
# Disable torque on all motors
for motor_id in arm.motors:
arm.write("Torque_Enable", TorqueMode.DISABLED.value, motor_id)
# Now run calibration
calibration = self.load_or_run_calibration_(name, arm, "leader")
arm.set_calibration(calibration)
def calibrate_follower(self):
for name, bus in self.follower_arms.items():
bus.connect()
# Disable torque on all motors
for motor_id in bus.motors:
bus.write("Torque_Enable", 0, motor_id)
# Then filter out wheels
arm_only_dict = {k: v for k, v in bus.motors.items() if not k.startswith("wheel_")}
if not arm_only_dict:
continue
original_motors = bus.motors
bus.motors = arm_only_dict
calibration = self.load_or_run_calibration_(name, bus, "follower")
bus.set_calibration(calibration)
bus.motors = original_motors
def _get_data(self):
"""
Polls the video socket for up to 15 ms. If data arrives, decode only
the *latest* message, returning frames, speed, and arm state. If
nothing arrives for any field, use the last known values.
"""
frames = {}
present_speed = {}
remote_arm_state_tensor = torch.zeros(6, dtype=torch.float32)
# Poll up to 15 ms
poller = zmq.Poller()
poller.register(self.video_socket, zmq.POLLIN)
socks = dict(poller.poll(15))
if self.video_socket not in socks or socks[self.video_socket] != zmq.POLLIN:
# No new data arrived → reuse ALL old data
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
# Drain all messages, keep only the last
last_msg = None
while True:
try:
obs_string = self.video_socket.recv_string(zmq.NOBLOCK)
last_msg = obs_string
except zmq.Again:
break
if not last_msg:
# No new message → also reuse old
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
# Decode only the final message
try:
observation = json.loads(last_msg)
images_dict = observation.get("images", {})
new_speed = observation.get("present_speed", {})
new_arm_state = observation.get("follower_arm_state", None)
# Convert images
for cam_name, image_b64 in images_dict.items():
if image_b64:
jpg_data = base64.b64decode(image_b64)
np_arr = np.frombuffer(jpg_data, dtype=np.uint8)
frame_candidate = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
if frame_candidate is not None:
frames[cam_name] = frame_candidate
# If remote_arm_state is None and frames is None there is no message then use the previous message
if new_arm_state is not None and frames is not None:
self.last_frames = frames
remote_arm_state_tensor = torch.tensor(new_arm_state, dtype=torch.float32)
self.last_remote_arm_state = remote_arm_state_tensor
present_speed = new_speed
self.last_present_speed = new_speed
else:
frames = self.last_frames
remote_arm_state_tensor = self.last_remote_arm_state
present_speed = self.last_present_speed
except Exception as e:
print(f"[DEBUG] Error decoding video message: {e}")
# If decode fails, fall back to old data
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
return frames, present_speed, remote_arm_state_tensor
def _process_present_speed(self, present_speed: dict) -> torch.Tensor:
state_tensor = torch.zeros(3, dtype=torch.int32)
if present_speed:
decoded = {key: MobileManipulator.raw_to_degps(value) for key, value in present_speed.items()}
if "1" in decoded:
state_tensor[0] = decoded["1"]
if "2" in decoded:
state_tensor[1] = decoded["2"]
if "3" in decoded:
state_tensor[2] = decoded["3"]
return state_tensor
def teleop_step(
self, record_data: bool = False
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
if not self.is_connected:
raise DeviceNotConnectedError("MobileManipulator is not connected. Run `connect()` first.")
speed_setting = self.speed_levels[self.speed_index]
xy_speed = speed_setting["xy"] # e.g. 0.1, 0.25, or 0.4
theta_speed = speed_setting["theta"] # e.g. 30, 60, or 90
# Prepare to assign the position of the leader to the follower
arm_positions = []
for name in self.leader_arms:
pos = self.leader_arms[name].read("Present_Position")
pos_tensor = torch.from_numpy(pos).float()
arm_positions.extend(pos_tensor.tolist())
y_cmd = 0.0 # m/s forward/backward
x_cmd = 0.0 # m/s lateral
theta_cmd = 0.0 # deg/s rotation
if self.pressed_keys["forward"]:
y_cmd += xy_speed
if self.pressed_keys["backward"]:
y_cmd -= xy_speed
if self.pressed_keys["left"]:
x_cmd += xy_speed
if self.pressed_keys["right"]:
x_cmd -= xy_speed
if self.pressed_keys["rotate_left"]:
theta_cmd += theta_speed
if self.pressed_keys["rotate_right"]:
theta_cmd -= theta_speed
wheel_commands = self.body_to_wheel_raw(x_cmd, y_cmd, theta_cmd)
message = {"raw_velocity": wheel_commands, "arm_positions": arm_positions}
self.cmd_socket.send_string(json.dumps(message))
if not record_data:
return
obs_dict = self.capture_observation()
arm_state_tensor = torch.tensor(arm_positions, dtype=torch.float32)
wheel_velocity_tuple = self.wheel_raw_to_body(wheel_commands)
wheel_velocity_mm = (
wheel_velocity_tuple[0] * 1000.0,
wheel_velocity_tuple[1] * 1000.0,
wheel_velocity_tuple[2],
)
wheel_tensor = torch.tensor(wheel_velocity_mm, dtype=torch.float32)
action_tensor = torch.cat([arm_state_tensor, wheel_tensor])
action_dict = {"action": action_tensor}
return obs_dict, action_dict
def capture_observation(self) -> dict:
"""
Capture observations from the remote robot: current follower arm positions,
present wheel speeds (converted to body-frame velocities: x, y, theta),
and a camera frame.
"""
if not self.is_connected:
raise DeviceNotConnectedError("Not connected. Run `connect()` first.")
frames, present_speed, remote_arm_state_tensor = self._get_data()
body_state = self.wheel_raw_to_body(present_speed)
body_state_mm = (body_state[0] * 1000.0, body_state[1] * 1000.0, body_state[2]) # Convert x,y to mm/s
wheel_state_tensor = torch.tensor(body_state_mm, dtype=torch.float32)
combined_state_tensor = torch.cat((remote_arm_state_tensor, wheel_state_tensor), dim=0)
obs_dict = {"observation.state": combined_state_tensor}
# Loop over each configured camera
for cam_name, cam in self.cameras.items():
frame = frames.get(cam_name, None)
if frame is None:
# Create a black image using the camera's configured width, height, and channels
frame = np.zeros((cam.height, cam.width, cam.channels), dtype=np.uint8)
obs_dict[f"observation.images.{cam_name}"] = torch.from_numpy(frame)
return obs_dict
def send_action(self, action: torch.Tensor) -> torch.Tensor:
if not self.is_connected:
raise DeviceNotConnectedError("Not connected. Run `connect()` first.")
# Ensure the action tensor has at least 9 elements:
# - First 6: arm positions.
# - Last 3: base commands.
if action.numel() < 9:
# Pad with zeros if there are not enough elements.
padded = torch.zeros(9, dtype=action.dtype)
padded[: action.numel()] = action
action = padded
# Extract arm and base actions.
arm_actions = action[:6].flatten()
base_actions = action[6:].flatten()
x_cmd_mm = base_actions[0].item() # mm/s
y_cmd_mm = base_actions[1].item() # mm/s
theta_cmd = base_actions[2].item() # deg/s
# Convert mm/s to m/s for the kinematics calculations.
x_cmd = x_cmd_mm / 1000.0 # m/s
y_cmd = y_cmd_mm / 1000.0 # m/s
# Compute wheel commands from body commands.
wheel_commands = self.body_to_wheel_raw(x_cmd, y_cmd, theta_cmd)
arm_positions_list = arm_actions.tolist()
message = {"raw_velocity": wheel_commands, "arm_positions": arm_positions_list}
self.cmd_socket.send_string(json.dumps(message))
return action
def print_logs(self):
pass
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError("Not connected.")
if self.cmd_socket:
stop_cmd = {
"raw_velocity": {"left_wheel": 0, "back_wheel": 0, "right_wheel": 0},
"arm_positions": {},
}
self.cmd_socket.send_string(json.dumps(stop_cmd))
self.cmd_socket.close()
if self.video_socket:
self.video_socket.close()
if self.context:
self.context.term()
if PYNPUT_AVAILABLE:
self.listener.stop()
self.is_connected = False
print("[INFO] Disconnected from remote robot.")
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()
if PYNPUT_AVAILABLE:
self.listener.stop()
@staticmethod
def degps_to_raw(degps: float) -> int:
steps_per_deg = 4096.0 / 360.0
speed_in_steps = abs(degps) * steps_per_deg
speed_int = int(round(speed_in_steps))
if speed_int > 0x7FFF:
speed_int = 0x7FFF
if degps < 0:
return speed_int | 0x8000
else:
return speed_int & 0x7FFF
@staticmethod
def raw_to_degps(raw_speed: int) -> float:
steps_per_deg = 4096.0 / 360.0
magnitude = raw_speed & 0x7FFF
degps = magnitude / steps_per_deg
if raw_speed & 0x8000:
degps = -degps
return degps
def body_to_wheel_raw(
self,
x_cmd: float,
y_cmd: float,
theta_cmd: float,
wheel_radius: float = 0.05,
base_radius: float = 0.125,
max_raw: int = 3000,
) -> dict:
"""
Convert desired body-frame velocities into wheel raw commands.
Parameters:
x_cmd : Linear velocity in x (m/s).
y_cmd : Linear velocity in y (m/s).
theta_cmd : Rotational velocity (deg/s).
wheel_radius: Radius of each wheel (meters).
base_radius : Distance from the center of rotation to each wheel (meters).
max_raw : Maximum allowed raw command (ticks) per wheel.
Returns:
A dictionary with wheel raw commands:
{"left_wheel": value, "back_wheel": value, "right_wheel": value}.
Notes:
- Internally, the method converts theta_cmd to rad/s for the kinematics.
- The raw command is computed from the wheels angular speed in deg/s
using degps_to_raw(). If any command exceeds max_raw, all commands
are scaled down proportionally.
"""
# Convert rotational velocity from deg/s to rad/s.
theta_rad = theta_cmd * (np.pi / 180.0)
# Create the body velocity vector [x, y, theta_rad].
velocity_vector = np.array([x_cmd, y_cmd, theta_rad])
# Define the wheel mounting angles (defined from y axis cw)
angles = np.radians(np.array([300, 180, 60]))
# Build the kinematic matrix: each row maps body velocities to a wheels linear speed.
# The third column (base_radius) accounts for the effect of rotation.
m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles])
# Compute each wheels linear speed (m/s) and then its angular speed (rad/s).
wheel_linear_speeds = m.dot(velocity_vector)
wheel_angular_speeds = wheel_linear_speeds / wheel_radius
# Convert wheel angular speeds from rad/s to deg/s.
wheel_degps = wheel_angular_speeds * (180.0 / np.pi)
# Scaling
steps_per_deg = 4096.0 / 360.0
raw_floats = [abs(degps) * steps_per_deg for degps in wheel_degps]
max_raw_computed = max(raw_floats)
if max_raw_computed > max_raw:
scale = max_raw / max_raw_computed
wheel_degps = wheel_degps * scale
# Convert each wheels angular speed (deg/s) to a raw integer.
wheel_raw = [MobileManipulator.degps_to_raw(deg) for deg in wheel_degps]
return {"left_wheel": wheel_raw[0], "back_wheel": wheel_raw[1], "right_wheel": wheel_raw[2]}
def wheel_raw_to_body(
self, wheel_raw: dict, wheel_radius: float = 0.05, base_radius: float = 0.125
) -> tuple:
"""
Convert wheel raw command feedback back into body-frame velocities.
Parameters:
wheel_raw : Dictionary with raw wheel commands (keys: "left_wheel", "back_wheel", "right_wheel").
wheel_radius: Radius of each wheel (meters).
base_radius : Distance from the robot center to each wheel (meters).
Returns:
A tuple (x_cmd, y_cmd, theta_cmd) where:
x_cmd : Linear velocity in x (m/s).
y_cmd : Linear velocity in y (m/s).
theta_cmd : Rotational velocity in deg/s.
"""
# Extract the raw values in order.
raw_list = [
int(wheel_raw.get("left_wheel", 0)),
int(wheel_raw.get("back_wheel", 0)),
int(wheel_raw.get("right_wheel", 0)),
]
# Convert each raw command back to an angular speed in deg/s.
wheel_degps = np.array([MobileManipulator.raw_to_degps(r) for r in raw_list])
# Convert from deg/s to rad/s.
wheel_radps = wheel_degps * (np.pi / 180.0)
# Compute each wheels linear speed (m/s) from its angular speed.
wheel_linear_speeds = wheel_radps * wheel_radius
# Define the wheel mounting angles (defined from y axis cw)
angles = np.radians(np.array([300, 180, 60]))
m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles])
# Solve the inverse kinematics: body_velocity = M⁻¹ · wheel_linear_speeds.
m_inv = np.linalg.inv(m)
velocity_vector = m_inv.dot(wheel_linear_speeds)
x_cmd, y_cmd, theta_rad = velocity_vector
theta_cmd = theta_rad * (180.0 / np.pi)
return (x_cmd, y_cmd, theta_cmd)
class LeKiwi:
def __init__(self, motor_bus):
"""
Initializes the LeKiwi with Feetech motors bus.
"""
self.motor_bus = motor_bus
self.motor_ids = ["left_wheel", "back_wheel", "right_wheel"]
# Initialize motors in velocity mode.
self.motor_bus.write("Lock", 0)
self.motor_bus.write("Mode", [1, 1, 1], self.motor_ids)
self.motor_bus.write("Lock", 1)
print("Motors set to velocity mode.")
def read_velocity(self):
"""
Reads the raw speeds for all wheels. Returns a dictionary with motor names:
"""
raw_speeds = self.motor_bus.read("Present_Speed", self.motor_ids)
return {
"left_wheel": int(raw_speeds[0]),
"back_wheel": int(raw_speeds[1]),
"right_wheel": int(raw_speeds[2]),
}
def set_velocity(self, command_speeds):
"""
Sends raw velocity commands (16-bit encoded values) directly to the motor bus.
The order of speeds must correspond to self.motor_ids.
"""
self.motor_bus.write("Goal_Speed", command_speeds, self.motor_ids)
def stop(self):
"""Stops the robot by setting all motor speeds to zero."""
self.motor_bus.write("Goal_Speed", [0, 0, 0], self.motor_ids)
print("Motors stopped.")

View File

@ -22,8 +22,11 @@ class Robot(abc.ABC):
def __init__(self, config: RobotConfig): def __init__(self, config: RobotConfig):
self.robot_type = self.name self.robot_type = self.name
self.id = config.id self.id = config.id
self.robot_mode = config.robot_mode
self.calibration_dir = ( self.calibration_dir = (
config.calibration_dir if config.calibration_dir else HF_LEROBOT_CALIBRATION / ROBOTS / self.name Path(config.calibration_dir)
if config.calibration_dir
else HF_LEROBOT_CALIBRATION / ROBOTS / self.name
) )
self.calibration_dir.mkdir(parents=True, exist_ok=True) self.calibration_dir.mkdir(parents=True, exist_ok=True)
self.calibration_fpath = self.calibration_dir / f"{self.id}.json" self.calibration_fpath = self.calibration_dir / f"{self.id}.json"

View File

@ -49,22 +49,22 @@ def make_robot_config(robot_type: str, **kwargs) -> RobotConfig:
return Stretch3RobotConfig(**kwargs) return Stretch3RobotConfig(**kwargs)
elif robot_type == "lekiwi": elif robot_type == "lekiwi":
from .lekiwi.configuration_lekiwi import LeKiwiRobotConfig from .lekiwi.config_lekiwi import LeKiwiConfig
return LeKiwiRobotConfig(**kwargs) return LeKiwiConfig(**kwargs)
else: else:
raise ValueError(f"Robot type '{robot_type}' is not available.") raise ValueError(f"Robot type '{robot_type}' is not available.")
def make_robot_from_config(config: RobotConfig): def make_robot_from_config(config: RobotConfig):
from .lekiwi.configuration_lekiwi import LeKiwiRobotConfig from .lekiwi.config_lekiwi import LeKiwiConfig
from .manipulator import ManipulatorRobotConfig from .manipulator import ManipulatorRobotConfig
if isinstance(config, ManipulatorRobotConfig): if isinstance(config, ManipulatorRobotConfig):
from lerobot.common.robots.manipulator import ManipulatorRobot from lerobot.common.robots.manipulator import ManipulatorRobot
return ManipulatorRobot(config) return ManipulatorRobot(config)
elif isinstance(config, LeKiwiRobotConfig): elif isinstance(config, LeKiwiConfig):
from lerobot.common.robots.mobile_manipulator import MobileManipulator from lerobot.common.robots.mobile_manipulator import MobileManipulator
return MobileManipulator(config) return MobileManipulator(config)

View File

@ -22,4 +22,5 @@ from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("keyboard") @TeleoperatorConfig.register_subclass("keyboard")
@dataclass @dataclass
class KeyboardTeleopConfig(TeleoperatorConfig): class KeyboardTeleopConfig(TeleoperatorConfig):
# TODO(Steven): Maybe set in here the keys that we want to capture/listen
mock: bool = False mock: bool = False

View File

@ -19,8 +19,7 @@ import os
import sys import sys
import time import time
from queue import Queue from queue import Queue
from typing import Any
import numpy as np
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
@ -59,11 +58,12 @@ class KeyboardTeleop(Teleoperator):
self.event_queue = Queue() self.event_queue = Queue()
self.current_pressed = {} self.current_pressed = {}
self.listener = None self.listener = None
self.is_connected = False self._is_connected = False
self.logs = {} self.logs = {}
@property @property
def action_feature(self) -> dict: def action_feature(self) -> dict:
# TODO(Steven): Verify this is correct
return { return {
"dtype": "float32", "dtype": "float32",
"shape": (len(self.arm),), "shape": (len(self.arm),),
@ -74,8 +74,22 @@ class KeyboardTeleop(Teleoperator):
def feedback_feature(self) -> dict: def feedback_feature(self) -> dict:
return {} return {}
@property
def is_connected(self) -> bool:
return self._is_connected
@property
def is_calibrated(self) -> bool:
pass
def connect(self) -> None: def connect(self) -> None:
if self.is_connected: # TODO(Steven): Consider instead of raising a warning and then returning the status
# if self._is_connected:
# logging.warning(
# "ManipulatorRobot is already connected. Do not run `robot.connect()` twice."
# )
# return self._is_connected
if self._is_connected:
raise DeviceAlreadyConnectedError( raise DeviceAlreadyConnectedError(
"ManipulatorRobot is already connected. Do not run `robot.connect()` twice." "ManipulatorRobot is already connected. Do not run `robot.connect()` twice."
) )
@ -83,24 +97,24 @@ class KeyboardTeleop(Teleoperator):
if PYNPUT_AVAILABLE: if PYNPUT_AVAILABLE:
logging.info("pynput is available - enabling local keyboard listener.") logging.info("pynput is available - enabling local keyboard listener.")
self.listener = keyboard.Listener( self.listener = keyboard.Listener(
on_press=self.on_press, on_press=self._on_press,
on_release=self.on_release, on_release=self._on_release,
) )
self.listener.start() self.listener.start()
else: else:
logging.info("pynput not available - skipping local keyboard listener.") logging.info("pynput not available - skipping local keyboard listener.")
self.listener = None self.listener = None
self.is_connected = True self._is_connected = True
def calibrate(self) -> None: def calibrate(self) -> None:
pass pass
def on_press(self, key): def _on_press(self, key):
if hasattr(key, "char"): if hasattr(key, "char"):
self.event_queue.put((key.char, True)) self.event_queue.put((key.char, True))
def on_release(self, key): def _on_release(self, key):
if hasattr(key, "char"): if hasattr(key, "char"):
self.event_queue.put((key.char, False)) self.event_queue.put((key.char, False))
if key == keyboard.Key.esc: if key == keyboard.Key.esc:
@ -112,10 +126,13 @@ class KeyboardTeleop(Teleoperator):
key_char, is_pressed = self.event_queue.get_nowait() key_char, is_pressed = self.event_queue.get_nowait()
self.current_pressed[key_char] = is_pressed self.current_pressed[key_char] = is_pressed
def get_action(self) -> np.ndarray: def configure(self):
pass
def get_action(self) -> dict[str, Any]:
before_read_t = time.perf_counter() before_read_t = time.perf_counter()
if not self.is_connected: if not self._is_connected:
raise DeviceNotConnectedError( raise DeviceNotConnectedError(
"KeyboardTeleop is not connected. You need to run `connect()` before `get_action()`." "KeyboardTeleop is not connected. You need to run `connect()` before `get_action()`."
) )
@ -126,17 +143,17 @@ class KeyboardTeleop(Teleoperator):
action = {key for key, val in self.current_pressed.items() if val} action = {key for key, val in self.current_pressed.items() if val}
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
return np.array(list(action)) return dict.fromkeys(action, None)
def send_feedback(self, feedback: np.ndarray) -> None: def send_feedback(self, feedback: dict[str, Any]) -> None:
pass pass
def disconnect(self) -> None: def disconnect(self) -> None:
if not self.is_connected: if not self._is_connected:
raise DeviceNotConnectedError( raise DeviceNotConnectedError(
"KeyboardTeleop is not connected. You need to run `robot.connect()` before `disconnect()`." "KeyboardTeleop is not connected. You need to run `robot.connect()` before `disconnect()`."
) )
if self.listener is not None: if self.listener is not None:
self.listener.stop() self.listener.stop()
self.is_connected = False self._is_connected = False

View File

@ -24,3 +24,4 @@ from ..config import TeleoperatorConfig
class SO100LeaderConfig(TeleoperatorConfig): class SO100LeaderConfig(TeleoperatorConfig):
# Port to connect to the arm # Port to connect to the arm
port: str port: str
id = "so100"

View File

@ -95,7 +95,7 @@ class SO100Leader(Teleoperator):
full_turn_motor = "wrist_roll" full_turn_motor = "wrist_roll"
unknown_range_motors = [name for name in self.arm.names if name != full_turn_motor] unknown_range_motors = [name for name in self.arm.names if name != full_turn_motor]
logger.info( print(
f"Move all joints except '{full_turn_motor}' sequentially through their " f"Move all joints except '{full_turn_motor}' sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..." "entire ranges of motion.\nRecording positions. Press ENTER to stop..."
) )

View File

@ -45,6 +45,9 @@ class Teleoperator(abc.ABC):
def is_connected(self) -> bool: def is_connected(self) -> bool:
pass pass
# TODO(Steven): I think connect() should return a bool, such that the client/application code can check if the device connected successfully
# if not device.connect():
# raise DeviceNotConnectedError(f"{device} failed to connect")
@abc.abstractmethod @abc.abstractmethod
def connect(self) -> None: def connect(self) -> None:
"""Connects to the teleoperator.""" """Connects to the teleoperator."""

View File

@ -422,7 +422,7 @@ def control_robot(cfg: ControlPipelineConfig):
elif isinstance(cfg.control, ReplayControlConfig): elif isinstance(cfg.control, ReplayControlConfig):
replay(robot, cfg.control) replay(robot, cfg.control)
elif isinstance(cfg.control, RemoteRobotConfig): elif isinstance(cfg.control, RemoteRobotConfig):
from lerobot.common.robots.lekiwi.lekiwi_remote import run_lekiwi from lerobot.common.robots.lekiwi.old_lekiwi_remote import run_lekiwi
_init_rerun(control_config=cfg.control, session_name="lerobot_control_loop_remote") _init_rerun(control_config=cfg.control, session_name="lerobot_control_loop_remote")
run_lekiwi(cfg.robot) run_lekiwi(cfg.robot)