parent
04b5c73355
commit
8f23c6ae47
|
@ -1,2 +1 @@
|
|||
|
||||
from robowaiter.algos.navigate.DstarLite.navigate import Navigator as DstarLite
|
|
@ -7,14 +7,15 @@ import grpc
|
|||
|
||||
from explore import Explore
|
||||
|
||||
sys.path.append('/')
|
||||
sys.path.append('../navigate/')
|
||||
sys.path.append('./')
|
||||
sys.path.append('../')
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
from mpl_toolkits.axes_grid1 import make_axes_locatable
|
||||
|
||||
from robowaiter.proto import GrabSim_pb2_grpc, GrabSim_pb2
|
||||
import GrabSim_pb2_grpc
|
||||
import GrabSim_pb2
|
||||
|
||||
channel = grpc.insecure_channel('localhost:30001', options=[
|
||||
('grpc.max_send_message_length', 1024 * 1024 * 1024),
|
||||
|
|
|
@ -1,5 +0,0 @@
|
|||
from . import apf
|
||||
from . import rrt
|
||||
from . import rrt_star
|
||||
from . import pathsmoothing
|
||||
from . import navigate
|
|
@ -1,91 +0,0 @@
|
|||
import numpy as np
|
||||
import copy
|
||||
import matplotlib
|
||||
matplotlib.use('TkAgg') # 防止画图卡死
|
||||
|
||||
## 初始化车的参数
|
||||
|
||||
|
||||
# P0 = np.array([0, - d / 2, 1, 1]) #车辆起点位置,分别代表x,y,vx,vy
|
||||
#
|
||||
# Pg = np.array([99, d / 2, 0, 0]) # 目标位置
|
||||
#
|
||||
# # 障碍物位置
|
||||
# Pobs = np.array([
|
||||
# [15, 7 / 4, 0, 0],
|
||||
# [30, - 3 / 2, 0, 0],
|
||||
# [45, 3 / 2, 0, 0],
|
||||
# [60, - 3 / 4, 0, 0],
|
||||
# [80, 3/2, 0, 0]])
|
||||
|
||||
|
||||
|
||||
def APF(Pi, Pg, Pobs=None, step_length=100):
|
||||
'''
|
||||
Args:
|
||||
Pi: robot位置 (x,y)
|
||||
Pg: 目标位置 (x,y)
|
||||
Pobs: 障碍物位置 [(x,y), ...]
|
||||
step_length: 单次移动步长 int
|
||||
returns:
|
||||
next_step: robot下一步位置
|
||||
UnitVec_Fsum: 合力方向向量
|
||||
'''
|
||||
|
||||
# 目标检测
|
||||
if np.linalg.norm(Pi-Pg) < step_length: # 目标检测
|
||||
return Pg, (Pg-Pi) / np.linalg.norm(Pg-Pi)
|
||||
|
||||
Eta_att = 5 # 引力的增益系数
|
||||
Eta_rep_ob = 15 # 斥力的增益系数
|
||||
d0 = 200 # 障碍影响的最大距离
|
||||
n = 1 # {P_g}^n 为到目标点的距离,n为可选的正常数 (解决目标不可达问题)
|
||||
|
||||
## 计算引力
|
||||
delta_g = Pg - Pi # 目标方向向量(指向目标)
|
||||
dist_g = np.linalg.norm(delta_g) # 目标距离
|
||||
UniteVec_g = delta_g / dist_g # 目标单位向量
|
||||
|
||||
F_att = Eta_att * dist_g * UniteVec_g # F_att = Eta_att * dist(pi,pg) * unite_vec(pi,pg)
|
||||
F_sum = F_att
|
||||
|
||||
## 计算斥力
|
||||
if Pobs:
|
||||
delta = np.zeros((len(Pobs), 2))
|
||||
unite_vec = np.zeros((len(Pobs), 2))
|
||||
dists = []
|
||||
# 计算车辆当前位置与所有障碍物的单位方向向量
|
||||
for j in range(len(Pobs)):
|
||||
delta[j] = Pi - Pobs[j] # 斥力向量
|
||||
dists.append(np.linalg.norm(delta[j])) # 障碍物距离
|
||||
unite_vec[j] = delta[j] / dists[j] # 斥力单位向量
|
||||
# 每一个障碍物对车辆的斥力,带方向
|
||||
F_rep_ob = np.zeros((len(Pobs), 2))
|
||||
# 在原斥力势场函数增加目标调节因子(即车辆至目标距离),以使车辆到达目标点后斥力也为0
|
||||
for j in range(len(Pobs)):
|
||||
if dists[j] >= d0:
|
||||
F_rep_ob[j] = np.array([0, 0]) # 距离超过阈值则斥力为0
|
||||
else:
|
||||
# 障碍物的斥力1,方向由障碍物指向车辆
|
||||
F_rep_ob1_abs = Eta_rep_ob * (1 / dists[j] - 1 / d0) * dist_g ** n / dists[j] ** 2 # 斥力大小
|
||||
F_rep_ob1 = F_rep_ob1_abs * unite_vec[j] # 斥力向量
|
||||
# 障碍物的斥力2,方向由车辆指向目标点
|
||||
F_rep_ob2_abs = n / 2 * Eta_rep_ob * (1 / dists[j] - 1 / d0) ** 2 * dist_g ** (n - 1) # 斥力大小
|
||||
F_rep_ob2 = F_rep_ob2_abs * UniteVec_g # 斥力向量
|
||||
# 改进后的障碍物合斥力计算
|
||||
F_rep_ob[j] = F_rep_ob1 + F_rep_ob2
|
||||
|
||||
F_rep = np.sum(F_rep_ob, axis=0) # 斥力合力
|
||||
F_sum += F_rep # 总合力
|
||||
|
||||
## 合力方向
|
||||
UnitVec_Fsum = F_sum / np.linalg.norm(F_sum) # 合力方向向量: F / |F|
|
||||
|
||||
# 计算车的下一步位置
|
||||
next_step = copy.deepcopy(Pi + step_length * UnitVec_Fsum) # 沿合力方向前进单位距离
|
||||
# print(next_step)
|
||||
|
||||
return next_step, UnitVec_Fsum
|
||||
|
||||
|
||||
|
|
@ -1,61 +0,0 @@
|
|||
# !/usr/bin/env python3
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
import time
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
import pickle
|
||||
import os
|
||||
|
||||
from scene_utils import control
|
||||
|
||||
|
||||
|
||||
|
||||
def draw_grid_map(grid_map):
|
||||
# 生成新的地图图像
|
||||
plt.imshow(grid_map, cmap='binary', alpha=0.5, origin='lower') # 黑白网格
|
||||
|
||||
# 绘制坐标轴
|
||||
plt.xlabel('y', loc='right')
|
||||
plt.ylabel('x', loc='top')
|
||||
|
||||
# 显示网格线
|
||||
plt.grid(color='black', linestyle='-', linewidth=0.5)
|
||||
|
||||
# 显示图像
|
||||
plt.show()
|
||||
#plt.pause(0.01)
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# control.init_world(scene_num=1, mapID=3)
|
||||
# scene = control.Scene(sceneID=0)
|
||||
#
|
||||
# X = int(950/5) # 采点数量
|
||||
# Y = int(1850/5)
|
||||
# map = np.zeros((X, Y))
|
||||
#
|
||||
# for x in range(X):
|
||||
# for y in range(Y):
|
||||
# if not scene.reachable_check(x*5-350, y*5-400, Yaw=0):
|
||||
# map[x, y] = 1
|
||||
# print(x, y)
|
||||
#
|
||||
#
|
||||
# file_name = 'map_5.pkl'
|
||||
# if not os.path.exists(file_name):
|
||||
# open(file_name, 'w').close()
|
||||
# with open(file_name, 'wb') as file:
|
||||
# pickle.dump(map, file)
|
||||
# print('保存成功')
|
||||
|
||||
|
||||
file_name = 'map_5.pkl'
|
||||
if os.path.exists(file_name):
|
||||
with open(file_name, 'rb') as file:
|
||||
map = pickle.load(file)
|
||||
draw_grid_map(map)
|
|
@ -1,164 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- encoding: utf-8 -*-
|
||||
import sys
|
||||
import time
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
from mpl_toolkits.axes_grid1 import make_axes_locatable
|
||||
from scene_utils import control
|
||||
|
||||
from rrt import RRT
|
||||
from rrt_star import RRTStar
|
||||
from apf import APF
|
||||
|
||||
|
||||
class Navigator:
|
||||
'''
|
||||
导航类
|
||||
'''
|
||||
|
||||
def __init__(self, scene, area_range, map, scale_ratio=5):
|
||||
self.scene = scene
|
||||
self.area_range = area_range # 地图实际坐标范围 xmin, xmax, ymin, ymax
|
||||
self.map = map # 缩放并离散化的地图 array(X,Y)
|
||||
self.scale_ratio = scale_ratio # 地图缩放率
|
||||
self.step_length = 50 # 步长(单次移动)
|
||||
self.v = 100 # 速度
|
||||
self.step_time = self.step_length/self.v # 单步移动时长
|
||||
|
||||
self.planner = RRTStar(rand_area=area_range, map=map, scale_ratio=scale_ratio, max_iter=400, search_until_max_iter=True)
|
||||
|
||||
@staticmethod
|
||||
def is_reached(pos: np.array((float, float)), goal: np.array((float, float)), dis_limit=25):
|
||||
'''
|
||||
判断是否到达目标
|
||||
'''
|
||||
dis = np.linalg.norm(pos - goal)
|
||||
return dis < dis_limit
|
||||
|
||||
def reset_goal(self, goal:(float, float)):
|
||||
# TODO: 使目标可达
|
||||
# 目标在障碍物上:从目标开始方形向外扩展,直到找到可行点
|
||||
# 目标在地图外面:起点和目标连线最靠近目标的可行点
|
||||
pass
|
||||
|
||||
def navigate(self, goal: (float, float), path_smoothing=True, animation=True):
|
||||
pos = np.array((self.scene.status.location.X, self.scene.status.location.Y)) # 机器人当前: 位置 和 朝向
|
||||
yaw = self.scene.status.rotation.Yaw
|
||||
print('------------------navigation_start----------------------')
|
||||
|
||||
path = self.planner.planning(pos, goal, path_smoothing, animation)
|
||||
if path:
|
||||
self.planner.draw_graph(final_path=path) # 画出探索过程
|
||||
for (x, y) in path:
|
||||
self.scene.walk_to(x, y, yaw, velocity=self.v)
|
||||
time.sleep(self.step_time)
|
||||
pos = np.array((self.scene.status.location.X, self.scene.status.location.Y))
|
||||
|
||||
self.planner.reset()
|
||||
|
||||
|
||||
|
||||
'''APF势场法暂不可用'''
|
||||
# while not self.is_reached(pos, goal):
|
||||
# # 1. 路径规划
|
||||
# path = self.planner.planning(pos, goal, path_smoothing, animation)
|
||||
# self.planner.draw_graph(final_path=path) # 画出探索过程
|
||||
#
|
||||
# # 2. 使用APF导航到路径中的每个waypoint
|
||||
# traj = [(pos[0], pos[1])]
|
||||
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
|
||||
# for i, waypoint in enumerate(path[1:]):
|
||||
# print('waypoint [', i, ']:', waypoint)
|
||||
# # if (not self.scene.reachable_check(waypoint[0], waypoint[1], yaw)) and self.map[self.planner.real2map(waypoint[0], waypoint[1])] == 0:
|
||||
# # print('error')
|
||||
# while not self.is_reached(pos, waypoint):
|
||||
# # 2.1 计算next_step
|
||||
# pos = np.array((self.scene.status.location.X, self.scene.status.location.Y))
|
||||
# Pobs = [] # 障碍物(顾客)位置数组
|
||||
# for walker in self.scene.status.walkers:
|
||||
# Pobs.append((walker.pose.X, walker.pose.Y))
|
||||
# next_step, _ = APF(Pi=pos, Pg=waypoint, Pobs=Pobs, step_length=self.step_length)
|
||||
# traj.append((next_step[0], next_step[1]))
|
||||
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
|
||||
# while not self.scene.reachable_check(next_step[0], next_step[1], yaw): # 取中点直到next_step可达
|
||||
# traj.pop()
|
||||
# next_step = (pos + next_step) / 2
|
||||
# traj.append((next_step[0], next_step[1]))
|
||||
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
|
||||
# # 2.2 移动robot
|
||||
# self.scene.walk_to(next_step[0], next_step[1], yaw, velocity=self.v)
|
||||
# # print(self.scene.status.info) # print navigation info
|
||||
# # print(self.scene.status.collision)
|
||||
# time.sleep(self.step_time)
|
||||
# # print(self.scene.status.info) # print navigation info
|
||||
# # print(self.scene.status.collision)
|
||||
# self.planner.reset()
|
||||
if self.is_reached(pos, goal):
|
||||
print('The robot has achieved goal !!')
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# class Walker:
|
||||
# def __int__(self, scene):
|
||||
# self.scene = scene
|
||||
#
|
||||
# def add_walkers(self, walker_loc, scene_id=0):
|
||||
# """
|
||||
# 批量添加行人
|
||||
# Args:
|
||||
# walker_loc: 行人的初始位置列表( 列表元素数量对应行人数量 )
|
||||
# """
|
||||
# print('------------------add walker----------------------')
|
||||
# walker_list = []
|
||||
# for i in range(len(walker_loc)):
|
||||
# # 只有可达的位置才能成功初始化行人,显示unreachable的地方不能初始化行人
|
||||
# walker_list.append(
|
||||
# GrabSim_pb2.WalkerList.Walker(id=i, pose=GrabSim_pb2.Pose(X=walker_loc[0], Y=walker_loc[1], Yaw=90)))
|
||||
# scene = self.client.AddWalker(GrabSim_pb2.WalkerList(walkers=walker_list, scene=scene_id)) # 生成环境中行人
|
||||
# # print(self.client.Observe(GrabSim_pb2.SceneID(value=scene_id)).walkers) # 打印行人信息
|
||||
# return scene
|
||||
#
|
||||
# def control_walkers(self, walker_loc, scene_id=0):
|
||||
# """
|
||||
# 批量移动行人
|
||||
# Args:
|
||||
# walker_loc: 行人的终止位置列表
|
||||
# """
|
||||
# scene = self.client.Observe(GrabSim_pb2.SceneID(value=scene_id))
|
||||
# controls = []
|
||||
# for i in range(len(scene.walkers)):
|
||||
# loc = walker_loc[i]
|
||||
# is_autowalk = False # True: 随机移动; False: 移动到目标点
|
||||
# pose = GrabSim_pb2.Pose(X=loc[0], Y=loc[1], Yaw=180)
|
||||
# controls.append(GrabSim_pb2.WalkerControls.WControl(id=i, autowalk=is_autowalk, speed=200, pose=pose))
|
||||
# scene = self.client.ControlWalkers(
|
||||
# GrabSim_pb2.WalkerControls(controls=controls, scene=scene_id)) # 设置行人移动速度和目标点
|
||||
# # print(self.client.Observe(GrabSim_pb2.SceneID(value=scene_id)).walkers) # 打印行人信息
|
||||
# time.sleep(10)
|
||||
# return scene
|
||||
#
|
||||
# def remove_walkers(self, ids, scene_id=0):
|
||||
# '''
|
||||
# 按编号移除行人
|
||||
# Args:
|
||||
# ids: 待移除的行人编号列表
|
||||
# '''
|
||||
# scene = self.client.RemoveWalkers(GrabSim_pb2.RemoveList(IDs=ids, scene=scene_id)) # 按编号移除行人
|
||||
# # print(self.client.Observe(GrabSim_pb2.SceneID(value=scene_id)).walkers) # 打印行人信息
|
||||
# time.sleep(2)
|
||||
# return scene
|
||||
#
|
||||
# def clean_walkers(self, scene_id=0):
|
||||
# '''
|
||||
# 删除环境中所有行人
|
||||
# '''
|
||||
# scene = self.client.CleanWalkers(GrabSim_pb2.SceneID(value=scene_id))
|
||||
# return scene
|
|
@ -1,132 +0,0 @@
|
|||
"""
|
||||
|
||||
Path planning Sample Code with RRT with path smoothing
|
||||
|
||||
@author: AtsushiSakai(@Atsushi_twi)
|
||||
|
||||
"""
|
||||
|
||||
import math
|
||||
import random
|
||||
import matplotlib.pyplot as plt
|
||||
import sys
|
||||
import pathlib
|
||||
sys.path.append(str(pathlib.Path(__file__).parent))
|
||||
|
||||
|
||||
|
||||
def get_path_length(path):
|
||||
'''
|
||||
计算路径长度
|
||||
'''
|
||||
le = 0
|
||||
for i in range(len(path) - 1):
|
||||
dx = path[i + 1][0] - path[i][0]
|
||||
dy = path[i + 1][1] - path[i][1]
|
||||
d = math.hypot(dx, dy)
|
||||
le += d
|
||||
|
||||
return le
|
||||
|
||||
|
||||
def get_target_point(path, targetL):
|
||||
le = 0
|
||||
ti = 0
|
||||
lastPairLen = 0
|
||||
for i in range(len(path) - 1):
|
||||
dx = path[i + 1][0] - path[i][0]
|
||||
dy = path[i + 1][1] - path[i][1]
|
||||
d = math.hypot(dx, dy)
|
||||
le += d
|
||||
if le >= targetL:
|
||||
ti = i - 1
|
||||
lastPairLen = d
|
||||
break
|
||||
|
||||
partRatio = (le - targetL) / lastPairLen
|
||||
|
||||
x = path[ti][0] + (path[ti + 1][0] - path[ti][0]) * partRatio
|
||||
y = path[ti][1] + (path[ti + 1][1] - path[ti][1]) * partRatio
|
||||
|
||||
return [x, y, ti]
|
||||
|
||||
|
||||
# def line_collision_check(first, second, map, path_resolution=25.0, robot_radius=5, scale_ratio=5):
|
||||
# '''
|
||||
# 检查first-second的直线是否穿过障碍物
|
||||
# Args:
|
||||
# path_resolution: 采样点分辨率
|
||||
# '''
|
||||
# x1 = first[0]
|
||||
# y1 = first[1]
|
||||
# x2 = second[0]
|
||||
# y2 = second[1]
|
||||
# path_x = [x1]
|
||||
# path_y = [y1]
|
||||
# # 计算距离和夹角
|
||||
# dx = x2 - x1
|
||||
# dy = y2 - y1
|
||||
# d = math.hypot(dx, dy) # 欧式距离
|
||||
# theta = math.atan2(dy, dx) # 夹角
|
||||
#
|
||||
# n_expand = math.floor(d / path_resolution) # first 和 second 之间的采样点数
|
||||
# for _ in range(n_expand):
|
||||
# x1 += path_resolution * math.cos(theta)
|
||||
# y1 += path_resolution * math.sin(theta)
|
||||
# path_x.append(x1)
|
||||
# path_y.append(y1)
|
||||
# if d <= path_resolution:
|
||||
# path_x.append(x2)
|
||||
# path_y.append(y2)
|
||||
#
|
||||
# for (x, y) in zip(path_x, path_y):
|
||||
# (scale_x, scale_y) = (math.floor(x / scale_ratio), math.floor(y / scale_ratio)) # 向下取整
|
||||
# if robot_radius > scale_ratio: # TODO: 根据robot_radius和scale_ratio选择不同的判断方式
|
||||
# if map[scale_x, scale_y] or map[scale_x + 1, scale_y] or \
|
||||
# map[scale_x, scale_y + 1] or map[scale_x + 1, scale_y + 1]: # 如果node周围的四个缩放坐标有障碍物
|
||||
# return False
|
||||
# return True
|
||||
#
|
||||
#
|
||||
# def path_smoothing(path, map, max_iter=1000):
|
||||
# '''
|
||||
# 输入原路径,输出平滑后的路径
|
||||
# Args:
|
||||
# path: [(x,y),...]
|
||||
# '''
|
||||
# le = get_path_length(path)
|
||||
#
|
||||
# for i in range(max_iter):
|
||||
# # Sample two points
|
||||
# pickPoints = [random.uniform(0, le), random.uniform(0, le)] # 总路径长度中采样2个点
|
||||
# pickPoints.sort()
|
||||
# first = get_target_point(path, pickPoints[0])
|
||||
# second = get_target_point(path, pickPoints[1])
|
||||
#
|
||||
# if first[2] <= 0 or second[2] <= 0:
|
||||
# continue
|
||||
#
|
||||
# if (second[2] + 1) > len(path):
|
||||
# continue
|
||||
#
|
||||
# if second[2] == first[2]:
|
||||
# continue
|
||||
#
|
||||
# # collision check
|
||||
# if not line_collision_check(first, second, map):
|
||||
# continue
|
||||
#
|
||||
# # Create New path
|
||||
# newPath = []
|
||||
# newPath.extend(path[:first[2] + 1])
|
||||
# newPath.append([first[0], first[1]])
|
||||
# newPath.append([second[0], second[1]])
|
||||
# newPath.extend(path[second[2] + 1:])
|
||||
# path = newPath
|
||||
# le = get_path_length(path)
|
||||
#
|
||||
# return path
|
||||
|
||||
|
||||
|
||||
|
|
@ -1,17 +0,0 @@
|
|||
apf.py: 势场法实现
|
||||
|
||||
discretize_map.py: 地图离散化并压缩
|
||||
|
||||
map_5.pkl: 地图文件(5倍压缩)
|
||||
|
||||
*navigate.py: 导航类*
|
||||
|
||||
pathsmoothing.py: 路径平滑
|
||||
|
||||
rrt.py: RRT实现
|
||||
|
||||
rrt_star.py: RRTStar 实现
|
||||
|
||||
test.py: 测试文件
|
||||
|
||||
默认使用RRTStar+路径平滑
|
|
@ -1,440 +0,0 @@
|
|||
"""
|
||||
|
||||
Path planning Sample Code with Randomized Rapidly-Exploring Random Trees (RRT)
|
||||
|
||||
author: AtsushiSakai(@Atsushi_twi)
|
||||
|
||||
"""
|
||||
|
||||
import math
|
||||
import random
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
from pathsmoothing import get_path_length, get_target_point
|
||||
|
||||
show_animation = True
|
||||
|
||||
|
||||
class RRT:
|
||||
"""
|
||||
Class for RRT planning
|
||||
"""
|
||||
|
||||
class Node:
|
||||
"""
|
||||
RRT Node
|
||||
"""
|
||||
|
||||
def __init__(self, x=0.0, y=0.0):
|
||||
self.x = x
|
||||
self.y = y
|
||||
self.path_x = [] # Node 到 parent 间的路径链
|
||||
self.path_y = []
|
||||
self.parent = None
|
||||
|
||||
class AreaBounds:
|
||||
|
||||
def __init__(self, area):
|
||||
self.xmin = float(area[0])
|
||||
self.xmax = float(area[1])
|
||||
self.ymin = float(area[2])
|
||||
self.ymax = float(area[3])
|
||||
|
||||
def __init__(self,
|
||||
rand_area, # 横纵坐标的采样范围 [minx,maxx,miny,maxy]
|
||||
map, # 离散化地图
|
||||
scale_ratio=5, # 地图缩放率
|
||||
expand_dis=200, # 最大扩展距离
|
||||
path_resolution=10.0, # 路径链分辨率
|
||||
goal_sample_rate=5, # 5%的几率采样到目标点
|
||||
max_iter=500,
|
||||
robot_radius=12.0, # robot身位半径
|
||||
):
|
||||
|
||||
self.map = map
|
||||
self.scale_ratio = scale_ratio
|
||||
self.start = self.Node(0, 0)
|
||||
self.goal = self.Node(0, 0)
|
||||
(self.min_x, self.max_x, self.min_y, self.max_y) = rand_area
|
||||
# self.play_area = self.AreaBounds(rand_area) # 采样区域(边界框启发式)
|
||||
self.play_area = None
|
||||
self.expand_dis = expand_dis
|
||||
self.path_resolution = path_resolution
|
||||
self.goal_sample_rate = goal_sample_rate
|
||||
self.max_iter = max_iter
|
||||
self.node_list = []
|
||||
self.robot_radius = robot_radius
|
||||
|
||||
def reset(self):
|
||||
'''
|
||||
重置rrt的变量和数据结构
|
||||
'''
|
||||
self.node_list.clear()
|
||||
self.start = self.Node(0, 0)
|
||||
self.goal = self.Node(0, 0)
|
||||
|
||||
def update_play_area(self):
|
||||
'''
|
||||
更新采样区域
|
||||
'''
|
||||
if self.goal.x > self.start.x:
|
||||
self.play_area.xmin = self.start.x
|
||||
self.play_area.xmax = self.max_x
|
||||
else:
|
||||
self.play_area.xmin = self.min_x
|
||||
self.play_area.xmax = self.start.x
|
||||
if self.goal.y > self.start.y:
|
||||
self.play_area.ymin = self.start.y
|
||||
self.play_area.ymax = self.max_y
|
||||
else:
|
||||
self.play_area.ymin = self.min_y
|
||||
self.play_area.ymax = self.start.y
|
||||
|
||||
def planning(self, start, goal, path_smoothing=True, animation=False):
|
||||
"""
|
||||
rrt path planning, return a path list
|
||||
|
||||
animation: flag for animation on or off
|
||||
"""
|
||||
(self.start.x, self.start.y) = start
|
||||
(self.goal.x, self.goal.y) = goal
|
||||
# self.update_play_area()
|
||||
self.node_list = [self.start] # 已经采样的节点列表
|
||||
for i in range(self.max_iter):
|
||||
rnd_node = self.get_random_node() # 随机采样节点
|
||||
nearest_ind = self.get_nearest_node_index(self.node_list, rnd_node)
|
||||
nearest_node = self.node_list[nearest_ind] # node_list中距离新采样点最近的节点
|
||||
|
||||
new_node = self.steer(nearest_node, rnd_node, self.expand_dis)
|
||||
|
||||
# 确保新采样节点在范围内且不是障碍物
|
||||
if self.check_if_outside_play_area(new_node, self.play_area) and \
|
||||
self.check_collision(new_node):
|
||||
self.node_list.append(new_node)
|
||||
|
||||
if animation and i % 5 == 0:
|
||||
self.draw_graph(rnd_node)
|
||||
|
||||
if self.calc_dist_to_goal(self.node_list[-1].x,
|
||||
self.node_list[-1].y) <= self.expand_dis:
|
||||
final_node = self.steer(self.node_list[-1], self.goal,
|
||||
self.expand_dis)
|
||||
if self.check_collision(final_node):
|
||||
path = self.generate_final_course(len(self.node_list) - 1)
|
||||
if path_smoothing:
|
||||
return self.path_smoothing(path)
|
||||
else:
|
||||
return path
|
||||
|
||||
if animation and i % 5:
|
||||
self.draw_graph(rnd_node)
|
||||
|
||||
return None # cannot find path
|
||||
|
||||
def steer(self, from_node, to_node, extend_length=float("inf")):
|
||||
'''
|
||||
返回from_node 到 to_node 距离限制内最远的节点,并设置其父节点为from_node
|
||||
并计算路径链
|
||||
Args:
|
||||
entend_length: 距离限制
|
||||
return:
|
||||
new_node: from_node 到 to_node 距离限制内最远的节点
|
||||
( 如果 to_node 在距离限制内则返回 to_node )
|
||||
'''
|
||||
new_node = self.Node(from_node.x, from_node.y)
|
||||
d, theta = self.calc_distance_and_angle(new_node, to_node) # from 和 to 之间的距离和方向角
|
||||
|
||||
new_node.path_x = [new_node.x]
|
||||
new_node.path_y = [new_node.y]
|
||||
|
||||
if extend_length > d:
|
||||
extend_length = d
|
||||
|
||||
n_expand = math.floor(extend_length / self.path_resolution) # from 和 to 之间的采样点数
|
||||
|
||||
for _ in range(n_expand):
|
||||
new_node.x += self.path_resolution * math.cos(theta)
|
||||
new_node.y += self.path_resolution * math.sin(theta)
|
||||
new_node.path_x.append(new_node.x)
|
||||
new_node.path_y.append(new_node.y)
|
||||
|
||||
d, _ = self.calc_distance_and_angle(new_node, to_node)
|
||||
if d <= self.path_resolution:
|
||||
new_node.path_x.append(to_node.x)
|
||||
new_node.path_y.append(to_node.y)
|
||||
new_node.x = to_node.x
|
||||
new_node.y = to_node.y
|
||||
|
||||
new_node.parent = from_node
|
||||
|
||||
return new_node
|
||||
|
||||
def generate_final_course(self, goal_ind) -> [(float, float), ]:
|
||||
'''
|
||||
得到路径(起点->终点)
|
||||
'''
|
||||
path = [(self.goal.x, self.goal.y)]
|
||||
node = self.node_list[goal_ind]
|
||||
while node.parent is not None:
|
||||
path.append((node.x, node.y))
|
||||
node = node.parent
|
||||
path.append((node.x, node.y))
|
||||
path.reverse()
|
||||
return path
|
||||
|
||||
def calc_dist_to_goal(self, x, y):
|
||||
'''
|
||||
计算节点到终点距离
|
||||
'''
|
||||
dx = x - self.goal.x
|
||||
dy = y - self.goal.y
|
||||
return math.hypot(dx, dy)
|
||||
|
||||
def get_random_node(self):
|
||||
'''
|
||||
在规定范围内随机采样节点(一定概率采样到目标节点)
|
||||
'''
|
||||
if random.randint(0, 100) > self.goal_sample_rate:
|
||||
rnd = self.Node(
|
||||
random.uniform(self.min_x, self.max_x),
|
||||
random.uniform(self.min_y, self.max_y))
|
||||
else: # goal point sampling
|
||||
rnd = self.Node(self.goal.x, self.goal.y) # 返回goal
|
||||
return rnd
|
||||
|
||||
def map2real(self, x, y):
|
||||
'''
|
||||
地图坐标->实际坐标
|
||||
'''
|
||||
x = x * self.scale_ratio + self.min_x
|
||||
y = y * self.scale_ratio + self.min_y
|
||||
return x, y
|
||||
|
||||
def real2map(self, x, y):
|
||||
'''
|
||||
实际坐标->地图坐标 (向下取整)
|
||||
'''
|
||||
# x = round((x - self.min_x) / self.scale_ratio)
|
||||
# y = round((y - self.min_y) / self.scale_ratio)
|
||||
x = math.floor((x - self.min_x) / self.scale_ratio)
|
||||
y = math.floor((y - self.min_y) / self.scale_ratio)
|
||||
return x, y
|
||||
|
||||
def check_collision(self, node):
|
||||
'''
|
||||
判断 node 到 父节点之间 是否有障碍物
|
||||
'''
|
||||
|
||||
if node is None:
|
||||
return False
|
||||
for (x, y) in zip(node.path_x, node.path_y):
|
||||
map_x, map_y = self.real2map(x, y)
|
||||
# TODO: 碰撞检测考虑robot_radius和scale_ratio
|
||||
if self.map[map_x, map_y] or self.map[map_x + 1, map_y] or \
|
||||
self.map[map_x, map_y + 1] or self.map[map_x + 1, map_y + 1]:
|
||||
return False
|
||||
return True # safe
|
||||
|
||||
def draw_graph(self, new=None, final_path=None):
|
||||
# 清空当前figure内容,保留figure对象
|
||||
plt.clf()
|
||||
# for stopping simulation with the esc key. 按esc结束
|
||||
plt.gcf().canvas.mpl_connect(
|
||||
'key_release_event',
|
||||
lambda event: [exit(0) if event.key == 'escape' else None])
|
||||
|
||||
# 画地图: X行Y列,第一行在下面
|
||||
# 范围: 横向Y[-70,120] 纵向X[-80,290]
|
||||
plt.imshow(self.map, cmap='binary', alpha=0.5, origin='lower',
|
||||
extent=(self.min_y / self.scale_ratio, self.max_y / self.scale_ratio,
|
||||
self.min_x / self.scale_ratio, self.max_x / self.scale_ratio))
|
||||
# 画新采样点
|
||||
if new is not None:
|
||||
(scale_x, scale_y) = (new.x / self.scale_ratio, new.y / self.scale_ratio)
|
||||
plt.plot(scale_y, scale_x, "^k")
|
||||
if self.robot_radius > 0.0:
|
||||
self.plot_circle(scale_y, scale_x, self.robot_radius / self.scale_ratio, '-r')
|
||||
# 画新边
|
||||
if new.parent:
|
||||
plt.plot([y / self.scale_ratio for y in new.path_y],
|
||||
[x / self.scale_ratio for x in new.path_x], "-g")
|
||||
|
||||
# 画整个搜索树
|
||||
for node in self.node_list:
|
||||
if node.parent:
|
||||
plt.plot([y / self.scale_ratio for y in node.path_y],
|
||||
[x / self.scale_ratio for x in node.path_x], "-g")
|
||||
# 画起点和目标
|
||||
plt.plot(self.start.y / self.scale_ratio, self.start.x / self.scale_ratio, "xr")
|
||||
plt.plot(self.goal.y / self.scale_ratio, self.goal.x / self.scale_ratio, "xr")
|
||||
|
||||
plt.xlabel('y', loc='right')
|
||||
plt.ylabel('x', loc='top')
|
||||
plt.grid(color='black', linestyle='-', linewidth=0.5)
|
||||
plt.pause(0.2)
|
||||
|
||||
# 画最终路径
|
||||
if final_path:
|
||||
plt.plot([y / self.scale_ratio for (x, y) in final_path],
|
||||
[x / self.scale_ratio for (x, y) in final_path], '-r') # 画出最终路径
|
||||
plt.pause(2)
|
||||
|
||||
@staticmethod
|
||||
def plot_circle(x, y, size, color="-b"): # pragma: no cover
|
||||
'''
|
||||
以(x,y)为圆心,size为半径 画圆
|
||||
'''
|
||||
deg = list(range(0, 360, 5))
|
||||
deg.append(0)
|
||||
xl = [x + size * math.cos(np.deg2rad(d)) for d in deg]
|
||||
yl = [y + size * math.sin(np.deg2rad(d)) for d in deg]
|
||||
plt.plot(xl, yl, color)
|
||||
|
||||
@staticmethod
|
||||
def get_nearest_node_index(node_list, rnd_node):
|
||||
'''
|
||||
得到距离rnd_node最近的节点编号
|
||||
'''
|
||||
dlist = [(node.x - rnd_node.x) ** 2 + (node.y - rnd_node.y) ** 2
|
||||
for node in node_list]
|
||||
minind = dlist.index(min(dlist))
|
||||
|
||||
return minind
|
||||
|
||||
@staticmethod
|
||||
def check_if_outside_play_area(node, play_area):
|
||||
|
||||
if play_area is None:
|
||||
return True # no play_area was defined, every pos should be ok
|
||||
|
||||
if node.x < play_area.xmin or node.x > play_area.xmax or \
|
||||
node.y < play_area.ymin or node.y > play_area.ymax:
|
||||
return False # outside - bad
|
||||
else:
|
||||
return True # inside - ok
|
||||
|
||||
@staticmethod
|
||||
def calc_distance_and_angle(from_node, to_node):
|
||||
'''
|
||||
计算2点间的距离和角度( 以from_node为原点 )
|
||||
'''
|
||||
dx = to_node.x - from_node.x
|
||||
dy = to_node.y - from_node.y
|
||||
d = math.hypot(dx, dy) # 欧式距离
|
||||
theta = math.atan2(dy, dx)
|
||||
return d, theta
|
||||
|
||||
# **************************Path Smoothing**************************************************** #
|
||||
# **************************Path Smoothing**************************************************** #
|
||||
# **************************Path Smoothing**************************************************** #
|
||||
|
||||
def line_collision_check(self, first, second):
|
||||
'''
|
||||
检查first-second的直线是否穿过障碍物
|
||||
Args:
|
||||
path_resolution: 采样点分辨率
|
||||
'''
|
||||
x1 = first[0]
|
||||
y1 = first[1]
|
||||
x2 = second[0]
|
||||
y2 = second[1]
|
||||
path_x = [x1]
|
||||
path_y = [y1]
|
||||
# 计算距离和夹角
|
||||
dx = x2 - x1
|
||||
dy = y2 - y1
|
||||
d = math.hypot(dx, dy) # 欧式距离
|
||||
theta = math.atan2(dy, dx) # 夹角
|
||||
|
||||
n_expand = math.floor(d / self.path_resolution) # first 和 second 之间的采样点数
|
||||
for _ in range(n_expand):
|
||||
x1 += self.path_resolution * math.cos(theta)
|
||||
y1 += self.path_resolution * math.sin(theta)
|
||||
path_x.append(x1)
|
||||
path_y.append(y1)
|
||||
if d <= self.path_resolution:
|
||||
path_x.append(x2)
|
||||
path_y.append(y2)
|
||||
|
||||
for (x, y) in zip(path_x, path_y):
|
||||
map_x, map_y = self.real2map(x, y) # 向下取整
|
||||
# TODO: 碰撞检测考虑robot_radius和scale_ratio
|
||||
if self.map[map_x, map_y] or self.map[map_x + 1, map_y] or \
|
||||
self.map[map_x, map_y + 1] or self.map[map_x + 1, map_y + 1]:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def path_smoothing(self, path, max_iter=1000):
|
||||
'''
|
||||
输入原路径,输出平滑后的路径
|
||||
Args:
|
||||
path: [(x,y),...]
|
||||
'''
|
||||
le = get_path_length(path)
|
||||
|
||||
for i in range(max_iter):
|
||||
# Sample two points
|
||||
pickPoints = [random.uniform(0, le), random.uniform(0, le)] # 总路径长度中采样2个点
|
||||
pickPoints.sort()
|
||||
first = get_target_point(path, pickPoints[0])
|
||||
second = get_target_point(path, pickPoints[1])
|
||||
|
||||
if first[2] <= 0 or second[2] <= 0:
|
||||
continue
|
||||
|
||||
if (second[2] + 1) > len(path):
|
||||
continue
|
||||
|
||||
if second[2] == first[2]:
|
||||
continue
|
||||
|
||||
# collision check
|
||||
if not self.line_collision_check(first, second):
|
||||
continue
|
||||
|
||||
# Create New path
|
||||
newPath = []
|
||||
newPath.extend(path[:first[2] + 1])
|
||||
newPath.append([first[0], first[1]])
|
||||
newPath.append([second[0], second[1]])
|
||||
newPath.extend(path[second[2] + 1:])
|
||||
path = newPath
|
||||
le = get_path_length(path)
|
||||
|
||||
return path
|
||||
|
||||
|
||||
def main(gx=6.0, gy=10.0):
|
||||
print("start " + __file__)
|
||||
|
||||
# ====Search Path with RRT====
|
||||
obstacleList = [(5, 5, 1), (3, 6, 2), (3, 8, 2), (3, 10, 2), (7, 5, 2),
|
||||
(9, 5, 2), (8, 10, 1)] # [x, y, radius]
|
||||
# Set Initial parameters
|
||||
rrt = RRT(
|
||||
start=[0, 0],
|
||||
goal=[gx, gy],
|
||||
rand_area=[-2, 15],
|
||||
obstacle_list=obstacleList,
|
||||
# play_area=[0, 10, 0, 14]
|
||||
robot_radius=0.8
|
||||
)
|
||||
path = rrt.planning(animation=show_animation)
|
||||
|
||||
if path is None:
|
||||
print("Cannot find path")
|
||||
else:
|
||||
print("found path!!")
|
||||
|
||||
# Draw final path
|
||||
if show_animation:
|
||||
rrt.draw_graph() # 画出探索过程
|
||||
plt.plot([x for (x, y) in path], [y for (x, y) in path], '-r') # 画出最终路径
|
||||
plt.grid(True)
|
||||
plt.pause(0.01)
|
||||
plt.show()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
|
@ -1,288 +0,0 @@
|
|||
"""
|
||||
|
||||
Path planning Sample Code with RRT*
|
||||
|
||||
author: Atsushi Sakai(@Atsushi_twi)
|
||||
|
||||
"""
|
||||
|
||||
import math
|
||||
import sys
|
||||
import matplotlib.pyplot as plt
|
||||
import pathlib
|
||||
from rrt import RRT
|
||||
|
||||
sys.path.append(str(pathlib.Path(__file__).parent.parent))
|
||||
|
||||
|
||||
|
||||
class RRTStar(RRT):
|
||||
"""
|
||||
Class for RRT Star planning
|
||||
"""
|
||||
|
||||
class Node(RRT.Node):
|
||||
def __init__(self, x, y):
|
||||
super().__init__(x, y)
|
||||
self.cost = 0.0 # 路径代价
|
||||
|
||||
def __init__(self,
|
||||
rand_area,
|
||||
map,
|
||||
scale_ratio=5,
|
||||
expand_dis=100.0,
|
||||
path_resolution=10.0,
|
||||
goal_sample_rate=10,
|
||||
max_iter=300,
|
||||
connect_circle_dist=50.0, # new
|
||||
search_until_max_iter=False, # new
|
||||
robot_radius=5.0):
|
||||
|
||||
super().__init__(rand_area, map, scale_ratio, expand_dis,
|
||||
path_resolution, goal_sample_rate, max_iter,
|
||||
robot_radius=robot_radius)
|
||||
self.connect_circle_dist = connect_circle_dist
|
||||
self.search_until_max_iter = search_until_max_iter
|
||||
self.node_list = []
|
||||
|
||||
def planning(self, start, goal, path_smoothing=True, animation=False) -> [(float, float), ...]:
|
||||
"""
|
||||
rrt star path planning, return a path list
|
||||
animation: flag for animation on or off .
|
||||
"""
|
||||
(self.start.x, self.start.y) = start
|
||||
(self.goal.x, self.goal.y) = goal
|
||||
# self.update_play_area()
|
||||
self.node_list = [self.start]
|
||||
while len(self.node_list) < self.max_iter:
|
||||
#for i in range(self.max_iter):
|
||||
# print("Iter:", i, ", number of nodes:", len(self.node_list))
|
||||
print("number of nodes:", len(self.node_list))
|
||||
rnd_node = self.get_random_node()
|
||||
nearest_ind = self.get_nearest_node_index(self.node_list, rnd_node)
|
||||
nearest_node = self.node_list[nearest_ind]
|
||||
new_node = self.steer(nearest_node, rnd_node, self.expand_dis)
|
||||
|
||||
if animation:
|
||||
self.draw_graph(new_node)
|
||||
|
||||
if self.check_collision(new_node):
|
||||
new_node.cost = nearest_node.cost + \
|
||||
math.hypot(new_node.x - nearest_node.x,
|
||||
new_node.y - nearest_node.y)
|
||||
near_inds = self.find_near_nodes(new_node)
|
||||
# node_with_updated_parent: 已经找到父节点的new_node
|
||||
node_with_updated_parent = self.choose_parent(new_node, near_inds)
|
||||
if node_with_updated_parent:
|
||||
self.rewire(node_with_updated_parent, near_inds)
|
||||
self.node_list.append(node_with_updated_parent)
|
||||
else:
|
||||
self.node_list.append(new_node) # ??? 不可能发生
|
||||
|
||||
# 目标检测
|
||||
if not self.search_until_max_iter:
|
||||
last_index = self.search_best_goal_node() # 找到目标单步范围内的总距离最短的节点
|
||||
if last_index is not None:
|
||||
path = self.generate_final_course(last_index)
|
||||
if path_smoothing:
|
||||
return self.path_smoothing(path)
|
||||
else:
|
||||
return path
|
||||
|
||||
print("reached max iteration")
|
||||
|
||||
last_index = self.search_best_goal_node()
|
||||
if last_index is not None:
|
||||
path = self.generate_final_course(last_index)
|
||||
if path_smoothing:
|
||||
return self.path_smoothing(path)
|
||||
else:
|
||||
return path
|
||||
|
||||
return None
|
||||
|
||||
def choose_parent(self, new_node, near_inds):
|
||||
"""
|
||||
为 new_node 选择(从起点开始)总距离最小的父节点
|
||||
Computes the cheapest point to new_node contained in the list
|
||||
near_inds and set such a node as the parent of new_node.
|
||||
Arguments:
|
||||
--------
|
||||
new_node, Node
|
||||
randomly generated node with a path from its neared point
|
||||
There are not coalitions between this node and th tree.
|
||||
near_inds: list
|
||||
Indices of indices of the nodes what are near to new_node
|
||||
|
||||
Returns.
|
||||
------
|
||||
Node, a copy of new_node
|
||||
"""
|
||||
if not near_inds:
|
||||
return None
|
||||
|
||||
# search nearest cost in near_inds
|
||||
costs = []
|
||||
for i in near_inds:
|
||||
near_node = self.node_list[i]
|
||||
t_node = self.steer(near_node, new_node)
|
||||
if t_node and self.check_collision(t_node):
|
||||
costs.append(self.calc_new_cost(near_node, new_node))
|
||||
else:
|
||||
costs.append(float("inf")) # the cost of collision node
|
||||
min_cost = min(costs)
|
||||
|
||||
if min_cost == float("inf"):
|
||||
print("There is no good path.(min_cost is inf)")
|
||||
return None
|
||||
|
||||
min_ind = near_inds[costs.index(min_cost)]
|
||||
new_node = self.steer(self.node_list[min_ind], new_node) # 为new_node设置父节点
|
||||
new_node.cost = min_cost
|
||||
|
||||
return new_node
|
||||
|
||||
def search_best_goal_node(self):
|
||||
'''
|
||||
从可直达目标的节点(单步范围内且中间无障碍物)中,选出从起点到目标距离最短的中间节点
|
||||
'''
|
||||
dist_to_goal_list = [
|
||||
self.calc_dist_to_goal(n.x, n.y) for n in self.node_list
|
||||
]
|
||||
goal_inds = [ # 距离目标单步范围内的节点
|
||||
dist_to_goal_list.index(i) for i in dist_to_goal_list
|
||||
if i <= self.expand_dis
|
||||
]
|
||||
|
||||
safe_goal_inds = [] # 目标单步范围内且中间没有障碍物的节点
|
||||
for goal_ind in goal_inds:
|
||||
t_node = self.steer(self.node_list[goal_ind], self.goal)
|
||||
if self.check_collision(t_node):
|
||||
safe_goal_inds.append(goal_ind)
|
||||
|
||||
if not safe_goal_inds:
|
||||
return None
|
||||
|
||||
safe_goal_costs = [self.node_list[i].cost + # 从起点经过安全节点到目标的距离
|
||||
self.calc_dist_to_goal(self.node_list[i].x, self.node_list[i].y)
|
||||
for i in safe_goal_inds]
|
||||
|
||||
min_cost = min(safe_goal_costs)
|
||||
for i, cost in zip(safe_goal_inds, safe_goal_costs):
|
||||
if cost == min_cost:
|
||||
return i
|
||||
|
||||
return None
|
||||
|
||||
def find_near_nodes(self, new_node):
|
||||
"""
|
||||
找到 new_node 周围一定范围内的树中的节点
|
||||
1) defines a ball centered on new_node
|
||||
2) Returns all nodes of the three that are inside this ball
|
||||
Arguments:
|
||||
---------
|
||||
new_node: Node
|
||||
new randomly generated node, without collisions between
|
||||
its nearest node
|
||||
Returns:
|
||||
-------
|
||||
list
|
||||
List with the indices of the nodes inside the ball of
|
||||
radius r
|
||||
"""
|
||||
nnode = len(self.node_list) + 1
|
||||
r = self.connect_circle_dist * math.sqrt(math.log(nnode) / nnode)
|
||||
# if expand_dist exists, search vertices in a range no more than expand_dist
|
||||
if hasattr(self, 'expand_dis'):
|
||||
r = min(r, self.expand_dis)
|
||||
dist_list = [(node.x - new_node.x) ** 2 + (node.y - new_node.y) ** 2
|
||||
for node in self.node_list]
|
||||
near_inds = [dist_list.index(i) for i in dist_list if i <= r ** 2] #
|
||||
return near_inds
|
||||
|
||||
def rewire(self, new_node, near_inds):
|
||||
"""
|
||||
新加入节点后,为周围的其他节点重新计算最短路径并更新其父节点
|
||||
For each node in near_inds, this will check if it is cheaper to
|
||||
arrive to them from new_node.
|
||||
In such a case, this will re-assign the parent of the nodes in
|
||||
near_inds to new_node.
|
||||
Parameters:
|
||||
----------
|
||||
new_node, Node
|
||||
Node randomly added which can be joined to the tree
|
||||
|
||||
near_inds, list of uints
|
||||
A list of indices of the self.new_node which contains
|
||||
nodes within a circle of a given radius.
|
||||
Remark: parent is designated in choose_parent.
|
||||
|
||||
"""
|
||||
for i in near_inds:
|
||||
near_node = self.node_list[i]
|
||||
edge_node = self.steer(new_node, near_node)
|
||||
if not edge_node:
|
||||
continue
|
||||
edge_node.cost = self.calc_new_cost(new_node, near_node)
|
||||
|
||||
no_collision = self.check_collision(edge_node)
|
||||
improved_cost = near_node.cost > edge_node.cost
|
||||
|
||||
if no_collision and improved_cost:
|
||||
for node in self.node_list:
|
||||
if node.parent == near_node:
|
||||
node.parent = edge_node
|
||||
self.node_list[i] = edge_node
|
||||
self.propagate_cost_to_leaves(self.node_list[i])
|
||||
|
||||
def calc_new_cost(self, from_node, to_node):
|
||||
'''
|
||||
从起始位置经过 from_node 到 to_node 的 cost
|
||||
'''
|
||||
d, _ = self.calc_distance_and_angle(from_node, to_node)
|
||||
return from_node.cost + d
|
||||
|
||||
def propagate_cost_to_leaves(self, parent_node):
|
||||
'''
|
||||
(递归算法) 从父节点不断向子节点传播,计算cost
|
||||
'''
|
||||
for node in self.node_list:
|
||||
if node.parent == parent_node:
|
||||
node.cost = self.calc_new_cost(parent_node, node)
|
||||
self.propagate_cost_to_leaves(node)
|
||||
|
||||
|
||||
def main():
|
||||
print("Start " + __file__)
|
||||
|
||||
# ====Search Path with RRT====
|
||||
obstacle_list = [
|
||||
(5, 5, 1),
|
||||
(3, 6, 2),
|
||||
(3, 8, 2),
|
||||
(3, 10, 2),
|
||||
(7, 5, 2),
|
||||
(9, 5, 2),
|
||||
(8, 10, 1),
|
||||
(6, 12, 1),
|
||||
] # [x,y,size(radius)]
|
||||
|
||||
# Set Initial parameters
|
||||
rrt_star = RRTStar(rand_area=[-2, 15], expand_dis=1, robot_radius=0.8)
|
||||
path = rrt_star.planning(animation=show_animation)
|
||||
|
||||
if path is None:
|
||||
print("Cannot find path")
|
||||
else:
|
||||
print("found path!!")
|
||||
|
||||
# Draw final path
|
||||
if show_animation:
|
||||
rrt_star.draw_graph()
|
||||
plt.plot([x for (x, y) in path], [y for (x, y) in path], 'r--')
|
||||
plt.grid(True)
|
||||
plt.show()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
|
@ -1,64 +0,0 @@
|
|||
import os
|
||||
import pickle
|
||||
import time
|
||||
import random
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
|
||||
from scene_utils import control # TODO: 文件名改成Scene.py
|
||||
from navigate import Navigator
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
file_name = 'map_5.pkl'
|
||||
if os.path.exists(file_name):
|
||||
with open(file_name, 'rb') as file:
|
||||
map = pickle.load(file)
|
||||
|
||||
control.init_world(1, 3)
|
||||
scene = control.Scene(sceneID=0)
|
||||
scene.reset()
|
||||
|
||||
navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map)
|
||||
goal = (200, 1400)
|
||||
navigator.navigate(goal, animation=False)
|
||||
|
||||
|
||||
# scene.add_walker(1085, 2630, 220)
|
||||
# scene.control_walker([scene.walker_control_generator(0, False, 100, 755, 1900, 180)])
|
||||
|
||||
# print(scene.status)
|
||||
#
|
||||
# control.init_world(1, 3)
|
||||
#
|
||||
# scene = control.Scene(sceneID=0)
|
||||
#
|
||||
# # 实现单顾客领位
|
||||
# scene.reset()
|
||||
# scene.add_walker(0, -200, 220)
|
||||
#
|
||||
# for walker in scene.status.walkers:
|
||||
# print(walker.pose.X)
|
||||
# print('*************************')
|
||||
# time.sleep(3)
|
||||
# for walker in scene.status.walkers:
|
||||
# print(walker.pose)
|
||||
# print('*************************')
|
||||
#
|
||||
# scene.control_walker([scene.walker_control_generator(0, False, 500, 70, 880, 120)])
|
||||
# time.sleep(0.5)
|
||||
# for walker in scene.status.walkers:
|
||||
# print(walker.pose)
|
||||
# print('*************************')
|
||||
# time.sleep(5)
|
||||
# for walker in scene.status.walkers:
|
||||
# print(walker.pose.X)
|
||||
# print('*************************')
|
||||
#
|
||||
# scene.remove_walker(0)
|
||||
# scene.clean_walker()
|
||||
|
||||
|
|
@ -1,3 +0,0 @@
|
|||
from . import navigate
|
||||
from . import dstar_lite
|
||||
|
Binary file not shown.
|
@ -0,0 +1,3 @@
|
|||
from . import navigate
|
||||
from . import dstar_lite
|
||||
|
|
@ -307,9 +307,7 @@ class DStarLite:
|
|||
s_start = self.real2map(s_start)
|
||||
s_goal = self.real2map(s_goal)
|
||||
dyna_obs = [self.real2map(obs) for obs in dyna_obs]
|
||||
|
||||
self._planning(s_start, s_goal, dyna_obs, debug)
|
||||
|
||||
# 地图坐标->实际坐标
|
||||
path = [self.map2real(node) for node in self.path]
|
||||
return path
|
||||
|
@ -327,7 +325,7 @@ class DStarLite:
|
|||
path = []
|
||||
cur = self.s_start
|
||||
while cur != self.s_goal:
|
||||
succ = self.get_neighbors(cur)
|
||||
succ = [s_ for s_ in self.get_neighbors(cur) if s_ not in path] # 避免抖动 (不可走重复的点)
|
||||
cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])]
|
||||
path.append(cur)
|
||||
# else:
|
|
@ -1,7 +1,7 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- encoding: utf-8 -*-
|
||||
import math
|
||||
from dstar_lite import DStarLite, euclidean_distance
|
||||
from robowaiter.algos.navigate.dstar_lite import DStarLite, euclidean_distance
|
||||
|
||||
|
||||
class Navigator:
|
||||
|
@ -71,6 +71,13 @@ class Navigator:
|
|||
print('plan pos:', next_pos, end=' ')
|
||||
yaw = self.get_yaw(pos, next_pos)
|
||||
self.scene.walk_to(next_pos[0], next_pos[1], yaw, velocity=self.v, dis_limit=10)
|
||||
|
||||
|
||||
### 获取视觉图像
|
||||
self.scene.get_camera_segment()
|
||||
|
||||
|
||||
|
||||
# pos = (self.scene.status.location.X, self.scene.status.location.Y)
|
||||
# if self.is_reached(pos, next_pos):
|
||||
self.planner.path = self.planner.path[next_step - 1:] # 去除已走过的路径
|
|
@ -1,7 +1,6 @@
|
|||
import py_trees as ptree
|
||||
from typing import Any
|
||||
from robowaiter.behavior_lib._base.Act import Act
|
||||
from robowaiter.algos.navigate.DstarLite.navigate import Navigator
|
||||
from robowaiter.algos.navigate.navigate import Navigator
|
||||
|
||||
class MoveTo(Act):
|
||||
can_be_expanded = True
|
||||
|
@ -27,13 +26,13 @@ class MoveTo(Act):
|
|||
def _update(self) -> ptree.common.Status:
|
||||
# self.scene.test_move()
|
||||
|
||||
# navigator = Navigator(scene=self.scene, area_range=[-350, 600, -400, 1450], map=self.scene.state["map"]["2d"])
|
||||
# goal = self.scene.state['map']['obj_pos'][self.args[0]]
|
||||
# navigator.navigate(goal, animation=False)
|
||||
#
|
||||
# self.scene.state['condition_set'].add('At(Robot,Table)')
|
||||
|
||||
navigator = Navigator(scene=self.scene, area_range=[-350, 600, -400, 1450], map=self.scene.state["map"]["2d"])
|
||||
goal = self.scene.state['map']['obj_pos'][self.args[0]]
|
||||
self.scene.walk_to(goal[0],goal[1])
|
||||
navigator.navigate(goal, animation=False)
|
||||
|
||||
self.scene.state['condition_set'].add('At(Robot,Table)')
|
||||
|
||||
# goal = self.scene.state['map']['obj_pos'][self.args[0]]
|
||||
# self.scene.walk_to(goal[0],goal[1])
|
||||
|
||||
return ptree.common.Status.RUNNING
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import time
|
||||
import grpc
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
from robowaiter.proto import GrabSim_pb2
|
||||
from robowaiter.proto import GrabSim_pb2_grpc
|
||||
|
@ -24,11 +25,18 @@ def init_world(scene_num=1, mapID=11):
|
|||
time.sleep(3) # wait for the map to load
|
||||
|
||||
|
||||
def image_extract(camera_data):
|
||||
image = camera_data.images[0]
|
||||
return np.frombuffer(image.data, dtype=image.dtype).reshape(
|
||||
(image.height, image.width, image.channels)
|
||||
)
|
||||
def show_image(camera_data):
|
||||
print('------------------show_image----------------------')
|
||||
#获取第0张照片
|
||||
im = camera_data.images[0]
|
||||
#使用numpy(np) 数值类型矩阵的frombuffer,将im.data以流的形式(向量的形式)读入,在变型reshape成三位矩阵的形式(长度,宽度,深度)即三阶张量
|
||||
d = np.frombuffer(im.data, dtype=im.dtype).reshape((im.height, im.width, im.channels))
|
||||
#matplotlib中的plt方法 对矩阵d 进行图形绘制,如果 深度相机拍摄的带深度的图片(图片名字中有depth信息),则转换成黑白图即灰度图
|
||||
plt.imshow(d, cmap="gray" if "depth" in im.name.lower() else None)
|
||||
#图像展示在屏幕上
|
||||
plt.show()
|
||||
|
||||
return d
|
||||
|
||||
|
||||
class Scene:
|
||||
|
@ -299,7 +307,7 @@ class Scene:
|
|||
)
|
||||
)
|
||||
if image_only:
|
||||
return image_extract(camera_data)
|
||||
return show_image(camera_data)
|
||||
else:
|
||||
return camera_data
|
||||
|
||||
|
@ -310,20 +318,20 @@ class Scene:
|
|||
)
|
||||
)
|
||||
if image_only:
|
||||
return image_extract(camera_data)
|
||||
return show_image(camera_data)
|
||||
else:
|
||||
return camera_data
|
||||
|
||||
def get_camera_segment(self, image_only=True):
|
||||
def get_camera_segment(self, show=True):
|
||||
camera_data = stub.Capture(
|
||||
GrabSim_pb2.CameraList(
|
||||
cameras=[GrabSim_pb2.CameraName.Head_Segment], scene=self.sceneID
|
||||
)
|
||||
)
|
||||
if image_only:
|
||||
return image_extract(camera_data)
|
||||
else:
|
||||
return camera_data
|
||||
if show:
|
||||
show_image(camera_data)
|
||||
|
||||
return camera_data
|
||||
|
||||
def chat_bubble(self, message):
|
||||
stub.ControlRobot(
|
||||
|
|
|
@ -15,7 +15,7 @@ import numpy as np
|
|||
from robowaiter.scene.scene import Scene,init_world # TODO: 文件名改成Scene.py
|
||||
|
||||
from robowaiter.scene.scene import Scene
|
||||
|
||||
from robowaiter.utils import get_root_path
|
||||
|
||||
class SceneVLN(Scene):
|
||||
def __init__(self, robot):
|
||||
|
@ -26,7 +26,8 @@ class SceneVLN(Scene):
|
|||
]
|
||||
|
||||
def _reset(self):
|
||||
file_name = './robowaiter/algos/navigate/DstarLite/map_5.pkl'
|
||||
root_path = get_root_path()
|
||||
file_name = os.path.join(root_path,'robowaiter/algos/navigate/map_5.pkl')
|
||||
with open(file_name, 'rb') as file:
|
||||
map = pickle.load(file)
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import os
|
||||
from robowaiter import Robot, task_map
|
||||
|
||||
TASK_NAME = 'VLM'
|
||||
TASK_NAME = 'VLN'
|
||||
|
||||
# create robot
|
||||
project_path = "./robowaiter"
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
selector{
|
||||
cond EnvExplored()
|
||||
cond At(Robot,Table)
|
||||
act MoveTo(Table)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue