Merge remote-tracking branch 'origin/main'

This commit is contained in:
liwang zhang 2023-11-16 16:37:59 +08:00
commit bfb5dd3ef5
36 changed files with 764 additions and 469 deletions

View File

@ -1,4 +1,5 @@
import time import time
import math
import grpc import grpc
import numpy as np import numpy as np
@ -358,29 +359,29 @@ class Scene:
temp = stub.GetIKControlInfos(GrabSim_pb2.HandPostureInfos(scene=self.sceneID, handPostureObjects=HandPostureObject)) temp = stub.GetIKControlInfos(GrabSim_pb2.HandPostureInfos(scene=self.sceneID, handPostureObjects=HandPostureObject))
# 移动到进行操作任务的指定地点 # 移动到进行操作任务的指定地点
def move_task_area(self,op_type): # def move_task_area(self,op_type):
if op_type==11 or op_type==12: # 开关窗帘不需要移动 # if op_type==11 or op_type==12: # 开关窗帘不需要移动
return # return
scene = stub.Observe(GrabSim_pb2.SceneID(value=self.sceneID)) # scene = stub.Observe(GrabSim_pb2.SceneID(value=self.sceneID))
walk_value = [scene.location.X, scene.location.Y, scene.rotation.Yaw] # walk_value = [scene.location.X, scene.location.Y, scene.rotation.Yaw]
#
if op_type < 8: # if op_type < 8:
v_list = self.op_v_list[op_type] # v_list = self.op_v_list[op_type]
if op_type>=8 and op_type<=10: # 控灯 # if op_type>=8 and op_type<=10: # 控灯
v_list = self.op_v_list[6] # v_list = self.op_v_list[6]
if op_type in [13,14,15]: # 空调 # if op_type in [13,14,15]: # 空调
v_list = [[240, -140.0]] # KongTiao [300.5, -140.0] # 250 # v_list = [[240, -140.0]] # KongTiao [300.5, -140.0] # 250
# print("------------------error version----------------------")
print("------------------move_task_area----------------------") # print("------------------move_task_area----------------------")
print("Current Position:", walk_value,"开始任务:",self.op_dialog[op_type]) # print("Current Position:", walk_value,"开始任务:",self.op_dialog[op_type])
for walk_v in v_list: # for walk_v in v_list:
walk_v = walk_v + [scene.rotation.Yaw, 180, 0] # walk_v = walk_v + [scene.rotation.Yaw, 180, 0]
walk_v[2] = 0 if (op_type in [13,14,15]) else scene.rotation.Yaw # 空调操作朝向墙面 # walk_v[2] = 0 if (op_type in [13,14,15]) else scene.rotation.Yaw # 空调操作朝向墙面
action = GrabSim_pb2.Action( # action = GrabSim_pb2.Action(
scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v # scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v
) # )
scene = stub.Do(action) # scene = stub.Do(action)
print("After Walk Position:",[scene.location.X, scene.location.Y, scene.rotation.Yaw]) # print("After Walk Position:",[scene.location.X, scene.location.Y, scene.rotation.Yaw])
# 相应的行动,由主办方封装 # 相应的行动,由主办方封装
def control_robot_action(self, type=0, action=0, message="你好"): def control_robot_action(self, type=0, action=0, message="你好"):
@ -512,3 +513,4 @@ class Scene:
print(scene.info) print(scene.info)

View File

@ -1,59 +0,0 @@
# !/usr/bin/env python3
# -*- encoding: utf-8 -*-
import matplotlib.pyplot as plt
import numpy as np
import pickle
import os
def draw_grid_map(grid_map):
# 生成新的地图图像
plt.imshow(grid_map, cmap='binary', alpha=0.5, origin='lower') # 黑白网格
# 绘制坐标轴
plt.xlabel('y', loc='right')
plt.ylabel('x', loc='top')
# 显示网格线
plt.grid(color='black', linestyle='-', linewidth=0.5)
# 显示图像
plt.show()
#plt.pause(0.01)
if __name__ == '__main__':
# control.init_world(scene_num=1, mapID=3)
# scene = control.Scene(sceneID=0)
#
# X = int(950/5) # 采点数量
# Y = int(1850/5)
# map = np.zeros((X, Y))
#
# for x in range(X):
# for y in range(Y):
# if not scene.reachable_check(x*5-350, y*5-400, Yaw=0):
# map[x, y] = 1
# print(x, y)
#
#
# file_name = 'map_5.pkl'
# if not os.path.exists(file_name):
# open(file_name, 'w').close()
# with open(file_name, 'wb') as file:
# pickle.dump(map, file)
# print('保存成功')
file_name = 'map_5.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
draw_grid_map(map)

View File

@ -1,158 +0,0 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import math
import sys
import time
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import make_axes_locatable
# from scene import control
# from rrt import RRT
# from rrt_star import RRTStar
# from apf import APF
from robowaiter.algos.navigate.dstar_lite import DStarLite, euclidean_distance
class Navigator:
'''
导航类
'''
def __init__(self, scene, area_range, map, scale_ratio=5, step_length=150, velocity=150, react_radius=300):
self.scene = scene
self.area_range = area_range # 地图实际坐标范围 xmin, xmax, ymin, ymax
self.map = map # 缩放并离散化的地图 array(X,Y)
self.scale_ratio = scale_ratio # 地图缩放率
self.step_length = step_length # 步长(单次移动)
self.step_num = self.step_length // self.scale_ratio # 单次移动地图格数
self.v = velocity # 速度
self.react_radius = react_radius # robot反应半径
# self.planner = RRTStar(rand_area=area_range, map=map, scale_ratio=scale_ratio, max_iter=400, search_until_max_iter=True)
self.planner = DStarLite(area_range=area_range, map=map, scale_ratio=scale_ratio)
@staticmethod
def is_reached(pos: (float, float), goal: (float, float), dis_limit=30):
'''
判断是否到达目标
'''
dis = math.hypot(pos[0]-goal[0], pos[1]-goal[1])
# dis = np.linalg.norm(pos - goal)
return dis < dis_limit
@staticmethod
def get_yaw(pos: (float, float), goal: (float, float)):
'''
得到移动方向
'''
return math.degrees(math.atan2((goal[1] - pos[1]), (goal[0] - pos[0])))
def legalize_goal(self, goal: (float, float)):
'''
TODO: 处理非法目标
目标在障碍物上从目标开始方形向外扩展直到找到可行点
目标在地图外面起点和目标连线最靠近目标的可行点
'''
return goal
def navigate(self, goal: (float, float), animation=True):
'''
单次导航直到到达目标
'''
if not self.scene.reachable_check(goal[0], goal[1], 0):
goal = self.legalize_goal(goal)
pos = (self.scene.status.location.X, self.scene.status.location.Y) # 机器人当前: 位置 和 朝向
print('------------------navigation_start----------------------')
while not self.is_reached(pos, goal):
dyna_obs = [(walker.pose.X, walker.pose.Y) for walker in self.scene.status.walkers] # 动态障碍物(顾客)位置列表
dyna_obs = [obs for obs in dyna_obs if euclidean_distance(obs, pos) < self.react_radius] # 过滤观测范围外的dyna_obs
# 周围有dyna_obs则步长减半
if dyna_obs:
step_num = self.step_num // 2
else:
step_num = self.step_num
path = self.planner.planning(pos, goal, dyna_obs)
if path:
if animation:
self.planner.draw_graph(step_num) # 画出搜索路径
next_step = min(step_num, len(path))
next_pos = path[next_step - 1]
# print('plan pos:', next_pos, end=' ')
yaw = self.get_yaw(pos, next_pos)
# print("yaw:",yaw)
self.scene.walk_to(next_pos[0], next_pos[1], Yaw=yaw, velocity=self.v, dis_limit=10)
# pos = (self.scene.status.location.X, self.scene.status.location.Y)
# if self.is_reached(pos, next_pos):
self.planner.path = self.planner.path[next_step - 1:] # 去除已走过的路径
pos = (self.scene.status.location.X, self.scene.status.location.Y)
# print('reach pos:', pos)
self.planner.reset() # 完成一轮导航,重置变量
if self.is_reached(pos, goal):
print('The robot has achieved goal !!')
# def navigate(self, goal: (float, float), path_smoothing=True, animation=True):
# pos = np.array((self.scene.status.location.X, self.scene.status.location.Y)) # 机器人当前: 位置 和 朝向
# yaw = self.scene.status.rotation.Yaw
# print('------------------navigation_start----------------------')
#
# path = self.planner.planning(pos, goal, path_smoothing, animation)
# if path:
# self.planner.draw_graph(final_path=path) # 画出探索过程
# for (x, y) in path:
# self.scene.walk_to(x, y, yaw, velocity=self.v)
# time.sleep(self.step_time)
# pos = np.array((self.scene.status.location.X, self.scene.status.location.Y))
#
# self.planner.reset()
#
# if self.is_reached(pos, goal):
# print('The robot has achieved goal !!')
'''APF势场法暂不可用'''
# while not self.is_reached(pos, goal):
# # 1. 路径规划
# path = self.planner.planning(pos, goal, path_smoothing, animation)
# self.planner.draw_graph(final_path=path) # 画出探索过程
#
# # 2. 使用APF导航到路径中的每个waypoint
# traj = [(pos[0], pos[1])]
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
# for i, waypoint in enumerate(path[1:]):
# print('waypoint [', i, ']:', waypoint)
# # if (not self.scene.reachable_check(waypoint[0], waypoint[1], yaw)) and self.map[self.planner.real2map(waypoint[0], waypoint[1])] == 0:
# # print('error')
# while not self.is_reached(pos, waypoint):
# # 2.1 计算next_step
# pos = np.array((self.scene.status.location.X, self.scene.status.location.Y))
# Pobs = [] # 障碍物(顾客)位置数组
# for walker in self.scene.status.walkers:
# Pobs.append((walker.pose.X, walker.pose.Y))
# next_step, _ = APF(Pi=pos, Pg=waypoint, Pobs=Pobs, step_length=self.step_length)
# traj.append((next_step[0], next_step[1]))
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
# while not self.scene.reachable_check(next_step[0], next_step[1], yaw): # 取中点直到next_step可达
# traj.pop()
# next_step = (pos + next_step) / 2
# traj.append((next_step[0], next_step[1]))
# #self.planner.draw_graph(final_path=traj) # 画出探索过程
# # 2.2 移动robot
# self.scene.walk_to(next_step[0], next_step[1], yaw, velocity=self.v)
# # print(self.scene.status.info) # print navigation info
# # print(self.scene.status.collision)
# time.sleep(self.step_time)
# # print(self.scene.status.info) # print navigation info
# # print(self.scene.status.collision)
# self.planner.reset()

