feat(teleop): thread-safe keyboard teleop implementation

This commit is contained in:
Steven Palma 2025-03-17 15:47:22 +01:00
parent 8d659a6aa9
commit f6ac413977
No known key found for this signature in database
2 changed files with 50 additions and 8 deletions

View File

@ -18,6 +18,7 @@ import logging
import os
import sys
import time
from threading import Lock
import numpy as np
@ -56,6 +57,7 @@ class KeyboardTeleop(Teleoperator):
self.config = config
self.robot_type = config.type
self.pressed_keys_lock = Lock()
self.pressed_keys = {}
self.listener = None
self.is_connected = False
@ -63,6 +65,7 @@ class KeyboardTeleop(Teleoperator):
@property
def action_feature(self) -> dict:
# TODO(Steven): Set this from config? (when deciding the keys we want to capture)
return {
"dtype": "float32",
"shape": (len(self.arm),),
@ -97,10 +100,12 @@ class KeyboardTeleop(Teleoperator):
def on_press(self, key):
if hasattr(key, "char"):
with self.pressed_keys_lock:
self.pressed_keys[key.char] = True
def on_release(self, key):
if hasattr(key, "char"):
with self.pressed_keys_lock:
self.pressed_keys[key.char] = False
if key == keyboard.Key.esc:
logging.info("ESC pressed, disconnecting.")
@ -108,12 +113,19 @@ class KeyboardTeleop(Teleoperator):
def get_action(self) -> np.ndarray:
before_read_t = time.perf_counter()
# pressed_keys.items is wrapped in a list to avoid any RuntimeError due to dictionary changing size
# during iteration
action = {key for key, val in list(self.pressed_keys.items()) if val}
if not self.is_connected:
raise DeviceNotConnectedError(
"KeyboardTeleop is not connected. You need to run `connect()` before `get_action()`."
)
with self.pressed_keys_lock:
pressed_keys_snapshot = self.pressed_keys
action = {key for key, val in pressed_keys_snapshot.items() if val}
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
return action
return np.array(list(action))
def send_feedback(self, feedback: np.ndarray) -> None:
pass
@ -121,7 +133,9 @@ class KeyboardTeleop(Teleoperator):
def disconnect(self) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(
"ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting."
"KeyboardTeleop is not connected. You need to run `robot.connect()` before `disconnect()`."
)
if self.listener is not None:
self.listener.stop()
self.is_connected = False

View File

@ -0,0 +1,28 @@
import logging
import time
from lerobot.common.teleoperators.keyboard import KeyboardTeleop, KeyboardTeleopConfig
def main():
logging.info("Configuring Keyboard Teleop")
keyboard_config = KeyboardTeleopConfig()
keyboard = KeyboardTeleop(keyboard_config)
logging.info("Connecting Keyboard Teleop")
keyboard.connect()
logging.info("Starting Keyboard capture")
i = 0
while i < 20:
action = keyboard.get_action()
print("Captured keys: %s", action)
time.sleep(1)
i += 1
keyboard.disconnect()
logging.info("Finished LeKiwiRobot cleanly")
if __name__ == "__main__":
main()