跑通子任务树生成

This commit is contained in:
ChenXL97 2023-11-09 08:47:57 +08:00
parent 0c0355af46
commit 5c893d72e1
37 changed files with 1722 additions and 91 deletions

4
bracket_ptml.ptml Normal file
View File

@ -0,0 +1,4 @@
selector{
cond At(Robot, Table)
}
}

View File

@ -9,7 +9,7 @@ import numpy as np
from mpl_toolkits.axes_grid1 import make_axes_locatable
from dstar_lite import DStarLite
from robowaiter.algos.navigate.DstarLite.dstar_lite import DStarLite
class Navigator:
'''

View File

@ -19,13 +19,13 @@ if __name__ == '__main__':
with open(file_name, 'rb') as file:
map = pickle.load(file)
init_world(1, 3)
init_world(1, 11)
scene = Scene(sceneID=0)
navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map)
'''场景1: 无行人环境 robot到达指定目标'''
# goal = np.array((-100, 700))
goal = np.array((-100, 700))
'''场景2: 静止行人环境 robot到达指定目标'''
@ -35,12 +35,12 @@ if __name__ == '__main__':
# goal = np.array((-100, 700))
'''场景3: 移动行人环境 robot到达指定目标'''
scene.clean_walker()
scene.add_walker(50, 300, 0)
scene.add_walker(-50, 500, 0)
scene.control_walker([scene.walker_control_generator(walkerID=0, autowalk=False, speed=20, X=-50, Y=600, Yaw=0)])
scene.control_walker([scene.walker_control_generator(walkerID=1, autowalk=False, speed=20, X=100, Y=150, Yaw=0)])
goal = np.array((-100, 700))
# scene.clean_walker()
# scene.add_walker(50, 300, 0)
# scene.add_walker(-50, 500, 0)
# scene.control_walker([scene.walker_control_generator(walkerID=0, autowalk=False, speed=20, X=-50, Y=600, Yaw=0)])
# scene.control_walker([scene.walker_control_generator(walkerID=1, autowalk=False, speed=20, X=100, Y=150, Yaw=0)])
# goal = np.array((-100, 700))
'''场景4: 行人自由移动 robot到达指定目标'''
# # TODO: autowalk=True仿真器会闪退 ???

View File

@ -0,0 +1,151 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# enconding = utf8
import sys
import time
import grpc
from explore import Explore
sys.path.append('./')
sys.path.append('../')
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import make_axes_locatable
import GrabSim_pb2_grpc
import GrabSim_pb2
channel = grpc.insecure_channel('localhost:30001', options=[
('grpc.max_send_message_length', 1024 * 1024 * 1024),
('grpc.max_receive_message_length', 1024 * 1024 * 1024)
])
sim_client = GrabSim_pb2_grpc.GrabSimStub(channel)
'''
初始化卸载已经加载的关卡清除所有机器人
'''
def Init():
sim_client.Init(GrabSim_pb2.NUL())
'''
获取当前可加载的地图信息(地图名字地图尺寸)
'''
def AcquireAvailableMaps():
AvailableMaps = sim_client.AcquireAvailableMaps(GrabSim_pb2.NUL())
print(AvailableMaps)
'''
1根据mapID加载指定地图
2如果scene_num>1,则根据地图尺寸偏移后加载多个相同地图
3这样就可以在一个关卡中训练多个地图
'''
def SetWorld(map_id=0, scene_num=1):
print('------------------SetWorld----------------------')
world = sim_client.SetWorld(GrabSim_pb2.BatchMap(count=scene_num, mapID=map_id))
'''
返回场景的状态信息
1返回机器人的位置和旋转
2返回各个关节的名字和旋转
3返回场景中标记的物品信息(名字类型位置旋转)
4返回场景中行人的信息(名字位置旋转速度)
5返回机器人手指和双臂的碰撞信息
'''
def Observe(scene_id=0):
print('------------------show_env_info----------------------')
scene = sim_client.Observe(GrabSim_pb2.SceneID(value=scene_id))
print(
f"location:{[scene.location]}, rotation:{scene.rotation}\n",
f"joints number:{len(scene.joints)}, fingers number:{len(scene.fingers)}\n",
f"objects number: {len(scene.objects)}, walkers number: {len(scene.walkers)}\n"
f"timestep:{scene.timestep}, timestamp:{scene.timestamp}\n"
f"collision:{scene.collision}, info:{scene.info}")
'''
重置场景
1重置桌子的宽度和高度
2清除生成的行人和物品
3重置关节角度位置旋转
4清除碰撞信息
5重置场景中标记的物品
'''
def Reset(scene_id=0):
print('------------------Reset----------------------')
scene = sim_client.Reset(GrabSim_pb2.ResetParams(scene=scene_id))
print(scene)
# 如果场景支持调整桌子
# sim_client.Reset(GrabSim_pb2.ResetParams(scene = scene_id, adjust = True, height = 100.0, width = 100.0))"
"""
导航移动
yaw:机器人朝向;
velocity:速度,>0代表移动,<0代表瞬移,=0代表只查询;
dis:最终达到的位置距离目标点最远距离,如果超过此距离则目标位置不可达
"""
def navigation_move(v_list, scene_id=0, map_id=11):
print('------------------navigation_move----------------------')
scene = sim_client.Observe(GrabSim_pb2.SceneID(value=scene_id))
walk_value = [scene.location.X, scene.location.Y, scene.rotation.Yaw]
print("position:", walk_value)
# if map_id == 11: # coffee
# v_list = [[0, 880], [250, 1200], [-55, 750], [70, -200]]
# else:
# v_list = [[0.0, 0.0]]
for walk_v in v_list:
walk_v = walk_v + [scene.rotation.Yaw - 90, 60, 0]
print("walk_v", walk_v)
action = GrabSim_pb2.Action(scene=scene_id, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
scene = sim_client.Do(action)
print(scene.info)
if __name__ == '__main__':
map_id = 11 # 地图编号
scene_num = 1 # 场景数量
node_list = [] # 探索点
explorer = Explore(_explore_range=100)
print('------------ 初始化加载场景 ------------')
Init()
AcquireAvailableMaps()
SetWorld(map_id, scene_num)
time.sleep(5.0)
for i in range(scene_num):
print('------------ 场景操作 ------------')
Observe(i)
Reset(i)
print('------------ 自主探索 ------------')
while True:
goal = explorer.explore(cur_pos) # cur_pos 指的是当前机器人的位置,场景中应该也有接口可以获取
if goal is None:
break
node_list.append(goal)
cur_pos = goal
navigation_move(i, map_id, node_list)
# TODOnode_list就是机器人拍照的点目前没有设置拍照角度需要机器人到达一个拍照点后前后左右各拍一张照片然后获取语义信息

View File

@ -0,0 +1,5 @@
from . import apf
from . import rrt
from . import rrt_star
from . import pathsmoothing
from . import navigate

View File

@ -0,0 +1,91 @@
import numpy as np
import copy
import matplotlib
matplotlib.use('TkAgg') # 防止画图卡死
## 初始化车的参数
# P0 = np.array([0, - d / 2, 1, 1]) #车辆起点位置分别代表x,y,vx,vy
#
# Pg = np.array([99, d / 2, 0, 0]) # 目标位置
#
# # 障碍物位置
# Pobs = np.array([
# [15, 7 / 4, 0, 0],
# [30, - 3 / 2, 0, 0],
# [45, 3 / 2, 0, 0],
# [60, - 3 / 4, 0, 0],
# [80, 3/2, 0, 0]])
def APF(Pi, Pg, Pobs=None, step_length=100):
'''
Args:
Pi: robot位置 (x,y)
Pg: 目标位置 (x,y)
Pobs: 障碍物位置 [(x,y), ...]
step_length: 单次移动步长 int
returns:
next_step: robot下一步位置
UnitVec_Fsum: 合力方向向量
'''
# 目标检测
if np.linalg.norm(Pi-Pg) < step_length: # 目标检测
return Pg, (Pg-Pi) / np.linalg.norm(Pg-Pi)
Eta_att = 5 # 引力的增益系数
Eta_rep_ob = 15 # 斥力的增益系数
d0 = 200 # 障碍影响的最大距离
n = 1 # {P_g}^n 为到目标点的距离n为可选的正常数 (解决目标不可达问题)
## 计算引力
delta_g = Pg - Pi # 目标方向向量(指向目标)
dist_g = np.linalg.norm(delta_g) # 目标距离
UniteVec_g = delta_g / dist_g # 目标单位向量
F_att = Eta_att * dist_g * UniteVec_g # F_att = Eta_att * dist(pi,pg) * unite_vec(pi,pg)
F_sum = F_att
## 计算斥力
if Pobs:
delta = np.zeros((len(Pobs), 2))
unite_vec = np.zeros((len(Pobs), 2))
dists = []
# 计算车辆当前位置与所有障碍物的单位方向向量
for j in range(len(Pobs)):
delta[j] = Pi - Pobs[j] # 斥力向量
dists.append(np.linalg.norm(delta[j])) # 障碍物距离
unite_vec[j] = delta[j] / dists[j] # 斥力单位向量
# 每一个障碍物对车辆的斥力,带方向
F_rep_ob = np.zeros((len(Pobs), 2))
# 在原斥力势场函数增加目标调节因子即车辆至目标距离以使车辆到达目标点后斥力也为0
for j in range(len(Pobs)):
if dists[j] >= d0:
F_rep_ob[j] = np.array([0, 0]) # 距离超过阈值则斥力为0
else:
# 障碍物的斥力1方向由障碍物指向车辆
F_rep_ob1_abs = Eta_rep_ob * (1 / dists[j] - 1 / d0) * dist_g ** n / dists[j] ** 2 # 斥力大小
F_rep_ob1 = F_rep_ob1_abs * unite_vec[j] # 斥力向量
# 障碍物的斥力2方向由车辆指向目标点
F_rep_ob2_abs = n / 2 * Eta_rep_ob * (1 / dists[j] - 1 / d0) ** 2 * dist_g ** (n - 1) # 斥力大小
F_rep_ob2 = F_rep_ob2_abs * UniteVec_g # 斥力向量
# 改进后的障碍物合斥力计算
F_rep_ob[j] = F_rep_ob1 + F_rep_ob2
F_rep = np.sum(F_rep_ob, axis=0) # 斥力合力
F_sum += F_rep # 总合力
## 合力方向
UnitVec_Fsum = F_sum / np.linalg.norm(F_sum) # 合力方向向量: F / |F|
# 计算车的下一步位置
next_step = copy.deepcopy(Pi + step_length * UnitVec_Fsum) # 沿合力方向前进单位距离
# print(next_step)
return next_step, UnitVec_Fsum

View File

@ -0,0 +1,61 @@
# !/usr/bin/env python3
# -*- encoding: utf-8 -*-
import time
import matplotlib.pyplot as plt
import numpy as np
import pickle
import os
from scene_utils import control
def draw_grid_map(grid_map):
# 生成新的地图图像
plt.imshow(grid_map, cmap='binary', alpha=0.5, origin='lower') # 黑白网格
# 绘制坐标轴
plt.xlabel('y', loc='right')
plt.ylabel('x', loc='top')
# 显示网格线
plt.grid(color='black', linestyle='-', linewidth=0.5)
# 显示图像
plt.show()
#plt.pause(0.01)
if __name__ == '__main__':
# control.init_world(scene_num=1, mapID=3)
# scene = control.Scene(sceneID=0)
#
# X = int(950/5) # 采点数量
# Y = int(1850/5)
# map = np.zeros((X, Y))
#
# for x in range(X):
# for y in range(Y):
# if not scene.reachable_check(x*5-350, y*5-400, Yaw=0):
# map[x, y] = 1
# print(x, y)
#
#
# file_name = 'map_5.pkl'
# if not os.path.exists(file_name):
# open(file_name, 'w').close()
# with open(file_name, 'wb') as file:
# pickle.dump(map, file)
# print('保存成功')
file_name = 'map_5.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
draw_grid_map(map)

View File

@ -0,0 +1,91 @@
import math
import sys
import numpy as np
# from rrt import RRT
# from rrt_star import RRTStar
# def real2map(min_x, min_y, scale_ratio, x, y):
# '''
# 实际坐标->地图坐标 (向下取整)
# '''
# # x = round((x - self.min_x) / self.scale_ratio)
# # y = round((y - self.min_y) / self.scale_ratio)
# x = math.floor((x - min_x) / scale_ratio)
# y = math.floor((y - min_y) / scale_ratio)
# return x, y
def getDistance(pos1, pos2):
return math.sqrt((pos1[0] - pos2[0]) ** 2 + (pos1[1] - pos2[1]) ** 2)
def getNearestFrontier(cur_pos, frontiers):
dis_min = sys.maxsize
frontier_best = None
for frontier in frontiers:
dis = getDistance(frontier, cur_pos)
if dis <= dis_min:
dis_min = dis
frontier_best = frontier
return frontier_best
class Explore:
def __init__(self, _explore_range):
self.explore_range = _explore_range # 机器人感知范围以机器人为中心边长为2 * _explore_range的正方形
self.visited = set()
self.all_frontier_list = set()
def explore(self, cur_pos):
for i in range(cur_pos[0] - self.explore_range, cur_pos[0] + self.explore_range + 1):
for j in range(cur_pos[1] - self.explore_range, cur_pos[1] + self.explore_range + 1):
# if self.isOutMap((i, j)):
# continue
# x, y = real2map(self.area_range[0], self.area_range[2], self.scale_ratio, i, j)
# if self.map[x, y] == 0:
if self.reachable((i, j)):
self.visited.add((i, j))
for i in range(cur_pos[0] - self.explore_range, cur_pos[0] + self.explore_range + 1):
for j in range(cur_pos[1] - self.explore_range, cur_pos[1] + self.explore_range + 1):
# if self.isOutMap((i, j)):
# continue
# x, y = real2map(self.area_range[0], self.area_range[2], self.scale_ratio, i, j)
if self.reachable((i, j)):
if self.isNewFrontier((i, j)):
self.all_frontier_list.add((i, j))
if len(self.all_frontier_list) == 0:
free_list = list(self.visited)
free_array = np.array(free_list)
print(f"探索完成!以下是场景中可以到达的点:{free_array};其余点均是障碍物不可达")
return None
return getNearestFrontier(cur_pos, self.all_frontier_list)
def isNewFrontier(self, pos):
around_nodes = [(pos[0], pos[1] + 1), (pos[0], pos[1] - 1), (pos[0] - 1, pos[1]), (pos[0] + 1, pos[1])]
for node in around_nodes:
if node not in self.visited and self.reachable(node):
return True
if (pos[0], pos[1]) in self.all_frontier_list:
self.all_frontier_list.remove((pos[0], pos[1]))
return False
def reachable(self, pos):
# x, y = real2map(self.area_range[0], self.area_range[2], self.scale_ratio, pos[0], pos[1])
# if self.map[x, y] == 1:
# return True
# else:
# return False
# TODO: 调用reachable_check函数
pass
# def isOutMap(self, pos):
# if pos[0] <= self.area_range[0] or pos[0] >= self.area_range[1] or pos[1] <= self.area_range[2] or pos[1] >= \
# self.area_range[3]:
# return True
# return False

