Move & organize motors, add base class

This commit is contained in:
Simon Alibert 2025-03-03 18:18:24 +01:00
parent 212c6095a2
commit 3d3a176940
9 changed files with 107 additions and 40 deletions

View File

@ -0,0 +1,3 @@
from .motors_bus import MotorsBus
__all__ = ["MotorsBus"]

View File

@ -0,0 +1,4 @@
from .dynamixel import DynamixelMotorsBus, TorqueMode, set_operating_mode
from .dynamixel_calibration import run_arm_calibration
__all__ = ["DynamixelMotorsBus", "TorqueMode", "set_operating_mode", "run_arm_calibration"]

View File

@ -8,7 +8,7 @@ from copy import deepcopy
import numpy as np import numpy as np
import tqdm import tqdm
from lerobot.common.utils.robot_utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.utils.utils import capture_timestamp_utc from lerobot.common.utils.utils import capture_timestamp_utc
PROTOCOL_VERSION = 2.0 PROTOCOL_VERSION = 2.0
@ -313,7 +313,7 @@ class DynamixelMotorsBus:
def connect(self): def connect(self):
if self.is_connected: if self.is_connected:
raise RobotDeviceAlreadyConnectedError( raise DeviceAlreadyConnectedError(
f"DynamixelMotorsBus({self.port}) is already connected. Do not call `motors_bus.connect()` twice." f"DynamixelMotorsBus({self.port}) is already connected. Do not call `motors_bus.connect()` twice."
) )
@ -670,7 +670,7 @@ class DynamixelMotorsBus:
def read(self, data_name, motor_names: str | list[str] | None = None): def read(self, data_name, motor_names: str | list[str] | None = None):
if not self.is_connected: if not self.is_connected:
raise RobotDeviceNotConnectedError( raise DeviceNotConnectedError(
f"DynamixelMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`." f"DynamixelMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`."
) )
@ -772,7 +772,7 @@ class DynamixelMotorsBus:
def write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None): def write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None):
if not self.is_connected: if not self.is_connected:
raise RobotDeviceNotConnectedError( raise DeviceNotConnectedError(
f"DynamixelMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`." f"DynamixelMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`."
) )
@ -841,7 +841,7 @@ class DynamixelMotorsBus:
def disconnect(self): def disconnect(self):
if not self.is_connected: if not self.is_connected:
raise RobotDeviceNotConnectedError( raise DeviceNotConnectedError(
f"DynamixelMotorsBus({self.port}) is not connected. Try running `motors_bus.connect()` first." f"DynamixelMotorsBus({self.port}) is not connected. Try running `motors_bus.connect()` first."
) )
@ -857,3 +857,25 @@ class DynamixelMotorsBus:
def __del__(self): def __del__(self):
if getattr(self, "is_connected", False): if getattr(self, "is_connected", False):
self.disconnect() self.disconnect()
def set_operating_mode(arm: DynamixelMotorsBus):
if (arm.read("Torque_Enable") != TorqueMode.DISABLED.value).any():
raise ValueError("To run set robot preset, the torque must be disabled on all motors.")
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't
# rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm,
# you could end up with a servo with a position 0 or 4095 at a crucial point See [
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11]
all_motors_except_gripper = [name for name in arm.motor_names if name != "gripper"]
if len(all_motors_except_gripper) > 0:
# 4 corresponds to Extended Position on Koch motors
arm.write("Operating_Mode", 4, all_motors_except_gripper)
# Use 'position control current based' for gripper to be limited by the limit of the current.
# For the follower gripper, it means it can grasp an object without forcing too much even tho,
# it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
# For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger
# to make it move, and it will move back to its original target position when we release the force.
# 5 corresponds to Current Controlled Position on Koch gripper motors "xl330-m077, xl330-m288"
arm.write("Operating_Mode", 5, "gripper")

View File

