refactor(robots): lewiki v0.1

This commit is contained in:
Steven Palma 2025-03-12 19:12:27 +01:00
parent 27cb0c40bd
commit 6afc113a91
No known key found for this signature in database
6 changed files with 315 additions and 638 deletions

View File

@ -0,0 +1,35 @@
def main():
    """Teleoperate a LeKiwi mobile robot for a fixed 20 s session.

    Wires up a local SO100 leader arm and a keyboard teleoperator, connects
    to the mobile robot daemon over ZMQ, then streams actions in a loop and
    tears everything down afterwards.
    """
    teleop_arm_config = SO100TeleopConfig(port="/dev/tty.usbmodem585A0085511")
    teleop_arm = SO100Teleop(teleop_arm_config)

    keyboard_config = KeyboardTeleopConfig()
    keyboard = KeyboardTeleop(keyboard_config)

    # NOTE(review): lowercase `kiwiconfig` looks like a typo for a config
    # class name — confirm against the daemon's module.
    robot_config = kiwiconfig(port="/dev/tty.usbmodem575E0032081")
    robot = KiwiRobotDaemon(robot_config)

    teleop_arm.connect()
    keyboard.connect()
    robot.connect()  # Establish ZMQ sockets with the mobile robot

    start = time.perf_counter()
    duration = 0
    while duration < 20:
        arm_action = teleop_arm.get_action()
        base_action = keyboard.get_action()
        action = {
            **arm_action,
            # **base_action ??
        }
        robot.send_action(action)  # sends over ZMQ
        # robot.get_observation() # receives over ZMQ
        print(action)
        duration = time.perf_counter() - start

    robot.disconnect()  # cleans ZMQ comms
    # Bug fix: original called `teleop.disconnect()` on an undefined name
    # (NameError); also disconnect the keyboard, which was connected but
    # never released.
    teleop_arm.disconnect()
    keyboard.disconnect()


if __name__ == "__main__":
    main()

View File

@ -1,17 +1,3 @@
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64 import base64
import json import json
import os import os
@ -407,19 +393,21 @@ class MobileManipulator:
for name in self.leader_arms: for name in self.leader_arms:
pos = self.leader_arms[name].read("Present_Position") pos = self.leader_arms[name].read("Present_Position")
pos_tensor = torch.from_numpy(pos).float() pos_tensor = torch.from_numpy(pos).float()
# Instead of pos_tensor.item(), use tolist() to convert the entire tensor to a list
arm_positions.extend(pos_tensor.tolist()) arm_positions.extend(pos_tensor.tolist())
y_cmd = 0.0 # m/s forward/backward # (The rest of your code for generating wheel commands remains unchanged)
x_cmd = 0.0 # m/s lateral x_cmd = 0.0 # m/s forward/backward
y_cmd = 0.0 # m/s lateral
theta_cmd = 0.0 # deg/s rotation theta_cmd = 0.0 # deg/s rotation
if self.pressed_keys["forward"]: if self.pressed_keys["forward"]:
y_cmd += xy_speed
if self.pressed_keys["backward"]:
y_cmd -= xy_speed
if self.pressed_keys["left"]:
x_cmd += xy_speed x_cmd += xy_speed
if self.pressed_keys["right"]: if self.pressed_keys["backward"]:
x_cmd -= xy_speed x_cmd -= xy_speed
if self.pressed_keys["left"]:
y_cmd += xy_speed
if self.pressed_keys["right"]:
y_cmd -= xy_speed
if self.pressed_keys["rotate_left"]: if self.pressed_keys["rotate_left"]:
theta_cmd += theta_speed theta_cmd += theta_speed
if self.pressed_keys["rotate_right"]: if self.pressed_keys["rotate_right"]:
@ -597,8 +585,8 @@ class MobileManipulator:
# Create the body velocity vector [x, y, theta_rad]. # Create the body velocity vector [x, y, theta_rad].
velocity_vector = np.array([x_cmd, y_cmd, theta_rad]) velocity_vector = np.array([x_cmd, y_cmd, theta_rad])
# Define the wheel mounting angles (defined from y axis cw) # Define the wheel mounting angles with a -90° offset.
angles = np.radians(np.array([300, 180, 60])) angles = np.radians(np.array([240, 120, 0]) - 90)
# Build the kinematic matrix: each row maps body velocities to a wheels linear speed. # Build the kinematic matrix: each row maps body velocities to a wheels linear speed.
# The third column (base_radius) accounts for the effect of rotation. # The third column (base_radius) accounts for the effect of rotation.
m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles]) m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles])
@ -654,8 +642,8 @@ class MobileManipulator:
# Compute each wheels linear speed (m/s) from its angular speed. # Compute each wheels linear speed (m/s) from its angular speed.
wheel_linear_speeds = wheel_radps * wheel_radius wheel_linear_speeds = wheel_radps * wheel_radius
# Define the wheel mounting angles (defined from y axis cw) # Define the wheel mounting angles with a -90° offset.
angles = np.radians(np.array([300, 180, 60])) angles = np.radians(np.array([240, 120, 0]) - 90)
m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles]) m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles])
# Solve the inverse kinematics: body_velocity = M⁻¹ · wheel_linear_speeds. # Solve the inverse kinematics: body_velocity = M⁻¹ · wheel_linear_speeds.

