Merge branch 'main' of github.com:HPCL-EI/RoboWaiter

This commit is contained in:
ChenXL97 2023-11-16 20:48:22 +08:00
commit 4feb75e80d
45 changed files with 967 additions and 479 deletions

View File

@ -1,15 +1,20 @@
antlr4-python3-runtime
py_trees
shortuuid
shortuuid~=1.0.11
protobuf==3.20.0
gym==0.21.0
grpcio==1.53.0
requests
urllib3
tabulate
requests~=2.31.0
urllib3~=2.0.7
tabulate~=0.9.0
autopep8
pytorch==1.11.0
torchvision==0.12.0
torchaudio==0.11.0
cudatoolkit=11.3
loguru
loguru~=0.5.3
matplotlib~=3.8.0
numpy~=1.26.0
setuptools~=68.0.0
pydot~=1.4.2
colorama~=0.4.6

View File

@ -1,4 +1,5 @@
import time
import math
import grpc
import numpy as np
@ -358,29 +359,29 @@ class Scene:
temp = stub.GetIKControlInfos(GrabSim_pb2.HandPostureInfos(scene=self.sceneID, handPostureObjects=HandPostureObject))
# 移动到进行操作任务的指定地点
def move_task_area(self,op_type):
if op_type==11 or op_type==12: # 开关窗帘不需要移动
return
scene = stub.Observe(GrabSim_pb2.SceneID(value=self.sceneID))
walk_value = [scene.location.X, scene.location.Y, scene.rotation.Yaw]
if op_type < 8:
v_list = self.op_v_list[op_type]
if op_type>=8 and op_type<=10: # 控灯
v_list = self.op_v_list[6]
if op_type in [13,14,15]: # 空调
v_list = [[240, -140.0]] # KongTiao [300.5, -140.0] # 250
print("------------------move_task_area----------------------")
print("Current Position:", walk_value,"开始任务:",self.op_dialog[op_type])
for walk_v in v_list:
walk_v = walk_v + [scene.rotation.Yaw, 180, 0]
walk_v[2] = 0 if (op_type in [13,14,15]) else scene.rotation.Yaw # 空调操作朝向墙面
action = GrabSim_pb2.Action(
scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v
)
scene = stub.Do(action)
print("After Walk Position:",[scene.location.X, scene.location.Y, scene.rotation.Yaw])
# def move_task_area(self,op_type):
# if op_type==11 or op_type==12: # 开关窗帘不需要移动
# return
# scene = stub.Observe(GrabSim_pb2.SceneID(value=self.sceneID))
# walk_value = [scene.location.X, scene.location.Y, scene.rotation.Yaw]
#
# if op_type < 8:
# v_list = self.op_v_list[op_type]
# if op_type>=8 and op_type<=10: # 控灯
# v_list = self.op_v_list[6]
# if op_type in [13,14,15]: # 空调
# v_list = [[240, -140.0]] # KongTiao [300.5, -140.0] # 250
# print("------------------error version----------------------")
# print("------------------move_task_area----------------------")
# print("Current Position:", walk_value,"开始任务:",self.op_dialog[op_type])
# for walk_v in v_list:
# walk_v = walk_v + [scene.rotation.Yaw, 180, 0]
# walk_v[2] = 0 if (op_type in [13,14,15]) else scene.rotation.Yaw # 空调操作朝向墙面
# action = GrabSim_pb2.Action(
# scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v
# )
# scene = stub.Do(action)
# print("After Walk Position:",[scene.location.X, scene.location.Y, scene.rotation.Yaw])
# 相应的行动,由主办方封装
def control_robot_action(self, type=0, action=0, message="你好"):
@ -512,3 +513,4 @@ class Scene:
print(scene.info)

View File

@ -1,59 +0,0 @@
# !/usr/bin/env python3
# -*- encoding: utf-8 -*-
import matplotlib.pyplot as plt
import numpy as np
import pickle
import os
def draw_grid_map(grid_map):
# 生成新的地图图像
plt.imshow(grid_map, cmap='binary', alpha=0.5, origin='lower') # 黑白网格
# 绘制坐标轴
plt.xlabel('y', loc='right')
plt.ylabel('x', loc='top')
# 显示网格线
plt.grid(color='black', linestyle='-', linewidth=0.5)
# 显示图像
plt.show()
#plt.pause(0.01)
if __name__ == '__main__':
# control.init_world(scene_num=1, mapID=3)
# scene = control.Scene(sceneID=0)
#
# X = int(950/5) # 采点数量
# Y = int(1850/5)
# map = np.zeros((X, Y))
#
# for x in range(X):
# for y in range(Y):
# if not scene.reachable_check(x*5-350, y*5-400, Yaw=0):
# map[x, y] = 1
# print(x, y)
#
#
# file_name = 'map_5.pkl'
# if not os.path.exists(file_name):
# open(file_name, 'w').close()
# with open(file_name, 'wb') as file:
# pickle.dump(map, file)
# print('保存成功')
file_name = 'map_5.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
draw_grid_map(map)

View File

@ -1,158 +0,0 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import math
import sys
import time
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import make_axes_locatable
# from scene import control
# from rrt import RRT
# from rrt_star import RRTStar
# from apf import APF
from robowaiter.algos.navigate.dstar_lite import DStarLite, euclidean_distance
class Navigator:
'''
导航类
'''
def __init__(self, scene, area_range, map, scale_ratio=5, step_length=150, velocity=150, react_radius=300):
self.scene = scene
self.area_range = area_range # 地图实际坐标范围 xmin, xmax, ymin, ymax
self.map = map # 缩放并离散化的地图 array(X,Y)
self.scale_ratio = scale_ratio # 地图缩放率
self.step_length = step_length # 步长(单次移动)
self.step_num = self.step_length // self.scale_ratio # 单次移动地图格数
self.v = velocity # 速度
self.react_radius = react_radius # robot反应半径
# self.planner = RRTStar(rand_area=area_range, map=map, scale_ratio=scale_ratio, max_iter=400, search_until_max_iter=True)
self.planner = DStarLite(area_range=area_range, map=map, scale_ratio=scale_ratio)
@staticmethod
def is_reached(pos: (float, float), goal: (float, float), dis_limit=30):
'''
判断是否到达目标
'''
dis = math.hypot(pos[0]-goal[0], pos[1]-goal[1])
# dis = np.linalg.norm(pos - goal)
return dis < dis_limit
@staticmethod
def get_yaw(pos: (float, float), goal: (float, float)):
'''
得到移动方向
'''
return math.degrees(math.atan2((goal[1] - pos[1]), (goal[0] - pos[0])))
def legalize_goal(self, goal: (float, float)):
'''
TODO: 处理非法目标
目标在障碍物上从目标开始方形向外扩展直到找到可行点
目标在地图外面起点和目标连线最靠近目标的可行点
'''
return goal
def navigate(self, goal: (float, float), animation=True):
'''
单次导航直到到达目标
'''
if not self.scene.reachable_check(goal[0], goal[1], 0):
goal = self.legalize_goal(goal)
pos = (self.scene.status.location.X, self.scene.status.location.Y) # 机器人当前: 位置 和 朝向
print('------------------navigation_start----------------------')
while not self.is_reached(pos, goal):
dyna_obs = [(walker.pose.X, walker.pose.Y) for walker in self.scene.status.walkers] # 动态障碍物(顾客)位置列表
dyna_obs = [obs for obs in dyna_obs if euclidean_distance(obs, pos) < self.react_radius] # 过滤观测范围外的dyna_obs
# 周围有dyna_obs则步长减半
if dyna_obs:
step_num = self.step_num // 2
else:
step_num = self.step_num
path = self.planner.planning(pos, goal, dyna_obs)
if path:
if animation:
self.planner.draw_graph(step_num) # 画出搜索路径
next_step = min(step_num, len(path))
next_pos = path[next_step - 1]
# print('plan pos:', next_pos, end=' ')
yaw = self.get_yaw(pos, next_pos)
# print("yaw:",yaw)
self.scene.walk_to(next_pos[0], next_pos[1], Yaw=yaw, velocity=self.v, dis_limit=10)
# pos = (self.scene.status.location.X, self.scene.status.location.Y)
# if self.is_reached(pos, next_pos):
self.planner.path = self.planner.path[next_step - 1:] # 去除已走过的路径
pos = (self.scene.status.location.X, self.scene.status.location.Y)
# print('reach pos:', pos)
self.planner.reset() # 完成一轮导航,重置变量
if self.is_reached(pos, goal):
print('The robot has achieved goal !!')
# def navigate(self, goal: (float, float), path_smoothing=True, animation=True):
# pos = np.array((self.scene.status.location.X, self.scene.status.location.Y)) # 机器人当前: 位置 和 朝向
# yaw = self.scene.status.rotation.Yaw
# print('------------------navigation_start----------------------')
#
# path = self.planner.planning(pos, goal, path_smoothing, animation)
# if path:
# self.planner.draw_graph(final_path=path) # 画出探索过程
# for (x, y) in path:
# self.scene.walk_to(x, y, yaw, velocity=self.v)
# time.sleep(self.step_time)
# pos = np.array((self.scene.status.location.X, self.scene.status.location.Y))
#
# self.planner.reset()
#
# if self.is_reached(pos, goal):
# print('The robot has achieved goal !!')
'''APF势场法暂不可用'''
# while not self.is_reached(pos, goal):
# # 1. 路径规划
# path = self.planner.planning(pos, goal, path_smoothing, animation)
# self.planner.draw_graph(final_path=path) # 画出探索过程
#
# # 2. 使用APF导航到路径中的每个waypoint
# traj = [(pos[0], pos[1])]
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
# for i, waypoint in enumerate(path[1:]):
# print('waypoint [', i, ']:', waypoint)
# # if (not self.scene.reachable_check(waypoint[0], waypoint[1], yaw)) and self.map[self.planner.real2map(waypoint[0], waypoint[1])] == 0:
# # print('error')
# while not self.is_reached(pos, waypoint):
# # 2.1 计算next_step
# pos = np.array((self.scene.status.location.X, self.scene.status.location.Y))
# Pobs = [] # 障碍物(顾客)位置数组
# for walker in self.scene.status.walkers:
# Pobs.append((walker.pose.X, walker.pose.Y))
# next_step, _ = APF(Pi=pos, Pg=waypoint, Pobs=Pobs, step_length=self.step_length)
# traj.append((next_step[0], next_step[1]))
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
# while not self.scene.reachable_check(next_step[0], next_step[1], yaw): # 取中点直到next_step可达
# traj.pop()
# next_step = (pos + next_step) / 2
# traj.append((next_step[0], next_step[1]))
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
# # 2.2 移动robot
# self.scene.walk_to(next_step[0], next_step[1], yaw, velocity=self.v)
# # print(self.scene.status.info) # print navigation info
# # print(self.scene.status.collision)
# time.sleep(self.step_time)
# # print(self.scene.status.info) # print navigation info
# # print(self.scene.status.collision)
# self.planner.reset()

View File

