Merge branch 'main' of github.com:HPCL-EI/RoboWaiter

This commit is contained in:
ChenXL97 2023-11-16 15:04:46 +08:00
commit f2aef8fdc9
4 changed files with 174 additions and 69 deletions

View File

@ -123,13 +123,15 @@ class DStarLite:
# self.area_bounds = area
self.map = map
self.background = map
self.background = map.copy()
self.X = map.shape[0]
self.Y = map.shape[1]
(self.x_min, self.x_max, self.y_min, self.y_max) = area_range
self.scale_ratio = scale_ratio
self.dyna_obs_list = [] # 动态障碍物位置列表( 当前地图 ) [(x, y)]
self.dyna_obs_radius = math.ceil(dyna_obs_radius / scale_ratio) # dyna_obs缩放后身位半径
self.dyna_obs_list = [] # dyna_obs位置列表 [(x, y)]
self.dyna_obs_radius = math.floor(dyna_obs_radius / scale_ratio) # dyna_obs缩放后身位半径
self.dyna_obs_occupy = [] # dyna_obs占用位置列表
# free:0, obs:1, dyna_obs:2
self.idx_to_object = {
@ -143,7 +145,8 @@ class DStarLite:
"obstacle": float('inf'),
"dynamic obstacle": 100
}
self.cost_map = np.zeros_like(self.map)
self.cost_background = self.cost_map.copy()
self.compute_cost_map()
self.s_start = None # (int,int) 必须是元组(元组可以直接当作矩阵索引)
@ -155,14 +158,14 @@ class DStarLite:
self.g = self.rhs.copy()
self.path = []
def set_map(self, map_):
'''
设置map cost_map
'''
self.map = map_
self.X = map_.shape[0]
self.Y = map_.shape[1]
self.compute_cost_map()
# def set_map(self, map_):
# '''
# 设置map 和 cost_map
# '''
# self.map = map_
# self.X = map_.shape[0]
# self.Y = map_.shape[1]
# self.compute_cost_map()
def reset(self):
'''
@ -172,8 +175,9 @@ class DStarLite:
'''
# env reset
self.map = self.background.copy()
self.compute_cost_map()
self.cost_map = self.cost_background.copy()
self.dyna_obs_list.clear()
self.dyna_obs_occupy.clear()
self.path.clear()
# dstar_lite reset
self.s_start = None
@ -277,27 +281,29 @@ class DStarLite:
self.s_last = tuple(s_start)
self.rhs[tuple(s_goal)] = 0
self.U.insert(tuple(s_goal), Priority(k1=heuristic(tuple(s_start), tuple(s_goal)), k2=0))
changed_pos = self.update_map(dyna_obs=dyna_obs)
if changed_pos:
self.update_cost_map(changed_pos)
self.compute_shortest_path() # 计算最短路径
self.path = self.get_path()
return self.path
# 后续规划只更新起点,直接使用原路径(去掉已走过部分)
else:
self.s_start = tuple(s_start)
# 根据obs更新map, cost_map, edge_cost
changed_pos = self.update_map(dyna_obs=dyna_obs)
if changed_pos:
self.update_cost_map(changed_pos)
self.update_edge_costs(changed_pos)
# 若地图改变,重新计算最短路径
self.compute_shortest_path()
self.path = self.get_path()
# 根据obs更新map, cost_map, edge_cost
changed_pos = self.update_map(dyna_obs=dyna_obs)
if changed_pos:
self.update_cost_map(changed_pos)
self.update_edge_costs(changed_pos)
# 若地图改变,重新计算最短路径
self.compute_shortest_path()
self.path = self.get_path()
return self.path
# TODO: 误差抖动使robot没有到达路径上的点导致新起点的rhs=∞可能导致get_path失败 ( 当前版本没有该问题 )
# assert (self.rhs[self.s_start] != float('inf')), "There is no known path!"
# self.path = self.get_path(step_num)
# # debug
# if debug:
# pass
return self.path
def planning(self, s_start, s_goal, dyna_obs, debug=False):
'''
@ -306,8 +312,10 @@ class DStarLite:
# 实际坐标 -> 地图坐标
s_start = self.real2map(s_start)
s_goal = self.real2map(s_goal)
dyna_obs = [self.real2map(obs) for obs in dyna_obs]
dyna_obs = [self.real2map(obs, reachable_assurance=False) for obs in dyna_obs]
self._planning(s_start, s_goal, dyna_obs, debug)
# 地图坐标->实际坐标
path = [self.map2real(node) for node in self.path]
return path
@ -358,10 +366,20 @@ class DStarLite:
def compute_cost_map(self):
# 计算当前地图的cost_map
self.cost_map = np.zeros(self.map.shape)
for idx, obj in self.idx_to_object.items():
self.cost_map[self.map == idx] = self.object_to_cost[obj]
# # TODO
# for x in range(self.X):
# for y in range(self.Y):
# if self.cost_map[x, y] > 0:
# neighbors = self.get_neighbors((x, y))
# for (x_, y_) in neighbors:
# self.cost_map[x_, y_] = max(self.cost_map[x_, y_], self.cost_map[x, y] - 10)
self.cost_background = self.cost_map.copy()
def update_map(self, dyna_obs):
'''
更新地图 动态障碍物列表
@ -374,19 +392,28 @@ class DStarLite:
if set(dyna_obs) == set(self.dyna_obs_list):
return []
# 新旧dyna_obs占用位置列表
old_obs_occupy = []
new_obs_occupy = []
for pos in self.dyna_obs_list:
old_obs_occupy.extend(self.get_occupy_pos(pos))
# 当前dyna_obs占用位置列表
dyna_obs_occupy = []
for pos in dyna_obs:
new_obs_occupy.extend(self.get_occupy_pos(pos))
old_obs_occupy = [pos for i, pos in enumerate(old_obs_occupy) if pos not in old_obs_occupy[:i]] # 去除重复位置
new_obs_occupy = [pos for i, pos in enumerate(new_obs_occupy) if pos not in new_obs_occupy[:i]] # 去除重复位置
dyna_obs_occupy.extend(self.get_occupy_pos(pos))
dyna_obs_occupy = [pos for i, pos in enumerate(dyna_obs_occupy) if pos not in dyna_obs_occupy[:i]] # 去除重复位置
# 转变为free 和 转变为obs的位置列表
changed_free = [pos for pos in old_obs_occupy if pos not in new_obs_occupy]
changed_obs = [pos for pos in new_obs_occupy if pos not in old_obs_occupy]
changed_free = [pos for pos in self.dyna_obs_occupy if pos not in dyna_obs_occupy]
changed_obs = [pos for pos in dyna_obs_occupy if pos not in self.dyna_obs_occupy]
# # 新旧dyna_obs占用位置列表
# old_obs_occupy = []
# new_obs_occupy = []
# for pos in self.dyna_obs_list:
# old_obs_occupy.extend(self.get_occupy_pos(pos))
# for pos in dyna_obs:
# new_obs_occupy.extend(self.get_occupy_pos(pos))
# old_obs_occupy = [pos for i, pos in enumerate(old_obs_occupy) if pos not in old_obs_occupy[:i]] # 去除重复位置
# new_obs_occupy = [pos for i, pos in enumerate(new_obs_occupy) if pos not in new_obs_occupy[:i]] # 去除重复位置
#
# # 转变为free 和 转变为obs的位置列表
# changed_free = [pos for pos in old_obs_occupy if pos not in new_obs_occupy]
# changed_obs = [pos for pos in new_obs_occupy if pos not in old_obs_occupy]
# 更新地图计算changed_pos
changed_pos = []
@ -397,22 +424,26 @@ class DStarLite:
self.map[x, y] = self.object_to_idx['dynamic obstacle']
changed_pos.append((x, y, self.object_to_idx["dynamic obstacle"], self.object_to_idx["free"]))
# 更新动态障碍物列表
# 更新dyna_obs 位置列表 和 占用位置列表
self.dyna_obs_list = dyna_obs
self.dyna_obs_occupy = dyna_obs_occupy
return changed_pos
def get_occupy_pos(self, obs_pos):
'''
根据dyna_obs中心位置计算其占用的所有网格位置
'''
(x, y) = obs_pos
# for i in range(x - self.dyna_obs_radius, x + self.dyna_obs_radius + 1):
occupy_radius = min(self.dyna_obs_radius, int(euclidean_distance(obs_pos, self.s_start) - 1)) # 避免robot被dyna_obs的占用区域包裹住
# for i in range(x - self.dyna_obs_radius, x + self.dyna_obs_radius + 1): # 方形区域
# for j in range(y - self.dyna_obs_radius, y + self.dyna_obs_radius + 1):
# occupy_pos.append((i, j))
occupy_pos = [(i, j) for i in range(x - self.dyna_obs_radius, x + self.dyna_obs_radius + 1)
for j in range(y - self.dyna_obs_radius, y + self.dyna_obs_radius + 1)
if euclidean_distance((i, j), obs_pos) < self.dyna_obs_radius]
occupy_pos = [(i, j) for i in range(x - occupy_radius, x + occupy_radius + 1) # 圆形区域
for j in range(y - occupy_radius, y + occupy_radius + 1)
if euclidean_distance((i, j), obs_pos) < occupy_radius]
occupy_pos = filter(self.in_bounds_without_obstacle, occupy_pos) # 确保位置在地图范围内 且 不是静态障碍物
return list(occupy_pos)
@ -424,7 +455,10 @@ class DStarLite:
changed_pos: 改变的位置列表 [x, y, idx]
'''
for (x, y, idx, _) in changed_pos:
self.cost_map[x, y] = self.object_to_cost[self.idx_to_object[idx]]
if self.idx_to_object[idx] == 'free': # 空位的cost取决于离障碍物的距离
self.cost_map[x, y] = self.cost_background[x, y]
else:
self.cost_map[x, y] = self.object_to_cost[self.idx_to_object[idx]]
def update_edge_costs(self, changed_pos):
'''
@ -457,19 +491,26 @@ class DStarLite:
y = pos[1] * self.scale_ratio + self.y_min
return tuple((x, y))
def real2map(self, pos, approximation='round'):
def real2map(self, pos, reachable_assurance=True):
'''
实际坐标->地图坐标
'''
if approximation == 'round': # 四舍五入
x = round((pos[0] - self.x_min) / self.scale_ratio)
y = round((pos[1] - self.y_min) / self.scale_ratio)
elif approximation == 'low': # 向下取整
x = math.floor((pos[0] - self.x_min) / self.scale_ratio)
y = math.floor((pos[1] - self.y_min) / self.scale_ratio)
x = round((pos[0] - self.x_min) / self.scale_ratio)
y = round((pos[1] - self.y_min) / self.scale_ratio)
# 需要确保点可达
if reachable_assurance and self.idx_to_object[self.map[x, y]] != 'free':
print('1')
x_ = math.floor((pos[0] - self.x_min) / self.scale_ratio)
y_ = math.floor((pos[1] - self.y_min) / self.scale_ratio)
candidates = [(x_, y_), (x_ + 1, y_), (x_, y_ + 1), (x_ + 1, y_ + 1)]
for (x, y) in candidates:
print(self.idx_to_object[self.map[x, y]])
if self.idx_to_object[self.map[x, y]] == 'free':
print((x,y))
return tuple((x, y))
raise Exception('error')
else:
raise Exception("Wrong approximation!")
return tuple((x, y))
return tuple((x, y))
def draw_graph(self, step_num):
# 清空当前figure内容保留figure对象

