This commit is contained in:
Caiyishuai 2023-11-15 15:41:00 +08:00
commit 249100c9d6
18 changed files with 1075 additions and 36 deletions

View File

@ -1,3 +1,3 @@
from robowaiter.robot.robot import Robot
from robowaiter.scene import task_map
# from robowaiter.scene import task_map

View File

@ -269,6 +269,7 @@ if __name__ == '__main__':
print('------------ 自主探索 ------------')
cur_objs = semantic_map.navigation_move(cur_objs,i, map_id)
print("物品列表如下:")
print(cur_objs)

Binary file not shown.

Binary file not shown.

View File

@ -238,7 +238,9 @@ def save_obj_info(img_data, objs_name):
def get_semantic_map(camera, cur_objs, objs_name):
scene = Observe(0)
objs = scene.objects._values
# print(scene.objects)
objs = scene.objects
img_data = get_camera([camera])
show_image(img_data, scene)
objs_name = save_obj_info(img_data, objs_name)

View File

@ -0,0 +1,514 @@
import time
import grpc
import numpy as np
import GrabSim_pb2
import GrabSim_pb2_grpc
channel = grpc.insecure_channel(
"localhost:30001",
options=[
("grpc.max_send_message_length", 1024 * 1024 * 1024),
("grpc.max_receive_message_length", 1024 * 1024 * 1024),
],
)
stub = GrabSim_pb2_grpc.GrabSimStub(channel)
animation_step = [4, 5, 7, 3, 3]
loc_offset = [-700, -1400]
def init_world(scene_num=1, mapID=11):
stub.SetWorld(GrabSim_pb2.BatchMap(count=scene_num, mapID=mapID))
time.sleep(3) # wait for the map to load
def image_extract(camera_data):
image = camera_data.images[0]
return np.frombuffer(image.data, dtype=image.dtype).reshape(
(image.height, image.width, image.channels)
)
class Scene:
robot = None
event_list = []
show_bubble = False
default_state = {
"map": {
"2d": None,
"obj_pos": {}
},
"chat_list": [], # 未处理的顾客的对话, (顾客的位置,顾客对话的内容)
"sub_goal_list": [], # 子目标列表
"status": None, # 仿真器中的观测信息,见下方详细解释
"condition_set": set()
}
"""
status:
location: Dict[X: float, Y: float]
rotation: Dict[Yaw: float]
joints: List[Dict[name: str, location: Dict[X: float, Y: float, Z: float]]]
fingers: List[Dict[name: str, location: List[3 * Dict[X: float, Y: float, Z: float]]]]
objects[:-1]: List[Dict[name: str, location: Dict[X: float, Y: float, Z: float]]]
objects[-1]: Dict[name: "Hand", boxes: List[Dict[diagonals: List[4 * Dict[X0: float, Y0: float, Z0: float, X1: float, Y1: float, Z1: float]]]]]
walkers: List[name: str, pose: Dict[X: float, Y: float, Yaw: float], speed: float, target: Dict[X: float, Y: float, Yaw: float]]
timestamp: int, timestep: int
collision: str, info: str
"""
def __init__(self,robot=None, sceneID=0):
self.sceneID = sceneID
self.use_offset = False
self.start_time = time.time()
self.time = 0
self.sub_task_seq = None
# init robot
if robot:
robot.set_scene(self)
robot.load_BT()
self.robot = robot
# myx op
# 1-7 正常执行, 8-10 控灯操作移动到6, 11-12窗帘操作不需要移动,
self.op_dialog = ["","制作咖啡","倒水","夹点心","拖地","擦桌子","开筒灯","搬椅子", # 1-7
"关筒灯","开大厅灯","关大厅灯","关闭窗帘","打开窗帘", # 8-12
"调整空调开关","调高空调温度","调低空调温度", # 13-15
"抓握物体","放置物体"] # 16-17
self.op_act_num = [0,3,4,6,3,2,0,1,
0,0,0,0,0,
0,0,0,
0,0]
self.op_v_list = [[[0.0,0.0]],[[250.0, 310.0]],[[-70.0, 480.0]],[[250.0, 630.0]],[[-70.0, 740.0]],[[260.0, 1120.0]],[[300.0, -220.0]],
[[0.0, -70.0]]]
self.op_typeToAct = {8:[6,2],9:[6,3],10:[6,4],11:[8,1],12:[8,2]}
# 空调面板位置
self.obj_loc = [[300.5, -140.0,114]]
def reset(self):
# 基类reset默认执行仿真器初始化操作
self.reset_sim()
# reset state
self.state = self.default_state
print("场景初始化完成")
self._reset()
self.running = True
def run(self):
# 基类run
self._run()
# 运行并由robot打印每步信息
while True:
self.step()
def step(self):
# 基类step默认执行行为树tick操作
self.time = time.time() - self.start_time
self.deal_event()
self._step()
self.robot.step()
def deal_event(self):
if len(self.event_list)>0:
next_event = self.event_list[0]
t,func = next_event
if self.time >= t:
print(f'event: {t}, {func.__name__}')
self.event_list.pop(0)
func()
def create_chat_event(self,sentence):
def customer_say():
print(f'顾客说:{sentence}')
if self.show_bubble:
self.chat_bubble(f'顾客说:{sentence}')
self.state['chat_list'].append(f'{sentence}')
return customer_say
@property
def status(self):
return stub.Observe(GrabSim_pb2.SceneID(value=self.sceneID))
def reset_sim(self):
# reset world
init_world()
stub.Reset(GrabSim_pb2.ResetParams(scene=self.sceneID))
def _reset(self):
# 场景自定义的reset
pass
def _run(self):
# 场景自定义的run
pass
def _step(self):
# 场景自定义的step
pass
def walker_control_generator(self, walkerID, autowalk, speed, X, Y, Yaw):
if self.use_offset:
X, Y = X + loc_offset[0], Y + loc_offset[1]
return GrabSim_pb2.WalkerControls.WControl(
id=walkerID,
autowalk=autowalk,
speed=speed,
pose=GrabSim_pb2.Pose(X=X, Y=Y, Yaw=Yaw),
)
def walk_to(self, X, Y, Yaw=None, velocity=200, dis_limit=0):
if self.use_offset:
X, Y = X + loc_offset[0], Y + loc_offset[1]
if Yaw is None:
Yaw = self.status.rotation.Yaw
v = [X, Y, Yaw - 90, velocity, dis_limit]
print(v)
action = GrabSim_pb2.Action(
scene=self.sceneID,
action=GrabSim_pb2.Action.ActionType.WalkTo,
values=v
)
scene_info = stub.Do(action)
return scene_info
def reachable_check(self, X, Y, Yaw):
if self.use_offset:
X, Y = X + loc_offset[0], Y + loc_offset[1]
navigation_info = stub.Do(
GrabSim_pb2.Action(
scene=self.sceneID,
action=GrabSim_pb2.Action.ActionType.WalkTo,
values=[X, Y, Yaw, 0, 0],
)
).info
if navigation_info == "Unreachable":
return False
else:
return True
def add_walker(self, X, Y, Yaw):
if self.use_offset:
X, Y = X + loc_offset[0], Y + loc_offset[1]
if self.reachable_check(X, Y, Yaw):
stub.AddWalker(
GrabSim_pb2.WalkerList(
walkers=[
GrabSim_pb2.WalkerList.Walker(
id=0,
pose=GrabSim_pb2.Pose(
X=X, Y=Y, Yaw=Yaw
), # Parameter id is useless
)
],
scene=self.sceneID,
)
)
def remove_walker(self, *args): # take single walkerID or a list of walkerIDs
remove_list = []
if isinstance(args[0], list):
remove_list = args[0]
else:
for walkerID in args:
# walkerID is the index of the walker in status.walkers.
# Since status.walkers is a list, some walkerIDs would change after removing a walker.
remove_list.append(walkerID)
stub.RemoveWalkers(GrabSim_pb2.RemoveList(IDs=remove_list, scene=self.sceneID))
def clean_walker(self):
stub.CleanWalkers(GrabSim_pb2.SceneID(value=self.sceneID))
def control_walker(self, control_list):
stub.ControlWalkers(
GrabSim_pb2.WalkerControls(controls=control_list, scene=self.sceneID)
)
def control_joints(self, angles):
stub.Do(
GrabSim_pb2.Action(
scene=self.sceneID,
action=GrabSim_pb2.Action.ActionType.RotateJoints,
values=angles,
)
)
def add_object(self, type, X, Y, Z, Yaw=0):
if self.use_offset:
X, Y = X + loc_offset[0], Y + loc_offset[1]
stub.AddObjects(
GrabSim_pb2.ObjectList(
objects=[
GrabSim_pb2.ObjectList.Object(x=X, y=Y, yaw=Yaw, z=Z, type=type)
],
scene=self.sceneID,
)
)
def remove_object(self, *args): # refer to remove_walker
remove_list = []
if isinstance(args[0], list):
remove_list = args[0]
else:
for objectID in args:
remove_list.append(objectID)
stub.RemoveObjects(GrabSim_pb2.RemoveList(IDs=remove_list, scene=self.sceneID))
def clean_object(self):
stub.CleanObjects(GrabSim_pb2.SceneID(value=self.sceneID))
def grasp(self, handID, objectID):
stub.Do(
GrabSim_pb2.Action(
scene=self.sceneID,
action=GrabSim_pb2.Action.ActionType.Grasp,
values=[handID, objectID],
)
)
def release(self, handID):
stub.Do(
GrabSim_pb2.Action(
scene=self.sceneID,
action=GrabSim_pb2.Action.ActionType.Release,
values=[handID],
)
)
def get_camera_color(self, image_only=True):
camera_data = stub.Capture(
GrabSim_pb2.CameraList(
cameras=[GrabSim_pb2.CameraName.Head_Color], scene=self.sceneID
)
)
if image_only:
return image_extract(camera_data)
else:
return camera_data
def get_camera_depth(self, image_only=True):
camera_data = stub.Capture(
GrabSim_pb2.CameraList(
cameras=[GrabSim_pb2.CameraName.Head_Depth], scene=self.sceneID
)
)
if image_only:
return image_extract(camera_data)
else:
return camera_data
def get_camera_segment(self, image_only=True):
camera_data = stub.Capture(
GrabSim_pb2.CameraList(
cameras=[GrabSim_pb2.CameraName.Head_Segment], scene=self.sceneID
)
)
if image_only:
return image_extract(camera_data)
else:
return camera_data
def chat_bubble(self, message):
stub.ControlRobot(
GrabSim_pb2.ControlInfo(
scene=self.sceneID, type=0, action=1, content=message
)
)
def animation_control(self, animation_type):
# animation_type: 1:make coffee 2: pour water 3: grab food 4: mop floor 5: clean table
scene = stub.ControlRobot(
GrabSim_pb2.ControlInfo(scene=self.sceneID, type=animation_type, action=1)
)
if scene.info == "action success":
for i in range(2, animation_step[animation_type - 1] + 1):
stub.ControlRobot(
GrabSim_pb2.ControlInfo(
scene=self.sceneID, type=animation_type, action=i
)
)
def animation_reset(self):
stub.ControlRobot(GrabSim_pb2.ControlInfo(scene=self.sceneID, type=0, action=0))
# 手指移动到指定位置
def ik_control_joints(self, handNum=2, x=30, y=40, z=80):
# print('------------------ik_control_joints----------------------')
# IK控制,双手, 1左手, 2右手; 暂时只动右手
HandPostureObject = [GrabSim_pb2.HandPostureInfos.HandPostureObject(handNum=handNum, x=x, y=y, z=z, roll=0, pitch=0, yaw=0),
# GrabSim_pb2.HandPostureInfos.HandPostureObject(handNum=1, x=0, y=0, z=0, roll=0, pitch=0, yaw=0),
]
temp = stub.GetIKControlInfos(GrabSim_pb2.HandPostureInfos(scene=self.sceneID, handPostureObjects=HandPostureObject))
# 移动到进行操作任务的指定地点
def move_task_area(self,op_type):
if op_type==11 or op_type==12: # 开关窗帘不需要移动
return
scene = stub.Observe(GrabSim_pb2.SceneID(value=self.sceneID))
walk_value = [scene.location.X, scene.location.Y, scene.rotation.Yaw]
if op_type < 8:
v_list = self.op_v_list[op_type]
if op_type>=8 and op_type<=10: # 控灯
v_list = self.op_v_list[6]
if op_type in [13,14,15]: # 空调
v_list = [[240, -140.0]] # KongTiao [300.5, -140.0] # 250
print("------------------move_task_area----------------------")
print("Current Position:", walk_value,"开始任务:",self.op_dialog[op_type])
for walk_v in v_list:
walk_v = walk_v + [scene.rotation.Yaw, 180, 0]
walk_v[2] = 0 if (op_type in [13,14,15]) else scene.rotation.Yaw # 空调操作朝向墙面
action = GrabSim_pb2.Action(
scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v
)
scene = stub.Do(action)
print("After Walk Position:",[scene.location.X, scene.location.Y, scene.rotation.Yaw])
# 相应的行动,由主办方封装
def control_robot_action(self, type=0, action=0, message="你好"):
scene = stub.ControlRobot(
GrabSim_pb2.ControlInfo(
scene=self.sceneID, type=type, action=action, content=message
)
)
if str(scene.info).find("Action Success") > -1:
print(scene.info)
return True
else:
print(scene.info)
return False
def adjust_kongtiao(self,op_type):
obj_loc = self.obj_loc[0][:]
obj_loc[2] -= 5
print("obj_loc:",obj_loc)
if op_type == 13: obj_loc[1] -= 2
if op_type == 14: obj_loc[1] -= 0
if op_type == 15: obj_loc[1] += 2
self.ik_control_joints(2, obj_loc[0], obj_loc[1], obj_loc[2])
time.sleep(3.0)
self.robo_recover()
return True
def gen_obj(self,h=100):
# 4;冰红(盒) 5;酸奶 7:保温杯 9;冰红(瓶) 13:代语词典
scene = stub.Observe(GrabSim_pb2.SceneID(value=self.sceneID))
ginger_loc = [scene.location.X, scene.location.Y, scene.location.Z]
obj_list = [GrabSim_pb2.ObjectList.Object(x=ginger_loc[0] - 50, y=ginger_loc[1] - 40, z = h, roll=0, pitch=0, yaw=0, type=9)]
scene = stub.AddObjects(GrabSim_pb2.ObjectList(objects=obj_list, scene=self.sceneID))
time.sleep(1.0)
def grasp_obj(self,obj_id,hand_id=1):
# Move to Obj
print('------------------moveTo_obj----------------------')
scene = stub.Observe(GrabSim_pb2.SceneID(value=self.sceneID))
obj_info = scene.objects[obj_id]
# Robot
obj_x, obj_y, obj_z = obj_info.location.X, obj_info.location.Y, obj_info.location.Z
walk_v = [obj_x+50, obj_y] + [180, 180, 0]
action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
scene = stub.Do(action)
time.sleep(1.0)
# Finger
self.ik_control_joints(2, obj_x-9, obj_y+0.5, obj_z) # -10, 0, 0
time.sleep(3.0)
# Grasp Obj
print('------------------grasp_obj----------------------')
action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.Grasp, values=[hand_id, obj_id])
scene = stub.Do(action)
time.sleep(4)
return True
# robot的肢体恢复原位
def robo_recover(self):
action = GrabSim_pb2.Action(scene=self.sceneID,action=GrabSim_pb2.Action.ActionType.RotateJoints, # 恢复原位
values=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
scene = stub.Do(action)
def robo_stoop_parallel(self):
# 0-3是躯干4-6是脖子和头7-13是左胳膊14-20是右胳膊
scene = stub.Observe(GrabSim_pb2.SceneID(value=self.sceneID))
angle = [scene.joints[i].angle for i in range(21)]
angle[0] = 15
angle[19] = -15
angle[20] = -30
for i in range(18,21):
print("name:",scene.joints[i].name,"angle:",scene.joints[i].angle)
# print("angle:",angle)
action = GrabSim_pb2.Action(scene=self.sceneID,action=GrabSim_pb2.Action.ActionType.RotateJoints, # 弯腰
values=angle)
scene = stub.Do(action)
time.sleep(1.0)
def release_obj(self,release_pos):
print("------------------Move to Realese Position----------------------")
walk_v = [release_pos[i] for i in range(2)]
action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v + [180,180,0])
scene = stub.Do(action)
print("------------------release_obj----------------------")
self.ik_control_joints(2, release_pos[0] - 80, release_pos[1], release_pos[2])
time.sleep(2.0)
self.robo_stoop_parallel()
action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.Release, values=[1])
scene = stub.Do(action)
time.sleep(2.0)
self.robo_recover()
return True
# 执行过程:输出"开始(任务名)" -> 按步骤数执行任务 -> Robot输出成功或失败的对话
def op_task_execute(self,op_type,obj_id=0,yaw=180,release_pos=[240,-140]):
self.control_robot_action(0, 1, "开始"+self.op_dialog[op_type]) # 开始制作咖啡
if op_type in [13,14,15]: # 调整空调:13代表按开关,14升温,15降温
result = self.adjust_kongtiao(op_type)
elif op_type ==16:
result = self.grasp_obj(obj_id)
elif op_type ==17:
result = self.release_obj(release_pos)
elif op_type>=8:
result = self.control_robot_action(self.op_typeToAct[op_type][0], self.op_typeToAct[op_type][1])
print("result:",result)
else:
result = self.control_robot_action(op_type, 1) #
self.control_robot_action(0, 2)
if result:
if self.op_act_num[op_type]>0:
for i in range(2,2+self.op_act_num[op_type]):
self.control_robot_action(op_type,i)
self.control_robot_action(0, 2)
# self.control_robot_action(0, 1, "成功"+self.op_dialog[op_type])
# else:
# self.control_robot_action(0, 1, self.op_dialog[op_type]+"失败")
def test_move(self):
v_list = [[0, 880], [250, 1200], [-55, 750], [70, -200]]
scene = self.status
for walk_v in v_list:
walk_v = walk_v + [scene.rotation.Yaw - 90, 600, 100]
print("walk_v", walk_v)
action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
scene = stub.Do(action)
print(scene.info)