@ -1,52 +0,0 @@
## 默认使用RRTStar+路径平滑
### apf.py: 势场法实现
### discretize_map.py: 地图离散化并压缩
### map_5.pkl: 地图文件(5倍压缩)
### navigate.py: 导航类
### pathsmoothing.py: 路径平滑
### rrt.py: RRT实现
### rrt_star.py: RRTStar 实现
### test.py: 测试文件
## TODO
### 目标不合法
#### 初始目标不合法
目标在障碍物上:从目标开始方形向外扩展,直到找到可行点
目标在地图外面:起点和目标连线最靠近目标的可行点
对不合法的目标做单独处理,生成新的目标
#### 在移动过程中目标被占据
给出目标但不会行移动,程序会继续运行,重新计算规划路径,给出新目标
### 规划中断情况
### 计算转向角
`完成`
### 有些本来可达的位置却无法走到系统bug?
例如从初始位置(247, 500) 移动到(115, -10),无法到达
`(已解决)将dis_limit设为小值5而非0`
#### 构型空间膨胀??
`不需要`
### 只考虑一定范围内的行人
观测范围 / 反应半径
`完成`
#### 观测范围内有行人步长要减小
`完成`

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,90 @@
# !/usr/bin/env python3
# -*- encoding: utf-8 -*-
import matplotlib.pyplot as plt
import numpy as np
import pickle
import os
from scipy.ndimage import binary_dilation
from scene import scene
def draw_grid_map(grid_map):
# 生成新的地图图像
plt.imshow(grid_map, cmap='binary', alpha=0.5, origin='lower') # 黑白网格
# 绘制坐标轴
plt.xlabel('y', loc='right')
plt.ylabel('x', loc='top')
# 显示网格线
plt.grid(color='black', linestyle='-', linewidth=0.5)
# 显示图像
plt.show()
#plt.pause(0.01)
def discretize_map(scene, scale_ratio):
X = int(950 / scale_ratio) # 采点数量
Y = int(1850 / scale_ratio)
map = np.zeros((X, Y))
for x in range(X):
for y in range(Y):
if not scene.reachable_check(x * scale_ratio - 350, y * scale_ratio - 400, Yaw=0):
map[x, y] = 1
print(x, y)
file_name = 'map_'+str(scale_ratio)+'.pkl'
if not os.path.exists(file_name):
open(file_name, 'w').close()
with open(file_name, 'wb') as file:
pickle.dump(map, file)
print('保存成功')
def expand_obstacles(scale_ratio, expand_range=1):
'''
障碍物边沿扩展
TODO: 扩展后的地图不可用
'''
file_name = 'map_'+str(scale_ratio)+'.pkl'
dilated_file_name = 'map_'+str(scale_ratio)+'_e'+str(expand_range)+'.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
dilated_map = binary_dilation(map, iterations=expand_range)
if not os.path.exists(dilated_file_name):
open(dilated_file_name, 'w').close()
with open(dilated_file_name, 'wb') as file:
pickle.dump(dilated_map, file)
print('保存成功')
def show_map(file_name):
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
draw_grid_map(map)
if __name__ == '__main__':
# scene.init_world(scene_num=1, mapID=11)
# scene = scene.Scene(sceneID=0)
#
# # 离散化地图
# discretize_map(scene, scale_ratio=4)
# # 扩张构型空间
# expand_obstacles(scale_ratio=3, expand_range=1)
# 展示离散化地图
file_name = 'costMap_4.pkl'
show_map(file_name)

View File

@ -5,8 +5,8 @@
'''
import math
import queue
from functools import partial
import os
import pickle
import numpy as np
import heapq
@ -115,13 +115,12 @@ class PriorityQueue:
class DStarLite:
def __init__(self,
map: np.array([int, int]), # [X, Y]
area_range, # [x_min, x_max, y_min, y_max] 实际坐标范围
scale_ratio=5, # 地图缩放率
dyna_obs_radius=30, # dyna_obs实际身位半径
map: np.array([int, int]),
area_range, # [x_min, x_max, y_min, y_max] 实际坐标范围
scale_ratio=5, # 地图缩放率
dyna_obs_radius=36, # dyna_obs实际身位半径
):
# self.area_bounds = area
self.map = map
self.background = map.copy()
self.X = map.shape[0]
@ -145,9 +144,13 @@ class DStarLite:
"obstacle": float('inf'),
"dynamic obstacle": 100
}
self.cost_map = np.zeros_like(self.map)
file_name = 'costMap_'+str(self.scale_ratio)+'.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
cost_map = pickle.load(file)
self.cost_map = cost_map
self.cost_background = self.cost_map.copy()
self.compute_cost_map()
self.s_start = None # (int,int) 必须是元组(元组可以直接当作矩阵索引)
self.s_goal = None # (int,int)
@ -163,9 +166,11 @@ class DStarLite:
# 设置map 和 cost_map
# '''
# self.map = map_
# self.background = map_.copy()
# self.X = map_.shape[0]
# self.Y = map_.shape[1]
# self.compute_cost_map()
# self.cost_background = self.cost_map.copy()
def reset(self):
'''
@ -299,11 +304,6 @@ class DStarLite:
self.compute_shortest_path()
self.path = self.get_path()
return self.path
# TODO: 误差抖动使robot没有到达路径上的点导致新起点的rhs=∞可能导致get_path失败 ( 当前版本没有该问题 )
# assert (self.rhs[self.s_start] != float('inf')), "There is no known path!"
# # debug
# if debug:
# pass
def planning(self, s_start, s_goal, dyna_obs, debug=False):
'''
@ -311,7 +311,10 @@ class DStarLite:
'''
# 实际坐标 -> 地图坐标
s_start = self.real2map(s_start)
s_goal = self.real2map(s_goal)
if self.s_goal is None:
s_goal = self.real2map(s_goal)
else:
s_goal = self.s_goal
dyna_obs = [self.real2map(obs, reachable_assurance=False) for obs in dyna_obs]
self._planning(s_start, s_goal, dyna_obs, debug)
@ -336,13 +339,6 @@ class DStarLite:
succ = [s_ for s_ in self.get_neighbors(cur) if s_ not in path] # 避免抖动 (不可走重复的点)
cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])]
path.append(cur)
# else:
# for i in range(step_num):
# if cur == self.s_goal:
# break
# succ = self.get_neighbors(cur)
# cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])]
# path.append(cur)
return path
def in_bounds_without_obstacle(self, pos):
@ -357,8 +353,6 @@ class DStarLite:
获取邻居节点, 地图范围内
'''
(x_, y_) = pos
# results = [(x_+1,y_), (x_-1,y_), (x_, y_+1), (x_,y_-1)]
# if mode == 8:
neighbors = [(x_ + 1, y_), (x_ - 1, y_), (x_, y_ + 1), (x_, y_ - 1), (x_ + 1, y_ + 1), (x_ - 1, y_ + 1),
(x_ + 1, y_ - 1), (x_ - 1, y_ - 1)]
neighbors = filter(self.in_bounds_without_obstacle, neighbors) # 确保位置在地图范围内 且 不是静态障碍物
@ -366,17 +360,26 @@ class DStarLite:
def compute_cost_map(self):
# 计算当前地图的cost_map
self.cost_map = np.zeros_like(self.map)
for idx, obj in self.idx_to_object.items():
self.cost_map[self.map == idx] = self.object_to_cost[obj]
# # TODO
# for x in range(self.X):
# for y in range(self.Y):
# if self.cost_map[x, y] > 0:
# neighbors = self.get_neighbors((x, y))
# for (x_, y_) in neighbors:
# self.cost_map[x_, y_] = max(self.cost_map[x_, y_], self.cost_map[x, y] - 10)
# 扩张静态障碍物影响范围
obs_pos = np.where(self.map == self.object_to_idx['obstacle']) # 静态障碍物位置列表
for (x, y) in zip(obs_pos[0], obs_pos[1]):
start_x, end_x = max(x - 1, 0), min(x + 1, self.X - 1)
start_y, end_y = max(y - 1, 0), min(y + 1, self.Y - 1)
for cost in range(9, 0, -3):
for x_ in range(start_x, end_x + 1):
self.cost_map[x_, start_y] = max(self.cost_map[x_, start_y], cost)
for y_ in range(start_y + 1, end_y + 1):
self.cost_map[end_x, y_] = max(self.cost_map[end_x, y_], cost)
for x_ in range(end_x - 1, start_x - 1, -1):
self.cost_map[x_, end_y] = max(self.cost_map[x_, end_y], cost)
for y_ in range(end_y - 1, start_y, -1):
self.cost_map[start_x, y_] = max(self.cost_map[start_x, y_], cost)
start_x, end_x = max(start_x - 1, 0), min(end_x + 1, self.X - 1)
start_y, end_y = max(start_y - 1, 0), min(end_y + 1, self.Y - 1)
self.cost_background = self.cost_map.copy()
@ -388,8 +391,8 @@ class DStarLite:
return:
update_obj: 改变的位置列表 [(x, y, obj_idx, obj_idx_old), ...]
'''
# dyna_obs没有变化 (集合set可以忽略元素在列表中的位置)
if set(dyna_obs) == set(self.dyna_obs_list):
# dyna_obs没有变化 (集合set可以忽略元素在列表中的位置) 且 robot未在dyna_obs占用位置中
if set(dyna_obs) == set(self.dyna_obs_list) and self.s_start not in self.dyna_obs_occupy:
return []
# 当前dyna_obs占用位置列表
@ -397,24 +400,10 @@ class DStarLite:
for pos in dyna_obs:
dyna_obs_occupy.extend(self.get_occupy_pos(pos))
dyna_obs_occupy = [pos for i, pos in enumerate(dyna_obs_occupy) if pos not in dyna_obs_occupy[:i]] # 去除重复位置
# 转变为free 和 转变为obs的位置列表
# 转变为free 和 转变为dyna_obs的位置列表
changed_free = [pos for pos in self.dyna_obs_occupy if pos not in dyna_obs_occupy]
changed_obs = [pos for pos in dyna_obs_occupy if pos not in self.dyna_obs_occupy]
# # 新旧dyna_obs占用位置列表
# old_obs_occupy = []
# new_obs_occupy = []
# for pos in self.dyna_obs_list:
# old_obs_occupy.extend(self.get_occupy_pos(pos))
# for pos in dyna_obs:
# new_obs_occupy.extend(self.get_occupy_pos(pos))
# old_obs_occupy = [pos for i, pos in enumerate(old_obs_occupy) if pos not in old_obs_occupy[:i]] # 去除重复位置
# new_obs_occupy = [pos for i, pos in enumerate(new_obs_occupy) if pos not in new_obs_occupy[:i]] # 去除重复位置
#
# # 转变为free 和 转变为obs的位置列表
# changed_free = [pos for pos in old_obs_occupy if pos not in new_obs_occupy]
# changed_obs = [pos for pos in new_obs_occupy if pos not in old_obs_occupy]
# 更新地图计算changed_pos
changed_pos = []
for (x, y) in changed_free:
@ -430,21 +419,15 @@ class DStarLite:
return changed_pos
def get_occupy_pos(self, obs_pos):
'''
根据dyna_obs中心位置计算其占用的所有网格位置
'''
(x, y) = obs_pos
occupy_radius = min(self.dyna_obs_radius, int(euclidean_distance(obs_pos, self.s_start) - 1)) # 避免robot被dyna_obs的占用区域包裹住
# for i in range(x - self.dyna_obs_radius, x + self.dyna_obs_radius + 1): # 方形区域
# for j in range(y - self.dyna_obs_radius, y + self.dyna_obs_radius + 1):
# occupy_pos.append((i, j))
occupy_pos = [(i, j) for i in range(x - occupy_radius, x + occupy_radius + 1) # 圆形区域
for j in range(y - occupy_radius, y + occupy_radius + 1)
if euclidean_distance((i, j), obs_pos) < occupy_radius]
occupy_pos = filter(self.in_bounds_without_obstacle, occupy_pos) # 确保位置在地图范围内 且 不是静态障碍物
return list(occupy_pos)
@ -497,21 +480,40 @@ class DStarLite:
'''
x = round((pos[0] - self.x_min) / self.scale_ratio)
y = round((pos[1] - self.y_min) / self.scale_ratio)
# 需要确保点可达
if reachable_assurance and self.idx_to_object[self.map[x, y]] != 'free':
print('1')
x_ = math.floor((pos[0] - self.x_min) / self.scale_ratio)
y_ = math.floor((pos[1] - self.y_min) / self.scale_ratio)
candidates = [(x_, y_), (x_ + 1, y_), (x_, y_ + 1), (x_ + 1, y_ + 1)]
for (x, y) in candidates:
print(self.idx_to_object[self.map[x, y]])
if self.idx_to_object[self.map[x, y]] == 'free':
print((x,y))
return tuple((x, y))
raise Exception('error')
# 确保点不在静态障碍物上,否则就不断向外圈扩展直到找到非静态障碍物位置
if reachable_assurance:
return self.validate_pos((x, y))
else:
return tuple((x, y))
def validate_pos(self, pos):
'''
对于不合法的pos找到周围距离最近的合法坐标
'''
(x, y) = pos
x = max(0, min(x, self.X - 1))
y = max(0, min(y, self.Y - 1))
if self.idx_to_object[self.map[x, y]] == 'obstacle':
start_x, end_x = max(x - 1, 0), min(x + 1, self.X - 1)
start_y, end_y = max(y - 1, 0), min(y + 1, self.Y - 1)
while True:
for x_ in range(start_x, end_x + 1):
if self.idx_to_object[self.map[x_, start_y]] != 'obstacle':
return tuple((x_, start_y))
for y_ in range(start_y + 1, end_y + 1):
if self.idx_to_object[self.map[end_x, y_]] != 'obstacle':
return tuple((end_x, y_))
for x_ in range(end_x - 1, start_x - 1, -1):
if self.idx_to_object[self.map[x_, end_y]] != 'obstacle':
return tuple((x_, end_y))
for y_ in range(end_y - 1, start_y, -1):
if self.idx_to_object[self.map[start_x, y_]] != 'obstacle':
return tuple((start_x, y_))
start_x, end_x = max(start_x - 1, 0), min(end_x + 1, self.X - 1)
start_y, end_y = max(start_y - 1, 0), min(end_y + 1, self.Y - 1)
# raise Exception('invalid pos!')
return tuple((x, y))
def draw_graph(self, step_num):
# 清空当前figure内容保留figure对象
plt.clf()