View File

@ -1,52 +0,0 @@
## 默认使用RRTStar+路径平滑
### apf.py: 势场法实现
### discretize_map.py: 地图离散化并压缩
### map_5.pkl: 地图文件(5倍压缩)
### navigate.py: 导航类
### pathsmoothing.py: 路径平滑
### rrt.py: RRT实现
### rrt_star.py: RRTStar 实现
### test.py: 测试文件
## TODO
### 目标不合法
#### 初始目标不合法
目标在障碍物上:从目标开始方形向外扩展,直到找到可行点
目标在地图外面:起点和目标连线最靠近目标的可行点
对不合法的目标做单独处理,生成新的目标
#### 在移动过程中目标被占据
给出目标但不会行移动,程序会继续运行,重新计算规划路径,给出新目标
### 规划中断情况
### 计算转向角
`完成`
### 有些本来可达的位置却无法走到系统bug?
例如从初始位置(247, 500) 移动到(115, -10),无法到达
`(已解决)将dis_limit设为小值5而非0`
#### 构型空间膨胀??
`不需要`
### 只考虑一定范围内的行人
观测范围 / 反应半径
`完成`
#### 观测范围内有行人步长要减小
`完成`

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,90 @@
# !/usr/bin/env python3
# -*- encoding: utf-8 -*-
import matplotlib.pyplot as plt
import numpy as np
import pickle
import os
from scipy.ndimage import binary_dilation
from scene import scene
def draw_grid_map(grid_map):
# 生成新的地图图像
plt.imshow(grid_map, cmap='binary', alpha=0.5, origin='lower') # 黑白网格
# 绘制坐标轴
plt.xlabel('y', loc='right')
plt.ylabel('x', loc='top')
# 显示网格线
plt.grid(color='black', linestyle='-', linewidth=0.5)
# 显示图像
plt.show()
#plt.pause(0.01)
def discretize_map(scene, scale_ratio):
X = int(950 / scale_ratio) # 采点数量
Y = int(1850 / scale_ratio)
map = np.zeros((X, Y))
for x in range(X):
for y in range(Y):
if not scene.reachable_check(x * scale_ratio - 350, y * scale_ratio - 400, Yaw=0):
map[x, y] = 1
print(x, y)
file_name = 'map_'+str(scale_ratio)+'.pkl'
if not os.path.exists(file_name):
open(file_name, 'w').close()
with open(file_name, 'wb') as file:
pickle.dump(map, file)
print('保存成功')
def expand_obstacles(scale_ratio, expand_range=1):
'''
障碍物边沿扩展
TODO: 扩展后的地图不可用
'''
file_name = 'map_'+str(scale_ratio)+'.pkl'
dilated_file_name = 'map_'+str(scale_ratio)+'_e'+str(expand_range)+'.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
dilated_map = binary_dilation(map, iterations=expand_range)
if not os.path.exists(dilated_file_name):
open(dilated_file_name, 'w').close()
with open(dilated_file_name, 'wb') as file:
pickle.dump(dilated_map, file)
print('保存成功')
def show_map(file_name):
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
draw_grid_map(map)
if __name__ == '__main__':
# scene.init_world(scene_num=1, mapID=11)
# scene = scene.Scene(sceneID=0)
#
# # 离散化地图
# discretize_map(scene, scale_ratio=4)
# # 扩张构型空间
# expand_obstacles(scale_ratio=3, expand_range=1)
# 展示离散化地图
file_name = 'costMap_4.pkl'
show_map(file_name)

View File