View File

@ -13,13 +13,16 @@ class DealChat(Act):
chat = self.scene.state['chat_list'].pop()
self.chat_history += chat + '\n'
res_dict = ask_llm(self.chat_history)
res_dict = ask_llm(chat)
answer = res_dict["Answer"]
self.chat_history += answer + '\n'
goal = res_dict["Goal"]
if goal and "{" not in goal:
goal = {str(goal)}
if goal:
if "{" not in goal:
goal = {str(goal)}
else:
goal=eval(goal)
if goal is not None:
print(f'goal{goal}')

View File

@ -1 +1 @@
{"测试VLM做一杯咖啡": {"Answer": "测试VLM做一杯咖啡", "Goal": "{\"On(Coffee,CoffeeTable)\"}"}, "测试VLM做一杯咖啡放到吧台上": {"Answer": "测试VLM做一杯咖啡放到吧台上", "Goal": "{\"On(Coffee,Bar)\"}"}, "测试VLN前往2号桌": {"Answer": "测试VLN前往2号桌", "Goal": "{\"At(Robot,Table2)\"}"}, "测试AEM": {"Answer": "测试AEM", "Goal": "{\"EnvExplored()\"}"}, "测试VLM倒一杯水": {"Answer": "测试VLM倒一杯水", "Goal": "{\"On(Water,WaterTable)\"}"}, "测试VLM开空调": {"Answer": "测试VLM开空调", "Goal": "{\"Is(AC,On)\"}"}, "测试VLM关空调": {"Answer": "测试VLM关空调", "Goal": "{\"Is(AC,Off)\"}"}, "测试VLM关大厅灯": {"Answer": "测试VLM关大厅灯", "Goal": "{\"Is(HallLight,Off)\"}"}, "测试VLM开大厅灯": {"Answer": "测试VLM开大厅灯", "Goal": "{\"Is(HallLight,On)\"}"}, "测试VLM关筒灯": {"Answer": "测试VLM关筒灯", "Goal": "{\"Is(TubeLight,Off)\"}"}, "测试VLM开筒灯": {"Answer": "测试VLM开筒灯", "Goal": "{\"Is(TubeLight,On)\"}"}, "测试VLM关窗帘": {"Answer": "测试VLM关窗帘", "Goal": "{\"Is(Curtain,Off)\"}"}, "测试VLM开窗帘": {"Answer": "测试VLM开窗帘", "Goal": "{\"Is(Curtain,On)\"}"}, "测试VLM拖地": {"Answer": "测试VLM拖地", "Goal": "{\"Is(Floor,Clean)\"}"}, "测试VLM擦桌子": {"Answer": "测试VLM擦桌子", "Goal": "{\"Is(Table1,Clean)\"}"}, "测试VLM整理椅子": {"Answer": "测试VLM整理椅子", "Goal": "{\"Is(Chairs,Clean)\"}"}, "测试VLM把冰红茶放到Table2": {"Answer": "测试VLM把冰红茶放到Table2", "Goal": "{\"On(BottledDrink,Table2)\"}"}, "我有点热,能开个空调吗?": {"Answer": "当然可以,我现在就开!", "Goal": "{\"Is(AC,On)\"}"}}
{"测试VLM做一杯咖啡": {"Answer": "测试VLM做一杯咖啡", "Goal": "{\"On(Coffee,CoffeeTable)\"}"}, "测试VLM做一杯咖啡放到吧台上": {"Answer": "测试VLM做一杯咖啡放到吧台上", "Goal": "{\"On(Coffee,Bar)\"}"}, "测试VLM做一杯咖啡放到水杯桌上并倒水": {"Answer": "测试VLM做一杯咖啡放到水杯桌上并倒水", "Goal": "{\"On(Coffee,WaterTable)\"}"}, "测试VLN前往2号桌": {"Answer": "测试VLN前往2号桌", "Goal": "{\"At(Robot,Table2)\"}"}, "测试AEM": {"Answer": "测试AEM", "Goal": "{\"EnvExplored()\"}"}, "测试VLM倒一杯水": {"Answer": "测试VLM倒一杯水", "Goal": "{\"On(Water,WaterTable)\"}"}, "测试VLM开空调": {"Answer": "测试VLM开空调", "Goal": "{\"Is(AC,On)\"}"}, "测试VLM关空调": {"Answer": "测试VLM关空调", "Goal": "{\"Is(AC,Off)\"}"}, "测试VLM关大厅灯": {"Answer": "测试VLM关大厅灯", "Goal": "{\"Is(HallLight,Off)\"}"}, "测试VLM开大厅灯": {"Answer": "测试VLM开大厅灯", "Goal": "{\"Is(HallLight,On)\"}"}, "测试VLM关筒灯": {"Answer": "测试VLM关筒灯", "Goal": "{\"Is(TubeLight,Off)\"}"}, "测试VLM开筒灯": {"Answer": "测试VLM开筒灯", "Goal": "{\"Is(TubeLight,On)\"}"}, "测试VLM关窗帘": {"Answer": "测试VLM关窗帘", "Goal": "{\"Is(Curtain,Off)\"}"}, "测试VLM开窗帘": {"Answer": "测试VLM开窗帘", "Goal": "{\"Is(Curtain,On)\"}"}, "测试VLM拖地": {"Answer": "测试VLM拖地", "Goal": "{\"Is(Floor,Clean)\"}"}, "测试VLM擦桌子": {"Answer": "测试VLM擦桌子", "Goal": "{\"Is(Table1,Clean)\"}"}, "测试VLM整理椅子": {"Answer": "测试VLM整理椅子", "Goal": "{\"Is(Chairs,Clean)\"}"}, "测试VLM把冰红茶放到Table2": {"Answer": "测试VLM把冰红茶放到Table2", "Goal": "{\"On(BottledDrink,Table2)\"}"}, "我有点热,能开个空调吗?": {"Answer": "当然可以,我现在就开!", "Goal": "{\"Is(AC,On)\"}"}, "可以带我去吗": {"Answer": "当然可以,前往一号桌", "Goal": "{\"At(Robot,Table1)\"}"}}

