开始整合dstar_lite

This commit is contained in:
ChenXL97 2023-11-08 16:20:02 +08:00
parent 39ef2709da
commit c39a13fd5b
19 changed files with 1363 additions and 6 deletions

View File

@ -0,0 +1,2 @@
from robowaiter.algos.navigate.DstarLite.navigate import Navigator as DstarLite

View File

@ -0,0 +1,4 @@
from . import navigate
from . import dstar_lite

View File

@ -0,0 +1,510 @@
'''
基于两个D* lite的实现
按照原始算法的伪代码
自己写的D* lite
'''
import math
import queue
from functools import partial
import numpy as np
import heapq
from matplotlib import pyplot as plt
def diagonal_distance(start, end): #
return max(abs(start[0] - end[0]), abs(start[1] - end[1]))
def manhattan_distance(start, end): # 曼哈顿距离
return abs(start[0] - end[0]) + abs(start[1] - end[1])
def euclidean_distance(start, end): # 欧式距离
# return np.linalg.norm(start-end)
return math.sqrt((start[0] - end[0]) ** 2 + (start[1] - end[1]) ** 2)
def heuristic(start, end, name='euclidean'):
if name == 'diagonal':
return diagonal_distance(start, end)
elif name == 'euclidean':
return euclidean_distance(start, end)
elif name == 'manhattan':
return manhattan_distance(start, end)
else:
raise Exception('Error heuristic name!')
class Priority:
'''
优先级类
'''
def __init__(self, k1, k2):
self.k1 = k1
self.k2 = k2
def __lt__(self, other): # 定义对象的 < 运算
return self.k1 < other.k1 or (self.k1 == other.k1 and self.k2 < other.k2)
def __le__(self, other): # 定于对象的 <= 运算
return self.k1 < other.k1 or (self.k1 == other.k1 and self.k2 <= other.k2)
class Node:
'''
节点类
'''
def __init__(self, pos: (int, int), priority: Priority):
self.pos = pos # X Y
self.priority = priority
def __lt__(self, other): # 定义对象的 < 运算
return self.priority < other.priority
def __le__(self, other): # 定于对象的 <= 运算
return self.priority <= other.priority
class PriorityQueue:
def __init__(self):
self.queue = [] # 节点队列
self.nodes = [] # 节点位置列表
def is_empty(self):
# 队列判空
return len(self.queue) == 0
def top(self):
return self.queue[0].pos
def top_key(self):
if self.is_empty():
return Priority(float('inf'), float('inf'))
return self.queue[0].priority
def pop(self):
# 取出第一个节点
node = heapq.heappop(self.queue)
self.nodes.remove(node.pos)
return node
def insert(self, pos, priority):
# 创建并添加节点
node = Node(pos, priority)
heapq.heappush(self.queue, node)
self.nodes.append(pos)
def remove(self, pos):
# 删除指定节点并重新排序
self.queue = [n for n in self.queue if n.pos != pos]
heapq.heapify(self.queue) # 重排序
self.nodes.remove(pos)
def update(self, pos, priority):
# 更新指定位置的priority值
for n in self.queue:
if n.pos == pos:
n.priority = priority
break
class DStarLite:
def __init__(self,
map: np.array([int, int]), # [X, Y]
area_range, # [x_min, x_max, y_min, y_max] 实际坐标范围
scale_ratio=5, # 地图缩放率
dyna_obs_radius=20 # dyna_obs实际身位半径
):
# self.area_bounds = area
self.map = map
self.background = map
self.X = map.shape[0]
self.Y = map.shape[1]
(self.x_min, self.x_max, self.y_min, self.y_max) = area_range
self.scale_ratio = scale_ratio
self.dyna_obs_list = [] # 动态障碍物位置列表( 当前地图 ) [(x, y)]
self.dyna_obs_radius = math.ceil(dyna_obs_radius/scale_ratio) # dyna_obs缩放后身位半径
# free:0, obs:1, dyna_obs:2
self.idx_to_object = {
0: "free",
1: "obstacle",
2: "dynamic obstacle"
}
self.object_to_idx = dict(zip(self.idx_to_object.values(), self.idx_to_object.keys()))
self.object_to_cost = {
"free": 0,
"obstacle": float('inf'),
"dynamic obstacle": 50
}
self.compute_cost_map()
self.s_start = None # (int,int) 必须是元组(元组可以直接当作矩阵索引)
self.s_goal = None # (int,int)
self.s_last = None # (int,int)
self.U = PriorityQueue()
self.k_m = 0
self.rhs = np.ones((self.X, self.Y)) * np.inf
self.g = self.rhs.copy()
self.path = []
def set_map(self, map_):
'''
设置map cost_map
'''
self.map = map_
self.X = map_.shape[0]
self.Y = map_.shape[1]
self.compute_cost_map()
def reset(self):
'''
(完成一次导航后)
重置 1.环境变量
2.dstar_lite变量
'''
# env reset
self.map = self.background.copy()
self.compute_cost_map()
self.dyna_obs_list.clear()
self.path.clear()
# dstar_lite reset
self.s_start = None
self.s_goal = None
self.s_last = None
self.U = PriorityQueue()
self.k_m = 0
self.rhs = np.ones((self.X, self.Y)) * np.inf
self.g = self.rhs.copy()
def calculate_key(self, s: (int, int)):
'''
计算 位置s key1, key2
:returns
Priority(k1, k2): 可比较的对象
'''
k1 = min(self.g[s], self.rhs[s]) + heuristic(self.s_start, s) + self.k_m
k2 = min(self.g[s], self.rhs[s])
return Priority(k1, k2)
def c(self, u: (int, int), v: (int, int), v_old=None) -> float:
'''
计算节点间的 路径代价 目标位置代价 (目标位置代价为0时采用路径代价)
(因为是终点->起点的扩展方向因此v是nodeu是v扩展的neighbor)
Args:
u: from pos
v: to pos
v_old: 指定的v的类型
'''
if v_old:
obj_cost = self.object_to_cost[v_old]
else:
obj_cost = self.cost_map[v]
if obj_cost > 0:
return obj_cost
return heuristic(u, v)
def contain(self, u: (int, int)):
'''
判断 节点u 是否在队列中
'''
return u in self.U.nodes
def update_vertex(self, u: (int, int)):
'''
判定节点状态, 更新队列
不一致且在队列 --> 更新key
不一致且不在队列 --> 计算key并加入队列
一致且在队列 --> 移出队列
'''
if self.g[u] != self.rhs[u] and self.contain(u):
self.U.update(u, self.calculate_key(u))
elif self.g[u] != self.rhs[u] and not self.contain(u):
self.U.insert(u, self.calculate_key(u))
elif self.g[u] == self.rhs[u] and self.contain(u):
self.U.remove(u)
def compute_shortest_path(self):
'''
计算最短路径
'''
while self.U.top_key() < self.calculate_key(self.s_start) or self.rhs[self.s_start] > self.g[self.s_start]:
u = self.U.top()
k_old = self.U.top_key()
k_new = self.calculate_key(u)
if k_old < k_new:
self.U.update(u, k_new)
elif self.g[u] > self.rhs[u]: # 过一致
self.g[u] = self.rhs[u]
self.U.remove(u)
pred = self.get_neighbors(u)
for s in pred:
if s != self.s_goal:
self.rhs[s] = min(self.rhs[s], self.c(s, u) + self.g[u])
self.update_vertex(s)
else: # 欠一致
g_old = self.g[u]
self.g[u] = float('inf')
pred = self.get_neighbors(u)
for s in pred + [u]:
if self.rhs[s] == self.c(s, u) + g_old:
if s != self.s_goal:
succ = self.get_neighbors(s)
self.rhs[s] = min([self.c(s, s_) + self.g[s_] for s_ in succ])
self.update_vertex(s)
def _planning(self, s_start, s_goal, dyna_obs, step_num=None, debug=False):
'''
规划路径(实际实现)
Args:
dyna_obs: 动态障碍物位置列表
step_num: 单次移动步数
'''
# 确保目标合法
if not self.in_bounds_without_obstacle(s_goal):
return None
# 第一次规划需要初始化rhs并将goal加入队列计算最短路径
if self.s_goal is None:
self.s_start = tuple(s_start)
self.s_goal = tuple(s_goal)
self.s_last = tuple(s_start)
self.rhs[tuple(s_goal)] = 0
self.U.insert(tuple(s_goal), Priority(k1=heuristic(tuple(s_start), tuple(s_goal)), k2=0))
self.compute_shortest_path() # 计算最短路径
self.path = self.get_path()
# 后续规划只更新起点,直接使用原路径(去掉已走过部分)
else:
self.s_start = tuple(s_start)
self.path = self.path[step_num:]
# 根据obs更新map, cost_map, edge_cost
changed_pos = self.update_map(dyna_obs=dyna_obs)
if changed_pos:
self.update_cost_map(changed_pos)
self.update_edge_costs(changed_pos)
# 若地图改变,重新计算最短路径
self.compute_shortest_path()
self.path = self.get_path()
# TODO: 误差抖动使robot没有到达路径上的点导致新起点的rhs=∞可能导致get_path失败 ( 当前版本没有该问题 )
# assert (self.rhs[self.s_start] != float('inf')), "There is no known path!"
# self.path = self.get_path(step_num)
# # debug
# if debug:
# pass
return self.path
def planning(self, s_start, s_goal, dyna_obs, step_num=None, debug=False):
'''
路径规划(供外部调用处理实际坐标和地图坐标的转换)
'''
# 实际坐标 -> 地图坐标
s_start = self.real2map(s_start)
s_goal = self.real2map(s_goal)
for i in range(len(dyna_obs)):
dyna_obs[i] = self.real2map(dyna_obs[i])
self._planning(s_start, s_goal, dyna_obs, step_num, debug)
# 地图坐标->实际坐标
path = [self.map2real(node) for node in self.path]
return path
def get_path(self):
'''
得到路径
Args:
step_num: 路径步数 (None表示返回完整路径)
return:
path: [(x, y), ...]
'''
if self.s_start is None or self.s_goal == self.s_start:
return []
path = []
cur = self.s_start
# if step_num is None: # 得到完整路径
while cur != self.s_goal:
succ = self.get_neighbors(cur)
cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])]
path.append(cur)
# else:
# for i in range(step_num):
# if cur == self.s_goal:
# break
# succ = self.get_neighbors(cur)
# cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])]
# path.append(cur)
return path
def in_bounds_without_obstacle(self, pos):
'''
判断位置在地图范围内 不是静态障碍物
'''
(x, y) = pos
return 0 <= x < self.X and 0 <= y < self.Y and self.map[x, y] != self.object_to_idx["obstacle"]
def get_neighbors(self, pos, mode=8):
'''
获取邻居节点, 地图范围内
'''
(x_, y_) = pos
# results = [(x_+1,y_), (x_-1,y_), (x_, y_+1), (x_,y_-1)]
# if mode == 8:
neighbors = [(x_ + 1, y_), (x_ - 1, y_), (x_, y_ + 1), (x_, y_ - 1), (x_ + 1, y_ + 1), (x_ - 1, y_ + 1),
(x_ + 1, y_ - 1), (x_ - 1, y_ - 1)]
neighbors = filter(self.in_bounds_without_obstacle, neighbors) # 确保位置在地图范围内 且 不是静态障碍物
return list(neighbors)
def compute_cost_map(self):
# 计算当前地图的cost_map
self.cost_map = np.zeros(self.map.shape)
for idx, obj in self.idx_to_object.items():
self.cost_map[self.map == idx] = self.object_to_cost[obj]
def update_map(self, dyna_obs):
'''
更新地图 动态障碍物列表
Args:
dyna_obs: 当前动态障碍物位置列表 [(x,y), ...]
return:
update_obj: 改变的位置列表 [(x, y, obj_idx, obj_idx_old), ...]
'''
# dyna_obs没有变化 (集合set可以忽略元素在列表中的位置)
if set(dyna_obs) == set(self.dyna_obs_list):
return []
# 新旧dyna_obs占用位置列表
old_obs_occupy = []
new_obs_occupy = []
for pos in self.dyna_obs_list:
old_obs_occupy.extend(self.get_occupy_pos(pos))
for pos in dyna_obs:
new_obs_occupy.extend(self.get_occupy_pos(pos))
old_obs_occupy = [pos for i, pos in enumerate(old_obs_occupy) if pos not in old_obs_occupy[:i]] # 去除重复位置
new_obs_occupy = [pos for i, pos in enumerate(new_obs_occupy) if pos not in new_obs_occupy[:i]] # 去除重复位置
# 转变为free 和 转变为obs的位置列表
changed_free = [pos for pos in old_obs_occupy if pos not in new_obs_occupy]
changed_obs = [pos for pos in new_obs_occupy if pos not in old_obs_occupy]
# 更新地图计算changed_pos
changed_pos = []
for (x, y) in changed_free:
self.map[x, y] = self.object_to_idx['free']
changed_pos.append((x, y, self.object_to_idx["free"], self.object_to_idx["dynamic obstacle"]))
for (x, y) in changed_obs:
self.map[x, y] = self.object_to_idx['dynamic obstacle']
changed_pos.append((x, y, self.object_to_idx["dynamic obstacle"], self.object_to_idx["free"]))
# 更新动态障碍物列表
self.dyna_obs_list = dyna_obs
return changed_pos
def get_occupy_pos(self, obs_pos):
'''
根据dyna_obs中心位置计算其占用的所有网格位置
'''
(x, y) = obs_pos
occupy_pos = []
for i in range(x-self.dyna_obs_radius, x+self.dyna_obs_radius+1):
for j in range(y-self.dyna_obs_radius, y+self.dyna_obs_radius+1):
occupy_pos.append((i, j))
occupy_pos = filter(self.in_bounds_without_obstacle, occupy_pos) # 确保位置在地图范围内 且 不是静态障碍物
return list(occupy_pos)
def update_cost_map(self, changed_pos):
'''
更新cost_map
Args:
changed_pos: 改变的位置列表 [x, y, idx]
'''
for (x, y, idx, _) in changed_pos:
self.cost_map[x, y] = self.object_to_cost[self.idx_to_object[idx]]
def update_edge_costs(self, changed_pos):
'''
重新计算edge_cost更新受影响节点的rhs
Args:
changed_pos: 改变的位置列表 [(x, y, obj_idx_new, obj_idx_old)]
'''
if not changed_pos: return
self.k_m += heuristic(self.s_last, self.s_start, name='euclidean') # 使用欧式距离 更新km
self.s_last = self.s_start
for pos in changed_pos:
v = (pos[0], pos[1]) # to pos
v_old = self.idx_to_object[pos[3]] # 位置v的旧类型
pred_v = self.get_neighbors(v)
for u in pred_v:
c_old = self.c(u, v, v_old=v_old)
c_new = self.c(u, v)
if c_old > c_new and u != self.s_goal:
self.rhs[u] = min(self.rhs[u], self.c(u, v) + self.g[v])
elif self.rhs[u] == c_old + self.g[v] and u != self.s_goal:
succ_u = self.get_neighbors(u)
self.rhs[u] = min([self.c(u, s_) + self.g[s_] for s_ in succ_u])
self.update_vertex(u)
def map2real(self, pos):
'''
地图坐标->实际坐标
'''
x = pos[0] * self.scale_ratio + self.x_min
y = pos[1] * self.scale_ratio + self.y_min
return tuple((x, y))
def real2map(self, pos, approximation='round'):
'''
实际坐标->地图坐标
'''
if approximation == 'round': # 四舍五入
x = round((pos[0] - self.x_min) / self.scale_ratio)
y = round((pos[1] - self.y_min) / self.scale_ratio)
elif approximation == 'low': # 向下取整
x = math.floor((pos[0] - self.x_min) / self.scale_ratio)
y = math.floor((pos[1] - self.y_min) / self.scale_ratio)
else:
raise Exception("Wrong approximation!")
return tuple((x, y))
def draw_graph(self, step_num):
# 清空当前figure内容保留figure对象
plt.clf()
# for stopping simulation with the esc key. 按esc结束
plt.gcf().canvas.mpl_connect(
'key_release_event',
lambda event: [exit(0) if event.key == 'escape' else None])
# 缩放坐标偏移量
offset = (self.x_min / self.scale_ratio, self.x_max / self.scale_ratio,
self.y_min / self.scale_ratio, self.y_max / self.scale_ratio,)
# 画地图: X行Y列第一行在下面
# 范围: 横向Y[-80,290] 纵向X[-70,120]
plt.imshow(self.map, cmap='binary', alpha=0.5, origin='lower',
extent=(offset[2], offset[3],
offset[0], offset[1]))
# 画起点和目标
plt.plot(self.s_start[1] + offset[2], self.s_start[0] + offset[0], "xr")
plt.plot(self.s_goal[1] + offset[2], self.s_goal[0] + offset[0], "xr")
# 画搜索路径
plt.plot([y + offset[2] for (x, y) in self.path],
[x + offset[0] for (x, y) in self.path], "-g")
# 画移动路径
next_step = min(step_num, len(self.path))
# plt.plot([self.s_start[1] + offset[2], self.path[next_step-1][1] + offset[2]],
# [self.s_start[0] + offset[0], self.path[next_step-1][0] + offset[0]], "-r")
plt.plot([y + offset[2] for (x, y) in self.path[:next_step]],
[x + offset[0] for (x, y) in self.path[:next_step]], "-r")
plt.xlabel('y', loc='right')
plt.ylabel('x', loc='top')
plt.grid(color='black', linestyle='-', linewidth=0.5)
plt.pause(0.3)

Binary file not shown.

View File

@ -0,0 +1,77 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import math
import sys
import time
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import make_axes_locatable
from dstar_lite import DStarLite
class Navigator:
'''
导航类
'''
def __init__(self, scene, area_range, map, scale_ratio=5):
self.scene = scene
self.area_range = area_range # 地图实际坐标范围 xmin, xmax, ymin, ymax
self.map = map # 缩放并离散化的地图 array(X,Y)
self.scale_ratio = scale_ratio # 地图缩放率
self.step_length = 50 # 步长(单次移动)
self.step_num = self.step_length // self.scale_ratio # 单次移动地图格数
self.v = 200 # 速度
self.step_time = self.step_length/self.v + 0.1 # 单步移动时长
self.planner = DStarLite(area_range=area_range, map=map, scale_ratio=scale_ratio)
@staticmethod
def is_reached(pos: np.array((float, float)), goal: np.array((float, float)), dis_limit=25):
'''
判断是否到达目标
'''
dis = np.linalg.norm(pos - goal)
return dis < dis_limit
def reset_goal(self, goal:(float, float)):
# TODO: 使目标可达
# 目标在障碍物上:从目标开始方形向外扩展,直到找到可行点
# 目标在地图外面:起点和目标连线最靠近目标的可行点
pass
def navigate(self, goal: (float, float), animation=True):
'''
单次导航直到到达目标
'''
pos = np.array((self.scene.status.location.X, self.scene.status.location.Y)) # 机器人当前: 位置 和 朝向
yaw = self.scene.status.rotation.Yaw
print('------------------navigation_start----------------------')
while not self.is_reached(pos, goal):
dyna_obs = [(walker.pose.X, walker.pose.Y) for walker in self.scene.status.walkers] # 动态障碍物(顾客)位置列表
path = self.planner.planning(pos, goal, dyna_obs, step_num=self.step_num)
if path:
next_step = min(self.step_num, len(path))
(next_x, next_y) = path[next_step-1]
print('plan pos:', (next_x, next_y), end=' ')
self.scene.walk_to(next_x, next_y, yaw, velocity=self.v)
if animation:
self.planner.draw_graph(self.step_num) # 画出搜索路径
time.sleep(self.step_time)
pos = np.array((self.scene.status.location.X, self.scene.status.location.Y))
print('reach pos:', pos)
self.planner.reset() # 完成一轮导航,重置变量
if self.is_reached(pos, goal):
print('The robot has achieved goal !!')