Binary file not shown.

View File

@ -0,0 +1,164 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import sys
import time
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import make_axes_locatable
from scene_utils import control
from rrt import RRT
from rrt_star import RRTStar
from apf import APF
class Navigator:
'''
导航类
'''
def __init__(self, scene, area_range, map, scale_ratio=5):
self.scene = scene
self.area_range = area_range # 地图实际坐标范围 xmin, xmax, ymin, ymax
self.map = map # 缩放并离散化的地图 array(X,Y)
self.scale_ratio = scale_ratio # 地图缩放率
self.step_length = 50 # 步长(单次移动)
self.v = 100 # 速度
self.step_time = self.step_length/self.v # 单步移动时长
self.planner = RRTStar(rand_area=area_range, map=map, scale_ratio=scale_ratio, max_iter=400, search_until_max_iter=True)
@staticmethod
def is_reached(pos: np.array((float, float)), goal: np.array((float, float)), dis_limit=25):
'''
判断是否到达目标
'''
dis = np.linalg.norm(pos - goal)
return dis < dis_limit
def reset_goal(self, goal:(float, float)):
# TODO: 使目标可达
# 目标在障碍物上:从目标开始方形向外扩展,直到找到可行点
# 目标在地图外面:起点和目标连线最靠近目标的可行点
pass
def navigate(self, goal: (float, float), path_smoothing=True, animation=True):
pos = np.array((self.scene.status.location.X, self.scene.status.location.Y)) # 机器人当前: 位置 和 朝向
yaw = self.scene.status.rotation.Yaw
print('------------------navigation_start----------------------')
path = self.planner.planning(pos, goal, path_smoothing, animation)
if path:
self.planner.draw_graph(final_path=path) # 画出探索过程
for (x, y) in path:
self.scene.walk_to(x, y, yaw, velocity=self.v)
time.sleep(self.step_time)
pos = np.array((self.scene.status.location.X, self.scene.status.location.Y))
self.planner.reset()
'''APF势场法暂不可用'''
# while not self.is_reached(pos, goal):
# # 1. 路径规划
# path = self.planner.planning(pos, goal, path_smoothing, animation)
# self.planner.draw_graph(final_path=path) # 画出探索过程
#
# # 2. 使用APF导航到路径中的每个waypoint
# traj = [(pos[0], pos[1])]
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
# for i, waypoint in enumerate(path[1:]):
# print('waypoint [', i, ']:', waypoint)
# # if (not self.scene.reachable_check(waypoint[0], waypoint[1], yaw)) and self.map[self.planner.real2map(waypoint[0], waypoint[1])] == 0:
# # print('error')
# while not self.is_reached(pos, waypoint):
# # 2.1 计算next_step
# pos = np.array((self.scene.status.location.X, self.scene.status.location.Y))
# Pobs = [] # 障碍物(顾客)位置数组
# for walker in self.scene.status.walkers:
# Pobs.append((walker.pose.X, walker.pose.Y))
# next_step, _ = APF(Pi=pos, Pg=waypoint, Pobs=Pobs, step_length=self.step_length)
# traj.append((next_step[0], next_step[1]))
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
# while not self.scene.reachable_check(next_step[0], next_step[1], yaw): # 取中点直到next_step可达
# traj.pop()
# next_step = (pos + next_step) / 2
# traj.append((next_step[0], next_step[1]))
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
# # 2.2 移动robot
# self.scene.walk_to(next_step[0], next_step[1], yaw, velocity=self.v)
# # print(self.scene.status.info) # print navigation info
# # print(self.scene.status.collision)
# time.sleep(self.step_time)
# # print(self.scene.status.info) # print navigation info
# # print(self.scene.status.collision)
# self.planner.reset()
if self.is_reached(pos, goal):
print('The robot has achieved goal !!')
# class Walker:
# def __int__(self, scene):
# self.scene = scene
#
# def add_walkers(self, walker_loc, scene_id=0):
# """
# 批量添加行人
# Args:
# walker_loc: 行人的初始位置列表( 列表元素数量对应行人数量 )
# """
# print('------------------add walker----------------------')
# walker_list = []
# for i in range(len(walker_loc)):
# # 只有可达的位置才能成功初始化行人显示unreachable的地方不能初始化行人
# walker_list.append(
# GrabSim_pb2.WalkerList.Walker(id=i, pose=GrabSim_pb2.Pose(X=walker_loc[0], Y=walker_loc[1], Yaw=90)))
# scene = self.client.AddWalker(GrabSim_pb2.WalkerList(walkers=walker_list, scene=scene_id)) # 生成环境中行人
# # print(self.client.Observe(GrabSim_pb2.SceneID(value=scene_id)).walkers) # 打印行人信息
# return scene
#
# def control_walkers(self, walker_loc, scene_id=0):
# """
# 批量移动行人
# Args:
# walker_loc: 行人的终止位置列表
# """
# scene = self.client.Observe(GrabSim_pb2.SceneID(value=scene_id))
# controls = []
# for i in range(len(scene.walkers)):
# loc = walker_loc[i]
# is_autowalk = False # True: 随机移动; False: 移动到目标点
# pose = GrabSim_pb2.Pose(X=loc[0], Y=loc[1], Yaw=180)
# controls.append(GrabSim_pb2.WalkerControls.WControl(id=i, autowalk=is_autowalk, speed=200, pose=pose))
# scene = self.client.ControlWalkers(
# GrabSim_pb2.WalkerControls(controls=controls, scene=scene_id)) # 设置行人移动速度和目标点
# # print(self.client.Observe(GrabSim_pb2.SceneID(value=scene_id)).walkers) # 打印行人信息
# time.sleep(10)
# return scene
#
# def remove_walkers(self, ids, scene_id=0):
# '''
# 按编号移除行人
# Args:
# ids: 待移除的行人编号列表
# '''
# scene = self.client.RemoveWalkers(GrabSim_pb2.RemoveList(IDs=ids, scene=scene_id)) # 按编号移除行人
# # print(self.client.Observe(GrabSim_pb2.SceneID(value=scene_id)).walkers) # 打印行人信息
# time.sleep(2)
# return scene
#
# def clean_walkers(self, scene_id=0):
# '''
# 删除环境中所有行人
# '''
# scene = self.client.CleanWalkers(GrabSim_pb2.SceneID(value=scene_id))
# return scene