@ -5,8 +5,8 @@
''' '''
import math import math
import queue import os
from functools import partial import pickle
import numpy as np import numpy as np
import heapq import heapq
@ -115,13 +115,12 @@ class PriorityQueue:
class DStarLite: class DStarLite:
def __init__(self, def __init__(self,
map: np.array([int, int]), # [X, Y] map: np.array([int, int]),
area_range, # [x_min, x_max, y_min, y_max] 实际坐标范围 area_range, # [x_min, x_max, y_min, y_max] 实际坐标范围
scale_ratio=5, # 地图缩放率 scale_ratio=5, # 地图缩放率
dyna_obs_radius=30, # dyna_obs实际身位半径 dyna_obs_radius=36, # dyna_obs实际身位半径
): ):
# self.area_bounds = area
self.map = map self.map = map
self.background = map.copy() self.background = map.copy()
self.X = map.shape[0] self.X = map.shape[0]
@ -145,9 +144,13 @@ class DStarLite:
"obstacle": float('inf'), "obstacle": float('inf'),
"dynamic obstacle": 100 "dynamic obstacle": 100
} }
self.cost_map = np.zeros_like(self.map)
file_name = 'costMap_'+str(self.scale_ratio)+'.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
cost_map = pickle.load(file)
self.cost_map = cost_map
self.cost_background = self.cost_map.copy() self.cost_background = self.cost_map.copy()
self.compute_cost_map()
self.s_start = None # (int,int) 必须是元组(元组可以直接当作矩阵索引) self.s_start = None # (int,int) 必须是元组(元组可以直接当作矩阵索引)
self.s_goal = None # (int,int) self.s_goal = None # (int,int)
@ -163,9 +166,11 @@ class DStarLite:
# 设置map 和 cost_map # 设置map 和 cost_map
# ''' # '''
# self.map = map_ # self.map = map_
# self.background = map_.copy()
# self.X = map_.shape[0] # self.X = map_.shape[0]
# self.Y = map_.shape[1] # self.Y = map_.shape[1]
# self.compute_cost_map() # self.compute_cost_map()
# self.cost_background = self.cost_map.copy()
def reset(self): def reset(self):
''' '''
@ -299,11 +304,6 @@ class DStarLite:
self.compute_shortest_path() self.compute_shortest_path()
self.path = self.get_path() self.path = self.get_path()
return self.path return self.path
# TODO: 误差抖动使robot没有到达路径上的点导致新起点的rhs=∞可能导致get_path失败 ( 当前版本没有该问题 )
# assert (self.rhs[self.s_start] != float('inf')), "There is no known path!"
# # debug
# if debug:
# pass
def planning(self, s_start, s_goal, dyna_obs, debug=False): def planning(self, s_start, s_goal, dyna_obs, debug=False):
''' '''
@ -311,7 +311,10 @@ class DStarLite:
''' '''
# 实际坐标 -> 地图坐标 # 实际坐标 -> 地图坐标
s_start = self.real2map(s_start) s_start = self.real2map(s_start)
if self.s_goal is None:
s_goal = self.real2map(s_goal) s_goal = self.real2map(s_goal)
else:
s_goal = self.s_goal
dyna_obs = [self.real2map(obs, reachable_assurance=False) for obs in dyna_obs] dyna_obs = [self.real2map(obs, reachable_assurance=False) for obs in dyna_obs]
self._planning(s_start, s_goal, dyna_obs, debug) self._planning(s_start, s_goal, dyna_obs, debug)
@ -336,13 +339,6 @@ class DStarLite:
succ = [s_ for s_ in self.get_neighbors(cur) if s_ not in path] # 避免抖动 (不可走重复的点) succ = [s_ for s_ in self.get_neighbors(cur) if s_ not in path] # 避免抖动 (不可走重复的点)
cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])] cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])]
path.append(cur) path.append(cur)
# else:
# for i in range(step_num):
# if cur == self.s_goal:
# break
# succ = self.get_neighbors(cur)
# cur = succ[np.argmin([self.c(cur, s_) + self.g[s_] for s_ in succ])]
# path.append(cur)
return path return path
def in_bounds_without_obstacle(self, pos): def in_bounds_without_obstacle(self, pos):
@ -357,8 +353,6 @@ class DStarLite:
获取邻居节点, 地图范围内 获取邻居节点, 地图范围内
''' '''
(x_, y_) = pos (x_, y_) = pos
# results = [(x_+1,y_), (x_-1,y_), (x_, y_+1), (x_,y_-1)]
# if mode == 8:
neighbors = [(x_ + 1, y_), (x_ - 1, y_), (x_, y_ + 1), (x_, y_ - 1), (x_ + 1, y_ + 1), (x_ - 1, y_ + 1), neighbors = [(x_ + 1, y_), (x_ - 1, y_), (x_, y_ + 1), (x_, y_ - 1), (x_ + 1, y_ + 1), (x_ - 1, y_ + 1),
(x_ + 1, y_ - 1), (x_ - 1, y_ - 1)] (x_ + 1, y_ - 1), (x_ - 1, y_ - 1)]
neighbors = filter(self.in_bounds_without_obstacle, neighbors) # 确保位置在地图范围内 且 不是静态障碍物 neighbors = filter(self.in_bounds_without_obstacle, neighbors) # 确保位置在地图范围内 且 不是静态障碍物
@ -366,17 +360,26 @@ class DStarLite:
def compute_cost_map(self): def compute_cost_map(self):
# 计算当前地图的cost_map # 计算当前地图的cost_map
self.cost_map = np.zeros_like(self.map)
for idx, obj in self.idx_to_object.items(): for idx, obj in self.idx_to_object.items():
self.cost_map[self.map == idx] = self.object_to_cost[obj] self.cost_map[self.map == idx] = self.object_to_cost[obj]
# # TODO # 扩张静态障碍物影响范围
# for x in range(self.X): obs_pos = np.where(self.map == self.object_to_idx['obstacle']) # 静态障碍物位置列表
# for y in range(self.Y): for (x, y) in zip(obs_pos[0], obs_pos[1]):
# if self.cost_map[x, y] > 0: start_x, end_x = max(x - 1, 0), min(x + 1, self.X - 1)
# neighbors = self.get_neighbors((x, y)) start_y, end_y = max(y - 1, 0), min(y + 1, self.Y - 1)
# for (x_, y_) in neighbors: for cost in range(9, 0, -3):
# self.cost_map[x_, y_] = max(self.cost_map[x_, y_], self.cost_map[x, y] - 10) for x_ in range(start_x, end_x + 1):
self.cost_map[x_, start_y] = max(self.cost_map[x_, start_y], cost)
for y_ in range(start_y + 1, end_y + 1):
self.cost_map[end_x, y_] = max(self.cost_map[end_x, y_], cost)
for x_ in range(end_x - 1, start_x - 1, -1):
self.cost_map[x_, end_y] = max(self.cost_map[x_, end_y], cost)
for y_ in range(end_y - 1, start_y, -1):
self.cost_map[start_x, y_] = max(self.cost_map[start_x, y_], cost)
start_x, end_x = max(start_x - 1, 0), min(end_x + 1, self.X - 1)
start_y, end_y = max(start_y - 1, 0), min(end_y + 1, self.Y - 1)
self.cost_background = self.cost_map.copy() self.cost_background = self.cost_map.copy()
@ -388,8 +391,8 @@ class DStarLite:
return: return:
update_obj: 改变的位置列表 [(x, y, obj_idx, obj_idx_old), ...] update_obj: 改变的位置列表 [(x, y, obj_idx, obj_idx_old), ...]
''' '''
# dyna_obs没有变化 (集合set可以忽略元素在列表中的位置) # dyna_obs没有变化 (集合set可以忽略元素在列表中的位置) 且 robot未在dyna_obs占用位置中
if set(dyna_obs) == set(self.dyna_obs_list): if set(dyna_obs) == set(self.dyna_obs_list) and self.s_start not in self.dyna_obs_occupy:
return [] return []
# 当前dyna_obs占用位置列表 # 当前dyna_obs占用位置列表
@ -397,24 +400,10 @@ class DStarLite:
for pos in dyna_obs: for pos in dyna_obs:
dyna_obs_occupy.extend(self.get_occupy_pos(pos)) dyna_obs_occupy.extend(self.get_occupy_pos(pos))
dyna_obs_occupy = [pos for i, pos in enumerate(dyna_obs_occupy) if pos not in dyna_obs_occupy[:i]] # 去除重复位置 dyna_obs_occupy = [pos for i, pos in enumerate(dyna_obs_occupy) if pos not in dyna_obs_occupy[:i]] # 去除重复位置
# 转变为free 和 转变为obs的位置列表 # 转变为free 和 转变为dyna_obs的位置列表
changed_free = [pos for pos in self.dyna_obs_occupy if pos not in dyna_obs_occupy] changed_free = [pos for pos in self.dyna_obs_occupy if pos not in dyna_obs_occupy]
changed_obs = [pos for pos in dyna_obs_occupy if pos not in self.dyna_obs_occupy] changed_obs = [pos for pos in dyna_obs_occupy if pos not in self.dyna_obs_occupy]
# # 新旧dyna_obs占用位置列表
# old_obs_occupy = []
# new_obs_occupy = []
# for pos in self.dyna_obs_list:
# old_obs_occupy.extend(self.get_occupy_pos(pos))
# for pos in dyna_obs:
# new_obs_occupy.extend(self.get_occupy_pos(pos))
# old_obs_occupy = [pos for i, pos in enumerate(old_obs_occupy) if pos not in old_obs_occupy[:i]] # 去除重复位置
# new_obs_occupy = [pos for i, pos in enumerate(new_obs_occupy) if pos not in new_obs_occupy[:i]] # 去除重复位置
#
# # 转变为free 和 转变为obs的位置列表
# changed_free = [pos for pos in old_obs_occupy if pos not in new_obs_occupy]
# changed_obs = [pos for pos in new_obs_occupy if pos not in old_obs_occupy]
# 更新地图计算changed_pos # 更新地图计算changed_pos
changed_pos = [] changed_pos = []
for (x, y) in changed_free: for (x, y) in changed_free:
@ -430,21 +419,15 @@ class DStarLite:
return changed_pos return changed_pos
def get_occupy_pos(self, obs_pos): def get_occupy_pos(self, obs_pos):
''' '''
根据dyna_obs中心位置计算其占用的所有网格位置 根据dyna_obs中心位置计算其占用的所有网格位置
''' '''
(x, y) = obs_pos (x, y) = obs_pos
occupy_radius = min(self.dyna_obs_radius, int(euclidean_distance(obs_pos, self.s_start) - 1)) # 避免robot被dyna_obs的占用区域包裹住 occupy_radius = min(self.dyna_obs_radius, int(euclidean_distance(obs_pos, self.s_start) - 1)) # 避免robot被dyna_obs的占用区域包裹住
# for i in range(x - self.dyna_obs_radius, x + self.dyna_obs_radius + 1): # 方形区域
# for j in range(y - self.dyna_obs_radius, y + self.dyna_obs_radius + 1):
# occupy_pos.append((i, j))
occupy_pos = [(i, j) for i in range(x - occupy_radius, x + occupy_radius + 1) # 圆形区域 occupy_pos = [(i, j) for i in range(x - occupy_radius, x + occupy_radius + 1) # 圆形区域
for j in range(y - occupy_radius, y + occupy_radius + 1) for j in range(y - occupy_radius, y + occupy_radius + 1)
if euclidean_distance((i, j), obs_pos) < occupy_radius] if euclidean_distance((i, j), obs_pos) < occupy_radius]
occupy_pos = filter(self.in_bounds_without_obstacle, occupy_pos) # 确保位置在地图范围内 且 不是静态障碍物 occupy_pos = filter(self.in_bounds_without_obstacle, occupy_pos) # 确保位置在地图范围内 且 不是静态障碍物
return list(occupy_pos) return list(occupy_pos)
@ -497,21 +480,40 @@ class DStarLite:
''' '''
x = round((pos[0] - self.x_min) / self.scale_ratio) x = round((pos[0] - self.x_min) / self.scale_ratio)
y = round((pos[1] - self.y_min) / self.scale_ratio) y = round((pos[1] - self.y_min) / self.scale_ratio)
# 需要确保点可达 # 确保点不在静态障碍物上,否则就不断向外圈扩展直到找到非静态障碍物位置
if reachable_assurance and self.idx_to_object[self.map[x, y]] != 'free': if reachable_assurance:
print('1') return self.validate_pos((x, y))
x_ = math.floor((pos[0] - self.x_min) / self.scale_ratio)
y_ = math.floor((pos[1] - self.y_min) / self.scale_ratio)
candidates = [(x_, y_), (x_ + 1, y_), (x_, y_ + 1), (x_ + 1, y_ + 1)]
for (x, y) in candidates:
print(self.idx_to_object[self.map[x, y]])
if self.idx_to_object[self.map[x, y]] == 'free':
print((x,y))
return tuple((x, y))
raise Exception('error')
else: else:
return tuple((x, y)) return tuple((x, y))
def validate_pos(self, pos):
'''
对于不合法的pos找到周围距离最近的合法坐标
'''
(x, y) = pos
x = max(0, min(x, self.X - 1))
y = max(0, min(y, self.Y - 1))
if self.idx_to_object[self.map[x, y]] == 'obstacle':
start_x, end_x = max(x - 1, 0), min(x + 1, self.X - 1)
start_y, end_y = max(y - 1, 0), min(y + 1, self.Y - 1)
while True:
for x_ in range(start_x, end_x + 1):
if self.idx_to_object[self.map[x_, start_y]] != 'obstacle':
return tuple((x_, start_y))
for y_ in range(start_y + 1, end_y + 1):
if self.idx_to_object[self.map[end_x, y_]] != 'obstacle':
return tuple((end_x, y_))
for x_ in range(end_x - 1, start_x - 1, -1):
if self.idx_to_object[self.map[x_, end_y]] != 'obstacle':
return tuple((x_, end_y))
for y_ in range(end_y - 1, start_y, -1):
if self.idx_to_object[self.map[start_x, y_]] != 'obstacle':
return tuple((start_x, y_))
start_x, end_x = max(start_x - 1, 0), min(end_x + 1, self.X - 1)
start_y, end_y = max(start_y - 1, 0), min(end_y + 1, self.Y - 1)
# raise Exception('invalid pos!')
return tuple((x, y))
def draw_graph(self, step_num): def draw_graph(self, step_num):
# 清空当前figure内容保留figure对象 # 清空当前figure内容保留figure对象
plt.clf() plt.clf()

