706 lines
27 KiB
Python
706 lines
27 KiB
Python
'''
|
||
基于两个D* lite的实现
|
||
按照原始算法的伪代码
|
||
自己写的D* lite
|
||
'''
|
||
|
||
import math
|
||
import os
|
||
import pickle
|
||
import numpy as np
|
||
import heapq
|
||
|
||
from matplotlib import pyplot as plt
|
||
from matplotlib.patches import Polygon
|
||
import matplotlib.colors as mcolors
|
||
|
||
from robowaiter.robot.robot import root_path # 项目根目录路径
|
||
|
||
def diagonal_distance(start, end): #
|
||
return max(abs(start[0] - end[0]), abs(start[1] - end[1]))
|
||
|
||
|
||
def manhattan_distance(start, end): # 曼哈顿距离
|
||
return abs(start[0] - end[0]) + abs(start[1] - end[1])
|
||
|
||
|
||
def euclidean_distance(start, end): # 欧式距离
|
||
# return np.linalg.norm(start-end)
|
||
# return math.sqrt((start[0] - end[0]) ** 2 + (start[1] - end[1]) ** 2)
|
||
return math.hypot(start[0] - end[0], start[1] - end[1])
|
||
|
||
|
||
def heuristic(start, end, name='euclidean'):
|
||
if name == 'diagonal':
|
||
return diagonal_distance(start, end)
|
||
elif name == 'euclidean':
|
||
return euclidean_distance(start, end)
|
||
elif name == 'manhattan':
|
||
return manhattan_distance(start, end)
|
||
else:
|
||
raise Exception('Error heuristic name!')
|
||
|
||
|
||
class Priority:
|
||
'''
|
||
优先级类
|
||
'''
|
||
|
||
def __init__(self, k1, k2):
|
||
self.k1 = k1
|
||
self.k2 = k2
|
||
|
||
def __lt__(self, other): # 定义对象的 < 运算
|
||
return self.k1 < other.k1 or (self.k1 == other.k1 and self.k2 < other.k2)
|
||
|
||
def __le__(self, other): # 定于对象的 <= 运算
|
||
return self.k1 < other.k1 or (self.k1 == other.k1 and self.k2 <= other.k2)
|
||
|
||
|
||
class Node:
|
||
'''
|
||
节点类
|
||
'''
|
||
|
||
def __init__(self, pos: (int, int), priority: Priority):
|
||
self.pos = pos # X Y
|
||
self.priority = priority
|
||
|
||
def __lt__(self, other): # 定义对象的 < 运算
|
||
return self.priority < other.priority
|
||
|
||
def __le__(self, other): # 定于对象的 <= 运算
|
||
return self.priority <= other.priority
|
||
|
||
|
||
class PriorityQueue:
|
||
def __init__(self):
|
||
self.queue = [] # 节点队列
|
||
self.nodes = [] # 节点位置列表
|
||
|
||
def is_empty(self):
|
||
# 队列判空
|
||
return len(self.queue) == 0
|
||
|
||
def top(self):
|
||
return self.queue[0].pos
|
||
|
||
def top_key(self):
|
||
if self.is_empty():
|
||
return Priority(float('inf'), float('inf'))
|
||
return self.queue[0].priority
|
||
|
||
def pop(self):
|
||
# 取出第一个节点
|
||
node = heapq.heappop(self.queue)
|
||
self.nodes.remove(node.pos)
|
||
return node
|
||
|
||
def insert(self, pos, priority):
|
||
# 创建并添加节点
|
||
node = Node(pos, priority)
|
||
heapq.heappush(self.queue, node)
|
||
self.nodes.append(pos)
|
||
|
||
def remove(self, pos):
|
||
# 删除指定节点并重新排序
|
||
self.queue = [n for n in self.queue if n.pos != pos]
|
||
heapq.heapify(self.queue) # 重排序
|
||
self.nodes.remove(pos)
|
||
|
||
def update(self, pos, priority):
|
||
# 更新指定位置的priority值
|
||
for n in self.queue:
|
||
if n.pos == pos:
|
||
n.priority = priority
|
||
break
|
||
|
||
|
||
class DStarLite:
|
||
def __init__(self,
|
||
map: np.array([int, int]),
|
||
area_range, # [x_min, x_max, y_min, y_max] 实际坐标范围
|
||
scale_ratio=5, # 地图缩放率
|
||
react_radius=200, # 反应半径
|
||
vision_radius=math.pi/3, # 视角半径
|
||
dyna_obs_radius=36, # dyna_obs实际身位半径
|
||
max_path_length=2000 # 最大路径步数
|
||
):
|
||
|
||
self.map = map
|
||
self.background = map.copy()
|
||
self.X = map.shape[0]
|
||
self.Y = map.shape[1]
|
||
(self.x_min, self.x_max, self.y_min, self.y_max) = area_range
|
||
self.scale_ratio = scale_ratio
|
||
self.react_radius = math.floor(react_radius / scale_ratio)
|
||
self.vision_radius = vision_radius
|
||
self.dyna_obs_radius = math.floor(dyna_obs_radius / scale_ratio) # dyna_obs缩放后身位半径
|
||
self.max_path_length = max_path_length
|
||
|
||
self.dyna_obs_list = [] # dyna_obs位置列表 [(x, y)]
|
||
self.dyna_obs_occupy = [] # dyna_obs占用位置列表
|
||
|
||
# free:0, obs:1, dyna_obs:2
|
||
self.idx_to_object = {
|
||
0: "free",
|
||
1: "obstacle",
|
||
2: "dynamic obstacle"
|
||
}
|
||
self.object_to_idx = dict(zip(self.idx_to_object.values(), self.idx_to_object.keys()))
|
||
self.object_to_cost = {
|
||
"free": 0,
|
||
"obstacle": float('inf'),
|
||
"dynamic obstacle": float('inf')
|
||
}
|
||
|
||
# 读取 cost_map 文件
|
||
file_cost_map = os.path.join(root_path, 'robowaiter/algos/navigator/costMap_' + str(self.scale_ratio) + '.pkl')
|
||
if os.path.exists(file_cost_map):
|
||
with open(file_cost_map, 'rb') as file:
|
||
cost_map = pickle.load(file)
|
||
self.cost_map = cost_map
|
||
self.cost_background = self.cost_map.copy()
|
||
|
||
self.s_start = None # (int,int) 必须是元组(元组可以直接当作矩阵索引)
|
||
self.s_goal = None # (int,int)
|
||
self.s_last = None # (int,int)
|
||
self.U = PriorityQueue()
|
||
self.k_m = 0
|
||
self.rhs = np.ones((self.X, self.Y)) * np.inf
|
||
# self.rhs = np.ones_like(self.map) * 1000
|
||
# self.rhs[self.map==self.object_to_idx['obstacle']] = np.inf
|
||
self.g = self.rhs.copy()
|
||
self.path = []
|
||
|
||
# def set_map(self, map_):
|
||
# '''
|
||
# 设置map 和 cost_map
|
||
# '''
|
||
# self.map = map_
|
||
# self.background = map_.copy()
|
||
# self.X = map_.shape[0]
|
||
# self.Y = map_.shape[1]
|
||
# self.compute_cost_map()
|
||
# self.cost_background = self.cost_map.copy()
|
||
|
||
def reset(self):
|
||
'''
|
||
(完成一次导航后)
|
||
重置 1.环境变量
|
||
2.dstar_lite变量
|
||
'''
|
||
# env reset
|
||
self.map = self.background.copy()
|
||
self.cost_map = self.cost_background.copy()
|
||
self.dyna_obs_list.clear()
|
||
self.dyna_obs_occupy.clear()
|
||
self.path.clear()
|
||
# dstar_lite reset
|
||
self.s_start = None
|
||
self.s_goal = None
|
||
self.s_last = None
|
||
self.U = PriorityQueue()
|
||
self.k_m = 0
|
||
self.rhs = np.ones((self.X, self.Y)) * np.inf
|
||
self.g = self.rhs.copy()
|
||
|
||
def calculate_key(self, s: (int, int)):
|
||
'''
|
||
计算 位置s 的 key1, key2
|
||
:returns:
|
||
Priority(k1, k2): 可比较的对象
|
||
'''
|
||
k1 = min(self.g[s], self.rhs[s]) + heuristic(self.s_start, s) + self.k_m
|
||
k2 = min(self.g[s], self.rhs[s])
|
||
return Priority(k1, k2)
|
||
|
||
def c(self, u: (int, int), v: (int, int), c_old=None) -> float:
|
||
'''
|
||
计算节点间的 路径代价 和 目标位置代价 (目标位置代价为0时采用路径代价)
|
||
(因为是终点->起点的扩展方向,因此v是node,u是v扩展的neighbor)
|
||
Args:
|
||
u: from pos
|
||
v: to pos
|
||
c_old: 节点v的旧cost
|
||
'''
|
||
if u == v:
|
||
return 0.0
|
||
if c_old is not None:
|
||
obj_cost = c_old
|
||
else:
|
||
obj_cost = self.cost_map[v]
|
||
if obj_cost > 0:
|
||
return obj_cost
|
||
return heuristic(u, v)
|
||
|
||
def contain(self, u: (int, int)):
|
||
'''
|
||
判断 节点u 是否在队列中
|
||
'''
|
||
return u in self.U.nodes
|
||
|
||
def update_vertex(self, u: (int, int)):
|
||
'''
|
||
判定节点状态, 更新队列
|
||
不一致且在队列 --> 更新key
|
||
不一致且不在队列 --> 计算key并加入队列
|
||
一致且在队列 --> 移出队列
|
||
'''
|
||
if self.g[u] != self.rhs[u] and self.contain(u):
|
||
self.U.update(u, self.calculate_key(u))
|
||
elif self.g[u] != self.rhs[u] and not self.contain(u):
|
||
self.U.insert(u, self.calculate_key(u))
|
||
elif self.g[u] == self.rhs[u] and self.contain(u):
|
||
self.U.remove(u)
|
||
|
||
def compute_shortest_path(self):
|
||
'''
|
||
计算最短路径
|
||
'''
|
||
|
||
c_map = self.map.copy()
|
||
|
||
while self.U.top_key() < self.calculate_key(self.s_start) or self.rhs[self.s_start] > self.g[self.s_start]:
|
||
u = self.U.top()
|
||
c_map[u] = 10
|
||
k_old = self.U.top_key()
|
||
k_new = self.calculate_key(u)
|
||
if k_old < k_new:
|
||
self.U.update(u, k_new)
|
||
elif self.g[u] > self.rhs[u]: # 过一致
|
||
self.g[u] = self.rhs[u]
|
||
self.U.remove(u)
|
||
pred = self.get_neighbors(u)
|
||
for s in pred:
|
||
if s != self.s_goal:
|
||
self.rhs[s] = min(self.rhs[s], self.c(s, u) + self.g[u])
|
||
self.update_vertex(s)
|
||
else: # 欠一致
|
||
g_old = self.g[u]
|
||
self.g[u] = float('inf')
|
||
pred = self.get_neighbors(u)
|
||
for s in pred + [u]:
|
||
if self.rhs[s] == self.c(s, u) + g_old:
|
||
if s != self.s_goal:
|
||
succ = self.get_neighbors(s)
|
||
self.rhs[s] = min([self.c(s, s_) + self.g[s_] for s_ in succ])
|
||
self.update_vertex(s)
|
||
# self.draw_rhs(self.rhs)
|
||
|
||
|
||
|
||
|
||
def _planning(self, s_start, s_goal, dyna_obs, debug=False):
|
||
'''
|
||
规划路径(实际实现)
|
||
Args:
|
||
dyna_obs: 动态障碍物位置列表
|
||
step_num: 单次移动步数
|
||
'''
|
||
# 确保目标合法
|
||
if not self.in_bounds_without_obstacle(s_goal):
|
||
return []
|
||
# 第一次规划需要初始化rhs并将goal加入队列,计算最短路径
|
||
if self.s_goal is None:
|
||
self.s_start = tuple(s_start)
|
||
self.s_goal = tuple(s_goal)
|
||
self.s_last = tuple(s_start)
|
||
self.rhs[tuple(s_goal)] = 0
|
||
self.U.insert(tuple(s_goal), Priority(k1=heuristic(tuple(s_start), tuple(s_goal)), k2=0))
|
||
changed_pos = self.update_map(dyna_obs=dyna_obs)
|
||
if changed_pos:
|
||
self.update_cost_map(changed_pos)
|
||
self.compute_shortest_path() # 计算最短路径
|
||
self.path = self.get_path()
|
||
return self.path
|
||
# 后续规划只更新起点,直接使用原路径(去掉已走过部分)
|
||
else:
|
||
self.s_start = tuple(s_start)
|
||
# 根据obs更新map, cost_map, edge_cost
|
||
changed_pos = self.update_map(dyna_obs=dyna_obs)
|
||
if changed_pos:
|
||
self.update_cost_map(changed_pos)
|
||
self.update_edge_costs(changed_pos)
|
||
# 若地图改变,重新计算最短路径
|
||
self.compute_shortest_path()
|
||
self.path = self.get_path()
|
||
return self.path
|
||
|
||
def planning(self, s_start, s_goal, dyna_obs, debug=False):
|
||
'''
|
||
路径规划(供外部调用,处理实际坐标和地图坐标的转换)
|
||
'''
|
||
# 实际坐标 -> 地图坐标
|
||
s_start = self.real2map(s_start)
|
||
if self.s_goal is None:
|
||
s_goal = self.real2map(s_goal)
|
||
else:
|
||
s_goal = self.s_goal
|
||
dyna_obs = [self.real2map(obs, reachable_assurance=False) for obs in dyna_obs]
|
||
|
||
self._planning(s_start, s_goal, dyna_obs, debug)
|
||
|
||
# 地图坐标->实际坐标
|
||
path = [self.map2real(node) for node in self.path]
|
||
return path
|
||
|
||
def get_path(self):
|
||
'''
|
||
得到路径
|
||
Args:
|
||
step_num: 路径步数
|
||
return:
|
||
path: [(x, y), ...]
|
||
'''
|
||
if self.s_start is None or self.s_goal == self.s_start:
|
||
return []
|
||
path = []
|
||
cur = self.s_start
|
||
for i in range(self.max_path_length):
|
||
succ = [s_ for s_ in self.get_neighbors(cur)]
|
||
cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] + 20*(s_ in path) for s_ in succ])] # 避免抖动 (走重复的点会有惩罚)
|
||
# print(cur)
|
||
path.append(cur)
|
||
if cur == self.s_goal:
|
||
while cur in self.dyna_obs_occupy: # 确保目标未在dyna_obs范围中
|
||
cur = path.pop()
|
||
break
|
||
return path
|
||
|
||
def in_bounds_without_obstacle(self, pos):
|
||
'''
|
||
判断位置在地图范围内 且 不是静态障碍物
|
||
'''
|
||
(x, y) = pos
|
||
return 0 <= x < self.X and 0 <= y < self.Y and self.map[x, y] != self.object_to_idx["obstacle"]
|
||
|
||
def get_neighbors(self, pos, mode=8):
|
||
'''
|
||
获取邻居节点, 地图范围内
|
||
'''
|
||
(x_, y_) = pos
|
||
neighbors = [(x_ + 1, y_), (x_ - 1, y_), (x_, y_ + 1), (x_, y_ - 1), (x_ + 1, y_ + 1), (x_ - 1, y_ + 1),
|
||
(x_ + 1, y_ - 1), (x_ - 1, y_ - 1)]
|
||
neighbors = filter(self.in_bounds_without_obstacle, neighbors) # 确保位置在地图范围内 且 不是静态障碍物
|
||
return list(neighbors)
|
||
|
||
def compute_cost_map(self):
|
||
# 计算当前地图的cost_map
|
||
self.cost_map = np.zeros_like(self.map)
|
||
for idx, obj in self.idx_to_object.items():
|
||
self.cost_map[self.map == idx] = self.object_to_cost[obj]
|
||
|
||
# 扩张静态障碍物影响范围
|
||
obs_pos = np.where(self.map == self.object_to_idx['obstacle']) # 静态障碍物位置列表
|
||
for (x, y) in zip(obs_pos[0], obs_pos[1]):
|
||
start_x, end_x = max(x - 1, 0), min(x + 1, self.X - 1)
|
||
start_y, end_y = max(y - 1, 0), min(y + 1, self.Y - 1)
|
||
for cost in range(9, 0, -3):
|
||
for x_ in range(start_x, end_x + 1):
|
||
self.cost_map[x_, start_y] = max(self.cost_map[x_, start_y], cost)
|
||
for y_ in range(start_y + 1, end_y + 1):
|
||
self.cost_map[end_x, y_] = max(self.cost_map[end_x, y_], cost)
|
||
for x_ in range(end_x - 1, start_x - 1, -1):
|
||
self.cost_map[x_, end_y] = max(self.cost_map[x_, end_y], cost)
|
||
for y_ in range(end_y - 1, start_y, -1):
|
||
self.cost_map[start_x, y_] = max(self.cost_map[start_x, y_], cost)
|
||
start_x, end_x = max(start_x - 1, 0), min(end_x + 1, self.X - 1)
|
||
start_y, end_y = max(start_y - 1, 0), min(end_y + 1, self.Y - 1)
|
||
|
||
self.cost_background = self.cost_map.copy()
|
||
|
||
def update_map(self, dyna_obs):
|
||
'''
|
||
更新地图 和 动态障碍物列表
|
||
Args:
|
||
dyna_obs: 当前动态障碍物位置列表 [(x,y), ...]
|
||
return:
|
||
update_obj: 改变的位置列表 [(x, y, obj_idx, cost_old), ...]
|
||
'''
|
||
# dyna_obs没有变化 (集合set可以忽略元素在列表中的位置) 且 robot未在dyna_obs占用位置中
|
||
if set(dyna_obs) == set(self.dyna_obs_list) and self.s_start not in self.dyna_obs_occupy:
|
||
return []
|
||
|
||
# 当前dyna_obs占用位置列表
|
||
dyna_obs_occupy = []
|
||
for pos in dyna_obs:
|
||
dyna_obs_occupy.extend(self.get_occupy_pos(pos))
|
||
dyna_obs_occupy = [pos for i, pos in enumerate(dyna_obs_occupy) if pos not in dyna_obs_occupy[:i]] # 去除重复位置
|
||
# 转变为free 和 转变为dyna_obs的位置列表
|
||
changed_free = [pos for pos in self.dyna_obs_occupy if pos not in dyna_obs_occupy]
|
||
changed_obs = [pos for pos in dyna_obs_occupy if pos not in self.dyna_obs_occupy]
|
||
|
||
# 更新地图,计算changed_pos
|
||
changed_pos = []
|
||
for (x, y) in changed_free:
|
||
self.map[x, y] = self.object_to_idx['free']
|
||
changed_pos.append((x, y, self.object_to_idx["free"], self.cost_map[x, y]))
|
||
# changed_pos.append((x, y, self.object_to_idx["free"], self.object_to_idx["obstacle"]))
|
||
for (x, y) in changed_obs:
|
||
self.map[x, y] = self.object_to_idx['obstacle']
|
||
changed_pos.append((x, y, self.object_to_idx["obstacle"], self.cost_map[x, y]))
|
||
# changed_pos.append((x, y, self.object_to_idx["obstacle"], self.object_to_idx["free"]))
|
||
|
||
# 更新dyna_obs 位置列表 和 占用位置列表
|
||
self.dyna_obs_list = dyna_obs
|
||
self.dyna_obs_occupy = dyna_obs_occupy
|
||
|
||
return changed_pos
|
||
|
||
def get_occupy_pos(self, obs_pos):
|
||
'''
|
||
根据dyna_obs中心位置,计算其占用的所有网格位置
|
||
'''
|
||
(x, y) = obs_pos
|
||
occupy_radius = min(self.dyna_obs_radius,
|
||
int(euclidean_distance(obs_pos, self.s_start) - 1)) # 避免robot被dyna_obs的占用区域包裹住
|
||
# 圆形区域
|
||
occupy_pos = [(i, j) for i in range(x - occupy_radius, x + occupy_radius + 1)
|
||
for j in range(y - occupy_radius, y + occupy_radius + 1)
|
||
if euclidean_distance((i, j), obs_pos) < occupy_radius]
|
||
# # 等边三角形区域 TODO: 效果不及预期(三角形棱角容易导致robot卡住)
|
||
# triangle_occupy = self.triangle_occupy(obs=obs_pos, occupy_radius=occupy_radius)
|
||
# occupy_pos = [(i, j) for i in range(x - occupy_radius, x + occupy_radius + 1)
|
||
# for j in range(y - occupy_radius, y + occupy_radius + 1)
|
||
# if triangle_occupy.contains_point((i, j))]
|
||
occupy_pos = filter(self.in_bounds_without_obstacle, occupy_pos) # 确保位置在地图范围内 且 不是静态障碍物
|
||
return list(occupy_pos)
|
||
|
||
def update_cost_map(self, changed_pos):
|
||
'''
|
||
更新cost_map
|
||
Args:
|
||
changed_pos: 改变的位置列表 [x, y, idx]
|
||
'''
|
||
for (x, y, idx, _) in changed_pos:
|
||
if self.idx_to_object[idx] == 'free': # 空位的cost取决于离障碍物的距离
|
||
self.cost_map[x, y] = self.cost_background[x, y]
|
||
else:
|
||
self.cost_map[x, y] = self.object_to_cost[self.idx_to_object[idx]]
|
||
|
||
def update_edge_costs(self, changed_pos):
|
||
'''
|
||
重新计算edge_cost,更新受影响节点的rhs
|
||
Args:
|
||
changed_pos: 改变的位置列表 [(x, y, obj_idx_new, obj_idx_old)]
|
||
'''
|
||
if not changed_pos: return
|
||
self.k_m += heuristic(self.s_last, self.s_start, name='euclidean') # 使用欧式距离 更新km
|
||
self.s_last = self.s_start
|
||
for pos in changed_pos:
|
||
v = (pos[0], pos[1]) # to pos
|
||
c_old = pos[3] # 位置v的旧cost
|
||
pred_v = self.get_neighbors(v)
|
||
for u in pred_v:
|
||
c_old = self.c(u, v, c_old=c_old)
|
||
c_new = self.c(u, v)
|
||
if c_old > c_new and u != self.s_goal:
|
||
self.rhs[u] = min(self.rhs[u], c_new + self.g[v])
|
||
elif self.rhs[u] == c_old + self.g[v] and u != self.s_goal:
|
||
succ_u = self.get_neighbors(u)
|
||
self.rhs[u] = min([self.c(u, s_) + self.g[s_] for s_ in succ_u])
|
||
self.update_vertex(u)
|
||
|
||
def map2real(self, pos):
|
||
'''
|
||
地图坐标->实际坐标
|
||
'''
|
||
x = pos[0] * self.scale_ratio + self.x_min
|
||
y = pos[1] * self.scale_ratio + self.y_min
|
||
return tuple((x, y))
|
||
|
||
def real2map(self, pos, reachable_assurance=True):
|
||
'''
|
||
实际坐标->地图坐标
|
||
'''
|
||
x = round((pos[0] - self.x_min) / self.scale_ratio)
|
||
y = round((pos[1] - self.y_min) / self.scale_ratio)
|
||
# 确保点不在静态障碍物上,否则就不断向外圈扩展直到找到非静态障碍物位置
|
||
if reachable_assurance:
|
||
return self.validate_pos((x, y))
|
||
else:
|
||
return tuple((x, y))
|
||
|
||
def validate_pos(self, pos):
|
||
'''
|
||
对于不合法的pos,找到周围距离最近的合法坐标
|
||
'''
|
||
(x, y) = pos
|
||
x = max(0, min(x, self.X - 1))
|
||
y = max(0, min(y, self.Y - 1))
|
||
if self.idx_to_object[self.map[x, y]] == 'obstacle':
|
||
start_x, end_x = max(x - 1, 0), min(x + 1, self.X - 1)
|
||
start_y, end_y = max(y - 1, 0), min(y + 1, self.Y - 1)
|
||
while True:
|
||
for x_ in range(start_x, end_x + 1):
|
||
if self.idx_to_object[self.map[x_, start_y]] != 'obstacle':
|
||
return tuple((x_, start_y))
|
||
for y_ in range(start_y + 1, end_y + 1):
|
||
if self.idx_to_object[self.map[end_x, y_]] != 'obstacle':
|
||
return tuple((end_x, y_))
|
||
for x_ in range(end_x - 1, start_x - 1, -1):
|
||
if self.idx_to_object[self.map[x_, end_y]] != 'obstacle':
|
||
return tuple((x_, end_y))
|
||
for y_ in range(end_y - 1, start_y, -1):
|
||
if self.idx_to_object[self.map[start_x, y_]] != 'obstacle':
|
||
return tuple((start_x, y_))
|
||
start_x, end_x = max(start_x - 1, 0), min(end_x + 1, self.X - 1)
|
||
start_y, end_y = max(start_y - 1, 0), min(end_y + 1, self.Y - 1)
|
||
# raise Exception('invalid pos!')
|
||
return tuple((x, y))
|
||
|
||
def draw_graph(self, step_num, yaw):
|
||
'''
|
||
Args:
|
||
step_num: 移动步数
|
||
yaw: robot朝向 (弧度)
|
||
'''
|
||
# # 清空当前figure内容,保留figure对象
|
||
# plt.clf()
|
||
# # for stopping simulation with the esc key. 按esc结束
|
||
# plt.gcf().canvas.mpl_connect(
|
||
# 'key_release_event',
|
||
# lambda event: [exit(0) if event.key == 'escape' else None])
|
||
|
||
# 缩放坐标偏移量
|
||
offset = (self.x_min / self.scale_ratio, self.x_max / self.scale_ratio,
|
||
self.y_min / self.scale_ratio, self.y_max / self.scale_ratio)
|
||
# 画起点和目标
|
||
if self.s_start:
|
||
start = (self.s_start[0] + offset[0], self.s_start[1] + offset[2])
|
||
plt.plot(start[1], start[0], 'x', color='r')
|
||
if self.s_goal:
|
||
goal = (self.s_goal[0] + offset[0], self.s_goal[1] + offset[2])
|
||
plt.plot(goal[1], goal[0], 'x', color='darkorange')
|
||
|
||
|
||
# 画地图: X行Y列,第一行在下面
|
||
# 范围: 横向Y[-80,290] 纵向X[-70,120]
|
||
plt.imshow(self.map, cmap='binary', alpha=0.5, origin='lower',
|
||
extent=(offset[2], offset[3],
|
||
offset[0], offset[1]))
|
||
|
||
# 画搜索路径
|
||
plt.plot([y + offset[2] for (x, y) in self.path],
|
||
[x + offset[0] for (x, y) in self.path], "-g")
|
||
|
||
if self.s_start:
|
||
# 画移动路径
|
||
next_step = min(step_num, len(self.path))
|
||
plt.plot([start[1], self.path[next_step - 1][1] + offset[2]],
|
||
[start[0], self.path[next_step - 1][0] + offset[0]], "-r")
|
||
# plt.plot([y + offset[2] for (x, y) in self.path[:next_step]],
|
||
# [x + offset[0] for (x, y) in self.path[:next_step]], "-r")
|
||
|
||
# 画感应半径和观测范围
|
||
self.plot_circle(start[1], start[0], self.react_radius, 'lightgrey')
|
||
if yaw is not None:
|
||
plt.plot([start[1], start[1] + self.react_radius * (math.sin(yaw + self.vision_radius))],
|
||
[start[0], start[0] + self.react_radius * (math.cos(yaw + self.vision_radius))], "aqua", linewidth=1)
|
||
plt.plot([start[1], start[1] + self.react_radius * (math.sin(yaw - self.vision_radius))],
|
||
[start[0], start[0] + self.react_radius * (math.cos(yaw - self.vision_radius))], "aqua", linewidth=1)
|
||
|
||
plt.xlabel('y', loc='right')
|
||
plt.ylabel('x', loc='top')
|
||
plt.grid(color='black', linestyle='-', linewidth=0.5)
|
||
# plt.show()
|
||
|
||
def draw_rhs(self, rhs):
|
||
# 画地图: X行Y列,第一行在下面
|
||
|
||
# 缩放坐标偏移量
|
||
offset = (self.x_min / self.scale_ratio, self.x_max / self.scale_ratio,
|
||
self.y_min / self.scale_ratio, self.y_max / self.scale_ratio)
|
||
|
||
# 将无穷大位置的值替换为指定的颜色
|
||
# rhs[np.isinf(rhs)] = np.nan
|
||
|
||
# 将无穷大位置的值替换为 np.nan
|
||
rhs = np.ma.masked_invalid(rhs)
|
||
|
||
# 设置颜色映射范围
|
||
vmin = np.nanmin(rhs)
|
||
vmax = np.nanmax(rhs)
|
||
|
||
|
||
# 创建一个图形对象
|
||
fig, ax = plt.subplots()
|
||
|
||
# cmap = mcolors.LinearSegmentedColormap.from_list('custom_cmap', ['#0000FF', '#FFFFFF'])
|
||
|
||
# 绘制热力图
|
||
heatmap = ax.imshow(rhs, cmap='jet', vmin=vmin, vmax=vmax, origin='lower',
|
||
extent=(offset[2], offset[3],
|
||
offset[0], offset[1]))
|
||
# 添加颜色条
|
||
plt.colorbar(heatmap)
|
||
|
||
start = (self.s_start[0] + offset[0], self.s_start[1] + offset[2])
|
||
goal = (self.s_goal[0] + offset[0], self.s_goal[1] + offset[2])
|
||
|
||
# 画起点和目标
|
||
plt.plot(start[1], start[0], 'x', color='black')
|
||
plt.plot(goal[1], goal[0], 'x', color='darkorange')
|
||
|
||
|
||
# 设置图形标题和轴标签
|
||
ax.set_title('Heatmap(G)')
|
||
ax.set_xlabel('y', loc='right')
|
||
ax.set_ylabel('x', loc='top')
|
||
ax.grid(color='black', linestyle='-', linewidth=0.5)
|
||
# 显示图形
|
||
plt.show()
|
||
|
||
|
||
|
||
|
||
@staticmethod
|
||
def plot_circle(y, x, size, color="lightgrey"): # pragma: no cover
|
||
'''
|
||
以(x,y)为圆心,size为半径 画圆
|
||
'''
|
||
deg = list(range(0, 360, 5))
|
||
deg.append(0)
|
||
yl = [y + size * math.cos(np.deg2rad(d)) for d in deg]
|
||
xl = [x + size * math.sin(np.deg2rad(d)) for d in deg]
|
||
plt.plot(yl, xl, color)
|
||
|
||
def triangle_occupy(self, obs, occupy_radius):
|
||
'''
|
||
计算等边三角形的三个顶点并返回三角形对象
|
||
TODO: 效果不及预期
|
||
Args:
|
||
obs: 三角形底边中点
|
||
occupy_radius: 三角形的高
|
||
'''
|
||
dist = euclidean_distance(self.s_start, obs)
|
||
# obs(底边中点)的坐标
|
||
x1, y1 = obs
|
||
# 顶点A的坐标
|
||
x2 = x1 + occupy_radius * ((self.s_start[0] - x1) / dist)
|
||
y2 = y1 + occupy_radius * ((self.s_start[1] - y1) / dist)
|
||
|
||
# 计算向量AP的坐标
|
||
AP_x = x1 - x2
|
||
AP_y = y1 - y2
|
||
# 计算向量AB的坐标
|
||
AB_x = AP_x * math.cos(math.pi / 6) - AP_y * math.sin(math.pi / 6)
|
||
AB_y = AP_x * math.sin(math.pi / 6) + AP_y * math.cos(math.pi / 6)
|
||
# 计算向量AC的坐标
|
||
AC_x = AP_x * math.cos(math.pi / 6) + AP_y * math.sin(math.pi / 6)
|
||
AC_y = -AP_x * math.sin(math.pi / 6) + AP_y * math.cos(math.pi / 6)
|
||
|
||
# 计算顶点B的坐标
|
||
x3 = x2 + AB_x
|
||
y3 = y2 + AB_y
|
||
# 计算顶点C的坐标
|
||
x4 = x2 + AC_x
|
||
y4 = y2 + AC_y
|
||
|
||
ver1 = (x2, y2)
|
||
ver2 = (x3, y3)
|
||
ver3 = (x4, y4)
|
||
|
||
return Polygon([ver1, ver2, ver3], closed=True, fill=True)
|