diff --git a/tests/mocks/mock_dynamixel.py b/tests/mocks/mock_dynamixel.py index cccc3c20..e45ff1b4 100644 --- a/tests/mocks/mock_dynamixel.py +++ b/tests/mocks/mock_dynamixel.py @@ -87,18 +87,32 @@ ERROR_TYPE = { class MockDynamixelPacketv2(abc.ABC): @classmethod - def build(cls, dxl_id: int, params: list[int], *args, **kwargs) -> bytes: - packet = cls._build(dxl_id, params, *args, **kwargs) + def build(cls, dxl_id: int, params: list[int], length: list[int], *args, **kwargs) -> bytes: + packet = cls._build(dxl_id, params, length, *args, **kwargs) packet = cls._add_stuffing(packet) packet = cls._add_crc(packet) return bytes(packet) @abc.abstractclassmethod - def _build(cls, dxl_id: int, params: list[int], *args, **kwargs) -> bytearray: + def _build(cls, dxl_id: int, params: list[int], length: int, *args, **kwargs) -> list[int]: pass @staticmethod - def _add_stuffing(packet: bytearray) -> bytearray: + def _add_stuffing(packet: list[int]) -> list[int]: + """ + Byte stuffing is a method of adding additional data to generated instruction packets to ensure that + the packets are processed successfully. When the byte pattern "0xFF 0xFF 0xFD" appears in a packet, + byte stuffing adds 0xFD to the end of the pattern to convert it to “0xFF 0xFF 0xFD 0xFD” to ensure + that it is not interpreted as the header at the start of another packet. + + Source: https://emanual.robotis.com/docs/en/dxl/protocol2/#transmission-process + + Args: + packet (list[int]): The raw packet without stuffing. + + Returns: + list[int]: The packet stuffed if it contained a "0xFF 0xFF 0xFD" byte sequence in its data bytes. + """ packet_length_in = dxl.DXL_MAKEWORD(packet[dxl.PKT_LENGTH_L], packet[dxl.PKT_LENGTH_H]) packet_length_out = packet_length_in @@ -139,7 +153,7 @@ class MockDynamixelPacketv2(abc.ABC): return packet @staticmethod - def _add_crc(packet: bytearray) -> int: + def _add_crc(packet: list[int]) -> list[int]: crc = 0 for j in range(len(packet) - 2): i = ((crc >> 8) ^ packet[j]) & 0xFF @@ -165,28 +179,17 @@ class MockInstructionPacket(MockDynamixelPacketv2): """ @classmethod - def _build(cls, dxl_id: int, params: list[int], instruct_type: str): + def _build(cls, dxl_id: int, params: list[int], length: int, instruct_type: str) -> list[int]: instruct_value = INSTRUCTION_TYPES[instruct_type] - - # For Protocol 2.0, "length" = (number_of_params + 3), - # where: - # +1 is for byte, - # +2 is for the CRC at the end. - # The official Dynamixel SDK sometimes uses (+7) logic for sync reads - # because it includes special sub-parameters. But at core: - # length = (instruction byte) + (len(params)) + (CRC16 =2). - packet_length = len(params) + 3 - return bytearray( - [ - 0xFF, 0xFF, 0xFD, 0x00, # header - dxl_id, # servo id - dxl.DXL_LOBYTE(packet_length), # length_l - dxl.DXL_HIBYTE(packet_length), # length_h - instruct_value, # instruction type - *params, # data bytes - 0x00, 0x00 # placeholder for CRC - ] - ) # fmt: skip + return [ + 0xFF, 0xFF, 0xFD, 0x00, # header + dxl_id, # servo id + dxl.DXL_LOBYTE(length), # length_l + dxl.DXL_HIBYTE(length), # length_h + instruct_value, # instruction type + *params, # data bytes + 0x00, 0x00 # placeholder for CRC + ] # fmt: skip @classmethod def sync_read( @@ -196,24 +199,31 @@ class MockInstructionPacket(MockDynamixelPacketv2): data_length: int, ) -> bytes: """ - Helper method to build a Sync Read broadcast instruction. + Builds a "Sync_Read" broadcast instruction. (from https://emanual.robotis.com/docs/en/dxl/protocol2/#sync-read-0x82) - The parameters for Sync Read (Protocol 2.0) are: + The parameters for Sync_Read (Protocol 2.0) are: param[0] = start_address L param[1] = start_address H param[2] = data_length L param[3] = data_length H param[4+] = motor IDs to read from + + And 'length' = (number_of_params + 7), where: + +1 is for instruction byte, + +2 is for the address bytes, + +2 is for the length bytes, + +2 is for the CRC at the end. """ params = [ dxl.DXL_LOBYTE(start_address), dxl.DXL_HIBYTE(start_address), dxl.DXL_LOBYTE(data_length), dxl.DXL_HIBYTE(data_length), - ] + dxl_ids - - return cls.build(dxl_id=dxl.BROADCAST_ID, instruct_type="Sync_Read", params=params) + *dxl_ids, + ] + length = len(dxl_ids) + 7 + return cls.build(dxl_id=dxl.BROADCAST_ID, params=params, length=length, instruct_type="Sync_Read") class MockStatusPacket(MockDynamixelPacketv2): @@ -229,27 +239,27 @@ class MockStatusPacket(MockDynamixelPacketv2): """ @classmethod - def _build(cls, dxl_id: int, params: list[int], error: str = "Success") -> bytearray: + def _build(cls, dxl_id: int, params: list[int], length: int, error: str = "Success") -> list[int]: err_byte = ERROR_TYPE[error] - return bytearray( - [ - 0xFF, 0xFF, 0xFD, 0x00, # header - dxl_id, # servo id - 0x08, 0x00, # length_l, length_h = 8 - 0x55, # instruction = 'status' - err_byte, # error - *params, # data bytes - 0x00, 0x00 # placeholder for CRC - ] - ) # fmt: skip + return [ + 0xFF, 0xFF, 0xFD, 0x00, # header + dxl_id, # servo id + dxl.DXL_LOBYTE(length), # length_l + dxl.DXL_HIBYTE(length), # length_h + 0x55, # instruction = 'status' + err_byte, # error + *params, # data bytes + 0x00, 0x00 # placeholder for CRC + ] # fmt: skip @classmethod def present_position(cls, dxl_id: int, pos: int | None = None, min_max_range: tuple = (0, 4095)) -> bytes: - """Builds a 'Present_Position' packet. + """Builds a 'Present_Position' status packet. Args: - pos (int | None, optional): Desired 'Present_Position'. If None, it will use a random value in the - min_max_range. Defaults to None. + dxl_id (int): List of the servos ids + pos (int | None, optional): Desired 'Present_Position' to be returned in the packet. If None, it + will use a random value in the min_max_range. Defaults to None. min_max_range (tuple, optional): Min/max range to generate the position values used for when 'pos' is None. Note that the bounds are included in the range. Defaults to (0, 4095). @@ -257,14 +267,14 @@ class MockStatusPacket(MockDynamixelPacketv2): bytes: The raw 'Present_Position' status packet ready to be sent through serial. """ pos = random.randint(*min_max_range) if pos is None else pos - # [lower pos 8 bits, higher pos 8 bits, 0, 0] - params = [pos & 0xFF, (pos >> 8) & 0xFF, 0, 0] - return cls.build(dxl_id, params) + params = [dxl.DXL_LOBYTE(pos), dxl.DXL_HIBYTE(pos), 0, 0] + length = 8 + return cls.build(dxl_id, params=params, length=length) class MockPortHandler(dxl.PortHandler): """ - This class overwrite the 'setupPort' method of the dynamixel PortHandler because it can specify + This class overwrite the 'setupPort' method of the Dynamixel PortHandler because it can specify baudrates that are not supported with a serial port on MacOS. """