Binary file not shown.

After

Width:  |  Height:  |  Size: 25 KiB

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,115 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import math
import os
import pickle
import matplotlib.pyplot as plt
import numpy as np
from robowaiter.scene import scene
from robowaiter.algos.navigator.dstar_lite import DStarLite, euclidean_distance
class Navigator:
'''
导航类
'''
def __init__(self, scene, area_range, map, scale_ratio=5, step_length=150, velocity=150, react_radius=300):
self.scene = scene
self.area_range = area_range # 地图实际坐标范围 xmin, xmax, ymin, ymax
self.map = map # 缩放并离散化的地图 array(X,Y)
self.scale_ratio = scale_ratio # 地图缩放率
self.step_length = step_length # 步长(单次移动)
self.step_num = self.step_length // self.scale_ratio # 单次移动地图格数
self.v = velocity # 速度
self.react_radius = react_radius # robot反应半径
self.planner = DStarLite(area_range=area_range, map=map, scale_ratio=scale_ratio)
def validate_goal(self, goal):
'''
目标合法化
'''
return self.planner.map2real(self.planner.real2map(goal))
@staticmethod
def is_reached(pos: (float, float), goal: (float, float), dis_limit=50):
'''
判断是否到达目标
'''
dis = math.hypot(pos[0]-goal[0], pos[1]-goal[1])
# dis = np.linalg.norm(pos - goal)
return dis < dis_limit
@staticmethod
def get_yaw(pos: (float, float), goal: (float, float)):
'''
得到移动方向
'''
# return math.degrees(math.atan2(goal[0] - pos[0], -(goal[1] - pos[1])))
return math.degrees(math.atan2((goal[1] - pos[1]), (goal[0] - pos[0])))
def navigate(self, goal: (float, float), animation=True):
'''
单次导航直到到达目标
'''
goal = self.validate_goal(goal) # 目标合法化
pos = (self.scene.status.location.X, self.scene.status.location.Y) # 机器人当前: 位置 和 朝向
print('------------------navigation_start----------------------')
while not self.is_reached(pos, goal):
dyna_obs = [(walker.pose.X, walker.pose.Y) for walker in self.scene.status.walkers] # 动态障碍物(顾客)位置列表
dyna_obs = [obs for obs in dyna_obs if euclidean_distance(obs, pos) < self.react_radius] # 过滤观测范围外的dyna_obs
# 周围有dyna_obs则步长减半
if dyna_obs:
step_num = self.step_num // 2
else:
step_num = self.step_num
path = self.planner.planning(pos, goal, dyna_obs)
if path:
if animation:
self.planner.draw_graph(step_num) # 画出搜索路径
next_step = min(step_num, len(path))
next_pos = path[next_step - 1]
print('plan pos:', next_pos, end=' ')
yaw = self.get_yaw(pos, next_pos)
self.scene.walk_to(next_pos[0], next_pos[1], yaw, velocity=self.v, dis_limit=10)
self.planner.path = self.planner.path[next_step - 1:] # 去除已走过的路径
pos = (self.scene.status.location.X, self.scene.status.location.Y)
print('reach pos:', pos)
self.planner.reset() # 完成一轮导航,重置变量
if self.is_reached(pos, goal):
print('The robot has achieved goal !!')
if __name__ == '__main__':
# 根据map计算并保存cost_map
file_name = 'map_4.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
scene.init_world(1, 11)
scene = scene.Scene(sceneID=0)
navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map, scale_ratio=4)
navigator.planner.compute_cost_map()
file_name = 'costMap_4.pkl'
if not os.path.exists(file_name):
open(file_name, 'w').close()
with open(file_name, 'wb') as file:
pickle.dump(navigator.planner.cost_map, file)
print('保存成功')

View File

@ -0,0 +1,128 @@
# D_star Lite 机器人任务规划
## 目录结构
### 坐标离散化
`discretize_map.py`
### 地图文件(选择缩放倍率)
`map_3.pkl`
`map_4.pkl`
`map_5.pkl`
### 导航类
`navigate.py`
### D_star Lite 算法实现
`dstar_lite.py`
### 测试文件
`test.py`
---
## 世界地图
### 实际坐标范围
`X: -350 ~ 600`
`Y: -400 ~ 1450`
### 5倍缩放后坐标范围
`X: -70 ~ 120`
`Y: -80 ~ 290`
### 网格地图
| Idx | Obj |
|-----|------------------|
| 0 | free |
| 1 | obstacle |
| 2 | dynamic obstacle |
### 代价地图
| Cost | Obj |
|---------|-----------------|
| 0 | free |
| 15-10-5 | obs周围3格 |
| inf | obstacle |
| 100 | dynamic obstacle |
---
## 参数
`机器人步长` 150
`机器人速度` 150
`机器人观测范围` 300
`行人半径` 36
`目标判达距离` 50
`机器人移动dis_limit` 10
---
## 使用方法
```python
# 选择缩放合适的地图3、4、5
file_name = 'map_4.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
# 初始化场景
scene.init_world(1, 11)
scene = scene.Scene(sceneID=0)
# 舒适化导航类
# (需要传入:场景、实际地图范围、离散化地图、缩放比例)
navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map, scale_ratio=4)
# 设置目标
goal = (0, 0)
# 导航
# (animation: 选择是否画出导航过程)
navigator.navigate(goal, animation=False)
```
---
## 可靠性保证
`目标合法性保证`
- 目标在地图外:重置目标为最近的地图内位置
- 目标在静态障碍物:从当前位置不断向外圈扩展直到找到合法位置,重置目标为该位置
`规划sbgoal合法性保证`
- 规划subgoal被动态障碍物占据重新规划路径
`起点合法性保证`
- 起点在静态障碍物:从当前位置不断向外圈扩展直到找到合法位置,重置起点为该位置
- 起点在动态障碍物范围内:缩小动态障碍物半径,保证起点位置为空闲
`机器人朝向保证`
- 机器人始终朝向每一步的移动方向
`规划抖动解决`
- 规划路径不允许有重复点
`避免机器人沿障碍物行走`
- 障碍物扩张:在代价地图`cost_map`中静态障碍物周围的空位也会受到影响并产生cost

View File

