This commit is contained in:
Simon Alibert 2025-04-04 12:57:55 +02:00 committed by GitHub
commit 56dbdb7521
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1242 additions and 0 deletions

View File

@ -0,0 +1,3 @@
from .config_hope_jr import HopeJrArmConfig, HopeJrHandConfig
from .hope_jr_arm import HopeJrArm
from .hope_jr_hand import HopeJrHand

View File

@ -0,0 +1,46 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.common.cameras import CameraConfig
from ..config import RobotConfig
@RobotConfig.register_subclass("hope_jr_hand")
@dataclass
class HopeJrHandConfig(RobotConfig):
port: str # Port to connect to the hand
disable_torque_on_disconnect: bool = True
cameras: dict[str, CameraConfig] = field(default_factory=dict)
@RobotConfig.register_subclass("hope_jr_arm")
@dataclass
class HopeJrArmConfig(RobotConfig):
port: str # Port to connect to the hand
disable_torque_on_disconnect: bool = True
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
# the number of motors in your follower arms.
max_relative_target: int | None = None
cameras: dict[str, CameraConfig] = field(default_factory=dict)

View File

@ -0,0 +1,194 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.feetech import (
FeetechMotorsBus,
OperatingMode,
)
from ..robot import Robot
from ..utils import ensure_safe_goal_position
from .config_hope_jr import HopeJrArmConfig
logger = logging.getLogger(__name__)
class HopeJrArm(Robot):
config_class = HopeJrArmConfig
name = "hope_jr_arm"
def __init__(self, config: HopeJrArmConfig):
super().__init__(config)
self.config = config
self.arm = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pitch": Motor(1, "sm8512bl", MotorNormMode.RANGE_M100_100),
"shoulder_yaw": Motor(2, "sts3250", MotorNormMode.RANGE_M100_100),
"shoulder_roll": Motor(3, "sts3250", MotorNormMode.RANGE_M100_100),
"elbow_flex": Motor(4, "sts3250", MotorNormMode.RANGE_M100_100),
"wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100),
"wrist_yaw": Motor(6, "sts3215", MotorNormMode.RANGE_M100_100),
"wrist_pitch": Motor(7, "sts3215", MotorNormMode.RANGE_M100_100),
},
)
self.cameras = make_cameras_from_configs(config.cameras)
@property
def state_feature(self) -> dict:
return {
"dtype": "float32",
"shape": (len(self.arm),),
"names": {"motors": list(self.arm.motors)},
}
@property
def action_feature(self) -> dict:
return self.state_feature
@property
def camera_features(self) -> dict[str, dict]:
cam_ft = {}
for cam_key, cam in self.cameras.items():
cam_ft[cam_key] = {
"shape": (cam.height, cam.width, cam.channels),
"names": ["height", "width", "channels"],
"info": None,
}
return cam_ft
@property
def is_connected(self) -> bool:
# TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.arm.is_connected
def connect(self) -> None:
"""
We assume that at connection time, arm is in a rest position,
and torque can be safely disabled to run calibration.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.arm.connect()
if not self.is_calibrated:
self.calibrate()
# Connect the cameras
for cam in self.cameras.values():
cam.connect()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.arm.is_calibrated
def calibrate(self) -> None:
raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch)
logger.info(f"\nRunning calibration of {self}")
self.arm.disable_torque()
for name in self.arm.names:
self.arm.write("Operating_Mode", name, OperatingMode.POSITION.value)
input("Move robot to the middle of its range of motion and press ENTER....")
homing_offsets = self.arm.set_half_turn_homings()
full_turn_motor = "wrist_roll"
unknown_range_motors = [name for name in self.arm.names if name != full_turn_motor]
logger.info(
f"Move all joints except '{full_turn_motor}' sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors)
range_mins[full_turn_motor] = 0
range_maxes[full_turn_motor] = 4095
self.calibration = {}
for name, motor in self.arm.motors.items():
self.calibration[name] = MotorCalibration(
id=motor.id,
drive_mode=0,
homing_offset=homing_offsets[name],
range_min=range_mins[name],
range_max=range_maxes[name],
)
self.arm.write_calibration(self.calibration)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def configure(self) -> None:
self.arm.disable_torque()
self.arm.configure_motors()
# TODO
self.arm.enable_torque()
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict = {}
# Read arm position
start = time.perf_counter()
obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position")
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
goal_pos = action
# Cap goal position when too far away from present position.
# /!\ Slower fps expected due to reading from the follower.
if self.config.max_relative_target is not None:
present_pos = self.arm.sync_read("Present_Position")
goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()}
goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target)
self.arm.sync_write("Goal_Position", goal_pos)
return goal_pos
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.arm.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")

View File

@ -0,0 +1,194 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.constants import OBS_IMAGES, OBS_STATE
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.feetech import (
FeetechMotorsBus,
OperatingMode,
)
from ..robot import Robot
from .config_hope_jr import HopeJrHandConfig
logger = logging.getLogger(__name__)
class HopeJrHand(Robot):
config_class = HopeJrHandConfig
name = "hope_jr_hand"
def __init__(self, config: HopeJrHandConfig):
super().__init__(config)
self.config = config
self.hand = FeetechMotorsBus(
port=self.config.port,
motors={
# Thumb
"thumb_basel_rotation": Motor(1, "scs0009", MotorNormMode.RANGE_M100_100),
"thumb_mcp": Motor(2, "scs0009", MotorNormMode.RANGE_M100_100),
"thumb_pip": Motor(3, "scs0009", MotorNormMode.RANGE_M100_100),
"thumb_dip": Motor(4, "scs0009", MotorNormMode.RANGE_M100_100),
# Index
"index_thumb_side": Motor(5, "scs0009", MotorNormMode.RANGE_M100_100),
"index_pinky_side": Motor(6, "scs0009", MotorNormMode.RANGE_M100_100),
"index_flexor": Motor(7, "scs0009", MotorNormMode.RANGE_M100_100),
# Middle
"middle_thumb_side": Motor(8, "scs0009", MotorNormMode.RANGE_M100_100),
"middle_pinky_side": Motor(9, "scs0009", MotorNormMode.RANGE_M100_100),
"middle_flexor": Motor(10, "scs0009", MotorNormMode.RANGE_M100_100),
# Ring
"ring_thumb_side": Motor(11, "scs0009", MotorNormMode.RANGE_M100_100),
"ring_pinky_side": Motor(12, "scs0009", MotorNormMode.RANGE_M100_100),
"ring_flexor": Motor(13, "scs0009", MotorNormMode.RANGE_M100_100),
# Pinky
"pinky_thumb_side": Motor(14, "scs0009", MotorNormMode.RANGE_M100_100),
"pinky_pinky_side": Motor(15, "scs0009", MotorNormMode.RANGE_M100_100),
"pinky_flexor": Motor(16, "scs0009", MotorNormMode.RANGE_M100_100),
},
)
self.cameras = make_cameras_from_configs(config.cameras)
@property
def state_feature(self) -> dict:
return {
"dtype": "float32",
"shape": (len(self.hand),),
"names": {"motors": list(self.hand.motors)},
}
@property
def action_feature(self) -> dict:
return self.state_feature
@property
def camera_features(self) -> dict[str, dict]:
cam_ft = {}
for cam_key, cam in self.cameras.items():
cam_ft[cam_key] = {
"shape": (cam.height, cam.width, cam.channels),
"names": ["height", "width", "channels"],
"info": None,
}
return cam_ft
@property
def is_connected(self) -> bool:
# TODO(aliberts): add cam.is_connected for cam in self.cameras
return self.hand.is_connected
def connect(self) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.hand.connect()
if not self.is_calibrated:
self.calibrate()
# Connect the cameras
for cam in self.cameras.values():
cam.connect()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.hand.is_calibrated
def calibrate(self) -> None:
raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch)
logger.info(f"\nRunning calibration of {self}")
self.hand.disable_torque()
for name in self.hand.names:
self.hand.write("Operating_Mode", name, OperatingMode.POSITION.value)
input("Move robot to the middle of its range of motion and press ENTER....")
homing_offsets = self.hand.set_half_turn_homings()
full_turn_motor = "wrist_roll"
unknown_range_motors = [name for name in self.hand.names if name != full_turn_motor]
logger.info(
f"Move all joints except '{full_turn_motor}' sequentially through their "
"entire ranges of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.hand.record_ranges_of_motion(unknown_range_motors)
range_mins[full_turn_motor] = 0
range_maxes[full_turn_motor] = 4095
self.calibration = {}
for name, motor in self.hand.motors.items():
self.calibration[name] = MotorCalibration(
id=motor.id,
drive_mode=0,
homing_offset=homing_offsets[name],
range_min=range_mins[name],
range_max=range_maxes[name],
)
self.hand.write_calibration(self.calibration)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def configure(self) -> None:
self.hand.disable_torque()
self.hand.configure_motors()
# TODO
self.hand.enable_torque()
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
obs_dict = {}
# Read hand position
start = time.perf_counter()
obs_dict[OBS_STATE] = self.hand.sync_read("Present_Position")
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.hand.sync_write("Goal_Position", action)
return action
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.hand.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")

View File

@ -0,0 +1,3 @@
from .config_homonculus import HomonculusArmConfig, HomonculusGloveConfig
from .homonculus_arm import HomonculusArm
from .homonculus_glove import HomonculusGlove

View File

@ -0,0 +1,35 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("homonculus_glove")
@dataclass
class HomonculusGloveConfig(TeleoperatorConfig):
port: str # Port to connect to the glove
baud_rate: int = 115_200
buffer_size: int = 10 # Number of past values to keep in memory
@TeleoperatorConfig.register_subclass("homonculus_arm")
@dataclass
class HomonculusArmConfig(TeleoperatorConfig):
port: str # Port to connect to the arm
baud_rate: int = 115_200
buffer_size: int = 10 # Number of past values to keep in memory

View File

@ -0,0 +1,419 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import pickle
import threading
from collections import deque
from enum import Enum
import numpy as np
import serial
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..teleoperator import Teleoperator
from .config_homonculus import HomonculusArmConfig
logger = logging.getLogger(__name__)
LOWER_BOUND_LINEAR = -100
UPPER_BOUND_LINEAR = 200
class CalibrationMode(Enum):
# Joints with rotational motions are expressed in degrees in nominal range of [-180, 180]
DEGREE = 0
# Joints with linear motions (like gripper of Aloha) are expressed in nominal range of [0, 100]
LINEAR = 1
class HomonculusArm(Teleoperator):
config_class = HomonculusArmConfig
name = "homonculus_arml"
def __init__(self, config: HomonculusArmConfig):
self.config = config
self.serial = serial.Serial(config.port, config.baud_rate, timeout=1)
self.buffer_size = config.buffer_size
self.joints = [
"wrist_roll",
"wrist_pitch",
"wrist_yaw",
"elbow_flex",
"shoulder_roll",
"shoulder_yaw",
"shoulder_pitch",
]
# Initialize a buffer (deque) for each joint
self.joints_buffer = {joint: deque(maxlen=self.buffer_size) for joint in self.joints}
# Last read dictionary
self.last_d = {joint: 100 for joint in self.joints}
# For adaptive EMA, we store a "previous smoothed" state per joint
self.adaptive_ema_state = {joint: None for joint in self.joints}
self.kalman_state = {joint: {"x": None, "P": None} for joint in self.joints}
self.calibration = None
self.thread = threading.Thread(target=self.async_read, daemon=True)
@property
def action_feature(self) -> dict:
return {
"dtype": "float32",
"shape": (len(self.joints),),
"names": {"motors": self.joints},
}
@property
def feedback_feature(self) -> dict:
return {}
@property
def is_connected(self) -> bool:
return self.thread.is_alive() and self.serial.is_open
def connect(self) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.serial.open()
self.thread.start()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
raise NotImplementedError # TODO
def calibrate(self) -> None:
raise NotImplementedError # TODO
def configure(self) -> None:
raise NotImplementedError # TODO
def get_action(self) -> dict[str, float]:
raise NotImplementedError # TODO
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
def disconnect(self) -> None:
if not self.is_connected:
DeviceNotConnectedError(f"{self} is not connected.")
self.thread.join()
self.serial.close()
logger.info(f"{self} disconnected.")
### WIP below ###
@property
def joint_names(self):
return list(self.last_d.keys())
def read(self, motor_names: list[str] | None = None):
"""
Return the most recent (single) values from self.last_d,
optionally applying calibration.
"""
if motor_names is None:
motor_names = self.joint_names
# Get raw (last) values
values = np.array([self.last_d[k] for k in motor_names])
# print(motor_names)
print(values)
# Apply calibration if available
if self.calibration is not None:
values = self.apply_calibration(values, motor_names)
print(values)
return values
def read_running_average(self, motor_names: list[str] | None = None, linearize=False):
"""
Return the AVERAGE of the most recent self.buffer_size (or fewer, if not enough data) readings
for each joint, optionally applying calibration.
"""
if motor_names is None:
motor_names = self.joint_names
# Gather averaged readings from buffers
smoothed_vals = []
for name in motor_names:
buf = self.joint_buffer[name]
if len(buf) == 0:
# If no data has been read yet, fall back to last_d
smoothed_vals.append(self.last_d[name])
else:
# Otherwise, average over the existing buffer
smoothed_vals.append(np.mean(buf))
smoothed_vals = np.array(smoothed_vals, dtype=np.float32)
# Apply calibration if available
if self.calibration is not None:
if False:
for i, joint_name in enumerate(motor_names):
# Re-use the same raw_min / raw_max from the calibration
calib_idx = self.calibration["motor_names"].index(joint_name)
min_reading = self.calibration["start_pos"][calib_idx]
max_reading = self.calibration["end_pos"][calib_idx]
b_value = smoothed_vals[i]
print(joint_name)
if joint_name == "elbow_flex":
print("elbow")
try:
smoothed_vals[i] = int(
min_reading
+ (max_reading - min_reading)
* np.arcsin((b_value - min_reading) / (max_reading - min_reading))
/ (np.pi / 2)
)
except Exception:
print("not working")
print(smoothed_vals)
print("not working")
smoothed_vals = self.apply_calibration(smoothed_vals, motor_names)
return smoothed_vals
def read_kalman_filter(
self, Q: float = 1.0, R: float = 100.0, motor_names: list[str] | None = None
) -> np.ndarray:
"""
Return a Kalman-filtered reading for each requested joint.
We store a separate Kalman filter (x, P) per joint. For each new measurement Z:
1) Predict:
x_pred = x (assuming no motion model)
P_pred = P + Q
2) Update:
K = P_pred / (P_pred + R)
x = x_pred + K * (Z - x_pred)
P = (1 - K) * P_pred
:param Q: Process noise. Larger Q means the estimate can change more freely.
:param R: Measurement noise. Larger R means we trust our sensor less.
:param motor_names: If not specified, all joints are filtered.
:return: Kalman-filtered positions as a numpy array.
"""
if motor_names is None:
motor_names = self.joint_names
current_vals = np.array([self.last_d[name] for name in motor_names], dtype=np.float32)
filtered_vals = np.zeros_like(current_vals)
for i, name in enumerate(motor_names):
# Retrieve the filter state for this joint
x = self.kalman_state[name]["x"]
P = self.kalman_state[name]["P"]
Z = current_vals[i]
# If this is the first reading, initialize
if x is None or P is None:
x = Z
P = 1.0 # or some large initial uncertainty
# 1) Predict step
x_pred = x # no velocity model, so x_pred = x
P_pred = P + Q
# 2) Update step
K = P_pred / (P_pred + R) # Kalman gain
x_new = x_pred + K * (Z - x_pred) # new state estimate
P_new = (1 - K) * P_pred # new covariance
# Save back
self.kalman_state[name]["x"] = x_new
self.kalman_state[name]["P"] = P_new
filtered_vals[i] = x_new
if self.calibration is not None:
filtered_vals = self.apply_calibration(filtered_vals, motor_names)
return filtered_vals
def async_read(self):
"""
Continuously read from the serial buffer in its own thread,
store into `self.last_d` and also append to the rolling buffer (joint_buffer).
"""
while True:
if self.serial.in_waiting > 0:
self.serial.flush()
vals = self.serial.readline().decode("utf-8").strip()
vals = vals.split(" ")
if len(vals) != 7:
continue
try:
vals = [int(val) for val in vals] # remove last digit
except ValueError:
self.serial.flush()
vals = self.serial.readline().decode("utf-8").strip()
vals = vals.split(" ")
vals = [int(val) for val in vals]
d = {
"wrist_roll": vals[0],
"wrist_yaw": vals[1],
"wrist_pitch": vals[2],
"elbow_flex": vals[3],
"shoulder_roll": vals[4],
"shoulder_yaw": vals[5],
"shoulder_pitch": vals[6],
}
# Update the last_d dictionary
self.last_d = d
# Also push these new values into the rolling buffers
for joint_name, joint_val in d.items():
self.joint_buffer[joint_name].append(joint_val)
# Optional: short sleep to avoid busy-loop
# time.sleep(0.001)
def run_calibration(self, robot):
robot.arm_bus.write("Acceleration", 50)
n_joints = len(self.joint_names)
max_open_all = np.zeros(n_joints, dtype=np.float32)
min_open_all = np.zeros(n_joints, dtype=np.float32)
max_closed_all = np.zeros(n_joints, dtype=np.float32)
min_closed_all = np.zeros(n_joints, dtype=np.float32)
for i, jname in enumerate(self.joint_names):
print(f"\n--- Calibrating joint '{jname}' ---")
joint_idx = robot.arm_calib_dict["motor_names"].index(jname)
open_val = robot.arm_calib_dict["start_pos"][joint_idx]
print(f"Commanding {jname} to OPEN position {open_val}...")
robot.arm_bus.write("Goal_Position", [open_val], [jname])
input("Physically verify or adjust the joint. Press Enter when ready to capture...")
open_pos_list = []
for _ in range(100):
all_joints_vals = self.read() # read entire arm
open_pos_list.append(all_joints_vals[i]) # store only this joint
time.sleep(0.01)
# Convert to numpy and track min/max
open_array = np.array(open_pos_list, dtype=np.float32)
max_open_all[i] = open_array.max()
min_open_all[i] = open_array.min()
closed_val = robot.arm_calib_dict["end_pos"][joint_idx]
if jname == "elbow_flex":
closed_val = closed_val - 700
closed_val = robot.arm_calib_dict["end_pos"][joint_idx]
print(f"Commanding {jname} to CLOSED position {closed_val}...")
robot.arm_bus.write("Goal_Position", [closed_val], [jname])
input("Physically verify or adjust the joint. Press Enter when ready to capture...")
closed_pos_list = []
for _ in range(100):
all_joints_vals = self.read()
closed_pos_list.append(all_joints_vals[i])
time.sleep(0.01)
closed_array = np.array(closed_pos_list, dtype=np.float32)
# Some thresholding for closed positions
# closed_array[closed_array < 1000] = 60000
max_closed_all[i] = closed_array.max()
min_closed_all[i] = closed_array.min()
robot.arm_bus.write("Goal_Position", [int((closed_val + open_val) / 2)], [jname])
open_pos = np.maximum(max_open_all, max_closed_all)
closed_pos = np.minimum(min_open_all, min_closed_all)
for i, jname in enumerate(self.joint_names):
if jname not in ["wrist_pitch", "shoulder_pitch"]:
# Swap open/closed for these joints
tmp_pos = open_pos[i]
open_pos[i] = closed_pos[i]
closed_pos[i] = tmp_pos
# Debug prints
print("\nFinal open/closed arrays after any swaps/inversions:")
print(f"open_pos={open_pos}")
print(f"closed_pos={closed_pos}")
homing_offset = [0] * n_joints
drive_mode = [0] * n_joints
calib_modes = [CalibrationMode.LINEAR.name] * n_joints
calib_dict = {
"homing_offset": homing_offset,
"drive_mode": drive_mode,
"start_pos": open_pos,
"end_pos": closed_pos,
"calib_mode": calib_modes,
"motor_names": self.joint_names,
}
file_path = "examples/hopejr/settings/arm_calib.pkl"
if not os.path.exists(file_path):
with open(file_path, "wb") as f:
pickle.dump(calib_dict, f)
print(f"Dictionary saved to {file_path}")
self.set_calibration(calib_dict)
def set_calibration(self, calibration: dict[str, list]):
self.calibration = calibration
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""
Example calibration that linearly maps [start_pos, end_pos] to [0,100].
Extend or modify for your needs.
"""
if motor_names is None:
motor_names = self.joint_names
values = values.astype(np.float32)
for i, name in enumerate(motor_names):
calib_idx = self.calibration["motor_names"].index(name)
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
end_pos = self.calibration["end_pos"][calib_idx]
# Rescale the present position to [0, 100]
values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100
# Check boundaries
if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR):
# If you want to handle out-of-range differently:
# raise JointOutOfRangeError(msg)
msg = (
f"Wrong motor position range detected for {name}. "
f"Value = {values[i]} %, expected within [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}]"
)
print(msg)
return values