View File

@ -0,0 +1,132 @@
"""
Path planning Sample Code with RRT with path smoothing
@author: AtsushiSakai(@Atsushi_twi)
"""
import math
import random
import matplotlib.pyplot as plt
import sys
import pathlib
sys.path.append(str(pathlib.Path(__file__).parent))
def get_path_length(path):
'''
计算路径长度
'''
le = 0
for i in range(len(path) - 1):
dx = path[i + 1][0] - path[i][0]
dy = path[i + 1][1] - path[i][1]
d = math.hypot(dx, dy)
le += d
return le
def get_target_point(path, targetL):
le = 0
ti = 0
lastPairLen = 0
for i in range(len(path) - 1):
dx = path[i + 1][0] - path[i][0]
dy = path[i + 1][1] - path[i][1]
d = math.hypot(dx, dy)
le += d
if le >= targetL:
ti = i - 1
lastPairLen = d
break
partRatio = (le - targetL) / lastPairLen
x = path[ti][0] + (path[ti + 1][0] - path[ti][0]) * partRatio
y = path[ti][1] + (path[ti + 1][1] - path[ti][1]) * partRatio
return [x, y, ti]
# def line_collision_check(first, second, map, path_resolution=25.0, robot_radius=5, scale_ratio=5):
# '''
# 检查first-second的直线是否穿过障碍物
# Args:
# path_resolution: 采样点分辨率
# '''
# x1 = first[0]
# y1 = first[1]
# x2 = second[0]
# y2 = second[1]
# path_x = [x1]
# path_y = [y1]
# # 计算距离和夹角
# dx = x2 - x1
# dy = y2 - y1
# d = math.hypot(dx, dy) # 欧式距离
# theta = math.atan2(dy, dx) # 夹角
#
# n_expand = math.floor(d / path_resolution) # first 和 second 之间的采样点数
# for _ in range(n_expand):
# x1 += path_resolution * math.cos(theta)
# y1 += path_resolution * math.sin(theta)
# path_x.append(x1)
# path_y.append(y1)
# if d <= path_resolution:
# path_x.append(x2)
# path_y.append(y2)
#
# for (x, y) in zip(path_x, path_y):
# (scale_x, scale_y) = (math.floor(x / scale_ratio), math.floor(y / scale_ratio)) # 向下取整
# if robot_radius > scale_ratio: # TODO: 根据robot_radius和scale_ratio选择不同的判断方式
# if map[scale_x, scale_y] or map[scale_x + 1, scale_y] or \
# map[scale_x, scale_y + 1] or map[scale_x + 1, scale_y + 1]: # 如果node周围的四个缩放坐标有障碍物
# return False
# return True
#
#
# def path_smoothing(path, map, max_iter=1000):
# '''
# 输入原路径,输出平滑后的路径
# Args:
# path: [(x,y),...]
# '''
# le = get_path_length(path)
#
# for i in range(max_iter):
# # Sample two points
# pickPoints = [random.uniform(0, le), random.uniform(0, le)] # 总路径长度中采样2个点
# pickPoints.sort()
# first = get_target_point(path, pickPoints[0])
# second = get_target_point(path, pickPoints[1])
#
# if first[2] <= 0 or second[2] <= 0:
# continue
#
# if (second[2] + 1) > len(path):
# continue
#
# if second[2] == first[2]:
# continue
#
# # collision check
# if not line_collision_check(first, second, map):
# continue
#
# # Create New path
# newPath = []
# newPath.extend(path[:first[2] + 1])
# newPath.append([first[0], first[1]])
# newPath.append([second[0], second[1]])
# newPath.extend(path[second[2] + 1:])
# path = newPath
# le = get_path_length(path)
#
# return path