Binary file not shown.

After

Width:  |  Height:  |  Size: 25 KiB

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,115 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import math
import os
import pickle
import matplotlib.pyplot as plt
import numpy as np
from robowaiter.scene import scene
from robowaiter.algos.navigator.dstar_lite import DStarLite, euclidean_distance
class Navigator:
'''
导航类
'''
def __init__(self, scene, area_range, map, scale_ratio=5, step_length=150, velocity=150, react_radius=300):
self.scene = scene
self.area_range = area_range # 地图实际坐标范围 xmin, xmax, ymin, ymax
self.map = map # 缩放并离散化的地图 array(X,Y)
self.scale_ratio = scale_ratio # 地图缩放率
self.step_length = step_length # 步长(单次移动)
self.step_num = self.step_length // self.scale_ratio # 单次移动地图格数
self.v = velocity # 速度
self.react_radius = react_radius # robot反应半径
self.planner = DStarLite(area_range=area_range, map=map, scale_ratio=scale_ratio)
def validate_goal(self, goal):
'''
目标合法化
'''
return self.planner.map2real(self.planner.real2map(goal))
@staticmethod
def is_reached(pos: (float, float), goal: (float, float), dis_limit=50):
'''
判断是否到达目标
'''
dis = math.hypot(pos[0]-goal[0], pos[1]-goal[1])
# dis = np.linalg.norm(pos - goal)
return dis < dis_limit
@staticmethod
def get_yaw(pos: (float, float), goal: (float, float)):
'''
得到移动方向
'''
# return math.degrees(math.atan2(goal[0] - pos[0], -(goal[1] - pos[1])))
return math.degrees(math.atan2((goal[1] - pos[1]), (goal[0] - pos[0])))
def navigate(self, goal: (float, float), animation=True):
'''
单次导航直到到达目标
'''
goal = self.validate_goal(goal) # 目标合法化
pos = (self.scene.status.location.X, self.scene.status.location.Y) # 机器人当前: 位置 和 朝向
print('------------------navigation_start----------------------')
while not self.is_reached(pos, goal):
dyna_obs = [(walker.pose.X, walker.pose.Y) for walker in self.scene.status.walkers] # 动态障碍物(顾客)位置列表
dyna_obs = [obs for obs in dyna_obs if euclidean_distance(obs, pos) < self.react_radius] # 过滤观测范围外的dyna_obs
# 周围有dyna_obs则步长减半
if dyna_obs:
step_num = self.step_num // 2
else:
step_num = self.step_num
path = self.planner.planning(pos, goal, dyna_obs)
if path:
if animation:
self.planner.draw_graph(step_num) # 画出搜索路径
next_step = min(step_num, len(path))
next_pos = path[next_step - 1]
print('plan pos:', next_pos, end=' ')
yaw = self.get_yaw(pos, next_pos)
self.scene.walk_to(next_pos[0], next_pos[1], yaw, velocity=self.v, dis_limit=10)
self.planner.path = self.planner.path[next_step - 1:] # 去除已走过的路径
pos = (self.scene.status.location.X, self.scene.status.location.Y)
print('reach pos:', pos)
self.planner.reset() # 完成一轮导航,重置变量
if self.is_reached(pos, goal):
print('The robot has achieved goal !!')
if __name__ == '__main__':
# 根据map计算并保存cost_map
file_name = 'map_4.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
scene.init_world(1, 11)
scene = scene.Scene(sceneID=0)
navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map, scale_ratio=4)
navigator.planner.compute_cost_map()
file_name = 'costMap_4.pkl'
if not os.path.exists(file_name):
open(file_name, 'w').close()
with open(file_name, 'wb') as file:
pickle.dump(navigator.planner.cost_map, file)
print('保存成功')

View File

@ -0,0 +1,128 @@
# D_star Lite 机器人任务规划
## 目录结构
### 坐标离散化
`discretize_map.py`
### 地图文件(选择缩放倍率)
`map_3.pkl`
`map_4.pkl`
`map_5.pkl`
### 导航类
`navigate.py`
### D_star Lite 算法实现
`dstar_lite.py`
### 测试文件
`test.py`
---
## 世界地图
### 实际坐标范围
`X: -350 ~ 600`
`Y: -400 ~ 1450`
### 5倍缩放后坐标范围
`X: -70 ~ 120`
`Y: -80 ~ 290`
### 网格地图
| Idx | Obj |
|-----|------------------|
| 0 | free |
| 1 | obstacle |
| 2 | dynamic obstacle |
### 代价地图
| Cost | Obj |
|---------|-----------------|
| 0 | free |
| 15-10-5 | obs周围3格 |
| inf | obstacle |
| 100 | dynamic obstacle |
---
## 参数
`机器人步长` 150
`机器人速度` 150
`机器人观测范围` 300
`行人半径` 36
`目标判达距离` 50
`机器人移动dis_limit` 10
---
## 使用方法
```python
# 选择缩放合适的地图3、4、5
file_name = 'map_4.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
# 初始化场景
scene.init_world(1, 11)
scene = scene.Scene(sceneID=0)
# 舒适化导航类
# (需要传入:场景、实际地图范围、离散化地图、缩放比例)
navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map, scale_ratio=4)
# 设置目标
goal = (0, 0)
# 导航
# (animation: 选择是否画出导航过程)
navigator.navigate(goal, animation=False)
```
---
## 可靠性保证
`目标合法性保证`
- 目标在地图外:重置目标为最近的地图内位置
- 目标在静态障碍物:从当前位置不断向外圈扩展直到找到合法位置,重置目标为该位置
`规划sbgoal合法性保证`
- 规划subgoal被动态障碍物占据重新规划路径
`起点合法性保证`
- 起点在静态障碍物:从当前位置不断向外圈扩展直到找到合法位置,重置起点为该位置
- 起点在动态障碍物范围内:缩小动态障碍物半径,保证起点位置为空闲
`机器人朝向保证`
- 机器人始终朝向每一步的移动方向
`规划抖动解决`
- 规划路径不允许有重复点
`避免机器人沿障碍物行走`
- 障碍物扩张:在代价地图`cost_map`中静态障碍物周围的空位也会受到影响并产生cost

View File

@ -1,22 +1,19 @@
import os
import pickle
import time
import random
import math
import matplotlib.pyplot as plt
import numpy as np
from robowaiter.scene import scene
# from navigate import Navigator
from robowaiter.algos.navigate.navigate import Navigator
from robowaiter.algos.navigator.navigate import Navigator
if __name__ == '__main__':
# def navigate_test(scene):
file_name = 'map_5.pkl'
# 选择缩放合适的地图3、4、5
file_name = 'map_4.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
@ -24,21 +21,25 @@ if __name__ == '__main__':
scene.init_world(1, 11)
scene = scene.Scene(sceneID=0)
navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map)
navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map, scale_ratio=4)
'''场景1: 无行人环境 robot到达指定目标'''
# goal = np.array((-100, 700))
# goal = (-100, 700)
# goal = (590, 1370)
# goal = (290, -240)
# goal = (-200, -200)
# goal = (-200, -50)
'''场景2: 静止行人环境 robot到达指定目标'''
# scene.clean_walker()
# scene.add_walker(50, 300, 0)
# scene.add_walker(-50, 500, 0)
# goal = np.array((-100, 700))
# goal = (-100, 700)
'''场景3: 移动行人环境 robot到达指定目标'''
scene.walk_to(100, 0, -90, dis_limit=10)
scene.walk_to(100, 0, -90, dis_limit=5)
scene.clean_walker()
scene.add_walkers([[50, 300],[-50, 500],[0, 700]])
scene.add_walkers([[50, 300], [-50, 500], [0, 700]])
# scene.add_walker(50, 300, 0)
# scene.add_walker(-50, 500, 0)
# scene.add_walker(0, 700, 0)
@ -47,12 +48,13 @@ if __name__ == '__main__':
scene.control_walker([scene.walker_control_generator(walkerID=2, autowalk=False, speed=50, X=0, Y=0, Yaw=0)])
# goal = (-100, 700)
# goal = (-300)
# goal = (340.0, 900.0)
# goal = (-200, 700) # 目标在障碍物测试
# goal = (-400, 700) # 目标在地图外测试
goal = (10000, 10000) # 目标在地图外测试
# goal = (-220, 300)
# goal = (-280, 400)
# goal = (-230, 600)
# goal = (240.0, 1000.0)
# goal = (340.0, 900.0)
goal = (240.0, 1160.0)
'''场景4: 行人自由移动 robot到达指定目标'''
# # TODO: autowalk=True仿真器会闪退 ???
@ -62,7 +64,7 @@ if __name__ == '__main__':
# scene.control_walker([scene.walker_control_generator(walkerID=0, autowalk=True, speed=20, X=0, Y=0, Yaw=0)])
# scene.control_walker([scene.walker_control_generator(walkerID=1, autowalk=True, speed=20, X=0, Y=0, Yaw=0)])
# time.sleep(5)
# goal = np.array((-100, 700))
# goal = (-100, 700)
navigator.navigate(goal, animation=True)