@ -1,22 +1,19 @@
import os import os
import pickle import pickle
import time
import random
import math
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from robowaiter.scene import scene from robowaiter.scene import scene
# from navigate import Navigator from robowaiter.algos.navigator.navigate import Navigator
from robowaiter.algos.navigate.navigate import Navigator
if __name__ == '__main__': if __name__ == '__main__':
# def navigate_test(scene):
file_name = 'map_5.pkl' # 选择缩放合适的地图3、4、5
file_name = 'map_4.pkl'
if os.path.exists(file_name): if os.path.exists(file_name):
with open(file_name, 'rb') as file: with open(file_name, 'rb') as file:
map = pickle.load(file) map = pickle.load(file)
@ -24,21 +21,25 @@ if __name__ == '__main__':
scene.init_world(1, 11) scene.init_world(1, 11)
scene = scene.Scene(sceneID=0) scene = scene.Scene(sceneID=0)
navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map) navigator = Navigator(scene=scene, area_range=[-350, 600, -400, 1450], map=map, scale_ratio=4)
'''场景1: 无行人环境 robot到达指定目标''' '''场景1: 无行人环境 robot到达指定目标'''
# goal = np.array((-100, 700)) # goal = (-100, 700)
# goal = (590, 1370)
# goal = (290, -240)
# goal = (-200, -200)
# goal = (-200, -50)
'''场景2: 静止行人环境 robot到达指定目标''' '''场景2: 静止行人环境 robot到达指定目标'''
# scene.clean_walker() # scene.clean_walker()
# scene.add_walker(50, 300, 0) # scene.add_walker(50, 300, 0)
# scene.add_walker(-50, 500, 0) # scene.add_walker(-50, 500, 0)
# goal = np.array((-100, 700)) # goal = (-100, 700)
'''场景3: 移动行人环境 robot到达指定目标''' '''场景3: 移动行人环境 robot到达指定目标'''
scene.walk_to(100, 0, -90, dis_limit=10) scene.walk_to(100, 0, -90, dis_limit=5)
scene.clean_walker() scene.clean_walker()
scene.add_walkers([[50, 300],[-50, 500],[0, 700]]) scene.add_walkers([[50, 300], [-50, 500], [0, 700]])
# scene.add_walker(50, 300, 0) # scene.add_walker(50, 300, 0)
# scene.add_walker(-50, 500, 0) # scene.add_walker(-50, 500, 0)
# scene.add_walker(0, 700, 0) # scene.add_walker(0, 700, 0)
@ -47,12 +48,13 @@ if __name__ == '__main__':
scene.control_walker([scene.walker_control_generator(walkerID=2, autowalk=False, speed=50, X=0, Y=0, Yaw=0)]) scene.control_walker([scene.walker_control_generator(walkerID=2, autowalk=False, speed=50, X=0, Y=0, Yaw=0)])
# goal = (-100, 700) # goal = (-100, 700)
# goal = (-300) # goal = (-200, 700) # 目标在障碍物测试
# goal = (340.0, 900.0) # goal = (-400, 700) # 目标在地图外测试
goal = (10000, 10000) # 目标在地图外测试
# goal = (-220, 300)
# goal = (-280, 400)
# goal = (-230, 600)
# goal = (240.0, 1000.0)
# goal = (340.0, 900.0)
goal = (240.0, 1160.0)
'''场景4: 行人自由移动 robot到达指定目标''' '''场景4: 行人自由移动 robot到达指定目标'''
# # TODO: autowalk=True仿真器会闪退 ??? # # TODO: autowalk=True仿真器会闪退 ???
@ -62,7 +64,7 @@ if __name__ == '__main__':
# scene.control_walker([scene.walker_control_generator(walkerID=0, autowalk=True, speed=20, X=0, Y=0, Yaw=0)]) # scene.control_walker([scene.walker_control_generator(walkerID=0, autowalk=True, speed=20, X=0, Y=0, Yaw=0)])
# scene.control_walker([scene.walker_control_generator(walkerID=1, autowalk=True, speed=20, X=0, Y=0, Yaw=0)]) # scene.control_walker([scene.walker_control_generator(walkerID=1, autowalk=True, speed=20, X=0, Y=0, Yaw=0)])
# time.sleep(5) # time.sleep(5)
# goal = np.array((-100, 700)) # goal = (-100, 700)
navigator.navigate(goal, animation=True) navigator.navigate(goal, animation=True)

View File

@ -13,12 +13,12 @@ class Bahavior(ptree.behaviour.Behaviour):
''' '''
scene = None scene = None
print_name_prefix = "" print_name_prefix = ""
all_place = {'Bar', 'Bar2', 'WaterTable', 'CoffeeTable', 'Table1', 'Table2', 'Table3'} # all_place = {'Bar', 'Bar2', 'WaterTable', 'CoffeeTable', 'Table1', 'Table2', 'Table3'}
# all_object = {'Coffee', 'Water', 'Dessert', 'Softdrink', 'BottledDrink', 'Yogurt', 'ADMilk', 'MilkDrink', 'Milk', # all_object = {'Coffee', 'Water', 'Dessert', 'Softdrink', 'BottledDrink', 'Yogurt', 'ADMilk', 'MilkDrink', 'Milk',
# 'VacuumCup'} # 'VacuumCup'}
# all_place = {'Bar', 'WaterTable', 'CoffeeTable'} all_place = {'Bar', 'WaterTable', 'CoffeeTable'}
# all_object = {'Coffee', 'Water', 'Dessert', 'Softdrink', 'Yogurt'} # all_object = {'Coffee', 'Water', 'Dessert', 'Softdrink', 'Yogurt'}
all_object = {'Coffee'} all_object = {'Coffee', 'Water'}
place_xyz_dic={ place_xyz_dic={
'Bar': (247.0, 520.0, 100.0), 'Bar': (247.0, 520.0, 100.0),
'Bar2': (240.0, 40.0, 70.0), 'Bar2': (240.0, 40.0, 70.0),
@ -28,6 +28,11 @@ class Bahavior(ptree.behaviour.Behaviour):
'Table2': (-55.0, 0.0, 107), 'Table2': (-55.0, 0.0, 107),
'Table3':(-55.0, 150.0, 107) 'Table3':(-55.0, 150.0, 107)
} }
container_dic={
'Coffee':'CoffeeCup',
'Water': 'Glass',
'Dessert':'Plate'
}
@classmethod @classmethod
def get_ins_name(cls,*args): def get_ins_name(cls,*args):

View File

@ -14,11 +14,11 @@ class Make(Act):
super().__init__(*args) super().__init__(*args)
self.target_obj = self.args[0] self.target_obj = self.args[0]
self.op_type = 1 self.op_type = 1
if self.target_obj=="Coffee": if self.target_obj==self.valid_args[0]:
self.op_type = 1 self.op_type = 1
elif self.target_obj=="Water": elif self.target_obj==self.valid_args[1]:
self.op_type = 2 self.op_type = 2
elif self.target_obj=="Dessert": elif self.target_obj==self.valid_args[2]:
self.op_type = 3 self.op_type = 3
@ -28,12 +28,12 @@ class Make(Act):
info["pre"]= {f'Holding(Nothing)'} info["pre"]= {f'Holding(Nothing)'}
info['del_set'] = set() info['del_set'] = set()
info['add'] = {f'Exist({arg})'} info['add'] = {f'Exist({arg})'}
if arg == "Coffee": if arg == cls.valid_args[0]:
info["add"] |= {f'On(Coffee,CoffeeTable)'} info["add"] |= {f'On({arg},CoffeeTable)'}
elif arg == "Water": elif arg == cls.valid_args[1]:
info["add"] |= {f'On(Water,WaterTable)'} info["add"] |= {f'On({arg},WaterTable)'}
elif arg == "Dessert": elif arg == cls.valid_args[2]:
info["add"] |= {f'On(Dessert,Bar)'} info["add"] |= {f'On({arg},Bar)'}
return info return info
def _update(self) -> ptree.common.Status: def _update(self) -> ptree.common.Status:
@ -43,16 +43,20 @@ class Make(Act):
# self.scene.gen_obj(type=40) # self.scene.gen_obj(type=40)
obj_dict = self.scene.status.objects # obj_dict = self.scene.status.objects
if len(obj_dict) != 0: # if len(obj_dict) != 0:
# 获取obj_id # # 获取obj_id
for id, obj in enumerate(obj_dict): # for id, obj in enumerate(obj_dict):
if obj.name == "Coffee": # print("id:",id,"obj",obj.name)
obj_info = obj_dict[id]
obj_x, obj_y, obj_z = obj_info.location.X, obj_info.location.Y, obj_info.location.Z # if obj.name == "Coffee":
print(id,obj.name,obj_x,obj_y,obj_z) # obj_info = obj_dict[id]
# obj_x, obj_y, obj_z = obj_info.location.X, obj_info.location.Y, obj_info.location.Z
# print(id,obj.name,obj_x,obj_y,obj_z)
self.scene.state["condition_set"] |= (self.info["add"]) self.scene.state["condition_set"] |= (self.info["add"])
self.scene.state["condition_set"] -= self.info["del_set"] self.scene.state["condition_set"] -= self.info["del_set"]
print("condition_set:",self.scene.state["condition_set"])
return Status.RUNNING return Status.RUNNING

View File