View File

@ -18,3 +18,4 @@ Question,Answer,Goal
测试VLM整理椅子,测试VLM整理椅子,"{""Is(Chairs,Clean)""}"
测试VLM把冰红茶放到Table2,测试VLM把冰红茶放到Table2,"{""On(BottledDrink,Table2)""}"
我有点热,能开个空调吗?,当然可以,我现在就开!,"{""Is(AC,On)""}"
可以带我去吗,当然可以,前往一号桌,"{""At(Robot,Table1)""}"

1 Question Answer Goal
18 测试VLM:整理椅子 测试VLM:整理椅子 {"Is(Chairs,Clean)"}
19 测试VLM:把冰红茶放到Table2 测试VLM:把冰红茶放到Table2 {"On(BottledDrink,Table2)"}
20 我有点热,能开个空调吗? 当然可以,我现在就开! {"Is(AC,On)"}
21 可以带我去吗 当然可以,前往一号桌 {"At(Robot,Table1)"}

View File

@ -24,3 +24,4 @@ with open(csv_file_path, mode='r', encoding='gbk') as csv_file, \
json_str = json.dumps(output_dict, ensure_ascii=False)
# 将JSON字符串写入JSONL文件并添加换行符
jsonl_file.write(json_str + '\n')
s

View File

@ -143,14 +143,18 @@ def get_object_info(
obj: Annotated[str, '需要获取信息的物体名称', True]
) -> str:
"""
获取场景中指定物体 `object` 的信息如果`object` 是一个地点例如洗手间地方则输出如果`object`是一个咖啡则输出
获取场景中指定物体 `object` 在哪里
如果`object` 是一个地点例如洗手间地方则输出
如果`object`是一个咖啡则输出
如果`object` 是空桌子则输出一号桌
"""
near_object = None
if obj == "Table":
near_object = "Bar"
if obj == "洗手间":
near_object = "大门"
if obj == "空桌子":
near_object = "一号桌"
return near_object

