Go2Py_SIM/Go2Py/utils/ros2.py

90 lines
3.0 KiB
Python

import struct
import threading
import time
import numpy as np
import rclpy
import tf2_ros
from rclpy.node import Node
from rclpy.qos import QoSProfile
from rclpy.executors import MultiThreadedExecutor
from geometry_msgs.msg import TransformStamped
from scipy.spatial.transform import Rotation as R
def ros2_init(args=None):
rclpy.init(args=args)
def ros2_close():
rclpy.shutdown()
class ROS2ExecutorManager:
"""A class to manage the ROS2 executor. It allows to add nodes and start the executor in a separate thread."""
def __init__(self):
self.executor = MultiThreadedExecutor()
self.nodes = []
self.executor_thread = None
def add_node(self, node: Node):
"""Add a new node to the executor."""
self.nodes.append(node)
self.executor.add_node(node)
def _run_executor(self):
try:
self.executor.spin()
except KeyboardInterrupt:
pass
finally:
self.terminate()
def start(self):
"""Start spinning the nodes in a separate thread."""
self.executor_thread = threading.Thread(target=self._run_executor)
self.executor_thread.start()
def terminate(self):
"""Terminate all nodes and shutdown rclpy."""
for node in self.nodes:
node.destroy_node()
rclpy.shutdown()
if self.executor_thread:
self.executor_thread.join()
class ROS2TFInterface(Node):
def __init__(self, parent_name, child_name, node_name):
super().__init__(f'{node_name}_tf2_listener')
self.parent_name = parent_name
self.child_name = child_name
self.tfBuffer = tf2_ros.Buffer()
self.listener = tf2_ros.TransformListener(self.tfBuffer, self)
self.T = None
self.stamp = None
self.running = True
self.thread = threading.Thread(target=self.update_loop)
self.thread.start()
self.trans = None
def update_loop(self):
while self.running:
try:
self.trans = self.tfBuffer.lookup_transform(self.parent_name, self.child_name, rclpy.time.Time(), rclpy.time.Duration(seconds=0.1))
except (tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException) as e:
print(e)
time.sleep(0.01)
def get_pose(self):
if self.trans is None:
return None
else:
translation = [self.trans.transform.translation.x, self.trans.transform.translation.y, self.trans.transform.translation.z]
rotation = [self.trans.transform.rotation.x, self.trans.transform.rotation.y, self.trans.transform.rotation.z, self.trans.transform.rotation.w]
self.T = np.eye(4)
self.T[0:3, 0:3] = R.from_quat(rotation).as_matrix()
self.T[:3, 3] = translation
self.stamp = self.trans.header.stamp.nanosec * 1e-9 + self.trans.header.stamp.sec
return self.T
def close(self):
self.running = False
self.thread.join()
self.destroy_node()