View File

@ -0,0 +1,17 @@
apf.py: 势场法实现
discretize_map.py: 地图离散化并压缩
map_5.pkl: 地图文件(5倍压缩)
*navigate.py: 导航类*
pathsmoothing.py: 路径平滑
rrt.py: RRT实现
rrt_star.py: RRTStar 实现
test.py: 测试文件
默认使用RRTStar+路径平滑

View File

@ -0,0 +1,440 @@
"""
Path planning Sample Code with Randomized Rapidly-Exploring Random Trees (RRT)
author: AtsushiSakai(@Atsushi_twi)
"""
import math
import random
import matplotlib.pyplot as plt
import numpy as np
from pathsmoothing import get_path_length, get_target_point
show_animation = True
class RRT:
"""
Class for RRT planning
"""
class Node:
"""
RRT Node
"""
def __init__(self, x=0.0, y=0.0):
self.x = x
self.y = y
self.path_x = [] # Node 到 parent 间的路径链
self.path_y = []
self.parent = None
class AreaBounds:
def __init__(self, area):
self.xmin = float(area[0])
self.xmax = float(area[1])
self.ymin = float(area[2])
self.ymax = float(area[3])
def __init__(self,
rand_area, # 横纵坐标的采样范围 [minx,maxx,miny,maxy]
map, # 离散化地图
scale_ratio=5, # 地图缩放率
expand_dis=200, # 最大扩展距离
path_resolution=10.0, # 路径链分辨率
goal_sample_rate=5, # 5%的几率采样到目标点
max_iter=500,
robot_radius=12.0, # robot身位半径
):
self.map = map
self.scale_ratio = scale_ratio
self.start = self.Node(0, 0)
self.goal = self.Node(0, 0)
(self.min_x, self.max_x, self.min_y, self.max_y) = rand_area
# self.play_area = self.AreaBounds(rand_area) # 采样区域(边界框启发式)
self.play_area = None
self.expand_dis = expand_dis
self.path_resolution = path_resolution
self.goal_sample_rate = goal_sample_rate
self.max_iter = max_iter
self.node_list = []
self.robot_radius = robot_radius
def reset(self):
'''
重置rrt的变量和数据结构
'''
self.node_list.clear()
self.start = self.Node(0, 0)
self.goal = self.Node(0, 0)
def update_play_area(self):
'''
更新采样区域
'''
if self.goal.x > self.start.x:
self.play_area.xmin = self.start.x
self.play_area.xmax = self.max_x
else:
self.play_area.xmin = self.min_x
self.play_area.xmax = self.start.x
if self.goal.y > self.start.y:
self.play_area.ymin = self.start.y
self.play_area.ymax = self.max_y
else:
self.play_area.ymin = self.min_y
self.play_area.ymax = self.start.y
def planning(self, start, goal, path_smoothing=True, animation=False):
"""
rrt path planning, return a path list
animation: flag for animation on or off
"""
(self.start.x, self.start.y) = start
(self.goal.x, self.goal.y) = goal
# self.update_play_area()
self.node_list = [self.start] # 已经采样的节点列表
for i in range(self.max_iter):
rnd_node = self.get_random_node() # 随机采样节点
nearest_ind = self.get_nearest_node_index(self.node_list, rnd_node)
nearest_node = self.node_list[nearest_ind] # node_list中距离新采样点最近的节点
new_node = self.steer(nearest_node, rnd_node, self.expand_dis)
# 确保新采样节点在范围内且不是障碍物
if self.check_if_outside_play_area(new_node, self.play_area) and \
self.check_collision(new_node):
self.node_list.append(new_node)
if animation and i % 5 == 0:
self.draw_graph(rnd_node)
if self.calc_dist_to_goal(self.node_list[-1].x,
self.node_list[-1].y) <= self.expand_dis:
final_node = self.steer(self.node_list[-1], self.goal,
self.expand_dis)
if self.check_collision(final_node):
path = self.generate_final_course(len(self.node_list) - 1)
if path_smoothing:
return self.path_smoothing(path)
else:
return path
if animation and i % 5:
self.draw_graph(rnd_node)
return None # cannot find path
def steer(self, from_node, to_node, extend_length=float("inf")):
'''
返回from_node to_node 距离限制内最远的节点并设置其父节点为from_node
并计算路径链
Args:
entend_length: 距离限制
return:
new_node: from_node to_node 距离限制内最远的节点
( 如果 to_node 在距离限制内则返回 to_node )
'''
new_node = self.Node(from_node.x, from_node.y)
d, theta = self.calc_distance_and_angle(new_node, to_node) # from 和 to 之间的距离和方向角
new_node.path_x = [new_node.x]
new_node.path_y = [new_node.y]
if extend_length > d:
extend_length = d
n_expand = math.floor(extend_length / self.path_resolution) # from 和 to 之间的采样点数
for _ in range(n_expand):
new_node.x += self.path_resolution * math.cos(theta)
new_node.y += self.path_resolution * math.sin(theta)
new_node.path_x.append(new_node.x)
new_node.path_y.append(new_node.y)
d, _ = self.calc_distance_and_angle(new_node, to_node)
if d <= self.path_resolution:
new_node.path_x.append(to_node.x)
new_node.path_y.append(to_node.y)
new_node.x = to_node.x
new_node.y = to_node.y
new_node.parent = from_node
return new_node
def generate_final_course(self, goal_ind) -> [(float, float), ]:
'''
得到路径(起点->终点)
'''
path = [(self.goal.x, self.goal.y)]
node = self.node_list[goal_ind]
while node.parent is not None:
path.append((node.x, node.y))
node = node.parent
path.append((node.x, node.y))
path.reverse()
return path
def calc_dist_to_goal(self, x, y):
'''
计算节点到终点距离
'''
dx = x - self.goal.x
dy = y - self.goal.y
return math.hypot(dx, dy)
def get_random_node(self):
'''
在规定范围内随机采样节点(一定概率采样到目标节点)
'''
if random.randint(0, 100) > self.goal_sample_rate:
rnd = self.Node(
random.uniform(self.min_x, self.max_x),
random.uniform(self.min_y, self.max_y))
else: # goal point sampling
rnd = self.Node(self.goal.x, self.goal.y) # 返回goal
return rnd
def map2real(self, x, y):
'''
地图坐标->实际坐标
'''
x = x * self.scale_ratio + self.min_x
y = y * self.scale_ratio + self.min_y
return x, y
def real2map(self, x, y):
'''
实际坐标->地图坐标 (向下取整)
'''
# x = round((x - self.min_x) / self.scale_ratio)
# y = round((y - self.min_y) / self.scale_ratio)
x = math.floor((x - self.min_x) / self.scale_ratio)
y = math.floor((y - self.min_y) / self.scale_ratio)
return x, y
def check_collision(self, node):
'''
判断 node 父节点之间 是否有障碍物
'''
if node is None:
return False
for (x, y) in zip(node.path_x, node.path_y):
map_x, map_y = self.real2map(x, y)
# TODO: 碰撞检测考虑robot_radius和scale_ratio
if self.map[map_x, map_y] or self.map[map_x + 1, map_y] or \
self.map[map_x, map_y + 1] or self.map[map_x + 1, map_y + 1]:
return False
return True # safe
def draw_graph(self, new=None, final_path=None):
# 清空当前figure内容保留figure对象
plt.clf()
# for stopping simulation with the esc key. 按esc结束
plt.gcf().canvas.mpl_connect(
'key_release_event',
lambda event: [exit(0) if event.key == 'escape' else None])
# 画地图: X行Y列第一行在下面
# 范围: 横向Y[-70,120] 纵向X[-80,290]
plt.imshow(self.map, cmap='binary', alpha=0.5, origin='lower',
extent=(self.min_y / self.scale_ratio, self.max_y / self.scale_ratio,
self.min_x / self.scale_ratio, self.max_x / self.scale_ratio))
# 画新采样点
if new is not None:
(scale_x, scale_y) = (new.x / self.scale_ratio, new.y / self.scale_ratio)
plt.plot(scale_y, scale_x, "^k")
if self.robot_radius > 0.0:
self.plot_circle(scale_y, scale_x, self.robot_radius / self.scale_ratio, '-r')
# 画新边
if new.parent:
plt.plot([y / self.scale_ratio for y in new.path_y],
[x / self.scale_ratio for x in new.path_x], "-g")
# 画整个搜索树
for node in self.node_list:
if node.parent:
plt.plot([y / self.scale_ratio for y in node.path_y],
[x / self.scale_ratio for x in node.path_x], "-g")
# 画起点和目标
plt.plot(self.start.y / self.scale_ratio, self.start.x / self.scale_ratio, "xr")
plt.plot(self.goal.y / self.scale_ratio, self.goal.x / self.scale_ratio, "xr")
plt.xlabel('y', loc='right')
plt.ylabel('x', loc='top')
plt.grid(color='black', linestyle='-', linewidth=0.5)
plt.pause(0.2)
# 画最终路径
if final_path:
plt.plot([y / self.scale_ratio for (x, y) in final_path],
[x / self.scale_ratio for (x, y) in final_path], '-r') # 画出最终路径
plt.pause(2)
@staticmethod
def plot_circle(x, y, size, color="-b"): # pragma: no cover
'''
(x,y)为圆心size为半径 画圆
'''
deg = list(range(0, 360, 5))
deg.append(0)
xl = [x + size * math.cos(np.deg2rad(d)) for d in deg]
yl = [y + size * math.sin(np.deg2rad(d)) for d in deg]
plt.plot(xl, yl, color)
@staticmethod
def get_nearest_node_index(node_list, rnd_node):
'''
得到距离rnd_node最近的节点编号
'''
dlist = [(node.x - rnd_node.x) ** 2 + (node.y - rnd_node.y) ** 2
for node in node_list]
minind = dlist.index(min(dlist))
return minind
@staticmethod
def check_if_outside_play_area(node, play_area):
if play_area is None:
return True # no play_area was defined, every pos should be ok
if node.x < play_area.xmin or node.x > play_area.xmax or \
node.y < play_area.ymin or node.y > play_area.ymax:
return False # outside - bad
else:
return True # inside - ok
@staticmethod
def calc_distance_and_angle(from_node, to_node):
'''
计算2点间的距离和角度( 以from_node为原点 )
'''
dx = to_node.x - from_node.x
dy = to_node.y - from_node.y
d = math.hypot(dx, dy) # 欧式距离
theta = math.atan2(dy, dx)
return d, theta
# **************************Path Smoothing**************************************************** #
# **************************Path Smoothing**************************************************** #
# **************************Path Smoothing**************************************************** #
def line_collision_check(self, first, second):
'''
检查first-second的直线是否穿过障碍物
Args:
path_resolution: 采样点分辨率
'''
x1 = first[0]
y1 = first[1]
x2 = second[0]
y2 = second[1]
path_x = [x1]
path_y = [y1]
# 计算距离和夹角
dx = x2 - x1
dy = y2 - y1
d = math.hypot(dx, dy) # 欧式距离
theta = math.atan2(dy, dx) # 夹角
n_expand = math.floor(d / self.path_resolution) # first 和 second 之间的采样点数
for _ in range(n_expand):
x1 += self.path_resolution * math.cos(theta)
y1 += self.path_resolution * math.sin(theta)
path_x.append(x1)
path_y.append(y1)
if d <= self.path_resolution:
path_x.append(x2)
path_y.append(y2)
for (x, y) in zip(path_x, path_y):
map_x, map_y = self.real2map(x, y) # 向下取整
# TODO: 碰撞检测考虑robot_radius和scale_ratio
if self.map[map_x, map_y] or self.map[map_x + 1, map_y] or \
self.map[map_x, map_y + 1] or self.map[map_x + 1, map_y + 1]:
return False
return True
def path_smoothing(self, path, max_iter=1000):
'''
输入原路径输出平滑后的路径
Args:
path: [(x,y),...]
'''
le = get_path_length(path)
for i in range(max_iter):
# Sample two points
pickPoints = [random.uniform(0, le), random.uniform(0, le)] # 总路径长度中采样2个点
pickPoints.sort()
first = get_target_point(path, pickPoints[0])
second = get_target_point(path, pickPoints[1])
if first[2] <= 0 or second[2] <= 0:
continue
if (second[2] + 1) > len(path):
continue
if second[2] == first[2]:
continue
# collision check
if not self.line_collision_check(first, second):
continue
# Create New path
newPath = []
newPath.extend(path[:first[2] + 1])
newPath.append([first[0], first[1]])
newPath.append([second[0], second[1]])
newPath.extend(path[second[2] + 1:])
path = newPath
le = get_path_length(path)
return path
def main(gx=6.0, gy=10.0):
print("start " + __file__)
# ====Search Path with RRT====
obstacleList = [(5, 5, 1), (3, 6, 2), (3, 8, 2), (3, 10, 2), (7, 5, 2),
(9, 5, 2), (8, 10, 1)] # [x, y, radius]
# Set Initial parameters
rrt = RRT(
start=[0, 0],
goal=[gx, gy],
rand_area=[-2, 15],
obstacle_list=obstacleList,
# play_area=[0, 10, 0, 14]
robot_radius=0.8
)
path = rrt.planning(animation=show_animation)
if path is None:
print("Cannot find path")
else:
print("found path!!")
# Draw final path
if show_animation:
rrt.draw_graph() # 画出探索过程
plt.plot([x for (x, y) in path], [y for (x, y) in path], '-r') # 画出最终路径
plt.grid(True)
plt.pause(0.01)
plt.show()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,288 @@
"""
Path planning Sample Code with RRT*
author: Atsushi Sakai(@Atsushi_twi)
"""
import math
import sys
import matplotlib.pyplot as plt
import pathlib
from rrt import RRT
sys.path.append(str(pathlib.Path(__file__).parent.parent))
class RRTStar(RRT):
"""
Class for RRT Star planning
"""
class Node(RRT.Node):
def __init__(self, x, y):
super().__init__(x, y)
self.cost = 0.0 # 路径代价
def __init__(self,
rand_area,
map,
scale_ratio=5,
expand_dis=100.0,
path_resolution=10.0,
goal_sample_rate=10,
max_iter=300,
connect_circle_dist=50.0, # new
search_until_max_iter=False, # new
robot_radius=5.0):
super().__init__(rand_area, map, scale_ratio, expand_dis,
path_resolution, goal_sample_rate, max_iter,
robot_radius=robot_radius)
self.connect_circle_dist = connect_circle_dist
self.search_until_max_iter = search_until_max_iter
self.node_list = []
def planning(self, start, goal, path_smoothing=True, animation=False) -> [(float, float), ...]:
"""
rrt star path planning, return a path list
animation: flag for animation on or off .
"""
(self.start.x, self.start.y) = start
(self.goal.x, self.goal.y) = goal
# self.update_play_area()
self.node_list = [self.start]
while len(self.node_list) < self.max_iter:
#for i in range(self.max_iter):
# print("Iter:", i, ", number of nodes:", len(self.node_list))
print("number of nodes:", len(self.node_list))
rnd_node = self.get_random_node()
nearest_ind = self.get_nearest_node_index(self.node_list, rnd_node)
nearest_node = self.node_list[nearest_ind]
new_node = self.steer(nearest_node, rnd_node, self.expand_dis)
if animation:
self.draw_graph(new_node)
if self.check_collision(new_node):
new_node.cost = nearest_node.cost + \
math.hypot(new_node.x - nearest_node.x,
new_node.y - nearest_node.y)
near_inds = self.find_near_nodes(new_node)
# node_with_updated_parent: 已经找到父节点的new_node
node_with_updated_parent = self.choose_parent(new_node, near_inds)
if node_with_updated_parent:
self.rewire(node_with_updated_parent, near_inds)
self.node_list.append(node_with_updated_parent)
else:
self.node_list.append(new_node) # ??? 不可能发生
# 目标检测
if not self.search_until_max_iter:
last_index = self.search_best_goal_node() # 找到目标单步范围内的总距离最短的节点
if last_index is not None:
path = self.generate_final_course(last_index)
if path_smoothing:
return self.path_smoothing(path)
else:
return path
print("reached max iteration")
last_index = self.search_best_goal_node()
if last_index is not None:
path = self.generate_final_course(last_index)
if path_smoothing:
return self.path_smoothing(path)
else:
return path
return None
def choose_parent(self, new_node, near_inds):
"""
new_node 选择(从起点开始)总距离最小的父节点
Computes the cheapest point to new_node contained in the list
near_inds and set such a node as the parent of new_node.
Arguments:
--------
new_node, Node
randomly generated node with a path from its neared point
There are not coalitions between this node and th tree.
near_inds: list
Indices of indices of the nodes what are near to new_node
Returns.
------
Node, a copy of new_node
"""
if not near_inds:
return None
# search nearest cost in near_inds
costs = []
for i in near_inds:
near_node = self.node_list[i]
t_node = self.steer(near_node, new_node)
if t_node and self.check_collision(t_node):
costs.append(self.calc_new_cost(near_node, new_node))
else:
costs.append(float("inf")) # the cost of collision node
min_cost = min(costs)
if min_cost == float("inf"):
print("There is no good path.(min_cost is inf)")
return None
min_ind = near_inds[costs.index(min_cost)]
new_node = self.steer(self.node_list[min_ind], new_node) # 为new_node设置父节点
new_node.cost = min_cost
return new_node
def search_best_goal_node(self):
'''
从可直达目标的节点(单步范围内且中间无障碍物)选出从起点到目标距离最短的中间节点
'''
dist_to_goal_list = [
self.calc_dist_to_goal(n.x, n.y) for n in self.node_list
]
goal_inds = [ # 距离目标单步范围内的节点
dist_to_goal_list.index(i) for i in dist_to_goal_list
if i <= self.expand_dis
]
safe_goal_inds = [] # 目标单步范围内且中间没有障碍物的节点
for goal_ind in goal_inds:
t_node = self.steer(self.node_list[goal_ind], self.goal)
if self.check_collision(t_node):
safe_goal_inds.append(goal_ind)
if not safe_goal_inds:
return None
safe_goal_costs = [self.node_list[i].cost + # 从起点经过安全节点到目标的距离
self.calc_dist_to_goal(self.node_list[i].x, self.node_list[i].y)
for i in safe_goal_inds]
min_cost = min(safe_goal_costs)
for i, cost in zip(safe_goal_inds, safe_goal_costs):
if cost == min_cost:
return i
return None
def find_near_nodes(self, new_node):
"""
找到 new_node 周围一定范围内的树中的节点
1) defines a ball centered on new_node
2) Returns all nodes of the three that are inside this ball
Arguments:
---------
new_node: Node
new randomly generated node, without collisions between
its nearest node
Returns:
-------
list
List with the indices of the nodes inside the ball of
radius r
"""
nnode = len(self.node_list) + 1
r = self.connect_circle_dist * math.sqrt(math.log(nnode) / nnode)
# if expand_dist exists, search vertices in a range no more than expand_dist
if hasattr(self, 'expand_dis'):
r = min(r, self.expand_dis)
dist_list = [(node.x - new_node.x) ** 2 + (node.y - new_node.y) ** 2
for node in self.node_list]
near_inds = [dist_list.index(i) for i in dist_list if i <= r ** 2] #
return near_inds
def rewire(self, new_node, near_inds):
"""
新加入节点后为周围的其他节点重新计算最短路径并更新其父节点
For each node in near_inds, this will check if it is cheaper to
arrive to them from new_node.
In such a case, this will re-assign the parent of the nodes in
near_inds to new_node.
Parameters:
----------
new_node, Node
Node randomly added which can be joined to the tree
near_inds, list of uints
A list of indices of the self.new_node which contains
nodes within a circle of a given radius.
Remark: parent is designated in choose_parent.
"""
for i in near_inds:
near_node = self.node_list[i]
edge_node = self.steer(new_node, near_node)
if not edge_node:
continue
edge_node.cost = self.calc_new_cost(new_node, near_node)
no_collision = self.check_collision(edge_node)
improved_cost = near_node.cost > edge_node.cost
if no_collision and improved_cost:
for node in self.node_list:
if node.parent == near_node:
node.parent = edge_node
self.node_list[i] = edge_node
self.propagate_cost_to_leaves(self.node_list[i])
def calc_new_cost(self, from_node, to_node):
'''
从起始位置经过 from_node to_node cost
'''
d, _ = self.calc_distance_and_angle(from_node, to_node)
return from_node.cost + d
def propagate_cost_to_leaves(self, parent_node):
'''
(递归算法) 从父节点不断向子节点传播计算cost
'''
for node in self.node_list:
if node.parent == parent_node:
node.cost = self.calc_new_cost(parent_node, node)
self.propagate_cost_to_leaves(node)
def main():
print("Start " + __file__)
# ====Search Path with RRT====
obstacle_list = [
(5, 5, 1),
(3, 6, 2),
(3, 8, 2),
(3, 10, 2),
(7, 5, 2),
(9, 5, 2),
(8, 10, 1),
(6, 12, 1),
] # [x,y,size(radius)]
# Set Initial parameters
rrt_star = RRTStar(rand_area=[-2, 15], expand_dis=1, robot_radius=0.8)
path = rrt_star.planning(animation=show_animation)
if path is None:
print("Cannot find path")
else:
print("found path!!")
# Draw final path
if show_animation:
rrt_star.draw_graph()
plt.plot([x for (x, y) in path], [y for (x, y) in path], 'r--')
plt.grid(True)
plt.show()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,64 @@
import os
import pickle
import time
import random
import matplotlib.pyplot as plt
import numpy as np
from scene_utils import control # TODO: 文件名改成Scene.py
from navigate import Navigator
if __name__ == '__main__':
file_name = 'map_5.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
control.init_world(1, 3)
scene = control.Scene(sceneID=0)
scene.reset()
navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map)
goal = (200, 1400)
navigator.navigate(goal, animation=False)
# scene.add_walker(1085, 2630, 220)
# scene.control_walker([scene.walker_control_generator(0, False, 100, 755, 1900, 180)])
# print(scene.status)
#
# control.init_world(1, 3)
#
# scene = control.Scene(sceneID=0)
#
# # 实现单顾客领位
# scene.reset()
# scene.add_walker(0, -200, 220)
#
# for walker in scene.status.walkers:
# print(walker.pose.X)
# print('*************************')
# time.sleep(3)
# for walker in scene.status.walkers:
# print(walker.pose)
# print('*************************')
#
# scene.control_walker([scene.walker_control_generator(0, False, 500, 70, 880, 120)])
# time.sleep(0.5)
# for walker in scene.status.walkers:
# print(walker.pose)
# print('*************************')
# time.sleep(5)
# for walker in scene.status.walkers:
# print(walker.pose.X)
# print('*************************')
#
# scene.remove_walker(0)
# scene.clean_walker()

View File

@ -0,0 +1,2 @@

View File

@ -58,3 +58,7 @@ class Bahavior(ptree.behaviour.Behaviour):
def terminate(self, new_status: Status) -> None:
return super().terminate(new_status)
@property
def arg_str(self):
return ",".join(self.args)

View File

@ -3,7 +3,22 @@ from typing import Any
from robowaiter.behavior_lib._base.Act import Act
from robowaiter.llm_client.ask_llm import ask_llm
from robowaiter.behavior_tree.utils import load_bt_from_ptml,print_tree_from_root
fixed_answers = {
"测试VLM做一杯咖啡":
'''
测试VLM做一杯咖啡
---
{"At(Coffee,Bar)"}
'''
,
"测试VLN前往桌子":
'''
测试VLN前往桌子
---
{"At(Robot,Table)"}
'''
,
}
class DealChat(Act):
def __init__(self):
super().__init__()
@ -13,16 +28,22 @@ class DealChat(Act):
chat = self.scene.state['chat_list'].pop()
# 判断是否是测试
if chat =="测试VLN":
self.scene.chat_bubble(f"开始测试VLN")
self.scene.state['sub_task_list'].append(("At(Robot, Table)",))
if chat in fixed_answers.keys():
sentence,goal = fixed_answers[chat].split("---")
sentence = sentence.strip()
goal = goal.strip()
print(f'机器人回答:{sentence}')
goal = eval(goal)
print(f'goal{goal}')
answer = ask_llm(chat)
print(f"机器人回答:{answer}")
self.scene.chat_bubble(f"机器人回答:{answer}")
self.create_sub_task(goal)
else:
answer = ask_llm(chat)
print(f"机器人回答:{answer}")
self.scene.chat_bubble(f"机器人回答:{answer}")
return ptree.common.Status.RUNNING
def create_sub_task(goal):
pass
def create_sub_task(self,goal):
self.scene.robot.expand_sub_task_tree(goal)

