Update dynamixel with motors bus & tables changes
This commit is contained in:
parent
e998dddcfa
commit
792c3d961d
|
@ -29,7 +29,8 @@ from .tables import (
|
|||
AVAILABLE_BAUDRATES,
|
||||
MODEL_BAUDRATE_TABLE,
|
||||
MODEL_CONTROL_TABLE,
|
||||
MODEL_NUMBER,
|
||||
MODEL_ENCODING_TABLE,
|
||||
MODEL_NUMBER_TABLE,
|
||||
MODEL_RESOLUTION,
|
||||
)
|
||||
|
||||
|
@ -37,7 +38,7 @@ PROTOCOL_VERSION = 2.0
|
|||
BAUDRATE = 1_000_000
|
||||
DEFAULT_TIMEOUT_MS = 1000
|
||||
|
||||
NORMALIZATION_REQUIRED = ["Goal_Position", "Present_Position"]
|
||||
NORMALIZED_DATA = ["Goal_Position", "Present_Position"]
|
||||
CONVERT_UINT32_TO_INT32_REQUIRED = ["Goal_Position", "Present_Position"]
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -94,9 +95,10 @@ class DynamixelMotorsBus(MotorsBus):
|
|||
default_timeout = DEFAULT_TIMEOUT_MS
|
||||
model_baudrate_table = deepcopy(MODEL_BAUDRATE_TABLE)
|
||||
model_ctrl_table = deepcopy(MODEL_CONTROL_TABLE)
|
||||
model_number_table = deepcopy(MODEL_NUMBER)
|
||||
model_encoding_table = deepcopy(MODEL_ENCODING_TABLE)
|
||||
model_number_table = deepcopy(MODEL_NUMBER_TABLE)
|
||||
model_resolution_table = deepcopy(MODEL_RESOLUTION)
|
||||
normalization_required = deepcopy(NORMALIZATION_REQUIRED)
|
||||
normalized_data = deepcopy(NORMALIZED_DATA)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -128,11 +130,25 @@ class DynamixelMotorsBus(MotorsBus):
|
|||
for motor in motors:
|
||||
self.write("Torque_Enable", motor, TorqueMode.ENABLED.value)
|
||||
|
||||
def _encode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int:
|
||||
return encode_twos_complement(value, n_bytes)
|
||||
def _encode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]:
|
||||
for id_ in ids_values:
|
||||
model = self._id_to_model(id_)
|
||||
encoding_table = self.model_encoding_table.get(model)
|
||||
if encoding_table and data_name in encoding_table:
|
||||
n_bytes = encoding_table[data_name]
|
||||
ids_values[id_] = encode_twos_complement(ids_values[id_], n_bytes)
|
||||
|
||||
def _decode_value(self, value: int, data_name: str | None = None, n_bytes: int | None = None) -> int:
|
||||
return decode_twos_complement(value, n_bytes)
|
||||
return ids_values
|
||||
|
||||
def _decode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]:
|
||||
for id_ in ids_values:
|
||||
model = self._id_to_model(id_)
|
||||
encoding_table = self.model_encoding_table.get(model)
|
||||
if encoding_table and data_name in encoding_table:
|
||||
n_bytes = encoding_table[data_name]
|
||||
ids_values[id_] = decode_twos_complement(ids_values[id_], n_bytes)
|
||||
|
||||
return ids_values
|
||||
|
||||
def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]:
|
||||
"""
|
||||
|
@ -182,13 +198,13 @@ class DynamixelMotorsBus(MotorsBus):
|
|||
data_list, comm = self.packet_handler.broadcastPing(self.port_handler)
|
||||
if self._is_comm_success(comm):
|
||||
break
|
||||
logger.debug(f"Broadcast failed on port '{self.port}' ({n_try=})")
|
||||
logger.debug(f"Broadcast ping failed on port '{self.port}' ({n_try=})")
|
||||
logger.debug(self.packet_handler.getTxRxResult(comm))
|
||||
|
||||
if not self._is_comm_success(comm):
|
||||
if raise_on_error:
|
||||
raise ConnectionError(self.packet_handler.getTxRxResult(comm))
|
||||
|
||||
return data_list if data_list else None
|
||||
return
|
||||
|
||||
return {id_: data[0] for id_, data in data_list.items()}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# data_name: (address, size_byte)
|
||||
# {data_name: (address, size_byte)}
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#control-table
|
||||
X_SERIES_CONTROL_TABLE = {
|
||||
"Model_Number": (0, 2),
|
||||
|
@ -66,6 +66,27 @@ X_SERIES_BAUDRATE_TABLE = {
|
|||
6: 4_000_000,
|
||||
}
|
||||
|
||||
# {data_name: size_byte}
|
||||
X_SERIES_ENCODINGS_TABLE = {
|
||||
"Homing_Offset": X_SERIES_CONTROL_TABLE["Homing_Offset"][1],
|
||||
"Goal_PWM": X_SERIES_CONTROL_TABLE["Goal_PWM"][1],
|
||||
"Goal_Current": X_SERIES_CONTROL_TABLE["Goal_Current"][1],
|
||||
"Goal_Velocity": X_SERIES_CONTROL_TABLE["Goal_Velocity"][1],
|
||||
"Present_PWM": X_SERIES_CONTROL_TABLE["Present_PWM"][1],
|
||||
"Present_Current": X_SERIES_CONTROL_TABLE["Present_Current"][1],
|
||||
"Present_Velocity": X_SERIES_CONTROL_TABLE["Present_Velocity"][1],
|
||||
}
|
||||
|
||||
MODEL_ENCODING_TABLE = {
|
||||
"x_series": X_SERIES_ENCODINGS_TABLE,
|
||||
"xl330-m077": X_SERIES_ENCODINGS_TABLE,
|
||||
"xl330-m288": X_SERIES_ENCODINGS_TABLE,
|
||||
"xl430-w250": X_SERIES_ENCODINGS_TABLE,
|
||||
"xm430-w350": X_SERIES_ENCODINGS_TABLE,
|
||||
"xm540-w270": X_SERIES_ENCODINGS_TABLE,
|
||||
"xc430-w150": X_SERIES_ENCODINGS_TABLE,
|
||||
}
|
||||
|
||||
# {model: model_resolution}
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#specifications
|
||||
MODEL_RESOLUTION = {
|
||||
|
@ -80,7 +101,7 @@ MODEL_RESOLUTION = {
|
|||
|
||||
# {model: model_number}
|
||||
# https://emanual.robotis.com/docs/en/dxl/x/{MODEL}/#control-table-of-eeprom-area
|
||||
MODEL_NUMBER = {
|
||||
MODEL_NUMBER_TABLE = {
|
||||
"xl330-m077": 1190,
|
||||
"xl330-m288": 1200,
|
||||
"xl430-w250": 1060,
|
||||
|
|
|
@ -6,7 +6,7 @@ import dynamixel_sdk as dxl
|
|||
import pytest
|
||||
|
||||
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
|
||||
from lerobot.common.motors.dynamixel import MODEL_NUMBER, DynamixelMotorsBus
|
||||
from lerobot.common.motors.dynamixel import MODEL_NUMBER_TABLE, DynamixelMotorsBus
|
||||
from lerobot.common.utils.encoding_utils import encode_twos_complement
|
||||
from tests.mocks.mock_dynamixel import MockMotors, MockPortHandler
|
||||
|
||||
|
@ -113,7 +113,7 @@ def test_abc_implementation(dummy_motors):
|
|||
|
||||
@pytest.mark.parametrize("id_", [1, 2, 3])
|
||||
def test_ping(id_, mock_motors, dummy_motors):
|
||||
expected_model_nb = MODEL_NUMBER[dummy_motors[f"dummy_{id_}"].model]
|
||||
expected_model_nb = MODEL_NUMBER_TABLE[dummy_motors[f"dummy_{id_}"].model]
|
||||
stub_name = mock_motors.build_ping_stub(id_, expected_model_nb)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
|
@ -129,7 +129,7 @@ def test_ping(id_, mock_motors, dummy_motors):
|
|||
|
||||
def test_broadcast_ping(mock_motors, dummy_motors):
|
||||
models = {m.id: m.model for m in dummy_motors.values()}
|
||||
expected_model_nbs = {id_: MODEL_NUMBER[model] for id_, model in models.items()}
|
||||
expected_model_nbs = {id_: MODEL_NUMBER_TABLE[model] for id_, model in models.items()}
|
||||
stub_name = mock_motors.build_broadcast_ping_stub(expected_model_nbs)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
|
@ -171,55 +171,7 @@ def test_sync_read_none(mock_motors, dummy_motors):
|
|||
(3, 4016),
|
||||
],
|
||||
)
|
||||
def test_sync_read_by_id(id_, position, mock_motors, dummy_motors):
|
||||
expected_position = {id_: position}
|
||||
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_position)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect(assert_motors_exist=False)
|
||||
|
||||
read_position = motors_bus.sync_read("Present_Position", id_, normalize=False)
|
||||
|
||||
assert mock_motors.stubs[stub_name].called
|
||||
assert read_position == expected_position
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"ids, positions",
|
||||
[
|
||||
([1], [1337]),
|
||||
([1, 2], [1337, 42]),
|
||||
([1, 2, 3], [1337, 42, 4016]),
|
||||
],
|
||||
ids=["1 motor", "2 motors", "3 motors"],
|
||||
) # fmt: skip
|
||||
def test_sync_read_by_ids(ids, positions, mock_motors, dummy_motors):
|
||||
assert len(ids) == len(positions)
|
||||
expected_positions = dict(zip(ids, positions, strict=True))
|
||||
stub_name = mock_motors.build_sync_read_stub("Present_Position", expected_positions)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect(assert_motors_exist=False)
|
||||
|
||||
read_positions = motors_bus.sync_read("Present_Position", ids, normalize=False)
|
||||
|
||||
assert mock_motors.stubs[stub_name].called
|
||||
assert read_positions == expected_positions
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"id_, position",
|
||||
[
|
||||
(1, 1337),
|
||||
(2, 42),
|
||||
(3, 4016),
|
||||
],
|
||||
)
|
||||
def test_sync_read_by_name(id_, position, mock_motors, dummy_motors):
|
||||
def test_sync_read_single_value(id_, position, mock_motors, dummy_motors):
|
||||
expected_position = {f"dummy_{id_}": position}
|
||||
stub_name = mock_motors.build_sync_read_stub("Present_Position", {id_: position})
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
|
@ -243,7 +195,7 @@ def test_sync_read_by_name(id_, position, mock_motors, dummy_motors):
|
|||
],
|
||||
ids=["1 motor", "2 motors", "3 motors"],
|
||||
) # fmt: skip
|
||||
def test_sync_read_by_names(ids, positions, mock_motors, dummy_motors):
|
||||
def test_sync_read(ids, positions, mock_motors, dummy_motors):
|
||||
assert len(ids) == len(positions)
|
||||
names = [f"dummy_{dxl_id}" for dxl_id in ids]
|
||||
expected_positions = dict(zip(names, positions, strict=True))
|
||||
|
@ -271,9 +223,9 @@ def test_sync_read_by_names(ids, positions, mock_motors, dummy_motors):
|
|||
],
|
||||
)
|
||||
def test_sync_read_num_retry(num_retry, num_invalid_try, pos, mock_motors, dummy_motors):
|
||||
expected_position = {1: pos}
|
||||
expected_position = {"dummy_1": pos}
|
||||
stub_name = mock_motors.build_sync_read_stub(
|
||||
"Present_Position", expected_position, num_invalid_try=num_invalid_try
|
||||
"Present_Position", {1: pos}, num_invalid_try=num_invalid_try
|
||||
)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
|
@ -282,11 +234,11 @@ def test_sync_read_num_retry(num_retry, num_invalid_try, pos, mock_motors, dummy
|
|||
motors_bus.connect(assert_motors_exist=False)
|
||||
|
||||
if num_retry >= num_invalid_try:
|
||||
pos_dict = motors_bus.sync_read("Present_Position", 1, normalize=False, num_retry=num_retry)
|
||||
assert pos_dict == {1: pos}
|
||||
pos_dict = motors_bus.sync_read("Present_Position", "dummy_1", normalize=False, num_retry=num_retry)
|
||||
assert pos_dict == expected_position
|
||||
else:
|
||||
with pytest.raises(ConnectionError):
|
||||
_ = motors_bus.sync_read("Present_Position", 1, normalize=False, num_retry=num_retry)
|
||||
_ = motors_bus.sync_read("Present_Position", "dummy_1", normalize=False, num_retry=num_retry)
|
||||
|
||||
expected_calls = min(1 + num_retry, 1 + num_invalid_try)
|
||||
assert mock_motors.stubs[stub_name].calls == expected_calls
|
||||
|
@ -315,28 +267,6 @@ def test_sync_write_single_value(data_name, value, mock_motors, dummy_motors):
|
|||
assert mock_motors.stubs[stub_name].wait_called()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"id_, position",
|
||||
[
|
||||
(1, 1337),
|
||||
(2, 42),
|
||||
(3, 4016),
|
||||
],
|
||||
)
|
||||
def test_sync_write_by_id(id_, position, mock_motors, dummy_motors):
|
||||
value = {id_: position}
|
||||
stub_name = mock_motors.build_sync_write_stub("Goal_Position", value)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect(assert_motors_exist=False)
|
||||
|
||||
motors_bus.sync_write("Goal_Position", value, normalize=False)
|
||||
|
||||
assert mock_motors.stubs[stub_name].wait_called()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"ids, positions",
|
||||
[
|
||||
|
@ -346,54 +276,7 @@ def test_sync_write_by_id(id_, position, mock_motors, dummy_motors):
|
|||
],
|
||||
ids=["1 motor", "2 motors", "3 motors"],
|
||||
) # fmt: skip
|
||||
def test_sync_write_by_ids(ids, positions, mock_motors, dummy_motors):
|
||||
assert len(ids) == len(positions)
|
||||
values = dict(zip(ids, positions, strict=True))
|
||||
stub_name = mock_motors.build_sync_write_stub("Goal_Position", values)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect(assert_motors_exist=False)
|
||||
|
||||
motors_bus.sync_write("Goal_Position", values, normalize=False)
|
||||
|
||||
assert mock_motors.stubs[stub_name].wait_called()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"id_, position",
|
||||
[
|
||||
(1, 1337),
|
||||
(2, 42),
|
||||
(3, 4016),
|
||||
],
|
||||
)
|
||||
def test_sync_write_by_name(id_, position, mock_motors, dummy_motors):
|
||||
id_value = {id_: position}
|
||||
stub_name = mock_motors.build_sync_write_stub("Goal_Position", id_value)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect(assert_motors_exist=False)
|
||||
|
||||
write_value = {f"dummy_{id_}": position}
|
||||
motors_bus.sync_write("Goal_Position", write_value, normalize=False)
|
||||
|
||||
assert mock_motors.stubs[stub_name].wait_called()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"ids, positions",
|
||||
[
|
||||
([1], [1337]),
|
||||
([1, 2], [1337, 42]),
|
||||
([1, 2, 3], [1337, 42, 4016]),
|
||||
],
|
||||
ids=["1 motor", "2 motors", "3 motors"],
|
||||
) # fmt: skip
|
||||
def test_sync_write_by_names(ids, positions, mock_motors, dummy_motors):
|
||||
def test_sync_write(ids, positions, mock_motors, dummy_motors):
|
||||
assert len(ids) == len(positions)
|
||||
ids_values = dict(zip(ids, positions, strict=True))
|
||||
stub_name = mock_motors.build_sync_write_stub("Goal_Position", ids_values)
|
||||
|
@ -418,29 +301,7 @@ def test_sync_write_by_names(ids, positions, mock_motors, dummy_motors):
|
|||
("Goal_Position", 3, 42),
|
||||
],
|
||||
)
|
||||
def test_write_by_id(data_name, dxl_id, value, mock_motors, dummy_motors):
|
||||
stub_name = mock_motors.build_write_stub(data_name, dxl_id, value)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
motors=dummy_motors,
|
||||
)
|
||||
motors_bus.connect(assert_motors_exist=False)
|
||||
|
||||
motors_bus.write(data_name, dxl_id, value, normalize=False)
|
||||
|
||||
assert mock_motors.stubs[stub_name].called
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"data_name, dxl_id, value",
|
||||
[
|
||||
("Torque_Enable", 1, 0),
|
||||
("Torque_Enable", 1, 1),
|
||||
("Goal_Position", 2, 1337),
|
||||
("Goal_Position", 3, 42),
|
||||
],
|
||||
)
|
||||
def test_write_by_name(data_name, dxl_id, value, mock_motors, dummy_motors):
|
||||
def test_write(data_name, dxl_id, value, mock_motors, dummy_motors):
|
||||
stub_name = mock_motors.build_write_stub(data_name, dxl_id, value)
|
||||
motors_bus = DynamixelMotorsBus(
|
||||
port=mock_motors.port,
|
||||
|
|
Loading…
Reference in New Issue