Merge remote-tracking branch 'origin/main'

This commit is contained in:
wuziji 2023-11-14 09:24:21 +08:00
commit f7a2b9b5d7
33 changed files with 251 additions and 1303 deletions

View File

@ -1,2 +1 @@
from robowaiter.algos.navigate.DstarLite.navigate import Navigator as DstarLite

View File

@ -7,14 +7,15 @@ import grpc
from explore import Explore
sys.path.append('/')
sys.path.append('../navigate/')
sys.path.append('./')
sys.path.append('../')
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import make_axes_locatable
from robowaiter.proto import GrabSim_pb2_grpc, GrabSim_pb2
import GrabSim_pb2_grpc
import GrabSim_pb2
channel = grpc.insecure_channel('localhost:30001', options=[
('grpc.max_send_message_length', 1024 * 1024 * 1024),

View File

@ -1,5 +0,0 @@
from . import apf
from . import rrt
from . import rrt_star
from . import pathsmoothing
from . import navigate

View File

@ -1,91 +0,0 @@
import numpy as np
import copy
import matplotlib
matplotlib.use('TkAgg') # 防止画图卡死
## 初始化车的参数
# P0 = np.array([0, - d / 2, 1, 1]) #车辆起点位置分别代表x,y,vx,vy
#
# Pg = np.array([99, d / 2, 0, 0]) # 目标位置
#
# # 障碍物位置
# Pobs = np.array([
# [15, 7 / 4, 0, 0],
# [30, - 3 / 2, 0, 0],
# [45, 3 / 2, 0, 0],
# [60, - 3 / 4, 0, 0],
# [80, 3/2, 0, 0]])
def APF(Pi, Pg, Pobs=None, step_length=100):
'''
Args:
Pi: robot位置 (x,y)
Pg: 目标位置 (x,y)
Pobs: 障碍物位置 [(x,y), ...]
step_length: 单次移动步长 int
returns:
next_step: robot下一步位置
UnitVec_Fsum: 合力方向向量
'''
# 目标检测
if np.linalg.norm(Pi-Pg) < step_length: # 目标检测
return Pg, (Pg-Pi) / np.linalg.norm(Pg-Pi)
Eta_att = 5 # 引力的增益系数
Eta_rep_ob = 15 # 斥力的增益系数
d0 = 200 # 障碍影响的最大距离
n = 1 # {P_g}^n 为到目标点的距离n为可选的正常数 (解决目标不可达问题)
## 计算引力
delta_g = Pg - Pi # 目标方向向量(指向目标)
dist_g = np.linalg.norm(delta_g) # 目标距离
UniteVec_g = delta_g / dist_g # 目标单位向量
F_att = Eta_att * dist_g * UniteVec_g # F_att = Eta_att * dist(pi,pg) * unite_vec(pi,pg)
F_sum = F_att
## 计算斥力
if Pobs:
delta = np.zeros((len(Pobs), 2))
unite_vec = np.zeros((len(Pobs), 2))
dists = []
# 计算车辆当前位置与所有障碍物的单位方向向量
for j in range(len(Pobs)):
delta[j] = Pi - Pobs[j] # 斥力向量
dists.append(np.linalg.norm(delta[j])) # 障碍物距离
unite_vec[j] = delta[j] / dists[j] # 斥力单位向量
# 每一个障碍物对车辆的斥力,带方向
F_rep_ob = np.zeros((len(Pobs), 2))
# 在原斥力势场函数增加目标调节因子即车辆至目标距离以使车辆到达目标点后斥力也为0
for j in range(len(Pobs)):
if dists[j] >= d0:
F_rep_ob[j] = np.array([0, 0]) # 距离超过阈值则斥力为0
else:
# 障碍物的斥力1方向由障碍物指向车辆
F_rep_ob1_abs = Eta_rep_ob * (1 / dists[j] - 1 / d0) * dist_g ** n / dists[j] ** 2 # 斥力大小
F_rep_ob1 = F_rep_ob1_abs * unite_vec[j] # 斥力向量
# 障碍物的斥力2方向由车辆指向目标点
F_rep_ob2_abs = n / 2 * Eta_rep_ob * (1 / dists[j] - 1 / d0) ** 2 * dist_g ** (n - 1) # 斥力大小
F_rep_ob2 = F_rep_ob2_abs * UniteVec_g # 斥力向量
# 改进后的障碍物合斥力计算
F_rep_ob[j] = F_rep_ob1 + F_rep_ob2
F_rep = np.sum(F_rep_ob, axis=0) # 斥力合力
F_sum += F_rep # 总合力
## 合力方向
UnitVec_Fsum = F_sum / np.linalg.norm(F_sum) # 合力方向向量: F / |F|
# 计算车的下一步位置
next_step = copy.deepcopy(Pi + step_length * UnitVec_Fsum) # 沿合力方向前进单位距离
# print(next_step)
return next_step, UnitVec_Fsum

View File

@ -1,61 +0,0 @@
# !/usr/bin/env python3
# -*- encoding: utf-8 -*-
import time
import matplotlib.pyplot as plt
import numpy as np
import pickle
import os
from scene_utils import control
def draw_grid_map(grid_map):
# 生成新的地图图像
plt.imshow(grid_map, cmap='binary', alpha=0.5, origin='lower') # 黑白网格
# 绘制坐标轴
plt.xlabel('y', loc='right')
plt.ylabel('x', loc='top')
# 显示网格线
plt.grid(color='black', linestyle='-', linewidth=0.5)
# 显示图像
plt.show()
#plt.pause(0.01)
if __name__ == '__main__':
# control.init_world(scene_num=1, mapID=3)
# scene = control.Scene(sceneID=0)
#
# X = int(950/5) # 采点数量
# Y = int(1850/5)
# map = np.zeros((X, Y))
#
# for x in range(X):
# for y in range(Y):
# if not scene.reachable_check(x*5-350, y*5-400, Yaw=0):
# map[x, y] = 1
# print(x, y)
#
#
# file_name = 'map_5.pkl'
# if not os.path.exists(file_name):
# open(file_name, 'w').close()
# with open(file_name, 'wb') as file:
# pickle.dump(map, file)
# print('保存成功')
file_name = 'map_5.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
draw_grid_map(map)

View File

@ -1,164 +0,0 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import sys
import time
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import make_axes_locatable
from scene_utils import control
from rrt import RRT
from rrt_star import RRTStar
from apf import APF
class Navigator:
'''
导航类
'''
def __init__(self, scene, area_range, map, scale_ratio=5):
self.scene = scene
self.area_range = area_range # 地图实际坐标范围 xmin, xmax, ymin, ymax
self.map = map # 缩放并离散化的地图 array(X,Y)
self.scale_ratio = scale_ratio # 地图缩放率
self.step_length = 50 # 步长(单次移动)
self.v = 100 # 速度
self.step_time = self.step_length/self.v # 单步移动时长
self.planner = RRTStar(rand_area=area_range, map=map, scale_ratio=scale_ratio, max_iter=400, search_until_max_iter=True)
@staticmethod
def is_reached(pos: np.array((float, float)), goal: np.array((float, float)), dis_limit=25):
'''
判断是否到达目标
'''
dis = np.linalg.norm(pos - goal)
return dis < dis_limit
def reset_goal(self, goal:(float, float)):
# TODO: 使目标可达
# 目标在障碍物上:从目标开始方形向外扩展,直到找到可行点
# 目标在地图外面:起点和目标连线最靠近目标的可行点
pass
def navigate(self, goal: (float, float), path_smoothing=True, animation=True):
pos = np.array((self.scene.status.location.X, self.scene.status.location.Y)) # 机器人当前: 位置 和 朝向
yaw = self.scene.status.rotation.Yaw
print('------------------navigation_start----------------------')
path = self.planner.planning(pos, goal, path_smoothing, animation)
if path:
self.planner.draw_graph(final_path=path) # 画出探索过程
for (x, y) in path:
self.scene.walk_to(x, y, yaw, velocity=self.v)
time.sleep(self.step_time)
pos = np.array((self.scene.status.location.X, self.scene.status.location.Y))
self.planner.reset()
'''APF势场法暂不可用'''
# while not self.is_reached(pos, goal):
# # 1. 路径规划
# path = self.planner.planning(pos, goal, path_smoothing, animation)
# self.planner.draw_graph(final_path=path) # 画出探索过程
#
# # 2. 使用APF导航到路径中的每个waypoint
# traj = [(pos[0], pos[1])]
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
# for i, waypoint in enumerate(path[1:]):
# print('waypoint [', i, ']:', waypoint)
# # if (not self.scene.reachable_check(waypoint[0], waypoint[1], yaw)) and self.map[self.planner.real2map(waypoint[0], waypoint[1])] == 0:
# # print('error')
# while not self.is_reached(pos, waypoint):
# # 2.1 计算next_step
# pos = np.array((self.scene.status.location.X, self.scene.status.location.Y))
# Pobs = [] # 障碍物(顾客)位置数组
# for walker in self.scene.status.walkers:
# Pobs.append((walker.pose.X, walker.pose.Y))
# next_step, _ = APF(Pi=pos, Pg=waypoint, Pobs=Pobs, step_length=self.step_length)
# traj.append((next_step[0], next_step[1]))
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
# while not self.scene.reachable_check(next_step[0], next_step[1], yaw): # 取中点直到next_step可达
# traj.pop()
# next_step = (pos + next_step) / 2
# traj.append((next_step[0], next_step[1]))
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
# # 2.2 移动robot
# self.scene.walk_to(next_step[0], next_step[1], yaw, velocity=self.v)
# # print(self.scene.status.info) # print navigation info
# # print(self.scene.status.collision)
# time.sleep(self.step_time)
# # print(self.scene.status.info) # print navigation info
# # print(self.scene.status.collision)
# self.planner.reset()
if self.is_reached(pos, goal):
print('The robot has achieved goal !!')
# class Walker:
# def __int__(self, scene):
# self.scene = scene
#
# def add_walkers(self, walker_loc, scene_id=0):
# """
# 批量添加行人
# Args:
# walker_loc: 行人的初始位置列表( 列表元素数量对应行人数量 )
# """
# print('------------------add walker----------------------')
# walker_list = []
# for i in range(len(walker_loc)):
# # 只有可达的位置才能成功初始化行人显示unreachable的地方不能初始化行人
# walker_list.append(
# GrabSim_pb2.WalkerList.Walker(id=i, pose=GrabSim_pb2.Pose(X=walker_loc[0], Y=walker_loc[1], Yaw=90)))
# scene = self.client.AddWalker(GrabSim_pb2.WalkerList(walkers=walker_list, scene=scene_id)) # 生成环境中行人
# # print(self.client.Observe(GrabSim_pb2.SceneID(value=scene_id)).walkers) # 打印行人信息
# return scene
#
# def control_walkers(self, walker_loc, scene_id=0):
# """
# 批量移动行人
# Args:
# walker_loc: 行人的终止位置列表
# """
# scene = self.client.Observe(GrabSim_pb2.SceneID(value=scene_id))
# controls = []
# for i in range(len(scene.walkers)):
# loc = walker_loc[i]
# is_autowalk = False # True: 随机移动; False: 移动到目标点
# pose = GrabSim_pb2.Pose(X=loc[0], Y=loc[1], Yaw=180)
# controls.append(GrabSim_pb2.WalkerControls.WControl(id=i, autowalk=is_autowalk, speed=200, pose=pose))
# scene = self.client.ControlWalkers(
# GrabSim_pb2.WalkerControls(controls=controls, scene=scene_id)) # 设置行人移动速度和目标点
# # print(self.client.Observe(GrabSim_pb2.SceneID(value=scene_id)).walkers) # 打印行人信息
# time.sleep(10)
# return scene
#
# def remove_walkers(self, ids, scene_id=0):
# '''
# 按编号移除行人
# Args:
# ids: 待移除的行人编号列表
# '''
# scene = self.client.RemoveWalkers(GrabSim_pb2.RemoveList(IDs=ids, scene=scene_id)) # 按编号移除行人
# # print(self.client.Observe(GrabSim_pb2.SceneID(value=scene_id)).walkers) # 打印行人信息
# time.sleep(2)
# return scene
#
# def clean_walkers(self, scene_id=0):
# '''
# 删除环境中所有行人
# '''
# scene = self.client.CleanWalkers(GrabSim_pb2.SceneID(value=scene_id))
# return scene

View File

@ -1,132 +0,0 @@
"""
Path planning Sample Code with RRT with path smoothing
@author: AtsushiSakai(@Atsushi_twi)
"""
import math
import random
import matplotlib.pyplot as plt
import sys
import pathlib
sys.path.append(str(pathlib.Path(__file__).parent))
def get_path_length(path):
'''
计算路径长度
'''
le = 0
for i in range(len(path) - 1):
dx = path[i + 1][0] - path[i][0]
dy = path[i + 1][1] - path[i][1]
d = math.hypot(dx, dy)
le += d
return le
def get_target_point(path, targetL):
le = 0
ti = 0
lastPairLen = 0
for i in range(len(path) - 1):
dx = path[i + 1][0] - path[i][0]
dy = path[i + 1][1] - path[i][1]
d = math.hypot(dx, dy)
le += d
if le >= targetL:
ti = i - 1
lastPairLen = d
break
partRatio = (le - targetL) / lastPairLen
x = path[ti][0] + (path[ti + 1][0] - path[ti][0]) * partRatio
y = path[ti][1] + (path[ti + 1][1] - path[ti][1]) * partRatio
return [x, y, ti]
# def line_collision_check(first, second, map, path_resolution=25.0, robot_radius=5, scale_ratio=5):
# '''
# 检查first-second的直线是否穿过障碍物
# Args:
# path_resolution: 采样点分辨率
# '''
# x1 = first[0]
# y1 = first[1]
# x2 = second[0]
# y2 = second[1]
# path_x = [x1]
# path_y = [y1]
# # 计算距离和夹角
# dx = x2 - x1
# dy = y2 - y1
# d = math.hypot(dx, dy) # 欧式距离
# theta = math.atan2(dy, dx) # 夹角
#
# n_expand = math.floor(d / path_resolution) # first 和 second 之间的采样点数
# for _ in range(n_expand):
# x1 += path_resolution * math.cos(theta)
# y1 += path_resolution * math.sin(theta)
# path_x.append(x1)
# path_y.append(y1)
# if d <= path_resolution:
# path_x.append(x2)
# path_y.append(y2)
#
# for (x, y) in zip(path_x, path_y):
# (scale_x, scale_y) = (math.floor(x / scale_ratio), math.floor(y / scale_ratio)) # 向下取整
# if robot_radius > scale_ratio: # TODO: 根据robot_radius和scale_ratio选择不同的判断方式
# if map[scale_x, scale_y] or map[scale_x + 1, scale_y] or \
# map[scale_x, scale_y + 1] or map[scale_x + 1, scale_y + 1]: # 如果node周围的四个缩放坐标有障碍物
# return False
# return True
#
#
# def path_smoothing(path, map, max_iter=1000):
# '''
# 输入原路径,输出平滑后的路径
# Args:
# path: [(x,y),...]
# '''
# le = get_path_length(path)
#
# for i in range(max_iter):
# # Sample two points
# pickPoints = [random.uniform(0, le), random.uniform(0, le)] # 总路径长度中采样2个点
# pickPoints.sort()
# first = get_target_point(path, pickPoints[0])
# second = get_target_point(path, pickPoints[1])
#
# if first[2] <= 0 or second[2] <= 0:
# continue
#
# if (second[2] + 1) > len(path):
# continue
#
# if second[2] == first[2]:
# continue
#
# # collision check
# if not line_collision_check(first, second, map):
# continue
#
# # Create New path
# newPath = []
# newPath.extend(path[:first[2] + 1])
# newPath.append([first[0], first[1]])
# newPath.append([second[0], second[1]])
# newPath.extend(path[second[2] + 1:])
# path = newPath
# le = get_path_length(path)
#
# return path

View File

@ -1,17 +0,0 @@
apf.py: 势场法实现
discretize_map.py: 地图离散化并压缩
map_5.pkl: 地图文件(5倍压缩)
*navigate.py: 导航类*
pathsmoothing.py: 路径平滑
rrt.py: RRT实现
rrt_star.py: RRTStar 实现
test.py: 测试文件
默认使用RRTStar+路径平滑

View File

@ -1,440 +0,0 @@
"""
Path planning Sample Code with Randomized Rapidly-Exploring Random Trees (RRT)
author: AtsushiSakai(@Atsushi_twi)
"""
import math
import random
import matplotlib.pyplot as plt
import numpy as np
from pathsmoothing import get_path_length, get_target_point
show_animation = True
class RRT:
"""
Class for RRT planning
"""
class Node:
"""
RRT Node
"""
def __init__(self, x=0.0, y=0.0):
self.x = x
self.y = y
self.path_x = [] # Node 到 parent 间的路径链
self.path_y = []
self.parent = None
class AreaBounds:
def __init__(self, area):
self.xmin = float(area[0])
self.xmax = float(area[1])
self.ymin = float(area[2])
self.ymax = float(area[3])
def __init__(self,
rand_area, # 横纵坐标的采样范围 [minx,maxx,miny,maxy]
map, # 离散化地图
scale_ratio=5, # 地图缩放率
expand_dis=200, # 最大扩展距离
path_resolution=10.0, # 路径链分辨率
goal_sample_rate=5, # 5%的几率采样到目标点
max_iter=500,
robot_radius=12.0, # robot身位半径
):
self.map = map
self.scale_ratio = scale_ratio
self.start = self.Node(0, 0)
self.goal = self.Node(0, 0)
(self.min_x, self.max_x, self.min_y, self.max_y) = rand_area
# self.play_area = self.AreaBounds(rand_area) # 采样区域(边界框启发式)
self.play_area = None
self.expand_dis = expand_dis
self.path_resolution = path_resolution
self.goal_sample_rate = goal_sample_rate
self.max_iter = max_iter
self.node_list = []
self.robot_radius = robot_radius
def reset(self):
'''
重置rrt的变量和数据结构
'''
self.node_list.clear()
self.start = self.Node(0, 0)
self.goal = self.Node(0, 0)
def update_play_area(self):
'''
更新采样区域
'''
if self.goal.x > self.start.x:
self.play_area.xmin = self.start.x
self.play_area.xmax = self.max_x
else:
self.play_area.xmin = self.min_x
self.play_area.xmax = self.start.x
if self.goal.y > self.start.y:
self.play_area.ymin = self.start.y
self.play_area.ymax = self.max_y
else:
self.play_area.ymin = self.min_y
self.play_area.ymax = self.start.y
def planning(self, start, goal, path_smoothing=True, animation=False):
"""
rrt path planning, return a path list
animation: flag for animation on or off
"""
(self.start.x, self.start.y) = start
(self.goal.x, self.goal.y) = goal
# self.update_play_area()
self.node_list = [self.start] # 已经采样的节点列表
for i in range(self.max_iter):
rnd_node = self.get_random_node() # 随机采样节点
nearest_ind = self.get_nearest_node_index(self.node_list, rnd_node)
nearest_node = self.node_list[nearest_ind] # node_list中距离新采样点最近的节点
new_node = self.steer(nearest_node, rnd_node, self.expand_dis)
# 确保新采样节点在范围内且不是障碍物
if self.check_if_outside_play_area(new_node, self.play_area) and \
self.check_collision(new_node):
self.node_list.append(new_node)
if animation and i % 5 == 0:
self.draw_graph(rnd_node)
if self.calc_dist_to_goal(self.node_list[-1].x,
self.node_list[-1].y) <= self.expand_dis:
final_node = self.steer(self.node_list[-1], self.goal,
self.expand_dis)
if self.check_collision(final_node):
path = self.generate_final_course(len(self.node_list) - 1)
if path_smoothing:
return self.path_smoothing(path)
else:
return path
if animation and i % 5:
self.draw_graph(rnd_node)
return None # cannot find path
def steer(self, from_node, to_node, extend_length=float("inf")):
'''
返回from_node to_node 距离限制内最远的节点并设置其父节点为from_node
并计算路径链
Args:
entend_length: 距离限制
return:
new_node: from_node to_node 距离限制内最远的节点
( 如果 to_node 在距离限制内则返回 to_node )
'''
new_node = self.Node(from_node.x, from_node.y)
d, theta = self.calc_distance_and_angle(new_node, to_node) # from 和 to 之间的距离和方向角
new_node.path_x = [new_node.x]
new_node.path_y = [new_node.y]
if extend_length > d:
extend_length = d
n_expand = math.floor(extend_length / self.path_resolution) # from 和 to 之间的采样点数
for _ in range(n_expand):
new_node.x += self.path_resolution * math.cos(theta)
new_node.y += self.path_resolution * math.sin(theta)
new_node.path_x.append(new_node.x)
new_node.path_y.append(new_node.y)
d, _ = self.calc_distance_and_angle(new_node, to_node)
if d <= self.path_resolution:
new_node.path_x.append(to_node.x)
new_node.path_y.append(to_node.y)
new_node.x = to_node.x
new_node.y = to_node.y
new_node.parent = from_node
return new_node
def generate_final_course(self, goal_ind) -> [(float, float), ]:
'''
得到路径(起点->终点)
'''
path = [(self.goal.x, self.goal.y)]
node = self.node_list[goal_ind]
while node.parent is not None:
path.append((node.x, node.y))
node = node.parent
path.append((node.x, node.y))
path.reverse()
return path
def calc_dist_to_goal(self, x, y):
'''
计算节点到终点距离
'''
dx = x - self.goal.x
dy = y - self.goal.y
return math.hypot(dx, dy)
def get_random_node(self):
'''
在规定范围内随机采样节点(一定概率采样到目标节点)
'''
if random.randint(0, 100) > self.goal_sample_rate:
rnd = self.Node(
random.uniform(self.min_x, self.max_x),
random.uniform(self.min_y, self.max_y))
else: # goal point sampling
rnd = self.Node(self.goal.x, self.goal.y) # 返回goal
return rnd
def map2real(self, x, y):
'''
地图坐标->实际坐标
'''
x = x * self.scale_ratio + self.min_x
y = y * self.scale_ratio + self.min_y
return x, y
def real2map(self, x, y):
'''
实际坐标->地图坐标 (向下取整)
'''
# x = round((x - self.min_x) / self.scale_ratio)
# y = round((y - self.min_y) / self.scale_ratio)
x = math.floor((x - self.min_x) / self.scale_ratio)
y = math.floor((y - self.min_y) / self.scale_ratio)
return x, y
def check_collision(self, node):
'''
判断 node 父节点之间 是否有障碍物
'''
if node is None:
return False
for (x, y) in zip(node.path_x, node.path_y):
map_x, map_y = self.real2map(x, y)
# TODO: 碰撞检测考虑robot_radius和scale_ratio
if self.map[map_x, map_y] or self.map[map_x + 1, map_y] or \
self.map[map_x, map_y + 1] or self.map[map_x + 1, map_y + 1]:
return False
return True # safe
def draw_graph(self, new=None, final_path=None):
# 清空当前figure内容保留figure对象
plt.clf()
# for stopping simulation with the esc key. 按esc结束
plt.gcf().canvas.mpl_connect(
'key_release_event',
lambda event: [exit(0) if event.key == 'escape' else None])
# 画地图: X行Y列第一行在下面
# 范围: 横向Y[-70,120] 纵向X[-80,290]
plt.imshow(self.map, cmap='binary', alpha=0.5, origin='lower',
extent=(self.min_y / self.scale_ratio, self.max_y / self.scale_ratio,
self.min_x / self.scale_ratio, self.max_x / self.scale_ratio))
# 画新采样点
if new is not None:
(scale_x, scale_y) = (new.x / self.scale_ratio, new.y / self.scale_ratio)
plt.plot(scale_y, scale_x, "^k")
if self.robot_radius > 0.0:
self.plot_circle(scale_y, scale_x, self.robot_radius / self.scale_ratio, '-r')
# 画新边
if new.parent:
plt.plot([y / self.scale_ratio for y in new.path_y],
[x / self.scale_ratio for x in new.path_x], "-g")
# 画整个搜索树
for node in self.node_list:
if node.parent:
plt.plot([y / self.scale_ratio for y in node.path_y],
[x / self.scale_ratio for x in node.path_x], "-g")
# 画起点和目标
plt.plot(self.start.y / self.scale_ratio, self.start.x / self.scale_ratio, "xr")
plt.plot(self.goal.y / self.scale_ratio, self.goal.x / self.scale_ratio, "xr")
plt.xlabel('y', loc='right')
plt.ylabel('x', loc='top')
plt.grid(color='black', linestyle='-', linewidth=0.5)
plt.pause(0.2)
# 画最终路径
if final_path:
plt.plot([y / self.scale_ratio for (x, y) in final_path],
[x / self.scale_ratio for (x, y) in final_path], '-r') # 画出最终路径
plt.pause(2)
@staticmethod
def plot_circle(x, y, size, color="-b"): # pragma: no cover
'''
(x,y)为圆心size为半径 画圆
'''
deg = list(range(0, 360, 5))
deg.append(0)
xl = [x + size * math.cos(np.deg2rad(d)) for d in deg]
yl = [y + size * math.sin(np.deg2rad(d)) for d in deg]
plt.plot(xl, yl, color)
@staticmethod
def get_nearest_node_index(node_list, rnd_node):
'''
得到距离rnd_node最近的节点编号
'''
dlist = [(node.x - rnd_node.x) ** 2 + (node.y - rnd_node.y) ** 2
for node in node_list]
minind = dlist.index(min(dlist))
return minind
@staticmethod
def check_if_outside_play_area(node, play_area):
if play_area is None:
return True # no play_area was defined, every pos should be ok
if node.x < play_area.xmin or node.x > play_area.xmax or \
node.y < play_area.ymin or node.y > play_area.ymax:
return False # outside - bad
else:
return True # inside - ok
@staticmethod
def calc_distance_and_angle(from_node, to_node):
'''
计算2点间的距离和角度( 以from_node为原点 )
'''
dx = to_node.x - from_node.x
dy = to_node.y - from_node.y
d = math.hypot(dx, dy) # 欧式距离
theta = math.atan2(dy, dx)
return d, theta
# **************************Path Smoothing**************************************************** #
# **************************Path Smoothing**************************************************** #
# **************************Path Smoothing**************************************************** #
def line_collision_check(self, first, second):
'''
检查first-second的直线是否穿过障碍物
Args:
path_resolution: 采样点分辨率
'''
x1 = first[0]
y1 = first[1]
x2 = second[0]
y2 = second[1]
path_x = [x1]
path_y = [y1]
# 计算距离和夹角
dx = x2 - x1
dy = y2 - y1
d = math.hypot(dx, dy) # 欧式距离
theta = math.atan2(dy, dx) # 夹角
n_expand = math.floor(d / self.path_resolution) # first 和 second 之间的采样点数
for _ in range(n_expand):
x1 += self.path_resolution * math.cos(theta)
y1 += self.path_resolution * math.sin(theta)
path_x.append(x1)
path_y.append(y1)
if d <= self.path_resolution:
path_x.append(x2)
path_y.append(y2)
for (x, y) in zip(path_x, path_y):
map_x, map_y = self.real2map(x, y) # 向下取整
# TODO: 碰撞检测考虑robot_radius和scale_ratio
if self.map[map_x, map_y] or self.map[map_x + 1, map_y] or \
self.map[map_x, map_y + 1] or self.map[map_x + 1, map_y + 1]:
return False
return True
def path_smoothing(self, path, max_iter=1000):
'''
输入原路径输出平滑后的路径
Args:
path: [(x,y),...]
'''
le = get_path_length(path)
for i in range(max_iter):
# Sample two points
pickPoints = [random.uniform(0, le), random.uniform(0, le)] # 总路径长度中采样2个点
pickPoints.sort()
first = get_target_point(path, pickPoints[0])
second = get_target_point(path, pickPoints[1])
if first[2] <= 0 or second[2] <= 0:
continue
if (second[2] + 1) > len(path):
continue
if second[2] == first[2]:
continue
# collision check
if not self.line_collision_check(first, second):
continue
# Create New path
newPath = []
newPath.extend(path[:first[2] + 1])
newPath.append([first[0], first[1]])
newPath.append([second[0], second[1]])
newPath.extend(path[second[2] + 1:])
path = newPath
le = get_path_length(path)
return path
def main(gx=6.0, gy=10.0):
print("start " + __file__)
# ====Search Path with RRT====
obstacleList = [(5, 5, 1), (3, 6, 2), (3, 8, 2), (3, 10, 2), (7, 5, 2),
(9, 5, 2), (8, 10, 1)] # [x, y, radius]
# Set Initial parameters
rrt = RRT(
start=[0, 0],
goal=[gx, gy],
rand_area=[-2, 15],
obstacle_list=obstacleList,
# play_area=[0, 10, 0, 14]
robot_radius=0.8
)
path = rrt.planning(animation=show_animation)
if path is None:
print("Cannot find path")
else:
print("found path!!")
# Draw final path
if show_animation:
rrt.draw_graph() # 画出探索过程
plt.plot([x for (x, y) in path], [y for (x, y) in path], '-r') # 画出最终路径
plt.grid(True)
plt.pause(0.01)
plt.show()
if __name__ == '__main__':
main()

View File

@ -1,288 +0,0 @@
"""
Path planning Sample Code with RRT*
author: Atsushi Sakai(@Atsushi_twi)
"""
import math
import sys
import matplotlib.pyplot as plt
import pathlib
from rrt import RRT
sys.path.append(str(pathlib.Path(__file__).parent.parent))
class RRTStar(RRT):
"""
Class for RRT Star planning
"""
class Node(RRT.Node):
def __init__(self, x, y):
super().__init__(x, y)
self.cost = 0.0 # 路径代价
def __init__(self,
rand_area,
map,
scale_ratio=5,
expand_dis=100.0,
path_resolution=10.0,
goal_sample_rate=10,
max_iter=300,
connect_circle_dist=50.0, # new
search_until_max_iter=False, # new
robot_radius=5.0):
super().__init__(rand_area, map, scale_ratio, expand_dis,
path_resolution, goal_sample_rate, max_iter,
robot_radius=robot_radius)
self.connect_circle_dist = connect_circle_dist
self.search_until_max_iter = search_until_max_iter
self.node_list = []
def planning(self, start, goal, path_smoothing=True, animation=False) -> [(float, float), ...]:
"""
rrt star path planning, return a path list
animation: flag for animation on or off .
"""
(self.start.x, self.start.y) = start
(self.goal.x, self.goal.y) = goal
# self.update_play_area()
self.node_list = [self.start]
while len(self.node_list) < self.max_iter:
#for i in range(self.max_iter):
# print("Iter:", i, ", number of nodes:", len(self.node_list))
print("number of nodes:", len(self.node_list))
rnd_node = self.get_random_node()
nearest_ind = self.get_nearest_node_index(self.node_list, rnd_node)
nearest_node = self.node_list[nearest_ind]
new_node = self.steer(nearest_node, rnd_node, self.expand_dis)
if animation:
self.draw_graph(new_node)
if self.check_collision(new_node):
new_node.cost = nearest_node.cost + \
math.hypot(new_node.x - nearest_node.x,
new_node.y - nearest_node.y)
near_inds = self.find_near_nodes(new_node)
# node_with_updated_parent: 已经找到父节点的new_node
node_with_updated_parent = self.choose_parent(new_node, near_inds)
if node_with_updated_parent:
self.rewire(node_with_updated_parent, near_inds)
self.node_list.append(node_with_updated_parent)
else:
self.node_list.append(new_node) # ??? 不可能发生
# 目标检测
if not self.search_until_max_iter:
last_index = self.search_best_goal_node() # 找到目标单步范围内的总距离最短的节点
if last_index is not None:
path = self.generate_final_course(last_index)
if path_smoothing:
return self.path_smoothing(path)
else:
return path
print("reached max iteration")
last_index = self.search_best_goal_node()
if last_index is not None:
path = self.generate_final_course(last_index)
if path_smoothing:
return self.path_smoothing(path)
else:
return path
return None
def choose_parent(self, new_node, near_inds):
"""
new_node 选择(从起点开始)总距离最小的父节点
Computes the cheapest point to new_node contained in the list
near_inds and set such a node as the parent of new_node.
Arguments:
--------
new_node, Node
randomly generated node with a path from its neared point
There are not coalitions between this node and th tree.
near_inds: list
Indices of indices of the nodes what are near to new_node
Returns.
------
Node, a copy of new_node
"""
if not near_inds:
return None
# search nearest cost in near_inds
costs = []
for i in near_inds:
near_node = self.node_list[i]
t_node = self.steer(near_node, new_node)
if t_node and self.check_collision(t_node):
costs.append(self.calc_new_cost(near_node, new_node))
else:
costs.append(float("inf")) # the cost of collision node
min_cost = min(costs)
if min_cost == float("inf"):
print("There is no good path.(min_cost is inf)")
return None
min_ind = near_inds[costs.index(min_cost)]
new_node = self.steer(self.node_list[min_ind], new_node) # 为new_node设置父节点
new_node.cost = min_cost
return new_node
def search_best_goal_node(self):
'''
从可直达目标的节点(单步范围内且中间无障碍物)选出从起点到目标距离最短的中间节点
'''
dist_to_goal_list = [
self.calc_dist_to_goal(n.x, n.y) for n in self.node_list
]
goal_inds = [ # 距离目标单步范围内的节点
dist_to_goal_list.index(i) for i in dist_to_goal_list
if i <= self.expand_dis
]
safe_goal_inds = [] # 目标单步范围内且中间没有障碍物的节点
for goal_ind in goal_inds:
t_node = self.steer(self.node_list[goal_ind], self.goal)
if self.check_collision(t_node):
safe_goal_inds.append(goal_ind)
if not safe_goal_inds:
return None
safe_goal_costs = [self.node_list[i].cost + # 从起点经过安全节点到目标的距离
self.calc_dist_to_goal(self.node_list[i].x, self.node_list[i].y)
for i in safe_goal_inds]
min_cost = min(safe_goal_costs)
for i, cost in zip(safe_goal_inds, safe_goal_costs):
if cost == min_cost:
return i
return None
def find_near_nodes(self, new_node):
"""
找到 new_node 周围一定范围内的树中的节点
1) defines a ball centered on new_node
2) Returns all nodes of the three that are inside this ball
Arguments:
---------
new_node: Node
new randomly generated node, without collisions between
its nearest node
Returns:
-------
list
List with the indices of the nodes inside the ball of
radius r
"""
nnode = len(self.node_list) + 1
r = self.connect_circle_dist * math.sqrt(math.log(nnode) / nnode)
# if expand_dist exists, search vertices in a range no more than expand_dist
if hasattr(self, 'expand_dis'):
r = min(r, self.expand_dis)
dist_list = [(node.x - new_node.x) ** 2 + (node.y - new_node.y) ** 2
for node in self.node_list]
near_inds = [dist_list.index(i) for i in dist_list if i <= r ** 2] #
return near_inds
def rewire(self, new_node, near_inds):
"""
新加入节点后为周围的其他节点重新计算最短路径并更新其父节点
For each node in near_inds, this will check if it is cheaper to
arrive to them from new_node.
In such a case, this will re-assign the parent of the nodes in
near_inds to new_node.
Parameters:
----------
new_node, Node
Node randomly added which can be joined to the tree
near_inds, list of uints
A list of indices of the self.new_node which contains
nodes within a circle of a given radius.
Remark: parent is designated in choose_parent.
"""
for i in near_inds:
near_node = self.node_list[i]
edge_node = self.steer(new_node, near_node)
if not edge_node:
continue
edge_node.cost = self.calc_new_cost(new_node, near_node)
no_collision = self.check_collision(edge_node)
improved_cost = near_node.cost > edge_node.cost
if no_collision and improved_cost:
for node in self.node_list:
if node.parent == near_node:
node.parent = edge_node
self.node_list[i] = edge_node
self.propagate_cost_to_leaves(self.node_list[i])
def calc_new_cost(self, from_node, to_node):
'''
从起始位置经过 from_node to_node cost
'''
d, _ = self.calc_distance_and_angle(from_node, to_node)
return from_node.cost + d
def propagate_cost_to_leaves(self, parent_node):
'''
(递归算法) 从父节点不断向子节点传播计算cost
'''
for node in self.node_list:
if node.parent == parent_node:
node.cost = self.calc_new_cost(parent_node, node)
self.propagate_cost_to_leaves(node)
def main():
print("Start " + __file__)
# ====Search Path with RRT====
obstacle_list = [
(5, 5, 1),
(3, 6, 2),
(3, 8, 2),
(3, 10, 2),
(7, 5, 2),
(9, 5, 2),
(8, 10, 1),
(6, 12, 1),
] # [x,y,size(radius)]
# Set Initial parameters
rrt_star = RRTStar(rand_area=[-2, 15], expand_dis=1, robot_radius=0.8)
path = rrt_star.planning(animation=show_animation)
if path is None:
print("Cannot find path")
else:
print("found path!!")
# Draw final path
if show_animation:
rrt_star.draw_graph()
plt.plot([x for (x, y) in path], [y for (x, y) in path], 'r--')
plt.grid(True)
plt.show()
if __name__ == '__main__':
main()

View File

@ -1,64 +0,0 @@
import os
import pickle
import time
import random
import matplotlib.pyplot as plt
import numpy as np
from scene_utils import control # TODO: 文件名改成Scene.py
from navigate import Navigator
if __name__ == '__main__':
file_name = 'map_5.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
control.init_world(1, 3)
scene = control.Scene(sceneID=0)
scene.reset()
navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map)
goal = (200, 1400)
navigator.navigate(goal, animation=False)
# scene.add_walker(1085, 2630, 220)
# scene.control_walker([scene.walker_control_generator(0, False, 100, 755, 1900, 180)])
# print(scene.status)
#
# control.init_world(1, 3)
#
# scene = control.Scene(sceneID=0)
#
# # 实现单顾客领位
# scene.reset()
# scene.add_walker(0, -200, 220)
#
# for walker in scene.status.walkers:
# print(walker.pose.X)
# print('*************************')
# time.sleep(3)
# for walker in scene.status.walkers:
# print(walker.pose)
# print('*************************')
#
# scene.control_walker([scene.walker_control_generator(0, False, 500, 70, 880, 120)])
# time.sleep(0.5)
# for walker in scene.status.walkers:
# print(walker.pose)
# print('*************************')
# time.sleep(5)
# for walker in scene.status.walkers:
# print(walker.pose.X)
# print('*************************')
#
# scene.remove_walker(0)
# scene.clean_walker()

View File

@ -1,3 +0,0 @@
from . import navigate
from . import dstar_lite

View File

@ -0,0 +1,3 @@
from . import navigate
from . import dstar_lite

View File

@ -307,9 +307,7 @@ class DStarLite:
s_start = self.real2map(s_start)
s_goal = self.real2map(s_goal)
dyna_obs = [self.real2map(obs) for obs in dyna_obs]
self._planning(s_start, s_goal, dyna_obs, debug)
# 地图坐标->实际坐标
path = [self.map2real(node) for node in self.path]
return path
@ -327,7 +325,7 @@ class DStarLite:
path = []
cur = self.s_start
while cur != self.s_goal:
succ = self.get_neighbors(cur)
succ = [s_ for s_ in self.get_neighbors(cur) if s_ not in path] # 避免抖动 (不可走重复的点)
cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])]
path.append(cur)
# else:

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import math
from dstar_lite import DStarLite, euclidean_distance
from robowaiter.algos.navigate.dstar_lite import DStarLite, euclidean_distance
class Navigator:
@ -71,6 +71,13 @@ class Navigator:
print('plan pos:', next_pos, end=' ')
yaw = self.get_yaw(pos, next_pos)
self.scene.walk_to(next_pos[0], next_pos[1], yaw, velocity=self.v, dis_limit=10)
### 获取视觉图像
self.scene.get_camera_segment()
# pos = (self.scene.status.location.X, self.scene.status.location.Y)
# if self.is_reached(pos, next_pos):
self.planner.path = self.planner.path[next_step - 1:] # 去除已走过的路径

