From d75d904e43722102f8bd1bd96157f4a2efdb2979 Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Mon, 3 Mar 2025 18:55:59 +0100 Subject: [PATCH] Add teleoperator base class --- lerobot/common/teleoperators/__init__.py | 4 ++ lerobot/common/teleoperators/config.py | 17 ++++++ lerobot/common/teleoperators/teleoperator.py | 60 ++++++++++++++++++++ 3 files changed, 81 insertions(+) create mode 100644 lerobot/common/teleoperators/__init__.py create mode 100644 lerobot/common/teleoperators/config.py create mode 100644 lerobot/common/teleoperators/teleoperator.py diff --git a/lerobot/common/teleoperators/__init__.py b/lerobot/common/teleoperators/__init__.py new file mode 100644 index 00000000..9dd9b962 --- /dev/null +++ b/lerobot/common/teleoperators/__init__.py @@ -0,0 +1,4 @@ +from .config import TeleoperatorConfig +from .teleoperator import Teleoperator + +__all__ = ["TeleoperatorConfig", "Teleoperator"] diff --git a/lerobot/common/teleoperators/config.py b/lerobot/common/teleoperators/config.py new file mode 100644 index 00000000..997c51f1 --- /dev/null +++ b/lerobot/common/teleoperators/config.py @@ -0,0 +1,17 @@ +import abc +from dataclasses import dataclass +from pathlib import Path + +import draccus + + +@dataclass(kw_only=True) +class TeleoperatorConfig(draccus.ChoiceRegistry, abc.ABC): + # Allows to distinguish between different teleoperators of the same type + id: str | None = None + # Directory to store calibration file + calibration_dir: Path | None = None + + @property + def type(self) -> str: + return self.get_choice_name(self.__class__) diff --git a/lerobot/common/teleoperators/teleoperator.py b/lerobot/common/teleoperators/teleoperator.py new file mode 100644 index 00000000..f06b1983 --- /dev/null +++ b/lerobot/common/teleoperators/teleoperator.py @@ -0,0 +1,60 @@ +import abc + +import numpy as np + +from lerobot.common.constants import HF_LEROBOT_CALIBRATION, TELEOPERATORS + +from .config import TeleoperatorConfig + + +class Teleoperator(abc.ABC): + """The main LeRobot class for implementing teleoperation devices.""" + + # Set these in ALL subclasses + config_class: TeleoperatorConfig + name: str + + def __init__(self, config: TeleoperatorConfig): + self.calibration_dir = ( + config.calibration_dir + if config.calibration_dir + else HF_LEROBOT_CALIBRATION / TELEOPERATORS / self.name + ) + self.calibration_dir.mkdir(parents=True, exist_ok=True) + + @abc.abstractproperty + def action_feature(self) -> dict: + pass + + @abc.abstractproperty + def feedback_feature(self) -> dict: + pass + + @abc.abstractmethod + def connect(self) -> None: + """Connects to the teleoperator.""" + pass + + @abc.abstractmethod + def calibrate(self) -> None: + """Calibrates the teleoperator.""" + pass + + @abc.abstractmethod + def get_action(self) -> np.ndarray: + """Gets the action to send to a teleoperator.""" + pass + + @abc.abstractmethod + def send_feedback(self, feedback: np.ndarray) -> None: + """Sends feedback captured from a robot to the teleoperator.""" + pass + + @abc.abstractmethod + def disconnect(self) -> None: + """Disconnects from the teleoperator.""" + pass + + def __del__(self): + if getattr(self, "is_connected", False): + self.disconnect()