View File

@ -0,0 +1,16 @@
import py_trees as ptree
from typing import Any
from robowaiter.behavior_lib._base.Act import Act
from robowaiter.behavior_lib._base.Behavior import Status
class DelSubTree(Act):
def __init__(self, *args):
super().__init__(*args)
def _update(self) -> ptree.common.Status:
sub_task_tree = self.parent
print(self.scene.sub_task_seq.children)
print(sub_task_tree)
self.scene.sub_task_seq.children.remove(sub_task_tree)
return Status.RUNNING

View File

@ -0,0 +1,23 @@
import py_trees as ptree
from typing import Any
from robowaiter.behavior_lib._base.Act import Act
from robowaiter.behavior_lib._base.Behavior import Status
class MakeCoffee(Act):
def __init__(self, *args):
super().__init__(*args)
@property
def cond_sets(self):
pre = {"At(Robot,Bar)"}
add = {"At(Coffee,Bar)"}
de = {}
return pre,add,de
def _update(self) -> ptree.common.Status:
op_type = 1
self.scene.move_task_area(op_type)
self.scene.op_task_execute(op_type)
self.scene.state["condition_set"].add("At(Coffee,Bar)")
return Status.RUNNING

View File

@ -1,22 +1,15 @@
import py_trees as ptree
from typing import Any
from robowaiter.behavior_lib._base.Act import Act
from robowaiter.algos.navigate.DstarLite.navigate import Navigator
class MoveTo(Act):
def __init__(self, *args):
super().__init__(*args)
def setup(self, **kwargs: Any) -> None:
return super().setup(**kwargs)
def initialise(self) -> None:
return super().initialise()
def _update(self) -> ptree.common.Status:
print('Start checking IsChatting...')
return ptree.common.Status.SUCCESS
def terminate(self, new_status: ptree.common.Status) -> None:
return super().terminate(new_status)
navigator = Navigator(scene=self.scene, area_range=[-350, 600, -400, 1450], map=self.scene.state["map"]["2d"])
goal = self.scene.state['map']['obj_pos'][self.args[0]]
navigator.navigate(goal, animation=False)
self.scene.state['condition_set'].add('At(Robot,Table)')
return ptree.common.Status.RUNNING