View File

View File

View File

@ -3,10 +3,18 @@ from robowaiter.behavior_lib._base.Behavior import Bahavior
class Act(Bahavior):
print_name_prefix = "act "
type = 'Act'
all_place = {'Bar', 'WaterTable', 'CoffeeTable', 'Bar2', 'Table1', 'Table2', 'Table3'}
all_place = {'Bar', 'Bar2', 'WaterTable', 'CoffeeTable', 'Table1', 'Table2', 'Table3'}
all_object = {'Coffee', 'Water', 'Dessert', 'Softdrink', 'BottledDrink', 'Yogurt', 'ADMilk', 'MilkDrink', 'Milk',
'VacuumCup'}
place_xyz_dic={
'Bar': (247.0, 520.0, 100.0),
'Bar2': (240.0, 40.0, 70.0),
'WaterTable':(-70.0, 500.0, 107),
'CoffeeTable':(247.0, 520.0, 100.0), # 位置需要更改!!!
'Table1': (247.0, 520.0, 100.0),# 位置需要更改!!!
'Table2': (-55.0, 0.0, 107),
'Table3':(-55.0, 150.0, 107)
}
def __init__(self,*args):
super().__init__(*args)
self.info = self.get_info(*args)

View File

@ -0,0 +1,47 @@
import py_trees as ptree
from typing import Any
from robowaiter.behavior_lib._base.Act import Act
from robowaiter.behavior_lib._base.Behavior import Status
class Clean(Act):
can_be_expanded = True
num_args = 1
valid_args = (
'Table1','Floor','Chairs'
)
def __init__(self, *args):
super().__init__(*args)
self.target_obj = self.args[0]
self.op_type = 5
if self.target_obj=="Table1":
self.op_type = 5
elif self.target_obj=="Floor":
self.op_type = 4
elif self.target_obj=="Chairs":
self.op_type = 7
@classmethod
def get_info(cls,arg):
info = {}
info["pre"]= {f'Holding(Nothing)'}
if arg == "Table1":
info["add"]= {f'Is(Table1,Clean)'}
info["del"] = {f'Is(Table1,Dirty)'}
elif arg == "Floor":
info["add"] = {f'Is(Floor,Clean)'}
info["del"] = {f'Is(Floor,Dirty)'}
elif arg == "Chairs":
info["add"] = {f'Is(Chairs,Clean)'}
info["del"] = {f'Is(Chairs,Dirty)'}
return info
def _update(self) -> ptree.common.Status:
self.scene.move_task_area(self.op_type)
self.scene.op_task_execute(self.op_type)
self.scene.state["condition_set"].union(self.info["add"])
self.scene.state["condition_set"] -= self.info["del"]
return Status.RUNNING

View File

@ -13,6 +13,13 @@ class Make(Act):
def __init__(self, *args):
super().__init__(*args)
self.target_obj = self.args[0]
self.op_type = 1
if self.target_obj=="Coffee":
self.op_type = 1
elif self.target_obj=="Water":
self.op_type = 2
elif self.target_obj=="Dessert":
self.op_type = 3
@classmethod
@ -29,9 +36,10 @@ class Make(Act):
return info
def _update(self) -> ptree.common.Status:
op_type = 1
self.scene.move_task_area(op_type)
self.scene.op_task_execute(op_type)
self.scene.move_task_area(self.op_type)
self.scene.op_task_execute(self.op_type)
self.scene.state["condition_set"].union(self.info["add"])
self.scene.state["condition_set"] -= self.info["del"]
return Status.RUNNING

View File

@ -1,7 +1,6 @@
import py_trees as ptree
from typing import Any
from robowaiter.behavior_lib._base.Act import Act
from robowaiter.algos.navigate.DstarLite.navigate import Navigator
from robowaiter.algos.navigate.navigate import Navigator
class MoveTo(Act):
can_be_expanded = True
@ -25,13 +24,13 @@ class MoveTo(Act):
def _update(self) -> ptree.common.Status:
# self.scene.test_move()
# navigator = Navigator(scene=self.scene, area_range=[-350, 600, -400, 1450], map=self.scene.state["map"]["2d"])
# goal = self.scene.state['map']['obj_pos'][self.args[0]]
# navigator.navigate(goal, animation=False)
#
# self.scene.state['condition_set'].add('At(Robot,Table)')
navigator = Navigator(scene=self.scene, area_range=[-350, 600, -400, 1450], map=self.scene.state["map"]["2d"])
goal = self.scene.state['map']['obj_pos'][self.args[0]]
self.scene.walk_to(goal[0],goal[1])
navigator.navigate(goal, animation=False)
self.scene.state['condition_set'].add('At(Robot,Table)')
# goal = self.scene.state['map']['obj_pos'][self.args[0]]
# self.scene.walk_to(goal[0],goal[1])
return ptree.common.Status.RUNNING

