Add raw_values option
This commit is contained in:
parent
7c8ab8e2d6
commit
c6212d585d
|
@ -471,13 +471,19 @@ class MotorsBus(abc.ABC):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
def sync_read(self, data_name: str, motors: None = ..., num_retry: int = ...) -> dict[str, Value]: ...
|
def sync_read(
|
||||||
|
self, data_name: str, motors: None = ..., raw_values: bool = ..., num_retry: int = ...
|
||||||
|
) -> dict[str, Value]: ...
|
||||||
@overload
|
@overload
|
||||||
def sync_read(
|
def sync_read(
|
||||||
self, data_name: str, motors: NameOrID | list[NameOrID], num_retry: int = ...
|
self, data_name: str, motors: NameOrID | list[NameOrID], raw_values: bool = ..., num_retry: int = ...
|
||||||
) -> dict[NameOrID, Value]: ...
|
) -> dict[NameOrID, Value]: ...
|
||||||
def sync_read(
|
def sync_read(
|
||||||
self, data_name: str, motors: NameOrID | list[NameOrID] | None = None, num_retry: int = 0
|
self,
|
||||||
|
data_name: str,
|
||||||
|
motors: NameOrID | list[NameOrID] | None = None,
|
||||||
|
raw_values: bool = False,
|
||||||
|
num_retry: int = 0,
|
||||||
) -> dict[NameOrID, Value]:
|
) -> dict[NameOrID, Value]:
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
raise DeviceNotConnectedError(
|
raise DeviceNotConnectedError(
|
||||||
|
@ -503,7 +509,7 @@ class MotorsBus(abc.ABC):
|
||||||
f"{self.packet_handler.getTxRxResult(comm)}"
|
f"{self.packet_handler.getTxRxResult(comm)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
if data_name in self.calibration_required and self.calibration is not None:
|
if not raw_values and data_name in self.calibration_required and self.calibration is not None:
|
||||||
ids_values = self._calibrate_values(ids_values)
|
ids_values = self._calibrate_values(ids_values)
|
||||||
|
|
||||||
return {id_key_map[idx]: val for idx, val in ids_values.items()}
|
return {id_key_map[idx]: val for idx, val in ids_values.items()}
|
||||||
|
@ -551,7 +557,13 @@ class MotorsBus(abc.ABC):
|
||||||
# for idx in motor_ids:
|
# for idx in motor_ids:
|
||||||
# value = self.reader.getData(idx, address, n_bytes)
|
# value = self.reader.getData(idx, address, n_bytes)
|
||||||
|
|
||||||
def sync_write(self, data_name: str, values: Value | dict[NameOrID, Value], num_retry: int = 0) -> None:
|
def sync_write(
|
||||||
|
self,
|
||||||
|
data_name: str,
|
||||||
|
values: Value | dict[NameOrID, Value],
|
||||||
|
raw_values: bool = False,
|
||||||
|
num_retry: int = 0,
|
||||||
|
) -> None:
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
raise DeviceNotConnectedError(
|
raise DeviceNotConnectedError(
|
||||||
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
||||||
|
@ -564,7 +576,7 @@ class MotorsBus(abc.ABC):
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"'values' is expected to be a single value or a dict. Got {values}")
|
raise ValueError(f"'values' is expected to be a single value or a dict. Got {values}")
|
||||||
|
|
||||||
if data_name in self.calibration_required and self.calibration is not None:
|
if not raw_values and data_name in self.calibration_required and self.calibration is not None:
|
||||||
ids_values = self._uncalibrate_values(ids_values)
|
ids_values = self._uncalibrate_values(ids_values)
|
||||||
|
|
||||||
comm = self._sync_write(data_name, ids_values, num_retry)
|
comm = self._sync_write(data_name, ids_values, num_retry)
|
||||||
|
@ -602,7 +614,9 @@ class MotorsBus(abc.ABC):
|
||||||
data = self._split_int_to_bytes(value, n_bytes)
|
data = self._split_int_to_bytes(value, n_bytes)
|
||||||
self.sync_writer.addParam(idx, data)
|
self.sync_writer.addParam(idx, data)
|
||||||
|
|
||||||
def write(self, data_name: str, motor: NameOrID, value: Value, num_retry: int = 0) -> None:
|
def write(
|
||||||
|
self, data_name: str, motor: NameOrID, value: Value, raw_value: bool = False, num_retry: int = 0
|
||||||
|
) -> None:
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
raise DeviceNotConnectedError(
|
raise DeviceNotConnectedError(
|
||||||
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
f"{self.__class__.__name__}('{self.port}') is not connected. You need to run `{self.__class__.__name__}.connect()`."
|
||||||
|
@ -610,7 +624,7 @@ class MotorsBus(abc.ABC):
|
||||||
|
|
||||||
idx = self._get_motor_id(motor)
|
idx = self._get_motor_id(motor)
|
||||||
|
|
||||||
if data_name in self.calibration_required and self.calibration is not None:
|
if not raw_value and data_name in self.calibration_required and self.calibration is not None:
|
||||||
id_value = self._uncalibrate_values({idx: value})
|
id_value = self._uncalibrate_values({idx: value})
|
||||||
value = id_value[idx]
|
value = id_value[idx]
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue