Compare commits
24 Commits
f527adf7a9
...
6541982dff
Author | SHA1 | Date |
---|---|---|
|
6541982dff | |
|
43bc9404bb | |
|
375499c323 | |
|
17a4447cef | |
|
287dc13d96 | |
|
02a1cf6a4e | |
|
34cd1e47bf | |
|
74d56834af | |
|
dd80dbb4cd | |
|
bc020ee0a4 | |
|
a15767aff1 | |
|
9af0a9bf37 | |
|
e2c8bc6948 | |
|
2c68c6ca40 | |
|
dd1f33e5ed | |
|
2c1bb766ff | |
|
c1c71fb994 | |
|
2d56f35071 | |
|
64ce2669ca | |
|
f39652707c | |
|
712d5dae4f | |
|
952e892fe5 | |
|
e8159997c7 | |
|
1c15bab70f |
|
@ -51,7 +51,7 @@ For a comprehensive list and documentation of these parameters, see the ffmpeg d
|
|||
### Decoding parameters
|
||||
**Decoder**
|
||||
We tested two video decoding backends from torchvision:
|
||||
- `pyav` (default)
|
||||
- `pyav`
|
||||
- `video_reader` (requires to build torchvision from source)
|
||||
|
||||
**Requested timestamps**
|
||||
|
|
|
@ -292,6 +292,11 @@ Steps:
|
|||
- Scan for devices. All 12 motors should appear.
|
||||
- Select the motors one by one and move the arm. Check that the graphical indicator near the top right shows the movement.
|
||||
|
||||
** There is a common issue with the Dynamixel XL430-W250 motors where the motors become undiscoverable after upgrading their firmware from Mac and Windows Dynamixel Wizard2 applications. When this occurs, it is required to do a firmware recovery (Select `DYNAMIXEL Firmware Recovery` and follow the prompts). There are two known workarounds to conduct this firmware reset:
|
||||
1) Install the Dynamixel Wizard on a linux machine and complete the firmware recovery
|
||||
2) Use the Dynamixel U2D2 in order to perform the reset with Windows or Mac. This U2D2 can be purchased [here](https://www.robotis.us/u2d2/).
|
||||
For either solution, open DYNAMIXEL Wizard 2.0 and select the appropriate port. You will likely be unable to see the motor in the GUI at this time. Select `Firmware Recovery`, carefully choose the correct model, and wait for the process to complete. Finally, re-scan to confirm the firmware recovery was successful.
|
||||
|
||||
**Read and Write with DynamixelMotorsBus**
|
||||
|
||||
To get familiar with how `DynamixelMotorsBus` communicates with the motors, you can start by reading data from them. Copy past this code in the same interactive python session:
|
||||
|
|
|
@ -69,6 +69,7 @@ from lerobot.common.datasets.video_utils import (
|
|||
VideoFrame,
|
||||
decode_video_frames,
|
||||
encode_video_frames,
|
||||
get_safe_default_codec,
|
||||
get_video_info,
|
||||
)
|
||||
from lerobot.common.robots.utils import Robot
|
||||
|
@ -462,7 +463,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
|
|||
download_videos (bool, optional): Flag to download the videos. Note that when set to True but the
|
||||
video files are already present on local disk, they won't be downloaded again. Defaults to
|
||||
True.
|
||||
video_backend (str | None, optional): Video backend to use for decoding videos. Defaults to torchcodec.
|
||||
video_backend (str | None, optional): Video backend to use for decoding videos. Defaults to torchcodec when available int the platform; otherwise, defaults to 'pyav'.
|
||||
You can also use the 'pyav' decoder used by Torchvision, which used to be the default option, or 'video_reader' which is another decoder of Torchvision.
|
||||
"""
|
||||
super().__init__()
|
||||
|
@ -473,7 +474,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
|
|||
self.episodes = episodes
|
||||
self.tolerance_s = tolerance_s
|
||||
self.revision = revision if revision else CODEBASE_VERSION
|
||||
self.video_backend = video_backend if video_backend else "torchcodec"
|
||||
self.video_backend = video_backend if video_backend else get_safe_default_codec()
|
||||
self.delta_indices = None
|
||||
|
||||
# Unused attributes
|
||||
|
@ -1027,7 +1028,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
|
|||
obj.delta_timestamps = None
|
||||
obj.delta_indices = None
|
||||
obj.episode_data_index = None
|
||||
obj.video_backend = video_backend if video_backend is not None else "torchcodec"
|
||||
obj.video_backend = video_backend if video_backend is not None else get_safe_default_codec()
|
||||
return obj
|
||||
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import importlib
|
||||
import json
|
||||
import logging
|
||||
import subprocess
|
||||
|
@ -27,14 +28,23 @@ import torch
|
|||
import torchvision
|
||||
from datasets.features.features import register_feature
|
||||
from PIL import Image
|
||||
from torchcodec.decoders import VideoDecoder
|
||||
|
||||
|
||||
def get_safe_default_codec():
|
||||
if importlib.util.find_spec("torchcodec"):
|
||||
return "torchcodec"
|
||||
else:
|
||||
logging.warning(
|
||||
"'torchcodec' is not available in your platform, falling back to 'pyav' as a default decoder"
|
||||
)
|
||||
return "pyav"
|
||||
|
||||
|
||||
def decode_video_frames(
|
||||
video_path: Path | str,
|
||||
timestamps: list[float],
|
||||
tolerance_s: float,
|
||||
backend: str = "torchcodec",
|
||||
backend: str | None = None,
|
||||
) -> torch.Tensor:
|
||||
"""
|
||||
Decodes video frames using the specified backend.
|
||||
|
@ -43,13 +53,15 @@ def decode_video_frames(
|
|||
video_path (Path): Path to the video file.
|
||||
timestamps (list[float]): List of timestamps to extract frames.
|
||||
tolerance_s (float): Allowed deviation in seconds for frame retrieval.
|
||||
backend (str, optional): Backend to use for decoding. Defaults to "torchcodec".
|
||||
backend (str, optional): Backend to use for decoding. Defaults to "torchcodec" when available in the platform; otherwise, defaults to "pyav"..
|
||||
|
||||
Returns:
|
||||
torch.Tensor: Decoded frames.
|
||||
|
||||
Currently supports torchcodec on cpu and pyav.
|
||||
"""
|
||||
if backend is None:
|
||||
backend = get_safe_default_codec()
|
||||
if backend == "torchcodec":
|
||||
return decode_video_frames_torchcodec(video_path, timestamps, tolerance_s)
|
||||
elif backend in ["pyav", "video_reader"]:
|
||||
|
@ -173,6 +185,12 @@ def decode_video_frames_torchcodec(
|
|||
and all subsequent frames until reaching the requested frame. The number of key frames in a video
|
||||
can be adjusted during encoding to take into account decoding time and video size in bytes.
|
||||
"""
|
||||
|
||||
if importlib.util.find_spec("torchcodec"):
|
||||
from torchcodec.decoders import VideoDecoder
|
||||
else:
|
||||
raise ImportError("torchcodec is required but not available.")
|
||||
|
||||
# initialize video decoder
|
||||
decoder = VideoDecoder(video_path, device=device, seek_mode="approximate")
|
||||
loaded_frames = []
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
from .motors_bus import visualize_motors_bus
|
||||
|
||||
__all__ = ["visualize_motors_bus"]
|
|
@ -0,0 +1,82 @@
|
|||
"""
|
||||
Usage example
|
||||
|
||||
```python
|
||||
from lerobot.common.debugging.motors_bus import visualize_motors_bus
|
||||
from lerobot.common.robots.so100 import SO100Robot, SO100RobotConfig
|
||||
|
||||
cfg = SO100RobotConfig(port="/dev/tty.usbmodem58760430541")
|
||||
so100 = SO100Robot(cfg)
|
||||
|
||||
visualize_motors_bus(so100.arm)
|
||||
```
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
from lerobot.common.motors import MotorsBus
|
||||
from lerobot.common.motors.feetech.feetech_calibration import (
|
||||
adjusted_to_homing_ticks,
|
||||
adjusted_to_motor_ticks,
|
||||
convert_degrees_to_ticks,
|
||||
convert_ticks_to_degrees,
|
||||
)
|
||||
|
||||
|
||||
def visualize_motors_bus(motors_bus: MotorsBus):
|
||||
"""
|
||||
Reads each joint's (1) raw ticks, (2) homed ticks, (3) degrees, and (4) invert-adjusted ticks.
|
||||
"""
|
||||
if not motors_bus.is_connected:
|
||||
motors_bus.connect()
|
||||
|
||||
# Disable torque on all motors so you can move them freely by hand
|
||||
values_dict = {idx: 0 for idx in motors_bus.motor_ids}
|
||||
motors_bus.write("Torque_Enable", values_dict)
|
||||
print("Torque disabled on all joints.")
|
||||
|
||||
try:
|
||||
print("\nPress Ctrl+C to quit.\n")
|
||||
while True:
|
||||
# Read *raw* positions (no calibration).
|
||||
raw_positions = motors_bus.read("Present_Position")
|
||||
|
||||
# # Read *already-homed* positions
|
||||
# homed_positions = motor_bus.read("Present_Position")
|
||||
|
||||
for name, raw_ticks in raw_positions.items():
|
||||
idx = motors_bus.motors[name][0]
|
||||
model = motors_bus.motors[name][1]
|
||||
|
||||
# homed_val = homed_positions[i] # degrees or % if linear
|
||||
|
||||
# Manually compute "adjusted ticks" from raw ticks
|
||||
manual_adjusted = adjusted_to_homing_ticks(raw_ticks, model, motors_bus, idx)
|
||||
# Convert to degrees
|
||||
manual_degs = convert_ticks_to_degrees(manual_adjusted, model)
|
||||
|
||||
# Convert that deg back to ticks
|
||||
manual_ticks = convert_degrees_to_ticks(manual_degs, model)
|
||||
# Then invert them using offset & bus drive mode
|
||||
inv_ticks = adjusted_to_motor_ticks(manual_ticks, model, motors_bus, idx)
|
||||
|
||||
print(
|
||||
f"{name:15s} | "
|
||||
f"RAW={raw_ticks:4d} | "
|
||||
# f"HOMED_FROM_READ={homed_val:7.2f} | "
|
||||
f"HOMED_TICKS={manual_adjusted:6d} | "
|
||||
f"MANUAL_ADJ_DEG={manual_degs:7.2f} | "
|
||||
f"MANUAL_ADJ_TICKS={manual_ticks:6d} | "
|
||||
f"INV_TICKS={inv_ticks:4d} "
|
||||
)
|
||||
print("----------------------------------------------------")
|
||||
time.sleep(0.25)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
print("\nExiting. Disconnecting bus...")
|
||||
motors_bus.disconnect()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
visualize_motors_bus()
|
|
@ -17,10 +17,8 @@
|
|||
|
||||
import numpy as np
|
||||
|
||||
from ..motors_bus import MotorsBus
|
||||
from ..motors_bus import CalibrationMode, MotorsBus, TorqueMode
|
||||
from .dynamixel import (
|
||||
CalibrationMode,
|
||||
TorqueMode,
|
||||
convert_degrees_to_steps,
|
||||
)
|
||||
|
||||
|
|
|
@ -14,19 +14,11 @@
|
|||
|
||||
from copy import deepcopy
|
||||
|
||||
import numpy as np
|
||||
|
||||
from ..motors_bus import (
|
||||
CalibrationMode,
|
||||
JointOutOfRangeError,
|
||||
MotorsBus,
|
||||
assert_same_address,
|
||||
get_group_sync_key,
|
||||
)
|
||||
from ..motors_bus import MotorsBus
|
||||
|
||||
PROTOCOL_VERSION = 0
|
||||
BAUDRATE = 1_000_000
|
||||
TIMEOUT_MS = 1000
|
||||
DEFAULT_TIMEOUT_MS = 1000
|
||||
|
||||
MAX_ID_RANGE = 252
|
||||
|
||||
|
@ -125,94 +117,6 @@ NUM_READ_RETRY = 20
|
|||
NUM_WRITE_RETRY = 20
|
||||
|
||||
|
||||
def convert_ticks_to_degrees(ticks, model):
|
||||
resolutions = MODEL_RESOLUTION[model]
|
||||
# Convert the ticks to degrees
|
||||
return ticks * (360.0 / resolutions)
|
||||
|
||||
|
||||
def convert_degrees_to_ticks(degrees, model):
|
||||
resolutions = MODEL_RESOLUTION[model]
|
||||
# Convert degrees to motor ticks
|
||||
return int(degrees * (resolutions / 360.0))
|
||||
|
||||
|
||||
def adjusted_to_homing_ticks(raw_motor_ticks: int, model: str, motorbus, motor_id: int) -> int:
|
||||
"""
|
||||
Takes a raw reading [0..(res-1)] (e.g. 0..4095) and shifts it so that '2048'
|
||||
becomes 0 in the homed coordinate system ([-2048..+2047] for 4096 resolution).
|
||||
"""
|
||||
resolutions = MODEL_RESOLUTION[model]
|
||||
|
||||
# Shift raw ticks by half-resolution so 2048 -> 0, then wrap [0..res-1].
|
||||
ticks = (raw_motor_ticks - (resolutions // 2)) % resolutions
|
||||
|
||||
# If above halfway, fold it into negative territory => [-2048..+2047].
|
||||
if ticks > (resolutions // 2):
|
||||
ticks -= resolutions
|
||||
|
||||
# Flip sign if drive_mode is set.
|
||||
drive_mode = 0
|
||||
if motorbus.calibration is not None:
|
||||
drive_mode = motorbus.calibration["drive_mode"][motor_id - 1]
|
||||
|
||||
if drive_mode:
|
||||
ticks *= -1
|
||||
|
||||
return ticks
|
||||
|
||||
|
||||
def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motorbus, motor_id: int) -> int:
|
||||
"""
|
||||
Inverse of adjusted_to_homing_ticks(). Takes a 'homed' position in [-2048..+2047]
|
||||
and recovers the raw [0..(res-1)] ticks with 2048 as midpoint.
|
||||
"""
|
||||
# Flip sign if drive_mode was set.
|
||||
drive_mode = 0
|
||||
if motorbus.calibration is not None:
|
||||
drive_mode = motorbus.calibration["drive_mode"][motor_id - 1]
|
||||
|
||||
if drive_mode:
|
||||
adjusted_pos *= -1
|
||||
|
||||
resolutions = MODEL_RESOLUTION[model]
|
||||
|
||||
# Shift by +half-resolution and wrap into [0..res-1].
|
||||
# This undoes the earlier shift by -half-resolution.
|
||||
ticks = (adjusted_pos + (resolutions // 2)) % resolutions
|
||||
|
||||
return ticks
|
||||
|
||||
|
||||
def convert_to_bytes(value, n_bytes: int):
|
||||
import scservo_sdk as scs
|
||||
|
||||
# Note: No need to convert back into unsigned int, since this byte preprocessing
|
||||
# already handles it for us.
|
||||
if n_bytes == 1:
|
||||
data = [
|
||||
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
|
||||
]
|
||||
elif n_bytes == 2:
|
||||
data = [
|
||||
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
|
||||
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
|
||||
]
|
||||
elif n_bytes == 4:
|
||||
data = [
|
||||
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
|
||||
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
|
||||
scs.SCS_LOBYTE(scs.SCS_HIWORD(value)),
|
||||
scs.SCS_HIBYTE(scs.SCS_HIWORD(value)),
|
||||
]
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but "
|
||||
f"{n_bytes} is provided instead."
|
||||
)
|
||||
return data
|
||||
|
||||
|
||||
class FeetechMotorsBus(MotorsBus):
|
||||
"""
|
||||
The FeetechMotorsBus class allows to efficiently read and write to the attached motors. It relies on the
|
||||
|
@ -222,6 +126,8 @@ class FeetechMotorsBus(MotorsBus):
|
|||
model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
|
||||
model_resolution_table = deepcopy(MODEL_RESOLUTION)
|
||||
model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE)
|
||||
calibration_required = deepcopy(CALIBRATION_REQUIRED)
|
||||
default_timeout = DEFAULT_TIMEOUT_MS
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -229,224 +135,60 @@ class FeetechMotorsBus(MotorsBus):
|
|||
motors: dict[str, tuple[int, str]],
|
||||
):
|
||||
super().__init__(port, motors)
|
||||
|
||||
def _set_handlers(self):
|
||||
import scservo_sdk as scs
|
||||
|
||||
self.port_handler = scs.PortHandler(self.port)
|
||||
self.packet_handler = scs.PacketHandler(PROTOCOL_VERSION)
|
||||
self.reader = scs.GroupSyncRead(self.port_handler, self.packet_handler, 0, 0)
|
||||
self.writer = scs.GroupSyncWrite(self.port_handler, self.packet_handler, 0, 0)
|
||||
|
||||
def _set_timeout(self, timeout: int = TIMEOUT_MS):
|
||||
self.port_handler.setPacketTimeoutMillis(timeout)
|
||||
def broadcast_ping(self, num_retry: int | None = None):
|
||||
raise NotImplementedError # TODO
|
||||
|
||||
def apply_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
|
||||
if motor_names is None:
|
||||
motor_names = self.motor_names
|
||||
def calibrate_values(self, ids_values: dict[int, int]) -> dict[int, float]:
|
||||
# TODO
|
||||
return ids_values
|
||||
|
||||
# Convert from unsigned int32 original range [0, 2**32] to signed float32 range
|
||||
values = values.astype(np.float32)
|
||||
def uncalibrate_values(self, ids_values: dict[int, float]) -> dict[int, int]:
|
||||
# TODO
|
||||
return ids_values
|
||||
|
||||
for i, name in enumerate(motor_names):
|
||||
calib_idx = self.calibration["motor_names"].index(name)
|
||||
calib_mode = self.calibration["calib_mode"][calib_idx]
|
||||
|
||||
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
|
||||
motor_idx, model = self.motors[name]
|
||||
|
||||
# Convert raw motor ticks to homed ticks, then convert the homed ticks to degrees
|
||||
values[i] = adjusted_to_homing_ticks(values[i], model, self, motor_idx)
|
||||
values[i] = convert_ticks_to_degrees(values[i], model)
|
||||
|
||||
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
|
||||
start_pos = self.calibration["start_pos"][calib_idx]
|
||||
end_pos = self.calibration["end_pos"][calib_idx]
|
||||
|
||||
# Rescale the present position to a nominal range [0, 100] %,
|
||||
# useful for joints with linear motions like Aloha gripper
|
||||
values[i] = (values[i] - start_pos) / (end_pos - start_pos) * 100
|
||||
|
||||
if (values[i] < LOWER_BOUND_LINEAR) or (values[i] > UPPER_BOUND_LINEAR):
|
||||
raise JointOutOfRangeError(
|
||||
f"Wrong motor position range detected for {name}. "
|
||||
f"Expected to be in nominal range of [0, 100] % (a full linear translation), "
|
||||
f"with a maximum range of [{LOWER_BOUND_LINEAR}, {UPPER_BOUND_LINEAR}] % to account for some imprecision during calibration, "
|
||||
f"but present value is {values[i]} %. "
|
||||
"This might be due to a cable connection issue creating an artificial jump in motor values. "
|
||||
"You need to recalibrate by running: `python lerobot/scripts/control_robot.py calibrate`"
|
||||
)
|
||||
|
||||
return values
|
||||
|
||||
def revert_calibration(self, values: np.ndarray | list, motor_names: list[str] | None):
|
||||
"""Inverse of `apply_calibration`."""
|
||||
if motor_names is None:
|
||||
motor_names = self.motor_names
|
||||
|
||||
for i, name in enumerate(motor_names):
|
||||
calib_idx = self.calibration["motor_names"].index(name)
|
||||
calib_mode = self.calibration["calib_mode"][calib_idx]
|
||||
|
||||
if CalibrationMode[calib_mode] == CalibrationMode.DEGREE:
|
||||
motor_idx, model = self.motors[name]
|
||||
|
||||
# Convert degrees to homed ticks, then convert the homed ticks to raw ticks
|
||||
values[i] = convert_degrees_to_ticks(values[i], model)
|
||||
values[i] = adjusted_to_motor_ticks(values[i], model, self, motor_idx)
|
||||
|
||||
elif CalibrationMode[calib_mode] == CalibrationMode.LINEAR:
|
||||
start_pos = self.calibration["start_pos"][calib_idx]
|
||||
end_pos = self.calibration["end_pos"][calib_idx]
|
||||
|
||||
# Convert from nominal lnear range of [0, 100] % to
|
||||
# actual motor range of values which can be arbitrary.
|
||||
values[i] = values[i] / 100 * (end_pos - start_pos) + start_pos
|
||||
|
||||
values = np.round(values).astype(np.int32)
|
||||
return values
|
||||
|
||||
def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY):
|
||||
def _is_comm_success(self, comm: int) -> bool:
|
||||
import scservo_sdk as scs
|
||||
|
||||
return_list = True
|
||||
if not isinstance(motor_ids, list):
|
||||
return_list = False
|
||||
motor_ids = [motor_ids]
|
||||
return comm == scs.COMM_SUCCESS
|
||||
|
||||
assert_same_address(self.model_ctrl_table, self.motor_models, data_name)
|
||||
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
|
||||
group = scs.GroupSyncRead(self.port_handler, self.packet_handler, addr, bytes)
|
||||
for idx in motor_ids:
|
||||
group.addParam(idx)
|
||||
@staticmethod
|
||||
def split_int_bytes(value: int, n_bytes: int) -> list[int]:
|
||||
# Validate input
|
||||
if value < 0:
|
||||
raise ValueError(f"Negative values are not allowed: {value}")
|
||||
|
||||
for _ in range(num_retry):
|
||||
comm = group.txRxPacket()
|
||||
if comm == scs.COMM_SUCCESS:
|
||||
break
|
||||
max_value = {1: 0xFF, 2: 0xFFFF, 4: 0xFFFFFFFF}.get(n_bytes)
|
||||
if max_value is None:
|
||||
raise NotImplementedError(f"Unsupported byte size: {n_bytes}. Expected [1, 2, 4].")
|
||||
|
||||
if comm != scs.COMM_SUCCESS:
|
||||
raise ConnectionError(
|
||||
f"Read failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: "
|
||||
f"{self.packet_handler.getTxRxResult(comm)}"
|
||||
)
|
||||
if value > max_value:
|
||||
raise ValueError(f"Value {value} exceeds the maximum for {n_bytes} bytes ({max_value}).")
|
||||
|
||||
values = []
|
||||
for idx in motor_ids:
|
||||
value = group.getData(idx, addr, bytes)
|
||||
values.append(value)
|
||||
|
||||
if return_list:
|
||||
return values
|
||||
else:
|
||||
return values[0]
|
||||
|
||||
def _read(self, data_name: str, motor_names: list[str]):
|
||||
import scservo_sdk as scs
|
||||
|
||||
motor_ids = []
|
||||
models = []
|
||||
for name in motor_names:
|
||||
motor_idx, model = self.motors[name]
|
||||
motor_ids.append(motor_idx)
|
||||
models.append(model)
|
||||
|
||||
assert_same_address(self.model_ctrl_table, models, data_name)
|
||||
addr, bytes = self.model_ctrl_table[model][data_name]
|
||||
group_key = get_group_sync_key(data_name, motor_names)
|
||||
|
||||
if data_name not in self.group_readers:
|
||||
# Very Important to flush the buffer!
|
||||
self.port_handler.ser.reset_output_buffer()
|
||||
self.port_handler.ser.reset_input_buffer()
|
||||
|
||||
# Create new group reader
|
||||
self.group_readers[group_key] = scs.GroupSyncRead(
|
||||
self.port_handler, self.packet_handler, addr, bytes
|
||||
)
|
||||
for idx in motor_ids:
|
||||
self.group_readers[group_key].addParam(idx)
|
||||
|
||||
for _ in range(NUM_READ_RETRY):
|
||||
comm = self.group_readers[group_key].txRxPacket()
|
||||
if comm == scs.COMM_SUCCESS:
|
||||
break
|
||||
|
||||
if comm != scs.COMM_SUCCESS:
|
||||
raise ConnectionError(
|
||||
f"Read failed due to communication error on port {self.port} for group_key {group_key}: "
|
||||
f"{self.packet_handler.getTxRxResult(comm)}"
|
||||
)
|
||||
|
||||
values = []
|
||||
for idx in motor_ids:
|
||||
value = self.group_readers[group_key].getData(idx, addr, bytes)
|
||||
values.append(value)
|
||||
|
||||
values = np.array(values)
|
||||
|
||||
if data_name in CALIBRATION_REQUIRED and self.calibration is not None:
|
||||
values = self.apply_calibration(values, motor_names)
|
||||
|
||||
return values
|
||||
|
||||
def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY):
|
||||
import scservo_sdk as scs
|
||||
|
||||
if not isinstance(motor_ids, list):
|
||||
motor_ids = [motor_ids]
|
||||
if not isinstance(values, list):
|
||||
values = [values]
|
||||
|
||||
assert_same_address(self.model_ctrl_table, motor_models, data_name)
|
||||
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
|
||||
group = scs.GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes)
|
||||
for idx, value in zip(motor_ids, values, strict=True):
|
||||
data = convert_to_bytes(value, bytes)
|
||||
group.addParam(idx, data)
|
||||
|
||||
for _ in range(num_retry):
|
||||
comm = group.txPacket()
|
||||
if comm == scs.COMM_SUCCESS:
|
||||
break
|
||||
|
||||
if comm != scs.COMM_SUCCESS:
|
||||
raise ConnectionError(
|
||||
f"Write failed due to communication error on port {self.port_handler.port_name} for indices {motor_ids}: "
|
||||
f"{self.packet_handler.getTxRxResult(comm)}"
|
||||
)
|
||||
|
||||
def _write(self, data_name: str, values: list[int], motor_names: list[str]) -> None:
|
||||
import scservo_sdk as scs
|
||||
|
||||
motor_ids = []
|
||||
models = []
|
||||
for name in motor_names:
|
||||
motor_idx, model = self.motors[name]
|
||||
motor_ids.append(motor_idx)
|
||||
models.append(model)
|
||||
|
||||
if data_name in CALIBRATION_REQUIRED and self.calibration is not None:
|
||||
values = self.revert_calibration(values, motor_names)
|
||||
|
||||
assert_same_address(self.model_ctrl_table, models, data_name)
|
||||
addr, bytes = self.model_ctrl_table[model][data_name]
|
||||
group_key = get_group_sync_key(data_name, motor_names)
|
||||
|
||||
init_group = data_name not in self.group_readers
|
||||
if init_group:
|
||||
self.group_writers[group_key] = scs.GroupSyncWrite(
|
||||
self.port_handler, self.packet_handler, addr, bytes
|
||||
)
|
||||
|
||||
for idx, value in zip(motor_ids, values, strict=True):
|
||||
data = convert_to_bytes(value, bytes)
|
||||
if init_group:
|
||||
self.group_writers[group_key].addParam(idx, data)
|
||||
else:
|
||||
self.group_writers[group_key].changeParam(idx, data)
|
||||
|
||||
comm = self.group_writers[group_key].txPacket()
|
||||
if comm != scs.COMM_SUCCESS:
|
||||
raise ConnectionError(
|
||||
f"Write failed due to communication error on port {self.port} for group_key {group_key}: "
|
||||
f"{self.packet_handler.getTxRxResult(comm)}"
|
||||
)
|
||||
# Note: No need to convert back into unsigned int, since this byte preprocessing
|
||||
# already handles it for us.
|
||||
if n_bytes == 1:
|
||||
data = [
|
||||
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
|
||||
]
|
||||
elif n_bytes == 2:
|
||||
data = [
|
||||
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
|
||||
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
|
||||
]
|
||||
elif n_bytes == 4:
|
||||
data = [
|
||||
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
|
||||
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
|
||||
scs.SCS_LOBYTE(scs.SCS_HIWORD(value)),
|
||||
scs.SCS_HIBYTE(scs.SCS_HIWORD(value)),
|
||||
]
|
||||
return data
|
||||
|
|
|
@ -13,11 +13,12 @@
|
|||
# limitations under the License.
|
||||
import numpy as np
|
||||
|
||||
from ..motors_bus import MotorsBus, TorqueMode
|
||||
from .feetech import (
|
||||
from ..motors_bus import (
|
||||
CalibrationMode,
|
||||
FeetechMotorsBus,
|
||||
MotorsBus,
|
||||
TorqueMode,
|
||||
)
|
||||
from .feetech import MODEL_RESOLUTION, FeetechMotorsBus
|
||||
|
||||
URL_TEMPLATE = (
|
||||
"https://raw.githubusercontent.com/huggingface/lerobot/main/media/{robot}/{arm}_{position}.webp"
|
||||
|
@ -140,19 +141,19 @@ def run_full_arm_calibration(arm: MotorsBus, robot_type: str, arm_name: str, arm
|
|||
) # TODO(pepijn): replace with new instruction homing pos (all motors in middle) in tutorial
|
||||
input("Press Enter to continue...")
|
||||
|
||||
start_positions = np.zeros(len(arm.motor_indices))
|
||||
end_positions = np.zeros(len(arm.motor_indices))
|
||||
encoder_offsets = np.zeros(len(arm.motor_indices))
|
||||
start_positions = np.zeros(len(arm.motor_ids))
|
||||
end_positions = np.zeros(len(arm.motor_ids))
|
||||
encoder_offsets = np.zeros(len(arm.motor_ids))
|
||||
|
||||
modes = get_calibration_modes(arm)
|
||||
|
||||
for i, motor_id in enumerate(arm.motor_indices):
|
||||
for i, motor_id in enumerate(arm.motor_ids):
|
||||
if modes[i] == CalibrationMode.DEGREE.name:
|
||||
encoder_offsets[i] = calibrate_homing_motor(motor_id, arm)
|
||||
start_positions[i] = 0
|
||||
end_positions[i] = 0
|
||||
|
||||
for i, motor_id in enumerate(arm.motor_indices):
|
||||
for i, motor_id in enumerate(arm.motor_ids):
|
||||
if modes[i] == CalibrationMode.LINEAR.name:
|
||||
start_positions[i], end_positions[i] = calibrate_linear_motor(motor_id, arm)
|
||||
encoder_offsets[i] = 0
|
||||
|
@ -251,3 +252,62 @@ def apply_feetech_offsets_from_calibration(motorsbus: FeetechMotorsBus, calibrat
|
|||
|
||||
motorsbus.write("Lock", 0)
|
||||
print("Offsets have been saved to EEPROM successfully.")
|
||||
|
||||
|
||||
def convert_ticks_to_degrees(ticks, model):
|
||||
resolutions = MODEL_RESOLUTION[model]
|
||||
# Convert the ticks to degrees
|
||||
return ticks * (360.0 / resolutions)
|
||||
|
||||
|
||||
def convert_degrees_to_ticks(degrees, model):
|
||||
resolutions = MODEL_RESOLUTION[model]
|
||||
# Convert degrees to motor ticks
|
||||
return int(degrees * (resolutions / 360.0))
|
||||
|
||||
|
||||
def adjusted_to_homing_ticks(raw_motor_ticks: int, model: str, motor_bus: MotorsBus, motor_id: int) -> int:
|
||||
"""
|
||||
Takes a raw reading [0..(res-1)] (e.g. 0..4095) and shifts it so that '2048'
|
||||
becomes 0 in the homed coordinate system ([-2048..+2047] for 4096 resolution).
|
||||
"""
|
||||
resolutions = MODEL_RESOLUTION[model]
|
||||
|
||||
# Shift raw ticks by half-resolution so 2048 -> 0, then wrap [0..res-1].
|
||||
ticks = (raw_motor_ticks - (resolutions // 2)) % resolutions
|
||||
|
||||
# If above halfway, fold it into negative territory => [-2048..+2047].
|
||||
if ticks > (resolutions // 2):
|
||||
ticks -= resolutions
|
||||
|
||||
# Flip sign if drive_mode is set.
|
||||
drive_mode = 0
|
||||
if motor_bus.calibration is not None:
|
||||
drive_mode = motor_bus.calibration["drive_mode"][motor_id - 1]
|
||||
|
||||
if drive_mode:
|
||||
ticks *= -1
|
||||
|
||||
return ticks
|
||||
|
||||
|
||||
def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motor_bus: MotorsBus, motor_id: int) -> int:
|
||||
"""
|
||||
Inverse of adjusted_to_homing_ticks(). Takes a 'homed' position in [-2048..+2047]
|
||||
and recovers the raw [0..(res-1)] ticks with 2048 as midpoint.
|
||||
"""
|
||||
# Flip sign if drive_mode was set.
|
||||
drive_mode = 0
|
||||
if motor_bus.calibration is not None:
|
||||
drive_mode = motor_bus.calibration["drive_mode"][motor_id - 1]
|
||||
|
||||
if drive_mode:
|
||||
adjusted_pos *= -1
|
||||
|
||||
resolutions = MODEL_RESOLUTION[model]
|
||||
|
||||
# Shift by +half-resolution and wrap into [0..res-1].
|
||||
# This undoes the earlier shift by -half-resolution.
|
||||
ticks = (adjusted_pos + (resolutions // 2)) % resolutions
|
||||
|
||||
return ticks
|
||||
|
|
|
@ -119,9 +119,7 @@ class ACTPolicy(PreTrainedPolicy):
|
|||
batch = self.normalize_inputs(batch)
|
||||
if self.config.image_features:
|
||||
batch = dict(batch) # shallow copy so that adding a key doesn't modify the original
|
||||
batch["observation.images"] = torch.stack(
|
||||
[batch[key] for key in self.config.image_features], dim=-4
|
||||
)
|
||||
batch["observation.images"] = [batch[key] for key in self.config.image_features]
|
||||
|
||||
# If we are doing temporal ensembling, do online updates where we keep track of the number of actions
|
||||
# we are ensembling over.
|
||||
|
@ -149,9 +147,8 @@ class ACTPolicy(PreTrainedPolicy):
|
|||
batch = self.normalize_inputs(batch)
|
||||
if self.config.image_features:
|
||||
batch = dict(batch) # shallow copy so that adding a key doesn't modify the original
|
||||
batch["observation.images"] = torch.stack(
|
||||
[batch[key] for key in self.config.image_features], dim=-4
|
||||
)
|
||||
batch["observation.images"] = [batch[key] for key in self.config.image_features]
|
||||
|
||||
batch = self.normalize_targets(batch)
|
||||
actions_hat, (mu_hat, log_sigma_x2_hat) = self.model(batch)
|
||||
|
||||
|
@ -413,11 +410,10 @@ class ACT(nn.Module):
|
|||
"actions must be provided when using the variational objective in training mode."
|
||||
)
|
||||
|
||||
batch_size = (
|
||||
batch["observation.images"]
|
||||
if "observation.images" in batch
|
||||
else batch["observation.environment_state"]
|
||||
).shape[0]
|
||||
if "observation.images" in batch:
|
||||
batch_size = batch["observation.images"][0].shape[0]
|
||||
else:
|
||||
batch_size = batch["observation.environment_state"].shape[0]
|
||||
|
||||
# Prepare the latent for input to the transformer encoder.
|
||||
if self.config.use_vae and "action" in batch:
|
||||
|
@ -490,20 +486,21 @@ class ACT(nn.Module):
|
|||
all_cam_features = []
|
||||
all_cam_pos_embeds = []
|
||||
|
||||
for cam_index in range(batch["observation.images"].shape[-4]):
|
||||
cam_features = self.backbone(batch["observation.images"][:, cam_index])["feature_map"]
|
||||
# TODO(rcadene, alexander-soare): remove call to `.to` to speedup forward ; precompute and use
|
||||
# buffer
|
||||
# For a list of images, the H and W may vary but H*W is constant.
|
||||
for img in batch["observation.images"]:
|
||||
cam_features = self.backbone(img)["feature_map"]
|
||||
cam_pos_embed = self.encoder_cam_feat_pos_embed(cam_features).to(dtype=cam_features.dtype)
|
||||
cam_features = self.encoder_img_feat_input_proj(cam_features) # (B, C, h, w)
|
||||
cam_features = self.encoder_img_feat_input_proj(cam_features)
|
||||
|
||||
# Rearrange features to (sequence, batch, dim).
|
||||
cam_features = einops.rearrange(cam_features, "b c h w -> (h w) b c")
|
||||
cam_pos_embed = einops.rearrange(cam_pos_embed, "b c h w -> (h w) b c")
|
||||
|
||||
all_cam_features.append(cam_features)
|
||||
all_cam_pos_embeds.append(cam_pos_embed)
|
||||
# Concatenate camera observation feature maps and positional embeddings along the width dimension,
|
||||
# and move to (sequence, batch, dim).
|
||||
all_cam_features = torch.cat(all_cam_features, axis=-1)
|
||||
encoder_in_tokens.extend(einops.rearrange(all_cam_features, "b c h w -> (h w) b c"))
|
||||
all_cam_pos_embeds = torch.cat(all_cam_pos_embeds, axis=-1)
|
||||
encoder_in_pos_embed.extend(einops.rearrange(all_cam_pos_embeds, "b c h w -> (h w) b c"))
|
||||
|
||||
encoder_in_tokens.extend(torch.cat(all_cam_features, axis=0))
|
||||
encoder_in_pos_embed.extend(torch.cat(all_cam_pos_embeds, axis=0))
|
||||
|
||||
# Stack all tokens along the sequence dimension.
|
||||
encoder_in_tokens = torch.stack(encoder_in_tokens, axis=0)
|
||||
|
|
|
@ -27,7 +27,6 @@ from lerobot.common.motors import TorqueMode
|
|||
from lerobot.common.motors.dynamixel import (
|
||||
DynamixelMotorsBus,
|
||||
run_arm_calibration,
|
||||
set_operating_mode,
|
||||
)
|
||||
|
||||
from ..robot import Robot
|
||||
|
@ -103,7 +102,22 @@ class KochRobot(Robot):
|
|||
self.arm.write("Torque_Enable", TorqueMode.DISABLED.value)
|
||||
self.calibrate()
|
||||
|
||||
set_operating_mode(self.arm)
|
||||
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't
|
||||
# rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm,
|
||||
# you could end up with a servo with a position 0 or 4095 at a crucial point See [
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11]
|
||||
all_motors_except_gripper = [name for name in self.arm.motor_names if name != "gripper"]
|
||||
if len(all_motors_except_gripper) > 0:
|
||||
# 4 corresponds to Extended Position on Koch motors
|
||||
self.arm.write("Operating_Mode", 4, all_motors_except_gripper)
|
||||
|
||||
# Use 'position control current based' for gripper to be limited by the limit of the current.
|
||||
# For the follower gripper, it means it can grasp an object without forcing too much even tho,
|
||||
# it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
|
||||
# For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger
|
||||
# to make it move, and it will move back to its original target position when we release the force.
|
||||
# 5 corresponds to Current Controlled Position on Koch gripper motors "xl330-m077, xl330-m288"
|
||||
self.arm.write("Operating_Mode", 5, "gripper")
|
||||
|
||||
# Set better PID values to close the gap between recorded states and actions
|
||||
# TODO(rcadene): Implement an automatic procedure to set optimal PID values for each motor
|
||||
|
|
|
@ -580,7 +580,7 @@ class ManipulatorRobot:
|
|||
# Used when record_data=True
|
||||
follower_goal_pos[name] = goal_pos
|
||||
|
||||
goal_pos = goal_pos.numpy().astype(np.int32)
|
||||
goal_pos = goal_pos.numpy().astype(np.float32)
|
||||
self.follower_arms[name].write("Goal_Position", goal_pos)
|
||||
self.logs[f"write_follower_{name}_goal_pos_dt_s"] = time.perf_counter() - before_fwrite_t
|
||||
|
||||
|
@ -702,7 +702,7 @@ class ManipulatorRobot:
|
|||
action_sent.append(goal_pos)
|
||||
|
||||
# Send goal position to each follower
|
||||
goal_pos = goal_pos.numpy().astype(np.int32)
|
||||
goal_pos = goal_pos.numpy().astype(np.float32)
|
||||
self.follower_arms[name].write("Goal_Position", goal_pos)
|
||||
|
||||
return torch.cat(action_sent)
|
||||
|
|
|
@ -28,13 +28,3 @@ class KochTeleopConfig(TeleoperatorConfig):
|
|||
# Sets the arm in torque mode with the gripper motor set to this angle. This makes it possible
|
||||
# to squeeze the gripper and have it spring back to an open position on its own.
|
||||
gripper_open_degree: float = 35.156
|
||||
|
||||
mock: bool = False
|
||||
|
||||
# motors
|
||||
shoulder_pan: tuple = (1, "xl330-m077")
|
||||
shoulder_lift: tuple = (2, "xl330-m077")
|
||||
elbow_flex: tuple = (3, "xl330-m077")
|
||||
wrist_flex: tuple = (4, "xl330-m077")
|
||||
wrist_roll: tuple = (5, "xl330-m077")
|
||||
gripper: tuple = (6, "xl330-m077")
|
||||
|
|
|
@ -25,7 +25,6 @@ from lerobot.common.motors import TorqueMode
|
|||
from lerobot.common.motors.dynamixel import (
|
||||
DynamixelMotorsBus,
|
||||
run_arm_calibration,
|
||||
set_operating_mode,
|
||||
)
|
||||
|
||||
from ..teleoperator import Teleoperator
|
||||
|
@ -50,12 +49,12 @@ class KochTeleop(Teleoperator):
|
|||
self.arm = DynamixelMotorsBus(
|
||||
port=self.config.port,
|
||||
motors={
|
||||
"shoulder_pan": config.shoulder_pan,
|
||||
"shoulder_lift": config.shoulder_lift,
|
||||
"elbow_flex": config.elbow_flex,
|
||||
"wrist_flex": config.wrist_flex,
|
||||
"wrist_roll": config.wrist_roll,
|
||||
"gripper": config.gripper,
|
||||
"shoulder_pan": (1, "xl330-m077"),
|
||||
"shoulder_lift": (2, "xl330-m077"),
|
||||
"elbow_flex": (3, "xl330-m077"),
|
||||
"wrist_flex": (4, "xl330-m077"),
|
||||
"wrist_roll": (5, "xl330-m077"),
|
||||
"gripper": (6, "xl330-m077"),
|
||||
},
|
||||
)
|
||||
|
||||
|
@ -88,7 +87,22 @@ class KochTeleop(Teleoperator):
|
|||
self.arm.write("Torque_Enable", TorqueMode.DISABLED.value)
|
||||
self.calibrate()
|
||||
|
||||
set_operating_mode(self.arm)
|
||||
# Use 'extended position mode' for all motors except gripper, because in joint mode the servos can't
|
||||
# rotate more than 360 degrees (from 0 to 4095) And some mistake can happen while assembling the arm,
|
||||
# you could end up with a servo with a position 0 or 4095 at a crucial point See [
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/x_series/#operating-mode11]
|
||||
all_motors_except_gripper = [name for name in self.arm.motor_names if name != "gripper"]
|
||||
if len(all_motors_except_gripper) > 0:
|
||||
# 4 corresponds to Extended Position on Koch motors
|
||||
self.arm.write("Operating_Mode", 4, all_motors_except_gripper)
|
||||
|
||||
# Use 'position control current based' for gripper to be limited by the limit of the current.
|
||||
# For the follower gripper, it means it can grasp an object without forcing too much even tho,
|
||||
# it's goal position is a complete grasp (both gripper fingers are ordered to join and reach a touch).
|
||||
# For the leader gripper, it means we can use it as a physical trigger, since we can force with our finger
|
||||
# to make it move, and it will move back to its original target position when we release the force.
|
||||
# 5 corresponds to Current Controlled Position on Koch gripper motors "xl330-m077, xl330-m288"
|
||||
self.arm.write("Operating_Mode", 5, "gripper")
|
||||
|
||||
# Enable torque on the gripper and move it to 45 degrees so that we can use it as a trigger.
|
||||
logging.info("Activating torque.")
|
||||
|
|
|
@ -20,6 +20,7 @@ from lerobot.common import (
|
|||
policies, # noqa: F401
|
||||
)
|
||||
from lerobot.common.datasets.transforms import ImageTransformsConfig
|
||||
from lerobot.common.datasets.video_utils import get_safe_default_codec
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -35,7 +36,7 @@ class DatasetConfig:
|
|||
image_transforms: ImageTransformsConfig = field(default_factory=ImageTransformsConfig)
|
||||
revision: str | None = None
|
||||
use_imagenet_stats: bool = True
|
||||
video_backend: str = "pyav"
|
||||
video_backend: str = field(default_factory=get_safe_default_codec)
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
|
@ -1,100 +0,0 @@
|
|||
"""
|
||||
usage:
|
||||
|
||||
```python
|
||||
python lerobot/scripts/calibration_visualization_so100.py \
|
||||
--teleop.type=so100 \
|
||||
--teleop.port=/dev/tty.usbmodem58760430541
|
||||
|
||||
python lerobot/scripts/calibration_visualization_so100.py \
|
||||
--robot.type=so100 \
|
||||
--robot.port=/dev/tty.usbmodem585A0084711
|
||||
```
|
||||
"""
|
||||
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
|
||||
import draccus
|
||||
|
||||
from lerobot.common.motors.feetech.feetech import (
|
||||
adjusted_to_homing_ticks,
|
||||
adjusted_to_motor_ticks,
|
||||
convert_degrees_to_ticks,
|
||||
convert_ticks_to_degrees,
|
||||
)
|
||||
from lerobot.common.robots import RobotConfig
|
||||
from lerobot.common.robots.so100 import SO100Robot, SO100RobotConfig # noqa: F401
|
||||
from lerobot.common.teleoperators import TeleoperatorConfig
|
||||
from lerobot.common.teleoperators.so100 import SO100Teleop, SO100TeleopConfig # noqa: F401
|
||||
|
||||
|
||||
@dataclass
|
||||
class DebugFeetechConfig:
|
||||
teleop: TeleoperatorConfig | None = None
|
||||
robot: RobotConfig | None = None
|
||||
|
||||
def __post_init__(self):
|
||||
if bool(self.teleop) == bool(self.robot):
|
||||
raise ValueError("Select a single device.")
|
||||
|
||||
|
||||
@draccus.wrap()
|
||||
def debug_feetech_positions(cfg: DebugFeetechConfig):
|
||||
"""
|
||||
Reads each joint's (1) raw ticks, (2) homed ticks, (3) degrees, and (4) invert-adjusted ticks.
|
||||
"""
|
||||
device = SO100Teleop(cfg.teleop) if cfg.teleop else SO100Robot(cfg.robot)
|
||||
device.connect()
|
||||
|
||||
# Disable torque on all motors so you can move them freely by hand
|
||||
device.arm.write("Torque_Enable", 0, motor_names=device.arm.motor_names)
|
||||
print("Torque disabled on all joints.")
|
||||
|
||||
try:
|
||||
print("\nPress Ctrl+C to quit.\n")
|
||||
while True:
|
||||
# Read *raw* positions (no calibration).
|
||||
raw_positions = device.arm.read_with_motor_ids(
|
||||
device.arm.motor_models, device.arm.motor_indices, data_name="Present_Position"
|
||||
)
|
||||
|
||||
# Read *already-homed* positions
|
||||
homed_positions = device.arm.read("Present_Position")
|
||||
|
||||
for i, name in enumerate(device.arm.motor_names):
|
||||
motor_idx, model = device.arm.motors[name]
|
||||
|
||||
raw_ticks = raw_positions[i] # 0..4095
|
||||
homed_val = homed_positions[i] # degrees or % if linear
|
||||
|
||||
# Manually compute "adjusted ticks" from raw ticks
|
||||
manual_adjusted = adjusted_to_homing_ticks(raw_ticks, model, device.arm, motor_idx)
|
||||
# Convert to degrees
|
||||
manual_degs = convert_ticks_to_degrees(manual_adjusted, model)
|
||||
|
||||
# Convert that deg back to ticks
|
||||
manual_ticks = convert_degrees_to_ticks(manual_degs, model)
|
||||
# Then invert them using offset & bus drive mode
|
||||
inv_ticks = adjusted_to_motor_ticks(manual_ticks, model, device.arm, motor_idx)
|
||||
|
||||
print(
|
||||
f"{name:15s} | "
|
||||
f"RAW={raw_ticks:4d} | "
|
||||
f"HOMED_FROM_READ={homed_val:7.2f} | "
|
||||
f"HOMED_TICKS={manual_adjusted:6d} | "
|
||||
f"MANUAL_ADJ_DEG={manual_degs:7.2f} | "
|
||||
f"MANUAL_ADJ_TICKS={manual_ticks:6d} | "
|
||||
f"INV_TICKS={inv_ticks:4d} "
|
||||
)
|
||||
print("----------------------------------------------------")
|
||||
time.sleep(0.25)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
print("\nExiting. Disconnecting bus...")
|
||||
device.disconnect()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
debug_feetech_positions()
|
|
@ -69,7 +69,7 @@ dependencies = [
|
|||
"rerun-sdk>=0.21.0",
|
||||
"termcolor>=2.4.0",
|
||||
"torch>=2.2.1",
|
||||
"torchcodec>=0.2.1",
|
||||
"torchcodec>=0.2.1; sys_platform != 'win32' and (sys_platform != 'linux' or (platform_machine != 'aarch64' and platform_machine != 'arm64' and platform_machine != 'armv7l'))",
|
||||
"torchvision>=0.21.0",
|
||||
"wandb>=0.16.3",
|
||||
"zarr>=2.17.0",
|
||||
|
|
|
@ -51,34 +51,34 @@ DXL_CRC_TABLE = [
|
|||
|
||||
# https://emanual.robotis.com/docs/en/dxl/protocol2/#instruction
|
||||
INSTRUCTION_TYPES = {
|
||||
"Ping": 0x01, # Checks whether the Packet has arrived at a device with the same ID as the specified packet ID
|
||||
"Read": 0x02, # Read data from the Device
|
||||
"Write": 0x03, # Write data to the Device
|
||||
"Reg_Write": 0x04, # Register the Instruction Packet in standby status; Packet can later be executed using the Action command
|
||||
"Action": 0x05, # Executes a Packet that was registered beforehand using Reg Write
|
||||
"Factory_Reset": 0x06, # Resets the Control Table to its initial factory default settings
|
||||
"Reboot": 0x08, # Reboot the Device
|
||||
"Clear": 0x10, # Reset certain information stored in memory
|
||||
"Control_Table_Backup": 0x20, # Store current Control Table status data to a Backup or to restore backup EEPROM data.
|
||||
"Status": 0x55, # Return packet sent following the execution of an Instruction Packet
|
||||
"Sync_Read": 0x82, # Read data from multiple devices with the same Address with the same length at once
|
||||
"Sync_Write": 0x83, # Write data to multiple devices with the same Address with the same length at once
|
||||
"Fast_Sync_Read": 0x8A, # Read data from multiple devices with the same Address with the same length at once
|
||||
"Bulk_Read": 0x92, # Read data from multiple devices with different Addresses with different lengths at once
|
||||
"Bulk_Write": 0x93, # Write data to multiple devices with different Addresses with different lengths at once
|
||||
"Fast_Bulk_Read": 0x9A, # Read data from multiple devices with different Addresses with different lengths at once
|
||||
"Ping": 0x01, # Checks whether the Packet has arrived at a device with the same ID as the specified packet ID
|
||||
"Read": 0x02, # Read data from the Device
|
||||
"Write": 0x03, # Write data to the Device
|
||||
"Reg_Write": 0x04, # Register the Instruction Packet in standby status; Packet can later be executed using the Action command
|
||||
"Action": 0x05, # Executes a Packet that was registered beforehand using Reg Write
|
||||
"Factory_Reset": 0x06, # Resets the Control Table to its initial factory default settings
|
||||
"Reboot": 0x08, # Reboot the Device
|
||||
"Clear": 0x10, # Reset certain information stored in memory
|
||||
"Control_Table_Backup": 0x20, # Store current Control Table status data to a Backup or to restore backup EEPROM data.
|
||||
"Status": 0x55, # Return packet sent following the execution of an Instruction Packet
|
||||
"Sync_Read": 0x82, # Read data from multiple devices with the same Address with the same length at once
|
||||
"Sync_Write": 0x83, # Write data to multiple devices with the same Address with the same length at once
|
||||
"Fast_Sync_Read": 0x8A, # Read data from multiple devices with the same Address with the same length at once
|
||||
"Bulk_Read": 0x92, # Read data from multiple devices with different Addresses with different lengths at once
|
||||
"Bulk_Write": 0x93, # Write data to multiple devices with different Addresses with different lengths at once
|
||||
"Fast_Bulk_Read": 0x9A, # Read data from multiple devices with different Addresses with different lengths at once
|
||||
} # fmt: skip
|
||||
|
||||
# https://emanual.robotis.com/docs/en/dxl/protocol2/#error
|
||||
STATUS_TYPE = {
|
||||
ERROR_TYPE = {
|
||||
"Success": 0x00, # No error
|
||||
"Result_Fail": 0x01, # Failed to process the sent Instruction Packet
|
||||
"Instruction_Error": 0x02, # An undefined Instruction has been usedAction has been used without Reg Write
|
||||
"Instruction_Error": 0x02, # An undefined Instruction has been usedAction has been used without Reg Write
|
||||
"CRC_Error": 0x03, # The CRC of the sent Packet does not match the expected value
|
||||
"Data_Range_Error": 0x04, # Data to be written to the specified Address is outside the range of the minimum/maximum value
|
||||
"Data_Length_Error": 0x05, # Attempted to write Data that is shorter than the required data length of the specified Address
|
||||
"Data_Range_Error": 0x04, # Data to be written to the specified Address is outside the range of the minimum/maximum value
|
||||
"Data_Length_Error": 0x05, # Attempted to write Data that is shorter than the required data length of the specified Address
|
||||
# (ex: when you attempt to only use 2 bytes of a register that has been defined as 4 bytes)
|
||||
"Data_Limit_Error": 0x06, # Data to be written to the specified Address is outside of the configured Limit value
|
||||
"Data_Limit_Error": 0x06, # Data to be written to the specified Address is outside of the configured Limit value
|
||||
"Access_Error": 0x07, # Attempted to write a value to an Address that is Read Only or has not been defined
|
||||
# Attempted to read a value from an Address that is Write Only or has not been defined
|
||||
# Attempted to write a value to an EEPROM register while Torque was Enabled.
|
||||
|
@ -86,64 +86,110 @@ STATUS_TYPE = {
|
|||
|
||||
|
||||
class MockDynamixelPacketv2(abc.ABC):
|
||||
@staticmethod
|
||||
def _compute_crc(crc_accum: int, data_blk: list[int], data_blk_size: int) -> int:
|
||||
for j in range(data_blk_size):
|
||||
i = ((crc_accum >> 8) ^ data_blk[j]) & 0xFF
|
||||
crc_accum = ((crc_accum << 8) ^ DXL_CRC_TABLE[i]) & 0xFFFF
|
||||
return crc_accum
|
||||
|
||||
@classmethod
|
||||
def build(cls, dxl_id: int, params: list[int], *args, **kwargs) -> bytes:
|
||||
packet = cls._build(dxl_id, params, *args, **kwargs)
|
||||
crc = cls._compute_crc(0, packet, len(packet) - 2)
|
||||
packet[-2] = crc & 0xFF # lower byte
|
||||
packet[-1] = (crc >> 8) & 0xFF # upper byte
|
||||
def build(cls, dxl_id: int, params: list[int], length: list[int], *args, **kwargs) -> bytes:
|
||||
packet = cls._build(dxl_id, params, length, *args, **kwargs)
|
||||
packet = cls._add_stuffing(packet)
|
||||
packet = cls._add_crc(packet)
|
||||
return bytes(packet)
|
||||
|
||||
@abc.abstractclassmethod
|
||||
def _build(cls, dxl_id: int, params: list[int], *args, **kwargs):
|
||||
def _build(cls, dxl_id: int, params: list[int], length: int, *args, **kwargs) -> list[int]:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _add_stuffing(packet: list[int]) -> list[int]:
|
||||
"""
|
||||
Byte stuffing is a method of adding additional data to generated instruction packets to ensure that
|
||||
the packets are processed successfully. When the byte pattern "0xFF 0xFF 0xFD" appears in a packet,
|
||||
byte stuffing adds 0xFD to the end of the pattern to convert it to “0xFF 0xFF 0xFD 0xFD” to ensure
|
||||
that it is not interpreted as the header at the start of another packet.
|
||||
|
||||
Source: https://emanual.robotis.com/docs/en/dxl/protocol2/#transmission-process
|
||||
|
||||
Args:
|
||||
packet (list[int]): The raw packet without stuffing.
|
||||
|
||||
Returns:
|
||||
list[int]: The packet stuffed if it contained a "0xFF 0xFF 0xFD" byte sequence in its data bytes.
|
||||
"""
|
||||
packet_length_in = dxl.DXL_MAKEWORD(packet[dxl.PKT_LENGTH_L], packet[dxl.PKT_LENGTH_H])
|
||||
packet_length_out = packet_length_in
|
||||
|
||||
temp = [0] * dxl.TXPACKET_MAX_LEN
|
||||
|
||||
# FF FF FD XX ID LEN_L LEN_H
|
||||
temp[dxl.PKT_HEADER0 : dxl.PKT_HEADER0 + dxl.PKT_LENGTH_H + 1] = packet[
|
||||
dxl.PKT_HEADER0 : dxl.PKT_HEADER0 + dxl.PKT_LENGTH_H + 1
|
||||
]
|
||||
|
||||
index = dxl.PKT_INSTRUCTION
|
||||
|
||||
for i in range(0, packet_length_in - 2): # except CRC
|
||||
temp[index] = packet[i + dxl.PKT_INSTRUCTION]
|
||||
index = index + 1
|
||||
if (
|
||||
packet[i + dxl.PKT_INSTRUCTION] == 0xFD
|
||||
and packet[i + dxl.PKT_INSTRUCTION - 1] == 0xFF
|
||||
and packet[i + dxl.PKT_INSTRUCTION - 2] == 0xFF
|
||||
):
|
||||
# FF FF FD
|
||||
temp[index] = 0xFD
|
||||
index = index + 1
|
||||
packet_length_out = packet_length_out + 1
|
||||
|
||||
temp[index] = packet[dxl.PKT_INSTRUCTION + packet_length_in - 2]
|
||||
temp[index + 1] = packet[dxl.PKT_INSTRUCTION + packet_length_in - 1]
|
||||
index = index + 2
|
||||
|
||||
if packet_length_in != packet_length_out:
|
||||
packet = [0] * index
|
||||
|
||||
packet[0:index] = temp[0:index]
|
||||
|
||||
packet[dxl.PKT_LENGTH_L] = dxl.DXL_LOBYTE(packet_length_out)
|
||||
packet[dxl.PKT_LENGTH_H] = dxl.DXL_HIBYTE(packet_length_out)
|
||||
|
||||
return packet
|
||||
|
||||
@staticmethod
|
||||
def _add_crc(packet: list[int]) -> list[int]:
|
||||
crc = 0
|
||||
for j in range(len(packet) - 2):
|
||||
i = ((crc >> 8) ^ packet[j]) & 0xFF
|
||||
crc = ((crc << 8) ^ DXL_CRC_TABLE[i]) & 0xFFFF
|
||||
|
||||
packet[-2] = dxl.DXL_LOBYTE(crc)
|
||||
packet[-1] = dxl.DXL_HIBYTE(crc)
|
||||
|
||||
return packet
|
||||
|
||||
|
||||
class MockInstructionPacket(MockDynamixelPacketv2):
|
||||
"""
|
||||
Helper class to build valid Dynamixel Protocol 2.0 Instruction Packets with correct CRC.
|
||||
Helper class to build valid Dynamixel Protocol 2.0 Instruction Packets.
|
||||
|
||||
Protocol 2.0 Instruction Packet structure
|
||||
(from https://emanual.robotis.com/docs/en/dxl/protocol2/#instruction-packet)
|
||||
|
||||
0xFF 0xFF 0xFD 0x00 # 4-byte header
|
||||
<servo_id> # typically 0x01
|
||||
<length L> <length H> # 2-byte length of (instruction+error+params+CRC)
|
||||
<instruction> # instruction type
|
||||
<parameters...> # for a 4-byte read, we have 4 param bytes
|
||||
<crc_low> <crc_high> # 16-bit CRC
|
||||
| Header | Packet ID | Length | Instruction | Params | CRC |
|
||||
| ------------------- | --------- | ----------- | ----------- | ----------------- | ----------- |
|
||||
| 0xFF 0xFF 0xFD 0x00 | ID | Len_L Len_H | Instr | Param 1 … Param N | CRC_L CRC_H |
|
||||
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def _build(cls, dxl_id: int, params: list[int], instruct_type: str):
|
||||
def _build(cls, dxl_id: int, params: list[int], length: int, instruct_type: str) -> list[int]:
|
||||
instruct_value = INSTRUCTION_TYPES[instruct_type]
|
||||
|
||||
# For Protocol 2.0, "length" = (number_of_params + 3),
|
||||
# where:
|
||||
# +1 is for <instruction> byte,
|
||||
# +2 is for the CRC at the end.
|
||||
# The official Dynamixel SDK sometimes uses (+7) logic for sync reads
|
||||
# because it includes special sub-parameters. But at core:
|
||||
# length = (instruction byte) + (len(params)) + (CRC16 =2).
|
||||
packet_length = len(params) + 3
|
||||
return bytearray(
|
||||
[
|
||||
0xFF, 0xFF, 0xFD, 0x00, # header
|
||||
dxl_id, # servo id
|
||||
packet_length & 0xFF, # length_l
|
||||
(packet_length >> 8) & 0xFF, # length_h
|
||||
instruct_value, # instruction type
|
||||
*params, # data bytes
|
||||
0x00, 0x00 # placeholder for CRC
|
||||
]
|
||||
) # fmt: skip
|
||||
return [
|
||||
0xFF, 0xFF, 0xFD, 0x00, # header
|
||||
dxl_id, # servo id
|
||||
dxl.DXL_LOBYTE(length), # length_l
|
||||
dxl.DXL_HIBYTE(length), # length_h
|
||||
instruct_value, # instruction type
|
||||
*params, # data bytes
|
||||
0x00, 0x00 # placeholder for CRC
|
||||
] # fmt: skip
|
||||
|
||||
@classmethod
|
||||
def sync_read(
|
||||
|
@ -153,68 +199,67 @@ class MockInstructionPacket(MockDynamixelPacketv2):
|
|||
data_length: int,
|
||||
) -> bytes:
|
||||
"""
|
||||
Helper method to build a "Sync Read" broadcast instruction.
|
||||
Builds a "Sync_Read" broadcast instruction.
|
||||
(from https://emanual.robotis.com/docs/en/dxl/protocol2/#sync-read-0x82)
|
||||
|
||||
The official SDK might add some “stuffing” or check param_length,
|
||||
but this is enough for basic compliance if you want a raw packet.
|
||||
|
||||
The parameters for Sync Read (Protocol 2.0) are:
|
||||
The parameters for Sync_Read (Protocol 2.0) are:
|
||||
param[0] = start_address L
|
||||
param[1] = start_address H
|
||||
param[2] = data_length L
|
||||
param[3] = data_length H
|
||||
param[4+] = motor IDs to read from
|
||||
"""
|
||||
# Example param: [LowAddr, HighAddr, LowLen, HighLen, ID1, ID2, ...]
|
||||
params = [
|
||||
(start_address & 0xFF),
|
||||
((start_address >> 8) & 0xFF),
|
||||
(data_length & 0xFF),
|
||||
((data_length >> 8) & 0xFF),
|
||||
] + dxl_ids
|
||||
|
||||
# broadcast ID: 0xFE
|
||||
return cls.build(dxl_id=0xFE, instruct_type="Sync_Read", params=params)
|
||||
And 'length' = (number_of_params + 7), where:
|
||||
+1 is for instruction byte,
|
||||
+2 is for the address bytes,
|
||||
+2 is for the length bytes,
|
||||
+2 is for the CRC at the end.
|
||||
"""
|
||||
params = [
|
||||
dxl.DXL_LOBYTE(start_address),
|
||||
dxl.DXL_HIBYTE(start_address),
|
||||
dxl.DXL_LOBYTE(data_length),
|
||||
dxl.DXL_HIBYTE(data_length),
|
||||
*dxl_ids,
|
||||
]
|
||||
length = len(dxl_ids) + 7
|
||||
return cls.build(dxl_id=dxl.BROADCAST_ID, params=params, length=length, instruct_type="Sync_Read")
|
||||
|
||||
|
||||
class MockStatusPacket(MockDynamixelPacketv2):
|
||||
"""
|
||||
Helper class to build valid Dynamixel Protocol 2.0 Status Packets with correct CRC.
|
||||
Helper class to build valid Dynamixel Protocol 2.0 Status Packets.
|
||||
|
||||
Protocol 2.0 Status Packet structure
|
||||
(from https://emanual.robotis.com/docs/en/dxl/protocol2/#status-packet)
|
||||
|
||||
0xFF 0xFF 0xFD 0x00 # 4-byte header
|
||||
<servo_id> # typically 0x01
|
||||
<length L> <length H> # 2-byte length of (instruction+error+params+CRC)
|
||||
0x55 # instruction = 0x55 means "status packet"
|
||||
<error> # 0 if no error
|
||||
<parameters...> # for a 4-byte read, we have 4 param bytes
|
||||
<crc_low> <crc_high> # 16-bit CRC
|
||||
| Header | Packet ID | Length | Instruction | Error | Params | CRC |
|
||||
| ------------------- | --------- | ----------- | ----------- | ----- | ----------------- | ----------- |
|
||||
| 0xFF 0xFF 0xFD 0x00 | ID | Len_L Len_H | 0x55 | Err | Param 1 … Param N | CRC_L CRC_H |
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def _build(cls, dxl_id: int, params: list[int], status: str = "Success") -> bytearray:
|
||||
status_byte = STATUS_TYPE[status]
|
||||
return bytearray(
|
||||
[
|
||||
0xFF, 0xFF, 0xFD, 0x00, # header
|
||||
dxl_id, # servo id
|
||||
0x08, 0x00, # length_l, length_h = 8
|
||||
0x55, # instruction = status
|
||||
status_byte, # status
|
||||
*params, # data bytes
|
||||
0x00, 0x00 # placeholder for CRC
|
||||
]
|
||||
) # fmt: skip
|
||||
def _build(cls, dxl_id: int, params: list[int], length: int, error: str = "Success") -> list[int]:
|
||||
err_byte = ERROR_TYPE[error]
|
||||
return [
|
||||
0xFF, 0xFF, 0xFD, 0x00, # header
|
||||
dxl_id, # servo id
|
||||
dxl.DXL_LOBYTE(length), # length_l
|
||||
dxl.DXL_HIBYTE(length), # length_h
|
||||
0x55, # instruction = 'status'
|
||||
err_byte, # error
|
||||
*params, # data bytes
|
||||
0x00, 0x00 # placeholder for CRC
|
||||
] # fmt: skip
|
||||
|
||||
@classmethod
|
||||
def present_position(cls, dxl_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes:
|
||||
"""Builds a 'Present_Position' packet.
|
||||
"""Builds a 'Present_Position' status packet.
|
||||
|
||||
Args:
|
||||
pos (int | None, optional): Desired 'Present_Position'. If None, it will use a random value in the
|
||||
min_max_range. Defaults to None.
|
||||
dxl_id (int): List of the servos ids
|
||||
pos (int | None, optional): Desired 'Present_Position' to be returned in the packet. If None, it
|
||||
will use a random value in the min_max_range. Defaults to None.
|
||||
min_max_range (tuple, optional): Min/max range to generate the position values used for when 'pos'
|
||||
is None. Note that the bounds are included in the range. Defaults to (0, 4095).
|
||||
|
||||
|
@ -222,19 +267,24 @@ class MockStatusPacket(MockDynamixelPacketv2):
|
|||
bytes: The raw 'Present_Position' status packet ready to be sent through serial.
|
||||
"""
|
||||
pos = random.randint(*min_max_range) if pos is None else pos
|
||||
# [lower pos 8 bits, higher pos 8 bits, 0, 0]
|
||||
params = [pos & 0xFF, (pos >> 8) & 0xFF, 0, 0]
|
||||
return cls.build(dxl_id, params)
|
||||
params = [dxl.DXL_LOBYTE(pos), dxl.DXL_HIBYTE(pos), 0, 0]
|
||||
length = 8
|
||||
return cls.build(dxl_id, params=params, length=length)
|
||||
|
||||
|
||||
class MockPortHandler(dxl.PortHandler):
|
||||
def setupPort(self, baud): # noqa: N802
|
||||
"""
|
||||
This class overwrite the 'setupPort' method of the Dynamixel PortHandler because it can specify
|
||||
baudrates that are not supported with a serial port on MacOS.
|
||||
"""
|
||||
|
||||
def setupPort(self, cflag_baud): # noqa: N802
|
||||
if self.is_open:
|
||||
self.closePort()
|
||||
|
||||
self.ser = serial.Serial(
|
||||
port=self.port_name,
|
||||
# baudrate=self.baudrate, <- This is forbidden for ttys ports (on macos at least)
|
||||
# baudrate=self.baudrate, <- This will fail on MacOS
|
||||
# parity = serial.PARITY_ODD,
|
||||
# stopbits = serial.STOPBITS_TWO,
|
||||
bytesize=serial.EIGHTBITS,
|
||||
|
@ -248,59 +298,72 @@ class MockPortHandler(dxl.PortHandler):
|
|||
|
||||
|
||||
class MockMotors(MockSerial):
|
||||
"""
|
||||
This class will simulate physical motors by responding with valid status packets upon receiving some
|
||||
instruction packets. It is meant to test MotorsBus classes.
|
||||
|
||||
'data_name' supported:
|
||||
- Present_Position
|
||||
"""
|
||||
|
||||
ctrl_table = X_SERIES_CONTROL_TABLE
|
||||
|
||||
def __init__(self, dlx_ids: list[int], default_stubs: bool = True):
|
||||
def __init__(self, dlx_ids: list[int]):
|
||||
super().__init__()
|
||||
self._ids = dlx_ids
|
||||
self.open()
|
||||
|
||||
if default_stubs:
|
||||
self._create_stubs("Present_Position")
|
||||
|
||||
def _create_stubs(self, data_name: str):
|
||||
def build_single_motor_stubs(
|
||||
self, data_name: str, return_value: int | None = None, num_invalid_try: int | None = None
|
||||
) -> None:
|
||||
address, length = self.ctrl_table[data_name]
|
||||
|
||||
# sync read all motors
|
||||
sync_read_request_all = MockInstructionPacket.sync_read(self._ids, address, length)
|
||||
sync_read_response_all = self._create_present_pos_send_fn(self._ids, data_name)
|
||||
self.stub(
|
||||
name=f"SyncRead_{data_name}_all",
|
||||
receive_bytes=sync_read_request_all,
|
||||
send_fn=sync_read_response_all,
|
||||
)
|
||||
|
||||
# sync read single motors
|
||||
for idx in self._ids:
|
||||
sync_read_request_single = MockInstructionPacket.sync_read([idx], address, length)
|
||||
sync_read_response_single = self._create_present_pos_send_fn([idx], data_name)
|
||||
if data_name == "Present_Position":
|
||||
sync_read_request_single = MockInstructionPacket.sync_read([idx], address, length)
|
||||
sync_read_response_single = self._build_present_pos_send_fn(
|
||||
[idx], [return_value], num_invalid_try
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError # TODO(aliberts): add ping?
|
||||
|
||||
self.stub(
|
||||
name=f"SyncRead_{data_name}_{idx}",
|
||||
receive_bytes=sync_read_request_single,
|
||||
send_fn=sync_read_response_single,
|
||||
)
|
||||
|
||||
def _create_present_pos_send_fn(
|
||||
self, dxl_ids: list[int], data_name: str, num_invalid_try: int | None = None
|
||||
def build_all_motors_stub(
|
||||
self, data_name: str, return_values: list[int] | None = None, num_invalid_try: int | None = None
|
||||
) -> None:
|
||||
address, length = self.ctrl_table[data_name]
|
||||
if data_name == "Present_Position":
|
||||
sync_read_request_all = MockInstructionPacket.sync_read(self._ids, address, length)
|
||||
sync_read_response_all = self._build_present_pos_send_fn(
|
||||
self._ids, return_values, num_invalid_try
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError # TODO(aliberts): add ping?
|
||||
|
||||
self.stub(
|
||||
name=f"SyncRead_{data_name}_all",
|
||||
receive_bytes=sync_read_request_all,
|
||||
send_fn=sync_read_response_all,
|
||||
)
|
||||
|
||||
def _build_present_pos_send_fn(
|
||||
self, dxl_ids: list[int], return_pos: list[int] | None = None, num_invalid_try: int | None = None
|
||||
) -> Callable[[int], bytes]:
|
||||
# if data_name == "Present_Position":
|
||||
# packet_generator = MockStatusPacket.present_position
|
||||
# else:
|
||||
# # TODO(aliberts): add "Goal_Position"
|
||||
# raise NotImplementedError
|
||||
return_pos = [None for _ in dxl_ids] if return_pos is None else return_pos
|
||||
assert len(return_pos) == len(dxl_ids)
|
||||
|
||||
def send_fn(_call_count: int) -> bytes:
|
||||
if num_invalid_try is not None and num_invalid_try >= _call_count:
|
||||
return bytes(0)
|
||||
|
||||
first_packet = MockStatusPacket.present_position(next(iter(dxl_ids)))
|
||||
if len(dxl_ids) == 1:
|
||||
return first_packet
|
||||
|
||||
packets = first_packet
|
||||
for idx in dxl_ids:
|
||||
packets += MockStatusPacket.present_position(dxl_id=idx)
|
||||
return b""
|
||||
|
||||
packets = b"".join(
|
||||
MockStatusPacket.present_position(idx, pos)
|
||||
for idx, pos in zip(dxl_ids, return_pos, strict=True)
|
||||
)
|
||||
return packets
|
||||
|
||||
return send_fn
|
||||
|
|
|
@ -0,0 +1,245 @@
|
|||
import abc
|
||||
import random
|
||||
from typing import Callable
|
||||
|
||||
import scservo_sdk as scs
|
||||
import serial
|
||||
from mock_serial import MockSerial
|
||||
|
||||
from lerobot.common.motors.feetech.feetech import SCS_SERIES_CONTROL_TABLE
|
||||
|
||||
# https://files.waveshare.com/upload/2/27/Communication_Protocol_User_Manual-EN%28191218-0923%29.pdf
|
||||
INSTRUCTION_TYPES = {
|
||||
"Ping": 0x01, # Checks whether the Packet has arrived at a device with the same ID as the specified packet ID
|
||||
"Read": 0x02, # Read data from the Device
|
||||
"Write": 0x03, # Write data to the Device
|
||||
"Reg_Write": 0x04, # Register the Instruction Packet in standby status; Packet can later be executed using the Action command
|
||||
"Action": 0x05, # Executes a Packet that was registered beforehand using Reg Write
|
||||
"Factory_Reset": 0x06, # Resets the Control Table to its initial factory default settings
|
||||
"Sync_Read": 0x82, # Read data from multiple devices with the same Address with the same length at once
|
||||
"Sync_Write": 0x83, # Write data to multiple devices with the same Address with the same length at once
|
||||
} # fmt: skip
|
||||
|
||||
ERROR_TYPE = {
|
||||
"Success": 0x00,
|
||||
"Voltage": 0x01,
|
||||
"Angle": 0x02,
|
||||
"Overheat": 0x04,
|
||||
"Overele": 0x08,
|
||||
"Overload": 0x20,
|
||||
}
|
||||
|
||||
|
||||
class MockFeetechPacket(abc.ABC):
|
||||
@classmethod
|
||||
def build(cls, scs_id: int, params: list[int], length: int, *args, **kwargs) -> bytes:
|
||||
packet = cls._build(scs_id, params, length, *args, **kwargs)
|
||||
packet = cls._add_checksum(packet)
|
||||
return bytes(packet)
|
||||
|
||||
@abc.abstractclassmethod
|
||||
def _build(cls, scs_id: int, params: list[int], length: int, *args, **kwargs) -> list[int]:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _add_checksum(packet: list[int]) -> list[int]:
|
||||
checksum = 0
|
||||
for idx in range(2, len(packet) - 1): # except header & checksum
|
||||
checksum += packet[idx]
|
||||
|
||||
packet[-1] = scs.SCS_LOBYTE(~checksum)
|
||||
|
||||
return packet
|
||||
|
||||
|
||||
class MockInstructionPacket(MockFeetechPacket):
|
||||
"""
|
||||
Helper class to build valid Feetech Instruction Packets.
|
||||
|
||||
Instruction Packet structure
|
||||
(from https://files.waveshare.com/upload/2/27/Communication_Protocol_User_Manual-EN%28191218-0923%29.pdf)
|
||||
|
||||
| Header | Packet ID | Length | Instruction | Params | Checksum |
|
||||
| --------- | --------- | ------ | ----------- | ----------------- | -------- |
|
||||
| 0xFF 0xFF | ID | Len | Instr | Param 1 … Param N | Sum |
|
||||
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def _build(cls, scs_id: int, params: list[int], length: int, instruct_type: str) -> list[int]:
|
||||
instruct_value = INSTRUCTION_TYPES[instruct_type]
|
||||
return [
|
||||
0xFF, 0xFF, # header
|
||||
scs_id, # servo id
|
||||
length, # length
|
||||
instruct_value, # instruction type
|
||||
*params, # data bytes
|
||||
0x00, # placeholder for checksum
|
||||
] # fmt: skip
|
||||
|
||||
@classmethod
|
||||
def sync_read(
|
||||
cls,
|
||||
scs_ids: list[int],
|
||||
start_address: int,
|
||||
data_length: int,
|
||||
) -> bytes:
|
||||
"""
|
||||
Builds a "Sync_Read" broadcast instruction.
|
||||
|
||||
The parameters for Sync Read (Protocol 2.0) are:
|
||||
param[0] = start_address
|
||||
param[1] = data_length
|
||||
param[2+] = motor IDs to read from
|
||||
|
||||
And 'length' = (number_of_params + 7), where:
|
||||
+1 is for instruction byte,
|
||||
+1 is for the address byte,
|
||||
+1 is for the length bytes,
|
||||
+1 is for the checksum at the end.
|
||||
"""
|
||||
params = [start_address, data_length, *scs_ids]
|
||||
length = len(scs_ids) + 4
|
||||
return cls.build(scs_id=scs.BROADCAST_ID, params=params, length=length, instruct_type="Sync_Read")
|
||||
|
||||
|
||||
class MockStatusPacket(MockFeetechPacket):
|
||||
"""
|
||||
Helper class to build valid Feetech Status Packets.
|
||||
|
||||
Status Packet structure
|
||||
(from https://files.waveshare.com/upload/2/27/Communication_Protocol_User_Manual-EN%28191218-0923%29.pdf)
|
||||
|
||||
| Header | Packet ID | Length | Error | Params | Checksum |
|
||||
| --------- | --------- | ------ | ----- | ----------------- | -------- |
|
||||
| 0xFF 0xFF | ID | Len | Err | Param 1 … Param N | Sum |
|
||||
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def _build(cls, scs_id: int, params: list[int], length: int, error: str = "Success") -> list[int]:
|
||||
err_byte = ERROR_TYPE[error]
|
||||
return [
|
||||
0xFF, 0xFF, # header
|
||||
scs_id, # servo id
|
||||
length, # length
|
||||
err_byte, # status
|
||||
*params, # data bytes
|
||||
0x00, # placeholder for checksum
|
||||
] # fmt: skip
|
||||
|
||||
@classmethod
|
||||
def present_position(cls, scs_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes:
|
||||
"""Builds a 'Present_Position' status packet.
|
||||
|
||||
Args:
|
||||
scs_id (int): List of the servos ids
|
||||
pos (int | None, optional): Desired 'Present_Position' to be returned in the packet. If None, it
|
||||
will use a random value in the min_max_range. Defaults to None.
|
||||
min_max_range (tuple, optional): Min/max range to generate the position values used for when 'pos'
|
||||
is None. Note that the bounds are included in the range. Defaults to (0, 4095).
|
||||
|
||||
Returns:
|
||||
bytes: The raw 'Present_Position' status packet ready to be sent through serial.
|
||||
"""
|
||||
pos = random.randint(*min_max_range) if pos is None else pos
|
||||
params = [scs.SCS_LOBYTE(pos), scs.SCS_HIBYTE(pos)]
|
||||
length = 4
|
||||
return cls.build(scs_id, params=params, length=length)
|
||||
|
||||
|
||||
class MockPortHandler(scs.PortHandler):
|
||||
"""
|
||||
This class overwrite the 'setupPort' method of the Feetech PortHandler because it can specify
|
||||
baudrates that are not supported with a serial port on MacOS.
|
||||
"""
|
||||
|
||||
def setupPort(self, cflag_baud): # noqa: N802
|
||||
if self.is_open:
|
||||
self.closePort()
|
||||
|
||||
self.ser = serial.Serial(
|
||||
port=self.port_name,
|
||||
# baudrate=self.baudrate, <- This will fail on MacOS
|
||||
# parity = serial.PARITY_ODD,
|
||||
# stopbits = serial.STOPBITS_TWO,
|
||||
bytesize=serial.EIGHTBITS,
|
||||
timeout=0,
|
||||
)
|
||||
self.is_open = True
|
||||
self.ser.reset_input_buffer()
|
||||
self.tx_time_per_byte = (1000.0 / self.baudrate) * 10.0
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class MockMotors(MockSerial):
|
||||
"""
|
||||
This class will simulate physical motors by responding with valid status packets upon receiving some
|
||||
instruction packets. It is meant to test MotorsBus classes.
|
||||
|
||||
'data_name' supported:
|
||||
- Present_Position
|
||||
"""
|
||||
|
||||
ctrl_table = SCS_SERIES_CONTROL_TABLE
|
||||
|
||||
def __init__(self, scs_ids: list[int]):
|
||||
super().__init__()
|
||||
self._ids = scs_ids
|
||||
self.open()
|
||||
|
||||
def build_single_motor_stubs(
|
||||
self, data_name: str, return_value: int | None = None, num_invalid_try: int | None = None
|
||||
) -> None:
|
||||
address, length = self.ctrl_table[data_name]
|
||||
for idx in self._ids:
|
||||
if data_name == "Present_Position":
|
||||
sync_read_request_single = MockInstructionPacket.sync_read([idx], address, length)
|
||||
sync_read_response_single = self._build_present_pos_send_fn(
|
||||
[idx], [return_value], num_invalid_try
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError # TODO(aliberts): add ping?
|
||||
|
||||
self.stub(
|
||||
name=f"SyncRead_{data_name}_{idx}",
|
||||
receive_bytes=sync_read_request_single,
|
||||
send_fn=sync_read_response_single,
|
||||
)
|
||||
|
||||
def build_all_motors_stub(
|
||||
self, data_name: str, return_values: list[int] | None = None, num_invalid_try: int | None = None
|
||||
) -> None:
|
||||
address, length = self.ctrl_table[data_name]
|
||||
if data_name == "Present_Position":
|
||||
sync_read_request_all = MockInstructionPacket.sync_read(self._ids, address, length)
|
||||
sync_read_response_all = self._build_present_pos_send_fn(
|
||||
self._ids, return_values, num_invalid_try
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError # TODO(aliberts): add ping?
|
||||
|
||||
self.stub(
|
||||
name=f"SyncRead_{data_name}_all",
|
||||
receive_bytes=sync_read_request_all,
|
||||
send_fn=sync_read_response_all,
|
||||
)
|
||||
|
||||
def _build_present_pos_send_fn(
|
||||
self, scs_ids: list[int], return_pos: list[int] | None = None, num_invalid_try: int | None = None
|
||||
) -> Callable[[int], bytes]:
|
||||
return_pos = [None for _ in scs_ids] if return_pos is None else return_pos
|
||||
assert len(return_pos) == len(scs_ids)
|
||||
|
||||
def send_fn(_call_count: int) -> bytes:
|
||||
if num_invalid_try is not None and num_invalid_try >= _call_count:
|
||||
return b""
|
||||
|
||||
packets = b"".join(
|
||||
MockStatusPacket.present_position(idx, pos)
|
||||
for idx, pos in zip(scs_ids, return_pos, strict=True)
|
||||
)
|
||||
return packets
|
||||
|
||||
return send_fn
|
|
@ -1,151 +0,0 @@
|
|||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# ruff: noqa: N802, E741
|
||||
|
||||
"""
|
||||
Mocked classes and functions from scservo_sdk to allow for testing FeetechMotorsBus code.
|
||||
|
||||
Warning: These mocked versions are minimalist. They do not exactly mock every behaviors
|
||||
from the original classes and functions (e.g. return types might be None instead of boolean).
|
||||
"""
|
||||
|
||||
DEFAULT_BAUDRATE = 1_000_000
|
||||
COMM_SUCCESS = 0 # tx or rx packet communication success
|
||||
|
||||
|
||||
def convert_to_bytes(value, bytes):
|
||||
# TODO(rcadene): remove need to mock `convert_to_bytes` by implemented the inverse transform
|
||||
# `convert_bytes_to_value`
|
||||
del bytes # unused
|
||||
return value
|
||||
|
||||
|
||||
def get_default_motor_values(motor_index):
|
||||
return {
|
||||
# Key (int) are from SCS_SERIES_CONTROL_TABLE
|
||||
5: motor_index, # ID
|
||||
6: DEFAULT_BAUDRATE, # Baud_rate
|
||||
10: 0, # Drive_Mode
|
||||
21: 32, # P_Coefficient
|
||||
22: 32, # D_Coefficient
|
||||
23: 0, # I_Coefficient
|
||||
40: 0, # Torque_Enable
|
||||
41: 254, # Acceleration
|
||||
31: -2047, # Offset
|
||||
33: 0, # Mode
|
||||
55: 1, # Lock
|
||||
# Set 2560 since calibration values for Aloha gripper is between start_pos=2499 and end_pos=3144
|
||||
# For other joints, 2560 will be autocorrected to be in calibration range
|
||||
56: 2560, # Present_Position
|
||||
58: 0, # Present_Speed
|
||||
69: 0, # Present_Current
|
||||
85: 150, # Maximum_Acceleration
|
||||
}
|
||||
|
||||
|
||||
# Macro for Control Table Value
|
||||
def SCS_MAKEWORD(a, b):
|
||||
return (a & 0xFF) | ((b & 0xFF) << 8)
|
||||
|
||||
|
||||
def SCS_MAKEDWORD(a, b):
|
||||
return (a & 0xFFFF) | (b & 0xFFFF) << 16
|
||||
|
||||
|
||||
def SCS_LOWORD(l):
|
||||
return l & 0xFFFF
|
||||
|
||||
|
||||
def SCS_HIWORD(l):
|
||||
return (l >> 16) & 0xFFFF
|
||||
|
||||
|
||||
def SCS_LOBYTE(w):
|
||||
return w & 0xFF
|
||||
|
||||
|
||||
def SCS_HIBYTE(w):
|
||||
return (w >> 8) & 0xFF
|
||||
|
||||
|
||||
class PortHandler:
|
||||
def __init__(self, port):
|
||||
self.port = port
|
||||
# factory default baudrate
|
||||
self.baudrate = DEFAULT_BAUDRATE
|
||||
self.ser = SerialMock()
|
||||
|
||||
def openPort(self): # noqa: N802
|
||||
return True
|
||||
|
||||
def closePort(self): # noqa: N802
|
||||
pass
|
||||
|
||||
def setPacketTimeoutMillis(self, timeout_ms): # noqa: N802
|
||||
del timeout_ms # unused
|
||||
|
||||
def getBaudRate(self): # noqa: N802
|
||||
return self.baudrate
|
||||
|
||||
def setBaudRate(self, baudrate): # noqa: N802
|
||||
self.baudrate = baudrate
|
||||
|
||||
|
||||
class PacketHandler:
|
||||
def __init__(self, protocol_version):
|
||||
del protocol_version # unused
|
||||
# Use packet_handler.data to communicate across Read and Write
|
||||
self.data = {}
|
||||
|
||||
|
||||
class GroupSyncRead:
|
||||
def __init__(self, port_handler, packet_handler, address, bytes):
|
||||
self.packet_handler = packet_handler
|
||||
|
||||
def addParam(self, motor_index): # noqa: N802
|
||||
# Initialize motor default values
|
||||
if motor_index not in self.packet_handler.data:
|
||||
self.packet_handler.data[motor_index] = get_default_motor_values(motor_index)
|
||||
|
||||
def txRxPacket(self): # noqa: N802
|
||||
return COMM_SUCCESS
|
||||
|
||||
def getData(self, index, address, bytes): # noqa: N802
|
||||
return self.packet_handler.data[index][address]
|
||||
|
||||
|
||||
class GroupSyncWrite:
|
||||
def __init__(self, port_handler, packet_handler, address, bytes):
|
||||
self.packet_handler = packet_handler
|
||||
self.address = address
|
||||
|
||||
def addParam(self, index, data): # noqa: N802
|
||||
if index not in self.packet_handler.data:
|
||||
self.packet_handler.data[index] = get_default_motor_values(index)
|
||||
self.changeParam(index, data)
|
||||
|
||||
def txPacket(self): # noqa: N802
|
||||
return COMM_SUCCESS
|
||||
|
||||
def changeParam(self, index, data): # noqa: N802
|
||||
self.packet_handler.data[index][self.address] = data
|
||||
|
||||
|
||||
class SerialMock:
|
||||
def reset_output_buffer(self):
|
||||
pass
|
||||
|
||||
def reset_input_buffer(self):
|
||||
pass
|
|
@ -1,35 +1,51 @@
|
|||
import sys
|
||||
from unittest.mock import patch
|
||||
|
||||
import dynamixel_sdk as dxl
|
||||
import pytest
|
||||
|
||||
from lerobot.common.motors.dynamixel.dynamixel import DynamixelMotorsBus
|
||||
from tests.mocks.mock_dynamixel import MockInstructionPacket, MockMotors, MockPortHandler
|
||||
from lerobot.common.motors.dynamixel import DynamixelMotorsBus
|
||||
from tests.mocks.mock_dynamixel import MockMotors, MockPortHandler
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_port_handler():
|
||||
with patch.object(dxl, "PortHandler", MockPortHandler):
|
||||
yield # Patch is applied for the duration of each test
|
||||
if sys.platform == "darwin":
|
||||
with patch.object(dxl, "PortHandler", MockPortHandler):
|
||||
yield
|
||||
else:
|
||||
yield
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform != "darwin", reason=f"No patching needed on {sys.platform=}")
|
||||
def test_autouse_patch():
|
||||
"""Ensure that the autouse fixture correctly patches dxl.PortHandler with MockPortHandler."""
|
||||
"""Ensures that the autouse fixture correctly patches dxl.PortHandler with MockPortHandler."""
|
||||
assert dxl.PortHandler is MockPortHandler
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"value, n_bytes, expected",
|
||||
[
|
||||
(0x12, 1, [0x12]), # Single byte
|
||||
(0x1234, 2, [0x34, 0x12]), # Two bytes
|
||||
(0x12345678, 4, [0x78, 0x56, 0x34, 0x12]), # Four bytes
|
||||
(0, 1, [0x00]), # Zero with 1 byte
|
||||
(0, 2, [0x00, 0x00]), # Zero with 2 bytes
|
||||
(0, 4, [0x00, 0x00, 0x00, 0x00]), # Zero with 4 bytes
|
||||
(255, 1, [0xFF]), # Max single byte
|
||||
(65535, 2, [0xFF, 0xFF]), # Max two bytes
|
||||
(4294967295, 4, [0xFF, 0xFF, 0xFF, 0xFF]), # Max four bytes
|
||||
(0x12, 1, [0x12]),
|
||||
(0x1234, 2, [0x34, 0x12]),
|
||||
(0x12345678, 4, [0x78, 0x56, 0x34, 0x12]),
|
||||
(0, 1, [0x00]),
|
||||
(0, 2, [0x00, 0x00]),
|
||||
(0, 4, [0x00, 0x00, 0x00, 0x00]),
|
||||
(255, 1, [0xFF]),
|
||||
(65535, 2, [0xFF, 0xFF]),
|
||||
(4294967295, 4, [0xFF, 0xFF, 0xFF, 0xFF]),
|
||||
],
|
||||
ids=[
|
||||
"1 byte",
|
||||
"2 bytes",
|
||||
"4 bytes",
|
||||
"0 with 1 byte",
|
||||
"0 with 2 bytes",
|
||||
"0 with 4 bytes",
|
||||
"max single byte",
|
||||
"max two bytes",
|
||||
"max four bytes",
|
||||
],
|
||||
) # fmt: skip
|
||||
def test_split_int_bytes(value, n_bytes, expected):
|
||||
|
@ -53,7 +69,7 @@ def test_split_int_bytes_large_number():
|
|||
|
||||
|
||||
def test_abc_implementation():
|
||||
# Instantiation should raise an error if the class doesn't implements abstract methods/properties
|
||||
"""Instantiation should raise an error if the class doesn't implement abstract methods/properties."""
|
||||
DynamixelMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "xl330-m077")})
|
||||
|
||||
|
||||
|
@ -63,11 +79,14 @@ def test_abc_implementation():
|
|||
None,
|
||||
[1, 2, 3],
|
||||
["dummy_1", "dummy_2", "dummy_3"],
|
||||
[1, "dummy_2", 3], # Mixed
|
||||
[1, "dummy_2", 3],
|
||||
],
|
||||
ids=["None", "by ids", "by names", "mixed"],
|
||||
)
|
||||
def test_read_all_motors(motors):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
positions = [1337, 42, 4016]
|
||||
mock_motors.build_all_motors_stub("Present_Position", return_values=positions)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
|
@ -81,13 +100,22 @@ def test_read_all_motors(motors):
|
|||
pos_dict = motors_bus.read("Present_Position", motors=motors)
|
||||
|
||||
assert mock_motors.stubs["SyncRead_Present_Position_all"].called
|
||||
assert len(pos_dict) == 3
|
||||
assert all(returned_pos == pos for returned_pos, pos in zip(pos_dict.values(), positions, strict=True))
|
||||
assert set(pos_dict) == {"dummy_1", "dummy_2", "dummy_3"}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
|
||||
|
||||
@pytest.mark.parametrize("idx", [1, 2, 3])
|
||||
def test_read_single_motor_name(idx):
|
||||
@pytest.mark.parametrize(
|
||||
"idx, pos",
|
||||
[
|
||||
[1, 1337],
|
||||
[2, 42],
|
||||
[3, 4016],
|
||||
],
|
||||
)
|
||||
def test_read_single_motor_by_name(idx, pos):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
|
@ -101,13 +129,21 @@ def test_read_single_motor_name(idx):
|
|||
pos_dict = motors_bus.read("Present_Position", f"dummy_{idx}")
|
||||
|
||||
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
|
||||
assert len(pos_dict) == 1
|
||||
assert pos_dict == {f"dummy_{idx}": pos}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
|
||||
|
||||
@pytest.mark.parametrize("idx", [1, 2, 3])
|
||||
def test_read_single_motor_id(idx):
|
||||
@pytest.mark.parametrize(
|
||||
"idx, pos",
|
||||
[
|
||||
[1, 1337],
|
||||
[2, 42],
|
||||
[3, 4016],
|
||||
],
|
||||
)
|
||||
def test_read_single_motor_by_id(idx, pos):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
|
@ -121,28 +157,24 @@ def test_read_single_motor_id(idx):
|
|||
pos_dict = motors_bus.read("Present_Position", idx)
|
||||
|
||||
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
|
||||
assert len(pos_dict) == 1
|
||||
assert pos_dict == {f"dummy_{idx}": pos}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"num_retry, num_invalid_try",
|
||||
"num_retry, num_invalid_try, pos",
|
||||
[
|
||||
[1, 2],
|
||||
[2, 3],
|
||||
[3, 2],
|
||||
[2, 1],
|
||||
[1, 2, 1337],
|
||||
[2, 3, 42],
|
||||
[3, 2, 4016],
|
||||
[2, 1, 999],
|
||||
],
|
||||
)
|
||||
def test_read_num_retry(num_retry, num_invalid_try):
|
||||
mock_motors = MockMotors([1, 2, 3], default_stubs=None)
|
||||
address, length = mock_motors.ctrl_table["Present_Position"]
|
||||
receive_bytes = MockInstructionPacket.sync_read([1], address, length)
|
||||
send_fn = mock_motors._create_present_pos_send_fn(
|
||||
[1], "Present_Position", num_invalid_try=num_invalid_try
|
||||
def test_read_num_retry(num_retry, num_invalid_try, pos):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
mock_motors.build_single_motor_stubs(
|
||||
"Present_Position", return_value=pos, num_invalid_try=num_invalid_try
|
||||
)
|
||||
mock_motors.stub(name="num_retry", receive_bytes=receive_bytes, send_fn=send_fn)
|
||||
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
|
@ -155,10 +187,10 @@ def test_read_num_retry(num_retry, num_invalid_try):
|
|||
|
||||
if num_retry >= num_invalid_try:
|
||||
pos_dict = motors_bus.read("Present_Position", 1, num_retry=num_retry)
|
||||
assert len(pos_dict) == 1
|
||||
assert pos_dict == {"dummy_1": pos}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
else:
|
||||
with pytest.raises(ConnectionError):
|
||||
_ = motors_bus.read("Present_Position", 1, num_retry=num_retry)
|
||||
|
||||
assert mock_motors.stubs["num_retry"].calls == num_retry
|
||||
assert mock_motors.stubs["SyncRead_Present_Position_1"].calls == num_retry
|
||||
|
|
|
@ -2,43 +2,195 @@ import sys
|
|||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
import scservo_sdk as scs
|
||||
|
||||
from lerobot.common.motors.feetech import FeetechMotorsBus
|
||||
from tests.mocks import mock_scservo_sdk
|
||||
from tests.mocks.mock_feetech import MockMotors, MockPortHandler
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_scservo_sdk():
|
||||
with patch.dict(sys.modules, {"scservo_sdk": mock_scservo_sdk}):
|
||||
def patch_port_handler():
|
||||
if sys.platform == "darwin":
|
||||
with patch.object(scs, "PortHandler", MockPortHandler):
|
||||
yield
|
||||
else:
|
||||
yield
|
||||
|
||||
|
||||
def test_patch_sdk():
|
||||
assert "scservo_sdk" in sys.modules # Should be patched
|
||||
assert sys.modules["scservo_sdk"] is mock_scservo_sdk # Should match the mock
|
||||
@pytest.mark.skipif(sys.platform != "darwin", reason=f"No patching needed on {sys.platform=}")
|
||||
def test_autouse_patch():
|
||||
"""Ensures that the autouse fixture correctly patches scs.PortHandler with MockPortHandler."""
|
||||
assert scs.PortHandler is MockPortHandler
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"value, n_bytes, expected",
|
||||
[
|
||||
(0x12, 1, [0x12]),
|
||||
(0x1234, 2, [0x34, 0x12]),
|
||||
(0x12345678, 4, [0x78, 0x56, 0x34, 0x12]),
|
||||
(0, 1, [0x00]),
|
||||
(0, 2, [0x00, 0x00]),
|
||||
(0, 4, [0x00, 0x00, 0x00, 0x00]),
|
||||
(255, 1, [0xFF]),
|
||||
(65535, 2, [0xFF, 0xFF]),
|
||||
(4294967295, 4, [0xFF, 0xFF, 0xFF, 0xFF]),
|
||||
],
|
||||
ids=[
|
||||
"1 byte",
|
||||
"2 bytes",
|
||||
"4 bytes",
|
||||
"0 with 1 byte",
|
||||
"0 with 2 bytes",
|
||||
"0 with 4 bytes",
|
||||
"max single byte",
|
||||
"max two bytes",
|
||||
"max four bytes",
|
||||
],
|
||||
) # fmt: skip
|
||||
def test_split_int_bytes(value, n_bytes, expected):
|
||||
assert FeetechMotorsBus.split_int_bytes(value, n_bytes) == expected
|
||||
|
||||
|
||||
def test_split_int_bytes_invalid_n_bytes():
|
||||
with pytest.raises(NotImplementedError):
|
||||
FeetechMotorsBus.split_int_bytes(100, 3)
|
||||
|
||||
|
||||
def test_split_int_bytes_negative_numbers():
|
||||
with pytest.raises(ValueError):
|
||||
neg = FeetechMotorsBus.split_int_bytes(-1, 1)
|
||||
print(neg)
|
||||
|
||||
|
||||
def test_split_int_bytes_large_number():
|
||||
with pytest.raises(ValueError):
|
||||
FeetechMotorsBus.split_int_bytes(2**32, 4) # 4-byte max is 0xFFFFFFFF
|
||||
|
||||
|
||||
def test_abc_implementation():
|
||||
# Instantiation should raise an error if the class doesn't implements abstract methods/properties
|
||||
"""Instantiation should raise an error if the class doesn't implement abstract methods/properties."""
|
||||
FeetechMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "sts3215")})
|
||||
|
||||
|
||||
def test_configure_motors_all_ids_1():
|
||||
# see SCS_SERIES_BAUDRATE_TABLE
|
||||
smaller_baudrate = 19_200
|
||||
smaller_baudrate_value = 7
|
||||
|
||||
# This test expect the configuration was already correct.
|
||||
motors_bus = FeetechMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "sts3215")})
|
||||
@pytest.mark.parametrize(
|
||||
"motors",
|
||||
[
|
||||
None,
|
||||
[1, 2, 3],
|
||||
["dummy_1", "dummy_2", "dummy_3"],
|
||||
[1, "dummy_2", 3],
|
||||
],
|
||||
ids=["None", "by ids", "by names", "mixed"],
|
||||
)
|
||||
def test_read_all_motors(motors):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
positions = [1337, 42, 4016]
|
||||
mock_motors.build_all_motors_stub("Present_Position", return_values=positions)
|
||||
motors_bus = FeetechMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
"dummy_1": (1, "sts3215"),
|
||||
"dummy_2": (2, "sts3215"),
|
||||
"dummy_3": (3, "sts3215"),
|
||||
},
|
||||
)
|
||||
motors_bus.connect()
|
||||
motors_bus.write("Baud_Rate", [smaller_baudrate_value] * len(motors_bus))
|
||||
|
||||
motors_bus.set_baudrate(smaller_baudrate)
|
||||
motors_bus.write("ID", [1] * len(motors_bus))
|
||||
del motors_bus
|
||||
pos_dict = motors_bus.read("Present_Position", motors=motors)
|
||||
|
||||
# Test configure
|
||||
motors_bus = FeetechMotorsBus(port="/dev/dummy-port", motors={"dummy": (1, "sts3215")})
|
||||
assert mock_motors.stubs["SyncRead_Present_Position_all"].called
|
||||
assert all(returned_pos == pos for returned_pos, pos in zip(pos_dict.values(), positions, strict=True))
|
||||
assert set(pos_dict) == {"dummy_1", "dummy_2", "dummy_3"}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"idx, pos",
|
||||
[
|
||||
[1, 1337],
|
||||
[2, 42],
|
||||
[3, 4016],
|
||||
],
|
||||
)
|
||||
def test_read_single_motor_by_name(idx, pos):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
|
||||
motors_bus = FeetechMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
"dummy_1": (1, "sts3215"),
|
||||
"dummy_2": (2, "sts3215"),
|
||||
"dummy_3": (3, "sts3215"),
|
||||
},
|
||||
)
|
||||
motors_bus.connect()
|
||||
assert motors_bus.are_motors_configured()
|
||||
del motors_bus
|
||||
|
||||
pos_dict = motors_bus.read("Present_Position", f"dummy_{idx}")
|
||||
|
||||
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
|
||||
assert pos_dict == {f"dummy_{idx}": pos}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"idx, pos",
|
||||
[
|
||||
[1, 1337],
|
||||
[2, 42],
|
||||
[3, 4016],
|
||||
],
|
||||
)
|
||||
def test_read_single_motor_by_id(idx, pos):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
mock_motors.build_single_motor_stubs("Present_Position", return_value=pos)
|
||||
motors_bus = FeetechMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
"dummy_1": (1, "sts3215"),
|
||||
"dummy_2": (2, "sts3215"),
|
||||
"dummy_3": (3, "sts3215"),
|
||||
},
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
pos_dict = motors_bus.read("Present_Position", idx)
|
||||
|
||||
assert mock_motors.stubs[f"SyncRead_Present_Position_{idx}"].called
|
||||
assert pos_dict == {f"dummy_{idx}": pos}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"num_retry, num_invalid_try, pos",
|
||||
[
|
||||
[1, 2, 1337],
|
||||
[2, 3, 42],
|
||||
[3, 2, 4016],
|
||||
[2, 1, 999],
|
||||
],
|
||||
)
|
||||
def test_read_num_retry(num_retry, num_invalid_try, pos):
|
||||
mock_motors = MockMotors([1, 2, 3])
|
||||
mock_motors.build_single_motor_stubs(
|
||||
"Present_Position", return_value=pos, num_invalid_try=num_invalid_try
|
||||
)
|
||||
motors_bus = FeetechMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors={
|
||||
"dummy_1": (1, "sts3215"),
|
||||
"dummy_2": (2, "sts3215"),
|
||||
"dummy_3": (3, "sts3215"),
|
||||
},
|
||||
)
|
||||
motors_bus.connect()
|
||||
|
||||
if num_retry >= num_invalid_try:
|
||||
pos_dict = motors_bus.read("Present_Position", 1, num_retry=num_retry)
|
||||
assert pos_dict == {"dummy_1": pos}
|
||||
assert all(pos >= 0 and pos <= 4095 for pos in pos_dict.values())
|
||||
else:
|
||||
with pytest.raises(ConnectionError):
|
||||
_ = motors_bus.read("Present_Position", 1, num_retry=num_retry)
|
||||
|
||||
assert mock_motors.stubs["SyncRead_Present_Position_1"].calls == num_retry
|
||||
|
|
|
@ -1,123 +0,0 @@
|
|||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Tests for physical motors and their mocked versions.
|
||||
If the physical motors are not connected to the computer, or not working,
|
||||
the test will be skipped.
|
||||
|
||||
Example of running a specific test:
|
||||
```bash
|
||||
pytest -sx tests/test_motors.py::test_find_port
|
||||
pytest -sx tests/test_motors.py::test_motors_bus
|
||||
```
|
||||
|
||||
Example of running test on real dynamixel motors connected to the computer:
|
||||
```bash
|
||||
pytest -sx 'tests/test_motors.py::test_motors_bus[dynamixel-False]'
|
||||
```
|
||||
|
||||
Example of running test on a mocked version of dynamixel motors:
|
||||
```bash
|
||||
pytest -sx 'tests/test_motors.py::test_motors_bus[dynamixel-True]'
|
||||
```
|
||||
"""
|
||||
|
||||
# TODO(rcadene): measure fps in nightly?
|
||||
# TODO(rcadene): test logs
|
||||
# TODO(rcadene): test calibration
|
||||
# TODO(rcadene): add compatibility with other motors bus
|
||||
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
from lerobot.scripts.find_motors_bus_port import find_port
|
||||
from tests.utils import TEST_MOTOR_TYPES, make_motors_bus, require_motor
|
||||
|
||||
|
||||
@pytest.mark.parametrize("motor_type, mock", TEST_MOTOR_TYPES)
|
||||
@require_motor
|
||||
def test_find_port(request, motor_type, mock):
|
||||
if mock:
|
||||
request.getfixturevalue("patch_builtins_input")
|
||||
with pytest.raises(OSError):
|
||||
find_port()
|
||||
else:
|
||||
find_port()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("motor_type, mock", TEST_MOTOR_TYPES)
|
||||
@require_motor
|
||||
def test_motors_bus(request, motor_type, mock):
|
||||
if mock:
|
||||
request.getfixturevalue("patch_builtins_input")
|
||||
|
||||
motors_bus = make_motors_bus(motor_type, mock=mock)
|
||||
|
||||
# Test reading and writing before connecting raises an error
|
||||
with pytest.raises(DeviceNotConnectedError):
|
||||
motors_bus.read("Torque_Enable")
|
||||
with pytest.raises(DeviceNotConnectedError):
|
||||
motors_bus.write("Torque_Enable", 1)
|
||||
with pytest.raises(DeviceNotConnectedError):
|
||||
motors_bus.disconnect()
|
||||
|
||||
# Test deleting the object without connecting first
|
||||
del motors_bus
|
||||
|
||||
# Test connecting
|
||||
motors_bus = make_motors_bus(motor_type, mock=mock)
|
||||
motors_bus.connect()
|
||||
|
||||
# Test connecting twice raises an error
|
||||
with pytest.raises(DeviceAlreadyConnectedError):
|
||||
motors_bus.connect()
|
||||
|
||||
# Test disabling torque and reading torque on all motors
|
||||
motors_bus.write("Torque_Enable", 0)
|
||||
values = motors_bus.read("Torque_Enable")
|
||||
assert isinstance(values, np.ndarray)
|
||||
assert len(values) == len(motors_bus.motors)
|
||||
assert (values == 0).all()
|
||||
|
||||
# Test writing torque on a specific motor
|
||||
motors_bus.write("Torque_Enable", 1, "gripper")
|
||||
|
||||
# Test reading torque from this specific motor. It is now 1
|
||||
values = motors_bus.read("Torque_Enable", "gripper")
|
||||
assert len(values) == 1
|
||||
assert values[0] == 1
|
||||
|
||||
# Test reading torque from all motors. It is 1 for the specific motor,
|
||||
# and 0 on the others.
|
||||
values = motors_bus.read("Torque_Enable")
|
||||
gripper_index = motors_bus.motor_names.index("gripper")
|
||||
assert values[gripper_index] == 1
|
||||
assert values.sum() == 1 # gripper is the only motor to have torque 1
|
||||
|
||||
# Test writing torque on all motors and it is 1 for all.
|
||||
motors_bus.write("Torque_Enable", 1)
|
||||
values = motors_bus.read("Torque_Enable")
|
||||
assert (values == 1).all()
|
||||
|
||||
# Test ordering the motors to move slightly (+1 value among 4096) and this move
|
||||
# can be executed and seen by the motor position sensor
|
||||
values = motors_bus.read("Present_Position")
|
||||
motors_bus.write("Goal_Position", values + 1)
|
||||
# Give time for the motors to move to the goal position
|
||||
time.sleep(1)
|
||||
new_values = motors_bus.read("Present_Position")
|
||||
assert (new_values == values).all()
|
Loading…
Reference in New Issue