Add bytes stuffing

This commit is contained in:
Simon Alibert 2025-03-20 09:33:33 +01:00
parent f527adf7a9
commit 64ce2669ca
1 changed files with 56 additions and 11 deletions

View File

@ -86,25 +86,70 @@ STATUS_TYPE = {
class MockDynamixelPacketv2(abc.ABC): class MockDynamixelPacketv2(abc.ABC):
@staticmethod
def _compute_crc(crc_accum: int, data_blk: list[int], data_blk_size: int) -> int:
for j in range(data_blk_size):
i = ((crc_accum >> 8) ^ data_blk[j]) & 0xFF
crc_accum = ((crc_accum << 8) ^ DXL_CRC_TABLE[i]) & 0xFFFF
return crc_accum
@classmethod @classmethod
def build(cls, dxl_id: int, params: list[int], *args, **kwargs) -> bytes: def build(cls, dxl_id: int, params: list[int], *args, **kwargs) -> bytes:
packet = cls._build(dxl_id, params, *args, **kwargs) packet = cls._build(dxl_id, params, *args, **kwargs)
crc = cls._compute_crc(0, packet, len(packet) - 2) packet = cls._add_stuffing(packet)
packet[-2] = crc & 0xFF # lower byte packet = cls._add_crc(packet)
packet[-1] = (crc >> 8) & 0xFF # upper byte
return bytes(packet) return bytes(packet)
@abc.abstractclassmethod @abc.abstractclassmethod
def _build(cls, dxl_id: int, params: list[int], *args, **kwargs): def _build(cls, dxl_id: int, params: list[int], *args, **kwargs) -> bytearray:
pass pass
@staticmethod
def _add_stuffing(packet: bytearray) -> bytearray:
packet_length_in = dxl.DXL_MAKEWORD(packet[dxl.PKT_LENGTH_L], packet[dxl.PKT_LENGTH_H])
packet_length_out = packet_length_in
temp = [0] * dxl.TXPACKET_MAX_LEN
# FF FF FD XX ID LEN_L LEN_H
temp[dxl.PKT_HEADER0 : dxl.PKT_HEADER0 + dxl.PKT_LENGTH_H + 1] = packet[
dxl.PKT_HEADER0 : dxl.PKT_HEADER0 + dxl.PKT_LENGTH_H + 1
]
index = dxl.PKT_INSTRUCTION
for i in range(0, packet_length_in - 2): # except CRC
temp[index] = packet[i + dxl.PKT_INSTRUCTION]
index = index + 1
if (
packet[i + dxl.PKT_INSTRUCTION] == 0xFD
and packet[i + dxl.PKT_INSTRUCTION - 1] == 0xFF
and packet[i + dxl.PKT_INSTRUCTION - 2] == 0xFF
):
# FF FF FD
temp[index] = 0xFD
index = index + 1
packet_length_out = packet_length_out + 1
temp[index] = packet[dxl.PKT_INSTRUCTION + packet_length_in - 2]
temp[index + 1] = packet[dxl.PKT_INSTRUCTION + packet_length_in - 1]
index = index + 2
if packet_length_in != packet_length_out:
packet = [0] * index
packet[0:index] = temp[0:index]
packet[dxl.PKT_LENGTH_L] = dxl.DXL_LOBYTE(packet_length_out)
packet[dxl.PKT_LENGTH_H] = dxl.DXL_HIBYTE(packet_length_out)
return packet
@staticmethod
def _add_crc(packet: bytearray) -> int:
crc = 0
for j in range(len(packet) - 2):
i = ((crc >> 8) ^ packet[j]) & 0xFF
crc = ((crc << 8) ^ DXL_CRC_TABLE[i]) & 0xFFFF
packet[-2] = dxl.DXL_LOBYTE(crc)
packet[-1] = dxl.DXL_HIBYTE(crc)
return packet
class MockInstructionPacket(MockDynamixelPacketv2): class MockInstructionPacket(MockDynamixelPacketv2):
""" """