refactor(robots): lekiwi v0.5

This commit is contained in:
Steven Palma 2025-03-14 14:38:06 +01:00
parent b1aed5a5d0
commit cb27f14f7b
No known key found for this signature in database
11 changed files with 121 additions and 123 deletions

View File

@ -16,8 +16,10 @@ class DeviceAlreadyConnectedError(ConnectionError):
self.message = message
super().__init__(self.message)
class InvalidActionError(ConnectionError):
"""Exception raised when an action is already invalid."""
def __init__(
self,
message="The action is invalid. Check the value follows what it is expected from the action space.",

View File

@ -2,10 +2,10 @@ from dataclasses import dataclass, field
from lerobot.common.robots.config import RobotConfig
@RobotConfig.register_subclass("daemon_lekiwi")
@dataclass
class DaemonLeKiwiRobotConfig(RobotConfig):
# Network Configuration
remote_ip: str = "192.168.0.193"
port_zmq_cmd: int = 5555

View File

@ -8,7 +8,6 @@ from lerobot.common.robots.config import RobotConfig
@RobotConfig.register_subclass("lekiwi")
@dataclass
class LeKiwiRobotConfig(RobotConfig):
port_zmq_cmd: int = 5555
port_zmq_observations: int = 5556
@ -32,12 +31,12 @@ class LeKiwiRobotConfig(RobotConfig):
# LeKiwiRobot will always have (and needs) such
shoulder_pan: tuple = (1, "sts3215")
shoulder_lift: tuple = (2, "sts3215")
elbow_flex: tuple=(3, "sts3215")
wrist_flex: tuple=(4, "sts3215")
wrist_roll: tuple=(5, "sts3215")
gripper: tuple =(6, "sts3215")
left_wheel: tuple= (7, "sts3215")
back_wheel: tuple = (8, "sts3215")
elbow_flex: tuple = (3, "sts3215")
wrist_flex: tuple = (4, "sts3215")
wrist_roll: tuple = (5, "sts3215")
gripper: tuple = (6, "sts3215")
left_wheel: tuple = (7, "sts3215")
back_wheel: tuple = (8, "sts3215")
right_wheel: tuple = (9, "sts3215")
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.

View File

@ -14,18 +14,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import logging
import numpy as np
import base64
import cv2
import numpy as np
import torch
import zmq
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError, InvalidActionError
from ..robot import Robot, RobotMode
from .configuration_daemon_lekiwi import DaemonLeKiwiRobotConfig
import zmq
# TODO(Steven): This doesn't need to inherit from Robot
# But we do it for now to offer a familiar API
@ -39,7 +42,6 @@ import zmq
# (meaning the policy might be needed to be train
# over the teleop action space)
class DaemonLeKiwiRobot(Robot):
config_class = DaemonLeKiwiRobotConfig
name = "daemonlekiwi"
@ -63,7 +65,6 @@ class DaemonLeKiwiRobot(Robot):
self.last_present_speed = {}
self.last_remote_arm_state = torch.zeros(6, dtype=torch.float32)
# Define three speed levels and a current index
self.speed_levels = [
{"xy": 0.1, "theta": 30}, # slow
@ -121,14 +122,14 @@ class DaemonLeKiwiRobot(Robot):
self.zmq_observation_socket = self.zmq_context.socket(zmq.PULL)
zmq_observations_locator = f"tcp://{self.remote_ip}:{self.port_zmq_observations}"
self.zmq_observation_socket.connect(zmq_observations_locator)
self.zmq_observation_socket.setsockopt(zmq.CONFLATE,1)
self.zmq_observation_socket.setsockopt(zmq.CONFLATE, 1)
self.is_connected = True
def calibrate(self) -> None:
# TODO(Steven): Nothing to calibrate.
# Consider triggering calibrate() on the remote mobile robot?
# Althought this would require a more complex comms schema
# Although this would require a more complex comms schema
logging.warning("DaemonLeKiwiRobot has nothing to calibrate.")
return
@ -361,13 +362,12 @@ class DaemonLeKiwiRobot(Robot):
if frame is None:
# TODO(Steven): Daemon doesn't know camera dimensions
logging.warning("Frame is None")
#frame = np.zeros((cam.height, cam.width, cam.channels), dtype=np.uint8)
# frame = np.zeros((cam.height, cam.width, cam.channels), dtype=np.uint8)
obs_dict[cam_name] = torch.from_numpy(frame)
return obs_dict
def from_keyboard_to_wheel_action(self, pressed_keys: np.ndarray):
# Speed control
if self.teleop_keys["speed_up"] in pressed_keys:
self.speed_index = min(self.speed_index + 1, 2)
@ -433,15 +433,15 @@ class DaemonLeKiwiRobot(Robot):
# TODO(Steven): This assumes teleop mode is always used with keyboard
if self.robot_mode is RobotMode.TELEOP:
if action.size <6:
if action.size < 6:
logging.error("Action should include at least the 6 states of the leader arm")
raise InvalidActionError
# TODO(Steven): Assumes size and order is respected
wheel_actions = [v for _,v in self.from_keyboard_to_wheel_action(action[6:])]
goal_pos[:6]=action[:6]
goal_pos[6:]=wheel_actions
self.zmq_cmd_socket.send_string(json.dumps(goal_pos)) #action is in motor space
wheel_actions = [v for _, v in self.from_keyboard_to_wheel_action(action[6:])]
goal_pos[:6] = action[:6]
goal_pos[6:] = wheel_actions
self.zmq_cmd_socket.send_string(json.dumps(goal_pos)) # action is in motor space
return goal_pos

View File

@ -1,12 +1,15 @@
from ...teleoperators.so100 import SO100Teleop, SO100TeleopConfig
import logging
import time
import numpy as np
from ...teleoperators.keyboard import KeyboardTeleop, KeyboardTeleopConfig
from ...teleoperators.so100 import SO100Teleop, SO100TeleopConfig
from .configuration_daemon_lekiwi import DaemonLeKiwiRobotConfig
from .daemon_lekiwi import DaemonLeKiwiRobot, RobotMode
import time
import logging
def main():
logging.info("Configuring Teleop Devices")
leader_arm_config = SO100TeleopConfig(port="/dev/tty.usbmodem585A0085511")
leader_arm = SO100Teleop(leader_arm_config)
@ -23,22 +26,18 @@ def main():
robot = DaemonLeKiwiRobot(robot_config)
logging.info("Connecting remote LeKiwiRobot")
robot.connect() # Establishes ZMQ sockets with the remote mobile robot
robot.connect() # Establishes ZMQ sockets with the remote mobile robot
robot.robot_mode = RobotMode.TELEOP
logging.info("Starting LeKiwiRobot teleoperation")
start = time.perf_counter()
duration = 0
while duration < 20:
arm_action = leader_arm.get_action()
base_action = keyboard.get_action()
action = {
**arm_action,
**base_action
}
_action_sent = robot.send_action(action) # Translates to motor space + sends over ZMQ
_observation = robot.get_observation() # Receives over ZMQ, translate to body-frame vel
action = np.concatenate((arm_action, base_action))
_action_sent = robot.send_action(action) # Translates to motor space + sends over ZMQ
_observation = robot.get_observation() # Receives over ZMQ, translate to body-frame vel
# dataset.save(action_sent, obs)
@ -50,11 +49,12 @@ def main():
duration = time.perf_counter() - start
logging.info("Disconnecting Teleop Devices and LeKiwiRobot Daemon")
robot.disconnect() # Cleans ZMQ comms
robot.disconnect() # Cleans ZMQ comms
leader_arm.disconnect()
keyboard.disconnect()
logging.info("Finished LeKiwiRobot cleanly")
if __name__ == "__main__":
main()

View File

@ -1,9 +1,10 @@
import logging
from .configuration_lekiwi import LeKiwiRobotConfig
from .lekiwi_robot import LeKiwiRobot
def main():
def main():
logging.info("Configuring LeKiwiRobot")
robot_config = LeKiwiRobotConfig()
robot = LeKiwiRobot(robot_config)

View File

@ -14,27 +14,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import logging
import time
import threading
import numpy as np
import time
import base64
import cv2
import numpy as np
import zmq
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .configuration_lekiwi import LeKiwiRobotConfig
from lerobot.common.motors.feetech import (
FeetechMotorsBus,
TorqueMode,
run_arm_manual_calibration,
run_full_arm_calibration,
)
import zmq
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .configuration_lekiwi import LeKiwiRobotConfig
class LeKiwiRobot(Robot):
"""
@ -58,7 +60,7 @@ class LeKiwiRobot(Robot):
# TODO(Steven): Consider in the future using S100 robot class
# TODO(Steven): Another option is to use the motorbus factory, but in this case we assume that
# what we consider 'lekiwi robot' always uses the FeetechMotorsBus
# TODO(Steven): Order and dimension are generaly assumed to be 6 first for arm, 3 last for base
# TODO(Steven): Order and dimension are generally assumed to be 6 first for arm, 3 last for base
self.actuators_bus = FeetechMotorsBus(
port=self.config.port_motor_bus,
motors={
@ -79,7 +81,7 @@ class LeKiwiRobot(Robot):
self.max_relative_target = config.max_relative_target
#TODO(Steven): Consider removing cameras from configs
# TODO(Steven): Consider removing cameras from configs
self.cameras = make_cameras_from_configs(config.cameras)
self.observation_lock = threading.Lock()
@ -92,8 +94,6 @@ class LeKiwiRobot(Robot):
self.is_connected = False
self.logs = {}
@property
def state_feature(self) -> dict:
return {
@ -130,39 +130,37 @@ class LeKiwiRobot(Robot):
return context, cmd_socket, observation_socket
def setup_actuators(self):
# Set-up arm actuators (position mode)
# We assume that at connection time, arm is in a rest position,
# and torque can be safely disabled to run calibration.
self.actuators_bus.write("Torque_Enable", TorqueMode.DISABLED.value,self.arm_actuators)
self.calibrate() # TODO(Steven): This should be only for the arm
self.actuators_bus.write("Torque_Enable", TorqueMode.DISABLED.value, self.arm_actuators)
self.calibrate() # TODO(Steven): This should be only for the arm
# Mode=0 for Position Control
self.actuators_bus.write("Mode", 0,self.arm_actuators)
self.actuators_bus.write("Mode", 0, self.arm_actuators)
# Set P_Coefficient to lower value to avoid shakiness (Default is 32)
self.actuators_bus.write("P_Coefficient", 16,self.arm_actuators)
self.actuators_bus.write("P_Coefficient", 16, self.arm_actuators)
# Set I_Coefficient and D_Coefficient to default value 0 and 32
self.actuators_bus.write("I_Coefficient", 0, self.arm_actuators)
self.actuators_bus.write("D_Coefficient", 32,self.arm_actuators)
self.actuators_bus.write("D_Coefficient", 32, self.arm_actuators)
# Close the write lock so that Maximum_Acceleration gets written to EPROM address,
# which is mandatory for Maximum_Acceleration to take effect after rebooting.
self.actuators_bus.write("Lock", 0,self.arm_actuators)
self.actuators_bus.write("Lock", 0, self.arm_actuators)
# Set Maximum_Acceleration to 254 to speedup acceleration and deceleration of
# the motors. Note: this configuration is not in the official STS3215 Memory Table
self.actuators_bus.write("Maximum_Acceleration", 254,self.arm_actuators)
self.actuators_bus.write("Maximum_Acceleration", 254, self.arm_actuators)
self.actuators_bus.write("Acceleration", 254, self.arm_actuators)
logging.info("Activating torque.")
self.actuators_bus.write("Torque_Enable", TorqueMode.ENABLED.value,self.arm_actuators)
self.actuators_bus.write("Torque_Enable", TorqueMode.ENABLED.value, self.arm_actuators)
# Check arm can be read
self.actuators_bus.read("Present_Position",self.arm_actuators)
self.actuators_bus.read("Present_Position", self.arm_actuators)
# Set-up base actuators (velocity mode)
self.actuators_bus.write("Lock",0,self.base_actuators)
self.actuators_bus.write("Mode",[1,1,1],self.base_actuators)
self.actuators_bus.write("Lock",1,self.base_actuators)
self.actuators_bus.write("Lock", 0, self.base_actuators)
self.actuators_bus.write("Mode", [1, 1, 1], self.base_actuators)
self.actuators_bus.write("Lock", 1, self.base_actuators)
def connect(self) -> None:
if self.is_connected:
@ -192,15 +190,17 @@ class LeKiwiRobot(Robot):
actuators_calib_path = self.calibration_dir / f"{self.config.id}.json"
if actuators_calib_path.exists():
with open(actuators_calib_path) as f:
with open(actuators_calib_path,encoding="utf-8") as f:
calibration = json.load(f)
else:
logging.info(f"Missing calibration file '{actuators_calib_path}'")
calibration = run_arm_manual_calibration(self.actuators_bus, self.robot_type, self.name, "follower")
logging.info("Missing calibration file '%s'",actuators_calib_path)
calibration = run_full_arm_calibration(
self.actuators_bus, self.robot_type, self.name, "follower"
)
logging.info(f"Calibration is done! Saving calibration file '{actuators_calib_path}'")
logging.info("Calibration is done! Saving calibration file '%s'",actuators_calib_path)
actuators_calib_path.parent.mkdir(parents=True, exist_ok=True)
with open(actuators_calib_path, "w") as f:
with open(actuators_calib_path, "w",encoding="utf-8") as f:
json.dump(calibration, f)
self.actuators_bus.set_calibration(calibration)
@ -208,15 +208,15 @@ class LeKiwiRobot(Robot):
def get_observation(self) -> dict[str, np.ndarray]:
"""The returned observations do not have a batch dimension."""
if not self.is_connected:
raise DeviceNotConnectedError(
"LeKiwiRobot is not connected. You need to run `robot.connect()`."
)
raise DeviceNotConnectedError("LeKiwiRobot is not connected. You need to run `robot.connect()`.")
obs_dict = {}
# Read actuators position for arm and vel for base
before_read_t = time.perf_counter()
obs_dict[OBS_STATE] = self.actuators_bus.read("Present_Position",self.arm_actuators) + self.actuators_bus.read("Present_Speed", self.base_actuators)
obs_dict[OBS_STATE] = self.actuators_bus.read(
"Present_Position", self.arm_actuators
) + self.actuators_bus.read("Present_Speed", self.base_actuators)
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
# Capture images from cameras
@ -251,22 +251,22 @@ class LeKiwiRobot(Robot):
np.ndarray: the action sent to the motors, potentially clipped.
"""
if not self.is_connected:
raise DeviceNotConnectedError(
"LeKiwiRobot is not connected. You need to run `robot.connect()`."
)
raise DeviceNotConnectedError("LeKiwiRobot is not connected. You need to run `robot.connect()`.")
# Input action is in motor space
goal_pos=action
goal_pos = action
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.actuators_bus.read("Present_Position",self.arm_actuators)
goal_pos[:6] = ensure_safe_goal_position(goal_pos[:6], present_pos, self.config.max_relative_target)
present_pos = self.actuators_bus.read("Present_Position", self.arm_actuators)
goal_pos[:6] = ensure_safe_goal_position(
goal_pos[:6], present_pos, self.config.max_relative_target
)
# Send goal position to the actuators
# TODO(Steven): This happens synchronously
self.actuators_bus.write("Goal_Position", goal_pos[:6].astype(np.int32),self.arm_actuators)
self.actuators_bus.write("Goal_Speed",goal_pos[6:].astype(np.int32),self.base_actuators)
self.actuators_bus.write("Goal_Position", goal_pos[:6].astype(np.int32), self.arm_actuators)
self.actuators_bus.write("Goal_Speed", goal_pos[6:].astype(np.int32), self.base_actuators)
return goal_pos
@ -280,15 +280,13 @@ class LeKiwiRobot(Robot):
def stop(self):
# TODO(Steven): Assumes there's only 3 motors for base
logging.info("Stopping base")
self.actuators_bus.write("Goal_Speed",[0,0,0],self.base_actuators)
self.actuators_bus.write("Goal_Speed", [0, 0, 0], self.base_actuators)
logging.info("Base motors stopped")
def run(self):
# Copied logic from run_lekiwi in lekiwi_remote.py
if not self.is_connected:
raise DeviceNotConnectedError(
"LeKiwiRobot is not connected. You need to run `robot.connect()`."
)
raise DeviceNotConnectedError("LeKiwiRobot is not connected. You need to run `robot.connect()`.")
stop_event = threading.Event()
observation_thread = threading.Thread(
@ -311,13 +309,12 @@ class LeKiwiRobot(Robot):
# except zmq.Again:
# logging.warning("ZMQ again")
except Exception as e:
logging.warning(f"[ERROR] Message fetching failed: {e}")
logging.error("Message fetching failed: %s",e)
# Watchdog: stop the robot if no command is received for over 0.5 seconds.
now = time.time()
if now - last_cmd_time > 0.5:
self.stop()
pass
with self.observation_lock:
self.zmq_observation_socket.send_string(json.dumps(self.last_observation))
@ -333,7 +330,6 @@ class LeKiwiRobot(Robot):
stop_event.set()
observation_thread.join()
self.disconnect()
pass
def print_logs(self):
# TODO(Steven): Refactor logger

View File

@ -1,5 +1,6 @@
import abc
from pathlib import Path
import enum
from typing import Any
import draccus
@ -9,7 +10,6 @@ from lerobot.common.motors import MotorCalibration
from .config import RobotConfig
import enum
class RobotMode(enum.Enum):
TELEOP = 0

View File

@ -64,7 +64,7 @@ class KeyboardTeleop(Teleoperator):
@property
def action_feature(self) -> dict:
#TODO(Steven): Verify this is correct
# TODO(Steven): Verify this is correct
return {
"dtype": "float32",
"shape": (len(self.arm),),
@ -76,7 +76,7 @@ class KeyboardTeleop(Teleoperator):
return {}
def connect(self) -> None:
#TODO(Steven): Consider instead of raising a warning and then returning the status
# TODO(Steven): Consider instead of raising a warning and then returning the status
# if self.is_connected:
# logging.warning(
# "ManipulatorRobot is already connected. Do not run `robot.connect()` twice."