View File

@ -0,0 +1,34 @@
import py_trees as ptree
from typing import Any
from robowaiter.behavior_lib._base.Act import Act
from robowaiter.behavior_lib._base.Behavior import Status
class PickUp(Act):
can_be_expanded = True
num_args = 1
def __init__(self, *args):
super().__init__(*args)
self.target_obj = self.args[0]
@classmethod
def get_info(self,arg):
info = {}
info["pre"] = {f'At(Robot,{arg})','Holding(Nothing)'}
info["add"] = {f'Holding({arg})'}
info["del"] = {f'Holding(Nothing)'}
return info
def _update(self) -> ptree.common.Status:
# self.scene.test_move()
op_type=16
obj_id = 0
# 遍历场景里的所有物品,根据名字匹配位置最近的 obj-id
self.scene.op_task_execute(op_type, obj_id=obj_id)
self.scene.state["condition_set"].union(self.info["add"])
self.scene.state["condition_set"] -= self.info["del"]
return Status.RUNNING

View File

@ -0,0 +1,35 @@
import py_trees as ptree
from typing import Any
from robowaiter.behavior_lib._base.Act import Act
from robowaiter.behavior_lib._base.Behavior import Status
class PutDown(Act):
can_be_expanded = True
num_args = 1
def __init__(self, *args):
super().__init__(*args)
self.target_obj = self.args[0]
self.target_place = self.args[1]
@classmethod
def get_info(self,arg):
info = {}
info["pre"] = {f'Holding({arg[0]})',f'At(Robot,{arg[1]})'}
info["add"] = {f'Holding(Nothing)',f'At({arg[0]},{arg[1]})'}
info["del"] = {f'Holding(Nothing)'}
return info
def _update(self) -> ptree.common.Status:
# self.scene.test_move()
op_type=17
release_pos = list(Act.place_xyz_dic[self.target_place])
# # 原始吧台处:[247.0, 520.0, 100.0], 空调开关旁吧台:[240.0, 40.0, 70.0], 水杯桌:[-70.0, 500.0, 107]
# # 桌子2:[-55.0, 0.0, 107],桌子3:[-55.0, 150.0, 107]
self.scene.op_task_execute(op_type, release_pos=release_pos)
self.scene.state["condition_set"].union(self.info["add"])
self.scene.state["condition_set"] -= self.info["del"]
return Status.RUNNING

