feat(teleop): thread-safe keyboard teleop implementation (#869)

Co-authored-by: Simon Alibert <75076266+aliberts@users.noreply.github.com>
This commit is contained in:
Steven Palma 2025-04-04 09:45:18 +02:00 committed by GitHub
parent 716029b1e3
commit 99c0938b42
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 53 additions and 10 deletions

View File

@ -18,6 +18,7 @@ import logging
import os
import sys
import time
from queue import Queue
import numpy as np
@ -28,7 +29,6 @@ from .configuration_keyboard import KeyboardTeleopConfig
PYNPUT_AVAILABLE = True
try:
# Only import if there's a valid X server or if we're not on a Pi
if ("DISPLAY" not in os.environ) and ("linux" in sys.platform):
logging.info("No DISPLAY set. Skipping pynput import.")
raise ImportError("pynput blocked intentionally due to no display.")
@ -56,7 +56,8 @@ class KeyboardTeleop(Teleoperator):
self.config = config
self.robot_type = config.type
self.pressed_keys = {}
self.event_queue = Queue()
self.current_pressed = {}
self.listener = None
self.is_connected = False
self.logs = {}
@ -97,23 +98,35 @@ class KeyboardTeleop(Teleoperator):
def on_press(self, key):
if hasattr(key, "char"):
self.pressed_keys[key.char] = True
self.event_queue.put((key.char, True))
def on_release(self, key):
if hasattr(key, "char"):
self.pressed_keys[key.char] = False
self.event_queue.put((key.char, False))
if key == keyboard.Key.esc:
logging.info("ESC pressed, disconnecting.")
self.disconnect()
def _drain_pressed_keys(self):
while not self.event_queue.empty():
key_char, is_pressed = self.event_queue.get_nowait()
self.current_pressed[key_char] = is_pressed
def get_action(self) -> np.ndarray:
before_read_t = time.perf_counter()
# pressed_keys.items is wrapped in a list to avoid any RuntimeError due to dictionary changing size
# during iteration
action = {key for key, val in list(self.pressed_keys.items()) if val}
if not self.is_connected:
raise DeviceNotConnectedError(
"KeyboardTeleop is not connected. You need to run `connect()` before `get_action()`."
)
self._drain_pressed_keys()
# Generate action based on current key states
action = {key for key, val in self.current_pressed.items() if val}
self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t
return action
return np.array(list(action))
def send_feedback(self, feedback: np.ndarray) -> None:
pass
@ -121,7 +134,9 @@ class KeyboardTeleop(Teleoperator):
def disconnect(self) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(
"ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting."
"KeyboardTeleop is not connected. You need to run `robot.connect()` before `disconnect()`."
)
self.listener.stop()
if self.listener is not None:
self.listener.stop()
self.is_connected = False

View File

@ -0,0 +1,28 @@
import logging
import time
from lerobot.common.teleoperators.keyboard import KeyboardTeleop, KeyboardTeleopConfig
def main():
logging.info("Configuring Keyboard Teleop")
keyboard_config = KeyboardTeleopConfig()
keyboard = KeyboardTeleop(keyboard_config)
logging.info("Connecting Keyboard Teleop")
keyboard.connect()
logging.info("Starting Keyboard capture")
i = 0
while i < 20:
action = keyboard.get_action()
print("Captured keys: %s", action)
time.sleep(1)
i += 1
keyboard.disconnect()
logging.info("Finished LeKiwiRobot cleanly")
if __name__ == "__main__":
main()