From c39a13fd5b5b16377af4417594de150fe5a71942 Mon Sep 17 00:00:00 2001 From: ChenXL97 <908926798@qq.com> Date: Wed, 8 Nov 2023 16:20:02 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=80=E5=A7=8B=E6=95=B4=E5=90=88dstar=5Flit?= =?UTF-8?q?e?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- robowaiter/algos/__init__.py | 2 + .../algos/navigate/DstarLite/__init__.py | 4 + .../algos/navigate/DstarLite/dstar_lite.py | 510 ++++++++++++++++++ robowaiter/algos/navigate/DstarLite/map_5.pkl | Bin 0 -> 562562 bytes .../algos/navigate/DstarLite/navigate.py | 77 +++ robowaiter/algos/navigate/DstarLite/readme.md | 4 + robowaiter/algos/navigate/DstarLite/test.py | 72 +++ robowaiter/algos/navigate/__init__.py | 0 .../{Grasp.py => act/ExploreEnv.py} | 7 +- robowaiter/behavior_lib/act/MoveTo.py | 4 +- robowaiter/behavior_lib/cond/HasMap.py | 14 + robowaiter/robot/Default.ptml | 4 + robowaiter/robot/robot.py | 3 +- zoo/navigator_DstarLite/__init__.py | 4 + zoo/navigator_DstarLite/dstar_lite.py | 510 ++++++++++++++++++ zoo/navigator_DstarLite/map_5.pkl | Bin 0 -> 562562 bytes zoo/navigator_DstarLite/navigate.py | 78 +++ zoo/navigator_DstarLite/readme.md | 4 + zoo/navigator_DstarLite/test.py | 72 +++ 19 files changed, 1363 insertions(+), 6 deletions(-) create mode 100644 robowaiter/algos/__init__.py create mode 100644 robowaiter/algos/navigate/DstarLite/__init__.py create mode 100644 robowaiter/algos/navigate/DstarLite/dstar_lite.py create mode 100644 robowaiter/algos/navigate/DstarLite/map_5.pkl create mode 100644 robowaiter/algos/navigate/DstarLite/navigate.py create mode 100644 robowaiter/algos/navigate/DstarLite/readme.md create mode 100644 robowaiter/algos/navigate/DstarLite/test.py create mode 100644 robowaiter/algos/navigate/__init__.py rename robowaiter/behavior_lib/{Grasp.py => act/ExploreEnv.py} (76%) create mode 100644 robowaiter/behavior_lib/cond/HasMap.py create mode 100644 zoo/navigator_DstarLite/__init__.py create mode 100644 zoo/navigator_DstarLite/dstar_lite.py create mode 100644 zoo/navigator_DstarLite/map_5.pkl create mode 100644 zoo/navigator_DstarLite/navigate.py create mode 100644 zoo/navigator_DstarLite/readme.md create mode 100644 zoo/navigator_DstarLite/test.py diff --git a/robowaiter/algos/__init__.py b/robowaiter/algos/__init__.py new file mode 100644 index 0000000..2d6cb8a --- /dev/null +++ b/robowaiter/algos/__init__.py @@ -0,0 +1,2 @@ + +from robowaiter.algos.navigate.DstarLite.navigate import Navigator as DstarLite \ No newline at end of file diff --git a/robowaiter/algos/navigate/DstarLite/__init__.py b/robowaiter/algos/navigate/DstarLite/__init__.py new file mode 100644 index 0000000..1ceb32a --- /dev/null +++ b/robowaiter/algos/navigate/DstarLite/__init__.py @@ -0,0 +1,4 @@ + +from . import navigate +from . import dstar_lite + diff --git a/robowaiter/algos/navigate/DstarLite/dstar_lite.py b/robowaiter/algos/navigate/DstarLite/dstar_lite.py new file mode 100644 index 0000000..6a83108 --- /dev/null +++ b/robowaiter/algos/navigate/DstarLite/dstar_lite.py @@ -0,0 +1,510 @@ +''' +基于两个D* lite的实现 +按照原始算法的伪代码 +自己写的D* lite +''' + +import math +import queue +from functools import partial +import numpy as np +import heapq + +from matplotlib import pyplot as plt + + +def diagonal_distance(start, end): # + return max(abs(start[0] - end[0]), abs(start[1] - end[1])) + + +def manhattan_distance(start, end): # 曼哈顿距离 + return abs(start[0] - end[0]) + abs(start[1] - end[1]) + + +def euclidean_distance(start, end): # 欧式距离 + # return np.linalg.norm(start-end) + return math.sqrt((start[0] - end[0]) ** 2 + (start[1] - end[1]) ** 2) + + +def heuristic(start, end, name='euclidean'): + if name == 'diagonal': + return diagonal_distance(start, end) + elif name == 'euclidean': + return euclidean_distance(start, end) + elif name == 'manhattan': + return manhattan_distance(start, end) + else: + raise Exception('Error heuristic name!') + + +class Priority: + ''' + 优先级类 + ''' + + def __init__(self, k1, k2): + self.k1 = k1 + self.k2 = k2 + + def __lt__(self, other): # 定义对象的 < 运算 + return self.k1 < other.k1 or (self.k1 == other.k1 and self.k2 < other.k2) + + def __le__(self, other): # 定于对象的 <= 运算 + return self.k1 < other.k1 or (self.k1 == other.k1 and self.k2 <= other.k2) + + +class Node: + ''' + 节点类 + ''' + + def __init__(self, pos: (int, int), priority: Priority): + self.pos = pos # X Y + self.priority = priority + + def __lt__(self, other): # 定义对象的 < 运算 + return self.priority < other.priority + + def __le__(self, other): # 定于对象的 <= 运算 + return self.priority <= other.priority + + +class PriorityQueue: + def __init__(self): + self.queue = [] # 节点队列 + self.nodes = [] # 节点位置列表 + + def is_empty(self): + # 队列判空 + return len(self.queue) == 0 + + def top(self): + return self.queue[0].pos + + def top_key(self): + if self.is_empty(): + return Priority(float('inf'), float('inf')) + return self.queue[0].priority + + def pop(self): + # 取出第一个节点 + node = heapq.heappop(self.queue) + self.nodes.remove(node.pos) + return node + + def insert(self, pos, priority): + # 创建并添加节点 + node = Node(pos, priority) + heapq.heappush(self.queue, node) + self.nodes.append(pos) + + def remove(self, pos): + # 删除指定节点并重新排序 + self.queue = [n for n in self.queue if n.pos != pos] + heapq.heapify(self.queue) # 重排序 + self.nodes.remove(pos) + + def update(self, pos, priority): + # 更新指定位置的priority值 + for n in self.queue: + if n.pos == pos: + n.priority = priority + break + + +class DStarLite: + def __init__(self, + map: np.array([int, int]), # [X, Y] + area_range, # [x_min, x_max, y_min, y_max] 实际坐标范围 + scale_ratio=5, # 地图缩放率 + dyna_obs_radius=20 # dyna_obs实际身位半径 + ): + + # self.area_bounds = area + self.map = map + self.background = map + self.X = map.shape[0] + self.Y = map.shape[1] + (self.x_min, self.x_max, self.y_min, self.y_max) = area_range + self.scale_ratio = scale_ratio + self.dyna_obs_list = [] # 动态障碍物位置列表( 当前地图 ) [(x, y)] + self.dyna_obs_radius = math.ceil(dyna_obs_radius/scale_ratio) # dyna_obs缩放后身位半径 + + # free:0, obs:1, dyna_obs:2 + self.idx_to_object = { + 0: "free", + 1: "obstacle", + 2: "dynamic obstacle" + } + self.object_to_idx = dict(zip(self.idx_to_object.values(), self.idx_to_object.keys())) + self.object_to_cost = { + "free": 0, + "obstacle": float('inf'), + "dynamic obstacle": 50 + } + + self.compute_cost_map() + + self.s_start = None # (int,int) 必须是元组(元组可以直接当作矩阵索引) + self.s_goal = None # (int,int) + self.s_last = None # (int,int) + self.U = PriorityQueue() + self.k_m = 0 + self.rhs = np.ones((self.X, self.Y)) * np.inf + self.g = self.rhs.copy() + self.path = [] + + def set_map(self, map_): + ''' + 设置map 和 cost_map + ''' + self.map = map_ + self.X = map_.shape[0] + self.Y = map_.shape[1] + self.compute_cost_map() + + def reset(self): + ''' + (完成一次导航后) + 重置 1.环境变量 + 2.dstar_lite变量 + ''' + # env reset + self.map = self.background.copy() + self.compute_cost_map() + self.dyna_obs_list.clear() + self.path.clear() + # dstar_lite reset + self.s_start = None + self.s_goal = None + self.s_last = None + self.U = PriorityQueue() + self.k_m = 0 + self.rhs = np.ones((self.X, self.Y)) * np.inf + self.g = self.rhs.copy() + + def calculate_key(self, s: (int, int)): + ''' + 计算 位置s 的 key1, key2 + :returns: + Priority(k1, k2): 可比较的对象 + ''' + k1 = min(self.g[s], self.rhs[s]) + heuristic(self.s_start, s) + self.k_m + k2 = min(self.g[s], self.rhs[s]) + return Priority(k1, k2) + + def c(self, u: (int, int), v: (int, int), v_old=None) -> float: + ''' + 计算节点间的 路径代价 和 目标位置代价 (目标位置代价为0时采用路径代价) + (因为是终点->起点的扩展方向,因此v是node,u是v扩展的neighbor) + Args: + u: from pos + v: to pos + v_old: 指定的v的类型 + ''' + if v_old: + obj_cost = self.object_to_cost[v_old] + else: + obj_cost = self.cost_map[v] + if obj_cost > 0: + return obj_cost + return heuristic(u, v) + + def contain(self, u: (int, int)): + ''' + 判断 节点u 是否在队列中 + ''' + return u in self.U.nodes + + def update_vertex(self, u: (int, int)): + ''' + 判定节点状态, 更新队列 + 不一致且在队列 --> 更新key + 不一致且不在队列 --> 计算key并加入队列 + 一致且在队列 --> 移出队列 + ''' + if self.g[u] != self.rhs[u] and self.contain(u): + self.U.update(u, self.calculate_key(u)) + elif self.g[u] != self.rhs[u] and not self.contain(u): + self.U.insert(u, self.calculate_key(u)) + elif self.g[u] == self.rhs[u] and self.contain(u): + self.U.remove(u) + + def compute_shortest_path(self): + ''' + 计算最短路径 + ''' + while self.U.top_key() < self.calculate_key(self.s_start) or self.rhs[self.s_start] > self.g[self.s_start]: + u = self.U.top() + k_old = self.U.top_key() + k_new = self.calculate_key(u) + if k_old < k_new: + self.U.update(u, k_new) + elif self.g[u] > self.rhs[u]: # 过一致 + self.g[u] = self.rhs[u] + self.U.remove(u) + pred = self.get_neighbors(u) + for s in pred: + if s != self.s_goal: + self.rhs[s] = min(self.rhs[s], self.c(s, u) + self.g[u]) + self.update_vertex(s) + else: # 欠一致 + g_old = self.g[u] + self.g[u] = float('inf') + pred = self.get_neighbors(u) + for s in pred + [u]: + if self.rhs[s] == self.c(s, u) + g_old: + if s != self.s_goal: + succ = self.get_neighbors(s) + self.rhs[s] = min([self.c(s, s_) + self.g[s_] for s_ in succ]) + self.update_vertex(s) + + def _planning(self, s_start, s_goal, dyna_obs, step_num=None, debug=False): + ''' + 规划路径(实际实现) + Args: + dyna_obs: 动态障碍物位置列表 + step_num: 单次移动步数 + ''' + # 确保目标合法 + if not self.in_bounds_without_obstacle(s_goal): + return None + # 第一次规划需要初始化rhs并将goal加入队列,计算最短路径 + if self.s_goal is None: + self.s_start = tuple(s_start) + self.s_goal = tuple(s_goal) + self.s_last = tuple(s_start) + self.rhs[tuple(s_goal)] = 0 + self.U.insert(tuple(s_goal), Priority(k1=heuristic(tuple(s_start), tuple(s_goal)), k2=0)) + self.compute_shortest_path() # 计算最短路径 + self.path = self.get_path() + # 后续规划只更新起点,直接使用原路径(去掉已走过部分) + else: + self.s_start = tuple(s_start) + self.path = self.path[step_num:] + # 根据obs更新map, cost_map, edge_cost + changed_pos = self.update_map(dyna_obs=dyna_obs) + if changed_pos: + self.update_cost_map(changed_pos) + self.update_edge_costs(changed_pos) + # 若地图改变,重新计算最短路径 + self.compute_shortest_path() + self.path = self.get_path() + + # TODO: 误差抖动使robot没有到达路径上的点,导致新起点的rhs=∞,可能导致get_path失败 ( 当前版本没有该问题 ) + # assert (self.rhs[self.s_start] != float('inf')), "There is no known path!" + # self.path = self.get_path(step_num) + # # debug + # if debug: + # pass + return self.path + + def planning(self, s_start, s_goal, dyna_obs, step_num=None, debug=False): + ''' + 路径规划(供外部调用,处理实际坐标和地图坐标的转换) + ''' + # 实际坐标 -> 地图坐标 + s_start = self.real2map(s_start) + s_goal = self.real2map(s_goal) + for i in range(len(dyna_obs)): + dyna_obs[i] = self.real2map(dyna_obs[i]) + + self._planning(s_start, s_goal, dyna_obs, step_num, debug) + + # 地图坐标->实际坐标 + path = [self.map2real(node) for node in self.path] + return path + + def get_path(self): + ''' + 得到路径 + Args: + step_num: 路径步数 (None表示返回完整路径) + return: + path: [(x, y), ...] + ''' + if self.s_start is None or self.s_goal == self.s_start: + return [] + path = [] + cur = self.s_start + # if step_num is None: # 得到完整路径 + while cur != self.s_goal: + succ = self.get_neighbors(cur) + cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])] + path.append(cur) + # else: + # for i in range(step_num): + # if cur == self.s_goal: + # break + # succ = self.get_neighbors(cur) + # cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])] + # path.append(cur) + return path + + def in_bounds_without_obstacle(self, pos): + ''' + 判断位置在地图范围内 且 不是静态障碍物 + ''' + (x, y) = pos + return 0 <= x < self.X and 0 <= y < self.Y and self.map[x, y] != self.object_to_idx["obstacle"] + + def get_neighbors(self, pos, mode=8): + ''' + 获取邻居节点, 地图范围内 + ''' + (x_, y_) = pos + # results = [(x_+1,y_), (x_-1,y_), (x_, y_+1), (x_,y_-1)] + # if mode == 8: + neighbors = [(x_ + 1, y_), (x_ - 1, y_), (x_, y_ + 1), (x_, y_ - 1), (x_ + 1, y_ + 1), (x_ - 1, y_ + 1), + (x_ + 1, y_ - 1), (x_ - 1, y_ - 1)] + neighbors = filter(self.in_bounds_without_obstacle, neighbors) # 确保位置在地图范围内 且 不是静态障碍物 + return list(neighbors) + + def compute_cost_map(self): + # 计算当前地图的cost_map + self.cost_map = np.zeros(self.map.shape) + for idx, obj in self.idx_to_object.items(): + self.cost_map[self.map == idx] = self.object_to_cost[obj] + + def update_map(self, dyna_obs): + ''' + 更新地图 和 动态障碍物列表 + Args: + dyna_obs: 当前动态障碍物位置列表 [(x,y), ...] + return: + update_obj: 改变的位置列表 [(x, y, obj_idx, obj_idx_old), ...] + ''' + # dyna_obs没有变化 (集合set可以忽略元素在列表中的位置) + if set(dyna_obs) == set(self.dyna_obs_list): + return [] + + # 新旧dyna_obs占用位置列表 + old_obs_occupy = [] + new_obs_occupy = [] + for pos in self.dyna_obs_list: + old_obs_occupy.extend(self.get_occupy_pos(pos)) + for pos in dyna_obs: + new_obs_occupy.extend(self.get_occupy_pos(pos)) + old_obs_occupy = [pos for i, pos in enumerate(old_obs_occupy) if pos not in old_obs_occupy[:i]] # 去除重复位置 + new_obs_occupy = [pos for i, pos in enumerate(new_obs_occupy) if pos not in new_obs_occupy[:i]] # 去除重复位置 + + # 转变为free 和 转变为obs的位置列表 + changed_free = [pos for pos in old_obs_occupy if pos not in new_obs_occupy] + changed_obs = [pos for pos in new_obs_occupy if pos not in old_obs_occupy] + + # 更新地图,计算changed_pos + changed_pos = [] + for (x, y) in changed_free: + self.map[x, y] = self.object_to_idx['free'] + changed_pos.append((x, y, self.object_to_idx["free"], self.object_to_idx["dynamic obstacle"])) + for (x, y) in changed_obs: + self.map[x, y] = self.object_to_idx['dynamic obstacle'] + changed_pos.append((x, y, self.object_to_idx["dynamic obstacle"], self.object_to_idx["free"])) + + # 更新动态障碍物列表 + self.dyna_obs_list = dyna_obs + + return changed_pos + + def get_occupy_pos(self, obs_pos): + ''' + 根据dyna_obs中心位置,计算其占用的所有网格位置 + ''' + (x, y) = obs_pos + occupy_pos = [] + for i in range(x-self.dyna_obs_radius, x+self.dyna_obs_radius+1): + for j in range(y-self.dyna_obs_radius, y+self.dyna_obs_radius+1): + occupy_pos.append((i, j)) + occupy_pos = filter(self.in_bounds_without_obstacle, occupy_pos) # 确保位置在地图范围内 且 不是静态障碍物 + return list(occupy_pos) + + def update_cost_map(self, changed_pos): + ''' + 更新cost_map + Args: + changed_pos: 改变的位置列表 [x, y, idx] + ''' + for (x, y, idx, _) in changed_pos: + self.cost_map[x, y] = self.object_to_cost[self.idx_to_object[idx]] + + def update_edge_costs(self, changed_pos): + ''' + 重新计算edge_cost,更新受影响节点的rhs + Args: + changed_pos: 改变的位置列表 [(x, y, obj_idx_new, obj_idx_old)] + ''' + if not changed_pos: return + self.k_m += heuristic(self.s_last, self.s_start, name='euclidean') # 使用欧式距离 更新km + self.s_last = self.s_start + for pos in changed_pos: + v = (pos[0], pos[1]) # to pos + v_old = self.idx_to_object[pos[3]] # 位置v的旧类型 + pred_v = self.get_neighbors(v) + for u in pred_v: + c_old = self.c(u, v, v_old=v_old) + c_new = self.c(u, v) + if c_old > c_new and u != self.s_goal: + self.rhs[u] = min(self.rhs[u], self.c(u, v) + self.g[v]) + elif self.rhs[u] == c_old + self.g[v] and u != self.s_goal: + succ_u = self.get_neighbors(u) + self.rhs[u] = min([self.c(u, s_) + self.g[s_] for s_ in succ_u]) + self.update_vertex(u) + + def map2real(self, pos): + ''' + 地图坐标->实际坐标 + ''' + x = pos[0] * self.scale_ratio + self.x_min + y = pos[1] * self.scale_ratio + self.y_min + return tuple((x, y)) + + def real2map(self, pos, approximation='round'): + ''' + 实际坐标->地图坐标 + ''' + if approximation == 'round': # 四舍五入 + x = round((pos[0] - self.x_min) / self.scale_ratio) + y = round((pos[1] - self.y_min) / self.scale_ratio) + elif approximation == 'low': # 向下取整 + x = math.floor((pos[0] - self.x_min) / self.scale_ratio) + y = math.floor((pos[1] - self.y_min) / self.scale_ratio) + else: + raise Exception("Wrong approximation!") + return tuple((x, y)) + + def draw_graph(self, step_num): + # 清空当前figure内容,保留figure对象 + plt.clf() + # for stopping simulation with the esc key. 按esc结束 + plt.gcf().canvas.mpl_connect( + 'key_release_event', + lambda event: [exit(0) if event.key == 'escape' else None]) + + # 缩放坐标偏移量 + offset = (self.x_min / self.scale_ratio, self.x_max / self.scale_ratio, + self.y_min / self.scale_ratio, self.y_max / self.scale_ratio,) + + # 画地图: X行Y列,第一行在下面 + # 范围: 横向Y[-80,290] 纵向X[-70,120] + plt.imshow(self.map, cmap='binary', alpha=0.5, origin='lower', + extent=(offset[2], offset[3], + offset[0], offset[1])) + # 画起点和目标 + plt.plot(self.s_start[1] + offset[2], self.s_start[0] + offset[0], "xr") + plt.plot(self.s_goal[1] + offset[2], self.s_goal[0] + offset[0], "xr") + + # 画搜索路径 + plt.plot([y + offset[2] for (x, y) in self.path], + [x + offset[0] for (x, y) in self.path], "-g") + + # 画移动路径 + next_step = min(step_num, len(self.path)) + # plt.plot([self.s_start[1] + offset[2], self.path[next_step-1][1] + offset[2]], + # [self.s_start[0] + offset[0], self.path[next_step-1][0] + offset[0]], "-r") + plt.plot([y + offset[2] for (x, y) in self.path[:next_step]], + [x + offset[0] for (x, y) in self.path[:next_step]], "-r") + + plt.xlabel('y', loc='right') + plt.ylabel('x', loc='top') + plt.grid(color='black', linestyle='-', linewidth=0.5) + plt.pause(0.3) diff --git a/robowaiter/algos/navigate/DstarLite/map_5.pkl b/robowaiter/algos/navigate/DstarLite/map_5.pkl new file mode 100644 index 0000000000000000000000000000000000000000..7b49370226f1770a974c60a3578f1c44c62dea35 GIT binary patch literal 562562 zcmeI!&8{p*QowPW!GZ-_-hq|ZERc|3hlLls02+G+fy8)57kl9y3%*A3ZqB?8&%z_{ z20%_wx;}kxpUSGr%*v17U1Z zKmWyd|M=}c{Pf+spML-H!yongB=kXt}I3xl*-~kUl?1Ql4 z!_W1Le$g-b!W=LM%mH)2954sWf%1JRB9YcKJN!~8Tq%};Z{954sW0dv3{m}w3~HTPHE z^L{4p`OIfN`#on4m;>g3Indi2h^~CT_7<;r%u#dH95n~b0dv3{FbB+mndU%L^ZCkm zzR%=4pZUyZzvIjSbHE%h2h0I;z#K3K%mH)2954sW0dv3{FbB*5bHE%h2h0I;z#K3K z%mH)299VA-g!Sv?ZQhx8=AFJb2h0I;z#K3K%z+-}KvZFWw<>o*=d z;b?sJ2VWd;NCbGm10LuDbHE%h2g=WZu&Uge+o$DjJaY7u&Dg{tUt@&wC0J= z`1(A2zyXIufCoI_fj%$?%z+W+Kvb#x`sjc88=RO3JaYCvyubkm{b4?s1LlA^U=Ea? z1JPCeuch<8>%9I+zmD)OPVS95;DAFSzylueKp&U`_soH~b=`Fp??$QcD)o@(J@XEC z+!FyF@PG&UAa@Q#w^&OP$mKulzS2Bvoi{6gd4(}J;E)LLfCoI#2WRI%bdPlu0%z;I z%5%NeaTWgZ30f$6@2Wxr|S+`q9_pCOlyZO}4Z`Si4 z>o#hhk9l|>l|0L#7Z1+H@40LB#}B_mfCoIt@gOL*TSxV>Hlg0R)Xx9z^Gox;JD;=n zm*%kgd1vcT>$%nIA^$K72OJUs9`xZsL~-9biU+j`jl$tsoVEGnp8qVqyFbs3zn6V? z^P2U3^$h~E)>mHYn+WiL2WLD8YWJZ2!&*M5cinp9i*F*p10Hs0#r)%eRd48j42M1TivJ&4=JSzGzh zib$(`+|#zNa(TD%?CL$a`Pf&+v#a$g{k*!CsVj~+CIUR*!N)uZD&wxDy>3OKbllF1 z%8kaMm2X$?8I9}c@m{T0t>;JQD*iA72OJUs9`xZsL^02L)vHz{uEy#4KKiG09a{N! z^`6ps*FJAA-`e+IJ%2C$*56;v$MaD!9B@bkc))`oF!!44c`FLJ@z}R!f41_l^gVa^ zI!hk8{7UD0^}O7;_x5dZ?CtY?kir3nM1Ti#co12vvp(~-6@h2>`+SamDpki;KGweH zDv#RnJv;wu+_j&77Wdts=jOc^XYM)n1xXxmNCbE=wg(ZtwG{#VGIqbTu3sxJTivtq zYjxk%d$xFceZ87rt>-shXV2fB_ZkFoz#$Rf!JHmMR@PPo^wFGsRJ*RN{B+OT1Zvm$ z?D=m;J?^(>d3Y{rjsp&f01w9SAfmL^CZHe2=!ahGRNGJIA0yCMS!<; zyj`7ZEnl5`j=)_$o{M(er|#nH-f#v791;N@)b$`vx0WKn*Sfy0&bO4O&M8Zvm6v6s z9_O@Hd{@6`Gh%fP@(&YmxQD~mhhDD{;O{;DuEzbErJk-upq7WV;vMDOT3nqQ25cOj z`LKQO^))X%@JIw|c@T81M+wxbhjXJ6IK1X?)KB}{5UAy8o9O$$r50c3M(sBadnRoA zyuRj!2Ofz)X%B*`b&o)4?#>J1IK0ha&r9FmOQ4j$_h!-G?WMRnCu;kQ!_mq=JICj5 zbH)dsL?G9Lpk;j|kju}$AcsR84v$`X-xqba<8E{EL8=bmTZG5&br zl?Xg~5LB#>T)Yn_aJYwqjvRx)(f$}CtNWuo%GLg;SuTg%3g@0@-!cAp;gtwH^C0M~ zw?5-`9p$iB;iKo-f36O=;g$$&9s~{R#=~p4 zfWti;bRq%>yeD7|1&h)g-V=WPzwMsw)9wBCx_!RAZl8M%A~@iX2%PmG=vns&oaO7e zAdAD>9QOS4JpwBeFh8RDS2;X$=<(cb-?PuR?;l?u_j@0NaKIrE*z+JLS+5e<QxY|#iFH1n*&E0opZE+d`2%I7C zY|b7%>)AQIef})3+&G^-U+eSSxa^Ax;4lY=TK(kQIt29HSbbN=0;eK?K+gnvnbXf0 z9gXkVIjufF8n69P863vq(5j!@(}sZl>!bhLRNE~GATR=fyXNvUHo5UUd%l;?bMvq- zDuzSf9D3<1akL?zAM5sGn<}~m0R%=SP z{dufE`>2{I5I|rC0$1m6E`!{Aa?k5^Uv8fEMFnxVheNOZBd)#)=-0mMBRT{Sn1{g8 zd7GPg?zy?=^}a7RU;CnxIJD)^d;f@UOaim_@t9Q+H3A5XMBwNg&CTuTx!eBS^LyX7 z<>NIfwK|8**Sk4vad<@l0R#|00D=1n9Q|F88}ZR|ANS|pGs?cl{Jf9Kt;S*V^{%|P z`@JH700IagfIvS4a(@>bjX3w*XZw!cH_HBJ`T9I6I4g%oKXdbZd~SQ6`~2g+?Rl>V zAbV=kGn|djt?b z009ILXis4KyItz$?YvU2rQ+)KoKl>e6BX`-!=7)saqgYtdjt?b009ILKwu+~`}^YP zzU}NnRveD{nH$&9bL~d}0R#|00D<)gR> zx^F$Qk?~b-p0CUCte3fUIeWh65I_I{1Q0-A9Rj((D@yHKhwST2OYw6~a4p5*b(Qkp z<=&rvqMZmJfB*srAh0Td+}{9p!xWKF=e700IagfWW*2j{dHw zwSQj1AJ427N9P9bXB<9WjpLs_JOA9;d#&3Co?-ll*2+dY2APxJ%=2q1s}0tl>1 zVDEQDFW;|HpI&&+7Ds7+p7AMtkMj^f009ILKwy3X&wf|*`uY47i41$?KWbd1z1cXF zKG%5&Ab_NAb_oLTk*7!&DN!)7^n6(e(6+YvDLn5%c2VucH>*)2IrxLF-8NDvE#{arX-XYM2 zz^wfwukZ&491;N@obw=_nrEHGgLx`)&dXWq*n1wcc_yE>1bX*deCBT=zyltPE-CnbGSpYy7i$CZCTJ znAKl-MZIyrAraug*d9dm^RA=&Ia_5$ug7fh&*qnWUK5zjSNTMJalj!F;K5uTL^kuU zvv@m8rDmzeEP2f0l{{V(n8i#U>uJWHiUufr_y&*GCjUK5zb zPkBULalj!F;KAG;M0V?~qkBC|#YV5g=<(0ukvxLH=st>H9TNc_@L**RqWbHtqxbzR z6&t+{qsKpsN2}*Cx`*NqgK@wi5#Yhv9z^!*t+RMPdgW%R!)SSo?v40^z-ZoySKSf; z9`Jw%ANE05VI6%wj9$Uf`HvR==)SBT|7gC6H;lyrheUt}Jb254pkp1qPe!lc==?{E ze{^3~jej)H#2cpKfI}j{10KBPLC~>o1ZL@%EuT?dXW>3d9;0|6&esG+@k^XhHym(C z1bD!MBOU}T>qcOf{@L>B{dE@JqvX-M58``GU=**!8Fj+}heUt}JUHV)P_sTGFnV7- z%d3~qN9Q|g{Jqp)9B&gC)h}^JU2(u65#Rw2j(8BXtj`FH-cQf+y8H9dxsDqD-F0_= z9)VH)5_i-W2OJUs9`N9d2SLsHjKC~?^(?>EpU=W`v^-kZ*?q?djOLSgqrN!ckO=UA z2PHfRiq<^>v-I0up0&T9h2yAs)UL1d&k-2aCviu;alj!F-~kUxcn}n=xdcYWIc7w_8=n8hP`gf}?gkO=UA2Q@tiy4I@%X6aKLf`EEnHDT@NXYofKbIk+wRL?|! z2Yq@FQCUk7n8ow6JZ8y%mOReZ(R0@b%;Jwc!UG&|NCbGmgBBhHWosz{v-Isz9<$^< zOCCq-Xa6+v+ggjj=>43_W%N8}i$Aw+_O&H2n`iO~e{jGd z5#Rw2N_Y?yt+fc~(;$mO8xFNRcJ93d^iP}q!5Qa7fCoId(}SRItwlh82U#5IaH!?2 zbLSwS@9Oj&uDB)wJm5h|4}zw(HUWJbgmEa#p|-cqUz>n_E!(en;+Y8WfCq1R5Ol1q z2pHtH92$kge8$TN z0R#|00DLz54n->UWmIR{gWjKR&me=k+y#jnnH-`+2v;=QV1A0}hD*5Bl;T zqM3JH#kXE6xbf~Kp5EiwczKOVmEy2h_oMghoj3dUxx9MikQ={!Q9&GVNCbG$mj@Be z`0FZO)vnZ0jzx~C?R|H1aDP-72OJUs9`xx!M0MZF zi?&|eSLwcP=3N=@`l@O_(e&b1FGPCbA&xSAk0XwW01xVT5U1X^s%xFPKI3{{rTe(~ z85f`DdDur4`;DR(kM1Va3kPxJ^)*g7B?3IS*MqqI{*?!9J-EN(Z6n~Ut2T-?yr7-TFTAy|2>u-29lo_icUK<9&}6C5 zWq9nQyYh8!)CmV15&<5J?LkC;-^!o1{@hpTHaGLBtxsopx3Qu8Eobv>^<4Rk^G7@L*04BCGpWUbXS+z6!UwnMZ9rD#f{t73FRzm3OUk%4M2&Qms6k8~ky= zAraugnjS>f_pQ8ZQ95qtMICU!ArasK4?gBWP-(yNv5k-I zD_iD9ezftUHRm#RTtB6C-lgulp84A+l#0_i!5Ie}5&<6Y;D`r7tNqH;x}LVLXqg*% zQP+#R`IfQd_$hbiUF!bhndd!GDn91~Zyaz)1bD!MGadxBb}LWYc-pR_Wp3g{8!ztW zT*j9CQ|`{!{cCuw&gz^9@PG#;JP3;IR^Hb2wp}I5-o%T#Ui8AZtTi8=)=Qq^Si@&^ zR_8>32Rtb0LC|cs^0%(P?J8OJCSKI_q8Gkpt$BZ1FZqdM4Ug4Xof82b@SvmzL9_kJ z<2oLMD-8`&NC`Hxb}L8xP`^_N)5Vsc-vAmc5Y|b-d_> zYbjfL$wwS>`>MX`n+WiL2Q57a+U-_;*Y&$yCF|V8kGg)`&G)P|cgOAixxG|h^-TnL zz=Iwtbxj0#z=IJyh$_`z^+TP0s9(9ZH}Is6CwFswwCL`5+&`yh>Z-1Z01tRDvIkMU z`l~)E+b8uacFzqwDeFluJU5$qiANlBdZw=Gnh5ZK2P1nB)vLeild^qMzhd{?z>~6` z;E87Lvx;+P2VfCoI#2j;+7b0DIvxANhd z5A`b3=gEAz<_muKB?3I)0T1+nIWXoNh^XtWyg24Xy-JL6GGC7Qf)`$i01tS;1ASl) zj6DY;`uZzR@;s?ui7{^AOP(({;gksQfCoI#2j;*Wb0D&)zw+hdzSJi_<_&!LxG%Wi zk_hmC2RzUR=D?hDAhN2z@~1xInQq`qR1*gr5&<6YfCu`(99UxxM3(i|)jg_5cJ7l` z_eTEVHx4)?0zBXW5A?w*b0E6C-nt6M$2{v1UE}0Op58~@alj!F-~kVKpbyM}Ip;uR zRc~Fzn|cJ-Jb4vwBV&`rMo| z2h0I;;KMl(R+L|7>$mcRocI=j+5C}DR2v5z5&<6YfCu`(9QgPgh)>I}v+yoY#)&xu zX7NWJQEeP>NCbGm10LuDbKvbc5Z{(xN9A6ggcHvY7}XzfN40UlArasK4|t#t%zk%$-<*z$pHRGpdaP4v7E{c)$aFU=CcH1MzgZwfB0I%hX9V3H0uf_@eqa z;E)LLfCoI#2j;-FIS@}<5$KbE{*JoffI}j{10L`|AD9Dk&wv1k9hP3l2CW0zBXW z5A=aKF!vmY?5qg%Nx=Mxy5N9ABESP4@IW7!19Q)T$j*vDp9IXGs0$7_Bmz9(0T1+n zIWYGei0rHg^hv<{iMrr`Ln6Qf9`HaPm;-aqfymB^K%WH6pQsBCI3xl*-~kWxfjO|| z9Ehy*SDxJCNq&XK+R3MTe8L^~M1Ti8;DJ6c2l|);5ykUWo%*QLR)^=68ta=aAFqA+ zr5@^$2=IUhJkST`K-)PGx4mESZ_EEE@As7&`+rCCw!d#L)kl320Uq#x2l~JqFb7te z15xd9SN*DASL#>3^PTU0rCy4q`dfeNZ*#yLSa}Xa_4{A- zsXkq~Px;S({`bHE%h2h0I;V1+plRqk)qfBJ8Q{^L2%dG7b7IbaT$1LnXs2cjGLS+}^nnp5VK zIb{x*1LlA^U=ElAE6jnYazCrS(|0TM9nX2rbH6jq0dv3{Fb5vzKy<}@>tj6Ln^WeL zIb{x*1LlA^U=ElAE6stZ_I;~<({C&F8{hfPcfT{u0dv3{FbBq;1Hb(&I{A~AZ(qLp G{C@$1cKMJ1 literal 0 HcmV?d00001 diff --git a/robowaiter/algos/navigate/DstarLite/navigate.py b/robowaiter/algos/navigate/DstarLite/navigate.py new file mode 100644 index 0000000..4e335b5 --- /dev/null +++ b/robowaiter/algos/navigate/DstarLite/navigate.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- +import math +import sys +import time + +import matplotlib.pyplot as plt +import numpy as np +from mpl_toolkits.axes_grid1 import make_axes_locatable + + +from dstar_lite import DStarLite + +class Navigator: + ''' + 导航类 + ''' + + def __init__(self, scene, area_range, map, scale_ratio=5): + self.scene = scene + self.area_range = area_range # 地图实际坐标范围 xmin, xmax, ymin, ymax + self.map = map # 缩放并离散化的地图 array(X,Y) + self.scale_ratio = scale_ratio # 地图缩放率 + self.step_length = 50 # 步长(单次移动) + self.step_num = self.step_length // self.scale_ratio # 单次移动地图格数 + self.v = 200 # 速度 + self.step_time = self.step_length/self.v + 0.1 # 单步移动时长 + + + self.planner = DStarLite(area_range=area_range, map=map, scale_ratio=scale_ratio) + + + @staticmethod + def is_reached(pos: np.array((float, float)), goal: np.array((float, float)), dis_limit=25): + ''' + 判断是否到达目标 + ''' + dis = np.linalg.norm(pos - goal) + return dis < dis_limit + + def reset_goal(self, goal:(float, float)): + # TODO: 使目标可达 + # 目标在障碍物上:从目标开始方形向外扩展,直到找到可行点 + # 目标在地图外面:起点和目标连线最靠近目标的可行点 + pass + + def navigate(self, goal: (float, float), animation=True): + ''' + 单次导航,直到到达目标 + ''' + + pos = np.array((self.scene.status.location.X, self.scene.status.location.Y)) # 机器人当前: 位置 和 朝向 + yaw = self.scene.status.rotation.Yaw + print('------------------navigation_start----------------------') + while not self.is_reached(pos, goal): + dyna_obs = [(walker.pose.X, walker.pose.Y) for walker in self.scene.status.walkers] # 动态障碍物(顾客)位置列表 + path = self.planner.planning(pos, goal, dyna_obs, step_num=self.step_num) + if path: + next_step = min(self.step_num, len(path)) + (next_x, next_y) = path[next_step-1] + print('plan pos:', (next_x, next_y), end=' ') + self.scene.walk_to(next_x, next_y, yaw, velocity=self.v) + if animation: + self.planner.draw_graph(self.step_num) # 画出搜索路径 + time.sleep(self.step_time) + pos = np.array((self.scene.status.location.X, self.scene.status.location.Y)) + print('reach pos:', pos) + + self.planner.reset() # 完成一轮导航,重置变量 + + if self.is_reached(pos, goal): + print('The robot has achieved goal !!') + + + + + diff --git a/robowaiter/algos/navigate/DstarLite/readme.md b/robowaiter/algos/navigate/DstarLite/readme.md new file mode 100644 index 0000000..3a77674 --- /dev/null +++ b/robowaiter/algos/navigate/DstarLite/readme.md @@ -0,0 +1,4 @@ +### dstar_lite.py ——Dstar lite算法文件 +### navigate.py ——导航类 +### test.py ——测试文件 +### map_5.pkl ——离散化地图文件 \ No newline at end of file diff --git a/robowaiter/algos/navigate/DstarLite/test.py b/robowaiter/algos/navigate/DstarLite/test.py new file mode 100644 index 0000000..f6008f7 --- /dev/null +++ b/robowaiter/algos/navigate/DstarLite/test.py @@ -0,0 +1,72 @@ +import os +import pickle +import time +import random + +import matplotlib.pyplot as plt +import numpy as np + +from robowaiter.scene.scene import Scene,init_world # TODO: 文件名改成Scene.py +from navigate import Navigator + + + + +if __name__ == '__main__': + + file_name = 'map_5.pkl' + if os.path.exists(file_name): + with open(file_name, 'rb') as file: + map = pickle.load(file) + + init_world(1, 3) + scene = Scene(sceneID=0) + + navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map) + + '''场景1: 无行人环境 robot到达指定目标''' + # goal = np.array((-100, 700)) + + + '''场景2: 静止行人环境 robot到达指定目标''' + # scene.clean_walker() + # scene.add_walker(50, 300, 0) + # scene.add_walker(-50, 500, 0) + # goal = np.array((-100, 700)) + + '''场景3: 移动行人环境 robot到达指定目标''' + scene.clean_walker() + scene.add_walker(50, 300, 0) + scene.add_walker(-50, 500, 0) + scene.control_walker([scene.walker_control_generator(walkerID=0, autowalk=False, speed=20, X=-50, Y=600, Yaw=0)]) + scene.control_walker([scene.walker_control_generator(walkerID=1, autowalk=False, speed=20, X=100, Y=150, Yaw=0)]) + goal = np.array((-100, 700)) + + '''场景4: 行人自由移动 robot到达指定目标''' + # # TODO: autowalk=True仿真器会闪退 ??? + # scene.clean_walker() + # scene.add_walker(50, 300, 0) + # scene.add_walker(-50, 500, 0) + # scene.control_walker([scene.walker_control_generator(walkerID=0, autowalk=True, speed=20, X=0, Y=0, Yaw=0)]) + # scene.control_walker([scene.walker_control_generator(walkerID=1, autowalk=True, speed=20, X=0, Y=0, Yaw=0)]) + # time.sleep(5) + # goal = np.array((-100, 700)) + + navigator.navigate(goal, animation=False) + + scene.clean_walker() + print(scene.status.collision) # TODO: 不显示碰撞信息 ??? + + + + + + + + + + + + + + diff --git a/robowaiter/algos/navigate/__init__.py b/robowaiter/algos/navigate/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/robowaiter/behavior_lib/Grasp.py b/robowaiter/behavior_lib/act/ExploreEnv.py similarity index 76% rename from robowaiter/behavior_lib/Grasp.py rename to robowaiter/behavior_lib/act/ExploreEnv.py index 57c5708..5adf413 100644 --- a/robowaiter/behavior_lib/Grasp.py +++ b/robowaiter/behavior_lib/act/ExploreEnv.py @@ -1,10 +1,11 @@ import py_trees as ptree from typing import Any +from robowaiter.behavior_lib._base.Act import Act -class Grasp(ptree.behaviour.Behaviour): +class ExploreEnv(Act): - def __init__(self, name: str, scene): - super().__init__(name) + def __init__(self, *args): + super().__init__(*args) def setup(self, **kwargs: Any) -> None: return super().setup(**kwargs) diff --git a/robowaiter/behavior_lib/act/MoveTo.py b/robowaiter/behavior_lib/act/MoveTo.py index 3d017da..8a2209f 100644 --- a/robowaiter/behavior_lib/act/MoveTo.py +++ b/robowaiter/behavior_lib/act/MoveTo.py @@ -4,8 +4,8 @@ from robowaiter.behavior_lib._base.Act import Act class MoveTo(Act): - def __init__(self, name: str, scene, a, b, c, d): - super().__init__(name) + def __init__(self, *args): + super().__init__(*args) def setup(self, **kwargs: Any) -> None: return super().setup(**kwargs) diff --git a/robowaiter/behavior_lib/cond/HasMap.py b/robowaiter/behavior_lib/cond/HasMap.py new file mode 100644 index 0000000..8f77e74 --- /dev/null +++ b/robowaiter/behavior_lib/cond/HasMap.py @@ -0,0 +1,14 @@ +import py_trees as ptree +from typing import Any +from robowaiter.behavior_lib._base.Cond import Cond + +class HasMap(Cond): + def __init__(self): + super().__init__() + + def _update(self) -> ptree.common.Status: + # if self.scene.status? + if self.scene.state['map']['2d'] == None: + return ptree.common.Status.FAILURE + else: + return ptree.common.Status.SUCCESS diff --git a/robowaiter/robot/Default.ptml b/robowaiter/robot/Default.ptml index 1324a40..a278601 100644 --- a/robowaiter/robot/Default.ptml +++ b/robowaiter/robot/Default.ptml @@ -1,4 +1,8 @@ selector{ + selector{ + cond HasMap() + act ExploreEnv + } sequence{ cond Chatting() act DealChat() diff --git a/robowaiter/robot/robot.py b/robowaiter/robot/robot.py index a8aa957..c2a1d50 100644 --- a/robowaiter/robot/robot.py +++ b/robowaiter/robot/robot.py @@ -21,7 +21,8 @@ class Robot(object): self.bt = load_bt_from_ptml(self.scene, self.ptml_path,self.behavior_lib_path) sub_task_seq = find_node_by_name(self.bt.root,"SubTaskPlaceHolder").parent sub_task_seq.children.pop() - print(sub_task_seq.children) + self.scene.sub_task_seq = sub_task_seq + def step(self): if self.scene.time > self.next_response_time: diff --git a/zoo/navigator_DstarLite/__init__.py b/zoo/navigator_DstarLite/__init__.py new file mode 100644 index 0000000..1ceb32a --- /dev/null +++ b/zoo/navigator_DstarLite/__init__.py @@ -0,0 +1,4 @@ + +from . import navigate +from . import dstar_lite + diff --git a/zoo/navigator_DstarLite/dstar_lite.py b/zoo/navigator_DstarLite/dstar_lite.py new file mode 100644 index 0000000..6a83108 --- /dev/null +++ b/zoo/navigator_DstarLite/dstar_lite.py @@ -0,0 +1,510 @@ +''' +基于两个D* lite的实现 +按照原始算法的伪代码 +自己写的D* lite +''' + +import math +import queue +from functools import partial +import numpy as np +import heapq + +from matplotlib import pyplot as plt + + +def diagonal_distance(start, end): # + return max(abs(start[0] - end[0]), abs(start[1] - end[1])) + + +def manhattan_distance(start, end): # 曼哈顿距离 + return abs(start[0] - end[0]) + abs(start[1] - end[1]) + + +def euclidean_distance(start, end): # 欧式距离 + # return np.linalg.norm(start-end) + return math.sqrt((start[0] - end[0]) ** 2 + (start[1] - end[1]) ** 2) + + +def heuristic(start, end, name='euclidean'): + if name == 'diagonal': + return diagonal_distance(start, end) + elif name == 'euclidean': + return euclidean_distance(start, end) + elif name == 'manhattan': + return manhattan_distance(start, end) + else: + raise Exception('Error heuristic name!') + + +class Priority: + ''' + 优先级类 + ''' + + def __init__(self, k1, k2): + self.k1 = k1 + self.k2 = k2 + + def __lt__(self, other): # 定义对象的 < 运算 + return self.k1 < other.k1 or (self.k1 == other.k1 and self.k2 < other.k2) + + def __le__(self, other): # 定于对象的 <= 运算 + return self.k1 < other.k1 or (self.k1 == other.k1 and self.k2 <= other.k2) + + +class Node: + ''' + 节点类 + ''' + + def __init__(self, pos: (int, int), priority: Priority): + self.pos = pos # X Y + self.priority = priority + + def __lt__(self, other): # 定义对象的 < 运算 + return self.priority < other.priority + + def __le__(self, other): # 定于对象的 <= 运算 + return self.priority <= other.priority + + +class PriorityQueue: + def __init__(self): + self.queue = [] # 节点队列 + self.nodes = [] # 节点位置列表 + + def is_empty(self): + # 队列判空 + return len(self.queue) == 0 + + def top(self): + return self.queue[0].pos + + def top_key(self): + if self.is_empty(): + return Priority(float('inf'), float('inf')) + return self.queue[0].priority + + def pop(self): + # 取出第一个节点 + node = heapq.heappop(self.queue) + self.nodes.remove(node.pos) + return node + + def insert(self, pos, priority): + # 创建并添加节点 + node = Node(pos, priority) + heapq.heappush(self.queue, node) + self.nodes.append(pos) + + def remove(self, pos): + # 删除指定节点并重新排序 + self.queue = [n for n in self.queue if n.pos != pos] + heapq.heapify(self.queue) # 重排序 + self.nodes.remove(pos) + + def update(self, pos, priority): + # 更新指定位置的priority值 + for n in self.queue: + if n.pos == pos: + n.priority = priority + break + + +class DStarLite: + def __init__(self, + map: np.array([int, int]), # [X, Y] + area_range, # [x_min, x_max, y_min, y_max] 实际坐标范围 + scale_ratio=5, # 地图缩放率 + dyna_obs_radius=20 # dyna_obs实际身位半径 + ): + + # self.area_bounds = area + self.map = map + self.background = map + self.X = map.shape[0] + self.Y = map.shape[1] + (self.x_min, self.x_max, self.y_min, self.y_max) = area_range + self.scale_ratio = scale_ratio + self.dyna_obs_list = [] # 动态障碍物位置列表( 当前地图 ) [(x, y)] + self.dyna_obs_radius = math.ceil(dyna_obs_radius/scale_ratio) # dyna_obs缩放后身位半径 + + # free:0, obs:1, dyna_obs:2 + self.idx_to_object = { + 0: "free", + 1: "obstacle", + 2: "dynamic obstacle" + } + self.object_to_idx = dict(zip(self.idx_to_object.values(), self.idx_to_object.keys())) + self.object_to_cost = { + "free": 0, + "obstacle": float('inf'), + "dynamic obstacle": 50 + } + + self.compute_cost_map() + + self.s_start = None # (int,int) 必须是元组(元组可以直接当作矩阵索引) + self.s_goal = None # (int,int) + self.s_last = None # (int,int) + self.U = PriorityQueue() + self.k_m = 0 + self.rhs = np.ones((self.X, self.Y)) * np.inf + self.g = self.rhs.copy() + self.path = [] + + def set_map(self, map_): + ''' + 设置map 和 cost_map + ''' + self.map = map_ + self.X = map_.shape[0] + self.Y = map_.shape[1] + self.compute_cost_map() + + def reset(self): + ''' + (完成一次导航后) + 重置 1.环境变量 + 2.dstar_lite变量 + ''' + # env reset + self.map = self.background.copy() + self.compute_cost_map() + self.dyna_obs_list.clear() + self.path.clear() + # dstar_lite reset + self.s_start = None + self.s_goal = None + self.s_last = None + self.U = PriorityQueue() + self.k_m = 0 + self.rhs = np.ones((self.X, self.Y)) * np.inf + self.g = self.rhs.copy() + + def calculate_key(self, s: (int, int)): + ''' + 计算 位置s 的 key1, key2 + :returns: + Priority(k1, k2): 可比较的对象 + ''' + k1 = min(self.g[s], self.rhs[s]) + heuristic(self.s_start, s) + self.k_m + k2 = min(self.g[s], self.rhs[s]) + return Priority(k1, k2) + + def c(self, u: (int, int), v: (int, int), v_old=None) -> float: + ''' + 计算节点间的 路径代价 和 目标位置代价 (目标位置代价为0时采用路径代价) + (因为是终点->起点的扩展方向,因此v是node,u是v扩展的neighbor) + Args: + u: from pos + v: to pos + v_old: 指定的v的类型 + ''' + if v_old: + obj_cost = self.object_to_cost[v_old] + else: + obj_cost = self.cost_map[v] + if obj_cost > 0: + return obj_cost + return heuristic(u, v) + + def contain(self, u: (int, int)): + ''' + 判断 节点u 是否在队列中 + ''' + return u in self.U.nodes + + def update_vertex(self, u: (int, int)): + ''' + 判定节点状态, 更新队列 + 不一致且在队列 --> 更新key + 不一致且不在队列 --> 计算key并加入队列 + 一致且在队列 --> 移出队列 + ''' + if self.g[u] != self.rhs[u] and self.contain(u): + self.U.update(u, self.calculate_key(u)) + elif self.g[u] != self.rhs[u] and not self.contain(u): + self.U.insert(u, self.calculate_key(u)) + elif self.g[u] == self.rhs[u] and self.contain(u): + self.U.remove(u) + + def compute_shortest_path(self): + ''' + 计算最短路径 + ''' + while self.U.top_key() < self.calculate_key(self.s_start) or self.rhs[self.s_start] > self.g[self.s_start]: + u = self.U.top() + k_old = self.U.top_key() + k_new = self.calculate_key(u) + if k_old < k_new: + self.U.update(u, k_new) + elif self.g[u] > self.rhs[u]: # 过一致 + self.g[u] = self.rhs[u] + self.U.remove(u) + pred = self.get_neighbors(u) + for s in pred: + if s != self.s_goal: + self.rhs[s] = min(self.rhs[s], self.c(s, u) + self.g[u]) + self.update_vertex(s) + else: # 欠一致 + g_old = self.g[u] + self.g[u] = float('inf') + pred = self.get_neighbors(u) + for s in pred + [u]: + if self.rhs[s] == self.c(s, u) + g_old: + if s != self.s_goal: + succ = self.get_neighbors(s) + self.rhs[s] = min([self.c(s, s_) + self.g[s_] for s_ in succ]) + self.update_vertex(s) + + def _planning(self, s_start, s_goal, dyna_obs, step_num=None, debug=False): + ''' + 规划路径(实际实现) + Args: + dyna_obs: 动态障碍物位置列表 + step_num: 单次移动步数 + ''' + # 确保目标合法 + if not self.in_bounds_without_obstacle(s_goal): + return None + # 第一次规划需要初始化rhs并将goal加入队列,计算最短路径 + if self.s_goal is None: + self.s_start = tuple(s_start) + self.s_goal = tuple(s_goal) + self.s_last = tuple(s_start) + self.rhs[tuple(s_goal)] = 0 + self.U.insert(tuple(s_goal), Priority(k1=heuristic(tuple(s_start), tuple(s_goal)), k2=0)) + self.compute_shortest_path() # 计算最短路径 + self.path = self.get_path() + # 后续规划只更新起点,直接使用原路径(去掉已走过部分) + else: + self.s_start = tuple(s_start) + self.path = self.path[step_num:] + # 根据obs更新map, cost_map, edge_cost + changed_pos = self.update_map(dyna_obs=dyna_obs) + if changed_pos: + self.update_cost_map(changed_pos) + self.update_edge_costs(changed_pos) + # 若地图改变,重新计算最短路径 + self.compute_shortest_path() + self.path = self.get_path() + + # TODO: 误差抖动使robot没有到达路径上的点,导致新起点的rhs=∞,可能导致get_path失败 ( 当前版本没有该问题 ) + # assert (self.rhs[self.s_start] != float('inf')), "There is no known path!" + # self.path = self.get_path(step_num) + # # debug + # if debug: + # pass + return self.path + + def planning(self, s_start, s_goal, dyna_obs, step_num=None, debug=False): + ''' + 路径规划(供外部调用,处理实际坐标和地图坐标的转换) + ''' + # 实际坐标 -> 地图坐标 + s_start = self.real2map(s_start) + s_goal = self.real2map(s_goal) + for i in range(len(dyna_obs)): + dyna_obs[i] = self.real2map(dyna_obs[i]) + + self._planning(s_start, s_goal, dyna_obs, step_num, debug) + + # 地图坐标->实际坐标 + path = [self.map2real(node) for node in self.path] + return path + + def get_path(self): + ''' + 得到路径 + Args: + step_num: 路径步数 (None表示返回完整路径) + return: + path: [(x, y), ...] + ''' + if self.s_start is None or self.s_goal == self.s_start: + return [] + path = [] + cur = self.s_start + # if step_num is None: # 得到完整路径 + while cur != self.s_goal: + succ = self.get_neighbors(cur) + cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])] + path.append(cur) + # else: + # for i in range(step_num): + # if cur == self.s_goal: + # break + # succ = self.get_neighbors(cur) + # cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])] + # path.append(cur) + return path + + def in_bounds_without_obstacle(self, pos): + ''' + 判断位置在地图范围内 且 不是静态障碍物 + ''' + (x, y) = pos + return 0 <= x < self.X and 0 <= y < self.Y and self.map[x, y] != self.object_to_idx["obstacle"] + + def get_neighbors(self, pos, mode=8): + ''' + 获取邻居节点, 地图范围内 + ''' + (x_, y_) = pos + # results = [(x_+1,y_), (x_-1,y_), (x_, y_+1), (x_,y_-1)] + # if mode == 8: + neighbors = [(x_ + 1, y_), (x_ - 1, y_), (x_, y_ + 1), (x_, y_ - 1), (x_ + 1, y_ + 1), (x_ - 1, y_ + 1), + (x_ + 1, y_ - 1), (x_ - 1, y_ - 1)] + neighbors = filter(self.in_bounds_without_obstacle, neighbors) # 确保位置在地图范围内 且 不是静态障碍物 + return list(neighbors) + + def compute_cost_map(self): + # 计算当前地图的cost_map + self.cost_map = np.zeros(self.map.shape) + for idx, obj in self.idx_to_object.items(): + self.cost_map[self.map == idx] = self.object_to_cost[obj] + + def update_map(self, dyna_obs): + ''' + 更新地图 和 动态障碍物列表 + Args: + dyna_obs: 当前动态障碍物位置列表 [(x,y), ...] + return: + update_obj: 改变的位置列表 [(x, y, obj_idx, obj_idx_old), ...] + ''' + # dyna_obs没有变化 (集合set可以忽略元素在列表中的位置) + if set(dyna_obs) == set(self.dyna_obs_list): + return [] + + # 新旧dyna_obs占用位置列表 + old_obs_occupy = [] + new_obs_occupy = [] + for pos in self.dyna_obs_list: + old_obs_occupy.extend(self.get_occupy_pos(pos)) + for pos in dyna_obs: + new_obs_occupy.extend(self.get_occupy_pos(pos)) + old_obs_occupy = [pos for i, pos in enumerate(old_obs_occupy) if pos not in old_obs_occupy[:i]] # 去除重复位置 + new_obs_occupy = [pos for i, pos in enumerate(new_obs_occupy) if pos not in new_obs_occupy[:i]] # 去除重复位置 + + # 转变为free 和 转变为obs的位置列表 + changed_free = [pos for pos in old_obs_occupy if pos not in new_obs_occupy] + changed_obs = [pos for pos in new_obs_occupy if pos not in old_obs_occupy] + + # 更新地图,计算changed_pos + changed_pos = [] + for (x, y) in changed_free: + self.map[x, y] = self.object_to_idx['free'] + changed_pos.append((x, y, self.object_to_idx["free"], self.object_to_idx["dynamic obstacle"])) + for (x, y) in changed_obs: + self.map[x, y] = self.object_to_idx['dynamic obstacle'] + changed_pos.append((x, y, self.object_to_idx["dynamic obstacle"], self.object_to_idx["free"])) + + # 更新动态障碍物列表 + self.dyna_obs_list = dyna_obs + + return changed_pos + + def get_occupy_pos(self, obs_pos): + ''' + 根据dyna_obs中心位置,计算其占用的所有网格位置 + ''' + (x, y) = obs_pos + occupy_pos = [] + for i in range(x-self.dyna_obs_radius, x+self.dyna_obs_radius+1): + for j in range(y-self.dyna_obs_radius, y+self.dyna_obs_radius+1): + occupy_pos.append((i, j)) + occupy_pos = filter(self.in_bounds_without_obstacle, occupy_pos) # 确保位置在地图范围内 且 不是静态障碍物 + return list(occupy_pos) + + def update_cost_map(self, changed_pos): + ''' + 更新cost_map + Args: + changed_pos: 改变的位置列表 [x, y, idx] + ''' + for (x, y, idx, _) in changed_pos: + self.cost_map[x, y] = self.object_to_cost[self.idx_to_object[idx]] + + def update_edge_costs(self, changed_pos): + ''' + 重新计算edge_cost,更新受影响节点的rhs + Args: + changed_pos: 改变的位置列表 [(x, y, obj_idx_new, obj_idx_old)] + ''' + if not changed_pos: return + self.k_m += heuristic(self.s_last, self.s_start, name='euclidean') # 使用欧式距离 更新km + self.s_last = self.s_start + for pos in changed_pos: + v = (pos[0], pos[1]) # to pos + v_old = self.idx_to_object[pos[3]] # 位置v的旧类型 + pred_v = self.get_neighbors(v) + for u in pred_v: + c_old = self.c(u, v, v_old=v_old) + c_new = self.c(u, v) + if c_old > c_new and u != self.s_goal: + self.rhs[u] = min(self.rhs[u], self.c(u, v) + self.g[v]) + elif self.rhs[u] == c_old + self.g[v] and u != self.s_goal: + succ_u = self.get_neighbors(u) + self.rhs[u] = min([self.c(u, s_) + self.g[s_] for s_ in succ_u]) + self.update_vertex(u) + + def map2real(self, pos): + ''' + 地图坐标->实际坐标 + ''' + x = pos[0] * self.scale_ratio + self.x_min + y = pos[1] * self.scale_ratio + self.y_min + return tuple((x, y)) + + def real2map(self, pos, approximation='round'): + ''' + 实际坐标->地图坐标 + ''' + if approximation == 'round': # 四舍五入 + x = round((pos[0] - self.x_min) / self.scale_ratio) + y = round((pos[1] - self.y_min) / self.scale_ratio) + elif approximation == 'low': # 向下取整 + x = math.floor((pos[0] - self.x_min) / self.scale_ratio) + y = math.floor((pos[1] - self.y_min) / self.scale_ratio) + else: + raise Exception("Wrong approximation!") + return tuple((x, y)) + + def draw_graph(self, step_num): + # 清空当前figure内容,保留figure对象 + plt.clf() + # for stopping simulation with the esc key. 按esc结束 + plt.gcf().canvas.mpl_connect( + 'key_release_event', + lambda event: [exit(0) if event.key == 'escape' else None]) + + # 缩放坐标偏移量 + offset = (self.x_min / self.scale_ratio, self.x_max / self.scale_ratio, + self.y_min / self.scale_ratio, self.y_max / self.scale_ratio,) + + # 画地图: X行Y列,第一行在下面 + # 范围: 横向Y[-80,290] 纵向X[-70,120] + plt.imshow(self.map, cmap='binary', alpha=0.5, origin='lower', + extent=(offset[2], offset[3], + offset[0], offset[1])) + # 画起点和目标 + plt.plot(self.s_start[1] + offset[2], self.s_start[0] + offset[0], "xr") + plt.plot(self.s_goal[1] + offset[2], self.s_goal[0] + offset[0], "xr") + + # 画搜索路径 + plt.plot([y + offset[2] for (x, y) in self.path], + [x + offset[0] for (x, y) in self.path], "-g") + + # 画移动路径 + next_step = min(step_num, len(self.path)) + # plt.plot([self.s_start[1] + offset[2], self.path[next_step-1][1] + offset[2]], + # [self.s_start[0] + offset[0], self.path[next_step-1][0] + offset[0]], "-r") + plt.plot([y + offset[2] for (x, y) in self.path[:next_step]], + [x + offset[0] for (x, y) in self.path[:next_step]], "-r") + + plt.xlabel('y', loc='right') + plt.ylabel('x', loc='top') + plt.grid(color='black', linestyle='-', linewidth=0.5) + plt.pause(0.3) diff --git a/zoo/navigator_DstarLite/map_5.pkl b/zoo/navigator_DstarLite/map_5.pkl new file mode 100644 index 0000000000000000000000000000000000000000..7b49370226f1770a974c60a3578f1c44c62dea35 GIT binary patch literal 562562 zcmeI!&8{p*QowPW!GZ-_-hq|ZERc|3hlLls02+G+fy8)57kl9y3%*A3ZqB?8&%z_{ z20%_wx;}kxpUSGr%*v17U1Z zKmWyd|M=}c{Pf+spML-H!yongB=kXt}I3xl*-~kUl?1Ql4 z!_W1Le$g-b!W=LM%mH)2954sWf%1JRB9YcKJN!~8Tq%};Z{954sW0dv3{m}w3~HTPHE z^L{4p`OIfN`#on4m;>g3Indi2h^~CT_7<;r%u#dH95n~b0dv3{FbB+mndU%L^ZCkm zzR%=4pZUyZzvIjSbHE%h2h0I;z#K3K%mH)2954sW0dv3{FbB*5bHE%h2h0I;z#K3K z%mH)299VA-g!Sv?ZQhx8=AFJb2h0I;z#K3K%z+-}KvZFWw<>o*=d z;b?sJ2VWd;NCbGm10LuDbHE%h2g=WZu&Uge+o$DjJaY7u&Dg{tUt@&wC0J= z`1(A2zyXIufCoI_fj%$?%z+W+Kvb#x`sjc88=RO3JaYCvyubkm{b4?s1LlA^U=Ea? z1JPCeuch<8>%9I+zmD)OPVS95;DAFSzylueKp&U`_soH~b=`Fp??$QcD)o@(J@XEC z+!FyF@PG&UAa@Q#w^&OP$mKulzS2Bvoi{6gd4(}J;E)LLfCoI#2WRI%bdPlu0%z;I z%5%NeaTWgZ30f$6@2Wxr|S+`q9_pCOlyZO}4Z`Si4 z>o#hhk9l|>l|0L#7Z1+H@40LB#}B_mfCoIt@gOL*TSxV>Hlg0R)Xx9z^Gox;JD;=n zm*%kgd1vcT>$%nIA^$K72OJUs9`xZsL~-9biU+j`jl$tsoVEGnp8qVqyFbs3zn6V? z^P2U3^$h~E)>mHYn+WiL2WLD8YWJZ2!&*M5cinp9i*F*p10Hs0#r)%eRd48j42M1TivJ&4=JSzGzh zib$(`+|#zNa(TD%?CL$a`Pf&+v#a$g{k*!CsVj~+CIUR*!N)uZD&wxDy>3OKbllF1 z%8kaMm2X$?8I9}c@m{T0t>;JQD*iA72OJUs9`xZsL^02L)vHz{uEy#4KKiG09a{N! z^`6ps*FJAA-`e+IJ%2C$*56;v$MaD!9B@bkc))`oF!!44c`FLJ@z}R!f41_l^gVa^ zI!hk8{7UD0^}O7;_x5dZ?CtY?kir3nM1Ti#co12vvp(~-6@h2>`+SamDpki;KGweH zDv#RnJv;wu+_j&77Wdts=jOc^XYM)n1xXxmNCbE=wg(ZtwG{#VGIqbTu3sxJTivtq zYjxk%d$xFceZ87rt>-shXV2fB_ZkFoz#$Rf!JHmMR@PPo^wFGsRJ*RN{B+OT1Zvm$ z?D=m;J?^(>d3Y{rjsp&f01w9SAfmL^CZHe2=!ahGRNGJIA0yCMS!<; zyj`7ZEnl5`j=)_$o{M(er|#nH-f#v791;N@)b$`vx0WKn*Sfy0&bO4O&M8Zvm6v6s z9_O@Hd{@6`Gh%fP@(&YmxQD~mhhDD{;O{;DuEzbErJk-upq7WV;vMDOT3nqQ25cOj z`LKQO^))X%@JIw|c@T81M+wxbhjXJ6IK1X?)KB}{5UAy8o9O$$r50c3M(sBadnRoA zyuRj!2Ofz)X%B*`b&o)4?#>J1IK0ha&r9FmOQ4j$_h!-G?WMRnCu;kQ!_mq=JICj5 zbH)dsL?G9Lpk;j|kju}$AcsR84v$`X-xqba<8E{EL8=bmTZG5&br zl?Xg~5LB#>T)Yn_aJYwqjvRx)(f$}CtNWuo%GLg;SuTg%3g@0@-!cAp;gtwH^C0M~ zw?5-`9p$iB;iKo-f36O=;g$$&9s~{R#=~p4 zfWti;bRq%>yeD7|1&h)g-V=WPzwMsw)9wBCx_!RAZl8M%A~@iX2%PmG=vns&oaO7e zAdAD>9QOS4JpwBeFh8RDS2;X$=<(cb-?PuR?;l?u_j@0NaKIrE*z+JLS+5e<QxY|#iFH1n*&E0opZE+d`2%I7C zY|b7%>)AQIef})3+&G^-U+eSSxa^Ax;4lY=TK(kQIt29HSbbN=0;eK?K+gnvnbXf0 z9gXkVIjufF8n69P863vq(5j!@(}sZl>!bhLRNE~GATR=fyXNvUHo5UUd%l;?bMvq- zDuzSf9D3<1akL?zAM5sGn<}~m0R%=SP z{dufE`>2{I5I|rC0$1m6E`!{Aa?k5^Uv8fEMFnxVheNOZBd)#)=-0mMBRT{Sn1{g8 zd7GPg?zy?=^}a7RU;CnxIJD)^d;f@UOaim_@t9Q+H3A5XMBwNg&CTuTx!eBS^LyX7 z<>NIfwK|8**Sk4vad<@l0R#|00D=1n9Q|F88}ZR|ANS|pGs?cl{Jf9Kt;S*V^{%|P z`@JH700IagfIvS4a(@>bjX3w*XZw!cH_HBJ`T9I6I4g%oKXdbZd~SQ6`~2g+?Rl>V zAbV=kGn|djt?b z009ILXis4KyItz$?YvU2rQ+)KoKl>e6BX`-!=7)saqgYtdjt?b009ILKwu+~`}^YP zzU}NnRveD{nH$&9bL~d}0R#|00D<)gR> zx^F$Qk?~b-p0CUCte3fUIeWh65I_I{1Q0-A9Rj((D@yHKhwST2OYw6~a4p5*b(Qkp z<=&rvqMZmJfB*srAh0Td+}{9p!xWKF=e700IagfWW*2j{dHw zwSQj1AJ427N9P9bXB<9WjpLs_JOA9;d#&3Co?-ll*2+dY2APxJ%=2q1s}0tl>1 zVDEQDFW;|HpI&&+7Ds7+p7AMtkMj^f009ILKwy3X&wf|*`uY47i41$?KWbd1z1cXF zKG%5&Ab_NAb_oLTk*7!&DN!)7^n6(e(6+YvDLn5%c2VucH>*)2IrxLF-8NDvE#{arX-XYM2 zz^wfwukZ&491;N@obw=_nrEHGgLx`)&dXWq*n1wcc_yE>1bX*deCBT=zyltPE-CnbGSpYy7i$CZCTJ znAKl-MZIyrAraug*d9dm^RA=&Ia_5$ug7fh&*qnWUK5zjSNTMJalj!F;K5uTL^kuU zvv@m8rDmzeEP2f0l{{V(n8i#U>uJWHiUufr_y&*GCjUK5zb zPkBULalj!F;KAG;M0V?~qkBC|#YV5g=<(0ukvxLH=st>H9TNc_@L**RqWbHtqxbzR z6&t+{qsKpsN2}*Cx`*NqgK@wi5#Yhv9z^!*t+RMPdgW%R!)SSo?v40^z-ZoySKSf; z9`Jw%ANE05VI6%wj9$Uf`HvR==)SBT|7gC6H;lyrheUt}Jb254pkp1qPe!lc==?{E ze{^3~jej)H#2cpKfI}j{10KBPLC~>o1ZL@%EuT?dXW>3d9;0|6&esG+@k^XhHym(C z1bD!MBOU}T>qcOf{@L>B{dE@JqvX-M58``GU=**!8Fj+}heUt}JUHV)P_sTGFnV7- z%d3~qN9Q|g{Jqp)9B&gC)h}^JU2(u65#Rw2j(8BXtj`FH-cQf+y8H9dxsDqD-F0_= z9)VH)5_i-W2OJUs9`N9d2SLsHjKC~?^(?>EpU=W`v^-kZ*?q?djOLSgqrN!ckO=UA z2PHfRiq<^>v-I0up0&T9h2yAs)UL1d&k-2aCviu;alj!F-~kUxcn}n=xdcYWIc7w_8=n8hP`gf}?gkO=UA2Q@tiy4I@%X6aKLf`EEnHDT@NXYofKbIk+wRL?|! z2Yq@FQCUk7n8ow6JZ8y%mOReZ(R0@b%;Jwc!UG&|NCbGmgBBhHWosz{v-Isz9<$^< zOCCq-Xa6+v+ggjj=>43_W%N8}i$Aw+_O&H2n`iO~e{jGd z5#Rw2N_Y?yt+fc~(;$mO8xFNRcJ93d^iP}q!5Qa7fCoId(}SRItwlh82U#5IaH!?2 zbLSwS@9Oj&uDB)wJm5h|4}zw(HUWJbgmEa#p|-cqUz>n_E!(en;+Y8WfCq1R5Ol1q z2pHtH92$kge8$TN z0R#|00DLz54n->UWmIR{gWjKR&me=k+y#jnnH-`+2v;=QV1A0}hD*5Bl;T zqM3JH#kXE6xbf~Kp5EiwczKOVmEy2h_oMghoj3dUxx9MikQ={!Q9&GVNCbG$mj@Be z`0FZO)vnZ0jzx~C?R|H1aDP-72OJUs9`xx!M0MZF zi?&|eSLwcP=3N=@`l@O_(e&b1FGPCbA&xSAk0XwW01xVT5U1X^s%xFPKI3{{rTe(~ z85f`DdDur4`;DR(kM1Va3kPxJ^)*g7B?3IS*MqqI{*?!9J-EN(Z6n~Ut2T-?yr7-TFTAy|2>u-29lo_icUK<9&}6C5 zWq9nQyYh8!)CmV15&<5J?LkC;-^!o1{@hpTHaGLBtxsopx3Qu8Eobv>^<4Rk^G7@L*04BCGpWUbXS+z6!UwnMZ9rD#f{t73FRzm3OUk%4M2&Qms6k8~ky= zAraugnjS>f_pQ8ZQ95qtMICU!ArasK4?gBWP-(yNv5k-I zD_iD9ezftUHRm#RTtB6C-lgulp84A+l#0_i!5Ie}5&<6Y;D`r7tNqH;x}LVLXqg*% zQP+#R`IfQd_$hbiUF!bhndd!GDn91~Zyaz)1bD!MGadxBb}LWYc-pR_Wp3g{8!ztW zT*j9CQ|`{!{cCuw&gz^9@PG#;JP3;IR^Hb2wp}I5-o%T#Ui8AZtTi8=)=Qq^Si@&^ zR_8>32Rtb0LC|cs^0%(P?J8OJCSKI_q8Gkpt$BZ1FZqdM4Ug4Xof82b@SvmzL9_kJ z<2oLMD-8`&NC`Hxb}L8xP`^_N)5Vsc-vAmc5Y|b-d_> zYbjfL$wwS>`>MX`n+WiL2Q57a+U-_;*Y&$yCF|V8kGg)`&G)P|cgOAixxG|h^-TnL zz=Iwtbxj0#z=IJyh$_`z^+TP0s9(9ZH}Is6CwFswwCL`5+&`yh>Z-1Z01tRDvIkMU z`l~)E+b8uacFzqwDeFluJU5$qiANlBdZw=Gnh5ZK2P1nB)vLeild^qMzhd{?z>~6` z;E87Lvx;+P2VfCoI#2j;+7b0DIvxANhd z5A`b3=gEAz<_muKB?3I)0T1+nIWXoNh^XtWyg24Xy-JL6GGC7Qf)`$i01tS;1ASl) zj6DY;`uZzR@;s?ui7{^AOP(({;gksQfCoI#2j;*Wb0D&)zw+hdzSJi_<_&!LxG%Wi zk_hmC2RzUR=D?hDAhN2z@~1xInQq`qR1*gr5&<6YfCu`(99UxxM3(i|)jg_5cJ7l` z_eTEVHx4)?0zBXW5A?w*b0E6C-nt6M$2{v1UE}0Op58~@alj!F-~kVKpbyM}Ip;uR zRc~Fzn|cJ-Jb4vwBV&`rMo| z2h0I;;KMl(R+L|7>$mcRocI=j+5C}DR2v5z5&<6YfCu`(9QgPgh)>I}v+yoY#)&xu zX7NWJQEeP>NCbGm10LuDbKvbc5Z{(xN9A6ggcHvY7}XzfN40UlArasK4|t#t%zk%$-<*z$pHRGpdaP4v7E{c)$aFU=CcH1MzgZwfB0I%hX9V3H0uf_@eqa z;E)LLfCoI#2j;-FIS@}<5$KbE{*JoffI}j{10L`|AD9Dk&wv1k9hP3l2CW0zBXW z5A=aKF!vmY?5qg%Nx=Mxy5N9ABESP4@IW7!19Q)T$j*vDp9IXGs0$7_Bmz9(0T1+n zIWYGei0rHg^hv<{iMrr`Ln6Qf9`HaPm;-aqfymB^K%WH6pQsBCI3xl*-~kWxfjO|| z9Ehy*SDxJCNq&XK+R3MTe8L^~M1Ti8;DJ6c2l|);5ykUWo%*QLR)^=68ta=aAFqA+ zr5@^$2=IUhJkST`K-)PGx4mESZ_EEE@As7&`+rCCw!d#L)kl320Uq#x2l~JqFb7te z15xd9SN*DASL#>3^PTU0rCy4q`dfeNZ*#yLSa}Xa_4{A- zsXkq~Px;S({`bHE%h2h0I;V1+plRqk)qfBJ8Q{^L2%dG7b7IbaT$1LnXs2cjGLS+}^nnp5VK zIb{x*1LlA^U=ElAE6jnYazCrS(|0TM9nX2rbH6jq0dv3{Fb5vzKy<}@>tj6Ln^WeL zIb{x*1LlA^U=ElAE6stZ_I;~<({C&F8{hfPcfT{u0dv3{FbBq;1Hb(&I{A~AZ(qLp G{C@$1cKMJ1 literal 0 HcmV?d00001 diff --git a/zoo/navigator_DstarLite/navigate.py b/zoo/navigator_DstarLite/navigate.py new file mode 100644 index 0000000..bbb0a16 --- /dev/null +++ b/zoo/navigator_DstarLite/navigate.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- +import math +import sys +import time + +import matplotlib.pyplot as plt +import numpy as np +from mpl_toolkits.axes_grid1 import make_axes_locatable +from scene_utils import Scene + + +from dstar_lite import DStarLite + +class Navigator: + ''' + 导航类 + ''' + + def __init__(self, scene, area_range, map, scale_ratio=5): + self.scene = scene + self.area_range = area_range # 地图实际坐标范围 xmin, xmax, ymin, ymax + self.map = map # 缩放并离散化的地图 array(X,Y) + self.scale_ratio = scale_ratio # 地图缩放率 + self.step_length = 50 # 步长(单次移动) + self.step_num = self.step_length // self.scale_ratio # 单次移动地图格数 + self.v = 200 # 速度 + self.step_time = self.step_length/self.v + 0.1 # 单步移动时长 + + + self.planner = DStarLite(area_range=area_range, map=map, scale_ratio=scale_ratio) + + + @staticmethod + def is_reached(pos: np.array((float, float)), goal: np.array((float, float)), dis_limit=25): + ''' + 判断是否到达目标 + ''' + dis = np.linalg.norm(pos - goal) + return dis < dis_limit + + def reset_goal(self, goal:(float, float)): + # TODO: 使目标可达 + # 目标在障碍物上:从目标开始方形向外扩展,直到找到可行点 + # 目标在地图外面:起点和目标连线最靠近目标的可行点 + pass + + def navigate(self, goal: (float, float), animation=True): + ''' + 单次导航,直到到达目标 + ''' + + pos = np.array((self.scene.status.location.X, self.scene.status.location.Y)) # 机器人当前: 位置 和 朝向 + yaw = self.scene.status.rotation.Yaw + print('------------------navigation_start----------------------') + while not self.is_reached(pos, goal): + dyna_obs = [(walker.pose.X, walker.pose.Y) for walker in self.scene.status.walkers] # 动态障碍物(顾客)位置列表 + path = self.planner.planning(pos, goal, dyna_obs, step_num=self.step_num) + if path: + next_step = min(self.step_num, len(path)) + (next_x, next_y) = path[next_step-1] + print('plan pos:', (next_x, next_y), end=' ') + self.scene.walk_to(next_x, next_y, yaw, velocity=self.v) + if animation: + self.planner.draw_graph(self.step_num) # 画出搜索路径 + time.sleep(self.step_time) + pos = np.array((self.scene.status.location.X, self.scene.status.location.Y)) + print('reach pos:', pos) + + self.planner.reset() # 完成一轮导航,重置变量 + + if self.is_reached(pos, goal): + print('The robot has achieved goal !!') + + + + + diff --git a/zoo/navigator_DstarLite/readme.md b/zoo/navigator_DstarLite/readme.md new file mode 100644 index 0000000..3a77674 --- /dev/null +++ b/zoo/navigator_DstarLite/readme.md @@ -0,0 +1,4 @@ +### dstar_lite.py ——Dstar lite算法文件 +### navigate.py ——导航类 +### test.py ——测试文件 +### map_5.pkl ——离散化地图文件 \ No newline at end of file diff --git a/zoo/navigator_DstarLite/test.py b/zoo/navigator_DstarLite/test.py new file mode 100644 index 0000000..66ef1e7 --- /dev/null +++ b/zoo/navigator_DstarLite/test.py @@ -0,0 +1,72 @@ +import os +import pickle +import time +import random + +import matplotlib.pyplot as plt +import numpy as np + +from scene_utils import Scene # TODO: 文件名改成Scene.py +from navigate import Navigator + + + + +if __name__ == '__main__': + + file_name = 'map_5.pkl' + if os.path.exists(file_name): + with open(file_name, 'rb') as file: + map = pickle.load(file) + + Scene.init_world(1, 3) + scene = Scene.Scene(sceneID=0) + + navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map) + + '''场景1: 无行人环境 robot到达指定目标''' + # goal = np.array((-100, 700)) + + + '''场景2: 静止行人环境 robot到达指定目标''' + # scene.clean_walker() + # scene.add_walker(50, 300, 0) + # scene.add_walker(-50, 500, 0) + # goal = np.array((-100, 700)) + + '''场景3: 移动行人环境 robot到达指定目标''' + scene.clean_walker() + scene.add_walker(50, 300, 0) + scene.add_walker(-50, 500, 0) + scene.control_walker([scene.walker_control_generator(walkerID=0, autowalk=False, speed=20, X=-50, Y=600, Yaw=0)]) + scene.control_walker([scene.walker_control_generator(walkerID=1, autowalk=False, speed=20, X=100, Y=150, Yaw=0)]) + goal = np.array((-100, 700)) + + '''场景4: 行人自由移动 robot到达指定目标''' + # # TODO: autowalk=True仿真器会闪退 ??? + # scene.clean_walker() + # scene.add_walker(50, 300, 0) + # scene.add_walker(-50, 500, 0) + # scene.control_walker([scene.walker_control_generator(walkerID=0, autowalk=True, speed=20, X=0, Y=0, Yaw=0)]) + # scene.control_walker([scene.walker_control_generator(walkerID=1, autowalk=True, speed=20, X=0, Y=0, Yaw=0)]) + # time.sleep(5) + # goal = np.array((-100, 700)) + + navigator.navigate(goal, animation=False) + + scene.clean_walker() + print(scene.status.collision) # TODO: 不显示碰撞信息 ??? + + + + + + + + + + + + + +