View File

@ -1,692 +1,339 @@
import base64 #!/usr/bin/env python
import json
import os
import sys
from pathlib import Path
import cv2 # Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import time
import threading
import numpy as np import numpy as np
import torch import time
import zmq # import torch
from lerobot.common.cameras.utils import make_cameras_from_configs from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.errors import DeviceNotConnectedError from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.motors.feetech.feetech import TorqueMode from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors.feetech.feetech_calibration import run_full_arm_calibration from ..robot import Robot
from lerobot.common.motors.motors_bus import MotorsBus from ..utils import ensure_safe_goal_position
from lerobot.common.motors.utils import make_motors_buses_from_configs from .configuration_lekiwi import LeKiwiRobotConfig
from lerobot.common.robots.lekiwi.configuration_lekiwi import LeKiwiRobotConfig from lerobot.common.motors.feetech import (
from lerobot.common.robots.utils import get_arm_id FeetechMotorsBus,
TorqueMode,
run_arm_manual_calibration,
)
import zmq
PYNPUT_AVAILABLE = True class LeKiwiRobot(Robot):
try:
# Only import if there's a valid X server or if we're not on a Pi
if ("DISPLAY" not in os.environ) and ("linux" in sys.platform):
print("No DISPLAY set. Skipping pynput import.")
raise ImportError("pynput blocked intentionally due to no display.")
from pynput import keyboard
except ImportError:
keyboard = None
PYNPUT_AVAILABLE = False
except Exception as e:
keyboard = None
PYNPUT_AVAILABLE = False
print(f"Could not import pynput: {e}")
class MobileManipulator:
""" """
MobileManipulator is a class for connecting to and controlling a remote mobile manipulator robot.
The robot includes a three omniwheel mobile base and a remote follower arm. The robot includes a three omniwheel mobile base and a remote follower arm.
The leader arm is connected locally (on the laptop) and its joint positions are recorded and then The leader arm is connected locally (on the laptop) and its joint positions are recorded and then
forwarded to the remote follower arm (after applying a safety clamp). forwarded to the remote follower arm (after applying a safety clamp).
In parallel, keyboard teleoperation is used to generate raw velocity commands for the wheels. In parallel, keyboard teleoperation is used to generate raw velocity commands for the wheels.
""" """
config_class = LeKiwiRobotConfig
name = "lekiwi"
def __init__(self, config: LeKiwiRobotConfig): def __init__(self, config: LeKiwiRobotConfig):
""" super().__init__(config)
Expected keys in config:
- ip, port, video_port for the remote connection.
- calibration_dir, leader_arms, follower_arms, max_relative_target, etc.
"""
self.robot_type = config.type
self.config = config self.config = config
# self.robot_type = config.type
# self.id = config.id
self.remote_ip = config.ip self.remote_ip = config.ip
self.remote_port = config.port self.remote_port = config.port
self.remote_port_video = config.video_port self.remote_port_video = config.video_port
self.calibration_dir = Path(self.config.calibration_dir) # self.logs = {}
self.logs = {} # TODO(Steven): This should go to teleop
# self.teleop_keys = self.config.teleop_keys
self.teleop_keys = self.config.teleop_keys # TODO(Steven): Consider in the future using S100 robot class
# TODO(Steven): Another option is to use the motorbus factory, but in this case we assume that
# what we consider 'lekiwi robot' always uses the FeetechMotorsBus
# TODO(Steve): We will need to have a key for arm and base for calibration
self.actuators = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": config.shoulder_pan,
"shoulder_lift": config.shoulder_lift,
"elbow_flex": config.elbow_flex,
"wrist_flex": config.wrist_flex,
"wrist_roll": config.wrist_roll,
"gripper": config.gripper,
"left_wheel": config.left_wheel,
"right_wheel": config.right_wheel,
"back_wheel": config.back_wheel,
},
)
# For teleoperation, the leader arm (local) is used to record the desired arm pose.
self.leader_arms = make_motors_buses_from_configs(self.config.leader_arms)
self.follower_arms = make_motors_buses_from_configs(self.config.follower_arms)
self.cameras = make_cameras_from_configs(self.config.cameras) #TODO(Steven): Consider removing cameras from configs
self.cameras = make_cameras_from_configs(config.cameras)
self.is_connected = False self.is_connected = False
self.logs = {}
self.last_frames = {} self.observation_lock = threading.Lock()
self.last_present_speed = {} self.last_observation = None
self.last_remote_arm_state = torch.zeros(6, dtype=torch.float32) # self.last_frames = {}
# self.last_present_speed = {}
# Define three speed levels and a current index # self.last_remote_arm_state = torch.zeros(6, dtype=torch.float32)
self.speed_levels = [
{"xy": 0.1, "theta": 30}, # slow
{"xy": 0.2, "theta": 60}, # medium
{"xy": 0.3, "theta": 90}, # fast
]
self.speed_index = 0 # Start at slow
# ZeroMQ context and sockets. # ZeroMQ context and sockets.
self.context = None self.context = None
self.cmd_socket = None self.cmd_socket = None
self.video_socket = None self.observation_socket = None
# Keyboard state for base teleoperation.
self.running = True
self.pressed_keys = {
"forward": False,
"backward": False,
"left": False,
"right": False,
"rotate_left": False,
"rotate_right": False,
}
if PYNPUT_AVAILABLE:
print("pynput is available - enabling local keyboard listener.")
self.listener = keyboard.Listener(
on_press=self.on_press,
on_release=self.on_release,
)
self.listener.start()
else:
print("pynput not available - skipping local keyboard listener.")
self.listener = None
def get_motor_names(self, arms: dict[str, MotorsBus]) -> list:
return [f"{arm}_{motor}" for arm, bus in arms.items() for motor in bus.motors]
@property @property
def camera_features(self) -> dict: def state_feature(self) -> dict:
return {
"dtype": "float32",
"shape": (len(self.actuators),),
"names": {"motors": list(self.actuators.motors)},
}
@property
def action_feature(self) -> dict:
return self.state_feature
@property
def camera_features(self) -> dict[str, dict]:
cam_ft = {} cam_ft = {}
for cam_key, cam in self.cameras.items(): for cam_key, cam in self.cameras.items():
key = f"observation.images.{cam_key}" cam_ft[cam_key] = {
cam_ft[key] = {
"shape": (cam.height, cam.width, cam.channels), "shape": (cam.height, cam.width, cam.channels),
"names": ["height", "width", "channels"], "names": ["height", "width", "channels"],
"info": None, "info": None,
} }
return cam_ft return cam_ft
@property def setup_zmq_sockets(self, config):
def motor_features(self) -> dict: context = zmq.Context()
follower_arm_names = [ cmd_socket = context.socket(zmq.PULL)
"shoulder_pan", cmd_socket.setsockopt(zmq.CONFLATE, 1)
"shoulder_lift", cmd_socket.bind(f"tcp://*:{config.port}")
"elbow_flex",
"wrist_flex",
"wrist_roll",
"gripper",
]
observations = ["x_mm", "y_mm", "theta"]
combined_names = follower_arm_names + observations
return {
"action": {
"dtype": "float32",
"shape": (len(combined_names),),
"names": combined_names,
},
"observation.state": {
"dtype": "float32",
"shape": (len(combined_names),),
"names": combined_names,
},
}
@property observation_socket = context.socket(zmq.PUSH)
def features(self): observation_socket.setsockopt(zmq.CONFLATE, 1)
return {**self.motor_features, **self.camera_features} observation_socket.bind(f"tcp://*:{config.video_port}")
@property return context, cmd_socket, observation_socket
def has_camera(self):
return len(self.cameras) > 0
@property def setup_actuators(self):
def num_cameras(self): # We assume that at connection time, arm is in a rest position,
return len(self.cameras) # and torque can be safely disabled to run calibration.
self.actuators.write("Torque_Enable", TorqueMode.DISABLED.value)
self.calibrate()
@property # Mode=0 for Position Control
def available_arms(self): # TODO(Steven): Base robots should actually be in vel mode
available = [] self.actuators.write("Mode", 0)
for name in self.leader_arms: # Set P_Coefficient to lower value to avoid shakiness (Default is 32)
available.append(get_arm_id(name, "leader")) self.actuators.write("P_Coefficient", 16)
for name in self.follower_arms: # Set I_Coefficient and D_Coefficient to default value 0 and 32
available.append(get_arm_id(name, "follower")) self.actuators.write("I_Coefficient", 0)
return available self.actuators.write("D_Coefficient", 32)
# Close the write lock so that Maximum_Acceleration gets written to EPROM address,
# which is mandatory for Maximum_Acceleration to take effect after rebooting.
self.actuators.write("Lock", 0)
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
# the motors. Note: this configuration is not in the official STS3215 Memory Table
self.actuators.write("Maximum_Acceleration", 254)
self.actuators.write("Acceleration", 254)
def on_press(self, key): logging.info("Activating torque.")
try: self.actuators.write("Torque_Enable", TorqueMode.ENABLED.value)
# Movement
if key.char == self.teleop_keys["forward"]:
self.pressed_keys["forward"] = True
elif key.char == self.teleop_keys["backward"]:
self.pressed_keys["backward"] = True
elif key.char == self.teleop_keys["left"]:
self.pressed_keys["left"] = True
elif key.char == self.teleop_keys["right"]:
self.pressed_keys["right"] = True
elif key.char == self.teleop_keys["rotate_left"]:
self.pressed_keys["rotate_left"] = True
elif key.char == self.teleop_keys["rotate_right"]:
self.pressed_keys["rotate_right"] = True
# Quit teleoperation # Check arm can be read
elif key.char == self.teleop_keys["quit"]: self.actuators.read("Present_Position")
self.running = False
return False
# Speed control def connect(self) -> None:
elif key.char == self.teleop_keys["speed_up"]: if self.is_connected:
self.speed_index = min(self.speed_index + 1, 2) raise DeviceAlreadyConnectedError(
print(f"Speed index increased to {self.speed_index}") "LeKiwi Robot is already connected. Do not run `robot.connect()` twice."
elif key.char == self.teleop_keys["speed_down"]:
self.speed_index = max(self.speed_index - 1, 0)
print(f"Speed index decreased to {self.speed_index}")
except AttributeError:
# e.g., if key is special like Key.esc
if key == keyboard.Key.esc:
self.running = False
return False
def on_release(self, key):
try:
if hasattr(key, "char"):
if key.char == self.teleop_keys["forward"]:
self.pressed_keys["forward"] = False
elif key.char == self.teleop_keys["backward"]:
self.pressed_keys["backward"] = False
elif key.char == self.teleop_keys["left"]:
self.pressed_keys["left"] = False
elif key.char == self.teleop_keys["right"]:
self.pressed_keys["right"] = False
elif key.char == self.teleop_keys["rotate_left"]:
self.pressed_keys["rotate_left"] = False
elif key.char == self.teleop_keys["rotate_right"]:
self.pressed_keys["rotate_right"] = False
except AttributeError:
pass
def connect(self):
if not self.leader_arms:
raise ValueError("MobileManipulator has no leader arm to connect.")
for name in self.leader_arms:
print(f"Connecting {name} leader arm.")
self.calibrate_leader()
# Set up ZeroMQ sockets to communicate with the remote mobile robot.
self.context = zmq.Context()
self.cmd_socket = self.context.socket(zmq.PUSH)
connection_string = f"tcp://{self.remote_ip}:{self.remote_port}"
self.cmd_socket.connect(connection_string)
self.cmd_socket.setsockopt(zmq.CONFLATE, 1)
self.video_socket = self.context.socket(zmq.PULL)
video_connection = f"tcp://{self.remote_ip}:{self.remote_port_video}"
self.video_socket.connect(video_connection)
self.video_socket.setsockopt(zmq.CONFLATE, 1)
print(
f"[INFO] Connected to remote robot at {connection_string} and video stream at {video_connection}."
) )
logging.info("Connecting actuators.")
self.actuators.connect()
self.setup_actuators()
logging.info("Connecting cameras.")
for cam in self.cameras.values():
cam.connect()
logging.info("Connecting ZMQ sockets.")
self.context, self.cmd_socket, self.observation_socket = self.setup_zmq_sockets(self.config)
self.is_connected = True self.is_connected = True
def load_or_run_calibration_(self, name, arm, arm_type): def calibrate(self) -> None:
arm_id = get_arm_id(name, arm_type) # Copied from S100 robot
arm_calib_path = self.calibration_dir / f"{arm_id}.json" """After calibration all motors function in human interpretable ranges.
Rotations are expressed in degrees in nominal range of [-180, 180],
and linear motions (like gripper of Aloha) in nominal range of [0, 100].
"""
actuators_calib_path = self.calibration_dir / f"{self.config.id}.json"
if arm_calib_path.exists(): if actuators_calib_path.exists():
with open(arm_calib_path) as f: with open(actuators_calib_path) as f:
calibration = json.load(f) calibration = json.load(f)
else: else:
print(f"Missing calibration file '{arm_calib_path}'") # TODO(rcadene): display a warning in __init__ if calibration file not available
calibration = run_full_arm_calibration(arm, self.robot_type, name, arm_type) logging.info(f"Missing calibration file '{actuators_calib_path}'")
print(f"Calibration is done! Saving calibration file '{arm_calib_path}'") calibration = run_arm_manual_calibration(self.actuators, self.robot_type, self.name, "follower")
arm_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(arm_calib_path, "w") as f: logging.info(f"Calibration is done! Saving calibration file '{actuators_calib_path}'")
actuators_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(actuators_calib_path, "w") as f:
json.dump(calibration, f) json.dump(calibration, f)
return calibration self.actuators.set_calibration(calibration)
def calibrate_leader(self): def get_observation(self) -> dict[str, np.ndarray]:
for name, arm in self.leader_arms.items(): """The returned observations do not have a batch dimension."""
# Connect the bus
arm.connect()
# Disable torque on all motors
for motor_id in arm.motors:
arm.write("Torque_Enable", TorqueMode.DISABLED.value, motor_id)
# Now run calibration
calibration = self.load_or_run_calibration_(name, arm, "leader")
arm.set_calibration(calibration)
def calibrate_follower(self):
for name, bus in self.follower_arms.items():
bus.connect()
# Disable torque on all motors
for motor_id in bus.motors:
bus.write("Torque_Enable", 0, motor_id)
# Then filter out wheels
arm_only_dict = {k: v for k, v in bus.motors.items() if not k.startswith("wheel_")}
if not arm_only_dict:
continue
original_motors = bus.motors
bus.motors = arm_only_dict
calibration = self.load_or_run_calibration_(name, bus, "follower")
bus.set_calibration(calibration)
bus.motors = original_motors
def _get_data(self):
"""
Polls the video socket for up to 15 ms. If data arrives, decode only
the *latest* message, returning frames, speed, and arm state. If
nothing arrives for any field, use the last known values.
"""
frames = {}
present_speed = {}
remote_arm_state_tensor = torch.zeros(6, dtype=torch.float32)
# Poll up to 15 ms
poller = zmq.Poller()
poller.register(self.video_socket, zmq.POLLIN)
socks = dict(poller.poll(15))
if self.video_socket not in socks or socks[self.video_socket] != zmq.POLLIN:
# No new data arrived → reuse ALL old data
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
# Drain all messages, keep only the last
last_msg = None
while True:
try:
obs_string = self.video_socket.recv_string(zmq.NOBLOCK)
last_msg = obs_string
except zmq.Again:
break
if not last_msg:
# No new message → also reuse old
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
# Decode only the final message
try:
observation = json.loads(last_msg)
images_dict = observation.get("images", {})
new_speed = observation.get("present_speed", {})
new_arm_state = observation.get("follower_arm_state", None)
# Convert images
for cam_name, image_b64 in images_dict.items():
if image_b64:
jpg_data = base64.b64decode(image_b64)
np_arr = np.frombuffer(jpg_data, dtype=np.uint8)
frame_candidate = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
if frame_candidate is not None:
frames[cam_name] = frame_candidate
# If remote_arm_state is None and frames is None there is no message then use the previous message
if new_arm_state is not None and frames is not None:
self.last_frames = frames
remote_arm_state_tensor = torch.tensor(new_arm_state, dtype=torch.float32)
self.last_remote_arm_state = remote_arm_state_tensor
present_speed = new_speed
self.last_present_speed = new_speed
else:
frames = self.last_frames
remote_arm_state_tensor = self.last_remote_arm_state
present_speed = self.last_present_speed
except Exception as e:
print(f"[DEBUG] Error decoding video message: {e}")
# If decode fails, fall back to old data
return (self.last_frames, self.last_present_speed, self.last_remote_arm_state)
return frames, present_speed, remote_arm_state_tensor
def _process_present_speed(self, present_speed: dict) -> torch.Tensor:
state_tensor = torch.zeros(3, dtype=torch.int32)
if present_speed:
decoded = {key: MobileManipulator.raw_to_degps(value) for key, value in present_speed.items()}
if "1" in decoded:
state_tensor[0] = decoded["1"]
if "2" in decoded:
state_tensor[1] = decoded["2"]
if "3" in decoded:
state_tensor[2] = decoded["3"]
return state_tensor
def teleop_step(
self, record_data: bool = False
) -> None | tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]]:
if not self.is_connected: if not self.is_connected:
raise DeviceNotConnectedError("MobileManipulator is not connected. Run `connect()` first.") raise DeviceNotConnectedError(
"ManipulatorRobot is not connected. You need to run `robot.connect()`."
speed_setting = self.speed_levels[self.speed_index]
xy_speed = speed_setting["xy"] # e.g. 0.1, 0.25, or 0.4
theta_speed = speed_setting["theta"] # e.g. 30, 60, or 90
# Prepare to assign the position of the leader to the follower
arm_positions = []
for name in self.leader_arms:
pos = self.leader_arms[name].read("Present_Position")
pos_tensor = torch.from_numpy(pos).float()
# Instead of pos_tensor.item(), use tolist() to convert the entire tensor to a list
arm_positions.extend(pos_tensor.tolist())
# (The rest of your code for generating wheel commands remains unchanged)
x_cmd = 0.0 # m/s forward/backward
y_cmd = 0.0 # m/s lateral
theta_cmd = 0.0 # deg/s rotation
if self.pressed_keys["forward"]:
x_cmd += xy_speed
if self.pressed_keys["backward"]:
x_cmd -= xy_speed
if self.pressed_keys["left"]:
y_cmd += xy_speed
if self.pressed_keys["right"]:
y_cmd -= xy_speed
if self.pressed_keys["rotate_left"]:
theta_cmd += theta_speed
if self.pressed_keys["rotate_right"]:
theta_cmd -= theta_speed
wheel_commands = self.body_to_wheel_raw(x_cmd, y_cmd, theta_cmd)
message = {"raw_velocity": wheel_commands, "arm_positions": arm_positions}
self.cmd_socket.send_string(json.dumps(message))
if not record_data:
return
obs_dict = self.capture_observation()
arm_state_tensor = torch.tensor(arm_positions, dtype=torch.float32)
wheel_velocity_tuple = self.wheel_raw_to_body(wheel_commands)
wheel_velocity_mm = (
wheel_velocity_tuple[0] * 1000.0,
wheel_velocity_tuple[1] * 1000.0,
wheel_velocity_tuple[2],
) )
wheel_tensor = torch.tensor(wheel_velocity_mm, dtype=torch.float32)
action_tensor = torch.cat([arm_state_tensor, wheel_tensor])
action_dict = {"action": action_tensor}
return obs_dict, action_dict obs_dict = {}
def capture_observation(self) -> dict: # Read actuators position
""" # TODO(Steven): Base motors should return a vel instead of a pos
Capture observations from the remote robot: current follower arm positions, before_read_t = time.perf_counter()
present wheel speeds (converted to body-frame velocities: x, y, theta), obs_dict[OBS_STATE] = self.actuators.read("Present_Position")
and a camera frame. self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
"""
if not self.is_connected:
raise DeviceNotConnectedError("Not connected. Run `connect()` first.")
frames, present_speed, remote_arm_state_tensor = self._get_data() # Capture images from cameras
for cam_key, cam in self.cameras.items():
body_state = self.wheel_raw_to_body(present_speed) before_camread_t = time.perf_counter()
obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read()
body_state_mm = (body_state[0] * 1000.0, body_state[1] * 1000.0, body_state[2]) # Convert x,y to mm/s self.logs[f"read_camera_{cam_key}_dt_s"] = cam.logs["delta_timestamp_s"]
wheel_state_tensor = torch.tensor(body_state_mm, dtype=torch.float32) self.logs[f"async_read_camera_{cam_key}_dt_s"] = time.perf_counter() - before_camread_t
combined_state_tensor = torch.cat((remote_arm_state_tensor, wheel_state_tensor), dim=0)
obs_dict = {"observation.state": combined_state_tensor}
# Loop over each configured camera
for cam_name, cam in self.cameras.items():
frame = frames.get(cam_name, None)
if frame is None:
# Create a black image using the camera's configured width, height, and channels
frame = np.zeros((cam.height, cam.width, cam.channels), dtype=np.uint8)
obs_dict[f"observation.images.{cam_name}"] = torch.from_numpy(frame)
return obs_dict return obs_dict
def send_action(self, action: torch.Tensor) -> torch.Tensor: def send_action(self, action: np.ndarray) -> np.ndarray:
# Copied from S100 robot
"""Command lekiwi to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter
`max_relative_target`. In this case, the action sent differs from original action.
Thus, this function always returns the action actually sent.
Args:
action (np.ndarray): array containing the goal positions for the motors.
Raises:
RobotDeviceNotConnectedError: if robot is not connected.
Returns:
np.ndarray: the action sent to the motors, potentially clipped.
"""
if not self.is_connected: if not self.is_connected:
raise DeviceNotConnectedError("Not connected. Run `connect()` first.") raise DeviceNotConnectedError(
"ManipulatorRobot is not connected. You need to run `robot.connect()`."
)
# Ensure the action tensor has at least 9 elements: goal_pos = action
# - First 6: arm positions.
# - Last 3: base commands.
if action.numel() < 9:
# Pad with zeros if there are not enough elements.
padded = torch.zeros(9, dtype=action.dtype)
padded[: action.numel()] = action
action = padded
# Extract arm and base actions. # Cap goal position when too far away from present position.
arm_actions = action[:6].flatten() # /!\ Slower fps expected due to reading from the follower.
base_actions = action[6:].flatten() if self.config.max_relative_target is not None:
present_pos = self.actuators.read("Present_Position")
goal_pos = ensure_safe_goal_position(goal_pos, present_pos, self.config.max_relative_target)
x_cmd_mm = base_actions[0].item() # mm/s # Send goal position to the actuators
y_cmd_mm = base_actions[1].item() # mm/s # TODO(Steven): Base motors should set a vel instead
theta_cmd = base_actions[2].item() # deg/s self.actuators.write("Goal_Position", goal_pos.astype(np.int32))
# Convert mm/s to m/s for the kinematics calculations. return goal_pos
x_cmd = x_cmd_mm / 1000.0 # m/s
y_cmd = y_cmd_mm / 1000.0 # m/s
# Compute wheel commands from body commands. def update_last_observation(self, stop_event):
wheel_commands = self.body_to_wheel_raw(x_cmd, y_cmd, theta_cmd) while not stop_event.is_set():
with self.observation_lock:
self.last_observation = self.get_observation()
# TODO(Steven): Consider adding a delay to not starve the CPU
arm_positions_list = arm_actions.tolist() def run(self):
# Copied from run_lekiwi in lekiwi_remote.py
# TODO(Steven): Csnsider with, finally
if not self.is_connected:
self.connect()
message = {"raw_velocity": wheel_commands, "arm_positions": arm_positions_list} stop_event = threading.Event()
self.cmd_socket.send_string(json.dumps(message)) observation_thread = threading.Thread(
target=self.update_last_observation, args=(stop_event), daemon=True
)
observation_thread.start()
return action last_cmd_time = time.time()
logging.info("LeKiwi robot server started. Waiting for commands...")
try:
while True:
loop_start_time = time.time()
try:
msg = self.cmd_socket.recv_string(zmq.NOBLOCK)
data = json.loads(msg)
# TODO(Steven): Process data correctly
self.send_action(data)
last_cmd_time = time.time()
# except zmq.Again:
# logging.warning("ZMQ again")
except Exception as e:
logging.warning(f"[ERROR] Message fetching failed: {e}")
# Watchdog: stop the robot if no command is received for over 0.5 seconds.
now = time.time()
if now - last_cmd_time > 0.5:
# TODO(Steven): Implement stop()
#self.stop()
pass
with self.observation_lock:
self.observation_socket.send_string(json.dumps(self.last_observation))
# Ensure a short sleep to avoid overloading the CPU.
elapsed = time.time() - loop_start_time
time.sleep(
max(0.033 - elapsed, 0)
) # If robot jitters increase the sleep and monitor cpu load with `top` in cmd
except KeyboardInterrupt:
print("Shutting down LeKiwi server.")
finally:
#TODO(Steven): Implement finally
pass
def print_logs(self): def print_logs(self):
# TODO(aliberts): move robot-specific logs logic here
pass pass
def disconnect(self): def disconnect(self):
if not self.is_connected: if not self.is_connected:
raise DeviceNotConnectedError("Not connected.") raise DeviceNotConnectedError(
if self.cmd_socket: "LeKiwi is not connected. You need to run `robot.connect()` before disconnecting."
stop_cmd = { )
"raw_velocity": {"left_wheel": 0, "back_wheel": 0, "right_wheel": 0}, # TODO(Steven): Base motors speed should be set to 0
"arm_positions": {}, # TODO(Steven): Close ZMQ sockets
} # TODO(Steven): Stop main loop threads
self.cmd_socket.send_string(json.dumps(stop_cmd)) self.actuators.disconnect()
self.cmd_socket.close() for cam in self.cameras.values():
if self.video_socket: cam.disconnect()
self.video_socket.close()
if self.context:
self.context.term()
if PYNPUT_AVAILABLE:
self.listener.stop()
self.is_connected = False self.is_connected = False
print("[INFO] Disconnected from remote robot.")
def __del__(self):
if getattr(self, "is_connected", False):
self.disconnect()
if PYNPUT_AVAILABLE:
self.listener.stop()
@staticmethod
def degps_to_raw(degps: float) -> int:
steps_per_deg = 4096.0 / 360.0
speed_in_steps = abs(degps) * steps_per_deg
speed_int = int(round(speed_in_steps))
if speed_int > 0x7FFF:
speed_int = 0x7FFF
if degps < 0:
return speed_int | 0x8000
else:
return speed_int & 0x7FFF
@staticmethod
def raw_to_degps(raw_speed: int) -> float:
steps_per_deg = 4096.0 / 360.0
magnitude = raw_speed & 0x7FFF
degps = magnitude / steps_per_deg
if raw_speed & 0x8000:
degps = -degps
return degps
def body_to_wheel_raw(
    self,
    x_cmd: float,
    y_cmd: float,
    theta_cmd: float,
    wheel_radius: float = 0.05,
    base_radius: float = 0.125,
    max_raw: int = 3000,
) -> dict:
    """
    Convert desired body-frame velocities into per-wheel raw commands.

    Parameters:
      x_cmd       : Linear velocity along x (m/s).
      y_cmd       : Linear velocity along y (m/s).
      theta_cmd   : Rotational velocity (deg/s).
      wheel_radius: Wheel radius in meters.
      base_radius : Distance from the rotation center to each wheel (meters).
      max_raw     : Per-wheel ceiling on the raw command (ticks).

    Returns:
      {"left_wheel": value, "back_wheel": value, "right_wheel": value}

    Notes:
      - theta_cmd is converted to rad/s internally for the kinematics.
      - Raw values come from degps_to_raw(); if any wheel would exceed
        max_raw, every wheel command is scaled down by the same factor so
        the commanded direction of motion is preserved.
    """
    # Body twist [vx, vy, omega] with the rotation term in rad/s.
    body_twist = np.array([x_cmd, y_cmd, theta_cmd * (np.pi / 180.0)])

    # Wheel mounting directions: nominal angles 240/120/0 deg, shifted by -90 deg.
    mount_angles = np.radians(np.array([240, 120, 0]) - 90)

    # Each row maps the body twist to one wheel's rim speed; the base_radius
    # column accounts for the contribution of rotation.
    kinematics = np.array([[np.cos(a), np.sin(a), base_radius] for a in mount_angles])

    # Rim speed (m/s) -> angular speed (rad/s) -> deg/s.
    rim_speeds = kinematics.dot(body_twist)
    wheel_degps = (rim_speeds / wheel_radius) * (180.0 / np.pi)

    # Uniformly scale all wheels down if any one would exceed the raw ceiling.
    ticks_per_degree = 4096.0 / 360.0
    peak_raw = max(abs(degps) * ticks_per_degree for degps in wheel_degps)
    if peak_raw > max_raw:
        wheel_degps = wheel_degps * (max_raw / peak_raw)

    # Encode each wheel's deg/s speed as a 16-bit sign-magnitude raw command.
    left, back, right = (MobileManipulator.degps_to_raw(degps) for degps in wheel_degps)
    return {"left_wheel": left, "back_wheel": back, "right_wheel": right}
def wheel_raw_to_body(
    self, wheel_raw: dict, wheel_radius: float = 0.05, base_radius: float = 0.125
) -> tuple:
    """
    Convert raw wheel feedback back into body-frame velocities.

    Parameters:
      wheel_raw   : Dict with raw wheel values (keys "left_wheel",
                    "back_wheel", "right_wheel"; missing keys default to 0).
      wheel_radius: Wheel radius in meters.
      base_radius : Distance from the robot center to each wheel (meters).

    Returns:
      (x_cmd, y_cmd, theta_cmd) with x/y in m/s and theta in deg/s.
    """
    # Pull the raw values out in a fixed wheel order.
    ordered_raw = [
        int(wheel_raw.get(name, 0))
        for name in ("left_wheel", "back_wheel", "right_wheel")
    ]

    # Raw -> deg/s -> rad/s per wheel.
    wheel_degps = np.array([MobileManipulator.raw_to_degps(r) for r in ordered_raw])
    wheel_radps = wheel_degps * (np.pi / 180.0)

    # Rim linear speed of each wheel (m/s).
    rim_speeds = wheel_radps * wheel_radius

    # Same mounting geometry as the forward kinematics (-90 deg offset).
    mount_angles = np.radians(np.array([240, 120, 0]) - 90)
    kinematics = np.array([[np.cos(a), np.sin(a), base_radius] for a in mount_angles])

    # Invert the forward map: body_twist = M^-1 . rim_speeds.
    x_cmd, y_cmd, theta_rad = np.linalg.inv(kinematics).dot(rim_speeds)
    return (x_cmd, y_cmd, theta_rad * (180.0 / np.pi))