View File

@ -0,0 +1,64 @@
import py_trees as ptree
from typing import Any
from robowaiter.behavior_lib._base.Act import Act
from robowaiter.behavior_lib._base.Behavior import Status
class Clean(Act):
can_be_expanded = True
num_args = 1
valid_args = [('AC','ACTemperature','TubeLight','HallLight','Curtain'),
('Off','On','Up','Down','Clean','Dirty')]
def __init__(self, *args):
super().__init__(*args)
self.target_obj = self.args[0]
self.op = self.args[1]
self.op_type = 13
if self.target_obj=="AC":
self.op_type = 13
elif self.target_obj=="ACTemperature":
if self.op == 'Up':
self.op_type = 14
elif self.op == 'Down':
self.op_type = 15
elif self.target_obj=="TubeLight":
if self.op == 'On':
self.op_type = 6
elif self.op == 'Off':
self.op_type = 8
elif self.target_obj=="HallLight":
if self.op == 'On':
self.op_type = 9
elif self.op == 'Off':
self.op_type = 10
elif self.target_obj=="Curtain":
if self.op == 'On':
self.op_type = 11
elif self.op == 'Off':
self.op_type = 12
@classmethod
def get_info(cls,arg):
info = {}
# 明天写
# info["pre"]= {f'Holding(Nothing)'}
# if arg == "Table1":
# info["add"]= {f'Is(Table1,Clean)'}
# info["del"] = {f'Is(Table1,Dirty)'}
# elif arg == "Floor":
# info["add"] = {f'Is(Floor,Clean)'}
# info["del"] = {f'Is(Floor,Dirty)'}
# elif arg == "Chairs":
# info["add"] = {f'Is(Chairs,Clean)'}
# info["del"] = {f'Is(Chairs,Dirty)'}
return info
def _update(self) -> ptree.common.Status:
self.scene.move_task_area(self.op_type)
self.scene.op_task_execute(self.op_type)
self.scene.state["condition_set"].union(self.info["add"])
self.scene.state["condition_set"] -= self.info["del"]
return Status.RUNNING