View File

@ -1,15 +1,25 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import math
from robowaiter.algos.navigate.dstar_lite import DStarLite, euclidean_distance
import sys
import time
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import make_axes_locatable
# from scene import control
# from rrt import RRT
# from rrt_star import RRTStar
# from apf import APF
from robowaiter.algos.navigate.dstar_lite import DStarLite, euclidean_distance
class Navigator:
'''
导航类
'''
def __init__(self, scene, area_range, map, scale_ratio=5, step_length=150, velocity=150, react_radius=250):
def __init__(self, scene, area_range, map, scale_ratio=5, step_length=150, velocity=150, react_radius=300):
self.scene = scene
self.area_range = area_range # 地图实际坐标范围 xmin, xmax, ymin, ymax
self.map = map # 缩放并离散化的地图 array(X,Y)
@ -19,10 +29,11 @@ class Navigator:
self.v = velocity # 速度
self.react_radius = react_radius # robot反应半径
# self.planner = RRTStar(rand_area=area_range, map=map, scale_ratio=scale_ratio, max_iter=400, search_until_max_iter=True)
self.planner = DStarLite(area_range=area_range, map=map, scale_ratio=scale_ratio)
@staticmethod
def is_reached(pos: (float, float), goal: (float, float), dis_limit=50):
def is_reached(pos: (float, float), goal: (float, float), dis_limit=30):
'''
判断是否到达目标
'''
@ -35,7 +46,7 @@ class Navigator:
'''
得到移动方向
'''
return math.degrees(math.atan2(goal[0] - pos[0], -(goal[1] - pos[1])))
return math.degrees(math.atan2((goal[1] - pos[1]), (goal[0] - pos[0])))
def legalize_goal(self, goal: (float, float)):
'''
@ -68,21 +79,15 @@ class Navigator:
self.planner.draw_graph(step_num) # 画出搜索路径
next_step = min(step_num, len(path))
next_pos = path[next_step - 1]
print('plan pos:', next_pos, end=' ')
# print('plan pos:', next_pos, end=' ')
yaw = self.get_yaw(pos, next_pos)
self.scene.walk_to(next_pos[0], next_pos[1], yaw, velocity=self.v, dis_limit=10)
### 获取视觉图像
self.scene.get_camera_segment()
# print("yaw:",yaw)
self.scene.walk_to(next_pos[0], next_pos[1], Yaw=yaw, velocity=self.v, dis_limit=10)
# pos = (self.scene.status.location.X, self.scene.status.location.Y)
# if self.is_reached(pos, next_pos):
self.planner.path = self.planner.path[next_step - 1:] # 去除已走过的路径
pos = (self.scene.status.location.X, self.scene.status.location.Y)
print('reach pos:', pos)
# print('reach pos:', pos)
self.planner.reset() # 完成一轮导航,重置变量
@ -92,3 +97,62 @@ class Navigator:
# def navigate(self, goal: (float, float), path_smoothing=True, animation=True):
# pos = np.array((self.scene.status.location.X, self.scene.status.location.Y)) # 机器人当前: 位置 和 朝向
# yaw = self.scene.status.rotation.Yaw
# print('------------------navigation_start----------------------')
#
# path = self.planner.planning(pos, goal, path_smoothing, animation)
# if path:
# self.planner.draw_graph(final_path=path) # 画出探索过程
# for (x, y) in path:
# self.scene.walk_to(x, y, yaw, velocity=self.v)
# time.sleep(self.step_time)
# pos = np.array((self.scene.status.location.X, self.scene.status.location.Y))
#
# self.planner.reset()
#
# if self.is_reached(pos, goal):
# print('The robot has achieved goal !!')
'''APF势场法暂不可用'''
# while not self.is_reached(pos, goal):
# # 1. 路径规划
# path = self.planner.planning(pos, goal, path_smoothing, animation)
# self.planner.draw_graph(final_path=path) # 画出探索过程
#
# # 2. 使用APF导航到路径中的每个waypoint
# traj = [(pos[0], pos[1])]
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
# for i, waypoint in enumerate(path[1:]):
# print('waypoint [', i, ']:', waypoint)
# # if (not self.scene.reachable_check(waypoint[0], waypoint[1], yaw)) and self.map[self.planner.real2map(waypoint[0], waypoint[1])] == 0:
# # print('error')
# while not self.is_reached(pos, waypoint):
# # 2.1 计算next_step
# pos = np.array((self.scene.status.location.X, self.scene.status.location.Y))
# Pobs = [] # 障碍物(顾客)位置数组
# for walker in self.scene.status.walkers:
# Pobs.append((walker.pose.X, walker.pose.Y))
# next_step, _ = APF(Pi=pos, Pg=waypoint, Pobs=Pobs, step_length=self.step_length)
# traj.append((next_step[0], next_step[1]))
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
# while not self.scene.reachable_check(next_step[0], next_step[1], yaw): # 取中点直到next_step可达
# traj.pop()
# next_step = (pos + next_step) / 2
# traj.append((next_step[0], next_step[1]))
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
# # 2.2 移动robot
# self.scene.walk_to(next_step[0], next_step[1], yaw, velocity=self.v)
# # print(self.scene.status.info) # print navigation info
# # print(self.scene.status.collision)
# time.sleep(self.step_time)
# # print(self.scene.status.info) # print navigation info
# # print(self.scene.status.collision)
# self.planner.reset()

View File

@ -64,7 +64,7 @@ if __name__ == '__main__':
# time.sleep(5)
# goal = np.array((-100, 700))
navigator.navigate(goal, animation=False)
navigator.navigate(goal, animation=True)
scene.clean_walker()
print(scene.status.collision) # TODO: 不显示碰撞信息 ???

View File

@ -13,7 +13,7 @@ class SceneVLM(Scene):
self.event_list = [
# (5, self.create_chat_event("测试VLM做一杯咖啡")),
# (5, self.create_chat_event("测试VLM倒一杯水")),
(5, self.create_chat_event("测试VLM开空调")),
# (5, self.create_chat_event("测试VLM开空调")),
# (5, self.create_chat_event("测试VLM关空调")),
# (5, self.create_chat_event("测试VLM开大厅灯")),
# (5, self.create_chat_event("测试VLM拖地")),
@ -22,7 +22,7 @@ class SceneVLM(Scene):
# (5, self.create_chat_event("测试VLM把冰红茶放到Table2")),
# (5, self.create_chat_event("测试VLM关大厅灯"))
# (5, self.create_chat_event("测试VLM做一杯咖啡放到吧台上")),
# (5, self.create_chat_event("测试VLM做一杯咖啡放到水杯桌上并倒水")),
(5, self.create_chat_event("测试VLM做一杯咖啡放到水杯桌上并倒水")),
]
def _reset(self):