From 4250bfbefb77ef579cc1af82be99f929c7f8e999 Mon Sep 17 00:00:00 2001 From: Caiyishuai <39987654+Caiyishuai@users.noreply.github.com> Date: Wed, 15 Nov 2023 15:40:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=BA=86=20VLN=20=E7=9A=84?= =?UTF-8?q?=E8=BD=AC=E5=90=91=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- robowaiter/algos/navigate/dstar_lite.py | 145 +++++++++++++++--------- robowaiter/algos/navigate/navigate.py | 92 ++++++++++++--- robowaiter/algos/navigate/test.py | 2 +- robowaiter/scene/tasks/VLM.py | 4 +- 4 files changed, 174 insertions(+), 69 deletions(-) diff --git a/robowaiter/algos/navigate/dstar_lite.py b/robowaiter/algos/navigate/dstar_lite.py index 65b3986..9cb67c4 100644 --- a/robowaiter/algos/navigate/dstar_lite.py +++ b/robowaiter/algos/navigate/dstar_lite.py @@ -123,13 +123,15 @@ class DStarLite: # self.area_bounds = area self.map = map - self.background = map + self.background = map.copy() self.X = map.shape[0] self.Y = map.shape[1] (self.x_min, self.x_max, self.y_min, self.y_max) = area_range self.scale_ratio = scale_ratio - self.dyna_obs_list = [] # 动态障碍物位置列表( 当前地图 ) [(x, y)] - self.dyna_obs_radius = math.ceil(dyna_obs_radius / scale_ratio) # dyna_obs缩放后身位半径 + + self.dyna_obs_list = [] # dyna_obs位置列表 [(x, y)] + self.dyna_obs_radius = math.floor(dyna_obs_radius / scale_ratio) # dyna_obs缩放后身位半径 + self.dyna_obs_occupy = [] # dyna_obs占用位置列表 # free:0, obs:1, dyna_obs:2 self.idx_to_object = { @@ -143,7 +145,8 @@ class DStarLite: "obstacle": float('inf'), "dynamic obstacle": 100 } - + self.cost_map = np.zeros_like(self.map) + self.cost_background = self.cost_map.copy() self.compute_cost_map() self.s_start = None # (int,int) 必须是元组(元组可以直接当作矩阵索引) @@ -155,14 +158,14 @@ class DStarLite: self.g = self.rhs.copy() self.path = [] - def set_map(self, map_): - ''' - 设置map 和 cost_map - ''' - self.map = map_ - self.X = map_.shape[0] - self.Y = map_.shape[1] - self.compute_cost_map() + # def set_map(self, map_): + # ''' + # 设置map 和 cost_map + # ''' + # self.map = map_ + # self.X = map_.shape[0] + # self.Y = map_.shape[1] + # self.compute_cost_map() def reset(self): ''' @@ -172,8 +175,9 @@ class DStarLite: ''' # env reset self.map = self.background.copy() - self.compute_cost_map() + self.cost_map = self.cost_background.copy() self.dyna_obs_list.clear() + self.dyna_obs_occupy.clear() self.path.clear() # dstar_lite reset self.s_start = None @@ -277,27 +281,29 @@ class DStarLite: self.s_last = tuple(s_start) self.rhs[tuple(s_goal)] = 0 self.U.insert(tuple(s_goal), Priority(k1=heuristic(tuple(s_start), tuple(s_goal)), k2=0)) + changed_pos = self.update_map(dyna_obs=dyna_obs) + if changed_pos: + self.update_cost_map(changed_pos) self.compute_shortest_path() # 计算最短路径 self.path = self.get_path() + return self.path # 后续规划只更新起点,直接使用原路径(去掉已走过部分) else: self.s_start = tuple(s_start) - # 根据obs更新map, cost_map, edge_cost - changed_pos = self.update_map(dyna_obs=dyna_obs) - if changed_pos: - self.update_cost_map(changed_pos) - self.update_edge_costs(changed_pos) - # 若地图改变,重新计算最短路径 - self.compute_shortest_path() - self.path = self.get_path() - + # 根据obs更新map, cost_map, edge_cost + changed_pos = self.update_map(dyna_obs=dyna_obs) + if changed_pos: + self.update_cost_map(changed_pos) + self.update_edge_costs(changed_pos) + # 若地图改变,重新计算最短路径 + self.compute_shortest_path() + self.path = self.get_path() + return self.path # TODO: 误差抖动使robot没有到达路径上的点,导致新起点的rhs=∞,可能导致get_path失败 ( 当前版本没有该问题 ) # assert (self.rhs[self.s_start] != float('inf')), "There is no known path!" - # self.path = self.get_path(step_num) # # debug # if debug: # pass - return self.path def planning(self, s_start, s_goal, dyna_obs, debug=False): ''' @@ -306,8 +312,10 @@ class DStarLite: # 实际坐标 -> 地图坐标 s_start = self.real2map(s_start) s_goal = self.real2map(s_goal) - dyna_obs = [self.real2map(obs) for obs in dyna_obs] + dyna_obs = [self.real2map(obs, reachable_assurance=False) for obs in dyna_obs] + self._planning(s_start, s_goal, dyna_obs, debug) + # 地图坐标->实际坐标 path = [self.map2real(node) for node in self.path] return path @@ -358,10 +366,20 @@ class DStarLite: def compute_cost_map(self): # 计算当前地图的cost_map - self.cost_map = np.zeros(self.map.shape) for idx, obj in self.idx_to_object.items(): self.cost_map[self.map == idx] = self.object_to_cost[obj] + # # TODO + # for x in range(self.X): + # for y in range(self.Y): + # if self.cost_map[x, y] > 0: + # neighbors = self.get_neighbors((x, y)) + # for (x_, y_) in neighbors: + # self.cost_map[x_, y_] = max(self.cost_map[x_, y_], self.cost_map[x, y] - 10) + + + self.cost_background = self.cost_map.copy() + def update_map(self, dyna_obs): ''' 更新地图 和 动态障碍物列表 @@ -374,19 +392,28 @@ class DStarLite: if set(dyna_obs) == set(self.dyna_obs_list): return [] - # 新旧dyna_obs占用位置列表 - old_obs_occupy = [] - new_obs_occupy = [] - for pos in self.dyna_obs_list: - old_obs_occupy.extend(self.get_occupy_pos(pos)) + # 当前dyna_obs占用位置列表 + dyna_obs_occupy = [] for pos in dyna_obs: - new_obs_occupy.extend(self.get_occupy_pos(pos)) - old_obs_occupy = [pos for i, pos in enumerate(old_obs_occupy) if pos not in old_obs_occupy[:i]] # 去除重复位置 - new_obs_occupy = [pos for i, pos in enumerate(new_obs_occupy) if pos not in new_obs_occupy[:i]] # 去除重复位置 - + dyna_obs_occupy.extend(self.get_occupy_pos(pos)) + dyna_obs_occupy = [pos for i, pos in enumerate(dyna_obs_occupy) if pos not in dyna_obs_occupy[:i]] # 去除重复位置 # 转变为free 和 转变为obs的位置列表 - changed_free = [pos for pos in old_obs_occupy if pos not in new_obs_occupy] - changed_obs = [pos for pos in new_obs_occupy if pos not in old_obs_occupy] + changed_free = [pos for pos in self.dyna_obs_occupy if pos not in dyna_obs_occupy] + changed_obs = [pos for pos in dyna_obs_occupy if pos not in self.dyna_obs_occupy] + + # # 新旧dyna_obs占用位置列表 + # old_obs_occupy = [] + # new_obs_occupy = [] + # for pos in self.dyna_obs_list: + # old_obs_occupy.extend(self.get_occupy_pos(pos)) + # for pos in dyna_obs: + # new_obs_occupy.extend(self.get_occupy_pos(pos)) + # old_obs_occupy = [pos for i, pos in enumerate(old_obs_occupy) if pos not in old_obs_occupy[:i]] # 去除重复位置 + # new_obs_occupy = [pos for i, pos in enumerate(new_obs_occupy) if pos not in new_obs_occupy[:i]] # 去除重复位置 + # + # # 转变为free 和 转变为obs的位置列表 + # changed_free = [pos for pos in old_obs_occupy if pos not in new_obs_occupy] + # changed_obs = [pos for pos in new_obs_occupy if pos not in old_obs_occupy] # 更新地图,计算changed_pos changed_pos = [] @@ -397,22 +424,26 @@ class DStarLite: self.map[x, y] = self.object_to_idx['dynamic obstacle'] changed_pos.append((x, y, self.object_to_idx["dynamic obstacle"], self.object_to_idx["free"])) - # 更新动态障碍物列表 + # 更新dyna_obs 位置列表 和 占用位置列表 self.dyna_obs_list = dyna_obs + self.dyna_obs_occupy = dyna_obs_occupy return changed_pos + + def get_occupy_pos(self, obs_pos): ''' 根据dyna_obs中心位置,计算其占用的所有网格位置 ''' (x, y) = obs_pos - # for i in range(x - self.dyna_obs_radius, x + self.dyna_obs_radius + 1): + occupy_radius = min(self.dyna_obs_radius, int(euclidean_distance(obs_pos, self.s_start) - 1)) # 避免robot被dyna_obs的占用区域包裹住 + # for i in range(x - self.dyna_obs_radius, x + self.dyna_obs_radius + 1): # 方形区域 # for j in range(y - self.dyna_obs_radius, y + self.dyna_obs_radius + 1): # occupy_pos.append((i, j)) - occupy_pos = [(i, j) for i in range(x - self.dyna_obs_radius, x + self.dyna_obs_radius + 1) - for j in range(y - self.dyna_obs_radius, y + self.dyna_obs_radius + 1) - if euclidean_distance((i, j), obs_pos) < self.dyna_obs_radius] + occupy_pos = [(i, j) for i in range(x - occupy_radius, x + occupy_radius + 1) # 圆形区域 + for j in range(y - occupy_radius, y + occupy_radius + 1) + if euclidean_distance((i, j), obs_pos) < occupy_radius] occupy_pos = filter(self.in_bounds_without_obstacle, occupy_pos) # 确保位置在地图范围内 且 不是静态障碍物 return list(occupy_pos) @@ -424,7 +455,10 @@ class DStarLite: changed_pos: 改变的位置列表 [x, y, idx] ''' for (x, y, idx, _) in changed_pos: - self.cost_map[x, y] = self.object_to_cost[self.idx_to_object[idx]] + if self.idx_to_object[idx] == 'free': # 空位的cost取决于离障碍物的距离 + self.cost_map[x, y] = self.cost_background[x, y] + else: + self.cost_map[x, y] = self.object_to_cost[self.idx_to_object[idx]] def update_edge_costs(self, changed_pos): ''' @@ -457,19 +491,26 @@ class DStarLite: y = pos[1] * self.scale_ratio + self.y_min return tuple((x, y)) - def real2map(self, pos, approximation='round'): + def real2map(self, pos, reachable_assurance=True): ''' 实际坐标->地图坐标 ''' - if approximation == 'round': # 四舍五入 - x = round((pos[0] - self.x_min) / self.scale_ratio) - y = round((pos[1] - self.y_min) / self.scale_ratio) - elif approximation == 'low': # 向下取整 - x = math.floor((pos[0] - self.x_min) / self.scale_ratio) - y = math.floor((pos[1] - self.y_min) / self.scale_ratio) + x = round((pos[0] - self.x_min) / self.scale_ratio) + y = round((pos[1] - self.y_min) / self.scale_ratio) + # 需要确保点可达 + if reachable_assurance and self.idx_to_object[self.map[x, y]] != 'free': + print('1') + x_ = math.floor((pos[0] - self.x_min) / self.scale_ratio) + y_ = math.floor((pos[1] - self.y_min) / self.scale_ratio) + candidates = [(x_, y_), (x_ + 1, y_), (x_, y_ + 1), (x_ + 1, y_ + 1)] + for (x, y) in candidates: + print(self.idx_to_object[self.map[x, y]]) + if self.idx_to_object[self.map[x, y]] == 'free': + print((x,y)) + return tuple((x, y)) + raise Exception('error') else: - raise Exception("Wrong approximation!") - return tuple((x, y)) + return tuple((x, y)) def draw_graph(self, step_num): # 清空当前figure内容,保留figure对象 diff --git a/robowaiter/algos/navigate/navigate.py b/robowaiter/algos/navigate/navigate.py index d2bae6f..9059c61 100644 --- a/robowaiter/algos/navigate/navigate.py +++ b/robowaiter/algos/navigate/navigate.py @@ -1,15 +1,25 @@ #!/usr/bin/env python3 # -*- encoding: utf-8 -*- import math -from robowaiter.algos.navigate.dstar_lite import DStarLite, euclidean_distance +import sys +import time +import matplotlib.pyplot as plt +import numpy as np +from mpl_toolkits.axes_grid1 import make_axes_locatable +# from scene import control + +# from rrt import RRT +# from rrt_star import RRTStar +# from apf import APF +from robowaiter.algos.navigate.dstar_lite import DStarLite, euclidean_distance class Navigator: ''' 导航类 ''' - def __init__(self, scene, area_range, map, scale_ratio=5, step_length=150, velocity=150, react_radius=250): + def __init__(self, scene, area_range, map, scale_ratio=5, step_length=150, velocity=150, react_radius=300): self.scene = scene self.area_range = area_range # 地图实际坐标范围 xmin, xmax, ymin, ymax self.map = map # 缩放并离散化的地图 array(X,Y) @@ -19,10 +29,11 @@ class Navigator: self.v = velocity # 速度 self.react_radius = react_radius # robot反应半径 + # self.planner = RRTStar(rand_area=area_range, map=map, scale_ratio=scale_ratio, max_iter=400, search_until_max_iter=True) self.planner = DStarLite(area_range=area_range, map=map, scale_ratio=scale_ratio) @staticmethod - def is_reached(pos: (float, float), goal: (float, float), dis_limit=50): + def is_reached(pos: (float, float), goal: (float, float), dis_limit=30): ''' 判断是否到达目标 ''' @@ -35,7 +46,7 @@ class Navigator: ''' 得到移动方向 ''' - return math.degrees(math.atan2(goal[0] - pos[0], -(goal[1] - pos[1]))) + return math.degrees(math.atan2((goal[1] - pos[1]), (goal[0] - pos[0]))) def legalize_goal(self, goal: (float, float)): ''' @@ -68,21 +79,15 @@ class Navigator: self.planner.draw_graph(step_num) # 画出搜索路径 next_step = min(step_num, len(path)) next_pos = path[next_step - 1] - print('plan pos:', next_pos, end=' ') + # print('plan pos:', next_pos, end=' ') yaw = self.get_yaw(pos, next_pos) - self.scene.walk_to(next_pos[0], next_pos[1], yaw, velocity=self.v, dis_limit=10) - - - ### 获取视觉图像 - self.scene.get_camera_segment() - - - + # print("yaw:",yaw) + self.scene.walk_to(next_pos[0], next_pos[1], Yaw=yaw, velocity=self.v, dis_limit=10) # pos = (self.scene.status.location.X, self.scene.status.location.Y) # if self.is_reached(pos, next_pos): self.planner.path = self.planner.path[next_step - 1:] # 去除已走过的路径 pos = (self.scene.status.location.X, self.scene.status.location.Y) - print('reach pos:', pos) + # print('reach pos:', pos) self.planner.reset() # 完成一轮导航,重置变量 @@ -92,3 +97,62 @@ class Navigator: + + + + + + + # def navigate(self, goal: (float, float), path_smoothing=True, animation=True): + # pos = np.array((self.scene.status.location.X, self.scene.status.location.Y)) # 机器人当前: 位置 和 朝向 + # yaw = self.scene.status.rotation.Yaw + # print('------------------navigation_start----------------------') + # + # path = self.planner.planning(pos, goal, path_smoothing, animation) + # if path: + # self.planner.draw_graph(final_path=path) # 画出探索过程 + # for (x, y) in path: + # self.scene.walk_to(x, y, yaw, velocity=self.v) + # time.sleep(self.step_time) + # pos = np.array((self.scene.status.location.X, self.scene.status.location.Y)) + # + # self.planner.reset() + # + # if self.is_reached(pos, goal): + # print('The robot has achieved goal !!') + + '''APF势场法暂不可用''' + # while not self.is_reached(pos, goal): + # # 1. 路径规划 + # path = self.planner.planning(pos, goal, path_smoothing, animation) + # self.planner.draw_graph(final_path=path) # 画出探索过程 + # + # # 2. 使用APF导航到路径中的每个waypoint + # traj = [(pos[0], pos[1])] + # #self.planner.draw_graph(final_path=traj) # 画出探索过程 + # for i, waypoint in enumerate(path[1:]): + # print('waypoint [', i, ']:', waypoint) + # # if (not self.scene.reachable_check(waypoint[0], waypoint[1], yaw)) and self.map[self.planner.real2map(waypoint[0], waypoint[1])] == 0: + # # print('error') + # while not self.is_reached(pos, waypoint): + # # 2.1 计算next_step + # pos = np.array((self.scene.status.location.X, self.scene.status.location.Y)) + # Pobs = [] # 障碍物(顾客)位置数组 + # for walker in self.scene.status.walkers: + # Pobs.append((walker.pose.X, walker.pose.Y)) + # next_step, _ = APF(Pi=pos, Pg=waypoint, Pobs=Pobs, step_length=self.step_length) + # traj.append((next_step[0], next_step[1])) + # #self.planner.draw_graph(final_path=traj) # 画出探索过程 + # while not self.scene.reachable_check(next_step[0], next_step[1], yaw): # 取中点直到next_step可达 + # traj.pop() + # next_step = (pos + next_step) / 2 + # traj.append((next_step[0], next_step[1])) + # #self.planner.draw_graph(final_path=traj) # 画出探索过程 + # # 2.2 移动robot + # self.scene.walk_to(next_step[0], next_step[1], yaw, velocity=self.v) + # # print(self.scene.status.info) # print navigation info + # # print(self.scene.status.collision) + # time.sleep(self.step_time) + # # print(self.scene.status.info) # print navigation info + # # print(self.scene.status.collision) + # self.planner.reset() diff --git a/robowaiter/algos/navigate/test.py b/robowaiter/algos/navigate/test.py index ab2b8a0..c27eb1b 100644 --- a/robowaiter/algos/navigate/test.py +++ b/robowaiter/algos/navigate/test.py @@ -64,7 +64,7 @@ if __name__ == '__main__': # time.sleep(5) # goal = np.array((-100, 700)) - navigator.navigate(goal, animation=False) + navigator.navigate(goal, animation=True) scene.clean_walker() print(scene.status.collision) # TODO: 不显示碰撞信息 ??? diff --git a/robowaiter/scene/tasks/VLM.py b/robowaiter/scene/tasks/VLM.py index 5cf3789..aba572c 100644 --- a/robowaiter/scene/tasks/VLM.py +++ b/robowaiter/scene/tasks/VLM.py @@ -13,7 +13,7 @@ class SceneVLM(Scene): self.event_list = [ # (5, self.create_chat_event("测试VLM:做一杯咖啡")), # (5, self.create_chat_event("测试VLM:倒一杯水")), - (5, self.create_chat_event("测试VLM:开空调")), + # (5, self.create_chat_event("测试VLM:开空调")), # (5, self.create_chat_event("测试VLM:关空调")), # (5, self.create_chat_event("测试VLM:开大厅灯")), # (5, self.create_chat_event("测试VLM:拖地")), @@ -22,7 +22,7 @@ class SceneVLM(Scene): # (5, self.create_chat_event("测试VLM:把冰红茶放到Table2")), # (5, self.create_chat_event("测试VLM:关大厅灯")) # (5, self.create_chat_event("测试VLM:做一杯咖啡放到吧台上")), - # (5, self.create_chat_event("测试VLM:做一杯咖啡放到水杯桌上并倒水")), + (5, self.create_chat_event("测试VLM:做一杯咖啡放到水杯桌上并倒水")), ] def _reset(self):