From 64ce2669ca5cf186342b503edae19db05310d49a Mon Sep 17 00:00:00 2001 From: Simon Alibert Date: Thu, 20 Mar 2025 09:33:33 +0100 Subject: [PATCH] Add bytes stuffing --- tests/mocks/mock_dynamixel.py | 67 +++++++++++++++++++++++++++++------ 1 file changed, 56 insertions(+), 11 deletions(-) diff --git a/tests/mocks/mock_dynamixel.py b/tests/mocks/mock_dynamixel.py index 8176d766..57f58d6f 100644 --- a/tests/mocks/mock_dynamixel.py +++ b/tests/mocks/mock_dynamixel.py @@ -86,25 +86,70 @@ STATUS_TYPE = { class MockDynamixelPacketv2(abc.ABC): - @staticmethod - def _compute_crc(crc_accum: int, data_blk: list[int], data_blk_size: int) -> int: - for j in range(data_blk_size): - i = ((crc_accum >> 8) ^ data_blk[j]) & 0xFF - crc_accum = ((crc_accum << 8) ^ DXL_CRC_TABLE[i]) & 0xFFFF - return crc_accum - @classmethod def build(cls, dxl_id: int, params: list[int], *args, **kwargs) -> bytes: packet = cls._build(dxl_id, params, *args, **kwargs) - crc = cls._compute_crc(0, packet, len(packet) - 2) - packet[-2] = crc & 0xFF # lower byte - packet[-1] = (crc >> 8) & 0xFF # upper byte + packet = cls._add_stuffing(packet) + packet = cls._add_crc(packet) return bytes(packet) @abc.abstractclassmethod - def _build(cls, dxl_id: int, params: list[int], *args, **kwargs): + def _build(cls, dxl_id: int, params: list[int], *args, **kwargs) -> bytearray: pass + @staticmethod + def _add_stuffing(packet: bytearray) -> bytearray: + packet_length_in = dxl.DXL_MAKEWORD(packet[dxl.PKT_LENGTH_L], packet[dxl.PKT_LENGTH_H]) + packet_length_out = packet_length_in + + temp = [0] * dxl.TXPACKET_MAX_LEN + + # FF FF FD XX ID LEN_L LEN_H + temp[dxl.PKT_HEADER0 : dxl.PKT_HEADER0 + dxl.PKT_LENGTH_H + 1] = packet[ + dxl.PKT_HEADER0 : dxl.PKT_HEADER0 + dxl.PKT_LENGTH_H + 1 + ] + + index = dxl.PKT_INSTRUCTION + + for i in range(0, packet_length_in - 2): # except CRC + temp[index] = packet[i + dxl.PKT_INSTRUCTION] + index = index + 1 + if ( + packet[i + dxl.PKT_INSTRUCTION] == 0xFD + and packet[i + dxl.PKT_INSTRUCTION - 1] == 0xFF + and packet[i + dxl.PKT_INSTRUCTION - 2] == 0xFF + ): + # FF FF FD + temp[index] = 0xFD + index = index + 1 + packet_length_out = packet_length_out + 1 + + temp[index] = packet[dxl.PKT_INSTRUCTION + packet_length_in - 2] + temp[index + 1] = packet[dxl.PKT_INSTRUCTION + packet_length_in - 1] + index = index + 2 + + if packet_length_in != packet_length_out: + packet = [0] * index + + packet[0:index] = temp[0:index] + + packet[dxl.PKT_LENGTH_L] = dxl.DXL_LOBYTE(packet_length_out) + packet[dxl.PKT_LENGTH_H] = dxl.DXL_HIBYTE(packet_length_out) + + return packet + + @staticmethod + def _add_crc(packet: bytearray) -> int: + crc = 0 + for j in range(len(packet) - 2): + i = ((crc >> 8) ^ packet[j]) & 0xFF + crc = ((crc << 8) ^ DXL_CRC_TABLE[i]) & 0xFFFF + + packet[-2] = dxl.DXL_LOBYTE(crc) + packet[-1] = dxl.DXL_HIBYTE(crc) + + return packet + class MockInstructionPacket(MockDynamixelPacketv2): """