class LeKiwi:
    """Thin driver for the three Feetech omniwheel motors of the LeKiwi base."""

    def __init__(self, motor_bus):
        """Store the bus and switch all three wheels into velocity mode."""
        self.motor_bus = motor_bus
        self.motor_ids = ["left_wheel", "back_wheel", "right_wheel"]

        # Unlock the configuration, set Mode=1 (velocity) on every wheel,
        # then re-lock so the setting persists.
        self.motor_bus.write("Lock", 0)
        self.motor_bus.write("Mode", [1, 1, 1], self.motor_ids)
        self.motor_bus.write("Lock", 1)
        print("Motors set to velocity mode.")

    def read_velocity(self):
        """Return the current raw speed of each wheel, keyed by motor name."""
        raw_speeds = self.motor_bus.read("Present_Speed", self.motor_ids)
        return dict(zip(self.motor_ids, (int(speed) for speed in raw_speeds)))

    def set_velocity(self, command_speeds):
        """Send raw 16-bit velocity commands, ordered as self.motor_ids."""
        self.motor_bus.write("Goal_Speed", command_speeds, self.motor_ids)

    def stop(self):
        """Stop the base by commanding zero speed on every wheel."""
        self.motor_bus.write("Goal_Speed", [0, 0, 0], self.motor_ids)
        print("Motors stopped.")

View File

@ -64,6 +64,7 @@ class KeyboardTeleop(Teleoperator):
@property @property
def action_feature(self) -> dict: def action_feature(self) -> dict:
#TODO(Steven): Verify this is correct
return { return {
"dtype": "float32", "dtype": "float32",
"shape": (len(self.arm),), "shape": (len(self.arm),),
@ -75,6 +76,12 @@ class KeyboardTeleop(Teleoperator):
return {} return {}
def connect(self) -> None: def connect(self) -> None:
#TODO(Steven): Consider instead of raising a warning and then returning the status
# if self.is_connected:
# logging.warning(
# "ManipulatorRobot is already connected. Do not run `robot.connect()` twice."
# )
# return self.is_connected
if self.is_connected: if self.is_connected:
raise DeviceAlreadyConnectedError( raise DeviceAlreadyConnectedError(
"ManipulatorRobot is already connected. Do not run `robot.connect()` twice." "ManipulatorRobot is already connected. Do not run `robot.connect()` twice."

View File

@ -379,7 +379,7 @@ def control_robot(cfg: ControlPipelineConfig):
elif isinstance(cfg.control, ReplayControlConfig): elif isinstance(cfg.control, ReplayControlConfig):
replay(robot, cfg.control) replay(robot, cfg.control)
elif isinstance(cfg.control, RemoteRobotConfig): elif isinstance(cfg.control, RemoteRobotConfig):
from lerobot.common.robots.lekiwi.lekiwi_remote import run_lekiwi from lerobot.common.robots.lekiwi.old_lekiwi_remote import run_lekiwi
run_lekiwi(cfg.robot) run_lekiwi(cfg.robot)