View File

@ -0,0 +1,348 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import pickle
import threading
from collections import deque
from enum import Enum
import numpy as np
import serial
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..teleoperator import Teleoperator
from .config_homonculus import HomonculusGloveConfig
logger = logging.getLogger(__name__)
LOWER_BOUND_LINEAR = -100
UPPER_BOUND_LINEAR = 200
class CalibrationMode(Enum):
# Joints with rotational motions are expressed in degrees in nominal range of [-180, 180]
DEGREE = 0
# Joints with linear motions (like gripper of Aloha) are expressed in nominal range of [0, 100]
LINEAR = 1
class HomonculusGlove(Teleoperator):
config_class = HomonculusGloveConfig
name = "homonculus_glove"
def __init__(self, config: HomonculusGloveConfig):
self.config = config
self.serial = serial.Serial(config.port, config.baud_rate, timeout=1)
self.buffer_size = config.buffer_size
self.joints = [
"thumb_0",
"thumb_1",
"thumb_2",
"thumb_3",
"index_0",
"index_1",
"index_2",
"middle_0",
"middle_1",
"middle_2",
"ring_0",
"ring_1",
"ring_2",
"pinky_0",
"pinky_1",
"pinky_2",
"battery_voltage", # TODO(aliberts): Should this be in joints?
]
# Initialize a buffer (deque) for each joint
self.joints_buffer = {joint: deque(maxlen=self.buffer_size) for joint in self.joints}
# Last read dictionary
self.last_d = {joint: 100 for joint in self.joints}
self.calibration = None
self.thread = threading.Thread(target=self.async_read, daemon=True)
@property
def action_feature(self) -> dict:
return {
"dtype": "float32",
"shape": (len(self.joints),),
"names": {"motors": self.joints},
}
@property
def feedback_feature(self) -> dict:
return {}
@property
def is_connected(self) -> bool:
return self.thread.is_alive() and self.serial.is_open
def connect(self) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.serial.open()
self.thread.start()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
raise NotImplementedError # TODO
def calibrate(self) -> None:
raise NotImplementedError # TODO
def configure(self) -> None:
raise NotImplementedError # TODO
def get_action(self) -> dict[str, float]:
raise NotImplementedError # TODO
def send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
def disconnect(self) -> None:
if not self.is_connected:
DeviceNotConnectedError(f"{self} is not connected.")
self.thread.join()
self.serial.close()
logger.info(f"{self} disconnected.")
### WIP below ###
@property
def joint_names(self):
return list(self.last_d.keys())
def read(self, motor_names: list[str] | None = None):
"""
Return the most recent (single) values from self.last_d,
optionally applying calibration.
"""
if motor_names is None:
motor_names = self.joint_names
# Get raw (last) values
values = np.array([self.last_d[k] for k in motor_names])
print(values)
# Apply calibration if available
if self.calibration is not None:
values = self.apply_calibration(values, motor_names)
print(values)
return values
def read_running_average(self, motor_names: list[str] | None = None, linearize=False):
"""
Return the AVERAGE of the most recent self.buffer_size (or fewer, if not enough data) readings
for each joint, optionally applying calibration.
"""
if motor_names is None:
motor_names = self.joint_names
# Gather averaged readings from buffers
smoothed_vals = []
for name in motor_names:
buf = self.joint_buffer[name]
if len(buf) == 0:
# If no data has been read yet, fall back to last_d
smoothed_vals.append(self.last_d[name])
else:
# Otherwise, average over the existing buffer
smoothed_vals.append(np.mean(buf))
smoothed_vals = np.array(smoothed_vals, dtype=np.float32)
# Apply calibration if available
if self.calibration is not None:
smoothed_vals = self.apply_calibration(smoothed_vals, motor_names)
return smoothed_vals
def async_read(self):
"""
Continuously read from the serial buffer in its own thread,
store into `self.last_d` and also append to the rolling buffer (joint_buffer).
"""
while True:
if self.serial.in_waiting > 0:
self.serial.flush()
vals = self.serial.readline().decode("utf-8").strip()
vals = vals.split(" ")
if len(vals) != 17:
continue
vals = [int(val) for val in vals]
d = {
"thumb_0": vals[0],
"thumb_1": vals[1],
"thumb_2": vals[2],
"thumb_3": vals[3],
"index_0": vals[4],
"index_1": vals[5],
"index_2": vals[6],
"middle_0": vals[7],
"middle_1": vals[8],
"middle_2": vals[9],
"ring_0": vals[10],
"ring_1": vals[11],
"ring_2": vals[12],
"pinky_0": vals[13],
"pinky_1": vals[14],
"pinky_2": vals[15],
"battery_voltage": vals[16],
}
# Update the last_d dictionary
self.last_d = d
# Also push these new values into the rolling buffers
for joint_name, joint_val in d.items():
self.joint_buffer[joint_name].append(joint_val)
def run_calibration(self):
print("\nMove arm to open position")
input("Press Enter to continue...")
open_pos_list = []
for _ in range(100):
open_pos = self.read()
open_pos_list.append(open_pos)
time.sleep(0.01)
open_pos = np.array(open_pos_list)
max_open_pos = open_pos.max(axis=0)
min_open_pos = open_pos.min(axis=0)
print(f"{max_open_pos=}")
print(f"{min_open_pos=}")
print("\nMove arm to closed position")
input("Press Enter to continue...")
closed_pos_list = []
for _ in range(100):
closed_pos = self.read()
closed_pos_list.append(closed_pos)
time.sleep(0.01)
closed_pos = np.array(closed_pos_list)
max_closed_pos = closed_pos.max(axis=0)
closed_pos[closed_pos < 1000] = 60000
min_closed_pos = closed_pos.min(axis=0)
print(f"{max_closed_pos=}")
print(f"{min_closed_pos=}")
open_pos = np.array([max_open_pos, max_closed_pos]).max(axis=0)
closed_pos = np.array([min_open_pos, min_closed_pos]).min(axis=0)
# INVERSION
for i, jname in enumerate(self.joint_names):
if jname in [
"thumb_0",
"thumb_3",
"index_2",
"middle_2",
"ring_2",
"pinky_2",
"index_0",
]:
tmp_pos = open_pos[i]
open_pos[i] = closed_pos[i]
closed_pos[i] = tmp_pos
print()
print(f"{open_pos=}")
print(f"{closed_pos=}")
homing_offset = [0] * len(self.joint_names)
drive_mode = [0] * len(self.joint_names)
calib_modes = [CalibrationMode.LINEAR.name] * len(self.joint_names)
calib_dict = {
"homing_offset": homing_offset,
"drive_mode": drive_mode,
"start_pos": open_pos,
"end_pos": closed_pos,
"calib_mode": calib_modes,
"motor_names": self.joint_names,
}
file_path = "examples/hopejr/settings/hand_calib.pkl"
if not os.path.exists(file_path):
with open(file_path, "wb") as f:
pickle.dump(calib_dict, f)
print(f"Dictionary saved to {file_path}")
# return calib_dict
self.set_calibration(calib_dict)
def set_calibration(self, calibration: dict[str, list]):
self.calibration = calibration
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with
a "zero position" at 0 degree.
Note: We say "nominal degree range" since the motors can take values outside this range. For instance, 190 degrees, if the motor
rotate more than a half a turn from the zero position. However, most motors can't rotate more than 180 degrees and will stay in this range.
Joints values are original in [0, 2**32[ (unsigned int32). Each motor are expected to complete a full rotation
when given a goal position that is + or - their resolution. For instance, feetech xl330-m077 have a resolution of 4096, and
at any position in their original range, let's say the position 56734, they complete a full rotation clockwise by moving to 60830,
or anticlockwise by moving to 52638. The position in the original range is arbitrary and might change a lot between each motor.
To harmonize between motors of the same model, different robots, or even models of different brands, we propose to work
in the centered nominal degree range ]-180, 180[.
"""
if motor_names is None:
motor_names = self.motor_names
# Convert from unsigned int32 original range [0, 2**32] to signed float32 range
values = values.astype(np.float32)
for i, name in enumerate(motor_names):
calib_idx = self.calibration["motor_names"].index(name)
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
end_pos = self.calibration["end_pos"][calib_idx]
# Rescale the present position to a nominal range [0, 100] %,
# useful for joints with linear motions like Aloha gripper
values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100
if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR):
if name == "pinky_1" and (values[i] < LOWER_BOUND_LINEAR):
values[i] = end_pos
else:
msg = (
f"Wrong motor position range detected for {name}. "
f"Expected to be in nominal range of [0, 100] % (a full linear translation), "
f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, "
f"but present value is {values[i]} %. "
"This might be due to a cable connection issue creating an artificial jump in motor values. "
"You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`"
)
print(msg)
# raise JointOutOfRangeError(msg)
return values