View File

@ -6,17 +6,24 @@ class At(Cond):
can_be_expanded = True
num_params = 2
valid_params = '''
Coffee, Table
Coffee, Bar
Robot, Bar
'''
def __init__(self,*args):
super().__init__(*args)
def _update(self) -> ptree.common.Status:
# if self.scene.status?
if self.scene.state['chat_list'] == []:
return ptree.common.Status.FAILURE
else:
arg_str = self.arg_str
if f'At({arg_str})' in self.scene.state["condition_set"]:
return ptree.common.Status.SUCCESS
else:
return ptree.common.Status.FAILURE
# if self.scene.state['chat_list'] == []:
# return ptree.common.Status.FAILURE
# else:
# return ptree.common.Status.SUCCESS

View File

@ -8,7 +8,7 @@ class HasSubTask(Cond):
def _update(self) -> ptree.common.Status:
# if self.scene.status?
if self.scene.state['chat_list'] == []:
if self.scene.sub_task_seq.children == []:
return ptree.common.Status.FAILURE
else:
return ptree.common.Status.SUCCESS

View File

@ -1,6 +1,6 @@
import copy
import random
from opt_bt_expansion.BehaviorTree import Leaf,ControlBT
from robowaiter.behavior_tree.obtea.BehaviorTree import Leaf,ControlBT
class CondActPair:
def __init__(self, cond_leaf,act_leaf):

View File

@ -1,3 +0,0 @@
from robowaiter.robot.robot import Robot
from robowaiter.scene import task_map

View File

@ -1,5 +1,5 @@
from zoo.opt_bt_expansion.OptimalBTExpansionAlgorithm import Action
from robowaiter.behavior_tree.obtea.OptimalBTExpansionAlgorithm import Action

View File

@ -1,7 +1,6 @@
from robowaiter.behavior_tree.obtea.BehaviorTree import Leaf,ControlBT # 行为结点类:叶子结点和非叶子节点
from zoo.opt_bt_expansion.OptimalBTExpansionAlgorithm import Action,OptBTExpAlgorithm,state_transition # 调用最优行为树扩展算法
from robowaiter.behavior_tree.obtea.tools import print_action_data_table,BTTest
from robowaiter.behavior_tree.obtea.examples import MoveBtoB_num,MoveBtoB,Cond2BelongsToCond3 # 导入三个例子
from robowaiter.behavior_tree.obtea.OptimalBTExpansionAlgorithm import Action,OptBTExpAlgorithm,state_transition # 调用最优行为树扩展算法
from robowaiter.behavior_tree.obtea.examples import *
@ -100,7 +99,7 @@ if __name__ == '__main__' :
ptml_string = algo.process(goal)
print(ptml_string)
file_name = "MakeCoffee"
file_name = "sub_task"
with open(f'./{file_name}.ptml', 'w') as file:
file.write(ptml_string)

View File

@ -3,7 +3,7 @@
from tabulate import tabulate
import numpy as np
import random
from zoo.opt_bt_expansion.OptimalBTExpansionAlgorithm import Action,OptBTExpAlgorithm
from robowaiter.behavior_tree.obtea.OptimalBTExpansionAlgorithm import Action,OptBTExpAlgorithm
import time

View File