View File

@ -0,0 +1,4 @@
### dstar_lite.py ——Dstar lite算法文件
### navigate.py ——导航类
### test.py ——测试文件
### map_5.pkl ——离散化地图文件

View File

@ -0,0 +1,72 @@
import os
import pickle
import time
import random
import matplotlib.pyplot as plt
import numpy as np
from robowaiter.scene.scene import Scene,init_world # TODO: 文件名改成Scene.py
from navigate import Navigator
if __name__ == '__main__':
file_name = 'map_5.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
init_world(1, 3)
scene = Scene(sceneID=0)
navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map)
'''场景1: 无行人环境 robot到达指定目标'''
# goal = np.array((-100, 700))
'''场景2: 静止行人环境 robot到达指定目标'''
# scene.clean_walker()
# scene.add_walker(50, 300, 0)
# scene.add_walker(-50, 500, 0)
# goal = np.array((-100, 700))
'''场景3: 移动行人环境 robot到达指定目标'''
scene.clean_walker()
scene.add_walker(50, 300, 0)
scene.add_walker(-50, 500, 0)
scene.control_walker([scene.walker_control_generator(walkerID=0, autowalk=False, speed=20, X=-50, Y=600, Yaw=0)])
scene.control_walker([scene.walker_control_generator(walkerID=1, autowalk=False, speed=20, X=100, Y=150, Yaw=0)])
goal = np.array((-100, 700))
'''场景4: 行人自由移动 robot到达指定目标'''
# # TODO: autowalk=True仿真器会闪退 ???
# scene.clean_walker()
# scene.add_walker(50, 300, 0)
# scene.add_walker(-50, 500, 0)
# scene.control_walker([scene.walker_control_generator(walkerID=0, autowalk=True, speed=20, X=0, Y=0, Yaw=0)])
# scene.control_walker([scene.walker_control_generator(walkerID=1, autowalk=True, speed=20, X=0, Y=0, Yaw=0)])
# time.sleep(5)
# goal = np.array((-100, 700))
navigator.navigate(goal, animation=False)
scene.clean_walker()
print(scene.status.collision) # TODO: 不显示碰撞信息 ???