@ -3,12 +3,12 @@
import numpy as np import numpy as np
from lerobot.common.motors.dynamixel.dynamixel import ( from ..motors_bus import MotorsBus
from .dynamixel import (
CalibrationMode, CalibrationMode,
TorqueMode, TorqueMode,
convert_degrees_to_steps, convert_degrees_to_steps,
) )
from lerobot.common.motors.utils import MotorsBus
URL_TEMPLATE = ( URL_TEMPLATE = (
"https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp" "https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"

View File

@ -0,0 +1,3 @@
from .feetech import FeetechMotorsBus, TorqueMode
__all__ = ["FeetechMotorsBus", "TorqueMode"]

View File

@ -8,8 +8,7 @@ from copy import deepcopy
import numpy as np import numpy as np
import tqdm import tqdm
from lerobot.common.motors.configs import FeetechMotorsBusConfig from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.utils.robot_utils import RobotDeviceAlreadyConnectedError, RobotDeviceNotConnectedError
from lerobot.common.utils.utils import capture_timestamp_utc from lerobot.common.utils.utils import capture_timestamp_utc
PROTOCOL_VERSION = 0 PROTOCOL_VERSION = 0
@ -253,11 +252,10 @@ class FeetechMotorsBus:
motor_index = 6 motor_index = 6
motor_model = "sts3215" motor_model = "sts3215"
config = FeetechMotorsBusConfig( motors_bus = FeetechMotorsBus(
port="/dev/tty.usbmodem575E0031751", port="/dev/tty.usbmodem575E0031751",
motors={motor_name: (motor_index, motor_model)}, motors={motor_name: (motor_index, motor_model)},
) )
motors_bus = FeetechMotorsBus(config)
motors_bus.connect() motors_bus.connect()
position = motors_bus.read("Present_Position") position = motors_bus.read("Present_Position")
@ -273,11 +271,13 @@ class FeetechMotorsBus:
def __init__( def __init__(
self, self,
config: FeetechMotorsBusConfig, port: str,
motors: dict[str, tuple[int, str]],
mock: bool = False,
): ):
self.port = config.port self.port = port
self.motors = config.motors self.motors = motors
self.mock = config.mock self.mock = mock
self.model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE) self.model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
self.model_resolution = deepcopy(MODEL_RESOLUTION) self.model_resolution = deepcopy(MODEL_RESOLUTION)
@ -294,7 +294,7 @@ class FeetechMotorsBus:
def connect(self): def connect(self):
if self.is_connected: if self.is_connected:
raise RobotDeviceAlreadyConnectedError( raise DeviceAlreadyConnectedError(
f"FeetechMotorsBus({self.port}) is already connected. Do not call `motors_bus.connect()` twice." f"FeetechMotorsBus({self.port}) is already connected. Do not call `motors_bus.connect()` twice."
) )
@ -693,7 +693,7 @@ class FeetechMotorsBus:
import scservo_sdk as scs import scservo_sdk as scs
if not self.is_connected: if not self.is_connected:
raise RobotDeviceNotConnectedError( raise DeviceNotConnectedError(
f"FeetechMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`." f"FeetechMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`."
) )
@ -797,7 +797,7 @@ class FeetechMotorsBus:
def write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None): def write(self, data_name, values: int | float | np.ndarray, motor_names: str | list[str] | None = None):
if not self.is_connected: if not self.is_connected:
raise RobotDeviceNotConnectedError( raise DeviceNotConnectedError(
f"FeetechMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`." f"FeetechMotorsBus({self.port}) is not connected. You need to run `motors_bus.connect()`."
) )
@ -866,7 +866,7 @@ class FeetechMotorsBus:
def disconnect(self): def disconnect(self):
if not self.is_connected: if not self.is_connected:
raise RobotDeviceNotConnectedError( raise DeviceNotConnectedError(
f"FeetechMotorsBus({self.port}) is not connected. Try running `motors_bus.connect()` first." f"FeetechMotorsBus({self.port}) is not connected. Try running `motors_bus.connect()` first."
) )

View File

@ -5,12 +5,12 @@ import time
import numpy as np import numpy as np
from lerobot.common.motors.feetech.feetech import ( from ..motors_bus import MotorsBus
from .feetech import (
CalibrationMode, CalibrationMode,
TorqueMode, TorqueMode,
convert_degrees_to_steps, convert_degrees_to_steps,
) )
from lerobot.common.motors.utils import MotorsBus
URL_TEMPLATE = ( URL_TEMPLATE = (
"https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp" "https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"

View File

@ -0,0 +1,46 @@
import abc
class MotorsBus(abc.ABC):
"""The main LeRobot class for implementing motors buses."""
def __init__(
self,
motors: dict[str, tuple[int, str]],
):
self.motors = motors
def __len__(self):
return len(self.motors)
@abc.abstractmethod
def connect(self):
pass
@abc.abstractmethod
def reconnect(self):
pass
@abc.abstractmethod
def set_calibration(self, calibration: dict[str, list]):
pass
@abc.abstractmethod
def apply_calibration(self):
pass
@abc.abstractmethod
def revert_calibration(self):
pass
@abc.abstractmethod
def read(self):
pass
@abc.abstractmethod
def write(self):
pass
@abc.abstractmethod
def disconnect(self):
pass

View File

@ -1,19 +1,5 @@
from typing import Protocol from .configs import MotorsBusConfig
from .motors_bus import MotorsBus
from lerobot.common.motors.configs import (
DynamixelMotorsBusConfig,
FeetechMotorsBusConfig,
MotorsBusConfig,
)
class MotorsBus(Protocol):
def motor_names(self): ...
def set_calibration(self): ...
def apply_calibration(self): ...
def revert_calibration(self): ...
def read(self): ...
def write(self): ...
def make_motors_buses_from_configs(motors_bus_configs: dict[str, MotorsBusConfig]) -> list[MotorsBus]: def make_motors_buses_from_configs(motors_bus_configs: dict[str, MotorsBusConfig]) -> list[MotorsBus]:
@ -21,7 +7,7 @@ def make_motors_buses_from_configs(motors_bus_configs: dict[str, MotorsBusConfig
for key, cfg in motors_bus_configs.items(): for key, cfg in motors_bus_configs.items():
if cfg.type == "dynamixel": if cfg.type == "dynamixel":
from lerobot.common.motors.dynamixel.dynamixel import DynamixelMotorsBus from .dynamixel import DynamixelMotorsBus
motors_buses[key] = DynamixelMotorsBus(cfg) motors_buses[key] = DynamixelMotorsBus(cfg)
@ -38,13 +24,16 @@ def make_motors_buses_from_configs(motors_bus_configs: dict[str, MotorsBusConfig
def make_motors_bus(motor_type: str, **kwargs) -> MotorsBus: def make_motors_bus(motor_type: str, **kwargs) -> MotorsBus:
if motor_type == "dynamixel": if motor_type == "dynamixel":
from lerobot.common.motors.dynamixel.dynamixel import DynamixelMotorsBus from .configs import DynamixelMotorsBusConfig
from .dynamixel import DynamixelMotorsBus
config = DynamixelMotorsBusConfig(**kwargs) config = DynamixelMotorsBusConfig(**kwargs)
return DynamixelMotorsBus(config) return DynamixelMotorsBus(config)
elif motor_type == "feetech": elif motor_type == "feetech":
from lerobot.common.motors.feetech.feetech import FeetechMotorsBus from feetech import FeetechMotorsBus
from .configs import FeetechMotorsBusConfig
config = FeetechMotorsBusConfig(**kwargs) config = FeetechMotorsBusConfig(**kwargs)
return FeetechMotorsBus(config) return FeetechMotorsBus(config)