RoboWaiter/zoo/navigator_DstarLite/dstar_lite.py

511 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

'''
基于两个D* lite的实现
按照原始算法的伪代码
自己写的D* lite
'''
import math
import queue
from functools import partial
import numpy as np
import heapq
from matplotlib import pyplot as plt
def diagonal_distance(start, end): #
return max(abs(start[0] - end[0]), abs(start[1] - end[1]))
def manhattan_distance(start, end): # 曼哈顿距离
return abs(start[0] - end[0]) + abs(start[1] - end[1])
def euclidean_distance(start, end): # 欧式距离
# return np.linalg.norm(start-end)
return math.sqrt((start[0] - end[0]) ** 2 + (start[1] - end[1]) ** 2)
def heuristic(start, end, name='euclidean'):
if name == 'diagonal':
return diagonal_distance(start, end)
elif name == 'euclidean':
return euclidean_distance(start, end)
elif name == 'manhattan':
return manhattan_distance(start, end)
else:
raise Exception('Error heuristic name!')
class Priority:
'''
优先级类
'''
def __init__(self, k1, k2):
self.k1 = k1
self.k2 = k2
def __lt__(self, other): # 定义对象的 < 运算
return self.k1 < other.k1 or (self.k1 == other.k1 and self.k2 < other.k2)
def __le__(self, other): # 定于对象的 <= 运算
return self.k1 < other.k1 or (self.k1 == other.k1 and self.k2 <= other.k2)
class Node:
'''
节点类
'''
def __init__(self, pos: (int, int), priority: Priority):
self.pos = pos # X Y
self.priority = priority
def __lt__(self, other): # 定义对象的 < 运算
return self.priority < other.priority
def __le__(self, other): # 定于对象的 <= 运算
return self.priority <= other.priority
class PriorityQueue:
def __init__(self):
self.queue = [] # 节点队列
self.nodes = [] # 节点位置列表
def is_empty(self):
# 队列判空
return len(self.queue) == 0
def top(self):
return self.queue[0].pos
def top_key(self):
if self.is_empty():
return Priority(float('inf'), float('inf'))
return self.queue[0].priority
def pop(self):
# 取出第一个节点
node = heapq.heappop(self.queue)
self.nodes.remove(node.pos)
return node
def insert(self, pos, priority):
# 创建并添加节点
node = Node(pos, priority)
heapq.heappush(self.queue, node)
self.nodes.append(pos)
def remove(self, pos):
# 删除指定节点并重新排序
self.queue = [n for n in self.queue if n.pos != pos]
heapq.heapify(self.queue) # 重排序
self.nodes.remove(pos)
def update(self, pos, priority):
# 更新指定位置的priority值
for n in self.queue:
if n.pos == pos:
n.priority = priority
break
class DStarLite:
def __init__(self,
map: np.array([int, int]), # [X, Y]
area_range, # [x_min, x_max, y_min, y_max] 实际坐标范围
scale_ratio=5, # 地图缩放率
dyna_obs_radius=20 # dyna_obs实际身位半径
):
# self.area_bounds = area
self.map = map
self.background = map
self.X = map.shape[0]
self.Y = map.shape[1]
(self.x_min, self.x_max, self.y_min, self.y_max) = area_range
self.scale_ratio = scale_ratio
self.dyna_obs_list = [] # 动态障碍物位置列表( 当前地图 ) [(x, y)]
self.dyna_obs_radius = math.ceil(dyna_obs_radius/scale_ratio) # dyna_obs缩放后身位半径
# free:0, obs:1, dyna_obs:2
self.idx_to_object = {
0: "free",
1: "obstacle",
2: "dynamic obstacle"
}
self.object_to_idx = dict(zip(self.idx_to_object.values(), self.idx_to_object.keys()))
self.object_to_cost = {
"free": 0,
"obstacle": float('inf'),
"dynamic obstacle": 50
}
self.compute_cost_map()
self.s_start = None # (int,int) 必须是元组(元组可以直接当作矩阵索引)
self.s_goal = None # (int,int)
self.s_last = None # (int,int)
self.U = PriorityQueue()
self.k_m = 0
self.rhs = np.ones((self.X, self.Y)) * np.inf
self.g = self.rhs.copy()
self.path = []
def set_map(self, map_):
'''
设置map 和 cost_map
'''
self.map = map_
self.X = map_.shape[0]
self.Y = map_.shape[1]
self.compute_cost_map()
def reset(self):
'''
(完成一次导航后)
重置 1.环境变量
2.dstar_lite变量
'''
# env reset
self.map = self.background.copy()
self.compute_cost_map()
self.dyna_obs_list.clear()
self.path.clear()
# dstar_lite reset
self.s_start = None
self.s_goal = None
self.s_last = None
self.U = PriorityQueue()
self.k_m = 0
self.rhs = np.ones((self.X, self.Y)) * np.inf
self.g = self.rhs.copy()
def calculate_key(self, s: (int, int)):
'''
计算 位置s 的 key1, key2
:returns
Priority(k1, k2): 可比较的对象
'''
k1 = min(self.g[s], self.rhs[s]) + heuristic(self.s_start, s) + self.k_m
k2 = min(self.g[s], self.rhs[s])
return Priority(k1, k2)
def c(self, u: (int, int), v: (int, int), v_old=None) -> float:
'''
计算节点间的 路径代价 和 目标位置代价 (目标位置代价为0时采用路径代价)
(因为是终点->起点的扩展方向因此v是nodeu是v扩展的neighbor)
Args:
u: from pos
v: to pos
v_old: 指定的v的类型
'''
if v_old:
obj_cost = self.object_to_cost[v_old]
else:
obj_cost = self.cost_map[v]
if obj_cost > 0:
return obj_cost
return heuristic(u, v)
def contain(self, u: (int, int)):
'''
判断 节点u 是否在队列中
'''
return u in self.U.nodes
def update_vertex(self, u: (int, int)):
'''
判定节点状态, 更新队列
不一致且在队列 --> 更新key
不一致且不在队列 --> 计算key并加入队列
一致且在队列 --> 移出队列
'''
if self.g[u] != self.rhs[u] and self.contain(u):
self.U.update(u, self.calculate_key(u))
elif self.g[u] != self.rhs[u] and not self.contain(u):
self.U.insert(u, self.calculate_key(u))
elif self.g[u] == self.rhs[u] and self.contain(u):
self.U.remove(u)
def compute_shortest_path(self):
'''
计算最短路径
'''
while self.U.top_key() < self.calculate_key(self.s_start) or self.rhs[self.s_start] > self.g[self.s_start]:
u = self.U.top()
k_old = self.U.top_key()
k_new = self.calculate_key(u)
if k_old < k_new:
self.U.update(u, k_new)
elif self.g[u] > self.rhs[u]: # 过一致
self.g[u] = self.rhs[u]
self.U.remove(u)
pred = self.get_neighbors(u)
for s in pred:
if s != self.s_goal:
self.rhs[s] = min(self.rhs[s], self.c(s, u) + self.g[u])
self.update_vertex(s)
else: # 欠一致
g_old = self.g[u]
self.g[u] = float('inf')
pred = self.get_neighbors(u)
for s in pred + [u]:
if self.rhs[s] == self.c(s, u) + g_old:
if s != self.s_goal:
succ = self.get_neighbors(s)
self.rhs[s] = min([self.c(s, s_) + self.g[s_] for s_ in succ])
self.update_vertex(s)
def _planning(self, s_start, s_goal, dyna_obs, step_num=None, debug=False):
'''
规划路径(实际实现)
Args:
dyna_obs: 动态障碍物位置列表
step_num: 单次移动步数
'''
# 确保目标合法
if not self.in_bounds_without_obstacle(s_goal):
return None
# 第一次规划需要初始化rhs并将goal加入队列计算最短路径
if self.s_goal is None:
self.s_start = tuple(s_start)
self.s_goal = tuple(s_goal)
self.s_last = tuple(s_start)
self.rhs[tuple(s_goal)] = 0
self.U.insert(tuple(s_goal), Priority(k1=heuristic(tuple(s_start), tuple(s_goal)), k2=0))
self.compute_shortest_path() # 计算最短路径
self.path = self.get_path()
# 后续规划只更新起点,直接使用原路径(去掉已走过部分)
else:
self.s_start = tuple(s_start)
self.path = self.path[step_num:]
# 根据obs更新map, cost_map, edge_cost
changed_pos = self.update_map(dyna_obs=dyna_obs)
if changed_pos:
self.update_cost_map(changed_pos)
self.update_edge_costs(changed_pos)
# 若地图改变,重新计算最短路径
self.compute_shortest_path()
self.path = self.get_path()
# TODO: 误差抖动使robot没有到达路径上的点导致新起点的rhs=∞可能导致get_path失败 ( 当前版本没有该问题 )
# assert (self.rhs[self.s_start] != float('inf')), "There is no known path!"
# self.path = self.get_path(step_num)
# # debug
# if debug:
# pass
return self.path
def planning(self, s_start, s_goal, dyna_obs, step_num=None, debug=False):
'''
路径规划(供外部调用,处理实际坐标和地图坐标的转换)
'''
# 实际坐标 -> 地图坐标
s_start = self.real2map(s_start)
s_goal = self.real2map(s_goal)
for i in range(len(dyna_obs)):
dyna_obs[i] = self.real2map(dyna_obs[i])
self._planning(s_start, s_goal, dyna_obs, step_num, debug)
# 地图坐标->实际坐标
path = [self.map2real(node) for node in self.path]
return path
def get_path(self):
'''
得到路径
Args:
step_num: 路径步数 (None表示返回完整路径)
return:
path: [(x, y), ...]
'''
if self.s_start is None or self.s_goal == self.s_start:
return []
path = []
cur = self.s_start
# if step_num is None: # 得到完整路径
while cur != self.s_goal:
succ = self.get_neighbors(cur)
cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])]
path.append(cur)
# else:
# for i in range(step_num):
# if cur == self.s_goal:
# break
# succ = self.get_neighbors(cur)
# cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])]
# path.append(cur)
return path
def in_bounds_without_obstacle(self, pos):
'''
判断位置在地图范围内 且 不是静态障碍物
'''
(x, y) = pos
return 0 <= x < self.X and 0 <= y < self.Y and self.map[x, y] != self.object_to_idx["obstacle"]
def get_neighbors(self, pos, mode=8):
'''
获取邻居节点, 地图范围内
'''
(x_, y_) = pos
# results = [(x_+1,y_), (x_-1,y_), (x_, y_+1), (x_,y_-1)]
# if mode == 8:
neighbors = [(x_ + 1, y_), (x_ - 1, y_), (x_, y_ + 1), (x_, y_ - 1), (x_ + 1, y_ + 1), (x_ - 1, y_ + 1),
(x_ + 1, y_ - 1), (x_ - 1, y_ - 1)]
neighbors = filter(self.in_bounds_without_obstacle, neighbors) # 确保位置在地图范围内 且 不是静态障碍物
return list(neighbors)
def compute_cost_map(self):
# 计算当前地图的cost_map
self.cost_map = np.zeros(self.map.shape)
for idx, obj in self.idx_to_object.items():
self.cost_map[self.map == idx] = self.object_to_cost[obj]
def update_map(self, dyna_obs):
'''
更新地图 和 动态障碍物列表
Args:
dyna_obs: 当前动态障碍物位置列表 [(x,y), ...]
return:
update_obj: 改变的位置列表 [(x, y, obj_idx, obj_idx_old), ...]
'''
# dyna_obs没有变化 (集合set可以忽略元素在列表中的位置)
if set(dyna_obs) == set(self.dyna_obs_list):
return []
# 新旧dyna_obs占用位置列表
old_obs_occupy = []
new_obs_occupy = []
for pos in self.dyna_obs_list:
old_obs_occupy.extend(self.get_occupy_pos(pos))
for pos in dyna_obs:
new_obs_occupy.extend(self.get_occupy_pos(pos))
old_obs_occupy = [pos for i, pos in enumerate(old_obs_occupy) if pos not in old_obs_occupy[:i]] # 去除重复位置
new_obs_occupy = [pos for i, pos in enumerate(new_obs_occupy) if pos not in new_obs_occupy[:i]] # 去除重复位置
# 转变为free 和 转变为obs的位置列表
changed_free = [pos for pos in old_obs_occupy if pos not in new_obs_occupy]
changed_obs = [pos for pos in new_obs_occupy if pos not in old_obs_occupy]
# 更新地图计算changed_pos
changed_pos = []
for (x, y) in changed_free:
self.map[x, y] = self.object_to_idx['free']
changed_pos.append((x, y, self.object_to_idx["free"], self.object_to_idx["dynamic obstacle"]))
for (x, y) in changed_obs:
self.map[x, y] = self.object_to_idx['dynamic obstacle']
changed_pos.append((x, y, self.object_to_idx["dynamic obstacle"], self.object_to_idx["free"]))
# 更新动态障碍物列表
self.dyna_obs_list = dyna_obs
return changed_pos
def get_occupy_pos(self, obs_pos):
'''
根据dyna_obs中心位置计算其占用的所有网格位置
'''
(x, y) = obs_pos
occupy_pos = []
for i in range(x-self.dyna_obs_radius, x+self.dyna_obs_radius+1):
for j in range(y-self.dyna_obs_radius, y+self.dyna_obs_radius+1):
occupy_pos.append((i, j))
occupy_pos = filter(self.in_bounds_without_obstacle, occupy_pos) # 确保位置在地图范围内 且 不是静态障碍物
return list(occupy_pos)
def update_cost_map(self, changed_pos):
'''
更新cost_map
Args:
changed_pos: 改变的位置列表 [x, y, idx]
'''
for (x, y, idx, _) in changed_pos:
self.cost_map[x, y] = self.object_to_cost[self.idx_to_object[idx]]
def update_edge_costs(self, changed_pos):
'''
重新计算edge_cost更新受影响节点的rhs
Args:
changed_pos: 改变的位置列表 [(x, y, obj_idx_new, obj_idx_old)]
'''
if not changed_pos: return
self.k_m += heuristic(self.s_last, self.s_start, name='euclidean') # 使用欧式距离 更新km
self.s_last = self.s_start
for pos in changed_pos:
v = (pos[0], pos[1]) # to pos
v_old = self.idx_to_object[pos[3]] # 位置v的旧类型
pred_v = self.get_neighbors(v)
for u in pred_v:
c_old = self.c(u, v, v_old=v_old)
c_new = self.c(u, v)
if c_old > c_new and u != self.s_goal:
self.rhs[u] = min(self.rhs[u], self.c(u, v) + self.g[v])
elif self.rhs[u] == c_old + self.g[v] and u != self.s_goal:
succ_u = self.get_neighbors(u)
self.rhs[u] = min([self.c(u, s_) + self.g[s_] for s_ in succ_u])
self.update_vertex(u)
def map2real(self, pos):
'''
地图坐标->实际坐标
'''
x = pos[0] * self.scale_ratio + self.x_min
y = pos[1] * self.scale_ratio + self.y_min
return tuple((x, y))
def real2map(self, pos, approximation='round'):
'''
实际坐标->地图坐标
'''
if approximation == 'round': # 四舍五入
x = round((pos[0] - self.x_min) / self.scale_ratio)
y = round((pos[1] - self.y_min) / self.scale_ratio)
elif approximation == 'low': # 向下取整
x = math.floor((pos[0] - self.x_min) / self.scale_ratio)
y = math.floor((pos[1] - self.y_min) / self.scale_ratio)
else:
raise Exception("Wrong approximation!")
return tuple((x, y))
def draw_graph(self, step_num):
# 清空当前figure内容保留figure对象
plt.clf()
# for stopping simulation with the esc key. 按esc结束
plt.gcf().canvas.mpl_connect(
'key_release_event',
lambda event: [exit(0) if event.key == 'escape' else None])
# 缩放坐标偏移量
offset = (self.x_min / self.scale_ratio, self.x_max / self.scale_ratio,
self.y_min / self.scale_ratio, self.y_max / self.scale_ratio,)
# 画地图: X行Y列第一行在下面
# 范围: 横向Y[-80,290] 纵向X[-70,120]
plt.imshow(self.map, cmap='binary', alpha=0.5, origin='lower',
extent=(offset[2], offset[3],
offset[0], offset[1]))
# 画起点和目标
plt.plot(self.s_start[1] + offset[2], self.s_start[0] + offset[0], "xr")
plt.plot(self.s_goal[1] + offset[2], self.s_goal[0] + offset[0], "xr")
# 画搜索路径
plt.plot([y + offset[2] for (x, y) in self.path],
[x + offset[0] for (x, y) in self.path], "-g")
# 画移动路径
next_step = min(step_num, len(self.path))
# plt.plot([self.s_start[1] + offset[2], self.path[next_step-1][1] + offset[2]],
# [self.s_start[0] + offset[0], self.path[next_step-1][0] + offset[0]], "-r")
plt.plot([y + offset[2] for (x, y) in self.path[:next_step]],
[x + offset[0] for (x, y) in self.path[:next_step]], "-r")
plt.xlabel('y', loc='right')
plt.ylabel('x', loc='top')
plt.grid(color='black', linestyle='-', linewidth=0.5)
plt.pause(0.3)