281
robowaiter/proto/camera.py Normal file
View File

@ -0,0 +1,281 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# enconding = utf8
import json
import string
import sys
import time
import grpc
sys.path.append('./')
sys.path.append('../')
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import make_axes_locatable
import GrabSim_pb2_grpc
import GrabSim_pb2
channel = grpc.insecure_channel('localhost:30001', options=[
('grpc.max_send_message_length', 1024 * 1024 * 1024),
('grpc.max_receive_message_length', 1024 * 1024 * 1024)
])
sim_client = GrabSim_pb2_grpc.GrabSimStub(channel)
objects_dic = {}
'''
初始化卸载已经加载的关卡清除所有机器人
'''
def Init():
sim_client.Init(GrabSim_pb2.NUL())
'''
获取当前可加载的地图信息(地图名字地图尺寸)
'''
def AcquireAvailableMaps():
AvailableMaps = sim_client.AcquireAvailableMaps(GrabSim_pb2.NUL())
print(AvailableMaps)
'''
1根据mapID加载指定地图
2如果scene_num>1,则根据地图尺寸偏移后加载多个相同地图
3这样就可以在一个关卡中训练多个地图
'''
def SetWorld(map_id=0, scene_num=1):
print('------------------SetWorld----------------------')
world = sim_client.SetWorld(GrabSim_pb2.BatchMap(count=scene_num, mapID=map_id))
'''
返回场景的状态信息
1返回机器人的位置和旋转
2返回各个关节的名字和旋转
3返回场景中标记的物品信息(名字类型位置旋转)
4返回场景中行人的信息(名字位置旋转速度)
5返回机器人手指和双臂的碰撞信息
'''
def Observe(scene_id=0):
print('------------------show_env_info----------------------')
scene = sim_client.Observe(GrabSim_pb2.SceneID(value=scene_id))
# print(
# f"location:{[scene.location]}, rotation:{scene.rotation}\n",
# f"joints number:{len(scene.joints)}, fingers number:{len(scene.fingers)}\n",
# f"objects number: {len(scene.objects)}, walkers number: {len(scene.walkers)}\n"
# f"timestep:{scene.timestep}, timestamp:{scene.timestamp}\n"
# f"collision:{scene.collision}, info:{scene.info}")
return scene
'''
重置场景
1重置桌子的宽度和高度
2清除生成的行人和物品
3重置关节角度位置旋转
4清除碰撞信息
5重置场景中标记的物品
'''
def Reset(scene_id=0):
print('------------------Reset----------------------')
scene = sim_client.Reset(GrabSim_pb2.ResetParams(scene=scene_id))
print(scene)
# 如果场景支持调整桌子
# sim_client.Reset(GrabSim_pb2.ResetParams(scene = scene_id, adjust = True, height = 100.0, width = 100.0))"
'''
根据传入的部位名字获取相机数据
'''
def get_camera(part, scene_id=0):
print('------------------get_camera----------------------')
action = GrabSim_pb2.CameraList(cameras=part, scene=scene_id)
return sim_client.Capture(action)
'''
显示相机画面
'''
def show_image(img_data, scene):
print('------------------show_image----------------------')
im = img_data.images[0]
# 相机内参矩阵
in_matrix = np.array(
[[im.parameters.fx, 0, im.parameters.cx], [0, im.parameters.fy, im.parameters.cy], [0, 0, 1]])
# 相机外参矩阵
out_matrix = np.array(im.parameters.matrix).reshape((4, 4))
# # 旋转矩阵
# rotation_matrix = out_matrix[0:3, 0:3]
#
# # 平移矩阵
# translation_matrix = out_matrix[0:3, -1].reshape(3, 1)
# 像素坐标
# pixel_point = np.array([403, 212, 1]).reshape(3, 1)
pixel_x = 404
pixel_y = 212
depth = 369
# 将像素坐标转换为归一化设备坐标
normalized_x = (pixel_x - im.parameters.cx) / im.parameters.fx
normalized_y = (pixel_y - im.parameters.cy) / im.parameters.fy
# 将归一化设备坐标和深度值转换为相机坐标
camera_x = normalized_x * depth
camera_y = normalized_y * depth
camera_z = depth
# 构建相机坐标向量
camera_coordinates = np.array([camera_x, camera_y, camera_z, 1])
# print("物体相对相机坐标的齐次坐标: ", camera_coordinates)
# 将相机坐标转换为机器人底盘坐标
robot_coordinates = np.dot(out_matrix, camera_coordinates)[:3]
# print("物体的相对底盘坐标为:", robot_coordinates)
# 将物体相对机器人底盘坐标转为齐次坐标
robot_homogeneous_coordinates = np.array([robot_coordinates[0], -robot_coordinates[1], robot_coordinates[2], 1])
# print("物体的相对底盘的齐次坐标为:", robot_homogeneous_coordinates)
# 机器人坐标
X = scene.location.X
Y = scene.location.Y
Z = 0.0
# 机器人旋转信息
Roll = 0.0
Pitch = 0.0
Yaw = scene.rotation.Yaw
# 构建平移矩阵
T = np.array([[1, 0, 0, X],
[0, 1, 0, Y],
[0, 0, 1, Z],
[0, 0, 0, 1]])
# 构建旋转矩阵
Rx = np.array([[1, 0, 0, 0],
[0, np.cos(Roll), -np.sin(Roll), 0],
[0, np.sin(Roll), np.cos(Roll), 0],
[0, 0, 0, 1]])
Ry = np.array([[np.cos(Pitch), 0, np.sin(Pitch), 0],
[0, 1, 0, 0],
[-np.sin(Pitch), 0, np.cos(Pitch), 0],
[0, 0, 0, 1]])
Rz = np.array([[np.cos(np.radians(Yaw)), -np.sin(np.radians(Yaw)), 0, 0],
[np.sin(np.radians(Yaw)), np.cos(np.radians(Yaw)), 0, 0],
[0, 0, 1, 0],
[0, 0, 0, 1]])
R = np.dot(Rz, np.dot(Ry, Rx))
# 构建机器人的变换矩阵
T_robot = np.dot(T, R)
# print(T_robot)
# 将物体的坐标从机器人底盘坐标系转换到世界坐标系
world_coordinates = np.dot(T_robot, robot_homogeneous_coordinates)[:3]
# print("物体的世界坐标:", world_coordinates)
# 世界偏移后的坐标
world_offest_coordinates = [world_coordinates[0] + 700, world_coordinates[1] + 1400, world_coordinates[2]]
# print("物体世界偏移的坐标: ", world_offest_coordinates)
# world_point = world_coordinates + np.array([])
# print("物体的世界坐标为:", )
# # 相对机器人的世界坐标
# world_point = rotation_matrix.T @ (in_matrix.T * 369 @ pixel_point - translation_matrix)
# print(world_point)
# print(in_matrix @ out_matrix @ obj_world)
#
d = np.frombuffer(im.data, dtype=im.dtype).reshape((im.height, im.width, im.channels))
plt.imshow(d, cmap="gray" if "depth" in im.name.lower() else None)
plt.show()
def save_obj_info(img_data, objs_name):
items = img_data.info.split(";")
dictionary = {}
for item in items:
key, value = item.split(":")
dictionary[int(key)] = value
im = img_data.images[0]
d = np.frombuffer(im.data, dtype=im.dtype).reshape((im.height, im.width, im.channels))
arr_flat = d.ravel()
for id in arr_flat:
if id not in dictionary:
print(id)
else:
objs_name.add(dictionary[id])
return objs_name
def get_semantic_map(camera, cur_objs, objs_name):
scene = Observe(0)
objs = scene.objects
img_data = get_camera([camera])
show_image(img_data, scene)
objs_name = save_obj_info(img_data, objs_name)
for obj_name in list(objs_name):
for obj in objs:
if obj.name == obj_name and obj not in cur_objs:
cur_objs.append(obj)
break
return cur_objs, objs_name
if __name__ == '__main__':
map_id = 11 # 地图编号
scene_num = 1 # 场景数量
cur_objs = []
print('------------ 初始化加载场景 ------------')
Init()
AcquireAvailableMaps()
SetWorld(map_id, scene_num)
time.sleep(5.0)
for i in range(scene_num):
print('------------ 场景操作 ------------')
scene = Observe(i)
Reset(i)
print('------------ 相机捕获 ------------')
Reset(i)
time.sleep(1.0)
# print(get_semantic_map(GrabSim_pb2.CameraName.Head_Segment,cur_objs))
for camera_name in [GrabSim_pb2.CameraName.Head_Depth]:
img_data = get_camera([camera_name], i)
show_image(img_data, scene)
# for camera_name in [GrabSim_pb2.CameraName.Waist_Color, GrabSim_pb2.CameraName.Waist_Depth]:
# img_data = get_camera([camera_name], i)
# show_image(img_data, 2)

