Go2Py_SIM/Go2Py/utils/point_cloud2.py

329 lines
13 KiB
Python

# Copyright 2008 Willow Garage, Inc.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Serialization of sensor_msgs.PointCloud2 messages.
Author: Tim Field
ROS 2 port by Sebastian Grans
File originally ported from:
https://github.com/ros/common_msgs/blob/f48b00d43cdb82ed9367e0956db332484f676598/
sensor_msgs/src/sensor_msgs/point_cloud2.py
"""
import array
from collections import namedtuple
import sys
from typing import Iterable, List, NamedTuple, Optional
import numpy as np
try:
from numpy.lib.recfunctions import (structured_to_unstructured, unstructured_to_structured)
except ImportError:
from sensor_msgs_py.numpy_compat import (structured_to_unstructured,
unstructured_to_structured)
from sensor_msgs.msg import PointCloud2, PointField
from std_msgs.msg import Header
_DATATYPES = {}
_DATATYPES[PointField.INT8] = np.dtype(np.int8)
_DATATYPES[PointField.UINT8] = np.dtype(np.uint8)
_DATATYPES[PointField.INT16] = np.dtype(np.int16)
_DATATYPES[PointField.UINT16] = np.dtype(np.uint16)
_DATATYPES[PointField.INT32] = np.dtype(np.int32)
_DATATYPES[PointField.UINT32] = np.dtype(np.uint32)
_DATATYPES[PointField.FLOAT32] = np.dtype(np.float32)
_DATATYPES[PointField.FLOAT64] = np.dtype(np.float64)
DUMMY_FIELD_PREFIX = 'unnamed_field'
def read_points(
cloud: PointCloud2,
field_names: Optional[List[str]] = None,
skip_nans: bool = False,
uvs: Optional[Iterable] = None,
reshape_organized_cloud: bool = False) -> np.ndarray:
"""
Read points from a sensor_msgs.PointCloud2 message.
:param cloud: The point cloud to read from sensor_msgs.PointCloud2.
:param field_names: The names of fields to read. If None, read all fields.
(Type: Iterable, Default: None)
:param skip_nans: If True, then don't return any point with a NaN value.
(Type: Bool, Default: False)
:param uvs: If specified, then only return the points at the given
coordinates. (Type: Iterable, Default: None)
:param reshape_organized_cloud: Returns the array as an 2D organized point cloud if set.
:return: Structured NumPy array containing all points.
"""
assert isinstance(cloud, PointCloud2), \
'Cloud is not a sensor_msgs.msg.PointCloud2'
# Cast bytes to numpy array
points = np.ndarray(
shape=(cloud.width * cloud.height, ),
dtype=dtype_from_fields(cloud.fields, point_step=cloud.point_step),
buffer=cloud.data)
# Keep only the requested fields
if field_names is not None:
assert all(field_name in points.dtype.names for field_name in field_names), \
'Requests field is not in the fields of the PointCloud!'
# Mask fields
points = points[list(field_names)]
# Swap array if byte order does not match
if bool(sys.byteorder != 'little') != bool(cloud.is_bigendian):
points = points.byteswap(inplace=True)
# Check if we want to drop points with nan values
if skip_nans and not cloud.is_dense:
# Init mask which selects all points
not_nan_mask = np.ones(len(points), dtype=bool)
for field_name in points.dtype.names:
# Only keep points without any non values in the mask
not_nan_mask = np.logical_and(
not_nan_mask, ~np.isnan(points[field_name]))
# Select these points
points = points[not_nan_mask]
# Select points indexed by the uvs field
if uvs is not None:
# Don't convert to numpy array if it is already one
if not isinstance(uvs, np.ndarray):
uvs = np.fromiter(uvs, int)
# Index requested points
points = points[uvs]
# Cast into 2d array if cloud is 'organized'
if reshape_organized_cloud and cloud.height > 1:
points = points.reshape(cloud.width, cloud.height)
return points
def read_points_numpy(
cloud: PointCloud2,
field_names: Optional[List[str]] = None,
skip_nans: bool = False,
uvs: Optional[Iterable] = None,
reshape_organized_cloud: bool = False) -> np.ndarray:
"""
Read equally typed fields from sensor_msgs.PointCloud2 message as a unstructured numpy array.
This method is better suited if one wants to perform math operations
on e.g. all x,y,z fields.
But it is limited to fields with the same dtype as unstructured numpy arrays
only contain one dtype.
:param cloud: The point cloud to read from sensor_msgs.PointCloud2.
:param field_names: The names of fields to read. If None, read all fields.
(Type: Iterable, Default: None)
:param skip_nans: If True, then don't return any point with a NaN value.
(Type: Bool, Default: False)
:param uvs: If specified, then only return the points at the given
coordinates. (Type: Iterable, Default: None)
:param reshape_organized_cloud: Returns the array as an 2D organized point cloud if set.
:return: Numpy array containing all points.
"""
assert all(cloud.fields[0].datatype == field.datatype for field in cloud.fields[1:]), \
'All fields need to have the same datatype. Use `read_points()` otherwise.'
structured_numpy_array = read_points(
cloud, field_names, skip_nans, uvs, reshape_organized_cloud)
return structured_to_unstructured(structured_numpy_array)
def read_points_list(
cloud: PointCloud2,
field_names: Optional[List[str]] = None,
skip_nans: bool = False,
uvs: Optional[Iterable] = None) -> List[NamedTuple]:
"""
Read points from a sensor_msgs.PointCloud2 message.
This function returns a list of namedtuples. It operates on top of the
read_points method. For more efficient access use read_points directly.
:param cloud: The point cloud to read from. (Type: sensor_msgs.PointCloud2)
:param field_names: The names of fields to read. If None, read all fields.
(Type: Iterable, Default: None)
:param skip_nans: If True, then don't return any point with a NaN value.
(Type: Bool, Default: False)
:param uvs: If specified, then only return the points at the given
coordinates. (Type: Iterable, Default: None]
:return: List of namedtuples containing the values for each point
"""
assert isinstance(cloud, PointCloud2), \
'cloud is not a sensor_msgs.msg.PointCloud2'
if field_names is None:
field_names = [f.name for f in cloud.fields]
Point = namedtuple('Point', field_names)
return [Point._make(p) for p in read_points(cloud, field_names,
skip_nans, uvs)]
def dtype_from_fields(fields: Iterable[PointField], point_step: Optional[int] = None) -> np.dtype:
"""
Convert a Iterable of sensor_msgs.msg.PointField messages to a np.dtype.
:param fields: The point cloud fields.
(Type: iterable of sensor_msgs.msg.PointField)
:param point_step: Point step size in bytes. Calculated from the given fields by default.
(Type: optional of integer)
:returns: NumPy datatype
"""
# Create a lists containing the names, offsets and datatypes of all fields
field_names = []
field_offsets = []
field_datatypes = []
for i, field in enumerate(fields):
# Datatype as numpy datatype
datatype = _DATATYPES[field.datatype]
# Name field
if field.name == '':
name = f'{DUMMY_FIELD_PREFIX}_{i}'
else:
name = field.name
# Handle fields with count > 1 by creating subfields with a suffix consiting
# of "_" followed by the subfield counter [0 -> (count - 1)]
assert field.count > 0, "Can't process fields with count = 0."
for a in range(field.count):
# Add suffix if we have multiple subfields
if field.count > 1:
subfield_name = f'{name}_{a}'
else:
subfield_name = name
assert subfield_name not in field_names, 'Duplicate field names are not allowed!'
field_names.append(subfield_name)
# Create new offset that includes subfields
field_offsets.append(field.offset + a * datatype.itemsize)
field_datatypes.append(datatype.str)
# Create dtype
dtype_dict = {
'names': field_names,
'formats': field_datatypes,
'offsets': field_offsets
}
if point_step is not None:
dtype_dict['itemsize'] = point_step
return np.dtype(dtype_dict)
def create_cloud(
header: Header,
fields: Iterable[PointField],
points: Iterable) -> PointCloud2:
"""
Create a sensor_msgs.msg.PointCloud2 message.
:param header: The point cloud header. (Type: std_msgs.msg.Header)
:param fields: The point cloud fields.
(Type: iterable of sensor_msgs.msg.PointField)
:param points: The point cloud points. List of iterables, i.e. one iterable
for each point, with the elements of each iterable being the
values of the fields for that point (in the same order as
the fields parameter)
:return: The point cloud as sensor_msgs.msg.PointCloud2
"""
# Check if input is numpy array
if isinstance(points, np.ndarray):
# Check if this is an unstructured array
if points.dtype.names is None:
assert all(fields[0].datatype == field.datatype for field in fields[1:]), \
'All fields need to have the same datatype. Pass a structured NumPy array \
with multiple dtypes otherwise.'
# Convert unstructured to structured array
points = unstructured_to_structured(
points,
dtype=dtype_from_fields(fields))
else:
assert points.dtype == dtype_from_fields(fields), \
'PointFields and structured NumPy array dtype do not match for all fields! \
Check their field order, names and types.'
else:
# Cast python objects to structured NumPy array (slow)
points = np.array(
# Points need to be tuples in the structured array
list(map(tuple, points)),
dtype=dtype_from_fields(fields))
# Handle organized clouds
assert len(points.shape) <= 2, \
'Too many dimensions for organized cloud! \
Points can only be organized in max. two dimensional space'
height = 1
width = points.shape[0]
# Check if input points are an organized cloud (2D array of points)
if len(points.shape) == 2:
height = points.shape[1]
# Convert numpy points to array.array
memory_view = memoryview(points)
casted = memory_view.cast('B')
array_array = array.array('B')
array_array.frombytes(casted)
# Put everything together
cloud = PointCloud2(
header=header,
height=height,
width=width,
is_dense=False,
is_bigendian=sys.byteorder != 'little',
fields=fields,
point_step=points.dtype.itemsize,
row_step=(points.dtype.itemsize * width))
# Set cloud via property instead of the constructor because of the bug described in
# https://github.com/ros2/common_interfaces/issues/176
cloud.data = array_array
return cloud
def create_cloud_xyz32(header: Header, points: Iterable) -> PointCloud2:
"""
Create a sensor_msgs.msg.PointCloud2 message with (x, y, z) fields.
:param header: The point cloud header. (Type: std_msgs.msg.Header)
:param points: The point cloud points. (Type: Iterable)
:return: The point cloud as sensor_msgs.msg.PointCloud2.
"""
fields = [PointField(name='x', offset=0,
datatype=PointField.FLOAT32, count=1),
PointField(name='y', offset=4,
datatype=PointField.FLOAT32, count=1),
PointField(name='z', offset=8,
datatype=PointField.FLOAT32, count=1)]
return create_cloud(header, fields, points)