View File

View File

@ -1,10 +1,11 @@
import py_trees as ptree
from typing import Any
from robowaiter.behavior_lib._base.Act import Act
class Grasp(ptree.behaviour.Behaviour):
class ExploreEnv(Act):
def __init__(self, name: str, scene):
super().__init__(name)
def __init__(self, *args):
super().__init__(*args)
def setup(self, **kwargs: Any) -> None:
return super().setup(**kwargs)

View File

@ -4,8 +4,8 @@ from robowaiter.behavior_lib._base.Act import Act
class MoveTo(Act):
def __init__(self, name: str, scene, a, b, c, d):
super().__init__(name)
def __init__(self, *args):
super().__init__(*args)
def setup(self, **kwargs: Any) -> None:
return super().setup(**kwargs)

View File

@ -0,0 +1,14 @@
import py_trees as ptree
from typing import Any
from robowaiter.behavior_lib._base.Cond import Cond
class HasMap(Cond):
def __init__(self):
super().__init__()
def _update(self) -> ptree.common.Status:
# if self.scene.status?
if self.scene.state['map']['2d'] == None:
return ptree.common.Status.FAILURE
else:
return ptree.common.Status.SUCCESS

View File

@ -1,4 +1,8 @@
selector{
selector{
cond HasMap()
act ExploreEnv
}
sequence{
cond Chatting()
act DealChat()

View File

@ -21,7 +21,8 @@ class Robot(object):
self.bt = load_bt_from_ptml(self.scene, self.ptml_path,self.behavior_lib_path)
sub_task_seq = find_node_by_name(self.bt.root,"SubTaskPlaceHolder").parent
sub_task_seq.children.pop()
print(sub_task_seq.children)
self.scene.sub_task_seq = sub_task_seq
def step(self):
if self.scene.time > self.next_response_time:

View File

@ -0,0 +1,4 @@
from . import navigate
from . import dstar_lite

View File

@ -0,0 +1,510 @@
'''
基于两个D* lite的实现
按照原始算法的伪代码
自己写的D* lite
'''
import math
import queue
from functools import partial
import numpy as np
import heapq
from matplotlib import pyplot as plt
def diagonal_distance(start, end): #
return max(abs(start[0] - end[0]), abs(start[1] - end[1]))
def manhattan_distance(start, end): # 曼哈顿距离
return abs(start[0] - end[0]) + abs(start[1] - end[1])
def euclidean_distance(start, end): # 欧式距离
# return np.linalg.norm(start-end)
return math.sqrt((start[0] - end[0]) ** 2 + (start[1] - end[1]) ** 2)
def heuristic(start, end, name='euclidean'):
if name == 'diagonal':
return diagonal_distance(start, end)
elif name == 'euclidean':
return euclidean_distance(start, end)
elif name == 'manhattan':
return manhattan_distance(start, end)
else:
raise Exception('Error heuristic name!')
class Priority:
'''
优先级类
'''
def __init__(self, k1, k2):
self.k1 = k1
self.k2 = k2
def __lt__(self, other): # 定义对象的 < 运算
return self.k1 < other.k1 or (self.k1 == other.k1 and self.k2 < other.k2)
def __le__(self, other): # 定于对象的 <= 运算
return self.k1 < other.k1 or (self.k1 == other.k1 and self.k2 <= other.k2)
class Node:
'''
节点类
'''
def __init__(self, pos: (int, int), priority: Priority):
self.pos = pos # X Y
self.priority = priority
def __lt__(self, other): # 定义对象的 < 运算
return self.priority < other.priority
def __le__(self, other): # 定于对象的 <= 运算
return self.priority <= other.priority
class PriorityQueue:
def __init__(self):
self.queue = [] # 节点队列
self.nodes = [] # 节点位置列表
def is_empty(self):
# 队列判空
return len(self.queue) == 0
def top(self):
return self.queue[0].pos
def top_key(self):
if self.is_empty():
return Priority(float('inf'), float('inf'))
return self.queue[0].priority
def pop(self):
# 取出第一个节点
node = heapq.heappop(self.queue)
self.nodes.remove(node.pos)
return node
def insert(self, pos, priority):
# 创建并添加节点
node = Node(pos, priority)
heapq.heappush(self.queue, node)
self.nodes.append(pos)
def remove(self, pos):
# 删除指定节点并重新排序
self.queue = [n for n in self.queue if n.pos != pos]
heapq.heapify(self.queue) # 重排序
self.nodes.remove(pos)
def update(self, pos, priority):
# 更新指定位置的priority值
for n in self.queue:
if n.pos == pos:
n.priority = priority
break
class DStarLite:
def __init__(self,
map: np.array([int, int]), # [X, Y]
area_range, # [x_min, x_max, y_min, y_max] 实际坐标范围
scale_ratio=5, # 地图缩放率
dyna_obs_radius=20 # dyna_obs实际身位半径
):
# self.area_bounds = area
self.map = map
self.background = map
self.X = map.shape[0]
self.Y = map.shape[1]
(self.x_min, self.x_max, self.y_min, self.y_max) = area_range
self.scale_ratio = scale_ratio
self.dyna_obs_list = [] # 动态障碍物位置列表( 当前地图 ) [(x, y)]
self.dyna_obs_radius = math.ceil(dyna_obs_radius/scale_ratio) # dyna_obs缩放后身位半径
# free:0, obs:1, dyna_obs:2
self.idx_to_object = {
0: "free",
1: "obstacle",
2: "dynamic obstacle"
}
self.object_to_idx = dict(zip(self.idx_to_object.values(), self.idx_to_object.keys()))
self.object_to_cost = {
"free": 0,
"obstacle": float('inf'),
"dynamic obstacle": 50
}
self.compute_cost_map()
self.s_start = None # (int,int) 必须是元组(元组可以直接当作矩阵索引)
self.s_goal = None # (int,int)
self.s_last = None # (int,int)
self.U = PriorityQueue()
self.k_m = 0
self.rhs = np.ones((self.X, self.Y)) * np.inf
self.g = self.rhs.copy()
self.path = []
def set_map(self, map_):
'''
设置map cost_map
'''
self.map = map_
self.X = map_.shape[0]
self.Y = map_.shape[1]
self.compute_cost_map()
def reset(self):
'''
(完成一次导航后)
重置 1.环境变量
2.dstar_lite变量
'''
# env reset
self.map = self.background.copy()
self.compute_cost_map()
self.dyna_obs_list.clear()
self.path.clear()
# dstar_lite reset
self.s_start = None
self.s_goal = None
self.s_last = None
self.U = PriorityQueue()
self.k_m = 0
self.rhs = np.ones((self.X, self.Y)) * np.inf
self.g = self.rhs.copy()
def calculate_key(self, s: (int, int)):
'''
计算 位置s key1, key2
:returns
Priority(k1, k2): 可比较的对象
'''
k1 = min(self.g[s], self.rhs[s]) + heuristic(self.s_start, s) + self.k_m
k2 = min(self.g[s], self.rhs[s])
return Priority(k1, k2)
def c(self, u: (int, int), v: (int, int), v_old=None) -> float:
'''
计算节点间的 路径代价 目标位置代价 (目标位置代价为0时采用路径代价)
(因为是终点->起点的扩展方向因此v是nodeu是v扩展的neighbor)
Args:
u: from pos
v: to pos
v_old: 指定的v的类型
'''
if v_old:
obj_cost = self.object_to_cost[v_old]
else:
obj_cost = self.cost_map[v]
if obj_cost > 0:
return obj_cost
return heuristic(u, v)
def contain(self, u: (int, int)):
'''
判断 节点u 是否在队列中
'''
return u in self.U.nodes
def update_vertex(self, u: (int, int)):
'''
判定节点状态, 更新队列
不一致且在队列 --> 更新key
不一致且不在队列 --> 计算key并加入队列
一致且在队列 --> 移出队列
'''
if self.g[u] != self.rhs[u] and self.contain(u):
self.U.update(u, self.calculate_key(u))
elif self.g[u] != self.rhs[u] and not self.contain(u):
self.U.insert(u, self.calculate_key(u))
elif self.g[u] == self.rhs[u] and self.contain(u):
self.U.remove(u)
def compute_shortest_path(self):
'''
计算最短路径
'''
while self.U.top_key() < self.calculate_key(self.s_start) or self.rhs[self.s_start] > self.g[self.s_start]:
u = self.U.top()
k_old = self.U.top_key()
k_new = self.calculate_key(u)
if k_old < k_new:
self.U.update(u, k_new)
elif self.g[u] > self.rhs[u]: # 过一致
self.g[u] = self.rhs[u]
self.U.remove(u)
pred = self.get_neighbors(u)
for s in pred:
if s != self.s_goal:
self.rhs[s] = min(self.rhs[s], self.c(s, u) + self.g[u])
self.update_vertex(s)
else: # 欠一致
g_old = self.g[u]
self.g[u] = float('inf')
pred = self.get_neighbors(u)
for s in pred + [u]:
if self.rhs[s] == self.c(s, u) + g_old:
if s != self.s_goal:
succ = self.get_neighbors(s)
self.rhs[s] = min([self.c(s, s_) + self.g[s_] for s_ in succ])
self.update_vertex(s)
def _planning(self, s_start, s_goal, dyna_obs, step_num=None, debug=False):
'''
规划路径(实际实现)
Args:
dyna_obs: 动态障碍物位置列表
step_num: 单次移动步数
'''
# 确保目标合法
if not self.in_bounds_without_obstacle(s_goal):
return None
# 第一次规划需要初始化rhs并将goal加入队列计算最短路径
if self.s_goal is None:
self.s_start = tuple(s_start)
self.s_goal = tuple(s_goal)
self.s_last = tuple(s_start)
self.rhs[tuple(s_goal)] = 0
self.U.insert(tuple(s_goal), Priority(k1=heuristic(tuple(s_start), tuple(s_goal)), k2=0))
self.compute_shortest_path() # 计算最短路径
self.path = self.get_path()
# 后续规划只更新起点,直接使用原路径(去掉已走过部分)
else:
self.s_start = tuple(s_start)
self.path = self.path[step_num:]
# 根据obs更新map, cost_map, edge_cost
changed_pos = self.update_map(dyna_obs=dyna_obs)
if changed_pos:
self.update_cost_map(changed_pos)
self.update_edge_costs(changed_pos)
# 若地图改变,重新计算最短路径
self.compute_shortest_path()
self.path = self.get_path()
# TODO: 误差抖动使robot没有到达路径上的点导致新起点的rhs=∞可能导致get_path失败 ( 当前版本没有该问题 )
# assert (self.rhs[self.s_start] != float('inf')), "There is no known path!"
# self.path = self.get_path(step_num)
# # debug
# if debug:
# pass
return self.path
def planning(self, s_start, s_goal, dyna_obs, step_num=None, debug=False):
'''
路径规划(供外部调用处理实际坐标和地图坐标的转换)
'''
# 实际坐标 -> 地图坐标
s_start = self.real2map(s_start)
s_goal = self.real2map(s_goal)
for i in range(len(dyna_obs)):
dyna_obs[i] = self.real2map(dyna_obs[i])
self._planning(s_start, s_goal, dyna_obs, step_num, debug)
# 地图坐标->实际坐标
path = [self.map2real(node) for node in self.path]
return path
def get_path(self):
'''
得到路径
Args:
step_num: 路径步数 (None表示返回完整路径)
return:
path: [(x, y), ...]
'''
if self.s_start is None or self.s_goal == self.s_start:
return []
path = []
cur = self.s_start
# if step_num is None: # 得到完整路径
while cur != self.s_goal:
succ = self.get_neighbors(cur)
cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])]
path.append(cur)
# else:
# for i in range(step_num):
# if cur == self.s_goal:
# break
# succ = self.get_neighbors(cur)
# cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])]
# path.append(cur)
return path
def in_bounds_without_obstacle(self, pos):
'''
判断位置在地图范围内 不是静态障碍物
'''
(x, y) = pos
return 0 <= x < self.X and 0 <= y < self.Y and self.map[x, y] != self.object_to_idx["obstacle"]
def get_neighbors(self, pos, mode=8):
'''
获取邻居节点, 地图范围内
'''
(x_, y_) = pos
# results = [(x_+1,y_), (x_-1,y_), (x_, y_+1), (x_,y_-1)]
# if mode == 8:
neighbors = [(x_ + 1, y_), (x_ - 1, y_), (x_, y_ + 1), (x_, y_ - 1), (x_ + 1, y_ + 1), (x_ - 1, y_ + 1),
(x_ + 1, y_ - 1), (x_ - 1, y_ - 1)]
neighbors = filter(self.in_bounds_without_obstacle, neighbors) # 确保位置在地图范围内 且 不是静态障碍物
return list(neighbors)
def compute_cost_map(self):
# 计算当前地图的cost_map
self.cost_map = np.zeros(self.map.shape)
for idx, obj in self.idx_to_object.items():
self.cost_map[self.map == idx] = self.object_to_cost[obj]
def update_map(self, dyna_obs):
'''
更新地图 动态障碍物列表
Args:
dyna_obs: 当前动态障碍物位置列表 [(x,y), ...]
return:
update_obj: 改变的位置列表 [(x, y, obj_idx, obj_idx_old), ...]
'''
# dyna_obs没有变化 (集合set可以忽略元素在列表中的位置)
if set(dyna_obs) == set(self.dyna_obs_list):
return []
# 新旧dyna_obs占用位置列表
old_obs_occupy = []
new_obs_occupy = []
for pos in self.dyna_obs_list:
old_obs_occupy.extend(self.get_occupy_pos(pos))
for pos in dyna_obs:
new_obs_occupy.extend(self.get_occupy_pos(pos))
old_obs_occupy = [pos for i, pos in enumerate(old_obs_occupy) if pos not in old_obs_occupy[:i]] # 去除重复位置
new_obs_occupy = [pos for i, pos in enumerate(new_obs_occupy) if pos not in new_obs_occupy[:i]] # 去除重复位置
# 转变为free 和 转变为obs的位置列表
changed_free = [pos for pos in old_obs_occupy if pos not in new_obs_occupy]
changed_obs = [pos for pos in new_obs_occupy if pos not in old_obs_occupy]
# 更新地图计算changed_pos
changed_pos = []
for (x, y) in changed_free:
self.map[x, y] = self.object_to_idx['free']
changed_pos.append((x, y, self.object_to_idx["free"], self.object_to_idx["dynamic obstacle"]))
for (x, y) in changed_obs:
self.map[x, y] = self.object_to_idx['dynamic obstacle']
changed_pos.append((x, y, self.object_to_idx["dynamic obstacle"], self.object_to_idx["free"]))
# 更新动态障碍物列表
self.dyna_obs_list = dyna_obs
return changed_pos
def get_occupy_pos(self, obs_pos):
'''
根据dyna_obs中心位置计算其占用的所有网格位置
'''
(x, y) = obs_pos
occupy_pos = []
for i in range(x-self.dyna_obs_radius, x+self.dyna_obs_radius+1):
for j in range(y-self.dyna_obs_radius, y+self.dyna_obs_radius+1):
occupy_pos.append((i, j))
occupy_pos = filter(self.in_bounds_without_obstacle, occupy_pos) # 确保位置在地图范围内 且 不是静态障碍物
return list(occupy_pos)
def update_cost_map(self, changed_pos):
'''
更新cost_map
Args:
changed_pos: 改变的位置列表 [x, y, idx]
'''
for (x, y, idx, _) in changed_pos:
self.cost_map[x, y] = self.object_to_cost[self.idx_to_object[idx]]
def update_edge_costs(self, changed_pos):
'''
重新计算edge_cost更新受影响节点的rhs
Args:
changed_pos: 改变的位置列表 [(x, y, obj_idx_new, obj_idx_old)]
'''
if not changed_pos: return
self.k_m += heuristic(self.s_last, self.s_start, name='euclidean') # 使用欧式距离 更新km
self.s_last = self.s_start
for pos in changed_pos:
v = (pos[0], pos[1]) # to pos
v_old = self.idx_to_object[pos[3]] # 位置v的旧类型
pred_v = self.get_neighbors(v)
for u in pred_v:
c_old = self.c(u, v, v_old=v_old)
c_new = self.c(u, v)
if c_old > c_new and u != self.s_goal:
self.rhs[u] = min(self.rhs[u], self.c(u, v) + self.g[v])
elif self.rhs[u] == c_old + self.g[v] and u != self.s_goal:
succ_u = self.get_neighbors(u)
self.rhs[u] = min([self.c(u, s_) + self.g[s_] for s_ in succ_u])
self.update_vertex(u)
def map2real(self, pos):
'''
地图坐标->实际坐标
'''
x = pos[0] * self.scale_ratio + self.x_min
y = pos[1] * self.scale_ratio + self.y_min
return tuple((x, y))
def real2map(self, pos, approximation='round'):
'''
实际坐标->地图坐标
'''
if approximation == 'round': # 四舍五入
x = round((pos[0] - self.x_min) / self.scale_ratio)
y = round((pos[1] - self.y_min) / self.scale_ratio)
elif approximation == 'low': # 向下取整
x = math.floor((pos[0] - self.x_min) / self.scale_ratio)
y = math.floor((pos[1] - self.y_min) / self.scale_ratio)
else:
raise Exception("Wrong approximation!")
return tuple((x, y))
def draw_graph(self, step_num):
# 清空当前figure内容保留figure对象
plt.clf()
# for stopping simulation with the esc key. 按esc结束
plt.gcf().canvas.mpl_connect(
'key_release_event',
lambda event: [exit(0) if event.key == 'escape' else None])
# 缩放坐标偏移量
offset = (self.x_min / self.scale_ratio, self.x_max / self.scale_ratio,
self.y_min / self.scale_ratio, self.y_max / self.scale_ratio,)
# 画地图: X行Y列第一行在下面
# 范围: 横向Y[-80,290] 纵向X[-70,120]
plt.imshow(self.map, cmap='binary', alpha=0.5, origin='lower',
extent=(offset[2], offset[3],
offset[0], offset[1]))
# 画起点和目标
plt.plot(self.s_start[1] + offset[2], self.s_start[0] + offset[0], "xr")
plt.plot(self.s_goal[1] + offset[2], self.s_goal[0] + offset[0], "xr")
# 画搜索路径
plt.plot([y + offset[2] for (x, y) in self.path],
[x + offset[0] for (x, y) in self.path], "-g")
# 画移动路径
next_step = min(step_num, len(self.path))
# plt.plot([self.s_start[1] + offset[2], self.path[next_step-1][1] + offset[2]],
# [self.s_start[0] + offset[0], self.path[next_step-1][0] + offset[0]], "-r")
plt.plot([y + offset[2] for (x, y) in self.path[:next_step]],
[x + offset[0] for (x, y) in self.path[:next_step]], "-r")
plt.xlabel('y', loc='right')
plt.ylabel('x', loc='top')
plt.grid(color='black', linestyle='-', linewidth=0.5)
plt.pause(0.3)

Binary file not shown.

View File

@ -0,0 +1,78 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import math
import sys
import time
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import make_axes_locatable
from scene_utils import Scene
from dstar_lite import DStarLite
class Navigator:
'''
导航类
'''
def __init__(self, scene, area_range, map, scale_ratio=5):
self.scene = scene
self.area_range = area_range # 地图实际坐标范围 xmin, xmax, ymin, ymax
self.map = map # 缩放并离散化的地图 array(X,Y)
self.scale_ratio = scale_ratio # 地图缩放率
self.step_length = 50 # 步长(单次移动)
self.step_num = self.step_length // self.scale_ratio # 单次移动地图格数
self.v = 200 # 速度
self.step_time = self.step_length/self.v + 0.1 # 单步移动时长
self.planner = DStarLite(area_range=area_range, map=map, scale_ratio=scale_ratio)
@staticmethod
def is_reached(pos: np.array((float, float)), goal: np.array((float, float)), dis_limit=25):
'''
判断是否到达目标
'''
dis = np.linalg.norm(pos - goal)
return dis < dis_limit
def reset_goal(self, goal:(float, float)):
# TODO: 使目标可达
# 目标在障碍物上:从目标开始方形向外扩展,直到找到可行点
# 目标在地图外面:起点和目标连线最靠近目标的可行点
pass
def navigate(self, goal: (float, float), animation=True):
'''
单次导航直到到达目标
'''
pos = np.array((self.scene.status.location.X, self.scene.status.location.Y)) # 机器人当前: 位置 和 朝向
yaw = self.scene.status.rotation.Yaw
print('------------------navigation_start----------------------')
while not self.is_reached(pos, goal):
dyna_obs = [(walker.pose.X, walker.pose.Y) for walker in self.scene.status.walkers] # 动态障碍物(顾客)位置列表
path = self.planner.planning(pos, goal, dyna_obs, step_num=self.step_num)
if path:
next_step = min(self.step_num, len(path))
(next_x, next_y) = path[next_step-1]
print('plan pos:', (next_x, next_y), end=' ')
self.scene.walk_to(next_x, next_y, yaw, velocity=self.v)
if animation:
self.planner.draw_graph(self.step_num) # 画出搜索路径
time.sleep(self.step_time)
pos = np.array((self.scene.status.location.X, self.scene.status.location.Y))
print('reach pos:', pos)
self.planner.reset() # 完成一轮导航,重置变量
if self.is_reached(pos, goal):
print('The robot has achieved goal !!')

