Move imports & remove mock entirely

This commit is contained in:
Simon Alibert 2025-03-15 19:22:12 +01:00
parent 2e9b144c56
commit 0f972661e1
2 changed files with 34 additions and 22 deletions

View File

@ -16,7 +16,6 @@ import logging
import math
from copy import deepcopy
import dynamixel_sdk as dxl
import numpy as np
from ..motors_bus import (
@ -172,24 +171,21 @@ def convert_degrees_to_steps(degrees: float | np.ndarray, models: str | list[str
return steps
def convert_to_bytes(value, bytes, mock=False):
if mock:
return value
def convert_to_bytes(value, n_bytes: int):
import dynamixel_sdk as dxl
# Note: No need to convert back into unsigned int, since this byte preprocessing
# already handles it for us.
if bytes == 1:
if n_bytes == 1:
data = [
dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)),
]
elif bytes == 2:
elif n_bytes == 2:
data = [
dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)),
dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)),
]
elif bytes == 4:
elif n_bytes == 4:
data = [
dxl.DXL_LOBYTE(dxl.DXL_LOWORD(value)),
dxl.DXL_HIBYTE(dxl.DXL_LOWORD(value)),
@ -199,7 +195,7 @@ def convert_to_bytes(value, bytes, mock=False):
else:
raise NotImplementedError(
f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but "
f"{bytes} is provided instead."
f"{n_bytes} is provided instead."
)
return data
@ -255,6 +251,8 @@ class DynamixelMotorsBus(MotorsBus):
super().__init__(port, motors)
def _set_handlers(self):
import dynamixel_sdk as dxl
self.port_handler = dxl.PortHandler(self.port)
self.packet_handler = dxl.PacketHandler(PROTOCOL_VERSION)
@ -486,6 +484,8 @@ class DynamixelMotorsBus(MotorsBus):
return values
def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY):
import dynamixel_sdk as dxl
return_list = True
if not isinstance(motor_ids, list):
return_list = False
@ -519,6 +519,8 @@ class DynamixelMotorsBus(MotorsBus):
return values[0]
def _read(self, data_name, motor_names: str | list[str] | None = None):
import dynamixel_sdk as dxl
motor_ids = []
models = []
for name in motor_names:
@ -566,6 +568,8 @@ class DynamixelMotorsBus(MotorsBus):
return values
def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY):
import dynamixel_sdk as dxl
if not isinstance(motor_ids, list):
motor_ids = [motor_ids]
if not isinstance(values, list):
@ -575,7 +579,7 @@ class DynamixelMotorsBus(MotorsBus):
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = dxl.GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
data = convert_to_bytes(value, bytes)
group.addParam(idx, data)
for _ in range(num_retry):
@ -590,6 +594,8 @@ class DynamixelMotorsBus(MotorsBus):
)
def _write(self, data_name: str, values: list[int], motor_names: list[str]) -> None:
import dynamixel_sdk as dxl
motor_ids = []
models = []
for name in motor_names:
@ -611,7 +617,7 @@ class DynamixelMotorsBus(MotorsBus):
)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
data = convert_to_bytes(value, bytes)
if init_group:
self.group_writers[group_key].addParam(idx, data)
else:

View File

@ -15,7 +15,6 @@
from copy import deepcopy
import numpy as np
import scservo_sdk as scs
from ..motors_bus import (
CalibrationMode,
@ -185,24 +184,21 @@ def adjusted_to_motor_ticks(adjusted_pos: int, model: str, motorbus, motor_id: i
return ticks
def convert_to_bytes(value, bytes, mock=False):
if mock:
return value
def convert_to_bytes(value, n_bytes: int):
import scservo_sdk as scs
# Note: No need to convert back into unsigned int, since this byte preprocessing
# already handles it for us.
if bytes == 1:
if n_bytes == 1:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
]
elif bytes == 2:
elif n_bytes == 2:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
]
elif bytes == 4:
elif n_bytes == 4:
data = [
scs.SCS_LOBYTE(scs.SCS_LOWORD(value)),
scs.SCS_HIBYTE(scs.SCS_LOWORD(value)),
@ -212,7 +208,7 @@ def convert_to_bytes(value, bytes, mock=False):
else:
raise NotImplementedError(
f"Value of the number of bytes to be sent is expected to be in [1, 2, 4], but "
f"{bytes} is provided instead."
f"{n_bytes} is provided instead."
)
return data
@ -268,6 +264,8 @@ class FeetechMotorsBus(MotorsBus):
super().__init__(port, motors)
def _set_handlers(self):
import scservo_sdk as scs
self.port_handler = scs.PortHandler(self.port)
self.packet_handler = scs.PacketHandler(PROTOCOL_VERSION)
@ -340,6 +338,8 @@ class FeetechMotorsBus(MotorsBus):
return values
def read_with_motor_ids(self, motor_models, motor_ids, data_name, num_retry=NUM_READ_RETRY):
import scservo_sdk as scs
return_list = True
if not isinstance(motor_ids, list):
return_list = False
@ -373,6 +373,8 @@ class FeetechMotorsBus(MotorsBus):
return values[0]
def _read(self, data_name, motor_names: str | list[str] | None = None):
import scservo_sdk as scs
motor_ids = []
models = []
for name in motor_names:
@ -420,6 +422,8 @@ class FeetechMotorsBus(MotorsBus):
return values
def write_with_motor_ids(self, motor_models, motor_ids, data_name, values, num_retry=NUM_WRITE_RETRY):
import scservo_sdk as scs
if not isinstance(motor_ids, list):
motor_ids = [motor_ids]
if not isinstance(values, list):
@ -429,7 +433,7 @@ class FeetechMotorsBus(MotorsBus):
addr, bytes = self.model_ctrl_table[motor_models[0]][data_name]
group = scs.GroupSyncWrite(self.port_handler, self.packet_handler, addr, bytes)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
data = convert_to_bytes(value, bytes)
group.addParam(idx, data)
for _ in range(num_retry):
@ -444,6 +448,8 @@ class FeetechMotorsBus(MotorsBus):
)
def _write(self, data_name: str, values: list[int], motor_names: list[str]) -> None:
import scservo_sdk as scs
motor_ids = []
models = []
for name in motor_names:
@ -465,7 +471,7 @@ class FeetechMotorsBus(MotorsBus):
)
for idx, value in zip(motor_ids, values, strict=True):
data = convert_to_bytes(value, bytes, self.mock)
data = convert_to_bytes(value, bytes)
if init_group:
self.group_writers[group_key].addParam(idx, data)
else: