''' Author: WZX 17839623189@168.com Date: 2023-12-01 14:44:07 LastEditors: Do not edit LastEditTime: 2023-12-04 15:47:05 FilePath: /lxy/zkmetaapi/ZKMetaUnit/utils/mqtt_msg.py Description: Copyright (c) 2023 by LLM, All Rights Reserved. ''' import random import threading import time import paho.mqtt.client as mqtt_client import os class MQTT(object): def __init__(self, client_id_header = 'UApp_', broker = '192.168.123.161', port = 1883, topic = "robot/state"): self.client_id = f"{client_id_header}-{str(random.randint(0, 1000))}" self.broker = broker self.port = port self.topic = topic self.bytes = "" self.lock = threading.Lock() # Thread lock used to protect self.temp def connect_mqtt(self): def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect, return code", rc) client = mqtt_client.Client(self.client_id) client.on_connect = on_connect client.connect(self.broker, self.port) return client def subscribe(self, client: mqtt_client): def on_message(client, userdata, msg): # print(f"Received `{msg.payload}` from `{msg.topic}` topic") with self.lock: self.bytes = msg.payload client.subscribe(self.topic) client.on_message = on_message def run(self): client = self.connect_mqtt() self.subscribe(client) client.loop_start() # Start the MQTT client using multi-threading if __name__=="__main__": from mqtt_msg_decode import parse_robot_state import csv def append_to_tsv(file_path, data): with open(file_path, 'a+', newline='', encoding='utf-8') as tsvfile: writer = csv.writer(tsvfile, delimiter='\t') writer.writerow(data) mq = MQTT() mq.run() len = len(os.listdir("data")) # View the value of msg.payload asynchronously in another thread def check_temp(): while True: time.sleep(0.5) # Check every 1 second with mq.lock: # print("mq.temp:", mq.bytes, type(mq.bytes)) tmp_data = parse_robot_state(mq.bytes)["velocity"] tmp_data.append(time.time()) # append_to_tsv(f"data/result_{len}.tsv", tmp_data) # print(f"time: {tmp_data[-1]} velocity: {tmp_data[:-1]}") print(f"position {parse_robot_state(mq.bytes)['position']}, time {time.time()} ") check_temp() # thread = threading.Thread(target=check_temp) # thread.start()