@ -1,6 +1,6 @@
import py_trees as ptree import py_trees as ptree
from robowaiter.behavior_lib._base.Act import Act from robowaiter.behavior_lib._base.Act import Act
from robowaiter.algos.navigate.navigate import Navigator from robowaiter.algos.navigator.navigate import Navigator
class MoveTo(Act): class MoveTo(Act):
can_be_expanded = True can_be_expanded = True
@ -21,6 +21,7 @@ class MoveTo(Act):
info['pre'] |= {f'Exist({arg})'} info['pre'] |= {f'Exist({arg})'}
info["add"] = {f'At(Robot,{arg})'} info["add"] = {f'At(Robot,{arg})'}
info["del_set"] = {f'At(Robot,{place})' for place in cls.valid_args if place != arg} info["del_set"] = {f'At(Robot,{place})' for place in cls.valid_args if place != arg}
info['cost']=5
return info return info
@ -29,30 +30,41 @@ class MoveTo(Act):
# navigator = Navigator(scene=self.scene, area_range=[-350, 600, -400, 1450], map=self.scene.state["map"]["2d"]) # navigator = Navigator(scene=self.scene, area_range=[-350, 600, -400, 1450], map=self.scene.state["map"]["2d"])
# goal = self.scene.state['map']['obj_pos'][self.args[0]] # goal = self.scene.state['map']['obj_pos'][self.args[0]]
# navigator.navigate(goal, animation=False) # navigator.navigate_old(goal, animation=False)
# 走到固定的地点 # 走到固定的地点
if self.target_place in Act.place_xyz_dic: if self.target_place in Act.place_xyz_dic:
goal = Act.place_xyz_dic[self.target_place] goal = Act.place_xyz_dic[self.target_place]
self.scene.walk_to(goal[0],goal[1]) self.scene.walk_to(goal[0]+1,goal[1])
else: # 走到物品边上 # 走到物品边上
else:
# 是否用容器装好
if self.target_place in Act.container_dic:
target_name = Act.container_dic[self.target_place]
else:
target_name = self.target_place
# 根据物体名字找到最近的这类物体对应的位置
obj_id = -1 obj_id = -1
min_dis = float('inf') min_dis = float('inf')
obj_dict = self.scene.status.objects obj_dict = self.scene.status.objects
if len(obj_dict)!=0: if len(obj_dict)!=0:
# 获取obj_id # 获取obj_id
for id,obj in enumerate(obj_dict): for id,obj in enumerate(obj_dict):
if obj.name == self.target_place: if obj.name == target_name:
obj_info = obj_dict[id]
dis = self.scene.cal_distance_to_robot(obj_info.location.X, obj_info.location.Y, obj_info.location.Z)
if id==275:
print("275'dis:",dis)
if dis<min_dis:
min_dis = dis
obj_id = id obj_id = id
# obj_info = obj_dict[id] # if self.target_place == "CoffeeCup":
# obj_x, obj_y, obj_z = obj_info.location.X, obj_info.location.Y, obj_info.location.Z # # obj_id = 273
# ginger_x,ginger_y,ginger_z = [int(self.scene.location.X), int(self.scene.location.Y), int(self.scene.rotation.Yaw)] # obj_id = 275
break
if self.target_place == "CoffeeCup":
obj_id = 273
if obj_id == -1: if obj_id == -1:
return ptree.common.Status.FAILURE return ptree.common.Status.FAILURE
print("self.target_place:",self.target_place,"id:",obj_id,"dis:",min_dis)
self.scene.move_to_obj(obj_id=obj_id) self.scene.move_to_obj(obj_id=obj_id)
# 为了演示,写死咖啡位置 # 为了演示,写死咖啡位置

View File

@ -25,14 +25,36 @@ class PickUp(Act):
def _update(self) -> ptree.common.Status: def _update(self) -> ptree.common.Status:
# self.scene.test_move() # self.scene.test_move()
op_type=16 # op_type=16
obj_id = 0
# 遍历场景里的所有物品,根据名字匹配位置最近的 obj-id # 遍历场景里的所有物品,根据名字匹配位置最近的 obj-id
# 是否用容器装好
if self.target_obj in Act.container_dic:
target_name = Act.container_dic[self.target_obj]
else:
target_name = self.target_obj
# 根据物体名字找到最近的这类物体对应的位置
obj_id = -1
min_dis = float('inf')
obj_dict = self.scene.status.objects
if len(obj_dict) != 0:
# 获取obj_id
for id, obj in enumerate(obj_dict):
if obj.name == target_name:
obj_info = obj_dict[id]
dis = self.scene.cal_distance_to_robot(obj_info.location.X, obj_info.location.Y,
obj_info.location.Z)
if dis < min_dis:
min_dis = dis
obj_id = id
# if self.target_place == "CoffeeCup":
# # obj_id = 273
# obj_id = 275
if obj_id == -1:
return ptree.common.Status.FAILURE
if self.args=="Coffee": self.scene.move_task_area(op_type=16, obj_id=obj_id)
obj_id = 273 self.scene.op_task_execute(op_type=16, obj_id=obj_id)
self.scene.op_task_execute(op_type, obj_id=obj_id)
self.scene.state["condition_set"] |= (self.info["add"]) self.scene.state["condition_set"] |= (self.info["add"])
self.scene.state["condition_set"] -= self.info["del_set"] self.scene.state["condition_set"] -= self.info["del_set"]

View File

@ -31,6 +31,7 @@ class PutDown(Act):
release_pos = list(Act.place_xyz_dic[self.target_place]) release_pos = list(Act.place_xyz_dic[self.target_place])
# # 原始吧台处:[247.0, 520.0, 100.0], 空调开关旁吧台:[240.0, 40.0, 70.0], 水杯桌:[-70.0, 500.0, 107] # # 原始吧台处:[247.0, 520.0, 100.0], 空调开关旁吧台:[240.0, 40.0, 70.0], 水杯桌:[-70.0, 500.0, 107]
# # 桌子2:[-55.0, 0.0, 107],桌子3:[-55.0, 150.0, 107] # # 桌子2:[-55.0, 0.0, 107],桌子3:[-55.0, 150.0, 107]
self.scene.move_task_area(op_type, release_pos=release_pos)
self.scene.op_task_execute(op_type, release_pos=release_pos) self.scene.op_task_execute(op_type, release_pos=release_pos)
self.scene.state["condition_set"] |= (self.info["add"]) self.scene.state["condition_set"] |= (self.info["add"])

View File

