2023-11-08 16:20:02 +08:00
|
|
|
|
'''
|
|
|
|
|
基于两个D* lite的实现
|
|
|
|
|
按照原始算法的伪代码
|
|
|
|
|
自己写的D* lite
|
|
|
|
|
'''
|
|
|
|
|
|
|
|
|
|
import math
|
2023-11-16 15:28:08 +08:00
|
|
|
|
import os
|
|
|
|
|
import pickle
|
2023-11-08 16:20:02 +08:00
|
|
|
|
import numpy as np
|
|
|
|
|
import heapq
|
|
|
|
|
|
|
|
|
|
from matplotlib import pyplot as plt
|
2023-11-20 17:47:39 +08:00
|
|
|
|
from matplotlib.patches import Polygon
|
|
|
|
|
import matplotlib.colors as mcolors
|
|
|
|
|
|
|
|
|
|
from robowaiter.robot.robot import root_path # 项目根目录路径
|
2023-11-08 16:20:02 +08:00
|
|
|
|
|
|
|
|
|
def diagonal_distance(start, end): #
|
|
|
|
|
return max(abs(start[0] - end[0]), abs(start[1] - end[1]))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def manhattan_distance(start, end): # 曼哈顿距离
|
|
|
|
|
return abs(start[0] - end[0]) + abs(start[1] - end[1])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def euclidean_distance(start, end): # 欧式距离
|
|
|
|
|
# return np.linalg.norm(start-end)
|
2023-11-13 21:52:15 +08:00
|
|
|
|
# return math.sqrt((start[0] - end[0]) ** 2 + (start[1] - end[1]) ** 2)
|
|
|
|
|
return math.hypot(start[0] - end[0], start[1] - end[1])
|
2023-11-08 16:20:02 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def heuristic(start, end, name='euclidean'):
|
|
|
|
|
if name == 'diagonal':
|
|
|
|
|
return diagonal_distance(start, end)
|
|
|
|
|
elif name == 'euclidean':
|
|
|
|
|
return euclidean_distance(start, end)
|
|
|
|
|
elif name == 'manhattan':
|
|
|
|
|
return manhattan_distance(start, end)
|
|
|
|
|
else:
|
|
|
|
|
raise Exception('Error heuristic name!')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Priority:
|
|
|
|
|
'''
|
|
|
|
|
优先级类
|
|
|
|
|
'''
|
|
|
|
|
|
|
|
|
|
def __init__(self, k1, k2):
|
|
|
|
|
self.k1 = k1
|
|
|
|
|
self.k2 = k2
|
|
|
|
|
|
|
|
|
|
def __lt__(self, other): # 定义对象的 < 运算
|
|
|
|
|
return self.k1 < other.k1 or (self.k1 == other.k1 and self.k2 < other.k2)
|
|
|
|
|
|
|
|
|
|
def __le__(self, other): # 定于对象的 <= 运算
|
|
|
|
|
return self.k1 < other.k1 or (self.k1 == other.k1 and self.k2 <= other.k2)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Node:
|
|
|
|
|
'''
|
|
|
|
|
节点类
|
|
|
|
|
'''
|
|
|
|
|
|
|
|
|
|
def __init__(self, pos: (int, int), priority: Priority):
|
|
|
|
|
self.pos = pos # X Y
|
|
|
|
|
self.priority = priority
|
|
|
|
|
|
|
|
|
|
def __lt__(self, other): # 定义对象的 < 运算
|
|
|
|
|
return self.priority < other.priority
|
|
|
|
|
|
|
|
|
|
def __le__(self, other): # 定于对象的 <= 运算
|
|
|
|
|
return self.priority <= other.priority
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PriorityQueue:
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.queue = [] # 节点队列
|
|
|
|
|
self.nodes = [] # 节点位置列表
|
|
|
|
|
|
|
|
|
|
def is_empty(self):
|
|
|
|
|
# 队列判空
|
|
|
|
|
return len(self.queue) == 0
|
|
|
|
|
|
|
|
|
|
def top(self):
|
|
|
|
|
return self.queue[0].pos
|
|
|
|
|
|
|
|
|
|
def top_key(self):
|
|
|
|
|
if self.is_empty():
|
|
|
|
|
return Priority(float('inf'), float('inf'))
|
|
|
|
|
return self.queue[0].priority
|
|
|
|
|
|
|
|
|
|
def pop(self):
|
|
|
|
|
# 取出第一个节点
|
|
|
|
|
node = heapq.heappop(self.queue)
|
|
|
|
|
self.nodes.remove(node.pos)
|
|
|
|
|
return node
|
|
|
|
|
|
|
|
|
|
def insert(self, pos, priority):
|
|
|
|
|
# 创建并添加节点
|
|
|
|
|
node = Node(pos, priority)
|
|
|
|
|
heapq.heappush(self.queue, node)
|
|
|
|
|
self.nodes.append(pos)
|
|
|
|
|
|
|
|
|
|
def remove(self, pos):
|
|
|
|
|
# 删除指定节点并重新排序
|
|
|
|
|
self.queue = [n for n in self.queue if n.pos != pos]
|
|
|
|
|
heapq.heapify(self.queue) # 重排序
|
|
|
|
|
self.nodes.remove(pos)
|
|
|
|
|
|
|
|
|
|
def update(self, pos, priority):
|
|
|
|
|
# 更新指定位置的priority值
|
|
|
|
|
for n in self.queue:
|
|
|
|
|
if n.pos == pos:
|
|
|
|
|
n.priority = priority
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DStarLite:
|
|
|
|
|
def __init__(self,
|
2023-11-16 15:28:08 +08:00
|
|
|
|
map: np.array([int, int]),
|
2023-11-20 17:47:39 +08:00
|
|
|
|
area_range, # [x_min, x_max, y_min, y_max] 实际坐标范围
|
|
|
|
|
scale_ratio=5, # 地图缩放率
|
|
|
|
|
react_radius=200, # 反应半径
|
|
|
|
|
vision_radius=math.pi/3, # 视角半径
|
|
|
|
|
dyna_obs_radius=36, # dyna_obs实际身位半径
|
|
|
|
|
max_path_length=2000 # 最大路径步数
|
2023-11-08 16:20:02 +08:00
|
|
|
|
):
|
|
|
|
|
|
|
|
|
|
self.map = map
|
2023-11-15 15:40:57 +08:00
|
|
|
|
self.background = map.copy()
|
2023-11-08 16:20:02 +08:00
|
|
|
|
self.X = map.shape[0]
|
|
|
|
|
self.Y = map.shape[1]
|
|
|
|
|
(self.x_min, self.x_max, self.y_min, self.y_max) = area_range
|
|
|
|
|
self.scale_ratio = scale_ratio
|
2023-11-20 17:47:39 +08:00
|
|
|
|
self.react_radius = math.floor(react_radius / scale_ratio)
|
|
|
|
|
self.vision_radius = vision_radius
|
|
|
|
|
self.dyna_obs_radius = math.floor(dyna_obs_radius / scale_ratio) # dyna_obs缩放后身位半径
|
|
|
|
|
self.max_path_length = max_path_length
|
2023-11-15 15:40:57 +08:00
|
|
|
|
|
|
|
|
|
self.dyna_obs_list = [] # dyna_obs位置列表 [(x, y)]
|
|
|
|
|
self.dyna_obs_occupy = [] # dyna_obs占用位置列表
|
2023-11-08 16:20:02 +08:00
|
|
|
|
|
|
|
|
|
# free:0, obs:1, dyna_obs:2
|
|
|
|
|
self.idx_to_object = {
|
|
|
|
|
0: "free",
|
|
|
|
|
1: "obstacle",
|
|
|
|
|
2: "dynamic obstacle"
|
|
|
|
|
}
|
|
|
|
|
self.object_to_idx = dict(zip(self.idx_to_object.values(), self.idx_to_object.keys()))
|
|
|
|
|
self.object_to_cost = {
|
|
|
|
|
"free": 0,
|
|
|
|
|
"obstacle": float('inf'),
|
2023-11-20 17:47:39 +08:00
|
|
|
|
"dynamic obstacle": float('inf')
|
2023-11-08 16:20:02 +08:00
|
|
|
|
}
|
2023-11-16 15:28:08 +08:00
|
|
|
|
|
2023-11-20 17:47:39 +08:00
|
|
|
|
# 读取 cost_map 文件
|
|
|
|
|
file_cost_map = os.path.join(root_path, 'robowaiter/algos/navigator/costMap_' + str(self.scale_ratio) + '.pkl')
|
|
|
|
|
if os.path.exists(file_cost_map):
|
|
|
|
|
with open(file_cost_map, 'rb') as file:
|
2023-11-16 15:28:08 +08:00
|
|
|
|
cost_map = pickle.load(file)
|
|
|
|
|
self.cost_map = cost_map
|
2023-11-15 15:40:57 +08:00
|
|
|
|
self.cost_background = self.cost_map.copy()
|
2023-11-08 16:20:02 +08:00
|
|
|
|
|
|
|
|
|
self.s_start = None # (int,int) 必须是元组(元组可以直接当作矩阵索引)
|
|
|
|
|
self.s_goal = None # (int,int)
|
|
|
|
|
self.s_last = None # (int,int)
|
|
|
|
|
self.U = PriorityQueue()
|
|
|
|
|
self.k_m = 0
|
|
|
|
|
self.rhs = np.ones((self.X, self.Y)) * np.inf
|
2023-11-20 17:47:39 +08:00
|
|
|
|
# self.rhs = np.ones_like(self.map) * 1000
|
|
|
|
|
# self.rhs[self.map==self.object_to_idx['obstacle']] = np.inf
|
2023-11-08 16:20:02 +08:00
|
|
|
|
self.g = self.rhs.copy()
|
|
|
|
|
self.path = []
|
|
|
|
|
|
2023-11-15 15:40:57 +08:00
|
|
|
|
# def set_map(self, map_):
|
|
|
|
|
# '''
|
|
|
|
|
# 设置map 和 cost_map
|
|
|
|
|
# '''
|
|
|
|
|
# self.map = map_
|
2023-11-16 15:28:08 +08:00
|
|
|
|
# self.background = map_.copy()
|
2023-11-15 15:40:57 +08:00
|
|
|
|
# self.X = map_.shape[0]
|
|
|
|
|
# self.Y = map_.shape[1]
|
|
|
|
|
# self.compute_cost_map()
|
2023-11-16 15:28:08 +08:00
|
|
|
|
# self.cost_background = self.cost_map.copy()
|
2023-11-08 16:20:02 +08:00
|
|
|
|
|
|
|
|
|
def reset(self):
|
|
|
|
|
'''
|
|
|
|
|
(完成一次导航后)
|
|
|
|
|
重置 1.环境变量
|
|
|
|
|
2.dstar_lite变量
|
|
|
|
|
'''
|
|
|
|
|
# env reset
|
|
|
|
|
self.map = self.background.copy()
|
2023-11-15 15:40:57 +08:00
|
|
|
|
self.cost_map = self.cost_background.copy()
|
2023-11-08 16:20:02 +08:00
|
|
|
|
self.dyna_obs_list.clear()
|
2023-11-15 15:40:57 +08:00
|
|
|
|
self.dyna_obs_occupy.clear()
|
2023-11-08 16:20:02 +08:00
|
|
|
|
self.path.clear()
|
|
|
|
|
# dstar_lite reset
|
|
|
|
|
self.s_start = None
|
|
|
|
|
self.s_goal = None
|
|
|
|
|
self.s_last = None
|
|
|
|
|
self.U = PriorityQueue()
|
|
|
|
|
self.k_m = 0
|
|
|
|
|
self.rhs = np.ones((self.X, self.Y)) * np.inf
|
|
|
|
|
self.g = self.rhs.copy()
|
|
|
|
|
|
|
|
|
|
def calculate_key(self, s: (int, int)):
|
|
|
|
|
'''
|
|
|
|
|
计算 位置s 的 key1, key2
|
|
|
|
|
:returns:
|
|
|
|
|
Priority(k1, k2): 可比较的对象
|
|
|
|
|
'''
|
|
|
|
|
k1 = min(self.g[s], self.rhs[s]) + heuristic(self.s_start, s) + self.k_m
|
|
|
|
|
k2 = min(self.g[s], self.rhs[s])
|
|
|
|
|
return Priority(k1, k2)
|
|
|
|
|
|
2023-11-20 17:47:39 +08:00
|
|
|
|
def c(self, u: (int, int), v: (int, int), c_old=None) -> float:
|
2023-11-08 16:20:02 +08:00
|
|
|
|
'''
|
|
|
|
|
计算节点间的 路径代价 和 目标位置代价 (目标位置代价为0时采用路径代价)
|
|
|
|
|
(因为是终点->起点的扩展方向,因此v是node,u是v扩展的neighbor)
|
|
|
|
|
Args:
|
|
|
|
|
u: from pos
|
|
|
|
|
v: to pos
|
2023-11-20 17:47:39 +08:00
|
|
|
|
c_old: 节点v的旧cost
|
2023-11-08 16:20:02 +08:00
|
|
|
|
'''
|
2023-11-20 17:47:39 +08:00
|
|
|
|
if u == v:
|
|
|
|
|
return 0.0
|
|
|
|
|
if c_old is not None:
|
|
|
|
|
obj_cost = c_old
|
2023-11-08 16:20:02 +08:00
|
|
|
|
else:
|
|
|
|
|
obj_cost = self.cost_map[v]
|
|
|
|
|
if obj_cost > 0:
|
|
|
|
|
return obj_cost
|
|
|
|
|
return heuristic(u, v)
|
|
|
|
|
|
|
|
|
|
def contain(self, u: (int, int)):
|
|
|
|
|
'''
|
|
|
|
|
判断 节点u 是否在队列中
|
|
|
|
|
'''
|
|
|
|
|
return u in self.U.nodes
|
|
|
|
|
|
|
|
|
|
def update_vertex(self, u: (int, int)):
|
|
|
|
|
'''
|
|
|
|
|
判定节点状态, 更新队列
|
|
|
|
|
不一致且在队列 --> 更新key
|
|
|
|
|
不一致且不在队列 --> 计算key并加入队列
|
|
|
|
|
一致且在队列 --> 移出队列
|
|
|
|
|
'''
|
|
|
|
|
if self.g[u] != self.rhs[u] and self.contain(u):
|
|
|
|
|
self.U.update(u, self.calculate_key(u))
|
|
|
|
|
elif self.g[u] != self.rhs[u] and not self.contain(u):
|
|
|
|
|
self.U.insert(u, self.calculate_key(u))
|
|
|
|
|
elif self.g[u] == self.rhs[u] and self.contain(u):
|
|
|
|
|
self.U.remove(u)
|
|
|
|
|
|
|
|
|
|
def compute_shortest_path(self):
|
|
|
|
|
'''
|
|
|
|
|
计算最短路径
|
|
|
|
|
'''
|
2023-11-20 17:47:39 +08:00
|
|
|
|
|
|
|
|
|
c_map = self.map.copy()
|
|
|
|
|
|
2023-11-08 16:20:02 +08:00
|
|
|
|
while self.U.top_key() < self.calculate_key(self.s_start) or self.rhs[self.s_start] > self.g[self.s_start]:
|
|
|
|
|
u = self.U.top()
|
2023-11-20 17:47:39 +08:00
|
|
|
|
c_map[u] = 10
|
2023-11-08 16:20:02 +08:00
|
|
|
|
k_old = self.U.top_key()
|
|
|
|
|
k_new = self.calculate_key(u)
|
|
|
|
|
if k_old < k_new:
|
|
|
|
|
self.U.update(u, k_new)
|
|
|
|
|
elif self.g[u] > self.rhs[u]: # 过一致
|
|
|
|
|
self.g[u] = self.rhs[u]
|
|
|
|
|
self.U.remove(u)
|
|
|
|
|
pred = self.get_neighbors(u)
|
|
|
|
|
for s in pred:
|
|
|
|
|
if s != self.s_goal:
|
|
|
|
|
self.rhs[s] = min(self.rhs[s], self.c(s, u) + self.g[u])
|
|
|
|
|
self.update_vertex(s)
|
|
|
|
|
else: # 欠一致
|
|
|
|
|
g_old = self.g[u]
|
|
|
|
|
self.g[u] = float('inf')
|
|
|
|
|
pred = self.get_neighbors(u)
|
|
|
|
|
for s in pred + [u]:
|
|
|
|
|
if self.rhs[s] == self.c(s, u) + g_old:
|
|
|
|
|
if s != self.s_goal:
|
|
|
|
|
succ = self.get_neighbors(s)
|
|
|
|
|
self.rhs[s] = min([self.c(s, s_) + self.g[s_] for s_ in succ])
|
|
|
|
|
self.update_vertex(s)
|
2023-11-20 17:47:39 +08:00
|
|
|
|
# self.draw_rhs(self.rhs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2023-11-08 16:20:02 +08:00
|
|
|
|
|
2023-11-13 21:52:15 +08:00
|
|
|
|
def _planning(self, s_start, s_goal, dyna_obs, debug=False):
|
2023-11-08 16:20:02 +08:00
|
|
|
|
'''
|
|
|
|
|
规划路径(实际实现)
|
|
|
|
|
Args:
|
|
|
|
|
dyna_obs: 动态障碍物位置列表
|
|
|
|
|
step_num: 单次移动步数
|
|
|
|
|
'''
|
|
|
|
|
# 确保目标合法
|
|
|
|
|
if not self.in_bounds_without_obstacle(s_goal):
|
2023-11-13 21:52:15 +08:00
|
|
|
|
return []
|
2023-11-08 16:20:02 +08:00
|
|
|
|
# 第一次规划需要初始化rhs并将goal加入队列,计算最短路径
|
|
|
|
|
if self.s_goal is None:
|
|
|
|
|
self.s_start = tuple(s_start)
|
|
|
|
|
self.s_goal = tuple(s_goal)
|
|
|
|
|
self.s_last = tuple(s_start)
|
|
|
|
|
self.rhs[tuple(s_goal)] = 0
|
|
|
|
|
self.U.insert(tuple(s_goal), Priority(k1=heuristic(tuple(s_start), tuple(s_goal)), k2=0))
|
2023-11-15 15:40:57 +08:00
|
|
|
|
changed_pos = self.update_map(dyna_obs=dyna_obs)
|
|
|
|
|
if changed_pos:
|
|
|
|
|
self.update_cost_map(changed_pos)
|
2023-11-08 16:20:02 +08:00
|
|
|
|
self.compute_shortest_path() # 计算最短路径
|
|
|
|
|
self.path = self.get_path()
|
2023-11-15 15:40:57 +08:00
|
|
|
|
return self.path
|
2023-11-08 16:20:02 +08:00
|
|
|
|
# 后续规划只更新起点,直接使用原路径(去掉已走过部分)
|
|
|
|
|
else:
|
|
|
|
|
self.s_start = tuple(s_start)
|
2023-11-15 15:40:57 +08:00
|
|
|
|
# 根据obs更新map, cost_map, edge_cost
|
|
|
|
|
changed_pos = self.update_map(dyna_obs=dyna_obs)
|
|
|
|
|
if changed_pos:
|
|
|
|
|
self.update_cost_map(changed_pos)
|
|
|
|
|
self.update_edge_costs(changed_pos)
|
|
|
|
|
# 若地图改变,重新计算最短路径
|
|
|
|
|
self.compute_shortest_path()
|
|
|
|
|
self.path = self.get_path()
|
|
|
|
|
return self.path
|
2023-11-08 16:20:02 +08:00
|
|
|
|
|
2023-11-13 21:52:15 +08:00
|
|
|
|
def planning(self, s_start, s_goal, dyna_obs, debug=False):
|
2023-11-08 16:20:02 +08:00
|
|
|
|
'''
|
|
|
|
|
路径规划(供外部调用,处理实际坐标和地图坐标的转换)
|
|
|
|
|
'''
|
|
|
|
|
# 实际坐标 -> 地图坐标
|
|
|
|
|
s_start = self.real2map(s_start)
|
2023-11-16 15:28:08 +08:00
|
|
|
|
if self.s_goal is None:
|
|
|
|
|
s_goal = self.real2map(s_goal)
|
|
|
|
|
else:
|
|
|
|
|
s_goal = self.s_goal
|
2023-11-15 15:40:57 +08:00
|
|
|
|
dyna_obs = [self.real2map(obs, reachable_assurance=False) for obs in dyna_obs]
|
|
|
|
|
|
2023-11-13 21:52:15 +08:00
|
|
|
|
self._planning(s_start, s_goal, dyna_obs, debug)
|
2023-11-15 15:40:57 +08:00
|
|
|
|
|
2023-11-08 16:20:02 +08:00
|
|
|
|
# 地图坐标->实际坐标
|
|
|
|
|
path = [self.map2real(node) for node in self.path]
|
|
|
|
|
return path
|
|
|
|
|
|
|
|
|
|
def get_path(self):
|
|
|
|
|
'''
|
|
|
|
|
得到路径
|
|
|
|
|
Args:
|
2023-11-13 21:52:15 +08:00
|
|
|
|
step_num: 路径步数
|
2023-11-08 16:20:02 +08:00
|
|
|
|
return:
|
|
|
|
|
path: [(x, y), ...]
|
|
|
|
|
'''
|
|
|
|
|
if self.s_start is None or self.s_goal == self.s_start:
|
|
|
|
|
return []
|
|
|
|
|
path = []
|
|
|
|
|
cur = self.s_start
|
2023-11-20 17:47:39 +08:00
|
|
|
|
for i in range(self.max_path_length):
|
|
|
|
|
succ = [s_ for s_ in self.get_neighbors(cur)]
|
|
|
|
|
cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] + 20*(s_ in path) for s_ in succ])] # 避免抖动 (走重复的点会有惩罚)
|
|
|
|
|
# print(cur)
|
2023-11-08 16:20:02 +08:00
|
|
|
|
path.append(cur)
|
2023-11-20 17:47:39 +08:00
|
|
|
|
if cur == self.s_goal:
|
|
|
|
|
while cur in self.dyna_obs_occupy: # 确保目标未在dyna_obs范围中
|
|
|
|
|
cur = path.pop()
|
|
|
|
|
break
|
2023-11-08 16:20:02 +08:00
|
|
|
|
return path
|
|
|
|
|
|
|
|
|
|
def in_bounds_without_obstacle(self, pos):
|
|
|
|
|
'''
|
|
|
|
|
判断位置在地图范围内 且 不是静态障碍物
|
|
|
|
|
'''
|
|
|
|
|
(x, y) = pos
|
|
|
|
|
return 0 <= x < self.X and 0 <= y < self.Y and self.map[x, y] != self.object_to_idx["obstacle"]
|
|
|
|
|
|
|
|
|
|
def get_neighbors(self, pos, mode=8):
|
|
|
|
|
'''
|
|
|
|
|
获取邻居节点, 地图范围内
|
|
|
|
|
'''
|
|
|
|
|
(x_, y_) = pos
|
|
|
|
|
neighbors = [(x_ + 1, y_), (x_ - 1, y_), (x_, y_ + 1), (x_, y_ - 1), (x_ + 1, y_ + 1), (x_ - 1, y_ + 1),
|
|
|
|
|
(x_ + 1, y_ - 1), (x_ - 1, y_ - 1)]
|
|
|
|
|
neighbors = filter(self.in_bounds_without_obstacle, neighbors) # 确保位置在地图范围内 且 不是静态障碍物
|
|
|
|
|
return list(neighbors)
|
|
|
|
|
|
|
|
|
|
def compute_cost_map(self):
|
|
|
|
|
# 计算当前地图的cost_map
|
2023-11-16 15:28:08 +08:00
|
|
|
|
self.cost_map = np.zeros_like(self.map)
|
2023-11-08 16:20:02 +08:00
|
|
|
|
for idx, obj in self.idx_to_object.items():
|
|
|
|
|
self.cost_map[self.map == idx] = self.object_to_cost[obj]
|
|
|
|
|
|
2023-11-16 15:28:08 +08:00
|
|
|
|
# 扩张静态障碍物影响范围
|
|
|
|
|
obs_pos = np.where(self.map == self.object_to_idx['obstacle']) # 静态障碍物位置列表
|
|
|
|
|
for (x, y) in zip(obs_pos[0], obs_pos[1]):
|
|
|
|
|
start_x, end_x = max(x - 1, 0), min(x + 1, self.X - 1)
|
|
|
|
|
start_y, end_y = max(y - 1, 0), min(y + 1, self.Y - 1)
|
|
|
|
|
for cost in range(9, 0, -3):
|
|
|
|
|
for x_ in range(start_x, end_x + 1):
|
|
|
|
|
self.cost_map[x_, start_y] = max(self.cost_map[x_, start_y], cost)
|
|
|
|
|
for y_ in range(start_y + 1, end_y + 1):
|
|
|
|
|
self.cost_map[end_x, y_] = max(self.cost_map[end_x, y_], cost)
|
|
|
|
|
for x_ in range(end_x - 1, start_x - 1, -1):
|
|
|
|
|
self.cost_map[x_, end_y] = max(self.cost_map[x_, end_y], cost)
|
|
|
|
|
for y_ in range(end_y - 1, start_y, -1):
|
|
|
|
|
self.cost_map[start_x, y_] = max(self.cost_map[start_x, y_], cost)
|
|
|
|
|
start_x, end_x = max(start_x - 1, 0), min(end_x + 1, self.X - 1)
|
|
|
|
|
start_y, end_y = max(start_y - 1, 0), min(end_y + 1, self.Y - 1)
|
2023-11-15 15:40:57 +08:00
|
|
|
|
|
|
|
|
|
self.cost_background = self.cost_map.copy()
|
|
|
|
|
|
2023-11-08 16:20:02 +08:00
|
|
|
|
def update_map(self, dyna_obs):
|
|
|
|
|
'''
|
|
|
|
|
更新地图 和 动态障碍物列表
|
|
|
|
|
Args:
|
|
|
|
|
dyna_obs: 当前动态障碍物位置列表 [(x,y), ...]
|
|
|
|
|
return:
|
2023-11-20 17:47:39 +08:00
|
|
|
|
update_obj: 改变的位置列表 [(x, y, obj_idx, cost_old), ...]
|
2023-11-08 16:20:02 +08:00
|
|
|
|
'''
|
2023-11-16 15:28:08 +08:00
|
|
|
|
# dyna_obs没有变化 (集合set可以忽略元素在列表中的位置) 且 robot未在dyna_obs占用位置中
|
|
|
|
|
if set(dyna_obs) == set(self.dyna_obs_list) and self.s_start not in self.dyna_obs_occupy:
|
2023-11-08 16:20:02 +08:00
|
|
|
|
return []
|
|
|
|
|
|
2023-11-15 15:40:57 +08:00
|
|
|
|
# 当前dyna_obs占用位置列表
|
|
|
|
|
dyna_obs_occupy = []
|
2023-11-08 16:20:02 +08:00
|
|
|
|
for pos in dyna_obs:
|
2023-11-15 15:40:57 +08:00
|
|
|
|
dyna_obs_occupy.extend(self.get_occupy_pos(pos))
|
|
|
|
|
dyna_obs_occupy = [pos for i, pos in enumerate(dyna_obs_occupy) if pos not in dyna_obs_occupy[:i]] # 去除重复位置
|
2023-11-16 15:28:08 +08:00
|
|
|
|
# 转变为free 和 转变为dyna_obs的位置列表
|
2023-11-15 15:40:57 +08:00
|
|
|
|
changed_free = [pos for pos in self.dyna_obs_occupy if pos not in dyna_obs_occupy]
|
|
|
|
|
changed_obs = [pos for pos in dyna_obs_occupy if pos not in self.dyna_obs_occupy]
|
|
|
|
|
|
2023-11-08 16:20:02 +08:00
|
|
|
|
# 更新地图,计算changed_pos
|
|
|
|
|
changed_pos = []
|
|
|
|
|
for (x, y) in changed_free:
|
|
|
|
|
self.map[x, y] = self.object_to_idx['free']
|
2023-11-20 17:47:39 +08:00
|
|
|
|
changed_pos.append((x, y, self.object_to_idx["free"], self.cost_map[x, y]))
|
2023-11-28 22:29:31 +08:00
|
|
|
|
# changed_pos.append((x, y, self.object_to_idx["free"], self.object_to_idx["obstacle"]))
|
2023-11-08 16:20:02 +08:00
|
|
|
|
for (x, y) in changed_obs:
|
2023-11-28 22:29:31 +08:00
|
|
|
|
self.map[x, y] = self.object_to_idx['obstacle']
|
|
|
|
|
changed_pos.append((x, y, self.object_to_idx["obstacle"], self.cost_map[x, y]))
|
|
|
|
|
# changed_pos.append((x, y, self.object_to_idx["obstacle"], self.object_to_idx["free"]))
|
2023-11-08 16:20:02 +08:00
|
|
|
|
|
2023-11-15 15:40:57 +08:00
|
|
|
|
# 更新dyna_obs 位置列表 和 占用位置列表
|
2023-11-08 16:20:02 +08:00
|
|
|
|
self.dyna_obs_list = dyna_obs
|
2023-11-15 15:40:57 +08:00
|
|
|
|
self.dyna_obs_occupy = dyna_obs_occupy
|
2023-11-08 16:20:02 +08:00
|
|
|
|
|
|
|
|
|
return changed_pos
|
|
|
|
|
|
|
|
|
|
def get_occupy_pos(self, obs_pos):
|
|
|
|
|
'''
|
|
|
|
|
根据dyna_obs中心位置,计算其占用的所有网格位置
|
|
|
|
|
'''
|
|
|
|
|
(x, y) = obs_pos
|
2023-11-20 17:47:39 +08:00
|
|
|
|
occupy_radius = min(self.dyna_obs_radius,
|
|
|
|
|
int(euclidean_distance(obs_pos, self.s_start) - 1)) # 避免robot被dyna_obs的占用区域包裹住
|
|
|
|
|
# 圆形区域
|
|
|
|
|
occupy_pos = [(i, j) for i in range(x - occupy_radius, x + occupy_radius + 1)
|
2023-11-15 15:40:57 +08:00
|
|
|
|
for j in range(y - occupy_radius, y + occupy_radius + 1)
|
|
|
|
|
if euclidean_distance((i, j), obs_pos) < occupy_radius]
|
2023-11-20 17:47:39 +08:00
|
|
|
|
# # 等边三角形区域 TODO: 效果不及预期(三角形棱角容易导致robot卡住)
|
|
|
|
|
# triangle_occupy = self.triangle_occupy(obs=obs_pos, occupy_radius=occupy_radius)
|
|
|
|
|
# occupy_pos = [(i, j) for i in range(x - occupy_radius, x + occupy_radius + 1)
|
|
|
|
|
# for j in range(y - occupy_radius, y + occupy_radius + 1)
|
|
|
|
|
# if triangle_occupy.contains_point((i, j))]
|
2023-11-08 16:20:02 +08:00
|
|
|
|
occupy_pos = filter(self.in_bounds_without_obstacle, occupy_pos) # 确保位置在地图范围内 且 不是静态障碍物
|
|
|
|
|
return list(occupy_pos)
|
|
|
|
|
|
|
|
|
|
def update_cost_map(self, changed_pos):
|
|
|
|
|
'''
|
|
|
|
|
更新cost_map
|
|
|
|
|
Args:
|
|
|
|
|
changed_pos: 改变的位置列表 [x, y, idx]
|
|
|
|
|
'''
|
|
|
|
|
for (x, y, idx, _) in changed_pos:
|
2023-11-15 15:40:57 +08:00
|
|
|
|
if self.idx_to_object[idx] == 'free': # 空位的cost取决于离障碍物的距离
|
|
|
|
|
self.cost_map[x, y] = self.cost_background[x, y]
|
|
|
|
|
else:
|
|
|
|
|
self.cost_map[x, y] = self.object_to_cost[self.idx_to_object[idx]]
|
2023-11-08 16:20:02 +08:00
|
|
|
|
|
|
|
|
|
def update_edge_costs(self, changed_pos):
|
|
|
|
|
'''
|
|
|
|
|
重新计算edge_cost,更新受影响节点的rhs
|
|
|
|
|
Args:
|
|
|
|
|
changed_pos: 改变的位置列表 [(x, y, obj_idx_new, obj_idx_old)]
|
|
|
|
|
'''
|
|
|
|
|
if not changed_pos: return
|
|
|
|
|
self.k_m += heuristic(self.s_last, self.s_start, name='euclidean') # 使用欧式距离 更新km
|
|
|
|
|
self.s_last = self.s_start
|
|
|
|
|
for pos in changed_pos:
|
|
|
|
|
v = (pos[0], pos[1]) # to pos
|
2023-11-20 17:47:39 +08:00
|
|
|
|
c_old = pos[3] # 位置v的旧cost
|
2023-11-08 16:20:02 +08:00
|
|
|
|
pred_v = self.get_neighbors(v)
|
|
|
|
|
for u in pred_v:
|
2023-11-20 17:47:39 +08:00
|
|
|
|
c_old = self.c(u, v, c_old=c_old)
|
2023-11-08 16:20:02 +08:00
|
|
|
|
c_new = self.c(u, v)
|
|
|
|
|
if c_old > c_new and u != self.s_goal:
|
2023-11-20 17:47:39 +08:00
|
|
|
|
self.rhs[u] = min(self.rhs[u], c_new + self.g[v])
|
2023-11-08 16:20:02 +08:00
|
|
|
|
elif self.rhs[u] == c_old + self.g[v] and u != self.s_goal:
|
|
|
|
|
succ_u = self.get_neighbors(u)
|
|
|
|
|
self.rhs[u] = min([self.c(u, s_) + self.g[s_] for s_ in succ_u])
|
|
|
|
|
self.update_vertex(u)
|
|
|
|
|
|
|
|
|
|
def map2real(self, pos):
|
|
|
|
|
'''
|
|
|
|
|
地图坐标->实际坐标
|
|
|
|
|
'''
|
|
|
|
|
x = pos[0] * self.scale_ratio + self.x_min
|
|
|
|
|
y = pos[1] * self.scale_ratio + self.y_min
|
|
|
|
|
return tuple((x, y))
|
|
|
|
|
|
2023-11-15 15:40:57 +08:00
|
|
|
|
def real2map(self, pos, reachable_assurance=True):
|
2023-11-08 16:20:02 +08:00
|
|
|
|
'''
|
|
|
|
|
实际坐标->地图坐标
|
|
|
|
|
'''
|
2023-11-15 15:40:57 +08:00
|
|
|
|
x = round((pos[0] - self.x_min) / self.scale_ratio)
|
|
|
|
|
y = round((pos[1] - self.y_min) / self.scale_ratio)
|
2023-11-16 15:28:08 +08:00
|
|
|
|
# 确保点不在静态障碍物上,否则就不断向外圈扩展直到找到非静态障碍物位置
|
|
|
|
|
if reachable_assurance:
|
|
|
|
|
return self.validate_pos((x, y))
|
2023-11-08 16:20:02 +08:00
|
|
|
|
else:
|
2023-11-15 15:40:57 +08:00
|
|
|
|
return tuple((x, y))
|
2023-11-08 16:20:02 +08:00
|
|
|
|
|
2023-11-16 15:28:08 +08:00
|
|
|
|
def validate_pos(self, pos):
|
|
|
|
|
'''
|
|
|
|
|
对于不合法的pos,找到周围距离最近的合法坐标
|
|
|
|
|
'''
|
|
|
|
|
(x, y) = pos
|
|
|
|
|
x = max(0, min(x, self.X - 1))
|
|
|
|
|
y = max(0, min(y, self.Y - 1))
|
|
|
|
|
if self.idx_to_object[self.map[x, y]] == 'obstacle':
|
|
|
|
|
start_x, end_x = max(x - 1, 0), min(x + 1, self.X - 1)
|
|
|
|
|
start_y, end_y = max(y - 1, 0), min(y + 1, self.Y - 1)
|
|
|
|
|
while True:
|
|
|
|
|
for x_ in range(start_x, end_x + 1):
|
|
|
|
|
if self.idx_to_object[self.map[x_, start_y]] != 'obstacle':
|
|
|
|
|
return tuple((x_, start_y))
|
|
|
|
|
for y_ in range(start_y + 1, end_y + 1):
|
|
|
|
|
if self.idx_to_object[self.map[end_x, y_]] != 'obstacle':
|
|
|
|
|
return tuple((end_x, y_))
|
|
|
|
|
for x_ in range(end_x - 1, start_x - 1, -1):
|
|
|
|
|
if self.idx_to_object[self.map[x_, end_y]] != 'obstacle':
|
|
|
|
|
return tuple((x_, end_y))
|
|
|
|
|
for y_ in range(end_y - 1, start_y, -1):
|
|
|
|
|
if self.idx_to_object[self.map[start_x, y_]] != 'obstacle':
|
|
|
|
|
return tuple((start_x, y_))
|
|
|
|
|
start_x, end_x = max(start_x - 1, 0), min(end_x + 1, self.X - 1)
|
|
|
|
|
start_y, end_y = max(start_y - 1, 0), min(end_y + 1, self.Y - 1)
|
|
|
|
|
# raise Exception('invalid pos!')
|
|
|
|
|
return tuple((x, y))
|
|
|
|
|
|
2023-11-20 17:47:39 +08:00
|
|
|
|
def draw_graph(self, step_num, yaw):
|
|
|
|
|
'''
|
|
|
|
|
Args:
|
|
|
|
|
step_num: 移动步数
|
|
|
|
|
yaw: robot朝向 (弧度)
|
|
|
|
|
'''
|
2023-11-28 22:29:31 +08:00
|
|
|
|
# # 清空当前figure内容,保留figure对象
|
|
|
|
|
# plt.clf()
|
|
|
|
|
# # for stopping simulation with the esc key. 按esc结束
|
|
|
|
|
# plt.gcf().canvas.mpl_connect(
|
|
|
|
|
# 'key_release_event',
|
|
|
|
|
# lambda event: [exit(0) if event.key == 'escape' else None])
|
2023-11-08 16:20:02 +08:00
|
|
|
|
|
|
|
|
|
# 缩放坐标偏移量
|
|
|
|
|
offset = (self.x_min / self.scale_ratio, self.x_max / self.scale_ratio,
|
2023-11-20 17:47:39 +08:00
|
|
|
|
self.y_min / self.scale_ratio, self.y_max / self.scale_ratio)
|
2023-11-28 22:29:31 +08:00
|
|
|
|
# 画起点和目标
|
|
|
|
|
if self.s_start:
|
|
|
|
|
start = (self.s_start[0] + offset[0], self.s_start[1] + offset[2])
|
|
|
|
|
plt.plot(start[1], start[0], 'x', color='r')
|
|
|
|
|
if self.s_goal:
|
|
|
|
|
goal = (self.s_goal[0] + offset[0], self.s_goal[1] + offset[2])
|
|
|
|
|
plt.plot(goal[1], goal[0], 'x', color='darkorange')
|
|
|
|
|
|
2023-11-08 16:20:02 +08:00
|
|
|
|
|
|
|
|
|
# 画地图: X行Y列,第一行在下面
|
|
|
|
|
# 范围: 横向Y[-80,290] 纵向X[-70,120]
|
|
|
|
|
plt.imshow(self.map, cmap='binary', alpha=0.5, origin='lower',
|
|
|
|
|
extent=(offset[2], offset[3],
|
|
|
|
|
offset[0], offset[1]))
|
|
|
|
|
|
|
|
|
|
# 画搜索路径
|
|
|
|
|
plt.plot([y + offset[2] for (x, y) in self.path],
|
|
|
|
|
[x + offset[0] for (x, y) in self.path], "-g")
|
|
|
|
|
|
2023-11-28 22:29:31 +08:00
|
|
|
|
if self.s_start:
|
|
|
|
|
# 画移动路径
|
|
|
|
|
next_step = min(step_num, len(self.path))
|
|
|
|
|
plt.plot([start[1], self.path[next_step - 1][1] + offset[2]],
|
|
|
|
|
[start[0], self.path[next_step - 1][0] + offset[0]], "-r")
|
|
|
|
|
# plt.plot([y + offset[2] for (x, y) in self.path[:next_step]],
|
|
|
|
|
# [x + offset[0] for (x, y) in self.path[:next_step]], "-r")
|
|
|
|
|
|
|
|
|
|
# 画感应半径和观测范围
|
|
|
|
|
self.plot_circle(start[1], start[0], self.react_radius, 'lightgrey')
|
|
|
|
|
if yaw is not None:
|
|
|
|
|
plt.plot([start[1], start[1] + self.react_radius * (math.sin(yaw + self.vision_radius))],
|
|
|
|
|
[start[0], start[0] + self.react_radius * (math.cos(yaw + self.vision_radius))], "aqua", linewidth=1)
|
|
|
|
|
plt.plot([start[1], start[1] + self.react_radius * (math.sin(yaw - self.vision_radius))],
|
|
|
|
|
[start[0], start[0] + self.react_radius * (math.cos(yaw - self.vision_radius))], "aqua", linewidth=1)
|
2023-11-08 16:20:02 +08:00
|
|
|
|
|
|
|
|
|
plt.xlabel('y', loc='right')
|
|
|
|
|
plt.ylabel('x', loc='top')
|
|
|
|
|
plt.grid(color='black', linestyle='-', linewidth=0.5)
|
2023-11-28 22:29:31 +08:00
|
|
|
|
# plt.show()
|
2023-11-20 17:47:39 +08:00
|
|
|
|
|
|
|
|
|
def draw_rhs(self, rhs):
|
|
|
|
|
# 画地图: X行Y列,第一行在下面
|
|
|
|
|
|
|
|
|
|
# 缩放坐标偏移量
|
|
|
|
|
offset = (self.x_min / self.scale_ratio, self.x_max / self.scale_ratio,
|
|
|
|
|
self.y_min / self.scale_ratio, self.y_max / self.scale_ratio)
|
|
|
|
|
|
|
|
|
|
# 将无穷大位置的值替换为指定的颜色
|
|
|
|
|
# rhs[np.isinf(rhs)] = np.nan
|
|
|
|
|
|
|
|
|
|
# 将无穷大位置的值替换为 np.nan
|
|
|
|
|
rhs = np.ma.masked_invalid(rhs)
|
|
|
|
|
|
|
|
|
|
# 设置颜色映射范围
|
|
|
|
|
vmin = np.nanmin(rhs)
|
|
|
|
|
vmax = np.nanmax(rhs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 创建一个图形对象
|
|
|
|
|
fig, ax = plt.subplots()
|
|
|
|
|
|
|
|
|
|
# cmap = mcolors.LinearSegmentedColormap.from_list('custom_cmap', ['#0000FF', '#FFFFFF'])
|
|
|
|
|
|
|
|
|
|
# 绘制热力图
|
|
|
|
|
heatmap = ax.imshow(rhs, cmap='jet', vmin=vmin, vmax=vmax, origin='lower',
|
|
|
|
|
extent=(offset[2], offset[3],
|
|
|
|
|
offset[0], offset[1]))
|
|
|
|
|
# 添加颜色条
|
|
|
|
|
plt.colorbar(heatmap)
|
|
|
|
|
|
|
|
|
|
start = (self.s_start[0] + offset[0], self.s_start[1] + offset[2])
|
|
|
|
|
goal = (self.s_goal[0] + offset[0], self.s_goal[1] + offset[2])
|
|
|
|
|
|
|
|
|
|
# 画起点和目标
|
|
|
|
|
plt.plot(start[1], start[0], 'x', color='black')
|
|
|
|
|
plt.plot(goal[1], goal[0], 'x', color='darkorange')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 设置图形标题和轴标签
|
|
|
|
|
ax.set_title('Heatmap(G)')
|
|
|
|
|
ax.set_xlabel('y', loc='right')
|
|
|
|
|
ax.set_ylabel('x', loc='top')
|
|
|
|
|
ax.grid(color='black', linestyle='-', linewidth=0.5)
|
|
|
|
|
# 显示图形
|
|
|
|
|
plt.show()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def plot_circle(y, x, size, color="lightgrey"): # pragma: no cover
|
|
|
|
|
'''
|
|
|
|
|
以(x,y)为圆心,size为半径 画圆
|
|
|
|
|
'''
|
|
|
|
|
deg = list(range(0, 360, 5))
|
|
|
|
|
deg.append(0)
|
|
|
|
|
yl = [y + size * math.cos(np.deg2rad(d)) for d in deg]
|
|
|
|
|
xl = [x + size * math.sin(np.deg2rad(d)) for d in deg]
|
|
|
|
|
plt.plot(yl, xl, color)
|
|
|
|
|
|
|
|
|
|
def triangle_occupy(self, obs, occupy_radius):
|
|
|
|
|
'''
|
|
|
|
|
计算等边三角形的三个顶点并返回三角形对象
|
|
|
|
|
TODO: 效果不及预期
|
|
|
|
|
Args:
|
|
|
|
|
obs: 三角形底边中点
|
|
|
|
|
occupy_radius: 三角形的高
|
|
|
|
|
'''
|
|
|
|
|
dist = euclidean_distance(self.s_start, obs)
|
|
|
|
|
# obs(底边中点)的坐标
|
|
|
|
|
x1, y1 = obs
|
|
|
|
|
# 顶点A的坐标
|
|
|
|
|
x2 = x1 + occupy_radius * ((self.s_start[0] - x1) / dist)
|
|
|
|
|
y2 = y1 + occupy_radius * ((self.s_start[1] - y1) / dist)
|
|
|
|
|
|
|
|
|
|
# 计算向量AP的坐标
|
|
|
|
|
AP_x = x1 - x2
|
|
|
|
|
AP_y = y1 - y2
|
|
|
|
|
# 计算向量AB的坐标
|
|
|
|
|
AB_x = AP_x * math.cos(math.pi / 6) - AP_y * math.sin(math.pi / 6)
|
|
|
|
|
AB_y = AP_x * math.sin(math.pi / 6) + AP_y * math.cos(math.pi / 6)
|
|
|
|
|
# 计算向量AC的坐标
|
|
|
|
|
AC_x = AP_x * math.cos(math.pi / 6) + AP_y * math.sin(math.pi / 6)
|
|
|
|
|
AC_y = -AP_x * math.sin(math.pi / 6) + AP_y * math.cos(math.pi / 6)
|
|
|
|
|
|
|
|
|
|
# 计算顶点B的坐标
|
|
|
|
|
x3 = x2 + AB_x
|
|
|
|
|
y3 = y2 + AB_y
|
|
|
|
|
# 计算顶点C的坐标
|
|
|
|
|
x4 = x2 + AC_x
|
|
|
|
|
y4 = y2 + AC_y
|
|
|
|
|
|
|
|
|
|
ver1 = (x2, y2)
|
|
|
|
|
ver2 = (x3, y3)
|
|
|
|
|
ver3 = (x4, y4)
|
|
|
|
|
|
|
|
|
|
return Polygon([ver1, ver2, ver3], closed=True, fill=True)
|