View File

@ -1,6 +1,7 @@
import time
import grpc
import numpy as np
import matplotlib.pyplot as plt
from robowaiter.proto import GrabSim_pb2
from robowaiter.proto import GrabSim_pb2_grpc
@ -24,11 +25,18 @@ def init_world(scene_num=1, mapID=11):
time.sleep(3) # wait for the map to load
def image_extract(camera_data):
image = camera_data.images[0]
return np.frombuffer(image.data, dtype=image.dtype).reshape(
(image.height, image.width, image.channels)
)
def show_image(camera_data):
print('------------------show_image----------------------')
#获取第0张照片
im = camera_data.images[0]
#使用numpy(np) 数值类型矩阵的frombuffer将im.data以流的形式向量的形式读入在变型reshape成三位矩阵的形式(长度,宽度,深度)即三阶张量
d = np.frombuffer(im.data, dtype=im.dtype).reshape((im.height, im.width, im.channels))
#matplotlib中的plt方法 对矩阵d 进行图形绘制,如果 深度相机拍摄的带深度的图片图片名字中有depth信息则转换成黑白图即灰度图
plt.imshow(d, cmap="gray" if "depth" in im.name.lower() else None)
#图像展示在屏幕上
plt.show()
return d
class Scene:
@ -299,7 +307,7 @@ class Scene:
)
)
if image_only:
return image_extract(camera_data)
return show_image(camera_data)
else:
return camera_data
@ -310,20 +318,20 @@ class Scene:
)
)
if image_only:
return image_extract(camera_data)
return show_image(camera_data)
else:
return camera_data
def get_camera_segment(self, image_only=True):
def get_camera_segment(self, show=True):
camera_data = stub.Capture(
GrabSim_pb2.CameraList(
cameras=[GrabSim_pb2.CameraName.Head_Segment], scene=self.sceneID
)
)
if image_only:
return image_extract(camera_data)
else:
return camera_data
if show:
show_image(camera_data)
return camera_data
def chat_bubble(self, message):
stub.ControlRobot(

View File

@ -15,7 +15,7 @@ import numpy as np
from robowaiter.scene.scene import Scene,init_world # TODO: 文件名改成Scene.py
from robowaiter.scene.scene import Scene
from robowaiter.utils import get_root_path
class SceneVLN(Scene):
def __init__(self, robot):
@ -26,7 +26,8 @@ class SceneVLN(Scene):
]
def _reset(self):
file_name = './robowaiter/algos/navigate/DstarLite/map_5.pkl'
root_path = get_root_path()
file_name = os.path.join(root_path,'robowaiter/algos/navigate/map_5.pkl')
with open(file_name, 'rb') as file:
map = pickle.load(file)

View File

@ -1,7 +1,7 @@
import os
from robowaiter import Robot, task_map
TASK_NAME = 'VLM'
TASK_NAME = 'VLN'
# create robot
project_path = "./robowaiter"

View File

@ -1,3 +1,4 @@
selector{
cond EnvExplored()
cond At(Robot,Table)
act MoveTo(Table)
}