Update branch & fix pre-commit errors

This commit is contained in:
Simon Alibert 2025-04-15 13:01:34 +02:00
parent a0120a2446
commit c4b1cee615
4 changed files with 42 additions and 34 deletions

View File

@ -52,6 +52,7 @@ class HopeJrArm(Robot):
"wrist_yaw": Motor(6, "sts3215", MotorNormMode.RANGE_M100_100), "wrist_yaw": Motor(6, "sts3215", MotorNormMode.RANGE_M100_100),
"wrist_pitch": Motor(7, "sts3215", MotorNormMode.RANGE_M100_100), "wrist_pitch": Motor(7, "sts3215", MotorNormMode.RANGE_M100_100),
}, },
calibration=self.calibration,
) )
self.cameras = make_cameras_from_configs(config.cameras) self.cameras = make_cameras_from_configs(config.cameras)
@ -141,10 +142,9 @@ class HopeJrArm(Robot):
print("Calibration saved to", self.calibration_fpath) print("Calibration saved to", self.calibration_fpath)
def configure(self) -> None: def configure(self) -> None:
self.arm.disable_torque() with self.arm.torque_disabled():
self.arm.configure_motors() self.arm.configure_motors()
# TODO # TODO
self.arm.enable_torque()
def get_observation(self) -> dict[str, Any]: def get_observation(self) -> dict[str, Any]:
if not self.is_connected: if not self.is_connected:

View File

@ -65,6 +65,8 @@ class HopeJrHand(Robot):
"pinky_pinky_side": Motor(15, "scs0009", MotorNormMode.RANGE_M100_100), "pinky_pinky_side": Motor(15, "scs0009", MotorNormMode.RANGE_M100_100),
"pinky_flexor": Motor(16, "scs0009", MotorNormMode.RANGE_M100_100), "pinky_flexor": Motor(16, "scs0009", MotorNormMode.RANGE_M100_100),
}, },
calibration=self.calibration,
protocol_version=1,
) )
self.cameras = make_cameras_from_configs(config.cameras) self.cameras = make_cameras_from_configs(config.cameras)
@ -150,10 +152,9 @@ class HopeJrHand(Robot):
print("Calibration saved to", self.calibration_fpath) print("Calibration saved to", self.calibration_fpath)
def configure(self) -> None: def configure(self) -> None:
self.hand.disable_torque() with self.hand.torque_disabled():
self.hand.configure_motors() self.hand.configure_motors()
# TODO # TODO
self.hand.enable_torque()
def get_observation(self) -> dict[str, Any]: def get_observation(self) -> dict[str, Any]:
if not self.is_connected: if not self.is_connected:

View File