@ -69,7 +69,13 @@ def format_trans_to_bracket(file_path: str) -> str:
if not os.path.exists(file_path):
raise FileNotFoundError("Given a fault ptml path: {}".format(file_path))
with open(file_path, 'r') as file:
f = file.read()
if "{" in f:
return file_path
def counter_(input:str) -> int:
length = 0
for i in range(len(input)):
@ -80,6 +86,7 @@ def format_trans_to_bracket(file_path: str) -> str:
raise TabError('Tab length in ptml file should be 4.')
return length
with open(file_path, 'r') as file:
ptml_new = ''
ptml_tab = file.readlines()

View File

@ -14,6 +14,19 @@ def load_bt_from_ptml(scene, ptml_path, behavior_lib_path):
# print(ptree.display.unicode_tree(root=bt.root, show_status=True))
return bt
def load_bt_from_ptml_str(scene, ptml_path, behavior_lib_path):
ptml_bt = ptmlCompiler.load(scene, ptml_path, behavior_lib_path)
bt = ptree.trees.BehaviourTree(ptml_bt)
with open(ptml_path, 'r') as f:
ptml = f.read()
print(f'BT loaded:')
print_tree_from_root(bt.root)
# print(ptree.display.unicode_tree(root=bt.root, show_status=True))
return bt
def print_tree_from_root(node, indent=0):
"""
Recursively prints the tree, each child with increased indentation.

View File

@ -1,10 +1,14 @@
import os
import py_trees as ptree
from robowaiter.behavior_tree.utils import load_bt_from_ptml,find_node_by_name,print_tree_from_root
import io
import contextlib
from robowaiter.behavior_tree.utils import load_bt_from_ptml,find_node_by_name,print_tree_from_root
from robowaiter.behavior_tree.obtea.OptimalBTExpansionAlgorithm import Action,OptBTExpAlgorithm,state_transition # 调用最优行为树扩展算法
from robowaiter.behavior_tree.obtea.opt_bt_exp_main import BTOptExpInterface
from robowaiter.behavior_lib.act.DelSubTree import DelSubTree
from robowaiter.behavior_lib._base.Sequence import Sequence
class Robot(object):
scene = None
response_frequency = 1
@ -16,6 +20,7 @@ class Robot(object):
self.next_response_time = self.response_frequency
self.step_num = 0
self.last_tick_output = ""
self.action_list = None
def set_scene(self,scene):
self.scene = scene
@ -27,7 +32,43 @@ class Robot(object):
sub_task_seq = sub_task_place_holder.parent
sub_task_seq.children.pop()
self.scene.sub_task_seq = sub_task_seq
print(self.scene.sub_task_seq)
def expand_sub_task_tree(self,goal):
if self.action_list is None:
self.action_list = self.collect_action_nodes()
print(f"首次运行行为树扩展算法,收集到{len(self.action_list)}个有效动作")
algo = BTOptExpInterface(self.action_list)
ptml_string = algo.process(goal)
file_name = "sub_task"
file_path = f'./{file_name}.ptml'
with open(file_path, 'w') as file:
file.write(ptml_string)
sub_task_bt = load_bt_from_ptml(self.scene, file_path,self.behavior_lib_path)
# 加入删除子树的节点
seq = Sequence(name="Sequence", memory=False)
seq.children.append(sub_task_bt.root)
del_sub_tree = DelSubTree()
del_sub_tree.set_scene(self.scene)
seq.children.append(del_sub_tree)
self.scene.sub_task_seq.children.append(seq)
print("当前行为树为:")
print_tree_from_root(self.bt.root)
def collect_action_nodes(self):
action_list = [
Action(name='MakeCoffee', pre={'At(Robot,CoffeeMachine)'},
add={'At(Coffee,Bar)'}, del_set=set(), cost=1),
Action(name='MoveTo(Table)', pre={'At(Robot,Bar)'},
add={'At(Robot,Table)'}, del_set=set(), cost=1),
]
return action_list
def step(self):
if self.scene.time > self.next_response_time:

View File

@ -43,6 +43,7 @@ class Scene:
"chat_list": [], # 未处理的顾客的对话, (顾客的位置,顾客对话的内容)
"sub_goal_list": [], # 子目标列表
"status": None, # 仿真器中的观测信息,见下方详细解释
"condition_set": set()
}
"""
status:
@ -62,13 +63,13 @@ class Scene:
self.use_offset = True
self.start_time = time.time()
self.time = 0
self.sub_task_seq = None
# init robot
if robot:
robot.set_scene(self)
robot.load_BT()
self.robot = robot
self.sub_task_seq = None
# myx op
self.op_dialog = ["","制作咖啡","倒水","夹点心","拖地","擦桌子","关闭窗帘","关筒灯","开大厅灯","搬椅子","打开窗帘","关大厅灯","开筒灯"]

View File

@ -18,7 +18,7 @@ class SceneOT(Scene):
super().__init__(robot)
# 在这里加入场景中发生的事件
self.event_list = [
(5,self.give_coffee) # (事件发生的时间,事件函数)
(5,self.create_chat_event("给我做一杯咖啡")) # (事件发生的时间,事件函数)
]
def _reset(self):
@ -31,7 +31,3 @@ class SceneOT(Scene):
pass
def give_coffee(self):
self.chat_bubble('顾客说:给我一杯咖啡')
self.state['chat_list'].append('给我一杯咖啡')

View File

@ -9,14 +9,18 @@ from robowaiter.scene.scene import Scene
class SceneVLM(Scene):
def __init__(self, robot):
super().__init__(robot)
# 在这里加入场景中发生的事件, (事件发生的时间,事件函数)
self.event_list = [
(5, self.create_chat_event("测试VLM做一杯咖啡")),
]
def _reset(self):
pass
def _run(self, op_type=1):
self.move_task_area(op_type)
self.op_task_execute(op_type)
# def _run(self, op_type=1):
# self.move_task_area(op_type)
# self.op_task_execute(op_type)
def _step(self):
pass
pass

View File

@ -4,6 +4,15 @@
开始条件监测到顾客靠近
结束条件完成领位语音请问您想喝点什么并等待下一步指令
"""
import os
import pickle
import time
import random
import matplotlib.pyplot as plt
import numpy as np
from robowaiter.scene.scene import Scene,init_world # TODO: 文件名改成Scene.py
from robowaiter.scene.scene import Scene
@ -11,32 +20,15 @@ from robowaiter.scene.scene import Scene
class SceneVLN(Scene):
def __init__(self, robot):
super().__init__(robot)
# 在这里加入场景中发生的事件, (事件发生的时间,事件函数)
self.event_list = [
(5, self.create_chat_event("测试VLN前往桌子")),
]
def _reset(self):
self.reset_sim()
file_name = './robowaiter/algos/navigate/DstarLite/map_5.pkl'
with open(file_name, 'rb') as file:
map = pickle.load(file)
self.add_walker(1085, 2630, 220)
self.control_walker([self.walker_control_generator(0, False, 100, 755, 1900, 180)])
def _run(self):
# 实现单顾客领位
self.add_walker(1085, 2630, 220)
self.control_walker([self.walker_control_generator(0, False, 100, 755, 1900, 180)])
# todo: 监测到顾客靠近,打招呼,对话,识别获取空闲餐桌位置
# 可以使用scene.chat_bubble(message)函数实现对话
"""
scene.walk_to(your_free_table_location)
time.sleep(5)
scene.control_walker([scene.walker_control_generator(your_free_table_location)])
"""
reach = True
if reach:
self.chat_bubble("请问您想喝点什么?")
print(self.status.walkers)
def _step(self):
pass
self.state['map']['2d'] = map
self.state['map']['obj_pos']['Table'] = np.array((-100, 700))

View File

@ -1,7 +1,7 @@
import os
from robowaiter import Robot, task_map
TASK_NAME = 'GQA'
TASK_NAME = 'VLN'
# create robot
project_path = "./robowaiter"

7
sub_task.ptml Normal file
View File

@ -0,0 +1,7 @@
selector{
cond At(Robot,Table)
selector{
cond At(Robot,Bar)
act MoveTo(Table)
}
}