View File

@ -0,0 +1,4 @@
### dstar_lite.py ——Dstar lite算法文件
### navigate.py ——导航类
### test.py ——测试文件
### map_5.pkl ——离散化地图文件

View File

@ -0,0 +1,72 @@
import os
import pickle
import time
import random
import matplotlib.pyplot as plt
import numpy as np
from scene_utils import Scene # TODO: 文件名改成Scene.py
from navigate import Navigator
if __name__ == '__main__':
file_name = 'map_5.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
Scene.init_world(1, 3)
scene = Scene.Scene(sceneID=0)
navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map)
'''场景1: 无行人环境 robot到达指定目标'''
# goal = np.array((-100, 700))
'''场景2: 静止行人环境 robot到达指定目标'''
# scene.clean_walker()
# scene.add_walker(50, 300, 0)
# scene.add_walker(-50, 500, 0)
# goal = np.array((-100, 700))
'''场景3: 移动行人环境 robot到达指定目标'''
scene.clean_walker()
scene.add_walker(50, 300, 0)
scene.add_walker(-50, 500, 0)
scene.control_walker([scene.walker_control_generator(walkerID=0, autowalk=False, speed=20, X=-50, Y=600, Yaw=0)])
scene.control_walker([scene.walker_control_generator(walkerID=1, autowalk=False, speed=20, X=100, Y=150, Yaw=0)])
goal = np.array((-100, 700))
'''场景4: 行人自由移动 robot到达指定目标'''
# # TODO: autowalk=True仿真器会闪退 ???
# scene.clean_walker()
# scene.add_walker(50, 300, 0)
# scene.add_walker(-50, 500, 0)
# scene.control_walker([scene.walker_control_generator(walkerID=0, autowalk=True, speed=20, X=0, Y=0, Yaw=0)])
# scene.control_walker([scene.walker_control_generator(walkerID=1, autowalk=True, speed=20, X=0, Y=0, Yaw=0)])
# time.sleep(5)
# goal = np.array((-100, 700))
navigator.navigate(goal, animation=False)
scene.clean_walker()
print(scene.status.collision) # TODO: 不显示碰撞信息 ???