@ -16,8 +16,9 @@
import logging import logging
import os import os
import pickle import pickle # nosec
import threading import threading
import time
from collections import deque from collections import deque
from enum import Enum from enum import Enum
@ -44,7 +45,7 @@ class CalibrationMode(Enum):
class HomonculusArm(Teleoperator): class HomonculusArm(Teleoperator):
config_class = HomonculusArmConfig config_class = HomonculusArmConfig
name = "homonculus_arml" name = "homonculus_arm"
def __init__(self, config: HomonculusArmConfig): def __init__(self, config: HomonculusArmConfig):
self.config = config self.config = config
@ -197,7 +198,7 @@ class HomonculusArm(Teleoperator):
return smoothed_vals return smoothed_vals
def read_kalman_filter( def read_kalman_filter(
self, Q: float = 1.0, R: float = 100.0, motor_names: list[str] | None = None self, process_noise: float = 1.0, measurement_noise: float = 100.0, motors: list[str] | None = None
) -> np.ndarray: ) -> np.ndarray:
""" """
Return a Kalman-filtered reading for each requested joint. Return a Kalman-filtered reading for each requested joint.
@ -211,45 +212,50 @@ class HomonculusArm(Teleoperator):
x = x_pred + K * (Z - x_pred) x = x_pred + K * (Z - x_pred)
P = (1 - K) * P_pred P = (1 - K) * P_pred
:param Q: Process noise. Larger Q means the estimate can change more freely. Args:
:param R: Measurement noise. Larger R means we trust our sensor less. process_noise (float, optional): Process noise (Q). Larger Q means the estimate can change more
:param motor_names: If not specified, all joints are filtered. freely. Defaults to 1.0.
:return: Kalman-filtered positions as a numpy array. measurement_noise (float, optional): Measurement noise (R). Larger R means we trust our sensor
""" less. Defaults to 100.0.
if motor_names is None: motors (list[str] | None, optional): If None, all joints are filtered. Defaults to None.
motor_names = self.joint_names
current_vals = np.array([self.last_d[name] for name in motor_names], dtype=np.float32) Returns:
np.ndarray: Kalman-filtered positions.
"""
if motors is None:
motors = self.joint_names
current_vals = np.array([self.last_d[name] for name in motors], dtype=np.float32)
filtered_vals = np.zeros_like(current_vals) filtered_vals = np.zeros_like(current_vals)
for i, name in enumerate(motor_names): for i, name in enumerate(motors):
# Retrieve the filter state for this joint # Retrieve the filter state for this joint
x = self.kalman_state[name]["x"] x = self.kalman_state[name]["x"]
P = self.kalman_state[name]["P"] p = self.kalman_state[name]["P"]
Z = current_vals[i] z = current_vals[i]
# If this is the first reading, initialize # If this is the first reading, initialize
if x is None or P is None: if x is None or p is None:
x = Z x = z
P = 1.0 # or some large initial uncertainty p = 1.0 # or some large initial uncertainty
# 1) Predict step # 1) Predict step
x_pred = x # no velocity model, so x_pred = x x_pred = x # no velocity model, so x_pred = x
P_pred = P + Q p_pred = p + process_noise
# 2) Update step # 2) Update step
K = P_pred / (P_pred + R) # Kalman gain kalman_gain = p_pred / (p_pred + measurement_noise) # Kalman gain
x_new = x_pred + K * (Z - x_pred) # new state estimate x_new = x_pred + kalman_gain * (z - x_pred) # new state estimate
P_new = (1 - K) * P_pred # new covariance p_new = (1 - kalman_gain) * p_pred # new covariance
# Save back # Save back
self.kalman_state[name]["x"] = x_new self.kalman_state[name]["x"] = x_new
self.kalman_state[name]["P"] = P_new self.kalman_state[name]["P"] = p_new
filtered_vals[i] = x_new filtered_vals[i] = x_new
if self.calibration is not None: if self.calibration is not None:
filtered_vals = self.apply_calibration(filtered_vals, motor_names) filtered_vals = self.apply_calibration(filtered_vals, motors)
return filtered_vals return filtered_vals
@ -377,7 +383,7 @@ class HomonculusArm(Teleoperator):
if not os.path.exists(file_path): if not os.path.exists(file_path):
with open(file_path, "wb") as f: with open(file_path, "wb") as f:
pickle.dump(calib_dict, f) pickle.dump(calib_dict, f) # TODO(aliberts): use json
print(f"Dictionary saved to {file_path}") print(f"Dictionary saved to {file_path}")
self.set_calibration(calib_dict) self.set_calibration(calib_dict)

View File

@ -16,8 +16,9 @@
import logging import logging
import os import os
import pickle import pickle # nosec
import threading import threading
import time
from collections import deque from collections import deque
from enum import Enum from enum import Enum
@ -289,7 +290,7 @@ class HomonculusGlove(Teleoperator):
if not os.path.exists(file_path): if not os.path.exists(file_path):
with open(file_path, "wb") as f: with open(file_path, "wb") as f:
pickle.dump(calib_dict, f) pickle.dump(calib_dict, f) # TODO(aliberts): use json
print(f"Dictionary saved to {file_path}") print(f"Dictionary saved to {file_path}")
# return calib_dict # return calib_dict