View File

@ -0,0 +1,147 @@
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# enconding = utf8
import sys
import time
import grpc
# import camera
from robowaiter.proto import camera
sys.path.append('./')
sys.path.append('../')
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.axes_grid1 import make_axes_locatable
import GrabSim_pb2_grpc
import GrabSim_pb2
channel = grpc.insecure_channel('localhost:30001', options=[
('grpc.max_send_message_length', 1024 * 1024 * 1024),
('grpc.max_receive_message_length', 1024 * 1024 * 1024)
])
sim_client = GrabSim_pb2_grpc.GrabSimStub(channel)
'''
初始化卸载已经加载的关卡清除所有机器人
'''
def Init():
sim_client.Init(GrabSim_pb2.NUL())
'''
获取当前可加载的地图信息(地图名字地图尺寸)
'''
def AcquireAvailableMaps():
AvailableMaps = sim_client.AcquireAvailableMaps(GrabSim_pb2.NUL())
print(AvailableMaps)
'''
1根据mapID加载指定地图
2如果scene_num>1,则根据地图尺寸偏移后加载多个相同地图
3这样就可以在一个关卡中训练多个地图
'''
def SetWorld(map_id=0, scene_num=1):
print('------------------SetWorld----------------------')
world = sim_client.SetWorld(GrabSim_pb2.BatchMap(count=scene_num, mapID=map_id))
'''
返回场景的状态信息
1返回机器人的位置和旋转
2返回各个关节的名字和旋转
3返回场景中标记的物品信息(名字类型位置旋转)
4返回场景中行人的信息(名字位置旋转速度)
5返回机器人手指和双臂的碰撞信息
'''
def Observe(scene_id=0):
print('------------------show_env_info----------------------')
scene = sim_client.Observe(GrabSim_pb2.SceneID(value=scene_id))
print(
f"location:{[scene.location]}, rotation:{scene.rotation}\n",
f"joints number:{len(scene.joints)}, fingers number:{len(scene.fingers)}\n",
f"objects number: {len(scene.objects)}, walkers number: {len(scene.walkers)}\n"
f"timestep:{scene.timestep}, timestamp:{scene.timestamp}\n"
f"collision:{scene.collision}, info:{scene.info}")
'''
重置场景
1重置桌子的宽度和高度
2清除生成的行人和物品
3重置关节角度位置旋转
4清除碰撞信息
5重置场景中标记的物品
'''
def Reset(scene_id=0):
print('------------------Reset----------------------')
scene = sim_client.Reset(GrabSim_pb2.ResetParams(scene=scene_id))
print(scene)
# 如果场景支持调整桌子
# sim_client.Reset(GrabSim_pb2.ResetParams(scene = scene_id, adjust = True, height = 100.0, width = 100.0))"
"""
导航移动
yaw:机器人朝向;
velocity:速度,>0代表移动,<0代表瞬移,=0代表只查询;
dis:最终达到的位置距离目标点最远距离,如果超过此距离则目标位置不可达
"""
def navigation_move(cur_objs, scene_id=0, map_id=0):
print('------------------navigation_move----------------------')
scene = sim_client.Observe(GrabSim_pb2.SceneID(value=scene_id))
walk_value = [scene.location.X, scene.location.Y, scene.rotation.Yaw]
print("position:", walk_value)
objs_name_set = set()
if map_id == 11: # coffee
v_list = [[247,520], [247, 700], [270, 1100], [55, 940], [30, 900], [30, 520], [160, -165], [247, 0],[247, 520]]
else:
v_list = [[0.0, 0.0]]
for walk_v in v_list:
walk_v = walk_v + [scene.rotation.Yaw - 90, 200, 10]
print("walk_v", walk_v)
action = GrabSim_pb2.Action(scene=scene_id, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
scene = sim_client.Do(action)
cur_objs, objs_name_set = camera.get_semantic_map(GrabSim_pb2.CameraName.Head_Segment, cur_objs, objs_name_set)
print(scene.info)
return cur_objs
if __name__ == '__main__':
map_id = 11 # 地图编号
scene_num = 1 # 场景数量
cur_objs = []
print('------------ 初始化加载场景 ------------')
Init()
AcquireAvailableMaps()
SetWorld(map_id, scene_num)
time.sleep(5.0)
for i in range(scene_num):
print('------------ 场景操作 ------------')
Observe(i)
Reset(i)
print('------------ 导航移动 ------------')
cur_objs = navigation_move(cur_objs, i, map_id)
print(cur_objs)

View File

@ -1,17 +1,17 @@
from .scene import Scene
from robowaiter.scene.tasks.AEM import SceneAEM
from robowaiter.scene.tasks.GQA import SceneGQA
from robowaiter.scene.tasks.VLN import SceneVLN
from robowaiter.scene.tasks.VLM import SceneVLM
from robowaiter.scene.tasks.Open_tasks import SceneOT
from robowaiter.scene.tasks.Auto_tasks import SceneAT
# from .scene import Scene
# from robowaiter.scene.tasks.AEM import SceneAEM
# from robowaiter.scene.tasks.GQA import SceneGQA
# from robowaiter.scene.tasks.VLN import SceneVLN
# from robowaiter.scene.tasks.VLM import SceneVLM
# from robowaiter.scene.tasks.Open_tasks import SceneOT
# from robowaiter.scene.tasks.Auto_tasks import SceneAT
task_map = {
"AEM": SceneAEM,
"GQA": SceneGQA,
"VLN": SceneVLN,
"VLM": SceneVLM,
"OT": SceneOT,
"AT": SceneAT,
}
# task_map = {
# "AEM": SceneAEM,
# "GQA": SceneGQA,
# "VLN": SceneVLN,
# "VLM": SceneVLM,
# "OT": SceneOT,
# "AT": SceneAT,
# }

View File

@ -2,11 +2,15 @@ import time
import grpc
import numpy as np
import matplotlib.pyplot as plt
from robowaiter.proto import camera
from robowaiter.proto import semantic_map
import math
from robowaiter.proto import GrabSim_pb2
from robowaiter.proto import GrabSim_pb2_grpc
channel = grpc.insecure_channel(
"localhost:30001",
options=[
@ -100,6 +104,11 @@ class Scene:
# 空调面板位置
self.obj_loc = [300.5, -140.0,114]
# AEM
self.visited = set()
self.all_frontier_list = set()
self.semantic_map = semantic_map
def reset(self):
# 基类reset默认执行仿真器初始化操作
@ -212,7 +221,7 @@ class Scene:
print('------------------add_walkers----------------------')
walker_list = []
for i in range(len(walker_loc)):
loc = walker_loc[i] + [0, 0, 100]
loc = walker_loc[i] + [0,0, 100]
action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=loc)
scene = stub.Do(action)
print(scene.info)
@ -553,4 +562,74 @@ class Scene:
scene = stub.Do(action)
print(scene.info)
def navigation_move(self, cur_objs, objs_name_set, v_list, scene_id=0, map_id=11):
print('------------------navigation_move----------------------')
scene = stub.Observe(GrabSim_pb2.SceneID(value=scene_id))
walk_value = [scene.location.X, scene.location.Y, scene.rotation.Yaw]
print("position:", walk_value)
# if map_id == 11: # coffee
# v_list = [[0, 880], [250, 1200], [-55, 750], [70, -200]]
# else:
# v_list = [[0.0, 0.0]]
for walk_v in v_list:
walk_v = walk_v + [scene.rotation.Yaw - 90, 250, 60]
print("walk_v", walk_v)
action = GrabSim_pb2.Action(scene=scene_id, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
scene = stub.Do(action)
cur_objs, objs_name_set = camera.get_semantic_map(GrabSim_pb2.CameraName.Head_Segment, cur_objs, objs_name_set)
# if scene.info == "Unreachable":
print(scene.info)
return cur_objs, objs_name_set
def isOutMap(self, pos, min_x=-350, max_x=600, min_y=-400, max_y=1450):
if pos[0] <= min_x or pos[0] >= max_x or pos[1] <= min_y or pos[1] >= max_y:
return True
return False
def real2map(self, x, y):
'''
实际坐标->地图坐标 (向下取整)
'''
# x = round((x - self.min_x) / self.scale_ratio)
# y = round((y - self.min_y) / self.scale_ratio)
x = math.floor((x + 350) / 5)
y = math.floor((y + 400) / 5)
return x, y
def explore(self, map, cur_pos, explore_range):
for i in range(cur_pos[0] - explore_range, cur_pos[0] + explore_range + 1):
for j in range(cur_pos[1] - explore_range, cur_pos[1] + explore_range + 1):
if self.isOutMap((i, j)):
continue
x, y = self.real2map(i, j)
if map[x, y] == 0:
self.visited.add((i, j))
for i in range(cur_pos[0] - explore_range, cur_pos[0] + explore_range + 1):
for j in range(cur_pos[1] - explore_range, cur_pos[1] + explore_range + 1):
if self.isOutMap((i, j)):
continue
x, y = self.real2map(i, j)
if map[x, y] == 0:
if self.isNewFrontier((i, j), map):
self.all_frontier_list.add((i, j))
if len(self.all_frontier_list) <= 400:
free_list = list(self.visited)
free_array = np.array(free_list)
print(f"探索完成!以下是场景中可以到达的点:{free_array};其余点均是障碍物不可达")
return None
return self.getNearestFrontier(cur_pos, self.all_frontier_list)
def isNewFrontier(self, pos, map):
around_nodes = [(pos[0], pos[1] + 1), (pos[0], pos[1] - 1), (pos[0] - 1, pos[1]), (pos[0] + 1, pos[1])]
for node in around_nodes:
x, y = self.real2map(node[0], node[1])
if node not in self.visited and map[x, y] == 0:
return True
if (pos[0], pos[1]) in self.all_frontier_list:
self.all_frontier_list.remove((pos[0], pos[1]))
return False

View File

@ -4,18 +4,18 @@
"""
from robowaiter.scene.scene import Scene
class SceneAEM(Scene):
def __init__(self, robot):
super().__init__(robot)
self.event_list = [
(5, self.create_chat_event("测试AEM")),
]
def _reset(self):
pass
def _run(self):
cur_objs = []
print('------------ 自主探索 ------------')
cur_objs = self.semantic_map.navigation_move(cur_objs, 0, 11)
print("物品列表如下:")
print(cur_objs)
pass
@ -29,4 +29,4 @@ if __name__ == '__main__':
# create task
task = SceneAEM(robot)
task.reset()
task.run()
task.run()

View File

@ -16,16 +16,16 @@ class SceneGQA(Scene):
super().__init__(robot)
# 在这里加入场景中发生的事件, (事件发生的时间,事件函数)
self.event_list = [
(5, self.create_chat_event("洗手间在哪里")),
(5, self.create_chat_event("哪里有空桌子")),
(12, self.create_chat_event("可以带我去吗")),
]
def _reset(self):
self.clean_walker()
# self.clean_walker()
self.add_walkers()
self.add_walker(50, 500, 0)
self.walker_bubble("洗手间在哪里")
# self.walker_bubble("洗手间在哪里")
# self.control_walker([self.walker_control_generator(0, False, 100, 755, 1900, 180)])

View File

@ -23,7 +23,13 @@ class SceneOT(Scene):
]
def _reset(self):
self.add_walker(50, 300, 0)
scene = self.add_walkers([[50, 300, 0]])
# time.sleep(2.0)
# print("我有点热,能开个空调吗?")
print("scene.walkers:",scene.walkers)
cont = scene.walkers[0].name+":我有点热,能开个空调吗?"
self.control_robot_action(0,3,cont)
# self.add_walker(1085, 2630, 220)
# self.control_walker([self.walker_control_generator(0, False, 100, 755, 1900, 180)])