Implement FeetechMotorsBus & move functions to calibration

This commit is contained in:
Simon Alibert 2025-03-20 10:22:47 +01:00
parent dd1f33e5ed
commit 2c68c6ca40
2 changed files with 113 additions and 311 deletions

View File

@ -14,19 +14,11 @@
from copy import deepcopy from copy import deepcopy
import numpy as np from ..motors_bus import MotorsBus
from ..motors_bus import (
CalibrationMode,
JointOutOfRangeError,
MotorsBus,
assert_same_address,
get_group_sync_key,
)
PROTOCOL_VERSION = 0 PROTOCOL_VERSION = 0
BAUDRATE = 1_000_000 BAUDRATE = 1_000_000
TIMEOUT_MS = 1000 DEFAULT_TIMEOUT_MS = 1000
MAX_ID_RANGE = 252 MAX_ID_RANGE = 252
@ -125,94 +117,6 @@ NUM_READ_RETRY = 20
NUM_WRITE_RETRY = 20 NUM_WRITE_RETRY = 20
def convert_ticks_to_degrees(ticks, model):
resolutions = MODEL_RESOLUTION[model]
# Convert the ticks to degrees
return ticks * (360.0 / resolutions)
def convert_degrees_to_ticks(degrees, model):
resolutions = MODEL_RESOLUTION[model]
# Convert degrees to motor ticks
return int(degrees * (resolutions / 360.0))
def adjusted_to_homing_ticks(raw_motor_ticks: int, model: str, motorbus, motor_id: int) -> int:
"""
Takes a raw reading [0..(res-1)] (e.g. 0..4095) and shifts it so that '2048'
becomes 0 in the homed coordinate system ([-2048..+2047] for 4096 resolution).
"""
resolutions = MODEL_RESOLUTION[model]
# Shift raw ticks by half-resolution so 2048 -> 0, then wrap [0..res-1].
ticks = (raw_motor_ticks - (resolutions // 2)) % resolutions
# If above halfway, fold it into negative territory => [-2048..+2047].
if ticks > (resolutions // 2):
ticks -= resolutions
# Flip sign if drive_mode is set.
drive_mode = 0
if motorbus.calibration is not None:
drive_mode = motorbus.calibration["drive_mode"][motor_id - 1]
if drive_mode:
ticks *= -1
return ticks
def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motorbus, motor_id: int) -> int:
"""
Inverse of adjusted_to_homing_ticks(). Takes a 'homed' position in [-2048..+2047]
and recovers the raw [0..(res-1)] ticks with 2048 as midpoint.
"""
# Flip sign if drive_mode was set.
drive_mode = 0
if motorbus.calibration is not None:
drive_mode = motorbus.calibration["drive_mode"][motor_id - 1]
if drive_mode:
adjusted_pos *= -1
resolutions = MODEL_RESOLUTION[model]
# Shift by +half-resolution and wrap into [0..res-1].
# This undoes the earlier shift by -half-resolution.
ticks = (adjusted_pos + (resolutions // 2)) % resolutions
return ticks
def convert_to_bytes(value, n_bytes: int):
import scservo_sdk as scs
# Note: No need to convert back into unsigned int, since this byte preprocessing
# already handles it for us.
if n_bytes == 1:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
]
elif n_bytes == 2:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
]
elif n_bytes == 4:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
scs.SCS_LOBYTE(scs.SCS_HIWORD(value)),
scs.SCS_HIBYTE(scs.SCS_HIWORD(value)),
]
else:
raise NotImplementedError(
f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but "
f"{n_bytes} is provided instead."
)
return data
class FeetechMotorsBus(MotorsBus): class FeetechMotorsBus(MotorsBus):
""" """
The FeetechMotorsBus class allows to efficiently read and write to the attached motors. It relies on the The FeetechMotorsBus class allows to efficiently read and write to the attached motors. It relies on the
@ -222,6 +126,8 @@ class FeetechMotorsBus(MotorsBus):
model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE) model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
model_resolution_table = deepcopy(MODEL_RESOLUTION) model_resolution_table = deepcopy(MODEL_RESOLUTION)
model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE) model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE)
calibration_required = deepcopy(CALIBRATION_REQUIRED)
default_timeout = DEFAULT_TIMEOUT_MS
def __init__( def __init__(
self, self,
@ -229,224 +135,60 @@ class FeetechMotorsBus(MotorsBus):
motors: dict[str, tuple[int, str]], motors: dict[str, tuple[int, str]],
): ):
super().__init__(port, motors) super().__init__(port, motors)
def _set_handlers(self):
import scservo_sdk as scs import scservo_sdk as scs
self.port_handler = scs.PortHandler(self.port) self.port_handler = scs.PortHandler(self.port)
self.packet_handler = scs.PacketHandler(PROTOCOL_VERSION) self.packet_handler = scs.PacketHandler(PROTOCOL_VERSION)
self.reader = scs.GroupSyncRead(self.packet_handler, self.packet_handler, 0, 0)
self.writer = scs.GroupSyncWrite(self.packet_handler, self.packet_handler, 0, 0)
def _set_timeout(self, timeout: int = TIMEOUT_MS): def broadcast_ping(self, num_retry: int | None = None):
self.port_handler.setPacketTimeoutMillis(timeout) raise NotImplementedError # TODO
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None): def calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
if motor_names is None: # TODO
motor_names = self.motor_names return ids_values
# Convert from unsigned int32 original range [0, 2**32] to signed float32 range def uncalibrate_values(self, ids_values: dict[int, float]) -> dict[int, int]:
values = values.astype(np.float32) # TODO
return ids_values
for i, name in enumerate(motor_names): def _is_comm_success(self, comm: int) -> bool:
calib_idx = self.calibration["motor_names"].index(name)
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
motor_idx, model = self.motors[name]
# Convert raw motor ticks to homed ticks, then convert the homed ticks to degrees
values[i] = adjusted_to_homing_ticks(values[i], model, self, motor_idx)
values[i] = convert_ticks_to_degrees(values[i], model)
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
end_pos = self.calibration["end_pos"][calib_idx]
# Rescale the present position to a nominal range [0, 100] %,
# useful for joints with linear motions like Aloha gripper
values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100
if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR):
raise JointOutOfRangeError(
f"Wrong motor position range detected for {name}. "
f"Expected to be in nominal range of [0, 100] % (a full linear translation), "
f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, "
f"but present value is {values[i]} %. "
"This might be due to a cable connection issue creating an artificial jump in motor values. "
"You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`"
)
return values
def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
"""Inverse of `apply_calibration`."""
if motor_names is None:
motor_names = self.motor_names
for i, name in enumerate(motor_names):
calib_idx = self.calibration["motor_names"].index(name)
calib_mode = self.calibration["calib_mode"][calib_idx]
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
motor_idx, model = self.motors[name]
# Convert degrees to homed ticks, then convert the homed ticks to raw ticks
values[i] = convert_degrees_to_ticks(values[i], model)
values[i] = adjusted_to_motor_ticks(values[i], model, self, motor_idx)
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
start_pos = self.calibration["start_pos"][calib_idx]
end_pos = self.calibration["end_pos"][calib_idx]
# Convert from nominal lnear range of [0, 100] % to
# actual motor range of values which can be arbitrary.
values[i] = values[i] / 100 * (end_pos - start_pos) + start_pos
values = np.round(values).astype(np.int32)
return values
def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY):
import scservo_sdk as scs import scservo_sdk as scs
return_list = True return comm == scs.COMM_SUCCESS
if not isinstance(motor_ids, list):
return_list = False
motor_ids = [motor_ids]
assert_same_address(self.model_ctrl_table, self.motor_models, data_name) @staticmethod
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name] def split_int_bytes(value: int, n_bytes: int) -> list[int]:
group = scs.GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes) # Validate input
for idx in motor_ids: if value < 0:
group.addParam(idx) raise ValueError(f"Negative values are not allowed: {value}")
for _ in range(num_retry): max_value = {1: 0xFF, 2: 0xFFFF, 4: 0xFFFFFFFF}.get(n_bytes)
comm = group.txRxPacket() if max_value is None:
if comm == scs.COMM_SUCCESS: raise NotImplementedError(f"Unsupported byte size: {n_bytes}. Expected [1, 2, 4].")
break
if comm != scs.COMM_SUCCESS: if value > max_value:
raise ConnectionError( raise ValueError(f"Value {value} exceeds the maximum for {n_bytes} bytes ({max_value}).")
f"Read failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
values = []
for idx in motor_ids:
value = group.getData(idx, addr, bytes)
values.append(value)
if return_list:
return values
else:
return values[0]
def _read(self, data_name: str, motor_names: list[str]):
import scservo_sdk as scs import scservo_sdk as scs
motor_ids = [] # Note: No need to convert back into unsigned int, since this byte preprocessing
models = [] # already handles it for us.
for name in motor_names: if n_bytes == 1:
motor_idx, model = self.motors[name] data = [
motor_ids.append(motor_idx) scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
models.append(model) ]
elif n_bytes == 2:
assert_same_address(self.model_ctrl_table, models, data_name) data = [
addr, bytes = self.model_ctrl_table[model][data_name] scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
group_key = get_group_sync_key(data_name, motor_names) scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
]
if data_name not in self.group_readers: elif n_bytes == 4:
# Very Important to flush the buffer! data = [
self.port_handler.ser.reset_output_buffer() scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
self.port_handler.ser.reset_input_buffer() scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
scs.SCS_LOBYTE(scs.SCS_HIWORD(value)),
# Create new group reader scs.SCS_HIBYTE(scs.SCS_HIWORD(value)),
self.group_readers[group_key] = scs.GroupSyncRead( ]
self.port_handler, self.packet_handler, addr, bytes return data
)
for idx in motor_ids:
self.group_readers[group_key].addParam(idx)
for _ in range(NUM_READ_RETRY):
comm = self.group_readers[group_key].txRxPacket()
if comm == scs.COMM_SUCCESS:
break
if comm != scs.COMM_SUCCESS:
raise ConnectionError(
f"Read failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
values = []
for idx in motor_ids:
value = self.group_readers[group_key].getData(idx, addr, bytes)
values.append(value)
values = np.array(values)
if data_name in CALIBRATION_REQUIRED and self.calibration is not None:
values = self.apply_calibration(values, motor_names)
return values
def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY):
import scservo_sdk as scs
if not isinstance(motor_ids, list):
motor_ids = [motor_ids]
if not isinstance(values, list):
values = [values]
assert_same_address(self.model_ctrl_table, motor_models, data_name)
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = scs.GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes)
group.addParam(idx, data)
for _ in range(num_retry):
comm = group.txPacket()
if comm == scs.COMM_SUCCESS:
break
if comm != scs.COMM_SUCCESS:
raise ConnectionError(
f"Write failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)
def _write(self, data_name: str, values: list[int], motor_names: list[str]) -> None:
import scservo_sdk as scs
motor_ids = []
models = []
for name in motor_names:
motor_idx, model = self.motors[name]
motor_ids.append(motor_idx)
models.append(model)
if data_name in CALIBRATION_REQUIRED and self.calibration is not None:
values = self.revert_calibration(values, motor_names)
assert_same_address(self.model_ctrl_table, models, data_name)
addr, bytes = self.model_ctrl_table[model][data_name]
group_key = get_group_sync_key(data_name, motor_names)
init_group = data_name not in self.group_readers
if init_group:
self.group_writers[group_key] = scs.GroupSyncWrite(
self.port_handler, self.packet_handler, addr, bytes
)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes)
if init_group:
self.group_writers[group_key].addParam(idx, data)
else:
self.group_writers[group_key].changeParam(idx, data)
comm = self.group_writers[group_key].txPacket()
if comm != scs.COMM_SUCCESS:
raise ConnectionError(
f"Write failed due to communication error on port {self.port} for group_key {group_key}: "
f"{self.packet_handler.getTxRxResult(comm)}"
)

View File

@ -13,11 +13,12 @@
# limitations under the License. # limitations under the License.
import numpy as np import numpy as np
from ..motors_bus import MotorsBus, TorqueMode from ..motors_bus import (
from .feetech import (
CalibrationMode, CalibrationMode,
FeetechMotorsBus, MotorsBus,
TorqueMode,
) )
from .feetech import MODEL_RESOLUTION, FeetechMotorsBus
URL_TEMPLATE = ( URL_TEMPLATE = (
"https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp" "https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"
@ -140,19 +141,19 @@ def run_full_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm
) # TODO(pepijn): replace with new instruction homing pos (all motors in middle) in tutorial ) # TODO(pepijn): replace with new instruction homing pos (all motors in middle) in tutorial
input("Press Enter to continue...") input("Press Enter to continue...")
start_positions = np.zeros(len(arm.motor_indices)) start_positions = np.zeros(len(arm.motor_ids))
end_positions = np.zeros(len(arm.motor_indices)) end_positions = np.zeros(len(arm.motor_ids))
encoder_offsets = np.zeros(len(arm.motor_indices)) encoder_offsets = np.zeros(len(arm.motor_ids))
modes = get_calibration_modes(arm) modes = get_calibration_modes(arm)
for i, motor_id in enumerate(arm.motor_indices): for i, motor_id in enumerate(arm.motor_ids):
if modes[i] == CalibrationMode.DEGREE.name: if modes[i] == CalibrationMode.DEGREE.name:
encoder_offsets[i] = calibrate_homing_motor(motor_id, arm) encoder_offsets[i] = calibrate_homing_motor(motor_id, arm)
start_positions[i] = 0 start_positions[i] = 0
end_positions[i] = 0 end_positions[i] = 0
for i, motor_id in enumerate(arm.motor_indices): for i, motor_id in enumerate(arm.motor_ids):
if modes[i] == CalibrationMode.LINEAR.name: if modes[i] == CalibrationMode.LINEAR.name:
start_positions[i], end_positions[i] = calibrate_linear_motor(motor_id, arm) start_positions[i], end_positions[i] = calibrate_linear_motor(motor_id, arm)
encoder_offsets[i] = 0 encoder_offsets[i] = 0
@ -251,3 +252,62 @@ def apply_feetech_offsets_from_calibration(motorsbus: FeetechMotorsBus, calibrat
motorsbus.write("Lock", 0) motorsbus.write("Lock", 0)
print("Offsets have been saved to EEPROM successfully.") print("Offsets have been saved to EEPROM successfully.")
def convert_ticks_to_degrees(ticks, model):
resolutions = MODEL_RESOLUTION[model]
# Convert the ticks to degrees
return ticks * (360.0 / resolutions)
def convert_degrees_to_ticks(degrees, model):
resolutions = MODEL_RESOLUTION[model]
# Convert degrees to motor ticks
return int(degrees * (resolutions / 360.0))
def adjusted_to_homing_ticks(raw_motor_ticks: int, model: str, motorbus, motor_id: int) -> int:
"""
Takes a raw reading [0..(res-1)] (e.g. 0..4095) and shifts it so that '2048'
becomes 0 in the homed coordinate system ([-2048..+2047] for 4096 resolution).
"""
resolutions = MODEL_RESOLUTION[model]
# Shift raw ticks by half-resolution so 2048 -> 0, then wrap [0..res-1].
ticks = (raw_motor_ticks - (resolutions // 2)) % resolutions
# If above halfway, fold it into negative territory => [-2048..+2047].
if ticks > (resolutions // 2):
ticks -= resolutions
# Flip sign if drive_mode is set.
drive_mode = 0
if motorbus.calibration is not None:
drive_mode = motorbus.calibration["drive_mode"][motor_id - 1]
if drive_mode:
ticks *= -1
return ticks
def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motorbus, motor_id: int) -> int:
"""
Inverse of adjusted_to_homing_ticks(). Takes a 'homed' position in [-2048..+2047]
and recovers the raw [0..(res-1)] ticks with 2048 as midpoint.
"""
# Flip sign if drive_mode was set.
drive_mode = 0
if motorbus.calibration is not None:
drive_mode = motorbus.calibration["drive_mode"][motor_id - 1]
if drive_mode:
adjusted_pos *= -1
resolutions = MODEL_RESOLUTION[model]
# Shift by +half-resolution and wrap into [0..res-1].
# This undoes the earlier shift by -half-resolution.
ticks = (adjusted_pos + (resolutions // 2)) % resolutions
return ticks