From 58d2ac0c0b323dcb034e2f60f0ba31ec9028579a Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Thu, 3 Apr 2025 20:15:37 +0200 Subject: [PATCH 1/7] (WIP) Add Hope Jr --- lerobot/common/robots/hope_jr/__init__.py | 3 + .../common/robots/hope_jr/config_hope_jr.py | 46 ++++ lerobot/common/robots/hope_jr/hope_jr_arm.py | 194 +++++++++++++++++ lerobot/common/robots/hope_jr/hope_jr_hand.py | 198 ++++++++++++++++++ 4 files changed, 441 insertions(+) create mode 100644 lerobot/common/robots/hope_jr/__init__.py create mode 100644 lerobot/common/robots/hope_jr/config_hope_jr.py create mode 100644 lerobot/common/robots/hope_jr/hope_jr_arm.py create mode 100644 lerobot/common/robots/hope_jr/hope_jr_hand.py diff --git a/lerobot/common/robots/hope_jr/__init__.py b/lerobot/common/robots/hope_jr/__init__.py new file mode 100644 index 00000000..324e3c8e --- /dev/null +++ b/lerobot/common/robots/hope_jr/__init__.py @@ -0,0 +1,3 @@ +from .config_hope_jr import HopeJrArmConfig, HopeJrHandConfig +from .hope_jr_arm import HopeJrArm +from .hope_jr_hand import HopeJrHand diff --git a/lerobot/common/robots/hope_jr/config_hope_jr.py b/lerobot/common/robots/hope_jr/config_hope_jr.py new file mode 100644 index 00000000..5e4df7aa --- /dev/null +++ b/lerobot/common/robots/hope_jr/config_hope_jr.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass, field + +from lerobot.common.cameras import CameraConfig + +from ..config import RobotConfig + + +@RobotConfig.register_subclass("hope_jr_hand") +@dataclass +class HopeJrHandConfig(RobotConfig): + port: str # Port to connect to the hand + + disable_torque_on_disconnect: bool = True + + cameras: dict[str, CameraConfig] = field(default_factory=dict) + + +@RobotConfig.register_subclass("hope_jr_arm") +@dataclass +class HopeJrArmConfig(RobotConfig): + port: str # Port to connect to the hand + + disable_torque_on_disconnect: bool = True + + # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. + # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as + # the number of motors in your follower arms. + max_relative_target: int | None = None + + cameras: dict[str, CameraConfig] = field(default_factory=dict) diff --git a/lerobot/common/robots/hope_jr/hope_jr_arm.py b/lerobot/common/robots/hope_jr/hope_jr_arm.py new file mode 100644 index 00000000..9107de8f --- /dev/null +++ b/lerobot/common/robots/hope_jr/hope_jr_arm.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from typing import Any + +from lerobot.common.cameras.utils import make_cameras_from_configs +from lerobot.common.constants import OBS_IMAGES, OBS_STATE +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode +from lerobot.common.motors.feetech import ( + FeetechMotorsBus, + OperatingMode, +) + +from ..robot import Robot +from ..utils import ensure_safe_goal_position +from .config_hope_jr import HopeJrArmConfig + +logger = logging.getLogger(__name__) + + +class HopeJrArm(Robot): + config_class = HopeJrArmConfig + name = "hope_jr_arm" + + def __init__(self, config: HopeJrArmConfig): + super().__init__(config) + self.config = config + self.arm = FeetechMotorsBus( + port=self.config.port, + motors={ + "shoulder_pitch": Motor(1, "sm8512bl", MotorNormMode.RANGE_M100_100), + "shoulder_yaw": Motor(2, "sts3250", MotorNormMode.RANGE_M100_100), + "shoulder_roll": Motor(3, "sts3250", MotorNormMode.RANGE_M100_100), + "elbow_flex": Motor(4, "sts3250", MotorNormMode.RANGE_M100_100), + "wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100), + "wrist_yaw": Motor(6, "sts3215", MotorNormMode.RANGE_M100_100), + "wrist_pitch": Motor(7, "sts3215", MotorNormMode.RANGE_M100_100), + }, + ) + self.cameras = make_cameras_from_configs(config.cameras) + + @property + def state_feature(self) -> dict: + return { + "dtype": "float32", + "shape": (len(self.arm),), + "names": {"motors": list(self.arm.motors)}, + } + + @property + def action_feature(self) -> dict: + return self.state_feature + + @property + def camera_features(self) -> dict[str, dict]: + cam_ft = {} + for cam_key, cam in self.cameras.items(): + cam_ft[cam_key] = { + "shape": (cam.height, cam.width, cam.channels), + "names": ["height", "width", "channels"], + "info": None, + } + return cam_ft + + @property + def is_connected(self) -> bool: + # TODO(aliberts): add cam.is_connected for cam in self.cameras + return self.arm.is_connected + + def connect(self) -> None: + """ + We assume that at connection time, arm is in a rest position, + and torque can be safely disabled to run calibration. + """ + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.arm.connect() + if not self.is_calibrated: + self.calibrate() + + # Connect the cameras + for cam in self.cameras.values(): + cam.connect() + + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.arm.is_calibrated + + def calibrate(self) -> None: + raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch) + logger.info(f"\nRunning calibration of {self}") + self.arm.disable_torque() + for name in self.arm.names: + self.arm.write("Operating_Mode", name, OperatingMode.POSITION.value) + + input("Move robot to the middle of its range of motion and press ENTER....") + homing_offsets = self.arm.set_half_turn_homings() + + full_turn_motor = "wrist_roll" + unknown_range_motors = [name for name in self.arm.names if name != full_turn_motor] + logger.info( + f"Move all joints except '{full_turn_motor}' sequentially through their " + "entire ranges of motion.\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + range_mins[full_turn_motor] = 0 + range_maxes[full_turn_motor] = 4095 + + self.calibration = {} + for name, motor in self.arm.motors.items(): + self.calibration[name] = MotorCalibration( + id=motor.id, + drive_mode=0, + homing_offset=homing_offsets[name], + range_min=range_mins[name], + range_max=range_maxes[name], + ) + + self.arm.write_calibration(self.calibration) + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def configure(self) -> None: + self.arm.disable_torque() + self.arm.configure_motors() + # TODO + self.arm.enable_torque() + + def get_observation(self) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + obs_dict = {} + + # Read arm position + start = time.perf_counter() + obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position") + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") + + # Capture images from cameras + for cam_key, cam in self.cameras.items(): + start = time.perf_counter() + obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + + return obs_dict + + def send_action(self, action: dict[str, Any]) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + goal_pos = action + + # Cap goal position when too far away from present position. + # /!\ Slower fps expected due to reading from the follower. + if self.config.max_relative_target is not None: + present_pos = self.arm.sync_read("Present_Position") + goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in goal_pos.items()} + goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) + + self.arm.sync_write("Goal_Position", goal_pos) + return goal_pos + + def disconnect(self): + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + self.arm.disconnect(self.config.disable_torque_on_disconnect) + for cam in self.cameras.values(): + cam.disconnect() + + logger.info(f"{self} disconnected.") diff --git a/lerobot/common/robots/hope_jr/hope_jr_hand.py b/lerobot/common/robots/hope_jr/hope_jr_hand.py new file mode 100644 index 00000000..704af9a6 --- /dev/null +++ b/lerobot/common/robots/hope_jr/hope_jr_hand.py @@ -0,0 +1,198 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from typing import Any + +from lerobot.common.cameras.utils import make_cameras_from_configs +from lerobot.common.constants import OBS_IMAGES, OBS_STATE +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode +from lerobot.common.motors.feetech import ( + FeetechMotorsBus, + OperatingMode, +) + +from ..robot import Robot +from .config_hope_jr import HopeJrHandConfig + +logger = logging.getLogger(__name__) + + +class HopeJrHand(Robot): + config_class = HopeJrHandConfig + name = "hope_jr_hand" + + def __init__(self, config: HopeJrHandConfig): + super().__init__(config) + self.config = config + self.arm = FeetechMotorsBus( + port=self.config.port, + motors={ + # Thumb + "thumb_basel_rotation": Motor(1, "scs0009", MotorNormMode.RANGE_M100_100), + "thumb_mcp": Motor(2, "scs0009", MotorNormMode.RANGE_M100_100), + "thumb_pip": Motor(3, "scs0009", MotorNormMode.RANGE_M100_100), + "thumb_dip": Motor(4, "scs0009", MotorNormMode.RANGE_M100_100), + # Index + "index_thumb_side": Motor(5, "scs0009", MotorNormMode.RANGE_M100_100), + "index_pinky_side": Motor(6, "scs0009", MotorNormMode.RANGE_M100_100), + "index_flexor": Motor(7, "scs0009", MotorNormMode.RANGE_M100_100), + # Middle + "middle_thumb_side": Motor(8, "scs0009", MotorNormMode.RANGE_M100_100), + "middle_pinky_side": Motor(9, "scs0009", MotorNormMode.RANGE_M100_100), + "middle_flexor": Motor(10, "scs0009", MotorNormMode.RANGE_M100_100), + # Ring + "ring_thumb_side": Motor(11, "scs0009", MotorNormMode.RANGE_M100_100), + "ring_pinky_side": Motor(12, "scs0009", MotorNormMode.RANGE_M100_100), + "ring_flexor": Motor(13, "scs0009", MotorNormMode.RANGE_M100_100), + # Pinky + "pinky_thumb_side": Motor(14, "scs0009", MotorNormMode.RANGE_M100_100), + "pinky_pinky_side": Motor(15, "scs0009", MotorNormMode.RANGE_M100_100), + "pinky_flexor": Motor(16, "scs0009", MotorNormMode.RANGE_M100_100), + }, + ) + self.cameras = make_cameras_from_configs(config.cameras) + + @property + def state_feature(self) -> dict: + return { + "dtype": "float32", + "shape": (len(self.arm),), + "names": {"motors": list(self.arm.motors)}, + } + + @property + def action_feature(self) -> dict: + return self.state_feature + + @property + def camera_features(self) -> dict[str, dict]: + cam_ft = {} + for cam_key, cam in self.cameras.items(): + cam_ft[cam_key] = { + "shape": (cam.height, cam.width, cam.channels), + "names": ["height", "width", "channels"], + "info": None, + } + return cam_ft + + @property + def is_connected(self) -> bool: + # TODO(aliberts): add cam.is_connected for cam in self.cameras + return self.arm.is_connected + + def connect(self) -> None: + """ + We assume that at connection time, arm is in a rest position, + and torque can be safely disabled to run calibration. + """ + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.arm.connect() + if not self.is_calibrated: + self.calibrate() + + # Connect the cameras + for cam in self.cameras.values(): + cam.connect() + + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.arm.is_calibrated + + def calibrate(self) -> None: + raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch) + logger.info(f"\nRunning calibration of {self}") + self.arm.disable_torque() + for name in self.arm.names: + self.arm.write("Operating_Mode", name, OperatingMode.POSITION.value) + + input("Move robot to the middle of its range of motion and press ENTER....") + homing_offsets = self.arm.set_half_turn_homings() + + full_turn_motor = "wrist_roll" + unknown_range_motors = [name for name in self.arm.names if name != full_turn_motor] + logger.info( + f"Move all joints except '{full_turn_motor}' sequentially through their " + "entire ranges of motion.\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + range_mins[full_turn_motor] = 0 + range_maxes[full_turn_motor] = 4095 + + self.calibration = {} + for name, motor in self.arm.motors.items(): + self.calibration[name] = MotorCalibration( + id=motor.id, + drive_mode=0, + homing_offset=homing_offsets[name], + range_min=range_mins[name], + range_max=range_maxes[name], + ) + + self.arm.write_calibration(self.calibration) + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def configure(self) -> None: + self.arm.disable_torque() + self.arm.configure_motors() + # TODO + self.arm.enable_torque() + + def get_observation(self) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + obs_dict = {} + + # Read arm position + start = time.perf_counter() + obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position") + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") + + # Capture images from cameras + for cam_key, cam in self.cameras.items(): + start = time.perf_counter() + obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + + return obs_dict + + def send_action(self, action: dict[str, Any]) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + self.arm.sync_write("Goal_Position", action) + return action + + def disconnect(self): + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + self.arm.disconnect(self.config.disable_torque_on_disconnect) + for cam in self.cameras.values(): + cam.disconnect() + + logger.info(f"{self} disconnected.") From c5ab08e17573df7a9e0bce20aeaacb75270581bd Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Thu, 3 Apr 2025 20:20:57 +0200 Subject: [PATCH 2/7] Rename arm -> hand --- lerobot/common/robots/hope_jr/hope_jr_hand.py | 46 +++++++++---------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/lerobot/common/robots/hope_jr/hope_jr_hand.py b/lerobot/common/robots/hope_jr/hope_jr_hand.py index 704af9a6..283ec382 100644 --- a/lerobot/common/robots/hope_jr/hope_jr_hand.py +++ b/lerobot/common/robots/hope_jr/hope_jr_hand.py @@ -40,7 +40,7 @@ class HopeJrHand(Robot): def __init__(self, config: HopeJrHandConfig): super().__init__(config) self.config = config - self.arm = FeetechMotorsBus( + self.hand = FeetechMotorsBus( port=self.config.port, motors={ # Thumb @@ -72,8 +72,8 @@ class HopeJrHand(Robot): def state_feature(self) -> dict: return { "dtype": "float32", - "shape": (len(self.arm),), - "names": {"motors": list(self.arm.motors)}, + "shape": (len(self.hand),), + "names": {"motors": list(self.hand.motors)}, } @property @@ -94,17 +94,13 @@ class HopeJrHand(Robot): @property def is_connected(self) -> bool: # TODO(aliberts): add cam.is_connected for cam in self.cameras - return self.arm.is_connected + return self.hand.is_connected def connect(self) -> None: - """ - We assume that at connection time, arm is in a rest position, - and torque can be safely disabled to run calibration. - """ if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") - self.arm.connect() + self.hand.connect() if not self.is_calibrated: self.calibrate() @@ -117,30 +113,30 @@ class HopeJrHand(Robot): @property def is_calibrated(self) -> bool: - return self.arm.is_calibrated + return self.hand.is_calibrated def calibrate(self) -> None: raise NotImplementedError # TODO(aliberts): adapt code below (copied from koch) logger.info(f"\nRunning calibration of {self}") - self.arm.disable_torque() - for name in self.arm.names: - self.arm.write("Operating_Mode", name, OperatingMode.POSITION.value) + self.hand.disable_torque() + for name in self.hand.names: + self.hand.write("Operating_Mode", name, OperatingMode.POSITION.value) input("Move robot to the middle of its range of motion and press ENTER....") - homing_offsets = self.arm.set_half_turn_homings() + homing_offsets = self.hand.set_half_turn_homings() full_turn_motor = "wrist_roll" - unknown_range_motors = [name for name in self.arm.names if name != full_turn_motor] + unknown_range_motors = [name for name in self.hand.names if name != full_turn_motor] logger.info( f"Move all joints except '{full_turn_motor}' sequentially through their " "entire ranges of motion.\nRecording positions. Press ENTER to stop..." ) - range_mins, range_maxes = self.arm.record_ranges_of_motion(unknown_range_motors) + range_mins, range_maxes = self.hand.record_ranges_of_motion(unknown_range_motors) range_mins[full_turn_motor] = 0 range_maxes[full_turn_motor] = 4095 self.calibration = {} - for name, motor in self.arm.motors.items(): + for name, motor in self.hand.motors.items(): self.calibration[name] = MotorCalibration( id=motor.id, drive_mode=0, @@ -149,15 +145,15 @@ class HopeJrHand(Robot): range_max=range_maxes[name], ) - self.arm.write_calibration(self.calibration) + self.hand.write_calibration(self.calibration) self._save_calibration() print("Calibration saved to", self.calibration_fpath) def configure(self) -> None: - self.arm.disable_torque() - self.arm.configure_motors() + self.hand.disable_torque() + self.hand.configure_motors() # TODO - self.arm.enable_torque() + self.hand.enable_torque() def get_observation(self) -> dict[str, Any]: if not self.is_connected: @@ -165,9 +161,9 @@ class HopeJrHand(Robot): obs_dict = {} - # Read arm position + # Read hand position start = time.perf_counter() - obs_dict[OBS_STATE] = self.arm.sync_read("Present_Position") + obs_dict[OBS_STATE] = self.hand.sync_read("Present_Position") dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") @@ -184,14 +180,14 @@ class HopeJrHand(Robot): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - self.arm.sync_write("Goal_Position", action) + self.hand.sync_write("Goal_Position", action) return action def disconnect(self): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - self.arm.disconnect(self.config.disable_torque_on_disconnect) + self.hand.disconnect(self.config.disable_torque_on_disconnect) for cam in self.cameras.values(): cam.disconnect() From c81f293ae1ff0e7f07ee4abef3670799925ff754 Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Fri, 4 Apr 2025 12:57:27 +0200 Subject: [PATCH 3/7] (WIP) Add homonculus arm & glove --- .../teleoperators/homonculus/__init__.py | 3 + .../homonculus/config_homonculus.py | 35 ++ .../homonculus/homonculus_arm.py | 419 ++++++++++++++++++ .../homonculus/homonculus_glove.py | 348 +++++++++++++++ 4 files changed, 805 insertions(+) create mode 100644 lerobot/common/teleoperators/homonculus/__init__.py create mode 100644 lerobot/common/teleoperators/homonculus/config_homonculus.py create mode 100644 lerobot/common/teleoperators/homonculus/homonculus_arm.py create mode 100644 lerobot/common/teleoperators/homonculus/homonculus_glove.py diff --git a/lerobot/common/teleoperators/homonculus/__init__.py b/lerobot/common/teleoperators/homonculus/__init__.py new file mode 100644 index 00000000..2749104d --- /dev/null +++ b/lerobot/common/teleoperators/homonculus/__init__.py @@ -0,0 +1,3 @@ +from .config_homonculus import HomonculusArmConfig, HomonculusGloveConfig +from .homonculus_arm import HomonculusArm +from .homonculus_glove import HomonculusGlove diff --git a/lerobot/common/teleoperators/homonculus/config_homonculus.py b/lerobot/common/teleoperators/homonculus/config_homonculus.py new file mode 100644 index 00000000..55c0e1ed --- /dev/null +++ b/lerobot/common/teleoperators/homonculus/config_homonculus.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass + +from ..config import TeleoperatorConfig + + +@TeleoperatorConfig.register_subclass("homonculus_glove") +@dataclass +class HomonculusGloveConfig(TeleoperatorConfig): + port: str # Port to connect to the glove + baud_rate: int = 115_200 + buffer_size: int = 10 # Number of past values to keep in memory + + +@TeleoperatorConfig.register_subclass("homonculus_arm") +@dataclass +class HomonculusArmConfig(TeleoperatorConfig): + port: str # Port to connect to the arm + baud_rate: int = 115_200 + buffer_size: int = 10 # Number of past values to keep in memory diff --git a/lerobot/common/teleoperators/homonculus/homonculus_arm.py b/lerobot/common/teleoperators/homonculus/homonculus_arm.py new file mode 100644 index 00000000..a75b5924 --- /dev/null +++ b/lerobot/common/teleoperators/homonculus/homonculus_arm.py @@ -0,0 +1,419 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import pickle +import threading +from collections import deque +from enum import Enum + +import numpy as np +import serial + +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError + +from ..teleoperator import Teleoperator +from .config_homonculus import HomonculusArmConfig + +logger = logging.getLogger(__name__) + +LOWER_BOUND_LINEAR = -100 +UPPER_BOUND_LINEAR = 200 + + +class CalibrationMode(Enum): + # Joints with rotational motions are expressed in degrees in nominal range of [-180, 180] + DEGREE = 0 + # Joints with linear motions (like gripper of Aloha) are expressed in nominal range of [0, 100] + LINEAR = 1 + + +class HomonculusArm(Teleoperator): + config_class = HomonculusArmConfig + name = "homonculus_arml" + + def __init__(self, config: HomonculusArmConfig): + self.config = config + self.serial = serial.Serial(config.port, config.baud_rate, timeout=1) + self.buffer_size = config.buffer_size + + self.joints = [ + "wrist_roll", + "wrist_pitch", + "wrist_yaw", + "elbow_flex", + "shoulder_roll", + "shoulder_yaw", + "shoulder_pitch", + ] + # Initialize a buffer (deque) for each joint + self.joints_buffer = {joint: deque(maxlen=self.buffer_size) for joint in self.joints} + + # Last read dictionary + self.last_d = {joint: 100 for joint in self.joints} + + # For adaptive EMA, we store a "previous smoothed" state per joint + self.adaptive_ema_state = {joint: None for joint in self.joints} + self.kalman_state = {joint: {"x": None, "P": None} for joint in self.joints} + + self.calibration = None + self.thread = threading.Thread(target=self.async_read, daemon=True) + + @property + def action_feature(self) -> dict: + return { + "dtype": "float32", + "shape": (len(self.joints),), + "names": {"motors": self.joints}, + } + + @property + def feedback_feature(self) -> dict: + return {} + + @property + def is_connected(self) -> bool: + return self.thread.is_alive() and self.serial.is_open + + def connect(self) -> None: + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.serial.open() + self.thread.start() + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + raise NotImplementedError # TODO + + def calibrate(self) -> None: + raise NotImplementedError # TODO + + def configure(self) -> None: + raise NotImplementedError # TODO + + def get_action(self) -> dict[str, float]: + raise NotImplementedError # TODO + + def send_feedback(self, feedback: dict[str, float]) -> None: + raise NotImplementedError + + def disconnect(self) -> None: + if not self.is_connected: + DeviceNotConnectedError(f"{self} is not connected.") + + self.thread.join() + self.serial.close() + logger.info(f"{self} disconnected.") + + ### WIP below ### + + @property + def joint_names(self): + return list(self.last_d.keys()) + + def read(self, motor_names: list[str] | None = None): + """ + Return the most recent (single) values from self.last_d, + optionally applying calibration. + """ + if motor_names is None: + motor_names = self.joint_names + + # Get raw (last) values + values = np.array([self.last_d[k] for k in motor_names]) + + # print(motor_names) + print(values) + + # Apply calibration if available + if self.calibration is not None: + values = self.apply_calibration(values, motor_names) + print(values) + return values + + def read_running_average(self, motor_names: list[str] | None = None, linearize=False): + """ + Return the AVERAGE of the most recent self.buffer_size (or fewer, if not enough data) readings + for each joint, optionally applying calibration. + """ + if motor_names is None: + motor_names = self.joint_names + + # Gather averaged readings from buffers + smoothed_vals = [] + for name in motor_names: + buf = self.joint_buffer[name] + if len(buf) == 0: + # If no data has been read yet, fall back to last_d + smoothed_vals.append(self.last_d[name]) + else: + # Otherwise, average over the existing buffer + smoothed_vals.append(np.mean(buf)) + + smoothed_vals = np.array(smoothed_vals, dtype=np.float32) + + # Apply calibration if available + if self.calibration is not None: + if False: + for i, joint_name in enumerate(motor_names): + # Re-use the same raw_min / raw_max from the calibration + calib_idx = self.calibration["motor_names"].index(joint_name) + min_reading = self.calibration["start_pos"][calib_idx] + max_reading = self.calibration["end_pos"][calib_idx] + + b_value = smoothed_vals[i] + print(joint_name) + if joint_name == "elbow_flex": + print("elbow") + try: + smoothed_vals[i] = int( + min_reading + + (max_reading - min_reading) + * np.arcsin((b_value - min_reading) / (max_reading - min_reading)) + / (np.pi / 2) + ) + except Exception as e: + print("not working") + print(smoothed_vals) + print("not working") + smoothed_vals = self.apply_calibration(smoothed_vals, motor_names) + return smoothed_vals + + def read_kalman_filter( + self, Q: float = 1.0, R: float = 100.0, motor_names: list[str] | None = None + ) -> np.ndarray: + """ + Return a Kalman-filtered reading for each requested joint. + + We store a separate Kalman filter (x, P) per joint. For each new measurement Z: + 1) Predict: + x_pred = x (assuming no motion model) + P_pred = P + Q + 2) Update: + K = P_pred / (P_pred + R) + x = x_pred + K * (Z - x_pred) + P = (1 - K) * P_pred + + :param Q: Process noise. Larger Q means the estimate can change more freely. + :param R: Measurement noise. Larger R means we trust our sensor less. + :param motor_names: If not specified, all joints are filtered. + :return: Kalman-filtered positions as a numpy array. + """ + if motor_names is None: + motor_names = self.joint_names + + current_vals = np.array([self.last_d[name] for name in motor_names], dtype=np.float32) + filtered_vals = np.zeros_like(current_vals) + + for i, name in enumerate(motor_names): + # Retrieve the filter state for this joint + x = self.kalman_state[name]["x"] + P = self.kalman_state[name]["P"] + Z = current_vals[i] + + # If this is the first reading, initialize + if x is None or P is None: + x = Z + P = 1.0 # or some large initial uncertainty + + # 1) Predict step + x_pred = x # no velocity model, so x_pred = x + P_pred = P + Q + + # 2) Update step + K = P_pred / (P_pred + R) # Kalman gain + x_new = x_pred + K * (Z - x_pred) # new state estimate + P_new = (1 - K) * P_pred # new covariance + + # Save back + self.kalman_state[name]["x"] = x_new + self.kalman_state[name]["P"] = P_new + + filtered_vals[i] = x_new + + if self.calibration is not None: + filtered_vals = self.apply_calibration(filtered_vals, motor_names) + + return filtered_vals + + def async_read(self): + """ + Continuously read from the serial buffer in its own thread, + store into `self.last_d` and also append to the rolling buffer (joint_buffer). + """ + while True: + if self.serial.in_waiting > 0: + self.serial.flush() + vals = self.serial.readline().decode("utf-8").strip() + vals = vals.split(" ") + + if len(vals) != 7: + continue + try: + vals = [int(val) for val in vals] # remove last digit + except ValueError: + self.serial.flush() + vals = self.serial.readline().decode("utf-8").strip() + vals = vals.split(" ") + vals = [int(val) for val in vals] + d = { + "wrist_roll": vals[0], + "wrist_yaw": vals[1], + "wrist_pitch": vals[2], + "elbow_flex": vals[3], + "shoulder_roll": vals[4], + "shoulder_yaw": vals[5], + "shoulder_pitch": vals[6], + } + + # Update the last_d dictionary + self.last_d = d + + # Also push these new values into the rolling buffers + for joint_name, joint_val in d.items(): + self.joint_buffer[joint_name].append(joint_val) + + # Optional: short sleep to avoid busy-loop + # time.sleep(0.001) + + def run_calibration(self, robot): + robot.arm_bus.write("Acceleration", 50) + n_joints = len(self.joint_names) + + max_open_all = np.zeros(n_joints, dtype=np.float32) + min_open_all = np.zeros(n_joints, dtype=np.float32) + max_closed_all = np.zeros(n_joints, dtype=np.float32) + min_closed_all = np.zeros(n_joints, dtype=np.float32) + + for i, jname in enumerate(self.joint_names): + print(f"\n--- Calibrating joint '{jname}' ---") + + joint_idx = robot.arm_calib_dict["motor_names"].index(jname) + open_val = robot.arm_calib_dict["start_pos"][joint_idx] + print(f"Commanding {jname} to OPEN position {open_val}...") + robot.arm_bus.write("Goal_Position", [open_val], [jname]) + + input("Physically verify or adjust the joint. Press Enter when ready to capture...") + + open_pos_list = [] + for _ in range(100): + all_joints_vals = self.read() # read entire arm + open_pos_list.append(all_joints_vals[i]) # store only this joint + time.sleep(0.01) + + # Convert to numpy and track min/max + open_array = np.array(open_pos_list, dtype=np.float32) + max_open_all[i] = open_array.max() + min_open_all[i] = open_array.min() + closed_val = robot.arm_calib_dict["end_pos"][joint_idx] + if jname == "elbow_flex": + closed_val = closed_val - 700 + closed_val = robot.arm_calib_dict["end_pos"][joint_idx] + print(f"Commanding {jname} to CLOSED position {closed_val}...") + robot.arm_bus.write("Goal_Position", [closed_val], [jname]) + + input("Physically verify or adjust the joint. Press Enter when ready to capture...") + + closed_pos_list = [] + for _ in range(100): + all_joints_vals = self.read() + closed_pos_list.append(all_joints_vals[i]) + time.sleep(0.01) + + closed_array = np.array(closed_pos_list, dtype=np.float32) + # Some thresholding for closed positions + # closed_array[closed_array < 1000] = 60000 + + max_closed_all[i] = closed_array.max() + min_closed_all[i] = closed_array.min() + + robot.arm_bus.write("Goal_Position", [int((closed_val + open_val) / 2)], [jname]) + + open_pos = np.maximum(max_open_all, max_closed_all) + closed_pos = np.minimum(min_open_all, min_closed_all) + + for i, jname in enumerate(self.joint_names): + if jname not in ["wrist_pitch", "shoulder_pitch"]: + # Swap open/closed for these joints + tmp_pos = open_pos[i] + open_pos[i] = closed_pos[i] + closed_pos[i] = tmp_pos + + # Debug prints + print("\nFinal open/closed arrays after any swaps/inversions:") + print(f"open_pos={open_pos}") + print(f"closed_pos={closed_pos}") + + homing_offset = [0] * n_joints + drive_mode = [0] * n_joints + calib_modes = [CalibrationMode.LINEAR.name] * n_joints + + calib_dict = { + "homing_offset": homing_offset, + "drive_mode": drive_mode, + "start_pos": open_pos, + "end_pos": closed_pos, + "calib_mode": calib_modes, + "motor_names": self.joint_names, + } + file_path = "examples/hopejr/settings/arm_calib.pkl" + + if not os.path.exists(file_path): + with open(file_path, "wb") as f: + pickle.dump(calib_dict, f) + print(f"Dictionary saved to {file_path}") + + self.set_calibration(calib_dict) + + def set_calibration(self, calibration: dict[str, list]): + self.calibration = calibration + + def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): + """ + Example calibration that linearly maps [start_pos, end_pos] to [0,100]. + Extend or modify for your needs. + """ + if motor_names is None: + motor_names = self.joint_names + + values = values.astype(np.float32) + + for i, name in enumerate(motor_names): + calib_idx = self.calibration["motor_names"].index(name) + calib_mode = self.calibration["calib_mode"][calib_idx] + + if CalibrationMode[calib_mode] == CalibrationMode.LINEAR: + start_pos = self.calibration["start_pos"][calib_idx] + end_pos = self.calibration["end_pos"][calib_idx] + + # Rescale the present position to [0, 100] + values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100 + + # Check boundaries + if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR): + # If you want to handle out-of-range differently: + # raise JointOutOfRangeError(msg) + msg = ( + f"Wrong motor position range detected for {name}. " + f"Value = {values[i]} %, expected within [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}]" + ) + print(msg) + + return values diff --git a/lerobot/common/teleoperators/homonculus/homonculus_glove.py b/lerobot/common/teleoperators/homonculus/homonculus_glove.py new file mode 100644 index 00000000..2ba2f712 --- /dev/null +++ b/lerobot/common/teleoperators/homonculus/homonculus_glove.py @@ -0,0 +1,348 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import pickle +import threading +from collections import deque +from enum import Enum + +import numpy as np +import serial + +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError + +from ..teleoperator import Teleoperator +from .config_homonculus import HomonculusGloveConfig + +logger = logging.getLogger(__name__) + +LOWER_BOUND_LINEAR = -100 +UPPER_BOUND_LINEAR = 200 + + +class CalibrationMode(Enum): + # Joints with rotational motions are expressed in degrees in nominal range of [-180, 180] + DEGREE = 0 + # Joints with linear motions (like gripper of Aloha) are expressed in nominal range of [0, 100] + LINEAR = 1 + + +class HomonculusGlove(Teleoperator): + config_class = HomonculusGloveConfig + name = "homonculus_glove" + + def __init__(self, config: HomonculusGloveConfig): + self.config = config + self.serial = serial.Serial(config.port, config.baud_rate, timeout=1) + self.buffer_size = config.buffer_size + + self.joints = [ + "thumb_0", + "thumb_1", + "thumb_2", + "thumb_3", + "index_0", + "index_1", + "index_2", + "middle_0", + "middle_1", + "middle_2", + "ring_0", + "ring_1", + "ring_2", + "pinky_0", + "pinky_1", + "pinky_2", + "battery_voltage", # TODO(aliberts): Should this be in joints? + ] + # Initialize a buffer (deque) for each joint + self.joints_buffer = {joint: deque(maxlen=self.buffer_size) for joint in self.joints} + # Last read dictionary + self.last_d = {joint: 100 for joint in self.joints} + + self.calibration = None + self.thread = threading.Thread(target=self.async_read, daemon=True) + + @property + def action_feature(self) -> dict: + return { + "dtype": "float32", + "shape": (len(self.joints),), + "names": {"motors": self.joints}, + } + + @property + def feedback_feature(self) -> dict: + return {} + + @property + def is_connected(self) -> bool: + return self.thread.is_alive() and self.serial.is_open + + def connect(self) -> None: + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.serial.open() + self.thread.start() + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + raise NotImplementedError # TODO + + def calibrate(self) -> None: + raise NotImplementedError # TODO + + def configure(self) -> None: + raise NotImplementedError # TODO + + def get_action(self) -> dict[str, float]: + raise NotImplementedError # TODO + + def send_feedback(self, feedback: dict[str, float]) -> None: + raise NotImplementedError + + def disconnect(self) -> None: + if not self.is_connected: + DeviceNotConnectedError(f"{self} is not connected.") + + self.thread.join() + self.serial.close() + logger.info(f"{self} disconnected.") + + ### WIP below ### + + @property + def joint_names(self): + return list(self.last_d.keys()) + + def read(self, motor_names: list[str] | None = None): + """ + Return the most recent (single) values from self.last_d, + optionally applying calibration. + """ + if motor_names is None: + motor_names = self.joint_names + + # Get raw (last) values + values = np.array([self.last_d[k] for k in motor_names]) + + print(values) + + # Apply calibration if available + if self.calibration is not None: + values = self.apply_calibration(values, motor_names) + print(values) + return values + + def read_running_average(self, motor_names: list[str] | None = None, linearize=False): + """ + Return the AVERAGE of the most recent self.buffer_size (or fewer, if not enough data) readings + for each joint, optionally applying calibration. + """ + if motor_names is None: + motor_names = self.joint_names + + # Gather averaged readings from buffers + smoothed_vals = [] + for name in motor_names: + buf = self.joint_buffer[name] + if len(buf) == 0: + # If no data has been read yet, fall back to last_d + smoothed_vals.append(self.last_d[name]) + else: + # Otherwise, average over the existing buffer + smoothed_vals.append(np.mean(buf)) + + smoothed_vals = np.array(smoothed_vals, dtype=np.float32) + + # Apply calibration if available + if self.calibration is not None: + smoothed_vals = self.apply_calibration(smoothed_vals, motor_names) + + return smoothed_vals + + def async_read(self): + """ + Continuously read from the serial buffer in its own thread, + store into `self.last_d` and also append to the rolling buffer (joint_buffer). + """ + while True: + if self.serial.in_waiting > 0: + self.serial.flush() + vals = self.serial.readline().decode("utf-8").strip() + vals = vals.split(" ") + if len(vals) != 17: + continue + vals = [int(val) for val in vals] + + d = { + "thumb_0": vals[0], + "thumb_1": vals[1], + "thumb_2": vals[2], + "thumb_3": vals[3], + "index_0": vals[4], + "index_1": vals[5], + "index_2": vals[6], + "middle_0": vals[7], + "middle_1": vals[8], + "middle_2": vals[9], + "ring_0": vals[10], + "ring_1": vals[11], + "ring_2": vals[12], + "pinky_0": vals[13], + "pinky_1": vals[14], + "pinky_2": vals[15], + "battery_voltage": vals[16], + } + + # Update the last_d dictionary + self.last_d = d + + # Also push these new values into the rolling buffers + for joint_name, joint_val in d.items(): + self.joint_buffer[joint_name].append(joint_val) + + def run_calibration(self): + print("\nMove arm to open position") + input("Press Enter to continue...") + open_pos_list = [] + for _ in range(100): + open_pos = self.read() + open_pos_list.append(open_pos) + time.sleep(0.01) + open_pos = np.array(open_pos_list) + max_open_pos = open_pos.max(axis=0) + min_open_pos = open_pos.min(axis=0) + + print(f"{max_open_pos=}") + print(f"{min_open_pos=}") + + print("\nMove arm to closed position") + input("Press Enter to continue...") + closed_pos_list = [] + for _ in range(100): + closed_pos = self.read() + closed_pos_list.append(closed_pos) + time.sleep(0.01) + closed_pos = np.array(closed_pos_list) + max_closed_pos = closed_pos.max(axis=0) + closed_pos[closed_pos < 1000] = 60000 + min_closed_pos = closed_pos.min(axis=0) + + print(f"{max_closed_pos=}") + print(f"{min_closed_pos=}") + + open_pos = np.array([max_open_pos, max_closed_pos]).max(axis=0) + closed_pos = np.array([min_open_pos, min_closed_pos]).min(axis=0) + + # INVERSION + for i, jname in enumerate(self.joint_names): + if jname in [ + "thumb_0", + "thumb_3", + "index_2", + "middle_2", + "ring_2", + "pinky_2", + "index_0", + ]: + tmp_pos = open_pos[i] + open_pos[i] = closed_pos[i] + closed_pos[i] = tmp_pos + + print() + print(f"{open_pos=}") + print(f"{closed_pos=}") + + homing_offset = [0] * len(self.joint_names) + drive_mode = [0] * len(self.joint_names) + calib_modes = [CalibrationMode.LINEAR.name] * len(self.joint_names) + + calib_dict = { + "homing_offset": homing_offset, + "drive_mode": drive_mode, + "start_pos": open_pos, + "end_pos": closed_pos, + "calib_mode": calib_modes, + "motor_names": self.joint_names, + } + + file_path = "examples/hopejr/settings/hand_calib.pkl" + + if not os.path.exists(file_path): + with open(file_path, "wb") as f: + pickle.dump(calib_dict, f) + print(f"Dictionary saved to {file_path}") + + # return calib_dict + self.set_calibration(calib_dict) + + def set_calibration(self, calibration: dict[str, list]): + self.calibration = calibration + + def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): + """Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with + a "zero position" at 0 degree. + + Note: We say "nominal degree range" since the motors can take values outside this range. For instance, 190 degrees, if the motor + rotate more than a half a turn from the zero position. However, most motors can't rotate more than 180 degrees and will stay in this range. + + Joints values are original in [0, 2**32[ (unsigned int32). Each motor are expected to complete a full rotation + when given a goal position that is + or - their resolution. For instance, feetech xl330-m077 have a resolution of 4096, and + at any position in their original range, let's say the position 56734, they complete a full rotation clockwise by moving to 60830, + or anticlockwise by moving to 52638. The position in the original range is arbitrary and might change a lot between each motor. + To harmonize between motors of the same model, different robots, or even models of different brands, we propose to work + in the centered nominal degree range ]-180, 180[. + """ + if motor_names is None: + motor_names = self.motor_names + + # Convert from unsigned int32 original range [0, 2**32] to signed float32 range + values = values.astype(np.float32) + + for i, name in enumerate(motor_names): + calib_idx = self.calibration["motor_names"].index(name) + calib_mode = self.calibration["calib_mode"][calib_idx] + + if CalibrationMode[calib_mode] == CalibrationMode.LINEAR: + start_pos = self.calibration["start_pos"][calib_idx] + end_pos = self.calibration["end_pos"][calib_idx] + + # Rescale the present position to a nominal range [0, 100] %, + # useful for joints with linear motions like Aloha gripper + values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100 + + if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR): + if name == "pinky_1" and (values[i] < LOWER_BOUND_LINEAR): + values[i] = end_pos + else: + msg = ( + f"Wrong motor position range detected for {name}. " + f"Expected to be in nominal range of [0, 100] % (a full linear translation), " + f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, " + f"but present value is {values[i]} %. " + "This might be due to a cable connection issue creating an artificial jump in motor values. " + "You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`" + ) + print(msg) + # raise JointOutOfRangeError(msg) + + return values From 27feea019a601dc66d41b26aaf947d601b4cb468 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 4 Apr 2025 10:57:51 +0000 Subject: [PATCH 4/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- lerobot/common/teleoperators/homonculus/homonculus_arm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lerobot/common/teleoperators/homonculus/homonculus_arm.py b/lerobot/common/teleoperators/homonculus/homonculus_arm.py index a75b5924..0d4571f2 100644 --- a/lerobot/common/teleoperators/homonculus/homonculus_arm.py +++ b/lerobot/common/teleoperators/homonculus/homonculus_arm.py @@ -189,7 +189,7 @@ class HomonculusArm(Teleoperator): * np.arcsin((b_value - min_reading) / (max_reading - min_reading)) / (np.pi / 2) ) - except Exception as e: + except Exception: print("not working") print(smoothed_vals) print("not working") From 7c5813778ba708392ce8cf8d0f3445e17cd8c816 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 8 Apr 2025 09:39:48 +0000 Subject: [PATCH 5/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- lerobot/common/teleoperators/homonculus/homonculus_arm.py | 4 ++-- lerobot/common/teleoperators/homonculus/homonculus_glove.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lerobot/common/teleoperators/homonculus/homonculus_arm.py b/lerobot/common/teleoperators/homonculus/homonculus_arm.py index 0d4571f2..b4fc37e8 100644 --- a/lerobot/common/teleoperators/homonculus/homonculus_arm.py +++ b/lerobot/common/teleoperators/homonculus/homonculus_arm.py @@ -64,10 +64,10 @@ class HomonculusArm(Teleoperator): self.joints_buffer = {joint: deque(maxlen=self.buffer_size) for joint in self.joints} # Last read dictionary - self.last_d = {joint: 100 for joint in self.joints} + self.last_d = dict.fromkeys(self.joints, 100) # For adaptive EMA, we store a "previous smoothed" state per joint - self.adaptive_ema_state = {joint: None for joint in self.joints} + self.adaptive_ema_state = dict.fromkeys(self.joints) self.kalman_state = {joint: {"x": None, "P": None} for joint in self.joints} self.calibration = None diff --git a/lerobot/common/teleoperators/homonculus/homonculus_glove.py b/lerobot/common/teleoperators/homonculus/homonculus_glove.py index 2ba2f712..fb78fb61 100644 --- a/lerobot/common/teleoperators/homonculus/homonculus_glove.py +++ b/lerobot/common/teleoperators/homonculus/homonculus_glove.py @@ -73,7 +73,7 @@ class HomonculusGlove(Teleoperator): # Initialize a buffer (deque) for each joint self.joints_buffer = {joint: deque(maxlen=self.buffer_size) for joint in self.joints} # Last read dictionary - self.last_d = {joint: 100 for joint in self.joints} + self.last_d = dict.fromkeys(self.joints, 100) self.calibration = None self.thread = threading.Thread(target=self.async_read, daemon=True) From c4b1cee61593d1ee3799371274a5e8360940eedf Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Tue, 15 Apr 2025 13:01:34 +0200 Subject: [PATCH 6/7] Update branch & fix pre-commit errors --- lerobot/common/robots/hope_jr/hope_jr_arm.py | 8 +-- lerobot/common/robots/hope_jr/hope_jr_hand.py | 9 ++-- .../homonculus/homonculus_arm.py | 54 ++++++++++--------- .../homonculus/homonculus_glove.py | 5 +- 4 files changed, 42 insertions(+), 34 deletions(-) diff --git a/lerobot/common/robots/hope_jr/hope_jr_arm.py b/lerobot/common/robots/hope_jr/hope_jr_arm.py index 9107de8f..e7146131 100644 --- a/lerobot/common/robots/hope_jr/hope_jr_arm.py +++ b/lerobot/common/robots/hope_jr/hope_jr_arm.py @@ -52,6 +52,7 @@ class HopeJrArm(Robot): "wrist_yaw": Motor(6, "sts3215", MotorNormMode.RANGE_M100_100), "wrist_pitch": Motor(7, "sts3215", MotorNormMode.RANGE_M100_100), }, + calibration=self.calibration, ) self.cameras = make_cameras_from_configs(config.cameras) @@ -141,10 +142,9 @@ class HopeJrArm(Robot): print("Calibration saved to", self.calibration_fpath) def configure(self) -> None: - self.arm.disable_torque() - self.arm.configure_motors() - # TODO - self.arm.enable_torque() + with self.arm.torque_disabled(): + self.arm.configure_motors() + # TODO def get_observation(self) -> dict[str, Any]: if not self.is_connected: diff --git a/lerobot/common/robots/hope_jr/hope_jr_hand.py b/lerobot/common/robots/hope_jr/hope_jr_hand.py index 283ec382..3eedaaf6 100644 --- a/lerobot/common/robots/hope_jr/hope_jr_hand.py +++ b/lerobot/common/robots/hope_jr/hope_jr_hand.py @@ -65,6 +65,8 @@ class HopeJrHand(Robot): "pinky_pinky_side": Motor(15, "scs0009", MotorNormMode.RANGE_M100_100), "pinky_flexor": Motor(16, "scs0009", MotorNormMode.RANGE_M100_100), }, + calibration=self.calibration, + protocol_version=1, ) self.cameras = make_cameras_from_configs(config.cameras) @@ -150,10 +152,9 @@ class HopeJrHand(Robot): print("Calibration saved to", self.calibration_fpath) def configure(self) -> None: - self.hand.disable_torque() - self.hand.configure_motors() - # TODO - self.hand.enable_torque() + with self.hand.torque_disabled(): + self.hand.configure_motors() + # TODO def get_observation(self) -> dict[str, Any]: if not self.is_connected: diff --git a/lerobot/common/teleoperators/homonculus/homonculus_arm.py b/lerobot/common/teleoperators/homonculus/homonculus_arm.py index b4fc37e8..2d8c1b30 100644 --- a/lerobot/common/teleoperators/homonculus/homonculus_arm.py +++ b/lerobot/common/teleoperators/homonculus/homonculus_arm.py @@ -16,8 +16,9 @@ import logging import os -import pickle +import pickle # nosec import threading +import time from collections import deque from enum import Enum @@ -44,7 +45,7 @@ class CalibrationMode(Enum): class HomonculusArm(Teleoperator): config_class = HomonculusArmConfig - name = "homonculus_arml" + name = "homonculus_arm" def __init__(self, config: HomonculusArmConfig): self.config = config @@ -197,7 +198,7 @@ class HomonculusArm(Teleoperator): return smoothed_vals def read_kalman_filter( - self, Q: float = 1.0, R: float = 100.0, motor_names: list[str] | None = None + self, process_noise: float = 1.0, measurement_noise: float = 100.0, motors: list[str] | None = None ) -> np.ndarray: """ Return a Kalman-filtered reading for each requested joint. @@ -211,45 +212,50 @@ class HomonculusArm(Teleoperator): x = x_pred + K * (Z - x_pred) P = (1 - K) * P_pred - :param Q: Process noise. Larger Q means the estimate can change more freely. - :param R: Measurement noise. Larger R means we trust our sensor less. - :param motor_names: If not specified, all joints are filtered. - :return: Kalman-filtered positions as a numpy array. - """ - if motor_names is None: - motor_names = self.joint_names + Args: + process_noise (float, optional): Process noise (Q). Larger Q means the estimate can change more + freely. Defaults to 1.0. + measurement_noise (float, optional): Measurement noise (R). Larger R means we trust our sensor + less. Defaults to 100.0. + motors (list[str] | None, optional): If None, all joints are filtered. Defaults to None. - current_vals = np.array([self.last_d[name] for name in motor_names], dtype=np.float32) + Returns: + np.ndarray: Kalman-filtered positions. + """ + if motors is None: + motors = self.joint_names + + current_vals = np.array([self.last_d[name] for name in motors], dtype=np.float32) filtered_vals = np.zeros_like(current_vals) - for i, name in enumerate(motor_names): + for i, name in enumerate(motors): # Retrieve the filter state for this joint x = self.kalman_state[name]["x"] - P = self.kalman_state[name]["P"] - Z = current_vals[i] + p = self.kalman_state[name]["P"] + z = current_vals[i] # If this is the first reading, initialize - if x is None or P is None: - x = Z - P = 1.0 # or some large initial uncertainty + if x is None or p is None: + x = z + p = 1.0 # or some large initial uncertainty # 1) Predict step x_pred = x # no velocity model, so x_pred = x - P_pred = P + Q + p_pred = p + process_noise # 2) Update step - K = P_pred / (P_pred + R) # Kalman gain - x_new = x_pred + K * (Z - x_pred) # new state estimate - P_new = (1 - K) * P_pred # new covariance + kalman_gain = p_pred / (p_pred + measurement_noise) # Kalman gain + x_new = x_pred + kalman_gain * (z - x_pred) # new state estimate + p_new = (1 - kalman_gain) * p_pred # new covariance # Save back self.kalman_state[name]["x"] = x_new - self.kalman_state[name]["P"] = P_new + self.kalman_state[name]["P"] = p_new filtered_vals[i] = x_new if self.calibration is not None: - filtered_vals = self.apply_calibration(filtered_vals, motor_names) + filtered_vals = self.apply_calibration(filtered_vals, motors) return filtered_vals @@ -377,7 +383,7 @@ class HomonculusArm(Teleoperator): if not os.path.exists(file_path): with open(file_path, "wb") as f: - pickle.dump(calib_dict, f) + pickle.dump(calib_dict, f) # TODO(aliberts): use json print(f"Dictionary saved to {file_path}") self.set_calibration(calib_dict) diff --git a/lerobot/common/teleoperators/homonculus/homonculus_glove.py b/lerobot/common/teleoperators/homonculus/homonculus_glove.py index fb78fb61..83fcec3d 100644 --- a/lerobot/common/teleoperators/homonculus/homonculus_glove.py +++ b/lerobot/common/teleoperators/homonculus/homonculus_glove.py @@ -16,8 +16,9 @@ import logging import os -import pickle +import pickle # nosec import threading +import time from collections import deque from enum import Enum @@ -289,7 +290,7 @@ class HomonculusGlove(Teleoperator): if not os.path.exists(file_path): with open(file_path, "wb") as f: - pickle.dump(calib_dict, f) + pickle.dump(calib_dict, f) # TODO(aliberts): use json print(f"Dictionary saved to {file_path}") # return calib_dict From 98643b000ef1b30c87f15dc8156f86377b69b2ed Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Wed, 16 Apr 2025 09:54:53 +0200 Subject: [PATCH 7/7] Fix hand & glove readings --- lerobot/common/robots/hope_jr/hope_jr_hand.py | 3 ++- .../teleoperators/homonculus/homonculus_glove.py | 14 +++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/lerobot/common/robots/hope_jr/hope_jr_hand.py b/lerobot/common/robots/hope_jr/hope_jr_hand.py index 3eedaaf6..b9289cdc 100644 --- a/lerobot/common/robots/hope_jr/hope_jr_hand.py +++ b/lerobot/common/robots/hope_jr/hope_jr_hand.py @@ -164,7 +164,8 @@ class HopeJrHand(Robot): # Read hand position start = time.perf_counter() - obs_dict[OBS_STATE] = self.hand.sync_read("Present_Position") + for motor in self.hand.motors: + obs_dict[f"{OBS_STATE}.{motor}"] = self.hand.read("Present_Position", motor, normalize=False) dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") diff --git a/lerobot/common/teleoperators/homonculus/homonculus_glove.py b/lerobot/common/teleoperators/homonculus/homonculus_glove.py index 83fcec3d..f44869e4 100644 --- a/lerobot/common/teleoperators/homonculus/homonculus_glove.py +++ b/lerobot/common/teleoperators/homonculus/homonculus_glove.py @@ -48,6 +48,7 @@ class HomonculusGlove(Teleoperator): name = "homonculus_glove" def __init__(self, config: HomonculusGloveConfig): + super().__init__(config) self.config = config self.serial = serial.Serial(config.port, config.baud_rate, timeout=1) self.buffer_size = config.buffer_size @@ -99,7 +100,8 @@ class HomonculusGlove(Teleoperator): if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") - self.serial.open() + if not self.serial.is_open: + self.serial.open() self.thread.start() self.configure() logger.info(f"{self} connected.") @@ -145,8 +147,6 @@ class HomonculusGlove(Teleoperator): # Get raw (last) values values = np.array([self.last_d[k] for k in motor_names]) - print(values) - # Apply calibration if available if self.calibration is not None: values = self.apply_calibration(values, motor_names) @@ -164,7 +164,7 @@ class HomonculusGlove(Teleoperator): # Gather averaged readings from buffers smoothed_vals = [] for name in motor_names: - buf = self.joint_buffer[name] + buf = self.joints_buffer[name] if len(buf) == 0: # If no data has been read yet, fall back to last_d smoothed_vals.append(self.last_d[name]) @@ -219,10 +219,10 @@ class HomonculusGlove(Teleoperator): # Also push these new values into the rolling buffers for joint_name, joint_val in d.items(): - self.joint_buffer[joint_name].append(joint_val) + self.joints_buffer[joint_name].append(joint_val) def run_calibration(self): - print("\nMove arm to open position") + print("\nMove hand to open position") input("Press Enter to continue...") open_pos_list = [] for _ in range(100): @@ -236,7 +236,7 @@ class HomonculusGlove(Teleoperator): print(f"{max_open_pos=}") print(f"{min_open_pos=}") - print("\nMove arm to closed position") + print("\nMove hand to closed position") input("Press Enter to continue...") closed_pos_list = [] for _ in range(100):