diff --git a/examples/hopejr/BOM b/examples/hopejr/BOM new file mode 100644 index 00000000..178f47de --- /dev/null +++ b/examples/hopejr/BOM @@ -0,0 +1 @@ +Bearings - https://amzn.eu/d/8Xz7m4C - https://amzn.eu/d/1xOo8re - https://amzn.eu/d/9LXO205 (17x) - https://amzn.eu/d/eKGj9gf (2x) Bike Components - https://amzn.eu/d/cNiQi0O (1x) Accessories - https://amzn.eu/d/ipjCq1R (1x) - https://amzn.eu/d/0ZMzC3G (1x) Screws - https://amzn.eu/d/dzNhSkJ - https://amzn.eu/d/41AhVIU - https://amzn.eu/d/8G91txy - https://amzn.eu/d/9xu0pLa - https://amzn.eu/d/c5xaClV - https://amzn.eu/d/7kudpAo - https://amzn.eu/d/2BEgJFc - https://amzn.eu/d/4q9RNby - https://amzn.eu/d/4RE2lPV - https://amzn.eu/d/63YU0l1 Inserts - https://amzn.eu/d/7fjOtOC \ No newline at end of file diff --git a/examples/hopejr/agugu.py b/examples/hopejr/agugu.py new file mode 100644 index 00000000..bf66165c --- /dev/null +++ b/examples/hopejr/agugu.py @@ -0,0 +1,45 @@ +import time +import numpy as np + +from lerobot.common.robot_devices.motors.feetech import FeetechMotorsBus, CalibrationMode + +@staticmethod +def degps_to_raw(degps: float) -> int: + steps_per_deg = 4096.0 / 360.0 + speed_in_steps = abs(degps) * steps_per_deg + speed_int = int(round(speed_in_steps)) + if speed_int > 0x7FFF: + speed_int = 0x7FFF + if degps < 0: + return speed_int | 0x8000 + else: + return speed_int & 0x7FFF + +@staticmethod +def raw_to_degps(raw_speed: int) -> float: + steps_per_deg = 4096.0 / 360.0 + magnitude = raw_speed & 0x7FFF + degps = magnitude / steps_per_deg + if raw_speed & 0x8000: + degps = -degps + return degps + +def main(): + # Instantiate the bus for a single motor on port /dev/ttyACM0. + arm_bus = FeetechMotorsBus( + port="/dev/ttyACM0", + motors={"wrist_pitch": [1, "scs0009"]}, + protocol_version=1, + group_sync_read=False, # using individual read calls + ) + arm_bus.connect() + # Read the current raw motor position. + # Note that "Present_Position" is in the raw units. + current_raw = arm_bus.read("Present_Position", ["wrist_pitch"])[0] + print("Current raw position:", current_raw) + arm_bus.write("Goal_Position", 1000) + arm_bus.disconnect() + exit() + +if __name__ == "__main__": + main() diff --git a/examples/hopejr/asd.png b/examples/hopejr/asd.png new file mode 100644 index 00000000..41bf8c55 Binary files /dev/null and b/examples/hopejr/asd.png differ diff --git a/examples/hopejr/follower.py b/examples/hopejr/follower.py new file mode 100644 index 00000000..a477052f --- /dev/null +++ b/examples/hopejr/follower.py @@ -0,0 +1,186 @@ +from lerobot.common.robot_devices.motors.feetech import ( + CalibrationMode, + FeetechMotorsBus, +) +import yaml + +class HopeJuniorRobot: + def __init__(self): + self.arm_port = "/dev/ttyUSB0" + self.hand_port = "/dev/ttyACM0" + self.arm_bus = FeetechMotorsBus( + port = self.arm_port, + motors={ + # "motor1": (1, "sts3250"), + # "motor2": (2, "sts3250"), + # "motor3": (3, "sts3250"), + + #"shoulder_pitch": [1, "sts3215"], + "shoulder_pitch": [1, "sm8512bl"], + "shoulder_yaw": [2, "sts3250"], # TODO: sts3250 + "shoulder_roll": [3, "sts3250"], # TODO: sts3250 + "elbow_flex": [4, "sts3250"], + "wrist_roll": [5, "sts3215"], + "wrist_yaw": [6, "sts3215"], + "wrist_pitch": [7, "sts3215"], + }, + protocol_version=0, + ) + self.hand_bus = FeetechMotorsBus( + port=self.hand_port, + + motors = { + # Thumb + "thumb_basel_rotation": [1, "scs0009"], + "thumb_mcp": [2, "scs0009"], + "thumb_pip": [3, "scs0009"], + "thumb_dip": [4, "scs0009"], + + # Index + "index_thumb_side": [5, "scs0009"], + "index_pinky_side": [6, "scs0009"], + "index_flexor": [7, "scs0009"], + + # Middle + "middle_thumb_side": [8, "scs0009"], + "middle_pinky_side": [9, "scs0009"], + "middle_flexor": [10, "scs0009"], + + # Ring + "ring_thumb_side": [11, "scs0009"], + "ring_pinky_side": [12, "scs0009"], + "ring_flexor": [13, "scs0009"], + + # Pinky + "pinky_thumb_side": [14, "scs0009"], + "pinky_pinky_side": [15, "scs0009"], + "pinky_flexor": [16, "scs0009"], + }, + protocol_version=1,#1 + group_sync_read=False, + ) + + self.arm_calib_dict = self.get_arm_calibration() + self.hand_calib_dict = self.get_hand_calibration() + + + def apply_arm_config(self, config_file): + with open(config_file, "r") as file: + config = yaml.safe_load(file) + for param, value in config.get("robot", {}).get("arm_bus", {}).items(): + self.arm_bus.write(param, value) + + def apply_hand_config(config_file, robot): + with open(config_file, "r") as file: + config = yaml.safe_load(file) + + for param, value in config.get("robot", {}).get("hand_bus", {}).items(): + robot.arm_bus.write(param, value) + + def get_hand_calibration(self): + homing_offset = [0] * len(self.hand_bus.motor_names) + drive_mode = [0] * len(self.hand_bus.motor_names) + + start_pos = [ + 750, # thumb_basel_rotation + 1000, # thumb_mcp + 500, # thumb_pip + 950, # thumb_dip + + 150, # index_thumb_side + 950, # index_pinky_side + 500, # index_flexor + + 250, # middle_thumb_side + 850, # middle_pinky_side + 1000, # middle_flexor + + 850, # ring_thumb_side + 900, # ring_pinky_side + 600, # ring_flexor + + 00, # pinky_thumb_side + 950, # pinky_pinky_side + 150, # pinky_flexor + ] + + end_pos = [ + start_pos[0] - 550, # thumb_basel_rotation + start_pos[1] - 350, # thumb_mcp + start_pos[2] + 500, # thumb_pip + start_pos[3] - 550, # thumb_dip + + start_pos[4] + 350, # index_thumb_side + start_pos[5] - 300, # index_pinky_side + start_pos[6] + 500, # index_flexor + + start_pos[7] + 400, # middle_thumb_side + start_pos[8] - 400, # middle_pinky_side + start_pos[9] - 650, # middle_flexor + + start_pos[10] - 400, # ring_thumb_side + start_pos[11] - 400, # ring_pinky_side + start_pos[12] + 400, # ring_flexor + + start_pos[13] + 500, # pinky_thumb_side + start_pos[14] - 350, # pinky_pinky_side + start_pos[15] + 450, # pinky_flexor + ] + + + + + calib_modes = [CalibrationMode.LINEAR.name] * len(self.hand_bus.motor_names) + + calib_dict = { + "homing_offset": homing_offset, + "drive_mode": drive_mode, + "start_pos": start_pos, + "end_pos": end_pos, + "calib_mode": calib_modes, + "motor_names": self.hand_bus.motor_names, + } + return calib_dict + + def get_arm_calibration(self): + + homing_offset = [0] * len(self.arm_bus.motor_names) + drive_mode = [0] * len(self.arm_bus.motor_names) + + start_pos = [ + 1600, # shoulder_up + 2450, # shoulder_forward + 1700, # shoulder_roll + 1150, # bend_elbow + 700, # wrist_roll + 1850, # wrist_yaw + 1700, # wrist_pitch + ] + + end_pos = [ + 3100, # shoulder_up + 3150, # shoulder_forward + 400, #shoulder_roll + 2800, # bend_elbow + 2600, # wrist_roll + 2150, # wrist_yaw + 2400, # wrist_pitch + ] + + calib_modes = [CalibrationMode.LINEAR.name] * len(self.arm_bus.motor_names) + + calib_dict = { + "homing_offset": homing_offset, + "drive_mode": drive_mode, + "start_pos": start_pos, + "end_pos": end_pos, + "calib_mode": calib_modes, + "motor_names": self.arm_bus.motor_names, + } + return calib_dict + + def connect_arm(self): + self.arm_bus.connect() + + def connect_hand(self): + self.hand_bus.connect() \ No newline at end of file diff --git a/examples/hopejr/leader.py b/examples/hopejr/leader.py new file mode 100644 index 00000000..bca0abc1 --- /dev/null +++ b/examples/hopejr/leader.py @@ -0,0 +1,730 @@ +from lerobot.common.robot_devices.motors.feetech import ( + CalibrationMode, + FeetechMotorsBus, +) +import serial +import threading +import time +from typing import Callable +import pickle +import cv2 +import numpy as np +from collections import deque +import json +import os +LOWER_BOUND_LINEAR = -100 +UPPER_BOUND_LINEAR = 200 + +class HomonculusArm: + def __init__(self, serial_port: str = "/dev/ttyACM1", baud_rate: int = 115200): + self.serial_port = serial_port + self.baud_rate = 115200 + self.serial = serial.Serial(self.serial_port, self.baud_rate, timeout=1) + + # Number of past values to keep in memory + self.buffer_size = 10 + + # Initialize a buffer (deque) for each joint + self.joint_buffer = { + "wrist_roll": deque(maxlen=self.buffer_size), + "wrist_pitch": deque(maxlen=self.buffer_size), + "wrist_yaw": deque(maxlen=self.buffer_size), + "elbow_flex": deque(maxlen=self.buffer_size), + "shoulder_roll": deque(maxlen=self.buffer_size), + "shoulder_yaw": deque(maxlen=self.buffer_size), + "shoulder_pitch": deque(maxlen=self.buffer_size), + } + + # Start the reading thread + self.thread = threading.Thread(target=self.async_read, daemon=True) + self.thread.start() + + # Last read dictionary + self.last_d = { + "wrist_roll": 100, + "wrist_pitch": 100, + "wrist_yaw": 100, + "elbow_flex": 100, + "shoulder_roll": 100, + "shoulder_yaw": 100, + "shoulder_pitch": 100, + } + self.calibration = None + + # For adaptive EMA, we store a "previous smoothed" state per joint + self.adaptive_ema_state = { + "wrist_roll": None, + "wrist_pitch": None, + "wrist_yaw": None, + "elbow_flex": None, + "shoulder_roll": None, + "shoulder_yaw": None, + "shoulder_pitch": None, + } + + self.kalman_state = { + joint: {"x": None, "P": None} for joint in self.joint_buffer.keys() + } + + @property + def joint_names(self): + return list(self.last_d.keys()) + + def read(self, motor_names: list[str] | None = None): + """ + Return the most recent (single) values from self.last_d, + optionally applying calibration. + """ + if motor_names is None: + motor_names = self.joint_names + + # Get raw (last) values + values = np.array([self.last_d[k] for k in motor_names]) + + #print(motor_names) + print(values) + + # Apply calibration if available + if self.calibration is not None: + values = self.apply_calibration(values, motor_names) + print(values) + return values + + def read_running_average(self, motor_names: list[str] | None = None, linearize=False): + """ + Return the AVERAGE of the most recent self.buffer_size (or fewer, if not enough data) readings + for each joint, optionally applying calibration. + """ + if motor_names is None: + motor_names = self.joint_names + + # Gather averaged readings from buffers + smoothed_vals = [] + for name in motor_names: + buf = self.joint_buffer[name] + if len(buf) == 0: + # If no data has been read yet, fall back to last_d + smoothed_vals.append(self.last_d[name]) + else: + # Otherwise, average over the existing buffer + smoothed_vals.append(np.mean(buf)) + + smoothed_vals = np.array(smoothed_vals, dtype=np.float32) + + # Apply calibration if available + if self.calibration is not None: + + if False: + for i, joint_name in enumerate(motor_names): + # Re-use the same raw_min / raw_max from the calibration + calib_idx = self.calibration["motor_names"].index(joint_name) + min_reading = self.calibration["start_pos"][calib_idx] + max_reading = self.calibration["end_pos"][calib_idx] + + B_value = smoothed_vals[i] + print(joint_name) + if joint_name == "elbow_flex": + print('elbow') + try: + smoothed_vals[i] = int(min_reading+(max_reading - min_reading)*np.arcsin((B_value-min_reading)/(max_reading-min_reading))/(np.pi / 2)) + except: + print('not working') + print(smoothed_vals) + print('not working') + smoothed_vals = self.apply_calibration(smoothed_vals, motor_names) + return smoothed_vals + + def read_kalman_filter( + self, + Q: float = 1.0, + R: float = 100.0, + motor_names: list[str] | None = None + ) -> np.ndarray: + """ + Return a Kalman-filtered reading for each requested joint. + + We store a separate Kalman filter (x, P) per joint. For each new measurement Z: + 1) Predict: + x_pred = x (assuming no motion model) + P_pred = P + Q + 2) Update: + K = P_pred / (P_pred + R) + x = x_pred + K * (Z - x_pred) + P = (1 - K) * P_pred + + :param Q: Process noise. Larger Q means the estimate can change more freely. + :param R: Measurement noise. Larger R means we trust our sensor less. + :param motor_names: If not specified, all joints are filtered. + :return: Kalman-filtered positions as a numpy array. + """ + if motor_names is None: + motor_names = self.joint_names + + current_vals = np.array([self.last_d[name] for name in motor_names], dtype=np.float32) + filtered_vals = np.zeros_like(current_vals) + + for i, name in enumerate(motor_names): + # Retrieve the filter state for this joint + x = self.kalman_state[name]["x"] + P = self.kalman_state[name]["P"] + Z = current_vals[i] + + # If this is the first reading, initialize + if x is None or P is None: + x = Z + P = 1.0 # or some large initial uncertainty + + # 1) Predict step + x_pred = x # no velocity model, so x_pred = x + P_pred = P + Q + + # 2) Update step + K = P_pred / (P_pred + R) # Kalman gain + x_new = x_pred + K * (Z - x_pred) # new state estimate + P_new = (1 - K) * P_pred # new covariance + + # Save back + self.kalman_state[name]["x"] = x_new + self.kalman_state[name]["P"] = P_new + + filtered_vals[i] = x_new + + if self.calibration is not None: + filtered_vals = self.apply_calibration(filtered_vals, motor_names) + + return filtered_vals + + + def async_read(self): + """ + Continuously read from the serial buffer in its own thread, + store into `self.last_d` and also append to the rolling buffer (joint_buffer). + """ + while True: + if self.serial.in_waiting > 0: + self.serial.flush() + vals = self.serial.readline().decode("utf-8").strip() + vals = vals.split(" ") + + if len(vals) != 7: + continue + try: + vals = [int(val) for val in vals]#remove last digit + except ValueError: + self.serial.flush() + vals = self.serial.readline().decode("utf-8").strip() + vals = vals.split(" ") + vals = [int(val) for val in vals] + d = { + "wrist_roll": vals[0], + "wrist_yaw": vals[1], + "wrist_pitch": vals[2], + "elbow_flex": vals[3], + "shoulder_roll": vals[4], + "shoulder_yaw": vals[5], + "shoulder_pitch": vals[6], + } + + # Update the last_d dictionary + self.last_d = d + + # Also push these new values into the rolling buffers + for joint_name, joint_val in d.items(): + self.joint_buffer[joint_name].append(joint_val) + + # Optional: short sleep to avoid busy-loop + # time.sleep(0.001) + + def run_calibration(self, robot): + robot.arm_bus.write("Acceleration", 50) + n_joints = len(self.joint_names) + + max_open_all = np.zeros(n_joints, dtype=np.float32) + min_open_all = np.zeros(n_joints, dtype=np.float32) + max_closed_all = np.zeros(n_joints, dtype=np.float32) + min_closed_all = np.zeros(n_joints, dtype=np.float32) + + for i, jname in enumerate(self.joint_names): + + print(f"\n--- Calibrating joint '{jname}' ---") + + joint_idx = robot.arm_calib_dict["motor_names"].index(jname) + open_val = robot.arm_calib_dict["start_pos"][joint_idx] + print(f"Commanding {jname} to OPEN position {open_val}...") + robot.arm_bus.write("Goal_Position", [open_val], [jname]) + + input("Physically verify or adjust the joint. Press Enter when ready to capture...") + + open_pos_list = [] + for _ in range(100): + all_joints_vals = self.read() # read entire arm + open_pos_list.append(all_joints_vals[i]) # store only this joint + time.sleep(0.01) + + # Convert to numpy and track min/max + open_array = np.array(open_pos_list, dtype=np.float32) + max_open_all[i] = open_array.max() + min_open_all[i] = open_array.min() + closed_val = robot.arm_calib_dict["end_pos"][joint_idx] + if jname == "elbow_flex": + closed_val = closed_val - 700 + closed_val = robot.arm_calib_dict["end_pos"][joint_idx] + print(f"Commanding {jname} to CLOSED position {closed_val}...") + robot.arm_bus.write("Goal_Position", [closed_val], [jname]) + + input("Physically verify or adjust the joint. Press Enter when ready to capture...") + + closed_pos_list = [] + for _ in range(100): + all_joints_vals = self.read() + closed_pos_list.append(all_joints_vals[i]) + time.sleep(0.01) + + closed_array = np.array(closed_pos_list, dtype=np.float32) + # Some thresholding for closed positions + #closed_array[closed_array < 1000] = 60000 + + max_closed_all[i] = closed_array.max() + min_closed_all[i] = closed_array.min() + + robot.arm_bus.write("Goal_Position", [int((closed_val+open_val)/2)], [jname]) + + open_pos = np.maximum(max_open_all, max_closed_all) + closed_pos = np.minimum(min_open_all, min_closed_all) + + for i, jname in enumerate(self.joint_names): + if jname not in ["wrist_pitch", "shoulder_pitch"]: + # Swap open/closed for these joints + tmp_pos = open_pos[i] + open_pos[i] = closed_pos[i] + closed_pos[i] = tmp_pos + + # Debug prints + print("\nFinal open/closed arrays after any swaps/inversions:") + print(f"open_pos={open_pos}") + print(f"closed_pos={closed_pos}") + + + homing_offset = [0] * n_joints + drive_mode = [0] * n_joints + calib_modes = [CalibrationMode.LINEAR.name] * n_joints + + calib_dict = { + "homing_offset": homing_offset, + "drive_mode": drive_mode, + "start_pos": open_pos, + "end_pos": closed_pos, + "calib_mode": calib_modes, + "motor_names": self.joint_names, + } + file_path = "examples/hopejr/settings/arm_calib.pkl" + + if not os.path.exists(file_path): + with open(file_path, "wb") as f: + pickle.dump(calib_dict, f) + print(f"Dictionary saved to {file_path}") + + self.set_calibration(calib_dict) + + def set_calibration(self, calibration: dict[str, list]): + self.calibration = calibration + + def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): + """ + Example calibration that linearly maps [start_pos, end_pos] to [0,100]. + Extend or modify for your needs. + """ + if motor_names is None: + motor_names = self.joint_names + + values = values.astype(np.float32) + + for i, name in enumerate(motor_names): + calib_idx = self.calibration["motor_names"].index(name) + calib_mode = self.calibration["calib_mode"][calib_idx] + + if CalibrationMode[calib_mode] == CalibrationMode.LINEAR: + start_pos = self.calibration["start_pos"][calib_idx] + end_pos = self.calibration["end_pos"][calib_idx] + + # Rescale the present position to [0, 100] + values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100 + + # Check boundaries + if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR): + # If you want to handle out-of-range differently: + # raise JointOutOfRangeError(msg) + msg = ( + f"Wrong motor position range detected for {name}. " + f"Value = {values[i]} %, expected within [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}]" + ) + print(msg) + + return values + + +class HomonculusGlove: + def __init__(self, serial_port: str = "/dev/ttyACM1", baud_rate: int = 115200): + self.serial_port = serial_port + self.baud_rate = baud_rate + self.serial = serial.Serial(self.serial_port, self.baud_rate, timeout=1) + + # Number of past values to keep in memory + self.buffer_size = 10 + + # Initialize a buffer (deque) for each joint + self.joint_buffer = { + "thumb_0": deque(maxlen=self.buffer_size), + "thumb_1": deque(maxlen=self.buffer_size), + "thumb_2": deque(maxlen=self.buffer_size), + "thumb_3": deque(maxlen=self.buffer_size), + "index_0": deque(maxlen=self.buffer_size), + "index_1": deque(maxlen=self.buffer_size), + "index_2": deque(maxlen=self.buffer_size), + "middle_0": deque(maxlen=self.buffer_size), + "middle_1": deque(maxlen=self.buffer_size), + "middle_2": deque(maxlen=self.buffer_size), + "ring_0": deque(maxlen=self.buffer_size), + "ring_1": deque(maxlen=self.buffer_size), + "ring_2": deque(maxlen=self.buffer_size), + "pinky_0": deque(maxlen=self.buffer_size), + "pinky_1": deque(maxlen=self.buffer_size), + "pinky_2": deque(maxlen=self.buffer_size), + "battery_voltage": deque(maxlen=self.buffer_size), + } + + # Start the reading thread + self.thread = threading.Thread(target=self.async_read, daemon=True) + self.thread.start() + + # Last read dictionary + self.last_d = { + "thumb_0": 100, + "thumb_1": 100, + "thumb_2": 100, + "thumb_3": 100, + "index_0": 100, + "index_1": 100, + "index_2": 100, + "middle_0": 100, + "middle_1": 100, + "middle_2": 100, + "ring_0": 100, + "ring_1": 100, + "ring_2": 100, + "pinky_0": 100, + "pinky_1": 100, + "pinky_2": 100, + "battery_voltage": 100, + } + self.calibration = None + + @property + def joint_names(self): + return list(self.last_d.keys()) + + def read(self, motor_names: list[str] | None = None): + """ + Return the most recent (single) values from self.last_d, + optionally applying calibration. + """ + if motor_names is None: + motor_names = self.joint_names + + # Get raw (last) values + values = np.array([self.last_d[k] for k in motor_names]) + + print(values) + + # Apply calibration if available + if self.calibration is not None: + values = self.apply_calibration(values, motor_names) + print(values) + return values + + def read_running_average(self, motor_names: list[str] | None = None, linearize=False): + """ + Return the AVERAGE of the most recent self.buffer_size (or fewer, if not enough data) readings + for each joint, optionally applying calibration. + """ + if motor_names is None: + motor_names = self.joint_names + + # Gather averaged readings from buffers + smoothed_vals = [] + for name in motor_names: + buf = self.joint_buffer[name] + if len(buf) == 0: + # If no data has been read yet, fall back to last_d + smoothed_vals.append(self.last_d[name]) + else: + # Otherwise, average over the existing buffer + smoothed_vals.append(np.mean(buf)) + + smoothed_vals = np.array(smoothed_vals, dtype=np.float32) + + # Apply calibration if available + if self.calibration is not None: + smoothed_vals = self.apply_calibration(smoothed_vals, motor_names) + + return smoothed_vals + + def async_read(self): + """ + Continuously read from the serial buffer in its own thread, + store into `self.last_d` and also append to the rolling buffer (joint_buffer). + """ + while True: + if self.serial.in_waiting > 0: + self.serial.flush() + vals = self.serial.readline().decode("utf-8").strip() + vals = vals.split(" ") + if len(vals) != 17: + continue + vals = [int(val) for val in vals] + + d = { + "thumb_0": vals[0], + "thumb_1": vals[1], + "thumb_2": vals[2], + "thumb_3": vals[3], + "index_0": vals[4], + "index_1": vals[5], + "index_2": vals[6], + "middle_0": vals[7], + "middle_1": vals[8], + "middle_2": vals[9], + "ring_0": vals[10], + "ring_1": vals[11], + "ring_2": vals[12], + "pinky_0": vals[13], + "pinky_1": vals[14], + "pinky_2": vals[15], + "battery_voltage": vals[16], + } + + # Update the last_d dictionary + self.last_d = d + + # Also push these new values into the rolling buffers + for joint_name, joint_val in d.items(): + self.joint_buffer[joint_name].append(joint_val) + + def run_calibration(self): + print("\nMove arm to open position") + input("Press Enter to continue...") + open_pos_list = [] + for _ in range(100): + open_pos = self.read() + open_pos_list.append(open_pos) + time.sleep(0.01) + open_pos = np.array(open_pos_list) + max_open_pos = open_pos.max(axis=0) + min_open_pos = open_pos.min(axis=0) + + print(f"{max_open_pos=}") + print(f"{min_open_pos=}") + + print("\nMove arm to closed position") + input("Press Enter to continue...") + closed_pos_list = [] + for _ in range(100): + closed_pos = self.read() + closed_pos_list.append(closed_pos) + time.sleep(0.01) + closed_pos = np.array(closed_pos_list) + max_closed_pos = closed_pos.max(axis=0) + closed_pos[closed_pos < 1000] = 60000 + min_closed_pos = closed_pos.min(axis=0) + + print(f"{max_closed_pos=}") + print(f"{min_closed_pos=}") + + open_pos = np.array([max_open_pos, max_closed_pos]).max(axis=0) + closed_pos = np.array([min_open_pos, min_closed_pos]).min(axis=0) + + # INVERTION + # INVERTION + # INVERTION + # INVERTION + # INVERTION + # INVERTION + # INVERTION + for i, jname in enumerate(self.joint_names): + if jname in [ + "thumb_0", + "thumb_3", + "index_2", + "middle_2", + "ring_2", + "pinky_2", + "index_0", + ]: + tmp_pos = open_pos[i] + open_pos[i] = closed_pos[i] + closed_pos[i] = tmp_pos + + print() + print(f"{open_pos=}") + print(f"{closed_pos=}") + + homing_offset = [0] * len(self.joint_names) + drive_mode = [0] * len(self.joint_names) + calib_modes = [CalibrationMode.LINEAR.name] * len(self.joint_names) + + calib_dict = { + "homing_offset": homing_offset, + "drive_mode": drive_mode, + "start_pos": open_pos, + "end_pos": closed_pos, + "calib_mode": calib_modes, + "motor_names": self.joint_names, + } + + file_path = "examples/hopejr/settings/hand_calib.pkl" + + if not os.path.exists(file_path): + with open(file_path, "wb") as f: + pickle.dump(calib_dict, f) + print(f"Dictionary saved to {file_path}") + + # return calib_dict + self.set_calibration(calib_dict) + + + + def set_calibration(self, calibration: dict[str, list]): + self.calibration = calibration + + def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): + """Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with + a "zero position" at 0 degree. + + Note: We say "nominal degree range" since the motors can take values outside this range. For instance, 190 degrees, if the motor + rotate more than a half a turn from the zero position. However, most motors can't rotate more than 180 degrees and will stay in this range. + + Joints values are original in [0, 2**32[ (unsigned int32). Each motor are expected to complete a full rotation + when given a goal position that is + or - their resolution. For instance, feetech xl330-m077 have a resolution of 4096, and + at any position in their original range, let's say the position 56734, they complete a full rotation clockwise by moving to 60830, + or anticlockwise by moving to 52638. The position in the original range is arbitrary and might change a lot between each motor. + To harmonize between motors of the same model, different robots, or even models of different brands, we propose to work + in the centered nominal degree range ]-180, 180[. + """ + if motor_names is None: + motor_names = self.motor_names + + # Convert from unsigned int32 original range [0, 2**32] to signed float32 range + values = values.astype(np.float32) + + for i, name in enumerate(motor_names): + calib_idx = self.calibration["motor_names"].index(name) + calib_mode = self.calibration["calib_mode"][calib_idx] + + if CalibrationMode[calib_mode] == CalibrationMode.LINEAR: + start_pos = self.calibration["start_pos"][calib_idx] + end_pos = self.calibration["end_pos"][calib_idx] + + # Rescale the present position to a nominal range [0, 100] %, + # useful for joints with linear motions like Aloha gripper + values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100 + + if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR): + if name == "pinky_1" and (values[i] < LOWER_BOUND_LINEAR): + values[i] = end_pos + else: + msg = ( + f"Wrong motor position range detected for {name}. " + f"Expected to be in nominal range of [0, 100] % (a full linear translation), " + f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, " + f"but present value is {values[i]} %. " + "This might be due to a cable connection issue creating an artificial jump in motor values. " + "You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`" + ) + print(msg) + # raise JointOutOfRangeError(msg) + + return values + + # def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): + # """Inverse of `apply_calibration`.""" + # if motor_names is None: + # motor_names = self.motor_names + + # for i, name in enumerate(motor_names): + # calib_idx = self.calibration["motor_names"].index(name) + # calib_mode = self.calibration["calib_mode"][calib_idx] + + # if CalibrationMode[calib_mode] == CalibrationMode.LINEAR: + # start_pos = self.calibration["start_pos"][calib_idx] + # end_pos = self.calibration["end_pos"][calib_idx] + + # # Convert from nominal lnear range of [0, 100] % to + # # actual motor range of values which can be arbitrary. + # values[i] = values[i] / 100 * (end_pos - start_pos) + start_pos + + # values = np.round(values).astype(np.int32) + # return values + +class EncoderReader: + def __init__(self, serial_port="/dev/ttyUSB1", baud_rate=115200): + self.serial_port = serial_port + self.baud_rate = baud_rate + self.serial = serial.Serial(self.serial_port, self.baud_rate, timeout=1) + + # Start a background thread to continuously read from the serial port + self.thread = threading.Thread(target=self.async_read, daemon=True) + self.thread.start() + + # Store the latest encoder reading in this dictionary + self.last_d = {"encoder": 500} + + def async_read(self): + while True: + # Read one line from serial + line = self.serial.readline().decode("utf-8").strip() + if line: + try: + val = int(line) # Parse the incoming line as integer + self.last_d["encoder"] = val + except ValueError: + # If we couldn't parse it as an integer, just skip + pass + + def read(self): + """ + Returns the last encoder value that was read. + """ + return self.last_d["encoder"] + +class Tac_Man: + def __init__(self, serial_port="/dev/ttyUSB1", baud_rate=115200): + self.serial_port = serial_port + self.baud_rate = baud_rate + self.serial = serial.Serial(self.serial_port, self.baud_rate, timeout=1) + + # Start a background thread to continuously read from the serial port + self.thread = threading.Thread(target=self.async_read, daemon=True) + self.thread.start() + + # Store the latest encoder readings in this list + self.last_d = [0, 0, 0] # Default values for three readings + + def async_read(self): + while True: + # Read one line from serial + line = self.serial.readline().decode("utf-8").strip() + if line: + try: + # Parse the incoming line as three comma-separated integers + values = [int(val) for val in line.split(",")] + if len(values) == 3: # Ensure we have exactly three values + self.last_d = values + except ValueError: + # If parsing fails, skip this line + pass + + def read(self): + """ + Returns the last encoder values that were read as a list of three integers. + """ + return self.last_d diff --git a/examples/hopejr/notes b/examples/hopejr/notes new file mode 100644 index 00000000..9362bea9 --- /dev/null +++ b/examples/hopejr/notes @@ -0,0 +1,103 @@ +test and test4 +installed serial and opencv +after pip install -e . +pip install -e ".[feetech]" + +robot.hand_bus.read("Present_Position") +array([ 349, 799, 1000, 1004, 508, 503, 673, 608, 791, 390, 552, + 506, 600, 565, 428, 379], dtype=int32) + +robot.hand_bus.write("Goal_Position",[349,799,500,500,508,503,673,608,791,390,552,506,600,565,428,379]) + + +robot.arm_bus.write("Goal_Position", [1825, 2045, 2010, 2035, 1414, 1800, 1615]) +robot.arm_bus.read("Present_Position") + +robot.arm_bus.write("Goal_Position", [1500], ["elbow_flex"]) +robot.arm_bus.write("Goal_Position", [2000], ["wrist_yaw"]) + +ranges: [600-2300, 1500-2300, 1300-2800, 1000-2500, 600-2800,400-1700, 1300-2300] + shoulder_up, + shoulder forward, + shoulder yaw, + elbow_flex + wrist_yaw, + wrist_pitch, + wrist_roll + +COM18 + +C:/Users/Lenovo/AppData/Local/Programs/Python/Python310/python.exe c:/Users/Lenovo/Documents/HuggingFace/lerobot/examples/test4.py + +wrist pitch is fucked + + +so the wrist motor was fucked +and we didnt know which one it was because +if the chain hjas an issue we dont know how to locate whihc motor is at fault (cables are hard to remove) + +to calibrate: + +python lerobot/scripts/configure_motor.py \ + --port /dev/ttyACM1 \ + --brand feetech \ + --model sts3250 \ + --baudrate 1000000 \ + --ID 2 + + +python lerobot/scripts/configure_motor.py \ + --port /dev/ttyACM0 \ + --brand feetech \ + --model sm8512bl \ + --baudrate 115200 \ + --ID 1 + +python lerobot/scripts/configure_motor.py \ + --port /dev/ttyACM1 \ + --brand feetech \ + --model scs0009 \ + --baudrate 1000000 \ + --ID 30 + + why are the motors beeping? + + +#interpolate between start and end pos +robot.arm_bus.write("Goal_Position", [int((i*interp+j*(1-interp))) for i, j in zip(arm_calibration["start_pos"], arm_calibration["end_pos"])]) + +control maj M to look for stuff + +set calibration is useless + +move the joints to that position too + + +/home/nepyope/Desktop/HuggingFace/lerobot/lerobot/common/robot_devices/motors/feetech.py + +theres clearly some lag, and its probably because of an out of range issue + + + # hand_calibration = robot.get_hand_calibration() + # joint = input("Enter joint name: ") + # j1 = f"{joint}_pinky_side" + # j2 = f"{joint}_thumb_side" + # encoder = EncoderReader("/dev/ttyUSB0", 115200) + # start_angle1 = hand_calibration['start_pos'][hand_calibration['motor_names'].index(j1)] + # end_angle1 = hand_calibration['end_pos'][hand_calibration['motor_names'].index(j1)] + # start_angle2 = hand_calibration['start_pos'][hand_calibration['motor_names'].index(j2)] + # end_angle2 = hand_calibration['end_pos'][hand_calibration['motor_names'].index(j2)] + # # start_angle = shoulder_calibration['start_pos'][shoulder_calibration['motor_names'].index(joint)] + # # end_angle = shoulder_calibration['end_pos'][shoulder_calibration['motor_names'].index(joint)] + # while True: + # angle1 = int(start_angle1+(end_angle1-start_angle1)*encoder.read()/1000) + # angle2 = int(start_angle2+(end_angle2-start_angle2)*encoder.read()/1000) + + # robot.hand_bus.write("Goal_Position",angle1, [j1]) + # robot.hand_bus.write("Goal_Position",angle2, [j2]) + # print(angle1, angle2) + # time.sleep(0.1) + + # print(robot.hand_bus.find_motor_indices()) + # exit() + diff --git a/examples/hopejr/read.cs b/examples/hopejr/read.cs new file mode 100644 index 00000000..86b63fb9 --- /dev/null +++ b/examples/hopejr/read.cs @@ -0,0 +1,54 @@ +#include + +// Define multiplexer input pins +#define S0 5 +#define S1 6 +#define S2 7 +#define S3 8 +#define SENSOR_INPUT 4 + +#define SENSOR_COUNT 16 + +int rawVals[SENSOR_COUNT]; + +void measureRawValues() { + for (uint8_t i = 0; i < SENSOR_COUNT; i++) { + digitalWrite(S0, i & 0b1); + digitalWrite(S1, i >> 1 & 0b1); + digitalWrite(S2, i >> 2 & 0b1); + digitalWrite(S3, i >> 3 & 0b1); + delay(1); + + rawVals[i] = analogRead(SENSOR_INPUT); + } +} + +void printRawValues() { + for (uint8_t i = 0; i < SENSOR_COUNT; i++) { + Serial.print(rawVals[i]); + if (i < SENSOR_COUNT - 1) Serial.print(" "); + } + Serial.println(); +} + +void setup() { + Serial.begin(115200); + analogReadResolution(12); + analogSetAttenuation(ADC_11db);; // ~0.0–1.5V range + + pinMode(S0, OUTPUT); + pinMode(S1, OUTPUT); + pinMode(S2, OUTPUT); + pinMode(S3, OUTPUT); + + digitalWrite(S0, LOW); + digitalWrite(S1, LOW); + digitalWrite(S2, LOW); + digitalWrite(S3, LOW); +} + +void loop() { + measureRawValues(); + printRawValues(); + delay(1); +} diff --git a/examples/hopejr/rolling/input.gif b/examples/hopejr/rolling/input.gif new file mode 100644 index 00000000..0987367a Binary files /dev/null and b/examples/hopejr/rolling/input.gif differ diff --git a/examples/hopejr/rolling/result.png b/examples/hopejr/rolling/result.png new file mode 100644 index 00000000..c86c26e8 Binary files /dev/null and b/examples/hopejr/rolling/result.png differ diff --git a/examples/hopejr/rolling/test.py b/examples/hopejr/rolling/test.py new file mode 100644 index 00000000..53e17bc5 --- /dev/null +++ b/examples/hopejr/rolling/test.py @@ -0,0 +1,74 @@ +import numpy as np +from PIL import Image, ImageSequence + +def coalesce_gif(im): + """ + Attempt to coalesce frames so each one is a full image. + This handles many (though not all) partial-frame GIFs. + """ + # Convert mode to RGBA + im = im.convert("RGBA") + + # Prepare an accumulator the same size as the base frame + base = Image.new("RGBA", im.size) + frames = [] + + # Go through each frame + for frame in ImageSequence.Iterator(im): + base.alpha_composite(frame.convert("RGBA")) + frames.append(base.copy()) + return frames + +def remove_white_make_black(arr, threshold=250): + """ + For each pixel in arr (H,W,3), if R,G,B >= threshold, set to black (0,0,0). + This effectively 'removes' white so it won't affect the sum. + """ + mask = (arr[..., 0] >= threshold) & \ + (arr[..., 1] >= threshold) & \ + (arr[..., 2] >= threshold) + arr[mask] = 0 # set to black + +def main(): + # Load the animated GIF + gif = Image.open("input.gif") + + # Coalesce frames so each is full-size + frames = coalesce_gif(gif) + if not frames: + print("No frames found!") + return + + # Convert first frame to RGB array, initialize sum array + w, h = frames[0].size + sum_array = np.zeros((h, w, 3), dtype=np.uint16) # 16-bit to avoid overflow + + # For each frame: + for f in frames: + # Convert to RGB + rgb = f.convert("RGB") + arr = np.array(rgb, dtype=np.uint16) # shape (H, W, 3) + + # Remove near-white by setting it to black + remove_white_make_black(arr, threshold=250) + + # Add to sum_array, then clamp to 255 + sum_array += arr + np.clip(sum_array, 0, 255, out=sum_array) + + # Convert sum_array back to 8-bit + sum_array = sum_array.astype(np.uint8) + + # Finally, any pixel that stayed black is presumably "empty," so we set it to white + black_mask = (sum_array[..., 0] == 0) & \ + (sum_array[..., 1] == 0) & \ + (sum_array[..., 2] == 0) + sum_array[black_mask] = [255, 255, 255] + + # Create final Pillow image + final_img = Image.fromarray(sum_array, mode="RGB") + final_img.save("result.png") + print("Done! Wrote result.png.") + +if __name__ == "__main__": + main() diff --git a/examples/hopejr/settings/arm_calib.pkl b/examples/hopejr/settings/arm_calib.pkl new file mode 100644 index 00000000..e0308c53 Binary files /dev/null and b/examples/hopejr/settings/arm_calib.pkl differ diff --git a/examples/hopejr/settings/config.yaml b/examples/hopejr/settings/config.yaml new file mode 100644 index 00000000..3d4bccfa --- /dev/null +++ b/examples/hopejr/settings/config.yaml @@ -0,0 +1,15 @@ +robot: + arm_bus: + Lock: 0 + Torque_Limit: 1000 + Protection_Current: 500 + Over_Current_Protection_Time: 10 + Max_Torque_Limit: 1000 + Overload_Torque: 40 # Play around with this + Protection_Time: 1000 # When does it kick in? + Protective_Torque: 1 + Maximum_Acceleration: 100 + Torque_Enable: 1 + Acceleration: 100 + hand_bus: + Acceleration: 100 \ No newline at end of file diff --git a/examples/hopejr/settings/hand_calib.pkl b/examples/hopejr/settings/hand_calib.pkl new file mode 100644 index 00000000..d80b056f Binary files /dev/null and b/examples/hopejr/settings/hand_calib.pkl differ diff --git a/examples/hopejr/speedtest.py b/examples/hopejr/speedtest.py new file mode 100644 index 00000000..bd1c1f8b --- /dev/null +++ b/examples/hopejr/speedtest.py @@ -0,0 +1,61 @@ +import time +import numpy as np +from lerobot.common.robot_devices.motors.feetech import FeetechMotorsBus + +def main(): + # Instantiate the bus for a single motor on port /dev/ttyACM0. + arm_bus = FeetechMotorsBus( + port="/dev/ttyACM0", + motors={"wrist_pitch": [1, "scs0009"]}, + protocol_version=1, + group_sync_read=False, # using individual read calls + ) + arm_bus.connect() + + # Configure continuous rotation mode. + arm_bus.write("Min_Angle_Limit", 0) + arm_bus.write("Max_Angle_Limit", 1024) + + # For model "scs0009", the raw reading runs from 0 to ~1022. + resolution_max = 1022 # use 1022 as the effective maximum raw value + + # Read initial raw motor position. + prev_raw = arm_bus.read("Present_Position", ["wrist_pitch"])[0] + print("Initial raw position:", prev_raw) + + # Command continuous rotation. + arm_bus.write("Goal_Position", 1024) + + # Initialize loop counter. + loops_count = 0 + target_effective = 1780 + tolerance = 50 # stop when effective position is within ±50 of target + + while True: + current_raw = arm_bus.read("Present_Position", ["wrist_pitch"])[0] + + # Detect wrap-around: if the previous reading was near the top (>= 1020) + # and current reading is low (< 100), count that as one full loop. + if prev_raw >= 1020 and current_raw < 100: + loops_count += 1 + print(f"Wrap detected! loops_count increased to {loops_count}") + + # Compute the effective position. + effective_position = loops_count * resolution_max + current_raw + print(f"Raw position: {current_raw} | loops_count: {loops_count} | Effective position: {effective_position}") + + # Check if effective position is within tolerance of the target. + if abs(effective_position - target_effective) <= tolerance: + # Command motor to stop by setting the current raw position as goal. + arm_bus.write("Goal_Position", current_raw) + print(f"Target reached (effective position: {effective_position}). Stopping motor at raw position {current_raw}.") + break + + prev_raw = current_raw + time.sleep(0.01) # 10 ms delay + + time.sleep(1) + arm_bus.disconnect() + +if __name__ == "__main__": + main() diff --git a/examples/hopejr/teleop.py b/examples/hopejr/teleop.py new file mode 100644 index 00000000..93c18e34 --- /dev/null +++ b/examples/hopejr/teleop.py @@ -0,0 +1,219 @@ +from follower import HopeJuniorRobot +from leader import ( + HomonculusArm, + HomonculusGlove, + EncoderReader +) +from visualizer import value_to_color + +import time +import numpy as np + +import pickle +import pygame +import typer + +def main( + calibrate_glove: bool = typer.Option(False, "--calibrate-glove", help="Calibrate the glove"), + calibrate_exoskeleton: bool = typer.Option(False, "--calibrate-exoskeleton", help="Calibrate the exoskeleton"), + freeze_fingers: bool = typer.Option(False, "--freeze-fingers", help="Freeze the fingers"), + freeze_arm: bool = typer.Option(False, "--freeze-arm", help="Freeze the arm")): + show_loads: bool = typer.Option(False, "--show-loads", help="Show the loads in a GUI") + robot = HopeJuniorRobot() + + + robot.connect_hand() + #robot.connect_arm() + #read pos + print(robot.hand_bus.read("Present_Position")) + #print(robot.arm_bus.read("Present_Position", "shoulder_pitch")) + #print(robot.arm_bus.read("Present_Position",["shoulder_yaw","shoulder_roll","elbow_flex","wrist_roll","wrist_yaw","wrist_pitch"])) + + #for i in range(10): + # time.sleep(0.1) + # robot.apply_arm_config('examples/hopejr/settings/config.yaml') + + # #calibrate arm + #arm_calibration = robot.get_arm_calibration() + #exoskeleton = HomonculusArm(serial_port="/dev/ttyACM2") + #robot.arm_bus.write("Goal_Position", robot.arm_calib_dict["start_pos"][0]*0.7 + robot.arm_calib_dict["end_pos"][0]*0.3, ["shoulder_pitch"]) + + #if calibrate_exoskeleton: + # exoskeleton.run_calibration(robot) + + #file_path = "examples/hopejr/settings/arm_calib.pkl" + #with open(file_path, "rb") as f: + # calib_dict = pickle.load(f) + #print("Loaded dictionary:", calib_dict) + #exoskeleton.set_calibration(calib_dict) + + #calibrate hand + hand_calibration = robot.get_hand_calibration() + glove = HomonculusGlove(serial_port = "/dev/ttyACM1") + + if calibrate_glove: + glove.run_calibration() + + file_path = "examples/hopejr/settings/hand_calib.pkl" + with open(file_path, "rb") as f: + calib_dict = pickle.load(f) + print("Loaded dictionary:", calib_dict) + glove.set_calibration(calib_dict) + + robot.hand_bus.set_calibration(hand_calibration) + #robot.arm_bus.set_calibration(arm_calibration) + + # Initialize Pygame + # pygame.init() + + # # Set up the display + # screen = pygame.display.set_mode((800, 600)) + + # pygame.display.set_caption("Robot Hand Visualization") + + + # # Create hand structure with 16 squares and initial values + # hand_components = [] + + # # Add thumb (4 squares in diamond shape) + # thumb_positions = [ + # (150, 300), (125, 350), + # (175, 350), (150, 400) + # ] + # for pos in thumb_positions: + # hand_components.append({"pos": pos, "value": 0}) + + # # Add fingers (4 fingers with 3 squares each in vertical lines) + # finger_positions = [ + # (200, 100), # Index + # (250, 100), # Middle + # (300, 100), # Ring + # (350, 100) # Pinky + # ] + + # for x, y in finger_positions: + # for i in range(3): + # hand_components.append({"pos": (x, y + i * 50), "value": 0}) + + for i in range(1000000000000000): + # robot.apply_arm_config('examples/hopejr/settings//config.yaml') + # robot.arm_bus.write("Acceleration", 50, "shoulder_yaw") + # joint_names = ["shoulder_pitch", "shoulder_yaw", "shoulder_roll", "elbow_flex", "wrist_roll", "wrist_yaw", "wrist_pitch"] + # joint_values = exoskeleton.read_running_average(motor_names=joint_names, linearize=True) + + # joint_values = joint_values.round().astype(int) + # joint_dict = {k: v for k, v in zip(joint_names, joint_values, strict=False)} + + # motor_values = [] + # motor_names = [] + # motor_names += ["shoulder_pitch", "shoulder_yaw", "shoulder_roll", "elbow_flex", "wrist_roll", "wrist_yaw", "wrist_pitch"] + # motor_values += [joint_dict[name]-30 for name in motor_names] + + # motor_values = np.array(motor_values) + # motor_values = np.clip(motor_values, 0, 100) + # if not freeze_arm: + # robot.arm_bus.write("Goal_Position", motor_values, motor_names) + + if not freeze_fingers:#include hand + hand_joint_names = [] + hand_joint_names += ["thumb_3", "thumb_2", "thumb_1", "thumb_0"]#, "thumb_3"] + hand_joint_names += ["index_0", "index_1", "index_2"] + hand_joint_names += ["middle_0", "middle_1", "middle_2"] + hand_joint_names += ["ring_0", "ring_1", "ring_2"] + hand_joint_names += ["pinky_0", "pinky_1", "pinky_2"] + hand_joint_values = glove.read(hand_joint_names) + hand_joint_values = hand_joint_values.round( ).astype(int) + hand_joint_dict = {k: v for k, v in zip(hand_joint_names, hand_joint_values, strict=False)} + + hand_motor_values = [] + hand_motor_names = [] + + # Thumb + hand_motor_names += ["thumb_basel_rotation", "thumb_mcp", "thumb_pip", "thumb_dip"]#, "thumb_MCP"] + hand_motor_values += [ + hand_joint_dict["thumb_3"], + hand_joint_dict["thumb_2"], + hand_joint_dict["thumb_1"], + hand_joint_dict["thumb_0"] + ] + + # # Index finger + index_splay = 0.1 + hand_motor_names += ["index_flexor", "index_pinky_side", "index_thumb_side"] + hand_motor_values += [ + hand_joint_dict["index_2"], + (100 - hand_joint_dict["index_0"]) * index_splay + hand_joint_dict["index_1"] * (1 - index_splay), + hand_joint_dict["index_0"] * index_splay + hand_joint_dict["index_1"] * (1 - index_splay), + ] + + # Middle finger + middle_splay = 0.1 + hand_motor_names += ["middle_flexor", "middle_pinky_side", "middle_thumb_side"] + hand_motor_values += [ + hand_joint_dict["middle_2"], + hand_joint_dict["middle_0"] * middle_splay + hand_joint_dict["middle_1"] * (1 - middle_splay), + (100 - hand_joint_dict["middle_0"]) * middle_splay + hand_joint_dict["middle_1"] * (1 - middle_splay), + ] + + # # Ring finger + ring_splay = 0.1 + hand_motor_names += ["ring_flexor", "ring_pinky_side", "ring_thumb_side"] + hand_motor_values += [ + hand_joint_dict["ring_2"], + (100 - hand_joint_dict["ring_0"]) * ring_splay + hand_joint_dict["ring_1"] * (1 - ring_splay), + hand_joint_dict["ring_0"] * ring_splay + hand_joint_dict["ring_1"] * (1 - ring_splay), + ] + + # # Pinky finger + pinky_splay = -.1 + hand_motor_names += ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"] + hand_motor_values += [ + hand_joint_dict["pinky_2"], + hand_joint_dict["pinky_0"] * pinky_splay + hand_joint_dict["pinky_1"] * (1 - pinky_splay), + (100 - hand_joint_dict["pinky_0"]) * pinky_splay + hand_joint_dict["pinky_1"] * (1 - pinky_splay), + ] + + hand_motor_values = np.array(hand_motor_values) + hand_motor_values = np.clip(hand_motor_values, 0, 100) + robot.hand_bus.write("Acceleration", 255, hand_motor_names) + robot.hand_bus.write("Goal_Position", hand_motor_values, hand_motor_names) + + # if i%20==0 and i > 100: + # try: + # loads = robot.hand_bus.read("Present_Load") + # for i, comp in enumerate(hand_components): + # # Wave oscillates between 0 and 2024: + # # Center (1012) +/- 1012 * sin(...) + # comp["value"] = loads[i] + # except: + # pass + + + time.sleep(0.01) + + # for event in pygame.event.get(): + # if event.type == pygame.QUIT: + # robot.hand_bus.disconnect() + # robot.arm_bus.disconnect() + # exit() + # # Check for user pressing 'q' to quit + # if event.type == pygame.KEYDOWN: + # if event.key == pygame.K_q: + # robot.hand_bus.disconnect() + # robot.arm_bus.disconnect() + # exit() + + # # Draw background + # screen.fill((0, 0, 0)) # Black background + + # # Draw hand components + # for comp in hand_components: + # x, y = comp["pos"] + # color = value_to_color(comp["value"]) + # pygame.draw.rect(screen, color, (x, y, 30, 30)) + + # pygame.display.flip() + + +if __name__ == "__main__": + typer.run(main) diff --git a/examples/hopejr/teleop_stats/asd.py b/examples/hopejr/teleop_stats/asd.py new file mode 100644 index 00000000..5419b7e6 --- /dev/null +++ b/examples/hopejr/teleop_stats/asd.py @@ -0,0 +1,135 @@ +import serial +import threading +import time +import numpy as np +import matplotlib.pyplot as plt + +# Thread function to read from a serial port continuously until stop_event is set. +def read_serial(port, baudrate, stop_event, data_list): + try: + ser = serial.Serial(port, baudrate, timeout=1) + except Exception as e: + print(f"Error opening {port}: {e}") + return + + while not stop_event.is_set(): + try: + line = ser.readline().decode('utf-8').strip() + except Exception as e: + print(f"Decode error on {port}: {e}") + continue + + if line: + try: + # Split the line into integer values. + values = [int(x) for x in line.split()] + # For ACM1, ignore the extra value if present. + if len(values) >= 16: + if len(values) > 16: + values = values[:16] + # Save the timestamp (relative to start) with the sensor readings. + timestamp = time.time() + data_list.append((timestamp, values)) + except Exception as e: + print(f"Error parsing line from {port}: '{line}' -> {e}") + ser.close() + +def main(): + # --- Configuration --- + # Set your serial port names here (adjust for your system) + acm0_port = "/dev/ttyACM0" # Example for Linux (or "COM3" on Windows) + acm1_port = "/dev/ttyACM1" # Example for Linux (or "COM4" on Windows) + baudrate = 115200 + + # Data storage for each device: + data_acm0 = [] # Will hold tuples of (timestamp, [16 sensor values]) + data_acm1 = [] + + # Event to signal threads to stop reading. + stop_event = threading.Event() + + # Create and start reader threads. + thread_acm0 = threading.Thread(target=read_serial, args=(acm0_port, baudrate, stop_event, data_acm0)) + thread_acm1 = threading.Thread(target=read_serial, args=(acm1_port, baudrate, stop_event, data_acm1)) + thread_acm0.start() + thread_acm1.start() + + # Record data for 10 seconds. + record_duration = 10 # seconds + start_time = time.time() + time.sleep(record_duration) + stop_event.set() # signal threads to stop + + # Wait for both threads to finish. + thread_acm0.join() + thread_acm1.join() + print("Finished recording.") + + # --- Process the Data --- + # Convert lists of (timestamp, values) to numpy arrays. + # Compute time relative to the start of the recording. + times_acm0 = np.array([t - start_time for t, _ in data_acm0]) + sensor_acm0 = np.array([vals for _, vals in data_acm0]) # shape (N0, 16) + + times_acm1 = np.array([t - start_time for t, _ in data_acm1]) + sensor_acm1 = np.array([vals for _, vals in data_acm1]) # shape (N1, 16) + + # --- Plot 1: Overlapping Time Series --- + plt.figure(figsize=(12, 8)) + # Plot each sensor from ACM0 in red. + for i in range(16): + plt.plot(times_acm0, sensor_acm0[:, i], color='red', alpha=0.7, + label='ACM0 Sensor 1' if i == 0 else None) + # Plot each sensor from ACM1 in blue. + for i in range(16): + plt.plot(times_acm1, sensor_acm1[:, i], color='blue', alpha=0.7, + label='ACM1 Sensor 1' if i == 0 else None) + plt.xlabel("Time (s)") + plt.ylabel("Sensor Reading") + plt.title("Overlapping Sensor Readings (ACM0 in Red, ACM1 in Blue)") + plt.legend() + plt.tight_layout() + plt.savefig("overlapping_sensor_readings.png", dpi=300) + plt.close() + print("Saved overlapping_sensor_readings.png") + + # --- Plot 2: Variance of Noise for Each Sensor --- + # Compute variance (over time) for each sensor channel. + variance_acm0 = np.var(sensor_acm0, axis=0) + variance_acm1 = np.var(sensor_acm1, axis=0) + sensor_numbers = np.arange(1, 17) + bar_width = 0.35 + + plt.figure(figsize=(12, 6)) + plt.bar(sensor_numbers - bar_width/2, variance_acm0, bar_width, color='red', label='ACM0') + plt.bar(sensor_numbers + bar_width/2, variance_acm1, bar_width, color='blue', label='ACM1') + plt.xlabel("Sensor Number") + plt.ylabel("Variance") + plt.title("Noise Variance per Sensor") + plt.xticks(sensor_numbers) + plt.legend() + plt.tight_layout() + plt.savefig("sensor_variance.png", dpi=300) + plt.close() + print("Saved sensor_variance.png") + + # --- Plot 3: Difference Between ACM0 and ACM1 Readings --- + # Since the two devices may not sample at exactly the same time, + # we interpolate ACM1's data onto ACM0's time base for each sensor. + plt.figure(figsize=(12, 8)) + for i in range(16): + if len(times_acm1) > 1 and len(times_acm0) > 1: + interp_acm1 = np.interp(times_acm0, times_acm1, sensor_acm1[:, i]) + diff = sensor_acm0[:, i] - interp_acm1 + plt.plot(times_acm0, diff, label=f"Sensor {i+1}") + plt.xlabel("Time (s)") + plt.ylabel("Difference (ACM0 - ACM1)") + plt.title("Difference in Sensor Readings") + plt.legend(fontsize='small', ncol=2) + plt.tight_layout() + plt.savefig("sensor_differences.png", dpi=300) + plt.close() + print("Saved sensor_differences.png") + +if __name__ == "__main__": + main() diff --git a/examples/hopejr/teleop_stats/overlapping_sensor_readings.png b/examples/hopejr/teleop_stats/overlapping_sensor_readings.png new file mode 100644 index 00000000..faf74be2 Binary files /dev/null and b/examples/hopejr/teleop_stats/overlapping_sensor_readings.png differ diff --git a/examples/hopejr/teleop_stats/sensor_differences.png b/examples/hopejr/teleop_stats/sensor_differences.png new file mode 100644 index 00000000..70465c23 Binary files /dev/null and b/examples/hopejr/teleop_stats/sensor_differences.png differ diff --git a/examples/hopejr/teleop_stats/sensor_variance.png b/examples/hopejr/teleop_stats/sensor_variance.png new file mode 100644 index 00000000..d19d6c43 Binary files /dev/null and b/examples/hopejr/teleop_stats/sensor_variance.png differ diff --git a/examples/hopejr/teleopp.py b/examples/hopejr/teleopp.py new file mode 100644 index 00000000..4c7f6655 --- /dev/null +++ b/examples/hopejr/teleopp.py @@ -0,0 +1,84 @@ +import time +import serial +import numpy as np +import matplotlib.pyplot as plt + +# Import the motor bus (adjust the import path as needed) +from lerobot.common.robot_devices.motors.feetech import FeetechMotorsBus + +def main(): + # ------------------------------- + # Setup the motor bus (ACM0) + # ------------------------------- + arm_bus = FeetechMotorsBus( + port="/dev/ttyACM0", + motors={ + "wrist_pitch": [7, "sts3215"], + }, + protocol_version=0, + ) + arm_bus.connect() + + # ------------------------------- + # Setup the serial connection for sensor (ACM1) + # ------------------------------- + try: + ser = serial.Serial("/dev/ttyACM1", 115200, timeout=1) + except Exception as e: + print(f"Error opening serial port /dev/ttyACM1: {e}") + return + + # Lists to store the motor positions and sensor values. + positions = [] + sensor_values = [] + + # ------------------------------- + # Loop: move motor and collect sensor data + # ------------------------------- + # We assume that 2800 > 1480 so we decrement by 10 each step. + for pos in range(2800, 1500, -10): # 2800 down to 1480 (inclusive) + # Command the motor to go to position 'pos' + arm_bus.write("Goal_Position", pos, ["wrist_pitch"]) + + # Wait a short period for the motor to move and the sensor to update. + time.sleep(0.01) + + # Read one line from the sensor device. + sensor_val = np.nan # default if reading fails + try: + line = ser.readline().decode('utf-8').strip() + if line: + # Split the line into parts and convert each part to int. + parts = line.split() + # Ensure there are enough values (we expect at least 15 values) + if len(parts) >= 15: + values = [int(x) for x in parts] + # Use the 15th value (index 14) + sensor_val = values[14] + except Exception as e: + print(f"Error parsing sensor data: {e}") + + positions.append(pos) + sensor_values.append(sensor_val) + print(f"Motor pos: {pos} | Sensor 15th value: {sensor_val}") + + #move it back to + arm_bus.write("Goal_Position", 2800, ["wrist_pitch"]) + # ------------------------------- + # Plot the data: Motor Angle vs. Sensor 15th Value + # ------------------------------- + plt.figure(figsize=(10, 6)) + plt.plot(positions, sensor_values, marker='o', linestyle='-') + plt.xlabel("Motor Angle") + plt.ylabel("Sensor 15th Value") + plt.title("Motor Angle vs Sensor 15th Value") + plt.grid(True) + plt.savefig("asd.png", dpi=300) + plt.close() + print("Plot saved as asd.png") + + # Close the serial connection. + ser.close() + +if __name__ == "__main__": + main() diff --git a/examples/hopejr/test.py b/examples/hopejr/test.py new file mode 100644 index 00000000..b06bbe5d --- /dev/null +++ b/examples/hopejr/test.py @@ -0,0 +1,682 @@ +import threading +import time +from typing import Callable + +import cv2 +import numpy as np + +# from qai_hub_models.models.mediapipe_hand.app import MediaPipeHandApp +# from qai_hub_models.models.mediapipe_hand.model import ( +# MediaPipeHand, +# ) +# from qai_hub_models.utils.image_processing import ( +# app_to_net_image_inputs, +# ) +from lerobot.common.robot_devices.motors.feetech import ( + CalibrationMode, + FeetechMotorsBus, +) + +LOWER_BOUND_LINEAR = -100 +UPPER_BOUND_LINEAR = 200 + +import serial + + +class HomonculusGlove: + def __init__(self): + self.serial_port = "COM10" + self.baud_rate = 115200 + self.serial = serial.Serial(self.serial_port, self.baud_rate, timeout=1) + self.thread = threading.Thread(target=self.async_read) + self.thread.start() + self.last_d = { + "thumb_0": 100, + "thumb_1": 100, + "thumb_2": 100, + "thumb_3": 100, + "index_0": 100, + "index_1": 100, + "index_2": 100, + "middle_0": 100, + "middle_1": 100, + "middle_2": 100, + "ring_0": 100, + "ring_1": 100, + "ring_2": 100, + "pinky_0": 100, + "pinky_1": 100, + "pinky_2": 100, + "battery_voltage": 100, + } + self.calibration = None + + @property + def joint_names(self): + return list(self.last_d.keys()) + + def read(self, motor_names: list[str] | None = None): + if motor_names is None: + motor_names = self.joint_names + + values = np.array([self.last_d[k] for k in motor_names]) + + print(motor_names) + print(values) + + if self.calibration is not None: + values = self.apply_calibration(values, motor_names) + print(values) + return values + + def async_read(self): + while True: + if self.serial.in_waiting > 0: + self.serial.flush() + vals = self.serial.readline().decode("utf-8").strip() + vals = vals.split(" ") + if len(vals) != 17: + continue + vals = [int(val) for val in vals] + + d = { + "thumb_0": vals[0], + "thumb_1": vals[1], + "thumb_2": vals[2], + "thumb_3": vals[3], + "index_0": vals[4], + "index_1": vals[5], + "index_2": vals[6], + "middle_0": vals[7], + "middle_1": vals[8], + "middle_2": vals[9], + "ring_0": vals[10], + "ring_1": vals[11], + "ring_2": vals[12], + "pinky_0": vals[13], + "pinky_1": vals[14], + "pinky_2": vals[15], + "battery_voltage": vals[16], + } + self.last_d = d + # print(d.values()) + + def run_calibration(self): + print("\nMove arm to open position") + input("Press Enter to continue...") + open_pos_list = [] + for _ in range(300): + open_pos = self.read() + open_pos_list.append(open_pos) + time.sleep(0.01) + open_pos = np.array(open_pos_list) + max_open_pos = open_pos.max(axis=0) + min_open_pos = open_pos.min(axis=0) + + print(f"{max_open_pos=}") + print(f"{min_open_pos=}") + + print("\nMove arm to closed position") + input("Press Enter to continue...") + closed_pos_list = [] + for _ in range(300): + closed_pos = self.read() + closed_pos_list.append(closed_pos) + time.sleep(0.01) + closed_pos = np.array(closed_pos_list) + max_closed_pos = closed_pos.max(axis=0) + closed_pos[closed_pos < 1000] = 60000 + min_closed_pos = closed_pos.min(axis=0) + + print(f"{max_closed_pos=}") + print(f"{min_closed_pos=}") + + open_pos = np.array([max_open_pos, max_closed_pos]).max(axis=0) + closed_pos = np.array([min_open_pos, min_closed_pos]).min(axis=0) + + # INVERTION + # INVERTION + # INVERTION + # INVERTION + # INVERTION + # INVERTION + # INVERTION + for i, jname in enumerate(self.joint_names): + if jname in ["thumb_0", "thumb_3", "index_2", "middle_2", "ring_2", "pinky_0", "pinky_2"]: + tmp_pos = open_pos[i] + open_pos[i] = closed_pos[i] + closed_pos[i] = tmp_pos + + print() + print(f"{open_pos=}") + print(f"{closed_pos=}") + + homing_offset = [0] * len(self.joint_names) + drive_mode = [0] * len(self.joint_names) + calib_modes = [CalibrationMode.LINEAR.name] * len(self.joint_names) + + calib_dict = { + "homing_offset": homing_offset, + "drive_mode": drive_mode, + "start_pos": open_pos, + "end_pos": closed_pos, + "calib_mode": calib_modes, + "motor_names": self.joint_names, + } + # return calib_dict + self.set_calibration(calib_dict) + + def set_calibration(self, calibration: dict[str, list]): + self.calibration = calibration + + def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): + """Convert from unsigned int32 joint position range [0, 2**32[ to the universal float32 nominal degree range ]-180.0, 180.0[ with + a "zero position" at 0 degree. + + Note: We say "nominal degree range" since the motors can take values outside this range. For instance, 190 degrees, if the motor + rotate more than a half a turn from the zero position. However, most motors can't rotate more than 180 degrees and will stay in this range. + + Joints values are original in [0, 2**32[ (unsigned int32). Each motor are expected to complete a full rotation + when given a goal position that is + or - their resolution. For instance, feetech xl330-m077 have a resolution of 4096, and + at any position in their original range, let's say the position 56734, they complete a full rotation clockwise by moving to 60830, + or anticlockwise by moving to 52638. The position in the original range is arbitrary and might change a lot between each motor. + To harmonize between motors of the same model, different robots, or even models of different brands, we propose to work + in the centered nominal degree range ]-180, 180[. + """ + if motor_names is None: + motor_names = self.motor_names + + # Convert from unsigned int32 original range [0, 2**32] to signed float32 range + values = values.astype(np.float32) + + for i, name in enumerate(motor_names): + calib_idx = self.calibration["motor_names"].index(name) + calib_mode = self.calibration["calib_mode"][calib_idx] + + if CalibrationMode[calib_mode] == CalibrationMode.LINEAR: + start_pos = self.calibration["start_pos"][calib_idx] + end_pos = self.calibration["end_pos"][calib_idx] + + # Rescale the present position to a nominal range [0, 100] %, + # useful for joints with linear motions like Aloha gripper + values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100 + + if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR): + if name == "pinky_1" and (values[i] < LOWER_BOUND_LINEAR): + values[i] = end_pos + else: + msg = ( + f"Wrong motor position range detected for {name}. " + f"Expected to be in nominal range of [0, 100] % (a full linear translation), " + f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, " + f"but present value is {values[i]} %. " + "This might be due to a cable connection issue creating an artificial jump in motor values. " + "You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`" + ) + print(msg) + # raise JointOutOfRangeError(msg) + + return values + + # def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): + # """Inverse of `apply_calibration`.""" + # if motor_names is None: + # motor_names = self.motor_names + + # for i, name in enumerate(motor_names): + # calib_idx = self.calibration["motor_names"].index(name) + # calib_mode = self.calibration["calib_mode"][calib_idx] + + # if CalibrationMode[calib_mode] == CalibrationMode.LINEAR: + # start_pos = self.calibration["start_pos"][calib_idx] + # end_pos = self.calibration["end_pos"][calib_idx] + + # # Convert from nominal lnear range of [0, 100] % to + # # actual motor range of values which can be arbitrary. + # values[i] = values[i] / 100 * (end_pos - start_pos) + start_pos + + # values = np.round(values).astype(np.int32) + # return values + + +class HopeJuniorRobot: + def __init__(self): + self.arm_bus = FeetechMotorsBus( + port="COM14", + motors={ + # "motor1": (2, "sts3250"), + # "motor2": (1, "scs0009"), + "shoulder_pitch": [1, "sts3250"], + "shoulder_yaw": [2, "sts3215"], # TODO: sts3250 + "shoulder_roll": [3, "sts3215"], # TODO: sts3250 + "elbow_flex": [4, "sts3250"], + "wrist_roll": [5, "sts3215"], + "wrist_yaw": [6, "sts3215"], + "wrist_pitch": [7, "sts3215"], + }, + protocol_version=0, + ) + self.hand_bus = FeetechMotorsBus( + port="COM15", + motors={ + "thumb_basel_rotation": [30, "scs0009"], + "thumb_flexor": [27, "scs0009"], + "thumb_pinky_side": [26, "scs0009"], + "thumb_thumb_side": [28, "scs0009"], + "index_flexor": [25, "scs0009"], + "index_pinky_side": [31, "scs0009"], + "index_thumb_side": [32, "scs0009"], + "middle_flexor": [24, "scs0009"], + "middle_pinky_side": [33, "scs0009"], + "middle_thumb_side": [34, "scs0009"], + "ring_flexor": [21, "scs0009"], + "ring_pinky_side": [36, "scs0009"], + "ring_thumb_side": [35, "scs0009"], + "pinky_flexor": [23, "scs0009"], + "pinky_pinky_side": [38, "scs0009"], + "pinky_thumb_side": [37, "scs0009"], + }, + protocol_version=1, + group_sync_read=False, + ) + + def get_hand_calibration(self): + homing_offset = [0] * len(self.hand_bus.motor_names) + drive_mode = [0] * len(self.hand_bus.motor_names) + + start_pos = [ + 500, + 900, + 1000, + 0, + 100, + 250, + 750, + 100, + 400, + 150, + 100, + 120, + 980, + 100, + 950, + 750, + ] + + end_pos = [ + 500 - 250, + 900 - 300, + 1000 - 550, + 0 + 550, + 1000, + 250 + 700, + 750 - 700, + 1000, + 400 + 700, + 150 + 700, + 1000, + 120 + 700, + 980 - 700, + 1000, + 950 - 700, + 750 - 700, + ] + + calib_modes = [CalibrationMode.LINEAR.name] * len(self.hand_bus.motor_names) + + calib_dict = { + "homing_offset": homing_offset, + "drive_mode": drive_mode, + "start_pos": start_pos, + "end_pos": end_pos, + "calib_mode": calib_modes, + "motor_names": self.hand_bus.motor_names, + } + return calib_dict + + def connect(self): + self.arm_bus.connect() + #self.hand_bus.connect() + + +ESCAPE_KEY_ID = 27 + + +def capture_and_display_processed_frames( + frame_processor: Callable[[np.ndarray], np.ndarray], + window_display_name: str, + cap_device: int = 0, +) -> None: + """ + Capture frames from the given input camera device, run them through + the frame processor, and display the outputs in a window with the given name. + + User should press Esc to exit. + + Inputs: + frame_processor: Callable[[np.ndarray], np.ndarray] + Processes frames. + Input and output are numpy arrays of shape (H W C) with BGR channel layout and dtype uint8 / byte. + window_display_name: str + Name of the window used to display frames. + cap_device: int + Identifier for the camera to use to capture frames. + """ + cv2.namedWindow(window_display_name) + capture = cv2.VideoCapture(cap_device) + if not capture.isOpened(): + raise ValueError("Unable to open video capture.") + + frame_count = 0 + has_frame, frame = capture.read() + while has_frame: + assert isinstance(frame, np.ndarray) + + frame_count = frame_count + 1 + # mirror frame + frame = np.ascontiguousarray(frame[:, ::-1, ::-1]) + + # process & show frame + processed_frame = frame_processor(frame) + cv2.imshow(window_display_name, processed_frame[:, :, ::-1]) + + has_frame, frame = capture.read() + key = cv2.waitKey(1) + if key == ESCAPE_KEY_ID: + break + + capture.release() + + +def main(): + robot = HopeJuniorRobot() + robot.connect() + + # robot.hand_bus.calibration = None + + # breakpoint() + # print(robot.arm_bus.read("Present_Position")) + robot.arm_bus.write("Torque_Enable", 1) + robot.arm_bus.write("Acceleration", 20) + robot.arm_bus.read("Acceleration") + robot.arm_bus.write("Goal_Position", calibration["start_pos"]) + exit() + calibration = robot.get_hand_calibration() + robot.arm_bus.write("Goal_Position", calibration["start_pos"]) + # robot.hand_bus.write("Goal_Position", calibration["end_pos"][:4], robot.hand_bus.motor_names[:4]) + robot.hand_bus.set_calibration(calibration) + lol = 1 + + # # print(motors_bus.write("Goal_Position", 500)) + # print(robot.hand_bus.read("Present_Position")) + # # pos = hand_bus.read("Present_Position") + # # hand_bus.write("Goal_Position", pos[0]+20, hand_bus.motor_names[0]) + # # hand_bus.write("Goal_Position", pos[i]+delta, hand_bus.motor_names[i]) + # robot.hand_bus.read("Acceleration") + # robot.hand_bus.write("Acceleration", 10) + + # sleep = 1 + # # robot.hand_bus.write( + # # "Goal_Position", [glove.last_d['index_2']-1500,300,300], ["index_pinky_side", "index_flexor", "index_thumb_side"] + # # ) + # #time.sleep(sleep) + # time.sleep(sleep) + # robot.hand_bus.write( + # "Goal_Position", [100, 100, 100], ["index_flexor", "index_pinky_side", "index_thumb_side"] + # ) + # time.sleep(sleep) + # robot.hand_bus.write( + # "Goal_Position", [100, 0, 0], ["middle_flexor", "middle_pinky_side", "middle_thumb_side"] + # ) + # time.sleep(sleep) + # robot.hand_bus.write( + # "Goal_Position", [200, 200, 0], ["ring_flexor", "ring_pinky_side", "ring_thumb_side"] + # ) + # time.sleep(sleep) + # robot.hand_bus.write( + # "Goal_Position", [200, 100, 600], ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"] + # ) + # time.sleep(sleep) + + # breakpoint() + + glove = HomonculusGlove() + glove.run_calibration() + # while True: + # joint_names = ["index_1", "index_2"] + # joint_values = glove.read(joint_names) + # print(joint_values) + + input() + while True: + joint_names = [] + joint_names += ["thumb_0", "thumb_2", "thumb_3"] + joint_names += ["index_1", "index_2"] + joint_names += ["middle_1", "middle_2"] + joint_names += ["ring_1", "ring_2"] + joint_names += ["pinky_1", "pinky_2"] + joint_values = glove.read(joint_names) + joint_values = joint_values.round().astype(int) + joint_dict = {k: v for k, v in zip(joint_names, joint_values, strict=False)} + + motor_values = [] + motor_names = [] + motor_names += ["thumb_basel_rotation", "thumb_flexor", "thumb_pinky_side", "thumb_thumb_side"] + motor_values += [ + joint_dict["thumb_3"], + joint_dict["thumb_0"], + joint_dict["thumb_2"], + joint_dict["thumb_2"], + ] + motor_names += ["index_flexor", "index_pinky_side", "index_thumb_side"] + motor_values += [joint_dict["index_2"], joint_dict["index_1"], joint_dict["index_1"]] + motor_names += ["middle_flexor", "middle_pinky_side", "middle_thumb_side"] + motor_values += [joint_dict["middle_2"], joint_dict["middle_1"], joint_dict["middle_1"]] + motor_names += ["ring_flexor", "ring_pinky_side", "ring_thumb_side"] + motor_values += [joint_dict["ring_2"], joint_dict["ring_1"], joint_dict["ring_1"]] + motor_names += ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"] + + motor_values += [joint_dict["pinky_2"], joint_dict["pinky_1"], joint_dict["pinky_1"]] + + motor_values = np.array(motor_values) + motor_values = np.clip(motor_values, 0, 100) + + robot.hand_bus.write("Goal_Position", motor_values, motor_names) + time.sleep(0.02) + + while True: + # print(glove.read()['index_2']-1500) + glove_index_flexor = glove.read()["index_2"] - 1500 + glove_index_subflexor = glove.read()["index_1"] - 1500 + glove_index_side = glove.read()["index_0"] - 2100 + + vals = [glove_index_flexor, 1000 - (glove_index_subflexor), glove_index_subflexor] + + keys = ["index_flexor", "index_pinky_side", "index_thumb_side"] + + glove_middle_flexor = glove.read()["middle_2"] - 1500 + glove_middle_subflexor = 1000 - (glove.read()["middle_1"] - 1700) + vals += [glove_middle_flexor, glove_middle_subflexor, glove_middle_subflexor - 200] + keys += ["middle_flexor", "middle_pinky_side", "middle_thumb_side"] + + glove_ring_flexor = glove.read()["ring_2"] - 1300 + print(glove_ring_flexor) + glove_ring_subflexor = glove.read()["ring_1"] - 1100 + + vals += [glove_ring_flexor, 1000 - glove_ring_subflexor, glove_ring_subflexor] + keys += ["ring_flexor", "ring_pinky_side", "ring_thumb_side"] + + glove_pinky_flexor = glove.read()["pinky_2"] - 1500 + glove_pinky_subflexor = glove.read()["pinky_1"] - 1300 + vals += [300 + glove_pinky_flexor, max(1000 - glove_pinky_subflexor - 100, 0), glove_pinky_subflexor] + keys += ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"] + + robot.hand_bus.write("Goal_Position", vals, keys) + time.sleep(0.1) + + time.sleep(3) + + def move_arm(loop=10): + sleep = 1 + for i in range(loop): + robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1195]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 2195]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1457, 1695]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 2357, 1695]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 974, 1957, 1695]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 2674, 1957, 1695]) + time.sleep(sleep + 2) + robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 1632, 1874, 1957, 1695]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [1981, 2030, 1369, 1632, 1874, 1957, 1695]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [1981, 1330, 2069, 2032, 1874, 1957, 1695]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [2381, 2030, 2069, 2032, 1874, 1957, 1695]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [1681, 2030, 2069, 2032, 1874, 1957, 1695]) + time.sleep(sleep) + robot.arm_bus.write("Goal_Position", [1981, 2030, 2069, 2032, 1874, 1957, 1695]) + time.sleep(sleep) + + def move_hand(loop=10): + sleep = 0.5 + for i in range(loop): + robot.hand_bus.write( + "Goal_Position", + [500, 1000, 0, 1000], + ["thumb_basel_rotation", "thumb_flexor", "thumb_pinky_side", "thumb_thumb_side"], + ) + time.sleep(sleep) + robot.hand_bus.write( + "Goal_Position", [100, 100, 100], ["index_flexor", "index_pinky_side", "index_thumb_side"] + ) + time.sleep(sleep) + robot.hand_bus.write( + "Goal_Position", [100, 1000, 150], ["middle_flexor", "middle_pinky_side", "middle_thumb_side"] + ) + time.sleep(sleep) + robot.hand_bus.write( + "Goal_Position", [200, 200, 0], ["ring_flexor", "ring_pinky_side", "ring_thumb_side"] + ) + time.sleep(sleep) + robot.hand_bus.write( + "Goal_Position", [200, 100, 700], ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"] + ) + time.sleep(sleep) + + robot.hand_bus.write( + "Goal_Position", + [500, 1000 - 250, 0 + 300, 1000 - 200], + ["thumb_basel_rotation", "thumb_flexor", "thumb_pinky_side", "thumb_thumb_side"], + ) + time.sleep(sleep) + robot.hand_bus.write( + "Goal_Position", + [100 + 450, 100 + 400, 100 + 400], + ["index_flexor", "index_pinky_side", "index_thumb_side"], + ) + time.sleep(sleep) + robot.hand_bus.write( + "Goal_Position", + [100 + 350, 1000 - 450, 150 + 450], + ["middle_flexor", "middle_pinky_side", "middle_thumb_side"], + ) + time.sleep(sleep) + robot.hand_bus.write( + "Goal_Position", + [200 + 650, 200 + 350, 0 + 350], + ["ring_flexor", "ring_pinky_side", "ring_thumb_side"], + ) + time.sleep(sleep) + robot.hand_bus.write( + "Goal_Position", + [200 + 450, 100 + 400, 700 - 400], + ["pinky_flexor", "pinky_pinky_side", "pinky_thumb_side"], + ) + time.sleep(sleep) + + move_hand(3) + + move_arm(1) + + from concurrent.futures import ThreadPoolExecutor + + with ThreadPoolExecutor() as executor: + executor.submit(move_arm) + executor.submit(move_hand) + + # initial position + for i in range(3): + robot.hand_bus.write( + "Goal_Position", [500, 1000, 0, 1000, 100, 950, 100, 100, 1000, 150, 200, 200, 0, 200, 100, 700] + ) + time.sleep(1) + + # for i in range(3): + # robot.hand_bus.write("Goal_Position", [500, 1000-150, 0+250, 1000-150, + # 100+300, 950-250, 100+250, + # 100+200, 1000-300, 150+300, + # 200+500, 200+200, 0+200, + # 200+300, 100+200, 700-200]) + # time.sleep(1) + + # camera = 0 + # score_threshold = 0.95 + # iou_threshold = 0.3 + + # app = MediaPipeHandApp(MediaPipeHand.from_pretrained(), score_threshold, iou_threshold) + + # def frame_processor(frame: np.ndarray) -> np.ndarray: + # # Input Prep + # NHWC_int_numpy_frames, NCHW_fp32_torch_frames = app_to_net_image_inputs(frame) + + # # Run Bounding Box & Keypoint Detector + # batched_selected_boxes, batched_selected_keypoints = app._run_box_detector(NCHW_fp32_torch_frames) + + # # The region of interest ( bounding box of 4 (x, y) corners). + # # list[torch.Tensor(shape=[Num Boxes, 4, 2])], + # # where 2 == (x, y) + # # + # # A list element will be None if there is no selected ROI. + # batched_roi_4corners = app._compute_object_roi(batched_selected_boxes, batched_selected_keypoints) + + # # selected landmarks for the ROI (if any) + # # list[torch.Tensor(shape=[Num Selected Landmarks, K, 3])], + # # where K == number of landmark keypoints, 3 == (x, y, confidence) + # # + # # A list element will be None if there is no ROI. + # landmarks_out = app._run_landmark_detector(NHWC_int_numpy_frames, batched_roi_4corners) + + # app._draw_predictions( + # NHWC_int_numpy_frames, + # batched_selected_boxes, + # batched_selected_keypoints, + # batched_roi_4corners, + # *landmarks_out, + # ) + + # return NHWC_int_numpy_frames[0] + + # capture_and_display_processed_frames(frame_processor, "QAIHM Mediapipe Hand Demo", camera) + + +if __name__ == "__main__": + main() diff --git a/examples/hopejr/utils.py b/examples/hopejr/utils.py new file mode 100644 index 00000000..f9e1dedb --- /dev/null +++ b/examples/hopejr/utils.py @@ -0,0 +1,231 @@ + + #robot.arm_bus.write("Acceleration", [20], ["shoulder_pitch"]) + + ####DEBUGGER#################### + # joint = input("Enter joint name: ") + # encoder = EncoderReader("/dev/ttyUSB1", 115200) + # start_angle = arm_calibration['start_pos'][arm_calibration['motor_names'].index(joint)] + # end_angle = arm_calibration['end_pos'][arm_calibration['motor_names'].index(joint)] + # # start_angle = shoulder_calibration['start_pos'][shoulder_calibration['motor_names'].index(joint)] + # # end_angle = shoulder_calibration['end_pos'][shoulder_calibration['motor_names'].index(joint)] + # while True: + # angle = int(start_angle+(end_angle-start_angle)*encoder.read()/1000) + # # robot.shoulder_bus.set_bus_baudrate(115200) + # # robot.shoulder_bus.write("Goal_Position",angle, [joint]) + # robot.shoulder_bus.set_bus_baudrate(1000000) + # robot.arm_bus.write("Goal_Position",angle, [joint]) + # print(angle) + # time.sleep(0.1) + + + + #####SAFETY CHECKS EXPLAINED##### + #There are two safety checks built-in: one is based on load and the other is based on current. + #Current: if Protection_Current > Present_Current we wait Over_Current_Protection_Time (expressed in ms) and set Torque_Enable to 0 + #Load: if Max_Torque_Limit*Overload_Torque (expressed as a percentage) > Present_Load, we wait Protection_Time (expressed in ms + #and set Max_Torque_Limit to Protective_Torque) + #Though we can specify Min-Max_Angle_Limit, Max_Temperature_Limit, Min-Max_Voltage_Limit, no safety checks are implemented for these values + + #robot.arm_bus.set_calibration(arm_calibration) + + + + + #method 1 + # robot.arm_bus.write("Overload_Torque", 80) + # robot.arm_bus.write("Protection_Time", 10) + # robot.arm_bus.write("Protective_Torque", 1) + # robot.arm_bus.write("Protection_Current", 200,["shoulder_pitch"]) + # robot.arm_bus.write("Over_Current_Protection_Time", 10) + + #method 2 + # robot.arm_bus.write("Protection_Current", 500,["shoulder_pitch"]) + # robot.arm_bus.write("Over_Current_Protection_Time", 10) + # robot.arm_bus.write("Max_Torque_Limit", 1000) + # robot.arm_bus.write("Overload_Torque", 40) + # robot.arm_bus.write("Protection_Time", 10) + # robot.arm_bus.write("Protective_Torque", 1) + + # robot.shoulder_bus.set_bus_baudrate(115200) + # robot.shoulder_bus.write("Goal_Position",2500) + # exit() + + ######LOGGER#################### + # from test_torque.log_and_plot_feetech import log_and_plot_params + + # params_to_log = [ + # "Protection_Current", + # "Present_Current", + # "Max_Torque_Limit", + # "Protection_Time", + # "Overload_Torque", + # "Present_Load", + # "Present_Position", + # ] + + # servo_names = ["shoulder_pitch"] + + + # servo_data, timestamps = log_and_plot_params(robot.shoulder_bus, params_to_log, servo_names, test_id="shoulder_pitch") + # exit() + + + #robot.arm_bus.write("Goal_Position",2300, ["shoulder_pitch"]) + # dt = 2 + # steps = 4 + # max_pos = 1500 + # min_pos = 2300 + # increment = (max_pos - min_pos) / steps + # # Move from min_pos to max_pos in steps + # for i in range(steps + 1): # Include the last step + # current_pos = min_pos + int(i * increment) + # robot.arm_bus.write("Goal_Position", [current_pos], ["shoulder_pitch"]) + # time.sleep(dt) + + # # Move back from max_pos to min_pos in steps + # for i in range(steps + 1): # Include the last step + # current_pos = max_pos - int(i * increment) + # robot.arm_bus.write("Goal_Position", [current_pos], ["shoulder_pitch"]) + # time.sleep(dt)shoulder_pitch + #demo to show how sending a lot of values makes the robt shake + + + + # # Step increment + # + + # # Move from min_pos to max_pos in steps + # for i in range(steps + 1): # Include the last step + # current_pos = min_pos + int(i * increment) + # robot.arm_bus.write("Goal_Position", [current_pos], ["elbow_flex"]) + # time.sleep(dt) + + # # Move back from max_pos to min_pos in steps + # for i in range(steps + 1): # Include the last step + # current_pos = max_pos - int(i * increment) + # robot.arm_bus.write("Goal_Position", [current_pos], ["elbow_flex"]) + # time.sleep(dt) + # exit() + + #robot.arm_bus.write("Goal_Position", a # shoulder_calibration = robot.get_shoulder_calibration() + # print(shoulder_calibration)m_calibration["start_pos"]) + # robot.arm_bus.write("Over_Current_Protection_Time", 50) + # robot.arm_bus.write("Protection_Current", 310, ["shoulder_pitch"]) + # robot.arm_bus.write("Overload_Torque", 80, ["shoulder_pitch"]) + # robot.arm_bus.write("Protection_Time", 100, ["shoulder_pitch"]) + # robot.arm_bus.write("Over_Current_Protection_Time", 50, ["shoulder_pitch"]) + + # robot.arm_bus.write("Protective_Torque", 20, ["shoulder_pitch"]) + + + # robot.arm_bus.write("Goal_Position", [600],["shoulder_pitch"]) + + # from test_torque.log_and_plot_feetech import log_and_plot_params + + # params_to_log = [ + # "Present_Current", + # "Protection_Current", + # "Overload_Torque", + # "Protection_Time", + # "Protective_Torque", + # "Present_Load", + # "Present_Position", + # ] + + # servo_names = ["shoulder_pitch"] + + # + + #robot.arm_bus.write("Goal_Position", arm_calibration["start_pos"]) + + #robot.hand_bus.set_calibration(hand_calibration) + + #interp = 0.3 + + #robot.arm_bus.write("Goal_Position", [int((i*interp+j*(1-interp))) for i, j in zip(arm_calibration["start_pos"], arm_calibration["end_pos"])]) + #exit() + + # glove = HomonculusGlove() + # glove.run_calibration() + + + +####GOOD FOR GRASPING + # start_pos = [ + # 500, + # 900, + # 500, + # 1000, + # 100, + # 450,#250 + # 950,#750 + # 100, + # 300,#400 + # 50,#150 + # 100, + # 120, + # 980, + # 100, + # 950, + # 750, + # ] + # end_pos = [ + # start_pos[0] - 400, + # start_pos[1] - 300, + # start_pos[2] + 500, + # start_pos[3] - 50, + # start_pos[4] + 900, + # start_pos[5] + 500, + # start_pos[6] - 500, + # start_pos[7] + 900, + # start_pos[8] + 700, + # start_pos[9] + 700, + # start_pos[10] + 900, + # start_pos[11] + 700, + # start_pos[12] - 700, + # start_pos[13] + 900, + # start_pos[14] - 700, + # start_pos[15] - 700, + # ] + + + + + + +SCS_SERIES_CONTROL_TABLE = { + + # "Max_Torque_Limit": (16, 2), + # "Phase": (18, 1), + # "Unloading_Condition": (19, 1), + + "Protective_Torque": (37, 1), + "Protection_Time": (38, 1), + #Baud_Rate": (48, 1), + +} + +def read_and_print_scs_values(robot): + for param_name in SCS_SERIES_CONTROL_TABLE: + value = robot.hand_bus.read(param_name) + print(f"{param_name}: {value}") + +motor_1_values = { + "Lock" : 255, + #"Protection_Time": 20#so if you write to these they turn to 0 for some fucking reason. protection time was 100, procetive to +} + +# motor_1_values = { +# "Lock": 1, +# "Protection_Time": 100, +# "Protective_Torque": 20, +# "Phase": 1,#thisu is bullshit +# "Unloading_Condition": 32, + +# } +#bug in writing to specific values of the scs0009 + +# Write values to motor 2, there is overload torque there +#ok so i can write, the jittering is because of the overload torque which is still being triggered + +#TODO: i have to write a functioining version for the sc009 (or i dont who cares) diff --git a/examples/hopejr/visualizer.py b/examples/hopejr/visualizer.py new file mode 100644 index 00000000..d39c2925 --- /dev/null +++ b/examples/hopejr/visualizer.py @@ -0,0 +1,18 @@ +# Color gradient function (0-2024 scaled to 0-10) +def value_to_color(value): + # Clamp the value between 0 and 2024 + value = max(0, min(2024, value)) + + # Scale from [0..2024] to [0..10] + scaled_value = (value / 2024) * 10 + + # Green to Yellow (scaled_value 0..5), then Yellow to Red (scaled_value 5..10) + if scaled_value <= 5: + r = int(255 * (scaled_value / 5)) + g = 255 + else: + r = 255 + g = int(255 * (1 - (scaled_value - 5) / 5)) + b = 0 + + return (r, g, b) \ No newline at end of file