# Copyright 2008 Willow Garage, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # * Neither the name of the Willow Garage, Inc. nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. """ Serialization of sensor_msgs.PointCloud2 messages. Author: Tim Field ROS 2 port by Sebastian Grans File originally ported from: https://github.com/ros/common_msgs/blob/f48b00d43cdb82ed9367e0956db332484f676598/ sensor_msgs/src/sensor_msgs/point_cloud2.py """ import array from collections import namedtuple import sys from typing import Iterable, List, NamedTuple, Optional import numpy as np try: from numpy.lib.recfunctions import (structured_to_unstructured, unstructured_to_structured) except ImportError: from sensor_msgs_py.numpy_compat import (structured_to_unstructured, unstructured_to_structured) from sensor_msgs.msg import PointCloud2, PointField from std_msgs.msg import Header _DATATYPES = {} _DATATYPES[PointField.INT8] = np.dtype(np.int8) _DATATYPES[PointField.UINT8] = np.dtype(np.uint8) _DATATYPES[PointField.INT16] = np.dtype(np.int16) _DATATYPES[PointField.UINT16] = np.dtype(np.uint16) _DATATYPES[PointField.INT32] = np.dtype(np.int32) _DATATYPES[PointField.UINT32] = np.dtype(np.uint32) _DATATYPES[PointField.FLOAT32] = np.dtype(np.float32) _DATATYPES[PointField.FLOAT64] = np.dtype(np.float64) DUMMY_FIELD_PREFIX = 'unnamed_field' def read_points( cloud: PointCloud2, field_names: Optional[List[str]] = None, skip_nans: bool = False, uvs: Optional[Iterable] = None, reshape_organized_cloud: bool = False) -> np.ndarray: """ Read points from a sensor_msgs.PointCloud2 message. :param cloud: The point cloud to read from sensor_msgs.PointCloud2. :param field_names: The names of fields to read. If None, read all fields. (Type: Iterable, Default: None) :param skip_nans: If True, then don't return any point with a NaN value. (Type: Bool, Default: False) :param uvs: If specified, then only return the points at the given coordinates. (Type: Iterable, Default: None) :param reshape_organized_cloud: Returns the array as an 2D organized point cloud if set. :return: Structured NumPy array containing all points. """ assert isinstance(cloud, PointCloud2), \ 'Cloud is not a sensor_msgs.msg.PointCloud2' # Cast bytes to numpy array points = np.ndarray( shape=(cloud.width * cloud.height, ), dtype=dtype_from_fields(cloud.fields, point_step=cloud.point_step), buffer=cloud.data) # Keep only the requested fields if field_names is not None: assert all(field_name in points.dtype.names for field_name in field_names), \ 'Requests field is not in the fields of the PointCloud!' # Mask fields points = points[list(field_names)] # Swap array if byte order does not match if bool(sys.byteorder != 'little') != bool(cloud.is_bigendian): points = points.byteswap(inplace=True) # Check if we want to drop points with nan values if skip_nans and not cloud.is_dense: # Init mask which selects all points not_nan_mask = np.ones(len(points), dtype=bool) for field_name in points.dtype.names: # Only keep points without any non values in the mask not_nan_mask = np.logical_and( not_nan_mask, ~np.isnan(points[field_name])) # Select these points points = points[not_nan_mask] # Select points indexed by the uvs field if uvs is not None: # Don't convert to numpy array if it is already one if not isinstance(uvs, np.ndarray): uvs = np.fromiter(uvs, int) # Index requested points points = points[uvs] # Cast into 2d array if cloud is 'organized' if reshape_organized_cloud and cloud.height > 1: points = points.reshape(cloud.width, cloud.height) return points def read_points_numpy( cloud: PointCloud2, field_names: Optional[List[str]] = None, skip_nans: bool = False, uvs: Optional[Iterable] = None, reshape_organized_cloud: bool = False) -> np.ndarray: """ Read equally typed fields from sensor_msgs.PointCloud2 message as a unstructured numpy array. This method is better suited if one wants to perform math operations on e.g. all x,y,z fields. But it is limited to fields with the same dtype as unstructured numpy arrays only contain one dtype. :param cloud: The point cloud to read from sensor_msgs.PointCloud2. :param field_names: The names of fields to read. If None, read all fields. (Type: Iterable, Default: None) :param skip_nans: If True, then don't return any point with a NaN value. (Type: Bool, Default: False) :param uvs: If specified, then only return the points at the given coordinates. (Type: Iterable, Default: None) :param reshape_organized_cloud: Returns the array as an 2D organized point cloud if set. :return: Numpy array containing all points. """ assert all(cloud.fields[0].datatype == field.datatype for field in cloud.fields[1:]), \ 'All fields need to have the same datatype. Use `read_points()` otherwise.' structured_numpy_array = read_points( cloud, field_names, skip_nans, uvs, reshape_organized_cloud) return structured_to_unstructured(structured_numpy_array) def read_points_list( cloud: PointCloud2, field_names: Optional[List[str]] = None, skip_nans: bool = False, uvs: Optional[Iterable] = None) -> List[NamedTuple]: """ Read points from a sensor_msgs.PointCloud2 message. This function returns a list of namedtuples. It operates on top of the read_points method. For more efficient access use read_points directly. :param cloud: The point cloud to read from. (Type: sensor_msgs.PointCloud2) :param field_names: The names of fields to read. If None, read all fields. (Type: Iterable, Default: None) :param skip_nans: If True, then don't return any point with a NaN value. (Type: Bool, Default: False) :param uvs: If specified, then only return the points at the given coordinates. (Type: Iterable, Default: None] :return: List of namedtuples containing the values for each point """ assert isinstance(cloud, PointCloud2), \ 'cloud is not a sensor_msgs.msg.PointCloud2' if field_names is None: field_names = [f.name for f in cloud.fields] Point = namedtuple('Point', field_names) return [Point._make(p) for p in read_points(cloud, field_names, skip_nans, uvs)] def dtype_from_fields(fields: Iterable[PointField], point_step: Optional[int] = None) -> np.dtype: """ Convert a Iterable of sensor_msgs.msg.PointField messages to a np.dtype. :param fields: The point cloud fields. (Type: iterable of sensor_msgs.msg.PointField) :param point_step: Point step size in bytes. Calculated from the given fields by default. (Type: optional of integer) :returns: NumPy datatype """ # Create a lists containing the names, offsets and datatypes of all fields field_names = [] field_offsets = [] field_datatypes = [] for i, field in enumerate(fields): # Datatype as numpy datatype datatype = _DATATYPES[field.datatype] # Name field if field.name == '': name = f'{DUMMY_FIELD_PREFIX}_{i}' else: name = field.name # Handle fields with count > 1 by creating subfields with a suffix consiting # of "_" followed by the subfield counter [0 -> (count - 1)] assert field.count > 0, "Can't process fields with count = 0." for a in range(field.count): # Add suffix if we have multiple subfields if field.count > 1: subfield_name = f'{name}_{a}' else: subfield_name = name assert subfield_name not in field_names, 'Duplicate field names are not allowed!' field_names.append(subfield_name) # Create new offset that includes subfields field_offsets.append(field.offset + a * datatype.itemsize) field_datatypes.append(datatype.str) # Create dtype dtype_dict = { 'names': field_names, 'formats': field_datatypes, 'offsets': field_offsets } if point_step is not None: dtype_dict['itemsize'] = point_step return np.dtype(dtype_dict) def create_cloud( header: Header, fields: Iterable[PointField], points: Iterable) -> PointCloud2: """ Create a sensor_msgs.msg.PointCloud2 message. :param header: The point cloud header. (Type: std_msgs.msg.Header) :param fields: The point cloud fields. (Type: iterable of sensor_msgs.msg.PointField) :param points: The point cloud points. List of iterables, i.e. one iterable for each point, with the elements of each iterable being the values of the fields for that point (in the same order as the fields parameter) :return: The point cloud as sensor_msgs.msg.PointCloud2 """ # Check if input is numpy array if isinstance(points, np.ndarray): # Check if this is an unstructured array if points.dtype.names is None: assert all(fields[0].datatype == field.datatype for field in fields[1:]), \ 'All fields need to have the same datatype. Pass a structured NumPy array \ with multiple dtypes otherwise.' # Convert unstructured to structured array points = unstructured_to_structured( points, dtype=dtype_from_fields(fields)) else: assert points.dtype == dtype_from_fields(fields), \ 'PointFields and structured NumPy array dtype do not match for all fields! \ Check their field order, names and types.' else: # Cast python objects to structured NumPy array (slow) points = np.array( # Points need to be tuples in the structured array list(map(tuple, points)), dtype=dtype_from_fields(fields)) # Handle organized clouds assert len(points.shape) <= 2, \ 'Too many dimensions for organized cloud! \ Points can only be organized in max. two dimensional space' height = 1 width = points.shape[0] # Check if input points are an organized cloud (2D array of points) if len(points.shape) == 2: height = points.shape[1] # Convert numpy points to array.array memory_view = memoryview(points) casted = memory_view.cast('B') array_array = array.array('B') array_array.frombytes(casted) # Put everything together cloud = PointCloud2( header=header, height=height, width=width, is_dense=False, is_bigendian=sys.byteorder != 'little', fields=fields, point_step=points.dtype.itemsize, row_step=(points.dtype.itemsize * width)) # Set cloud via property instead of the constructor because of the bug described in # https://github.com/ros2/common_interfaces/issues/176 cloud.data = array_array return cloud def create_cloud_xyz32(header: Header, points: Iterable) -> PointCloud2: """ Create a sensor_msgs.msg.PointCloud2 message with (x, y, z) fields. :param header: The point cloud header. (Type: std_msgs.msg.Header) :param points: The point cloud points. (Type: Iterable) :return: The point cloud as sensor_msgs.msg.PointCloud2. """ fields = [PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1), PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1), PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1)] return create_cloud(header, fields, points)