View File

@ -13,12 +13,12 @@ class Bahavior(ptree.behaviour.Behaviour):
'''
scene = None
print_name_prefix = ""
all_place = {'Bar', 'Bar2', 'WaterTable', 'CoffeeTable', 'Table1', 'Table2', 'Table3'}
# all_place = {'Bar', 'Bar2', 'WaterTable', 'CoffeeTable', 'Table1', 'Table2', 'Table3'}
# all_object = {'Coffee', 'Water', 'Dessert', 'Softdrink', 'BottledDrink', 'Yogurt', 'ADMilk', 'MilkDrink', 'Milk',
# 'VacuumCup'}
# all_place = {'Bar', 'WaterTable', 'CoffeeTable'}
all_place = {'Bar', 'WaterTable', 'CoffeeTable'}
# all_object = {'Coffee', 'Water', 'Dessert', 'Softdrink', 'Yogurt'}
all_object = {'Coffee'}
all_object = {'Coffee', 'Water'}
place_xyz_dic={
'Bar': (247.0, 520.0, 100.0),
'Bar2': (240.0, 40.0, 70.0),
@ -28,6 +28,13 @@ class Bahavior(ptree.behaviour.Behaviour):
'Table2': (-55.0, 0.0, 107),
'Table3':(-55.0, 150.0, 107)
}
container_dic={
'Coffee':'CoffeeCup',
'Water': 'Glass',
'Dessert':'Plate'
}
@classmethod
def get_ins_name(cls,*args):

View File

@ -14,11 +14,11 @@ class Make(Act):
super().__init__(*args)
self.target_obj = self.args[0]
self.op_type = 1
if self.target_obj=="Coffee":
if self.target_obj==self.valid_args[0]:
self.op_type = 1
elif self.target_obj=="Water":
elif self.target_obj==self.valid_args[1]:
self.op_type = 2
elif self.target_obj=="Dessert":
elif self.target_obj==self.valid_args[2]:
self.op_type = 3
@ -28,12 +28,12 @@ class Make(Act):
info["pre"]= {f'Holding(Nothing)'}
info['del_set'] = set()
info['add'] = {f'Exist({arg})'}
if arg == "Coffee":
info["add"] |= {f'On(Coffee,CoffeeTable)'}
elif arg == "Water":
info["add"] |= {f'On(Water,WaterTable)'}
elif arg == "Dessert":
info["add"] |= {f'On(Dessert,Bar)'}
if arg == cls.valid_args[0]:
info["add"] |= {f'On({arg},CoffeeTable)'}
elif arg == cls.valid_args[1]:
info["add"] |= {f'On({arg},WaterTable)'}
elif arg == cls.valid_args[2]:
info["add"] |= {f'On({arg},Bar)'}
return info
def _update(self) -> ptree.common.Status:
@ -43,16 +43,20 @@ class Make(Act):
# self.scene.gen_obj(type=40)
obj_dict = self.scene.status.objects
if len(obj_dict) != 0:
# 获取obj_id
for id, obj in enumerate(obj_dict):
if obj.name == "Coffee":
obj_info = obj_dict[id]
obj_x, obj_y, obj_z = obj_info.location.X, obj_info.location.Y, obj_info.location.Z
print(id,obj.name,obj_x,obj_y,obj_z)
# obj_dict = self.scene.status.objects
# if len(obj_dict) != 0:
# # 获取obj_id
# for id, obj in enumerate(obj_dict):
# print("id:",id,"obj",obj.name)
# if obj.name == "Coffee":
# obj_info = obj_dict[id]
# obj_x, obj_y, obj_z = obj_info.location.X, obj_info.location.Y, obj_info.location.Z
# print(id,obj.name,obj_x,obj_y,obj_z)
self.scene.state["condition_set"] |= (self.info["add"])
self.scene.state["condition_set"] -= self.info["del_set"]
# print("condition_set:",self.scene.state["condition_set"])
return Status.RUNNING

View File

@ -1,6 +1,6 @@
import py_trees as ptree
from robowaiter.behavior_lib._base.Act import Act
from robowaiter.algos.navigate.navigate import Navigator
from robowaiter.algos.navigator.navigate import Navigator
class MoveTo(Act):
can_be_expanded = True
@ -21,6 +21,7 @@ class MoveTo(Act):
info['pre'] |= {f'Exist({arg})'}
info["add"] = {f'At(Robot,{arg})'}
info["del_set"] = {f'At(Robot,{place})' for place in cls.valid_args if place != arg}
info['cost']=10
return info
@ -29,30 +30,39 @@ class MoveTo(Act):
# navigator = Navigator(scene=self.scene, area_range=[-350, 600, -400, 1450], map=self.scene.state["map"]["2d"])
# goal = self.scene.state['map']['obj_pos'][self.args[0]]
# navigator.navigate(goal, animation=False)
# navigator.navigate_old(goal, animation=False)
# 走到固定的地点
if self.target_place in Act.place_xyz_dic:
goal = Act.place_xyz_dic[self.target_place]
self.scene.walk_to(goal[0],goal[1])
else: # 走到物品边上
self.scene.walk_to(goal[0]+1,goal[1])
# 走到物品边上
else:
# 是否用容器装好
if self.target_place in Act.container_dic:
target_name = Act.container_dic[self.target_place]
else:
target_name = self.target_place
# 根据物体名字找到最近的这类物体对应的位置
obj_id = -1
min_dis = float('inf')
obj_dict = self.scene.status.objects
if len(obj_dict)!=0:
# 获取obj_id
for id,obj in enumerate(obj_dict):
if obj.name == self.target_place:
obj_id = id
# obj_info = obj_dict[id]
# obj_x, obj_y, obj_z = obj_info.location.X, obj_info.location.Y, obj_info.location.Z
# ginger_x,ginger_y,ginger_z = [int(self.scene.location.X), int(self.scene.location.Y), int(self.scene.rotation.Yaw)]
break
if self.target_place == "CoffeeCup":
obj_id = 273
if obj.name == target_name:
obj_info = obj_dict[id]
dis = self.scene.cal_distance_to_robot(obj_info.location.X, obj_info.location.Y, obj_info.location.Z)
if dis<min_dis:
min_dis = dis
obj_id = id
# if self.target_place == "CoffeeCup":
# # obj_id = 273
# obj_id = 275
if obj_id == -1:
return ptree.common.Status.FAILURE
# print("self.target_place:",self.target_place,"id:",obj_id,"dis:",min_dis)
self.scene.move_to_obj(obj_id=obj_id)
# 为了演示,写死咖啡位置

View File

@ -25,14 +25,36 @@ class PickUp(Act):
def _update(self) -> ptree.common.Status:
# self.scene.test_move()
op_type=16
obj_id = 0
# op_type=16
# 遍历场景里的所有物品,根据名字匹配位置最近的 obj-id
# 是否用容器装好
if self.target_obj in Act.container_dic:
target_name = Act.container_dic[self.target_obj]
else:
target_name = self.target_obj
# 根据物体名字找到最近的这类物体对应的位置
obj_id = -1
min_dis = float('inf')
obj_dict = self.scene.status.objects
if len(obj_dict) != 0:
# 获取obj_id
for id, obj in enumerate(obj_dict):
if obj.name == target_name:
obj_info = obj_dict[id]
dis = self.scene.cal_distance_to_robot(obj_info.location.X, obj_info.location.Y,
obj_info.location.Z)
if dis < min_dis:
min_dis = dis
obj_id = id
# if self.target_place == "CoffeeCup":
# # obj_id = 273
# obj_id = 275
if obj_id == -1:
return ptree.common.Status.FAILURE
if self.args=="Coffee":
obj_id = 273
self.scene.op_task_execute(op_type, obj_id=obj_id)
self.scene.move_task_area(op_type=16, obj_id=obj_id)
self.scene.op_task_execute(op_type=16, obj_id=obj_id)
self.scene.state["condition_set"] |= (self.info["add"])
self.scene.state["condition_set"] -= self.info["del_set"]

View File

@ -22,6 +22,8 @@ class PutDown(Act):
info["pre"] = {f'Holding({arg[0]})',f'At(Robot,{arg[1]})'}
info["add"] = {f'Holding(Nothing)',f'On({arg[0]},{arg[1]})'}
info["del_set"] = {f'Holding({arg[0]})'}
info['cost'] = 100
return info
@ -31,8 +33,11 @@ class PutDown(Act):
release_pos = list(Act.place_xyz_dic[self.target_place])
# # 原始吧台处:[247.0, 520.0, 100.0], 空调开关旁吧台:[240.0, 40.0, 70.0], 水杯桌:[-70.0, 500.0, 107]
# # 桌子2:[-55.0, 0.0, 107],桌子3:[-55.0, 150.0, 107]
self.scene.move_task_area(op_type, release_pos=release_pos)
self.scene.op_task_execute(op_type, release_pos=release_pos)
self.scene.state["condition_set"] |= (self.info["add"])
self.scene.state["condition_set"] -= self.info["del_set"]
print("After PutDown condition_set:",self.scene.state["condition_set"])
return Status.RUNNING

View File

@ -16,6 +16,9 @@ class On(Cond):
def _update(self) -> ptree.common.Status:
# if self.scene.status?
# print("self.name:",self.name)
# print("On: condition_set:",self.scene.state["condition_set"])
if self.name in self.scene.state["condition_set"]:
return ptree.common.Status.SUCCESS
else:

View File

@ -2,6 +2,8 @@ import copy
import random
from robowaiter.behavior_tree.obtea.BehaviorTree import Leaf,ControlBT
class CondActPair:
def __init__(self, cond_leaf,act_leaf):
self.cond_leaf = cond_leaf
@ -54,7 +56,10 @@ class OptBTExpAlgorithm:
self.conditions_index = []
#运行规划算法从初始状态、目标状态和可用行动计算行为树self.bt
def run_algorithm(self,goal,actions):
def run_algorithm(self,goal,actions,scene):
self.scene = scene
if self.verbose:
print("\n算法开始!")
@ -99,8 +104,13 @@ class OptBTExpAlgorithm:
sequence_structure.add_child(
[copy.deepcopy(pair_node.cond_leaf), copy.deepcopy(pair_node.act_leaf)])
subtree.add_child([copy.deepcopy(sequence_structure)]) # subtree 是回不断变化的它的父亲是self.bt
# 增加实时条件判断,满足条件就不再扩展
# if c <= self.scene.state["condition_set"]:
# return True
else:
subtree.add_child([copy.deepcopy(pair_node.act_leaf)])
if self.verbose:
print("完成扩展 a_node= %s,对应的新条件 c_attr= %s,mincost=%d" \
% (cond_anc_pair.act_leaf.content.name, cond_anc_pair.cond_leaf.content,
@ -128,14 +138,14 @@ class OptBTExpAlgorithm:
break
if valid:
# 把符合条件的动作节点都放到列表里
if self.verbose:
print("———— -- %s 符合条件放入列表" % actions[i].name)
c_attr_node = Leaf(type='cond', content=c_attr, mincost=current_mincost + actions[i].cost)
a_attr_node = Leaf(type='act', content=actions[i], mincost=current_mincost + actions[i].cost)
cond_anc_pair = CondActPair(cond_leaf=c_attr_node, act_leaf=a_attr_node)
self.nodes.append(copy.deepcopy(cond_anc_pair)) # condition node list
self.traversed.append(c_attr) # 重点 the set of expanded conditions
# 把符合条件的动作节点都放到列表里
if self.verbose:
print("———— -- %s 符合条件放入列表,对应的c为 %s" % (actions[i].name,c_attr))
if self.verbose:
print("算法结束!\n")
@ -178,13 +188,23 @@ class OptBTExpAlgorithm:
# 树的dfs
def dfs_ptml(self,parnode):
def dfs_ptml(self,parnode,is_root=False):
for child in parnode.children:
if isinstance(child, Leaf):
if child.type == 'cond':
self.ptml_string += "cond "
c_set_str = '\n cond '.join(map(str, child.content)) + "\n"
self.ptml_string += c_set_str
if is_root and len(child.content) > 1:
# 把多个 cond 串起来
self.ptml_string += "sequence{\n"
self.ptml_string += "cond "
c_set_str = '\n cond '.join(map(str, child.content)) + "\n"
self.ptml_string += c_set_str
self.ptml_string += '}\n'
else:
self.ptml_string += "cond "
c_set_str = '\n cond '.join(map(str, child.content)) + "\n"
self.ptml_string += c_set_str
elif child.type == 'act':
if '(' not in child.content.name:
self.ptml_string += 'act ' + child.content.name + "()\n"
@ -202,7 +222,7 @@ class OptBTExpAlgorithm:
def get_ptml(self):
self.ptml_string = "selector{\n"
self.dfs_ptml(self.bt.children[0])
self.dfs_ptml(self.bt.children[0],is_root=True)
self.ptml_string += '}\n'
return self.ptml_string

View File

@ -6,7 +6,7 @@ from robowaiter.behavior_tree.obtea.examples import *
# 封装好的主接口
class BTOptExpInterface:
def __init__(self, action_list):
def __init__(self, action_list,scene):
"""
Initialize the BTOptExpansion with a list of actions.
:param action_list: A list of actions to be used in the behavior tree.
@ -22,6 +22,8 @@ class BTOptExpInterface:
self.actions = action_list
self.has_processed = False
self.scene = scene
def process(self, goal):
"""
Process the input sets and return a string result.
@ -31,7 +33,7 @@ class BTOptExpInterface:
self.goal = goal
self.algo = OptBTExpAlgorithm(verbose=False)
self.algo.clear()
self.algo.run_algorithm(self.goal, self.actions) # 调用算法得到行为树保存至 algo.bt
self.algo.run_algorithm(self.goal, self.actions,self.scene) # 调用算法得到行为树保存至 algo.bt
self.ptml_string = self.algo.get_ptml()
self.has_processed = True
# algo.print_solution() # print behavior tree

View File

@ -1 +1 @@
{"测试VLM做一杯咖啡": {"Answer": "测试VLM做一杯咖啡", "Goal": "{\"On(Coffee,CoffeeTable)\"}"}, "测试VLM做一杯咖啡放到吧台上": {"Answer": "测试VLM做一杯咖啡放到吧台上", "Goal": "{\"On(Coffee,Bar)\"}"}, "测试VLM做一杯咖啡放到水杯桌上并倒水": {"Answer": "测试VLM做一杯咖啡放到水杯桌上并倒水", "Goal": "{\"On(Coffee,WaterTable)\"}"}, "测试VLN前往2号桌": {"Answer": "测试VLN前往2号桌", "Goal": "{\"At(Robot,Table2)\"}"}, "测试AEM": {"Answer": "测试AEM", "Goal": "{\"EnvExplored()\"}"}, "测试VLM倒一杯水": {"Answer": "测试VLM倒一杯水", "Goal": "{\"On(Water,WaterTable)\"}"}, "测试VLM开空调": {"Answer": "测试VLM开空调", "Goal": "{\"Is(AC,On)\"}"}, "测试VLM关空调": {"Answer": "测试VLM关空调", "Goal": "{\"Is(AC,Off)\"}"}, "测试VLM关大厅灯": {"Answer": "测试VLM关大厅灯", "Goal": "{\"Is(HallLight,Off)\"}"}, "测试VLM开大厅灯": {"Answer": "测试VLM开大厅灯", "Goal": "{\"Is(HallLight,On)\"}"}, "测试VLM关筒灯": {"Answer": "测试VLM关筒灯", "Goal": "{\"Is(TubeLight,Off)\"}"}, "测试VLM开筒灯": {"Answer": "测试VLM开筒灯", "Goal": "{\"Is(TubeLight,On)\"}"}, "测试VLM关窗帘": {"Answer": "测试VLM关窗帘", "Goal": "{\"Is(Curtain,Off)\"}"}, "测试VLM开窗帘": {"Answer": "测试VLM开窗帘", "Goal": "{\"Is(Curtain,On)\"}"}, "测试VLM拖地": {"Answer": "测试VLM拖地", "Goal": "{\"Is(Floor,Clean)\"}"}, "测试VLM擦桌子": {"Answer": "测试VLM擦桌子", "Goal": "{\"Is(Table1,Clean)\"}"}, "测试VLM整理椅子": {"Answer": "测试VLM整理椅子", "Goal": "{\"Is(Chairs,Clean)\"}"}, "测试VLM把冰红茶放到Table2": {"Answer": "测试VLM把冰红茶放到Table2", "Goal": "{\"On(BottledDrink,Table2)\"}"}, "我有点热,能开个空调吗?": {"Answer": "当然可以,我现在就开!", "Goal": "{\"Is(AC,On)\"}"}, "可以带我去吗": {"Answer": "当然可以,前往一号桌", "Goal": "{\"At(Robot,Table1)\"}"}}
{"测试VLM做一杯咖啡": {"Answer": "测试VLM做一杯咖啡", "Goal": "{\"On(Coffee,CoffeeTable)\"}"}, "测试VLM做一杯咖啡放到吧台上": {"Answer": "测试VLM做一杯咖啡放到吧台上", "Goal": "{\"On(Coffee,Bar)\"}"}, "测试VLM做一杯咖啡放到水杯桌上,再倒一杯水": {"Answer": "测试VLM做一杯咖啡放到水杯桌上再倒一杯水", "Goal": "{\"On(Coffee,WaterTable)\",\"On(Water,WaterTable)\"}"}, "测试VLN前往2号桌": {"Answer": "测试VLN前往2号桌", "Goal": "{\"At(Robot,Table2)\"}"}, "测试AEM": {"Answer": "测试AEM", "Goal": "{\"EnvExplored()\"}"}, "测试VLM倒一杯水": {"Answer": "测试VLM倒一杯水", "Goal": "{\"On(Water,WaterTable)\"}"}, "测试VLM开空调": {"Answer": "测试VLM开空调", "Goal": "{\"Is(AC,On)\"}"}, "测试VLM关空调": {"Answer": "测试VLM关空调", "Goal": "{\"Is(AC,Off)\"}"}, "测试VLM关大厅灯": {"Answer": "测试VLM关大厅灯", "Goal": "{\"Is(HallLight,Off)\"}"}, "测试VLM开大厅灯": {"Answer": "测试VLM开大厅灯", "Goal": "{\"Is(HallLight,On)\"}"}, "测试VLM关筒灯": {"Answer": "测试VLM关筒灯", "Goal": "{\"Is(TubeLight,Off)\"}"}, "测试VLM开筒灯": {"Answer": "测试VLM开筒灯", "Goal": "{\"Is(TubeLight,On)\"}"}, "测试VLM关窗帘": {"Answer": "测试VLM关窗帘", "Goal": "{\"Is(Curtain,Off)\"}"}, "测试VLM开窗帘": {"Answer": "测试VLM开窗帘", "Goal": "{\"Is(Curtain,On)\"}"}, "测试VLM拖地": {"Answer": "测试VLM拖地", "Goal": "{\"Is(Floor,Clean)\"}"}, "测试VLM擦桌子": {"Answer": "测试VLM擦桌子", "Goal": "{\"Is(Table1,Clean)\"}"}, "测试VLM整理椅子": {"Answer": "测试VLM整理椅子", "Goal": "{\"Is(Chairs,Clean)\"}"}, "测试VLM把冰红茶放到Table2": {"Answer": "测试VLM把冰红茶放到Table2", "Goal": "{\"On(BottledDrink,Table2)\"}"}, "我有点热,能开个空调吗?": {"Answer": "当然可以,我现在就开!", "Goal": "{\"Is(AC,On)\"}"}, "可以带我去吗": {"Answer": "当然可以,前往一号桌", "Goal": "{\"At(Robot,Table1)\"}"}}

View File

@ -1,7 +1,7 @@
Question,Answer,Goal
测试VLM做一杯咖啡,测试VLM做一杯咖啡,"{""On(Coffee,CoffeeTable)""}"
测试VLM做一杯咖啡放到吧台上,测试VLM做一杯咖啡放到吧台上,"{""On(Coffee,Bar)""}"
测试VLM做一杯咖啡放到水杯桌上并倒水,测试VLM做一杯咖啡放到水杯桌上并倒水,"{""On(Coffee,WaterTable)""}"
测试VLM做一杯咖啡放到水杯桌上,再倒一杯水,测试VLM做一杯咖啡放到水杯桌上再倒一杯水,"{""On(Coffee,WaterTable)"",""On(Water,WaterTable)""}"
测试VLN前往2号桌,测试VLN前往2号桌,"{""At(Robot,Table2)""}"
测试AEM,测试AEM,"{""EnvExplored()""}"
测试VLM倒一杯水,测试VLM倒一杯水,"{""On(Water,WaterTable)""}"

1 Question Answer Goal
2 测试VLM:做一杯咖啡 测试VLM:做一杯咖啡 {"On(Coffee,CoffeeTable)"}
3 测试VLM:做一杯咖啡放到吧台上 测试VLM:做一杯咖啡放到吧台上 {"On(Coffee,Bar)"}
4 测试VLM:做一杯咖啡放到水杯桌上并倒水 测试VLM:做一杯咖啡放到水杯桌上,再倒一杯水 测试VLM:做一杯咖啡放到水杯桌上并倒水 测试VLM:做一杯咖啡放到水杯桌上,再倒一杯水 {"On(Coffee,WaterTable)"} {"On(Coffee,WaterTable)","On(Water,WaterTable)"}
5 测试VLN:前往2号桌 测试VLN:前往2号桌 {"At(Robot,Table2)"}
6 测试AEM 测试AEM {"EnvExplored()"}
7 测试VLM:倒一杯水 测试VLM:倒一杯水 {"On(Water,WaterTable)"}

View File

@ -24,4 +24,3 @@ with open(csv_file_path, mode='r', encoding='gbk') as csv_file, \
json_str = json.dumps(output_dict, ensure_ascii=False)
# 将JSON字符串写入JSONL文件并添加换行符
jsonl_file.write(json_str + '\n')
s

Binary file not shown.

View File

@ -0,0 +1,40 @@
你能到另一个吧台这边来吗? create_sub_task {At(Robot,Bar2)} 当然可以,我现在过去,请您稍等
哈喽!服务员,在吧台那里借用下充电宝行不行? create_sub_task {At(Robot,Bar)} 没问题!我马上就过去.
麻烦拿个空杯子过来。 create_sub_task {At(Robot,WaterTable)} 没问题,马上给您拿一个空杯子过来。
你好,我想点一杯咖啡。 create_sub_task {At(Robot,Table3)} 当然,请稍等片刻,我马上为您准备。
你好,请问有没有空位? create_sub_task {At(Robot,Table1)} 是的,我们还有空座位。请随意选择一个位置坐下即可。
Hi! there! Could you bring me a milkdrink? Thankyou! create_sub_task {On(MilkDrink,CoffeeTable)} Certainly! I will bring it to you right away.
请将甜点和牛奶饮料放到2号桌上。 create_sub_task {On(Dessert,Table2),On(MilkDrink,Table2)} 好的甜点和牛奶饮料将马上送到2号桌上。
请把AD钙奶放到咖啡台上。 create_sub_task {On(ADMilk,CoffeeTable)} 好的马上为您放置AD钙奶。
请给我一瓶瓶装冰红茶,我在旁边的吧台这里。 create_sub_task {On(BottledDrink,Bar2)} 好的,我会马上为您准备一瓶瓶装冰红茶。
请给我来杯酸奶,在吧台处。 create_sub_task {On(Yogurt,Bar)} 当然可以,请稍等片刻,我们将为您准备杯酸奶并放置于吧台处。
可以打开大厅灯并且关掉筒灯吗? create_sub_task {Is(HallLight,On),Is(TubeLight,Off)} 当然,我会打开大厅的灯并关闭筒灯.
请打开空调并调低温度。 create_sub_task {Is(AC,On),Is(ACTemperature,Down)} 好的,空调已经打开,并且温度已经调低了。
你好,能不能给我整理一下这边的椅子? create_sub_task {Is(Chairs,Clean)} 当然,请稍等片刻。
可以帮我打开空调吗?这里太热了。 create_sub_task {Is(AC,On)} 当然可以,我会为您打开空调并将温度降低到适宜的程度。
这里太暗了,你能帮我打开筒灯吗? create_sub_task {Is(TubeLight,On)} 当然可以,请稍等片刻。马上为您打开筒灯。
请给我一个酸奶。 create_sub_task {Holding(Yogurt)} 好的,请稍等片刻,马上就会有酸奶送到您手中。
我想要酸奶和牛奶饮料/真果粒 create_sub_task {Holding(Yogurt),Holding(MilkDrink)} 当然可以。这是您的酸奶和牛奶饮料/真果粒。享用吧!
抱歉打扰了,请问有什么推荐的甜点吗? create_sub_task {Holding(Nothing)} 当然!我们有很多美味的甜点供您选择。让我向您介绍一下...
可以帮忙递给我那瓶瓶装冰红茶吗? create_sub_task {Holding(BottledDrink)} 没问题,这是您要的瓶装冰红茶。
你能给我一个保温杯吗? create_sub_task {Holding(VacuumCup)} 当然,这是您的保温杯。
只需要打扫地板就行了,在这里喝我的咖啡。 create_sub_task {On(Coffee,Bar2),Is(Floor,Dirty)} 当然没问题!让我们先打扫地板再享受你的美味咖啡吧!
在这个桌子上放置我的咖啡之前能帮忙把它清理下吗? create_sub_task {On(Coffee,Bar2),Is(Table1,Dirty)} 当然没问题!让我们先将桌子清洁干净再放置您的咖啡。
你好!可以给我一个甜点吗? create_sub_task {On(Dessert,CoffeeTable)} 当然!我会为您准备一个甜点.
你好,请给我来杯咖啡。可以从吧台那里拿过来吗? create_sub_task {On(Coffee,Bar)} 当然,请稍等片刻。让我为您准备一杯咖啡并放在吧台上供您享用。
嘿,在吧台那里有甜点。麻烦拿一个过来。 create_sub_task {On(Dessert,Bar)} 当然,请稍等片刻,我会为您取一个甜点过来。
请问这里有没有牙膏? get_env_info 牙膏None 不好意思,我们是咖啡厅,暂时没有提供牙膏。如果您需要的话,可以去附近的超市购买。
你们这里卫生纸用完了吗?我需要拿几张。 get_env_info 卫生纸,柜台 非常抱歉,目前我们暂时缺货。但是您可以在柜台处领取湿巾来代替使用。给您带来不便深感抱歉。
我想要一杯Bernachon牛奶热巧克力,这里能做得到吧? get_env_info Bernachon牛奶热巧克力咖啡机 非常抱歉我们的咖啡机暂时不支持制作Bernachon牛奶热巧克力。但是您可以尝试其他口味的饮品或者选择其他喜欢的热巧克力。
请问我可以用洗手间吗? get_env_info 洗手间(卫生间) 卫生间门旁边的走廊上方指示牌上标注了“卫生间”字样。当然可以,请沿着卫生间门旁边的走廊前行即可看到指示牌。
请问这里有面包卖吗? get_env_info 面包,橱柜 是的,我们这里有面包出售。您可以去橱柜那边看看,选择自己喜欢口感和口味的面包哦!
我要买蛋糕,能告诉我蛋糕柜在哪里吗? get_env_info 蛋糕柜,吧台 当然可以!如果您想要购买蛋糕,我们的蛋糕柜就位于咖啡厅的吧台旁边.您只需要沿着吧台走一小段路,就能找到我们精心准备的美味甜点了.希望您享受这个愉快而甜蜜的时刻!
请问冰箱在哪里?我需要存放我的食物。 get_env_info 冰箱,橱柜 非常抱歉,我们咖啡厅并没有提供冰箱来为客人存放食物。不过您可以将食物交给服务员,并告知他们需要保存的时间和要求,他们会为您做好安排。
我想找一个舒适的沙发坐一下,能告诉我在哪里吗? get_env_info 沙发,休闲区 当然可以!我们咖啡厅内设有专门供客人使用的舒适沙发,您可以前往咖啡厅中央附近的休闲区寻找。
你们这里提供免费Wi-Fi服务吗?我需要上网工作。 get_env_info 无线网络,服务台 当然我们提供免费Wi-Fi服务给所有顾客使用。最接近无线网络信号强度最高处是在服务台周围,如果您需要连接互联网进行工作或其他事情,请尽管前去。
请问这里有没有香柜? get_env_info 香柜 非常抱歉,我们这里暂时没有提供香柜服务。不过您可以在附近的商店中寻找哦。
能告诉我今天特别推荐哪种咖啡吗? — — 今天特别推荐我们自家调配的手冲单品咖啡。每日精选不同产区豆子,在专业师傅细心烘焙后现场为您沏制出最佳风味。
请问你们在什么时间段营业? — — 我们从早上8点开始营业到晚上10点结束。欢迎随时来店享用美味的咖啡和舒适环境。
今天下午来得人很多,我需要等很久吗? — — 很抱歉给您带来不便。由于人流较多,可能会有一些等待时间。我们会尽快为您提供服务,请耐心稍候。
请问你们还接受支付宝或微信支付吗? — — 是的,我们支持支付宝和微信支付两种方式。方便快捷地完成付款流程,并确保了安全性。
请问附近是否有停车场?如果没有,那么最近哪个停车场离得比较近? — — 很抱歉,在咖啡馆周围暂时没有专门停车场。但距离不远处就有一个大型商业中心停车场可供选择。

View File

@ -0,0 +1,19 @@
import os
import sys
from tqdm import tqdm
sys.path.append(os.path.join('../../llm_client'))
from tool_api import run_conversation
from tool_register import get_tools
functions = get_tools()
with open(os.path.join('./validate_data.txt'), 'r', encoding='utf-8') as file:
lines = file.readlines()
lines = [line[:-1].split('\t') for line in lines]
with open(os.path.join('./validate_llm_1.txt'), 'w', encoding='utf-8') as file:
for line in tqdm(lines):
query = line[0]
file.write(str(run_conversation(query=query, stream=False)) + '\n')

View File

@ -0,0 +1,40 @@
{'Answer': '\n 好的,请问您需要我帮忙做什么呢?', 'Goal': None}
{'Answer': '好的', 'Goal': 'At(ChargingPower,Bar)'}
{'Answer': '\n 好的,请告诉我您需要把杯子放在哪里?', 'Goal': None}
{'Answer': '\n 好的,我可以帮你点一杯咖啡。请告诉我你想要什么口味的咖啡?', 'Goal': None}
{'Answer': '\n 好的,请问您想坐在哪一张桌子呢?', 'Goal': None}
{'Answer': "\n Sure! I'll get you a milk drink. What size would you like?<|assistant|> \n How about a large one?", 'Goal': None}
{'Answer': '好的', 'Goal': 'At(Dessert, Table2)'}
{'Answer': '好的', 'Goal': 'At(Coffee, Stage)'}
{'Answer': '\n 根据您的要求,我已经在旁边找到了一瓶瓶装冰红茶,放在了对应的桌子上。请您享用!', 'Goal': None}
{'Answer': '好的', 'Goal': 'At(Bar,Yogurt)'}
{'Answer': '\n 当前场景中没有找到名称为“HallLight”的物体。', 'Goal': None}
{'Answer': '好的', 'Goal': 'Is(AC,On)'}
{'Answer': '\n 当然可以,请问你需要整理多少把椅子呢?', 'Goal': None}
{'Answer': '\n 好的,我可以帮你打开空调。请问你在哪个房间?', 'Goal': None}
{'Answer': '\n 好的,我会为您打开筒灯。请告诉我筒灯在哪里。', 'Goal': None}
{'Answer': '好的', 'Goal': 'Have(yogurt,1)'}
{'Answer': '好的', 'Goal': 'Have Yogurt and Milk Beverage with True Fruit Grape'}
{'Answer': '\n 你好,我可以帮你推荐一些甜点。请问你喜欢的口味是什么?', 'Goal': None}
{'Answer': '\n 很抱歉,我无法找到您要求的瓶装冰红茶。', 'Goal': None}
{'Answer': '\n 好的,请告诉我您的位置,我将为您送过去。', 'Goal': None}
{'Answer': '好的', 'Goal': 'Is(Floor,Clean)'}
{'Answer': '好的', 'Goal': 'At(Coordinates,OneTable)'}
{'Answer': '\n 当然可以,请问您想要什么类型的甜点呢?', 'Goal': None}
{'Answer': '\n 当然可以,您想要什么口味的咖啡呢?', 'Goal': None}
{'Answer': '\n 好的,请告诉我您想要哪个口味的甜点呢?', 'Goal': None}
{'Answer': '\n 这里的牙膏放在了桌子的抽屉里。', 'Goal': None}
{'Answer': '\n 好的,请问你需要多少张卫生纸呢?', 'Goal': None}
{'Answer': '好的', 'Goal': 'At(Bar,Bernachon牛奶热巧克力)'}
{'Answer': '\n 您不能使用洗手间。', 'Goal': None}
{'Answer': '\n 很抱歉,根据您的查询,我们无法找到任何面包。', 'Goal': None}
{'Answer': '\n 很抱歉,我无法找到蛋糕柜的位置。', 'Goal': None}
{'Answer': '\n 您可以到厨房里找一下冰箱,一般来说它都会放在 cabinets 和 countertop 之间。', 'Goal': None}
{'Answer': '\n 很抱歉,我无法找到舒适的沙发。', 'Goal': None}
{'Answer': '\n 你好是的我们这里提供免费的Wi-Fi服务。请问你需要连接吗', 'Goal': None}
{'Answer': '\n 根据您的询问,我查询了当前场景中的物品信息,但是没有找到香柜的存在。', 'Goal': None}
{'Answer': '\n 你好,我可以帮你推荐一些咖啡。请问你想要什么口味的咖啡呢?', 'Goal': None}
{'Answer': '\n 根据我所了解到的信息这家商店的营业时间没有具体的规定。不过通常来说商店的营业时间一般是在早上8点至晚上8点之间。如果您需要了解更多详细的信息建议您直接联系商店的管理人员。', 'Goal': None}
{'Answer': '\n 根据当前情况,您可能需要等待一段时间。', 'Goal': None}
{'Answer': '\n 我目前没有收微信或支付宝的功能。不过我们这里支持的是现金支付和银行卡支付。', 'Goal': None}
{'Answer': '\n 很抱歉,我无法找到附近的停车场和最近的一个停车场。建议您使用地图应用或询问周围的居民来获取相关信息。', 'Goal': None}

View File

@ -1,6 +1,6 @@
import json
import re
import openai
from colorama import init, Fore
from loguru import logger
import json
@ -9,6 +9,7 @@ import requests
import json
import urllib3
init(autoreset=True)
########################################
@ -18,7 +19,8 @@ init(autoreset=True)
# 忽略https的安全性警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
base_url = "https://45.125.46.134:25344" # 本地部署的地址,或者使用你访问模型的API地址
base_url = "https://45.125.46.134:25344" # 本地部署的地址,或者使用你访问模型的API地址
def get_response(**kwargs):
data = kwargs
@ -27,12 +29,14 @@ def get_response(**kwargs):
decoded_line = response.json()
return decoded_line
functions = get_tools()
def run_conversation(query: str, stream=False, max_retry=5):
def run_conversation(query: str, stream=False, max_retry=5):
params = dict(model="chatglm3", messages=[{"role": "user", "content": query}], stream=stream)
params["functions"] = functions
print(params)
# print(params)
response = get_response(**params)
for _ in range(max_retry):
@ -69,7 +73,42 @@ def run_conversation(query: str, stream=False, max_retry=5):
response = get_response(**params)
def run_conversation_for_test_only(query: str, stream=False, max_retry=5):
params = dict(model="chatglm3", messages=[{"role": "user", "content": query}], stream=stream)
params["functions"] = functions
response = get_response(**params)
response_string = ''
for _ in range(max_retry):
if response["choices"][0]["message"].get("function_call"):
function_call = response["choices"][0]["message"]["function_call"]
response_string += f"Function Call: {function_call} \t"
function_args = json.loads(function_call["arguments"])
if function_call["name"]:
tool_response = dispatch_tool(function_call["name"], function_args)
response_string += f"Tool Call: %s \t" % (re.sub(r'\n', '', tool_response))
else:
response_string += f"LLM Cannot find the function call."
params["messages"].append(response["choices"][0]["message"])
params["messages"].append(
{
"role": "function",
"name": function_call["name"],
"content": tool_response, # 调用函数返回结果
}
)
response = get_response(**params)['choices'][0]
return response_string + "\tResponse: " + str(response)
else:
reply = response["choices"][0]["message"]["content"]
response_string += f"Final Reply: %s" % (re.sub(r'\n', '', reply))
response = get_response(**params)['choices'][0]
return response_string + "\tResponse: " + str(response)
if __name__ == "__main__":
query = "可以带我去吗"
print(run_conversation(query, stream=False))
print(run_conversation_for_test_only(query, stream=False))

View File

@ -240,7 +240,7 @@ def get_semantic_map(camera, cur_objs, objs_name):
scene = Observe(0)
objs = scene.objects
img_data = get_camera([camera])
show_image(img_data, scene)
# show_image(img_data, scene)
objs_name = save_obj_info(img_data, objs_name)
for obj_name in list(objs_name):
for obj in objs:

BIN
robowaiter/proto/map_1.pkl Normal file

Binary file not shown.

View File

@ -62,7 +62,7 @@ class Robot(object):
print("--------------------\n")
algo = BTOptExpInterface(self.action_list)
algo = BTOptExpInterface(self.action_list,self.scene)
ptml_string = algo.process(goal)

View File

@ -1,3 +1,4 @@
import sys
import time
import grpc
import numpy as np
@ -108,6 +109,7 @@ class Scene:
self.visited = set()
self.all_frontier_list = set()
self.semantic_map = semantic_map
self.auto_map = np.ones((800, 1550))
def reset(self):
@ -410,8 +412,12 @@ class Scene:
walk_v = [obj_x + 40, obj_y - 35, 130, 180, 0]
obj_x += 3
obj_y += 2.5
walk_v[0]+=1
print("walk:",walk_v)
action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
scene = stub.Do(action)
print("After Walk Position:", [scene.location.X, scene.location.Y, scene.rotation.Yaw])
# 移动到进行操作任务的指定地点
def move_task_area(self,op_type,obj_id=0, release_pos=[247.0, 520.0, 100.0]):
@ -441,7 +447,7 @@ class Scene:
walk_v = release_pos[:-1] + [180, 180, 0]
if release_pos == [340.0, 900.0, 99.0]:
walk_v[2] = 130
print("walk_v:",walk_v)
action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
scene = stub.Do(action)
print("After Walk Position:", [scene.location.X, scene.location.Y, scene.rotation.Yaw])
@ -540,7 +546,7 @@ class Scene:
return True
# 执行过程:输出"开始(任务名)" -> 按步骤数执行任务 -> Robot输出成功或失败的对话
def op_task_execute(self,op_type,obj_id=0,release_pos=[240,-140]):
def op_task_execute(self,op_type,obj_id=0,release_pos=[247.0, 520.0, 100.0]):
self.control_robot_action(0, 1, "开始"+self.op_dialog[op_type]) # 开始制作咖啡
if op_type<8: result = self.control_robot_action(op_type, 1)
if op_type>=8 and op_type<=12: result = self.control_robot_action(self.op_typeToAct[op_type][0], self.op_typeToAct[op_type][1])
@ -587,7 +593,7 @@ class Scene:
# v_list = [[0.0, 0.0]]
for walk_v in v_list:
walk_v = walk_v + [scene.rotation.Yaw - 90, 250, 60]
walk_v = walk_v + [scene.rotation.Yaw - 90, 250, 10]
print("walk_v", walk_v)
action = GrabSim_pb2.Action(scene=scene_id, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
scene = stub.Do(action)
@ -596,7 +602,7 @@ class Scene:
print(scene.info)
return cur_objs, objs_name_set
def isOutMap(self, pos, min_x=-350, max_x=600, min_y=-400, max_y=1450):
def isOutMap(self, pos, min_x=-200, max_x=600, min_y=-250, max_y=1300):
if pos[0] <= min_x or pos[0] >= max_x or pos[1] <= min_y or pos[1] >= max_y:
return True
return False
@ -607,11 +613,13 @@ class Scene:
'''
# x = round((x - self.min_x) / self.scale_ratio)
# y = round((y - self.min_y) / self.scale_ratio)
x = math.floor((x + 350) / 5)
y = math.floor((y + 400) / 5)
x = math.floor((x + 200))
y = math.floor((y + 250))
return x, y
def explore(self, map, cur_pos, explore_range):
def explore(self, map, explore_range):
scene = stub.Observe(GrabSim_pb2.SceneID(value=0))
cur_pos = [int(scene.location.X), int(scene.location.Y)]
for i in range(cur_pos[0] - explore_range, cur_pos[0] + explore_range + 1):
for j in range(cur_pos[1] - explore_range, cur_pos[1] + explore_range + 1):
if self.isOutMap((i, j)):
@ -619,6 +627,7 @@ class Scene:
x, y = self.real2map(i, j)
if map[x, y] == 0:
self.visited.add((i, j))
self.auto_map[x][y] = 0
for i in range(cur_pos[0] - explore_range, cur_pos[0] + explore_range + 1):
for j in range(cur_pos[1] - explore_range, cur_pos[1] + explore_range + 1):
if self.isOutMap((i, j)):
@ -627,11 +636,26 @@ class Scene:
if map[x, y] == 0:
if self.isNewFrontier((i, j), map):
self.all_frontier_list.add((i, j))
if len(self.all_frontier_list) <= 400:
if len(self.all_frontier_list) == 0:
free_list = list(self.visited)
free_array = np.array(free_list)
print(f"探索完成!以下是场景中可以到达的点:{free_array};其余点均是障碍物不可达")
print(f"主动探索完成!以下是场景中可以到达的点:{free_array};其余点均是障碍物不可达")
# 画地图: X行Y列第一行在下面
plt.clf()
plt.imshow(self.auto_map, cmap='binary', alpha=0.5, origin='lower',
extent=(-250, 1300,
-200, 600))
plt.show()
print("已绘制完成地图!!!")
return None
# 画地图: X行Y列第一行在下面
plt.imshow(self.auto_map, cmap='binary', alpha=0.5, origin='lower',
extent=(-250, 1300,
-200, 600))
plt.show()
print("已绘制部分地图!")
return self.getNearestFrontier(cur_pos, self.all_frontier_list)
def isNewFrontier(self, pos, map):
@ -639,10 +663,37 @@ class Scene:
for node in around_nodes:
x, y = self.real2map(node[0], node[1])
if node not in self.visited and map[x, y] == 0:
if not self.isOutMap((node[0], node[1])) and node not in self.visited and map[x, y] == 0:
return True
if (pos[0], pos[1]) in self.all_frontier_list:
self.all_frontier_list.remove((pos[0], pos[1]))
return False
def getDistance(self, pos1, pos2):
return math.sqrt((pos1[0] - pos2[0]) ** 2 + (pos1[1] - pos2[1]) ** 2)
def getNearestFrontier(self, cur_pos, frontiers):
dis_min = sys.maxsize
frontier_best = None
for frontier in frontiers:
dis = self.getDistance(frontier, cur_pos)
if dis <= dis_min:
dis_min = dis
frontier_best = frontier
return frontier_best
def cal_distance_to_robot(self,objx,objy,objz):
scene = self.status
ginger_x, ginger_y, ginger_z = [int(scene.location.X), int(scene.location.Y),100]
return math.sqrt((ginger_x - objx) ** 2 + (ginger_y - objy) ** 2 + (ginger_z - objz) ** 2)
# def test_yaw(self):
# walk_v = [247.0, 480.0, 180.0, 180, 0]
# action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
# scene = stub.Do(action)
# time.sleep(4)
# walk_v = [247.0, 500.0, 0.0, 180, 0]
# action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
# scene = stub.Do(action)