@ -2,6 +2,8 @@ import copy
import random import random
from robowaiter.behavior_tree.obtea.BehaviorTree import Leaf,ControlBT from robowaiter.behavior_tree.obtea.BehaviorTree import Leaf,ControlBT
class CondActPair: class CondActPair:
def __init__(self, cond_leaf,act_leaf): def __init__(self, cond_leaf,act_leaf):
self.cond_leaf = cond_leaf self.cond_leaf = cond_leaf
@ -54,7 +56,10 @@ class OptBTExpAlgorithm:
self.conditions_index = [] self.conditions_index = []
#运行规划算法从初始状态、目标状态和可用行动计算行为树self.bt #运行规划算法从初始状态、目标状态和可用行动计算行为树self.bt
def run_algorithm(self,goal,actions): def run_algorithm(self,goal,actions,scene):
self.scene = scene
if self.verbose: if self.verbose:
print("\n算法开始!") print("\n算法开始!")
@ -99,8 +104,13 @@ class OptBTExpAlgorithm:
sequence_structure.add_child( sequence_structure.add_child(
[copy.deepcopy(pair_node.cond_leaf), copy.deepcopy(pair_node.act_leaf)]) [copy.deepcopy(pair_node.cond_leaf), copy.deepcopy(pair_node.act_leaf)])
subtree.add_child([copy.deepcopy(sequence_structure)]) # subtree 是回不断变化的它的父亲是self.bt subtree.add_child([copy.deepcopy(sequence_structure)]) # subtree 是回不断变化的它的父亲是self.bt
# 增加实时条件判断,满足条件就不再扩展
if c <= self.scene.state["condition_set"]:
return True
else: else:
subtree.add_child([copy.deepcopy(pair_node.act_leaf)]) subtree.add_child([copy.deepcopy(pair_node.act_leaf)])
if self.verbose: if self.verbose:
print("完成扩展 a_node= %s,对应的新条件 c_attr= %s,mincost=%d" \ print("完成扩展 a_node= %s,对应的新条件 c_attr= %s,mincost=%d" \
% (cond_anc_pair.act_leaf.content.name, cond_anc_pair.cond_leaf.content, % (cond_anc_pair.act_leaf.content.name, cond_anc_pair.cond_leaf.content,

View File

@ -6,7 +6,7 @@ from robowaiter.behavior_tree.obtea.examples import *
# 封装好的主接口 # 封装好的主接口
class BTOptExpInterface: class BTOptExpInterface:
def __init__(self, action_list): def __init__(self, action_list,scene):
""" """
Initialize the BTOptExpansion with a list of actions. Initialize the BTOptExpansion with a list of actions.
:param action_list: A list of actions to be used in the behavior tree. :param action_list: A list of actions to be used in the behavior tree.
@ -22,6 +22,8 @@ class BTOptExpInterface:
self.actions = action_list self.actions = action_list
self.has_processed = False self.has_processed = False
self.scene = scene
def process(self, goal): def process(self, goal):
""" """
Process the input sets and return a string result. Process the input sets and return a string result.
@ -29,9 +31,9 @@ class BTOptExpInterface:
:return: A PTML string representing the outcome of the behavior tree. :return: A PTML string representing the outcome of the behavior tree.
""" """
self.goal = goal self.goal = goal
self.algo = OptBTExpAlgorithm(verbose=False) self.algo = OptBTExpAlgorithm(verbose=True)
self.algo.clear() self.algo.clear()
self.algo.run_algorithm(self.goal, self.actions) # 调用算法得到行为树保存至 algo.bt self.algo.run_algorithm(self.goal, self.actions,self.scene) # 调用算法得到行为树保存至 algo.bt
self.ptml_string = self.algo.get_ptml() self.ptml_string = self.algo.get_ptml()
self.has_processed = True self.has_processed = True
# algo.print_solution() # print behavior tree # algo.print_solution() # print behavior tree

View File

@ -32,6 +32,7 @@ functions = get_tools()
def run_conversation(query: str, stream=False, max_retry=5): def run_conversation(query: str, stream=False, max_retry=5):
params = dict(model="chatglm3", messages=[{"role": "user", "content": query}], stream=stream) params = dict(model="chatglm3", messages=[{"role": "user", "content": query}], stream=stream)
params["functions"] = functions params["functions"] = functions
print(params)
response = get_response(**params) response = get_response(**params)
for _ in range(max_retry): for _ in range(max_retry):

View File

@ -0,0 +1,78 @@
import json
import openai
from colorama import init, Fore
from loguru import logger
import json
from robowaiter.llm_client.tool_register import get_tools, dispatch_tool
import requests
import json
import urllib3
init(autoreset=True)
########################################
# 该文件实现了与大模型的通信以及工具调用
########################################
# 忽略https的安全性警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
base_url = "https://45.125.46.134:25344" # 本地部署的地址,或者使用你访问模型的API地址
def get_response(**kwargs):
data = kwargs
response = requests.post(f"{base_url}/v1/chat/completions", json=data, stream=data["stream"], verify=False)
decoded_line = response.json()
return decoded_line
functions = get_tools()
if __name__ == "__main__":
question = input("\n顾客:")
data_memory = [{
"role": "system",
"content": "你是RoboWaiter,一个由HPCL团队开发的机器人服务员你在咖啡厅工作。接受顾客的指令并调用工具函数来完成各种服务任务。",
},]
n = 1
max_retry = 5
params = dict(model="RoboWaiter",messages=data_memory, stream=False)
params["functions"] = functions
while question != 'end':
user_dict = {"role": "user", "content": question}
params["messages"].append(user_dict)
# print(data_memory)
response = get_response(**params)
for _ in range(max_retry):
if response["choices"][0]["message"].get("function_call"):
function_call = response["choices"][0]["message"]["function_call"]
logger.info(f"Function Call Response: {function_call}")
function_args = json.loads(function_call["arguments"])
tool_response = dispatch_tool(function_call["name"], function_args)
logger.info(f"Tool Call Response: {tool_response}")
return_message = response["choices"][0]["message"]
params["messages"].append(return_message)
t = {
"role": "function",
"name": function_call["name"],
"content": tool_response, # 调用函数返回结果
}
params["messages"].append(t)
response = get_response(**params)
else:
return_message = response["choices"][0]["message"]
reply = return_message["content"]
params["messages"].append(return_message)
logger.info(f"Final Reply: \n{reply}")
break
question = input("\n顾客:")

View File

@ -0,0 +1,74 @@
import json
import openai
from colorama import init, Fore
from loguru import logger
import json
from robowaiter.llm_client.tool_register import get_tools, dispatch_tool
import requests
import json
import urllib3
init(autoreset=True)
########################################
# 该文件实现了与大模型的通信以及工具调用
########################################
# 忽略https的安全性警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
base_url = "https://45.125.46.134:25344" # 本地部署的地址,或者使用你访问模型的API地址
def get_response(**kwargs):
data = kwargs
response = requests.post(f"{base_url}/v1/chat/completions", json=data, stream=data["stream"], verify=False)
decoded_line = response.json()
return decoded_line
functions = get_tools()
def run_conversation(query: str, stream=False, max_retry=5):
params = dict(model="chatglm3", messages=[{"role": "user", "content": query}], stream=stream)
params["functions"] = functions
response = get_response(**params)
for _ in range(max_retry):
if response["choices"][0]["message"].get("function_call"):
function_call = response["choices"][0]["message"]["function_call"]
logger.info(f"Function Call Response: {function_call}")
if "sub_task" in function_call["name"]:
return {
"Answer": "好的",
"Goal": json.loads(function_call["arguments"])["goal"]
}
function_args = json.loads(function_call["arguments"])
tool_response = dispatch_tool(function_call["name"], function_args)
logger.info(f"Tool Call Response: {tool_response}")
params["messages"].append(response["choices"][0]["message"])
params["messages"].append(
{
"role": "function",
"name": function_call["name"],
"content": tool_response, # 调用函数返回结果
}
)
else:
reply = response["choices"][0]["message"]["content"]
return {
"Answer": reply,
"Goal": None
}
logger.info(f"Final Reply: \n{reply}")
return
response = get_response(**params)
if __name__ == "__main__":
query = "可以带我去吗"
print(run_conversation(query, stream=False))

View File

@ -132,8 +132,13 @@ def create_sub_task(
当需要完成具身任务如做咖啡拿放物体扫地前往某位置调用该函数根据用户的提示进行意图理解生成子任务的目标状态集合 `goal`以一阶逻辑的形式表示用户意图 当需要完成具身任务如做咖啡拿放物体扫地前往某位置调用该函数根据用户的提示进行意图理解生成子任务的目标状态集合 `goal`以一阶逻辑的形式表示用户意图
做一杯咖啡,则该函数的参数为 "On(Coffee,Bar)", 做一杯咖啡,则该函数的参数为 "On(Coffee,Bar)",
前往一号桌,则该函数的参数为 "At(Robot,Table1)", 前往一号桌,则该函数的参数为 "At(Robot,Table1)",
打开空调,则该函数的参数为 "Is(AC,On)", 前往二号桌,则该函数的参数为 "At(Robot,Table2)",
关空调,则该函数的参数为 "Is(AC,Off)", 打开空调,则该函数的参数为 "Is(AC,On)",
关空调,则该函数的参数为 "Is(AC,Off)",
打开窗帘,则该函数的参数为 "Is(Curtain,On)",
关闭窗帘,则该函数的参数为 "Is(Curtain,Off)",
拖地,则该函数的参数为 "Is(Floor,Clean)",
打开大厅灯,则该函数的参数为 "Is(HallLight,On)",
""" """
return goal return goal
@ -143,9 +148,9 @@ def get_object_info(
obj: Annotated[str, '需要获取信息的物体名称', True] obj: Annotated[str, '需要获取信息的物体名称', True]
) -> str: ) -> str:
""" """
获取场景中指定物体 `object` 在哪里 获取场景中指定物体 `object` 在哪里不涉及到具体的执行任务
如果`object` 是一个地点例如洗手间地方则输出 如果`object` 是一个地点例如洗手间则输出大门
如果`object`一个咖啡则输出 如果`object`咖啡则输出桌子咖啡在桌子上
如果`object` 是空桌子则输出一号桌 如果`object` 是空桌子则输出一号桌
""" """
near_object = None near_object = None

View File

@ -62,7 +62,7 @@ class Robot(object):
print("--------------------\n") print("--------------------\n")
algo = BTOptExpInterface(self.action_list) algo = BTOptExpInterface(self.action_list,self.scene)
ptml_string = algo.process(goal) ptml_string = algo.process(goal)

View File

@ -399,8 +399,12 @@ class Scene:
walk_v = [obj_x + 40, obj_y - 35, 130, 180, 0] walk_v = [obj_x + 40, obj_y - 35, 130, 180, 0]
obj_x += 3 obj_x += 3
obj_y += 2.5 obj_y += 2.5
walk_v[0]+=1
print("walk:",walk_v)
action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v) action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
scene = stub.Do(action) scene = stub.Do(action)
print("After Walk Position:", [scene.location.X, scene.location.Y, scene.rotation.Yaw])
# 移动到进行操作任务的指定地点 # 移动到进行操作任务的指定地点
def move_task_area(self,op_type,obj_id=0, release_pos=[247.0, 520.0, 100.0]): def move_task_area(self,op_type,obj_id=0, release_pos=[247.0, 520.0, 100.0]):
@ -430,7 +434,7 @@ class Scene:
walk_v = release_pos[:-1] + [180, 180, 0] walk_v = release_pos[:-1] + [180, 180, 0]
if release_pos == [340.0, 900.0, 99.0]: if release_pos == [340.0, 900.0, 99.0]:
walk_v[2] = 130 walk_v[2] = 130
print("walk_v:",walk_v)
action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v) action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
scene = stub.Do(action) scene = stub.Do(action)
print("After Walk Position:", [scene.location.X, scene.location.Y, scene.rotation.Yaw]) print("After Walk Position:", [scene.location.X, scene.location.Y, scene.rotation.Yaw])
@ -529,7 +533,7 @@ class Scene:
return True return True
# 执行过程:输出"开始(任务名)" -> 按步骤数执行任务 -> Robot输出成功或失败的对话 # 执行过程:输出"开始(任务名)" -> 按步骤数执行任务 -> Robot输出成功或失败的对话
def op_task_execute(self,op_type,obj_id=0,release_pos=[240,-140]): def op_task_execute(self,op_type,obj_id=0,release_pos=[247.0, 520.0, 100.0]):
self.control_robot_action(0, 1, "开始"+self.op_dialog[op_type]) # 开始制作咖啡 self.control_robot_action(0, 1, "开始"+self.op_dialog[op_type]) # 开始制作咖啡
if op_type<8: result = self.control_robot_action(op_type, 1) if op_type<8: result = self.control_robot_action(op_type, 1)
if op_type>=8 and op_type<=12: result = self.control_robot_action(self.op_typeToAct[op_type][0], self.op_typeToAct[op_type][1]) if op_type>=8 and op_type<=12: result = self.control_robot_action(self.op_typeToAct[op_type][0], self.op_typeToAct[op_type][1])
@ -667,3 +671,16 @@ class Scene:
def cal_distance_to_robot(self,objx,objy,objz):
scene = self.status
ginger_x, ginger_y, ginger_z = [int(scene.location.X), int(scene.location.Y),100]
return math.sqrt((ginger_x - objx) ** 2 + (ginger_y - objy) ** 2 + (ginger_z - objz) ** 2)
# def test_yaw(self):
# walk_v = [247.0, 480.0, 180.0, 180, 0]
# action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
# scene = stub.Do(action)
# time.sleep(4)
# walk_v = [247.0, 500.0, 0.0, 180, 0]
# action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
# scene = stub.Do(action)

View File

@ -16,8 +16,9 @@ class SceneAT(Scene):
super().__init__(robot) super().__init__(robot)
def _reset(self): def _reset(self):
self.add_walker(1085, 2630, 220) # self.add_walker(1085, 2630, 220)
self.control_walker([self.walker_control_generator(0, False, 100, 755, 1900, 180)]) # self.control_walker([self.walker_control_generator(0, False, 100, 755, 1900, 180)])
pass
def _run(self): def _run(self):

View File

@ -22,8 +22,8 @@ class SceneGQA(Scene):
def _reset(self): def _reset(self):
# self.clean_walker() # self.clean_walker()
self.add_walkers() # self.add_walkers([[50, 500,90]])
pass
# self.walker_bubble("洗手间在哪里") # self.walker_bubble("洗手间在哪里")
# self.control_walker([self.walker_control_generator(0, False, 100, 755, 1900, 180)]) # self.control_walker([self.walker_control_generator(0, False, 100, 755, 1900, 180)])

View File

@ -19,8 +19,7 @@ class SceneOT(Scene):
super().__init__(robot) super().__init__(robot)
# 在这里加入场景中发生的事件 # 在这里加入场景中发生的事件
self.event_list = [ self.event_list = [
# (5,self.create_chat_event("给我一杯咖啡")) # (事件发生的时间,事件函数) (5, self.create_chat_event("我有点热,能开个空调吗?")),
(5, self.create_chat_event("我有点热,能开个空调吗?")) # (事件发生的时间,事件函数)
] ]
def _reset(self): def _reset(self):
@ -30,7 +29,6 @@ class SceneOT(Scene):
print("scene.walkers:",scene.walkers) print("scene.walkers:",scene.walkers)
cont = scene.walkers[0].name+":我有点热,能开个空调吗?" cont = scene.walkers[0].name+":我有点热,能开个空调吗?"
self.control_robot_action(0,3,cont) self.control_robot_action(0,3,cont)
# self.control_walker([self.walker_control_generator(0, False, 100, 755, 1900, 180)])
pass pass
def _run(self): def _run(self):

View File

@ -18,20 +18,11 @@ class SceneOT(Scene):
super().__init__(robot) super().__init__(robot)
# 在这里加入场景中发生的事件 # 在这里加入场景中发生的事件
self.event_list = [ self.event_list = [
# (5,self.create_chat_event("做一杯咖啡")), (5, self.create_chat_event("给我一杯咖啡"))
(5,self.create_chat_event("感觉有点冷,可以关一下空调吗")),
] ]
def _reset(self): def _reset(self):
scene = self.add_walkers([[50, 300, 0]]) pass
# time.sleep(2.0)
# print("我有点热,能开个空调吗?")
print("scene.walkers:",scene.walkers)
cont = scene.walkers[0].name+":我有点热,能开个空调吗?"
self.control_robot_action(0,3,cont)
# self.add_walker(1085, 2630, 220)
# self.control_walker([self.walker_control_generator(0, False, 100, 755, 1900, 180)])
def _run(self): def _run(self):

View File

@ -35,6 +35,7 @@ class SceneVLM(Scene):
def _run(self, op_type=10): def _run(self, op_type=10):
# 共17个操作 # 共17个操作
# "制作咖啡","倒水","夹点心","拖地","擦桌子","开筒灯","搬椅子", # 1-7 # "制作咖啡","倒水","夹点心","拖地","擦桌子","开筒灯","搬椅子", # 1-7
# "关筒灯","开大厅灯","关大厅灯","关闭窗帘","打开窗帘", # 8-12 # "关筒灯","开大厅灯","关大厅灯","关闭窗帘","打开窗帘", # 8-12
@ -58,27 +59,28 @@ class SceneVLM(Scene):
# 流程测试 # 流程测试
# 抓握放置:抓吧台前生成的酸奶,放到抹布桌上 # 抓握放置:抓吧台前生成的酸奶,放到抹布桌上
self.gen_obj() self.gen_obj()
self.move_task_area(16, obj_id=0) # self.move_task_area(16, obj_id=0)
self.op_task_execute(16, obj_id=0) # self.op_task_execute(16, obj_id=0)
pos = [340.0, 900.0, 99.0] # pos = [340.0, 900.0, 99.0]
self.move_task_area(17, release_pos=pos) # self.move_task_area(17, release_pos=pos)
self.op_task_execute(17, release_pos=pos) # self.op_task_execute(17, release_pos=pos)
#
# # 做咖啡:做完的咖啡放到水杯桌上
# self.move_task_area(1)
# self.op_task_execute(1)
#
# self.find_obj("CoffeeCup")
#
# self.move_task_area(16, obj_id=275)
# self.op_task_execute(16, obj_id=275)
# pos = [-70.0, 500.0, 107]
# self.move_task_area(17, release_pos=pos)
# self.op_task_execute(17, release_pos=pos)
#
# # 倒水:倒完的水放到旁边桌子上
# self.move_task_area(2)
# self.op_task_execute(2)
# 做咖啡:做完的咖啡放到水杯桌上
self.move_task_area(1)
self.op_task_execute(1)
self.find_obj("CoffeeCup")
self.move_task_area(16, obj_id=275)
self.op_task_execute(16, obj_id=275)
pos = [-70.0, 500.0, 107]
self.move_task_area(17, release_pos=pos)
self.op_task_execute(17, release_pos=pos)
# 倒水:倒完的水放到旁边桌子上
self.move_task_area(2)
self.op_task_execute(2)
# #
# self.move_task_area(16, obj_id=190) # self.move_task_area(16, obj_id=190)
# self.op_task_execute(16, obj_id=190) # self.op_task_execute(16, obj_id=190)
@ -86,6 +88,8 @@ class SceneVLM(Scene):
# self.move_task_area(17, release_pos=pos) # self.move_task_area(17, release_pos=pos)
# self.op_task_execute(17, release_pos=pos) # self.op_task_execute(17, release_pos=pos)
# self.test_yaw()
pass pass
def _step(self): def _step(self):

View File

@ -16,8 +16,8 @@ from robowaiter.scene.scene import Scene,init_world # TODO: 文件名改成Scen
from robowaiter.scene.scene import Scene from robowaiter.scene.scene import Scene
from robowaiter.utils import get_root_path from robowaiter.utils import get_root_path
from robowaiter.algos.navigate.navigate import Navigator from robowaiter.algos.navigator.navigate import Navigator
from robowaiter.algos.navigate import test from robowaiter.algos.navigator import test
class SceneVLN(Scene): class SceneVLN(Scene):
def __init__(self, robot): def __init__(self, robot):
@ -29,7 +29,7 @@ class SceneVLN(Scene):
def _reset(self): def _reset(self):
root_path = get_root_path() root_path = get_root_path()
file_name = os.path.join(root_path,'robowaiter/algos/navigate/map_5.pkl') file_name = os.path.join(root_path,'robowaiter/algos/navigator/map_5.pkl')
with open(file_name, 'rb') as file: with open(file_name, 'rb') as file:
map = pickle.load(file) map = pickle.load(file)
@ -37,7 +37,7 @@ class SceneVLN(Scene):
self.state['map']['obj_pos']['Table'] = np.array((-100, 700)) self.state['map']['obj_pos']['Table'] = np.array((-100, 700))
def _run(self): def _run(self):
file_name = '../../algos/navigate/map_5.pkl' file_name = '../../algos/navigator/map_5.pkl'
if os.path.exists(file_name): if os.path.exists(file_name):
with open(file_name, 'rb') as file: with open(file_name, 'rb') as file:
map = pickle.load(file) map = pickle.load(file)