diff --git a/lerobot/common/teleoperators/keyboard/teleop_keyboard.py b/lerobot/common/teleoperators/keyboard/teleop_keyboard.py index a0f0ab21..65f1dff7 100644 --- a/lerobot/common/teleoperators/keyboard/teleop_keyboard.py +++ b/lerobot/common/teleoperators/keyboard/teleop_keyboard.py @@ -18,6 +18,7 @@ import logging import os import sys import time +from queue import Queue import numpy as np @@ -28,7 +29,6 @@ from .configuration_keyboard import KeyboardTeleopConfig PYNPUT_AVAILABLE = True try: - # Only import if there's a valid X server or if we're not on a Pi if ("DISPLAY" not in os.environ) and ("linux" in sys.platform): logging.info("No DISPLAY set. Skipping pynput import.") raise ImportError("pynput blocked intentionally due to no display.") @@ -56,7 +56,8 @@ class KeyboardTeleop(Teleoperator): self.config = config self.robot_type = config.type - self.pressed_keys = {} + self.event_queue = Queue() + self.current_pressed = {} self.listener = None self.is_connected = False self.logs = {} @@ -97,23 +98,35 @@ class KeyboardTeleop(Teleoperator): def on_press(self, key): if hasattr(key, "char"): - self.pressed_keys[key.char] = True + self.event_queue.put((key.char, True)) def on_release(self, key): if hasattr(key, "char"): - self.pressed_keys[key.char] = False + self.event_queue.put((key.char, False)) if key == keyboard.Key.esc: logging.info("ESC pressed, disconnecting.") self.disconnect() + def _drain_pressed_keys(self): + while not self.event_queue.empty(): + key_char, is_pressed = self.event_queue.get_nowait() + self.current_pressed[key_char] = is_pressed + def get_action(self) -> np.ndarray: before_read_t = time.perf_counter() - # pressed_keys.items is wrapped in a list to avoid any RuntimeError due to dictionary changing size - # during iteration - action = {key for key, val in list(self.pressed_keys.items()) if val} + + if not self.is_connected: + raise DeviceNotConnectedError( + "KeyboardTeleop is not connected. You need to run `connect()` before `get_action()`." + ) + + self._drain_pressed_keys() + + # Generate action based on current key states + action = {key for key, val in self.current_pressed.items() if val} self.logs["read_pos_dt_s"] = time.perf_counter() - before_read_t - return action + return np.array(list(action)) def send_feedback(self, feedback: np.ndarray) -> None: pass @@ -121,7 +134,9 @@ class KeyboardTeleop(Teleoperator): def disconnect(self) -> None: if not self.is_connected: raise DeviceNotConnectedError( - "ManipulatorRobot is not connected. You need to run `robot.connect()` before disconnecting." + "KeyboardTeleop is not connected. You need to run `robot.connect()` before `disconnect()`." ) - self.listener.stop() + if self.listener is not None: + self.listener.stop() + self.is_connected = False diff --git a/lerobot/common/teleoperators/keyboard/teleop_keyboard_app.py b/lerobot/common/teleoperators/keyboard/teleop_keyboard_app.py new file mode 100755 index 00000000..4f463814 --- /dev/null +++ b/lerobot/common/teleoperators/keyboard/teleop_keyboard_app.py @@ -0,0 +1,28 @@ +import logging +import time + +from lerobot.common.teleoperators.keyboard import KeyboardTeleop, KeyboardTeleopConfig + + +def main(): + logging.info("Configuring Keyboard Teleop") + keyboard_config = KeyboardTeleopConfig() + keyboard = KeyboardTeleop(keyboard_config) + + logging.info("Connecting Keyboard Teleop") + keyboard.connect() + + logging.info("Starting Keyboard capture") + i = 0 + while i < 20: + action = keyboard.get_action() + print("Captured keys: %s", action) + time.sleep(1) + i += 1 + + keyboard.disconnect() + logging.info("Finished LeKiwiRobot cleanly") + + +if __name__ == "__main__": + main()