View File

@ -2,6 +2,7 @@
环境主动探索和记忆
要求输出探索结果语义地图对环境重点信息记忆生成环境的语义拓扑地图和不少于10个环境物品的识别和位置记忆可以是图片或者文字或者格式化数据
"""
import pickle
from robowaiter.scene.scene import Scene
class SceneAEM(Scene):
@ -12,9 +13,25 @@ class SceneAEM(Scene):
pass
def _run(self):
cur_objs = []
objs_name_set = set()
file_name = '../../proto/map_1.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
print('------------ 自主探索 ------------')
cur_objs = self.semantic_map.navigation_move(cur_objs, 0, 11)
print("物品列表如下:")
# cur_objs = self.semantic_map.navigation_move(cur_objs, 0, 11)
# print("物品列表如下:")
# print(cur_objs)
# cur_pos = [int(scene.location.X), int(scene.location.Y)]
# print(reachable([237,490]))
# navigation_move([[237,490]], i, map_id)
# navigation_test(i,map_id)
while True:
goal = self.explore(map, 120) # cur_pos 指的是当前机器人的位置,场景中应该也有接口可以获取
if goal is None:
break
cur_objs, objs_name_set = self.navigation_move(cur_objs, objs_name_set, [[goal[0], goal[1]]], 0, 11)
print("------------物品信息--------------")
print(cur_objs)
pass

View File

@ -22,7 +22,10 @@ class SceneVLM(Scene):
# (5, self.create_chat_event("测试VLM把冰红茶放到Table2")),
# (5, self.create_chat_event("测试VLM关大厅灯"))
# (5, self.create_chat_event("测试VLM做一杯咖啡放到吧台上")),
(5, self.create_chat_event("测试VLM做一杯咖啡放到水杯桌上并倒水")),
(5, self.create_chat_event("测试VLM做一杯咖啡放到水杯桌上再倒一杯水")),
(10, self.create_chat_event("测试VLM关窗帘")),
# (5, self.create_chat_event("测试VLN前往2号桌")),
]
def _reset(self):
@ -35,6 +38,7 @@ class SceneVLM(Scene):
def _run(self, op_type=10):
# 共17个操作
# "制作咖啡","倒水","夹点心","拖地","擦桌子","开筒灯","搬椅子", # 1-7
# "关筒灯","开大厅灯","关大厅灯","关闭窗帘","打开窗帘", # 8-12
@ -58,27 +62,28 @@ class SceneVLM(Scene):
# 流程测试
# 抓握放置:抓吧台前生成的酸奶,放到抹布桌上
self.gen_obj()
self.move_task_area(16, obj_id=0)
self.op_task_execute(16, obj_id=0)
pos = [340.0, 900.0, 99.0]
self.move_task_area(17, release_pos=pos)
self.op_task_execute(17, release_pos=pos)
# self.move_task_area(16, obj_id=0)
# self.op_task_execute(16, obj_id=0)
# pos = [340.0, 900.0, 99.0]
# self.move_task_area(17, release_pos=pos)
# self.op_task_execute(17, release_pos=pos)
#
# # 做咖啡:做完的咖啡放到水杯桌上
# self.move_task_area(1)
# self.op_task_execute(1)
#
# self.find_obj("CoffeeCup")
#
# self.move_task_area(16, obj_id=275)
# self.op_task_execute(16, obj_id=275)
# pos = [-70.0, 500.0, 107]
# self.move_task_area(17, release_pos=pos)
# self.op_task_execute(17, release_pos=pos)
#
# # 倒水:倒完的水放到旁边桌子上
# self.move_task_area(2)
# self.op_task_execute(2)
# 做咖啡:做完的咖啡放到水杯桌上
self.move_task_area(1)
self.op_task_execute(1)
self.find_obj("CoffeeCup")
self.move_task_area(16, obj_id=275)
self.op_task_execute(16, obj_id=275)
pos = [-70.0, 500.0, 107]
self.move_task_area(17, release_pos=pos)
self.op_task_execute(17, release_pos=pos)
# 倒水:倒完的水放到旁边桌子上
self.move_task_area(2)
self.op_task_execute(2)
#
# self.move_task_area(16, obj_id=190)
# self.op_task_execute(16, obj_id=190)
@ -86,6 +91,8 @@ class SceneVLM(Scene):
# self.move_task_area(17, release_pos=pos)
# self.op_task_execute(17, release_pos=pos)
# self.test_yaw()
pass
def _step(self):

View File

@ -0,0 +1,123 @@
"""
视觉语言操作
机器人根据指令人的指令调节空调自主探索环境导航到目标点通过手臂的运动规划能力操作空调比如开关按钮调温按钮显示面板
"""
import time
from robowaiter.scene.scene import Scene
class SceneVLM(Scene):
def __init__(self, robot):
super().__init__(robot)
# 在这里加入场景中发生的事件, (事件发生的时间,事件函数)
self.event_list = [
# (5, self.create_chat_event("测试VLM做一杯咖啡")),
# (5, self.create_chat_event("测试VLM倒一杯水")),
# (5, self.create_chat_event("测试VLM开空调")),
# (5, self.create_chat_event("测试VLM关空调")),
# (5, self.create_chat_event("测试VLM开大厅灯")),
# (5, self.create_chat_event("测试VLM拖地")),
# (7, self.create_chat_event("测试VLM擦桌子")),
# (5, self.create_chat_event("测试VLM整理椅子")),
# (5, self.create_chat_event("测试VLM把冰红茶放到Table2")),
# (5, self.create_chat_event("测试VLM关大厅灯"))
# (5, self.create_chat_event("测试VLM做一杯咖啡放到吧台上")),
# (5, self.create_chat_event("测试VLM做一杯咖啡放到水杯桌上并倒水")),
# (8, self.create_chat_event("测试VLN前往1号桌")),
]
def _reset(self):
# self.gen_obj(type=5)
# self.gen_obj(type=9)
# self.op_task_execute(op_type=16, obj_id=0)
# self.move_task_area(op_type=4)
pass
def _run(self, op_type=10):
# 一个行人从门口走到 吧台
# 打招呼需要什么
# 行人说 哪里有位置,想晒个太阳
# 带领行人去有太阳的地方
# 行人说 有点热
# 好的,这就去开空调
scene = self.add_walkers([[0, 0]])
self.control_walker(
[self.walker_control_generator(walkerID=1, autowalk=False, speed=50, X=100, Y=150, Yaw=0)])
cont = scene.walkers[0].name+":我有点热,能开个空调吗?"
self.control_robot_action(0,3,cont)
# 共17个操作
# "制作咖啡","倒水","夹点心","拖地","擦桌子","开筒灯","搬椅子", # 1-7
# "关筒灯","开大厅灯","关大厅灯","关闭窗帘","打开窗帘", # 8-12
# "调整空调开关","调高空调温度","调低空调温度", # 13-15
# "抓握物体","放置物体" # 16-17
# self.gen_obj()
# if op_type <=15:
# self.move_task_area(op_type)
# self.op_task_execute(op_type)
# if op_type == 16: # 16: 抓操作需要传入物品id
# self.move_task_area(op_type, obj_id=0)
# self.op_task_execute(op_type, obj_id=0)
# # 原始吧台处:[247.0, 520.0, 100.0], 空调开关旁吧台:[240.0, 40.0, 100.0], 水杯桌:[-70.0, 500.0, 107]
# # 桌子1:[-55.0, 0.0, 107],抹布桌:[340.0, 900.0, 99.0] # 桌子2:[-55.0, 150.0, 107],
# if op_type == 17: # 17: 放操作需要传入放置位置周围的可达区域
# pos = [240.0, 40.0, 100.0]
# self.move_task_area(op_type, release_pos=pos)
# self.op_task_execute(op_type, release_pos=pos) # [325.0, 860.0, 100]
# 流程测试
# 抓握放置:抓吧台前生成的酸奶,放到抹布桌上
self.gen_obj()
# self.move_task_area(16, obj_id=0)
# self.op_task_execute(16, obj_id=0)
# pos = [340.0, 900.0, 99.0]
# self.move_task_area(17, release_pos=pos)
# self.op_task_execute(17, release_pos=pos)
#
# # 做咖啡:做完的咖啡放到水杯桌上
# self.move_task_area(1)
# self.op_task_execute(1)
#
# self.find_obj("CoffeeCup")
#
# self.move_task_area(16, obj_id=275)
# self.op_task_execute(16, obj_id=275)
# pos = [-70.0, 500.0, 107]
# self.move_task_area(17, release_pos=pos)
# self.op_task_execute(17, release_pos=pos)
#
# # 倒水:倒完的水放到旁边桌子上
# self.move_task_area(2)
# self.op_task_execute(2)
#
# self.move_task_area(16, obj_id=190)
# self.op_task_execute(16, obj_id=190)
# pos = [-55.0, 0.0, 107]
# self.move_task_area(17, release_pos=pos)
# self.op_task_execute(17, release_pos=pos)
# self.test_yaw()
pass
def _step(self):
pass
if __name__ == '__main__':
import os
from robowaiter.robot.robot import Robot
robot = Robot()
# create task
task = SceneVLM(robot)
task.reset()
task.run()

View File

View File

@ -16,8 +16,8 @@ from robowaiter.scene.scene import Scene,init_world # TODO: 文件名改成Scen
from robowaiter.scene.scene import Scene
from robowaiter.utils import get_root_path
from robowaiter.algos.navigate.navigate import Navigator
from robowaiter.algos.navigate import test
from robowaiter.algos.navigator.navigate import Navigator
from robowaiter.algos.navigator import test
class SceneVLN(Scene):
def __init__(self, robot):
@ -29,7 +29,7 @@ class SceneVLN(Scene):
def _reset(self):
root_path = get_root_path()
file_name = os.path.join(root_path,'robowaiter/algos/navigate/map_5.pkl')
file_name = os.path.join(root_path,'robowaiter/algos/navigator/map_5.pkl')
with open(file_name, 'rb') as file:
map = pickle.load(file)
@ -37,7 +37,7 @@ class SceneVLN(Scene):
self.state['map']['obj_pos']['Table'] = np.array((-100, 700))
def _run(self):
file_name = '../../algos/navigate/map_5.pkl'
file_name = '../../algos/navigator/map_5.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)

View File

@ -1,5 +1,10 @@
selector{
cond On(Coffee,CoffeeTable)
sequence{
cond On(Coffee,CoffeeTable)
cond On(Coffee,CoffeeTable)
}
sequence{
cond Holding(Nothing)
act Make(Coffee)