This commit is contained in:
Caiyishuai 2023-11-15 14:56:17 +08:00
commit 55d36162ed
14 changed files with 687 additions and 141 deletions

View File

@ -1,11 +1,15 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# enconding = utf8
import math
import os
import pickle
import sys
import time
import grpc
from explore import Explore
import camera
import semantic_map
sys.path.append('./')
sys.path.append('../')
@ -14,8 +18,8 @@ import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import make_axes_locatable
from robowaiter.proto import GrabSim_pb2
from robowaiter.proto import GrabSim_pb2_grpc
import GrabSim_pb2_grpc
import GrabSim_pb2
channel = grpc.insecure_channel('localhost:30001', options=[
('grpc.max_send_message_length', 1024 * 1024 * 1024),
@ -24,6 +28,10 @@ channel = grpc.insecure_channel('localhost:30001', options=[
sim_client = GrabSim_pb2_grpc.GrabSimStub(channel)
visited = set()
all_frontier_list = set()
'''
初始化卸载已经加载的关卡清除所有机器人
'''
@ -68,12 +76,13 @@ def SetWorld(map_id=0, scene_num=1):
def Observe(scene_id=0):
print('------------------show_env_info----------------------')
scene = sim_client.Observe(GrabSim_pb2.SceneID(value=scene_id))
print(
f"location:{[scene.location]}, rotation:{scene.rotation}\n",
f"joints number:{len(scene.joints)}, fingers number:{len(scene.fingers)}\n",
f"objects number: {len(scene.objects)}, walkers number: {len(scene.walkers)}\n"
f"timestep:{scene.timestep}, timestamp:{scene.timestamp}\n"
f"collision:{scene.collision}, info:{scene.info}")
# print(
# f"location:{[scene.location]}, rotation:{scene.rotation}\n",
# f"joints number:{len(scene.joints)}, fingers number:{len(scene.fingers)}\n",
# f"objects number: {len(scene.objects)}, walkers number: {len(scene.walkers)}\n"
# f"timestep:{scene.timestep}, timestamp:{scene.timestamp}\n"
# f"collision:{scene.collision}, info:{scene.info}")
return scene
'''
@ -100,33 +109,152 @@ def Reset(scene_id=0):
yaw:机器人朝向;
velocity:速度,>0代表移动,<0代表瞬移,=0代表只查询;
dis:最终达到的位置距离目标点最远距离,如果超过此距离则目标位置不可达
"""
def navigation_move(v_list, scene_id=0, map_id=11):
# def navigation_test(scene_id=0, map_id=11):
# print('------------------navigation_move----------------------')
# scene = sim_client.Observe(GrabSim_pb2.SceneID(value=scene_id))
# walk_value = [scene.location.X, scene.location.Y, scene.rotation.Yaw]
# print("position:", walk_value)
#
# if map_id == 11: # coffee
# v_list = [[237, 495], [232, 495], [227, 490]]
# else:
# v_list = [[0.0, 0.0]]
#
# for walk_v in v_list:
# walk_v = walk_v + [scene.rotation.Yaw - 90, 60, 0]
# print("walk_v", walk_v)
# action = GrabSim_pb2.Action(scene=scene_id, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
# scene = sim_client.Do(action)
# print(scene.info)
def navigation_move(cur_objs, objs_name_set, v_list, scene_id=0, map_id=11):
print('------------------navigation_move----------------------')
scene = sim_client.Observe(GrabSim_pb2.SceneID(value=scene_id))
walk_value = [scene.location.X, scene.location.Y, scene.rotation.Yaw]
print("position:", walk_value)
# if map_id == 11: # coffee
# v_list = [[0, 880], [250, 1200], [-55, 750], [70, -200]]
# else:
# v_list = [[0.0, 0.0]]
for walk_v in v_list:
walk_v = walk_v + [scene.rotation.Yaw - 90, 60, 0]
walk_v = walk_v + [scene.rotation.Yaw - 90, 250, 60]
print("walk_v", walk_v)
action = GrabSim_pb2.Action(scene=scene_id, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
scene = sim_client.Do(action)
cur_objs, objs_name_set = camera.get_semantic_map(GrabSim_pb2.CameraName.Head_Segment, cur_objs, objs_name_set)
# if scene.info == "Unreachable":
print(scene.info)
return cur_objs, objs_name_set
def isOutMap(pos, min_x=-350, max_x=600, min_y=-400, max_y=1450):
if pos[0] <= min_x or pos[0] >= max_x or pos[1] <= min_y or pos[1] >= \
max_y:
return True
return False
def real2map(x, y):
'''
实际坐标->地图坐标 (向下取整)
'''
# x = round((x - self.min_x) / self.scale_ratio)
# y = round((y - self.min_y) / self.scale_ratio)
x = math.floor((x + 350) / 5)
y = math.floor((y + 400) / 5)
return x, y
def explore(map, cur_pos, explore_range):
for i in range(cur_pos[0] - explore_range, cur_pos[0] + explore_range + 1):
for j in range(cur_pos[1] - explore_range, cur_pos[1] + explore_range + 1):
if isOutMap((i, j)):
continue
x, y = real2map(i, j)
if map[x, y] == 0:
visited.add((i, j))
for i in range(cur_pos[0] - explore_range, cur_pos[0] + explore_range + 1):
for j in range(cur_pos[1] - explore_range, cur_pos[1] + explore_range + 1):
if isOutMap((i, j)):
continue
x, y = real2map(i, j)
if map[x, y] == 0:
if isNewFrontier((i, j), map):
all_frontier_list.add((i, j))
if len(all_frontier_list) <= 400:
free_list = list(visited)
free_array = np.array(free_list)
print(f"探索完成!以下是场景中可以到达的点:{free_array};其余点均是障碍物不可达")
return None
return getNearestFrontier(cur_pos, all_frontier_list)
def isNewFrontier(pos, map):
around_nodes = [(pos[0], pos[1] + 1), (pos[0], pos[1] - 1), (pos[0] - 1, pos[1]), (pos[0] + 1, pos[1])]
for node in around_nodes:
x, y = real2map(node[0], node[1])
if node not in visited and map[x, y] == 0:
return True
if (pos[0], pos[1]) in all_frontier_list:
all_frontier_list.remove((pos[0], pos[1]))
return False
# def reachable(pos, map):
# # scene = sim_client.Observe(GrabSim_pb2.SceneID(value=0))
# # # x, y = real2map(self.area_range[0], self.area_range[2], self.scale_ratio, pos[0], pos[1])
# # # if self.map[x, y] == 1:
# # # return True
# # # else:
# # # return False
# # loc = pos + [0, 0, 5]
# # action = GrabSim_pb2.Action(scene=0, action=GrabSim_pb2.Action.ActionType.WalkTo, values=loc)
# # scene = sim_client.Do(action)
# # # print(scene.info)
# # if (str(scene.info).find('Unreachable') > -1):
# # return False
# # else:
# # return True
# if map[pos[0], pos[1]] == 0:
# return True
# else:
# return False
def getDistance(pos1, pos2):
return math.sqrt((pos1[0] - pos2[0]) ** 2 + (pos1[1] - pos2[1]) ** 2)
def getNearestFrontier(cur_pos, frontiers):
dis_min = sys.maxsize
frontier_best = None
for frontier in frontiers:
dis = getDistance(frontier, cur_pos)
if dis <= dis_min:
dis_min = dis
frontier_best = frontier
return frontier_best
if __name__ == '__main__':
map_id = 11 # 地图编号
scene_num = 1 # 场景数量
node_list = [] # 探索点
explorer = Explore(_explore_range=100)
explore_range = 240 # 机器人感知范围以机器人为中心边长为2 * _explore_range的正方形
cur_objs = []
objs_name_set = set()
file_name = 'map_5.pkl'
if os.path.exists(file_name):
with open(file_name, 'rb') as file:
map = pickle.load(file)
print('------------ 初始化加载场景 ------------')
Init()
@ -136,20 +264,28 @@ if __name__ == '__main__':
for i in range(scene_num):
print('------------ 场景操作 ------------')
Observe(i)
scene = Observe(i)
Reset(i)
print('------------ 自主探索 ------------')
while True:
cur_objs = semantic_map.navigation_move(cur_objs,i, map_id)
print("物品列表如下:")
print(cur_objs)
# scene = sim_client.Observe(GrabSim_pb2.SceneID(value=0))
# cur_pos =[int(scene.location.X), int(scene.location.Y), int(scene.rotation.Yaw)]
goal = explorer.explore(cur_pos) # cur_pos 指的是当前机器人的位置,场景中应该也有接口可以获取
if goal is None:
break
node_list.append(goal)
cur_pos = goal
navigation_move(i, map_id, node_list)
# cur_pos = [int(scene.location.X), int(scene.location.Y)]
# # print(reachable([237,490]))
# # navigation_move([[237,490]], i, map_id)
# # navigation_test(i,map_id)
# while True:
# goal = explore(map, cur_pos, explore_range) # cur_pos 指的是当前机器人的位置,场景中应该也有接口可以获取
# if goal is None:
# break
# node_list.append(goal)
# # print(goal)
# cur_objs, objs_name_set= navigation_move(cur_objs, objs_name_set, [[goal[0], goal[1]]], i, map_id)
# print(cur_objs)
# cur_pos = goal
# TODOnode_list就是机器人拍照的点目前没有设置拍照角度需要机器人到达一个拍照点后前后左右各拍一张照片然后获取语义信息

View File

@ -0,0 +1,281 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# enconding = utf8
import json
import string
import sys
import time
import grpc
sys.path.append('./')
sys.path.append('../')
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import make_axes_locatable
import GrabSim_pb2_grpc
import GrabSim_pb2
channel = grpc.insecure_channel('localhost:30001', options=[
('grpc.max_send_message_length', 1024 * 1024 * 1024),
('grpc.max_receive_message_length', 1024 * 1024 * 1024)
])
sim_client = GrabSim_pb2_grpc.GrabSimStub(channel)
objects_dic = {}
'''
初始化卸载已经加载的关卡清除所有机器人
'''
def Init():
sim_client.Init(GrabSim_pb2.NUL())
'''
获取当前可加载的地图信息(地图名字地图尺寸)
'''
def AcquireAvailableMaps():
AvailableMaps = sim_client.AcquireAvailableMaps(GrabSim_pb2.NUL())
print(AvailableMaps)
'''
1根据mapID加载指定地图
2如果scene_num>1,则根据地图尺寸偏移后加载多个相同地图
3这样就可以在一个关卡中训练多个地图
'''
def SetWorld(map_id=0, scene_num=1):
print('------------------SetWorld----------------------')
world = sim_client.SetWorld(GrabSim_pb2.BatchMap(count=scene_num, mapID=map_id))
'''
返回场景的状态信息
1返回机器人的位置和旋转
2返回各个关节的名字和旋转
3返回场景中标记的物品信息(名字类型位置旋转)
4返回场景中行人的信息(名字位置旋转速度)
5返回机器人手指和双臂的碰撞信息
'''
def Observe(scene_id=0):
print('------------------show_env_info----------------------')
scene = sim_client.Observe(GrabSim_pb2.SceneID(value=scene_id))
# print(
# f"location:{[scene.location]}, rotation:{scene.rotation}\n",
# f"joints number:{len(scene.joints)}, fingers number:{len(scene.fingers)}\n",
# f"objects number: {len(scene.objects)}, walkers number: {len(scene.walkers)}\n"
# f"timestep:{scene.timestep}, timestamp:{scene.timestamp}\n"
# f"collision:{scene.collision}, info:{scene.info}")
return scene
'''
重置场景
1重置桌子的宽度和高度
2清除生成的行人和物品
3重置关节角度位置旋转
4清除碰撞信息
5重置场景中标记的物品
'''
def Reset(scene_id=0):
print('------------------Reset----------------------')
scene = sim_client.Reset(GrabSim_pb2.ResetParams(scene=scene_id))
print(scene)
# 如果场景支持调整桌子
# sim_client.Reset(GrabSim_pb2.ResetParams(scene = scene_id, adjust = True, height = 100.0, width = 100.0))"
'''
根据传入的部位名字获取相机数据
'''
def get_camera(part, scene_id=0):
print('------------------get_camera----------------------')
action = GrabSim_pb2.CameraList(cameras=part, scene=scene_id)
return sim_client.Capture(action)
'''
显示相机画面
'''
def show_image(img_data, scene):
print('------------------show_image----------------------')
im = img_data.images[0]
# 相机内参矩阵
in_matrix = np.array(
[[im.parameters.fx, 0, im.parameters.cx], [0, im.parameters.fy, im.parameters.cy], [0, 0, 1]])
# 相机外参矩阵
out_matrix = np.array(im.parameters.matrix).reshape((4, 4))
# # 旋转矩阵
# rotation_matrix = out_matrix[0:3, 0:3]
#
# # 平移矩阵
# translation_matrix = out_matrix[0:3, -1].reshape(3, 1)
# 像素坐标
# pixel_point = np.array([403, 212, 1]).reshape(3, 1)
pixel_x = 404
pixel_y = 212
depth = 369
# 将像素坐标转换为归一化设备坐标
normalized_x = (pixel_x - im.parameters.cx) / im.parameters.fx
normalized_y = (pixel_y - im.parameters.cy) / im.parameters.fy
# 将归一化设备坐标和深度值转换为相机坐标
camera_x = normalized_x * depth
camera_y = normalized_y * depth
camera_z = depth
# 构建相机坐标向量
camera_coordinates = np.array([camera_x, camera_y, camera_z, 1])
# print("物体相对相机坐标的齐次坐标: ", camera_coordinates)
# 将相机坐标转换为机器人底盘坐标
robot_coordinates = np.dot(out_matrix, camera_coordinates)[:3]
# print("物体的相对底盘坐标为:", robot_coordinates)
# 将物体相对机器人底盘坐标转为齐次坐标
robot_homogeneous_coordinates = np.array([robot_coordinates[0], -robot_coordinates[1], robot_coordinates[2], 1])
# print("物体的相对底盘的齐次坐标为:", robot_homogeneous_coordinates)
# 机器人坐标
X = scene.location.X
Y = scene.location.Y
Z = 0.0
# 机器人旋转信息
Roll = 0.0
Pitch = 0.0
Yaw = scene.rotation.Yaw
# 构建平移矩阵
T = np.array([[1, 0, 0, X],
[0, 1, 0, Y],
[0, 0, 1, Z],
[0, 0, 0, 1]])
# 构建旋转矩阵
Rx = np.array([[1, 0, 0, 0],
[0, np.cos(Roll), -np.sin(Roll), 0],
[0, np.sin(Roll), np.cos(Roll), 0],
[0, 0, 0, 1]])
Ry = np.array([[np.cos(Pitch), 0, np.sin(Pitch), 0],
[0, 1, 0, 0],
[-np.sin(Pitch), 0, np.cos(Pitch), 0],
[0, 0, 0, 1]])
Rz = np.array([[np.cos(np.radians(Yaw)), -np.sin(np.radians(Yaw)), 0, 0],
[np.sin(np.radians(Yaw)), np.cos(np.radians(Yaw)), 0, 0],
[0, 0, 1, 0],
[0, 0, 0, 1]])
R = np.dot(Rz, np.dot(Ry, Rx))
# 构建机器人的变换矩阵
T_robot = np.dot(T, R)
# print(T_robot)
# 将物体的坐标从机器人底盘坐标系转换到世界坐标系
world_coordinates = np.dot(T_robot, robot_homogeneous_coordinates)[:3]
# print("物体的世界坐标:", world_coordinates)
# 世界偏移后的坐标
world_offest_coordinates = [world_coordinates[0] + 700, world_coordinates[1] + 1400, world_coordinates[2]]
# print("物体世界偏移的坐标: ", world_offest_coordinates)
# world_point = world_coordinates + np.array([])
# print("物体的世界坐标为:", )
# # 相对机器人的世界坐标
# world_point = rotation_matrix.T @ (in_matrix.T * 369 @ pixel_point - translation_matrix)
# print(world_point)
# print(in_matrix @ out_matrix @ obj_world)
#
d = np.frombuffer(im.data, dtype=im.dtype).reshape((im.height, im.width, im.channels))
plt.imshow(d, cmap="gray" if "depth" in im.name.lower() else None)
plt.show()
def save_obj_info(img_data, objs_name):
items = img_data.info.split(";")
dictionary = {}
for item in items:
key, value = item.split(":")
dictionary[int(key)] = value
im = img_data.images[0]
d = np.frombuffer(im.data, dtype=im.dtype).reshape((im.height, im.width, im.channels))
arr_flat = d.ravel()
for id in arr_flat:
if id not in dictionary:
print(id)
else:
objs_name.add(dictionary[id])
return objs_name
def get_semantic_map(camera, cur_objs, objs_name):
scene = Observe(0)
objs = scene.objects._values
img_data = get_camera([camera])
show_image(img_data, scene)
objs_name = save_obj_info(img_data, objs_name)
for obj_name in list(objs_name):
for obj in objs:
if obj.name == obj_name and obj not in cur_objs:
cur_objs.append(obj)
break
return cur_objs, objs_name
if __name__ == '__main__':
map_id = 11 # 地图编号
scene_num = 1 # 场景数量
cur_objs = []
print('------------ 初始化加载场景 ------------')
Init()
AcquireAvailableMaps()
SetWorld(map_id, scene_num)
time.sleep(5.0)
for i in range(scene_num):
print('------------ 场景操作 ------------')
scene = Observe(i)
Reset(i)
print('------------ 相机捕获 ------------')
Reset(i)
time.sleep(1.0)
# print(get_semantic_map(GrabSim_pb2.CameraName.Head_Segment,cur_objs))
for camera_name in [GrabSim_pb2.CameraName.Head_Depth]:
img_data = get_camera([camera_name], i)
show_image(img_data, scene)
# for camera_name in [GrabSim_pb2.CameraName.Waist_Color, GrabSim_pb2.CameraName.Waist_Depth]:
# img_data = get_camera([camera_name], i)
# show_image(img_data, 2)

View File

@ -1,91 +0,0 @@
import math
import sys
import numpy as np
# from rrt import RRT
# from rrt_star import RRTStar
# def real2map(min_x, min_y, scale_ratio, x, y):
# '''
# 实际坐标->地图坐标 (向下取整)
# '''
# # x = round((x - self.min_x) / self.scale_ratio)
# # y = round((y - self.min_y) / self.scale_ratio)
# x = math.floor((x - min_x) / scale_ratio)
# y = math.floor((y - min_y) / scale_ratio)
# return x, y
def getDistance(pos1, pos2):
return math.sqrt((pos1[0] - pos2[0]) ** 2 + (pos1[1] - pos2[1]) ** 2)
def getNearestFrontier(cur_pos, frontiers):
dis_min = sys.maxsize
frontier_best = None
for frontier in frontiers:
dis = getDistance(frontier, cur_pos)
if dis <= dis_min:
dis_min = dis
frontier_best = frontier
return frontier_best
class Explore:
def __init__(self, _explore_range):
self.explore_range = _explore_range # 机器人感知范围以机器人为中心边长为2 * _explore_range的正方形
self.visited = set()
self.all_frontier_list = set()
def explore(self, cur_pos):
for i in range(cur_pos[0] - self.explore_range, cur_pos[0] + self.explore_range + 1):
for j in range(cur_pos[1] - self.explore_range, cur_pos[1] + self.explore_range + 1):
# if self.isOutMap((i, j)):
# continue
# x, y = real2map(self.area_range[0], self.area_range[2], self.scale_ratio, i, j)
# if self.map[x, y] == 0:
if self.reachable((i, j)):
self.visited.add((i, j))
for i in range(cur_pos[0] - self.explore_range, cur_pos[0] + self.explore_range + 1):
for j in range(cur_pos[1] - self.explore_range, cur_pos[1] + self.explore_range + 1):
# if self.isOutMap((i, j)):
# continue
# x, y = real2map(self.area_range[0], self.area_range[2], self.scale_ratio, i, j)
if self.reachable((i, j)):
if self.isNewFrontier((i, j)):
self.all_frontier_list.add((i, j))
if len(self.all_frontier_list) == 0:
free_list = list(self.visited)
free_array = np.array(free_list)
print(f"探索完成!以下是场景中可以到达的点:{free_array};其余点均是障碍物不可达")
return None
return getNearestFrontier(cur_pos, self.all_frontier_list)
def isNewFrontier(self, pos):
around_nodes = [(pos[0], pos[1] + 1), (pos[0], pos[1] - 1), (pos[0] - 1, pos[1]), (pos[0] + 1, pos[1])]
for node in around_nodes:
if node not in self.visited and self.reachable(node):
return True
if (pos[0], pos[1]) in self.all_frontier_list:
self.all_frontier_list.remove((pos[0], pos[1]))
return False
def reachable(self, pos):
# x, y = real2map(self.area_range[0], self.area_range[2], self.scale_ratio, pos[0], pos[1])
# if self.map[x, y] == 1:
# return True
# else:
# return False
# TODO: 调用reachable_check函数
pass
# def isOutMap(self, pos):
# if pos[0] <= self.area_range[0] or pos[0] >= self.area_range[1] or pos[1] <= self.area_range[2] or pos[1] >= \
# self.area_range[3]:
# return True
# return False

Binary file not shown.

View File

@ -0,0 +1,146 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# enconding = utf8
import sys
import time
import grpc
import camera
sys.path.append('./')
sys.path.append('../')
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import make_axes_locatable
import GrabSim_pb2_grpc
import GrabSim_pb2
channel = grpc.insecure_channel('localhost:30001', options=[
('grpc.max_send_message_length', 1024 * 1024 * 1024),
('grpc.max_receive_message_length', 1024 * 1024 * 1024)
])
sim_client = GrabSim_pb2_grpc.GrabSimStub(channel)
'''
初始化卸载已经加载的关卡清除所有机器人
'''
def Init():
sim_client.Init(GrabSim_pb2.NUL())
'''
获取当前可加载的地图信息(地图名字地图尺寸)
'''
def AcquireAvailableMaps():
AvailableMaps = sim_client.AcquireAvailableMaps(GrabSim_pb2.NUL())
print(AvailableMaps)
'''
1根据mapID加载指定地图
2如果scene_num>1,则根据地图尺寸偏移后加载多个相同地图
3这样就可以在一个关卡中训练多个地图
'''
def SetWorld(map_id=0, scene_num=1):
print('------------------SetWorld----------------------')
world = sim_client.SetWorld(GrabSim_pb2.BatchMap(count=scene_num, mapID=map_id))
'''
返回场景的状态信息
1返回机器人的位置和旋转
2返回各个关节的名字和旋转
3返回场景中标记的物品信息(名字类型位置旋转)
4返回场景中行人的信息(名字位置旋转速度)
5返回机器人手指和双臂的碰撞信息
'''
def Observe(scene_id=0):
print('------------------show_env_info----------------------')
scene = sim_client.Observe(GrabSim_pb2.SceneID(value=scene_id))
print(
f"location:{[scene.location]}, rotation:{scene.rotation}\n",
f"joints number:{len(scene.joints)}, fingers number:{len(scene.fingers)}\n",
f"objects number: {len(scene.objects)}, walkers number: {len(scene.walkers)}\n"
f"timestep:{scene.timestep}, timestamp:{scene.timestamp}\n"
f"collision:{scene.collision}, info:{scene.info}")
'''
重置场景
1重置桌子的宽度和高度
2清除生成的行人和物品
3重置关节角度位置旋转
4清除碰撞信息
5重置场景中标记的物品
'''
def Reset(scene_id=0):
print('------------------Reset----------------------')
scene = sim_client.Reset(GrabSim_pb2.ResetParams(scene=scene_id))
print(scene)
# 如果场景支持调整桌子
# sim_client.Reset(GrabSim_pb2.ResetParams(scene = scene_id, adjust = True, height = 100.0, width = 100.0))"
"""
导航移动
yaw:机器人朝向;
velocity:速度,>0代表移动,<0代表瞬移,=0代表只查询;
dis:最终达到的位置距离目标点最远距离,如果超过此距离则目标位置不可达
"""
def navigation_move(cur_objs, scene_id=0, map_id=0):
print('------------------navigation_move----------------------')
scene = sim_client.Observe(GrabSim_pb2.SceneID(value=scene_id))
walk_value = [scene.location.X, scene.location.Y, scene.rotation.Yaw]
print("position:", walk_value)
objs_name_set = set()
if map_id == 11: # coffee
v_list = [[247,520], [247, 700], [270, 1100], [55, 940], [30, 900], [30, 520], [160, -165], [247, 0],[247, 520]]
else:
v_list = [[0.0, 0.0]]
for walk_v in v_list:
walk_v = walk_v + [scene.rotation.Yaw - 90, 200, 10]
print("walk_v", walk_v)
action = GrabSim_pb2.Action(scene=scene_id, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
scene = sim_client.Do(action)
cur_objs, objs_name_set = camera.get_semantic_map(GrabSim_pb2.CameraName.Head_Segment, cur_objs, objs_name_set)
print(scene.info)
return cur_objs
if __name__ == '__main__':
map_id = 11 # 地图编号
scene_num = 1 # 场景数量
cur_objs = []
print('------------ 初始化加载场景 ------------')
Init()
AcquireAvailableMaps()
SetWorld(map_id, scene_num)
time.sleep(5.0)
for i in range(scene_num):
print('------------ 场景操作 ------------')
Observe(i)
Reset(i)
print('------------ 导航移动 ------------')
cur_objs = navigation_move(cur_objs, i, map_id)
print(cur_objs)

View File

@ -6,14 +6,20 @@ from robowaiter.llm_client.ask_llm import ask_llm
class DealChat(Act):
def __init__(self):
super().__init__()
self.chat_history = ""
def _update(self) -> ptree.common.Status:
# if self.scene.status?
chat = self.scene.state['chat_list'].pop()
self.chat_history += chat + '\n'
res_dict = ask_llm(chat)
res_dict = ask_llm(self.chat_history)
answer = res_dict["Answer"]
goal = eval(res_dict["Goal"])
self.chat_history += answer + '\n'
goal = res_dict["Goal"]
if goal and "{" not in goal:
goal = {str(goal)}
if goal is not None:
print(f'goal{goal}')

View File

@ -4,6 +4,8 @@ import requests
import urllib3
from robowaiter.utils import get_root_path
from robowaiter.llm_client.single_round import single_round
from robowaiter.llm_client.tool_api import run_conversation
########################################
# 该文件实现了与大模型的简单通信
########################################
@ -22,7 +24,9 @@ def ask_llm(question):
if question in test_questions_dict:
ans = test_questions_dict[question]
else:
ans = single_round(question)
ans = run_conversation(question, stream=False)
# ans = single_round(question)
print(f"大模型输出: {ans}")
return ans

View File

@ -1,6 +1,6 @@
import requests
import urllib3
from robowaiter.llm_client.tool_api import run_conversation
########################################
# 该文件实现了与大模型的简单通信、多轮对话输入end表示对话结束
########################################
@ -20,8 +20,8 @@ while k!='end':
user_dict={"role": "user","content":question_now}
data_memory.append(user_dict)
#print(data_memory)
response = requests.post(url, headers=headers, json={"messages":data_memory, "repetition_penalty": 1.0}, verify=False)
answer=response.json()['choices'][n]['message']['content']
response = run_conversation(str(data_memory))
answer=str(response)
print(answer)
assistant_dict={"role": "assistant","content":answer}
data_memory.append(assistant_dict)

View File

@ -3,8 +3,8 @@ import json
import openai
from colorama import init, Fore
from loguru import logger
from tool_register import get_tools, dispatch_tool
import json
from robowaiter.llm_client.tool_register import get_tools, dispatch_tool
import requests
import json
@ -29,22 +29,21 @@ def get_response(**kwargs):
functions = get_tools()
def run_conversation(query: str, stream=False, functions=None, max_retry=5):
def run_conversation(query: str, stream=False, max_retry=5):
params = dict(model="chatglm3", messages=[{"role": "user", "content": query}], stream=stream)
if functions:
params["functions"] = functions
params["functions"] = functions
response = get_response(**params)
for _ in range(max_retry):
if response["choices"][0]["message"].get("function_call"):
function_call = response["choices"][0]["message"]["function_call"]
logger.info(f"Function Call Response: {function_call}")
if "sub_task" in function_call["name"]:
return {
"Answer": "好的",
"Goal": function_call["arguments"]
"Goal": json.loads(function_call["arguments"])["goal"]
}
logger.info(f"Function Call Response: {function_call}")
function_args = json.loads(function_call["arguments"])
tool_response = dispatch_tool(function_call["name"], function_args)
logger.info(f"Tool Call Response: {tool_response}")
@ -71,5 +70,5 @@ def run_conversation(query: str, stream=False, functions=None, max_retry=5):
if __name__ == "__main__":
query = "关掉空调"
print(run_conversation(query, functions=functions, stream=False))
query = "可以带我去吗"
print(run_conversation(query, stream=False))

View File

@ -40,7 +40,7 @@ def register_tool(func: callable):
"params": tool_params
}
print("[registered tool] " + pformat(tool_def))
# print("[registered tool] " + pformat(tool_def))
_TOOL_HOOKS[tool_name] = func
_TOOL_DESCRIPTIONS[tool_name] = tool_def
@ -126,13 +126,14 @@ def get_tools() -> dict:
@register_tool
def create_sub_task(
goal: Annotated[str, '子任务需要达到的目标条件集合', True]
goal: Annotated[str, '子任务需要达到的目标条件集合,例如{On(Coffee,Bar)}{At(Robot,Table1)}{Is(AC,Off)}', True]
) -> str:
"""
当需要完成具身任务如做咖啡拿放物体扫地前往某位置调用该函数根据用户的提示进行意图理解生成子任务的目标状态集合 `goal`以一阶逻辑的形式表示用户意图
做一杯咖啡,`goal`={On(Coffee,Bar)},
前往一号桌,`goal`={At(Robot,Table1)},
打开空调,`goal`={Is(AC,On)},
做一杯咖啡,则该函数的参数为 "On(Coffee,Bar)",
前往一号桌,则该函数的参数为 "At(Robot,Table1)",
打开空调,则该函数的参数为 "Is(AC,On)",
关空调,则该函数的参数为 "Is(AC,Off)",
"""
return goal
@ -142,7 +143,7 @@ def get_object_info(
obj: Annotated[str, '需要获取信息的物体名称', True]
) -> str:
"""
获取场景中指定物体 `object` 的信息
获取场景中指定物体 `object` 的信息如果`object` 是一个地点例如洗手间地方则输出如果`object`是一个咖啡则输出
"""
near_object = None
if obj == "Table":

View File

@ -335,6 +335,23 @@ class Scene:
)
)
# def walker_bubble(self, message):
# status = self.status
# walker_name = status.walkers[0].name
# talk_content = walker_name + ":" + message
# self.control_robot_action(0, 0, 3, talk_content)
# def control_robot_action(self, scene_id=0, type=0, action=0, message="你好"):
# print('------------------control_robot_action----------------------')
# scene = stub.ControlRobot(
# GrabSim_pb2.ControlInfo(scene=scene_id, type=type, action=action, content=message))
# if (str(scene.info).find("Action Success") > -1):
# print(scene.info)
# return True
# else:
# print(scene.info)
# return False
def animation_control(self, animation_type):
# animation_type: 1:make coffee 2: pour water 3: grab food 4: mop floor 5: clean table
scene = stub.ControlRobot(

View File

@ -16,14 +16,17 @@ class SceneGQA(Scene):
super().__init__(robot)
# 在这里加入场景中发生的事件, (事件发生的时间,事件函数)
self.event_list = [
(5, self.create_chat_event("给我一杯咖啡")),
(20, self.create_chat_event("我要拿铁")),
(40, self.create_chat_event("再来一杯")),
(5, self.create_chat_event("洗手间在哪里")),
(12, self.create_chat_event("可以带我去吗")),
]
def _reset(self):
self.add_walker(1085, 2630, 220)
self.control_walker([self.walker_control_generator(0, False, 100, 755, 1900, 180)])
self.clean_walker()
self.add_walker(50, 500, 0)
self.walker_bubble("洗手间在哪里")
# self.control_walker([self.walker_control_generator(0, False, 100, 755, 1900, 180)])
def _run(self):
@ -39,4 +42,4 @@ if __name__ == '__main__':
# create task
task = SceneGQA(robot)
task.reset()
task.run()
task.run()

View File

@ -46,4 +46,4 @@ if __name__ == '__main__':
# create task
task = SceneOT(robot)
task.reset()
task.run()
task.run()

View File

@ -0,0 +1,44 @@
"""
人提出请求机器人完成任务
1. 做咖啡固定动画接收到做咖啡指令走到咖啡机拿杯子操作咖啡机取杯子送到客人桌子上
2. 倒水
3. 夹点心
具体描述设计一套点单规则如菜单包含咖啡点心等按照规则拟造随机的订单在收到订单后通过大模型让机器人输出合理的备餐计划并尝试在模拟环境中按照这个规划实现任务
"""
# todo: 接收点单信息,大模型生成任务规划
from robowaiter.scene.scene import Scene
class SceneOT(Scene):
def __init__(self, robot):
super().__init__(robot)
# 在这里加入场景中发生的事件
self.event_list = [
# (5,self.create_chat_event("做一杯咖啡")),
(5,self.create_chat_event("感觉有点冷,可以关一下空调吗")),
]
def _reset(self):
self.add_walker(50, 300, 0)
# self.add_walker(1085, 2630, 220)
# self.control_walker([self.walker_control_generator(0, False, 100, 755, 1900, 180)])
def _run(self):
pass
if __name__ == '__main__':
import os
from robowaiter.robot.robot import Robot
robot = Robot()
# create task
task = SceneOT(robot)
task.reset()
task.run()