Merge remote-tracking branch 'origin/main'

This commit is contained in:
wuziji 2023-11-14 16:18:23 +08:00
commit f73245c13a
22 changed files with 510 additions and 88 deletions

View File

@ -12,3 +12,4 @@ pytorch==1.11.0
torchvision==0.12.0
torchaudio==0.11.0
cudatoolkit=11.3
loguru

View File

@ -3,18 +3,7 @@ from robowaiter.behavior_lib._base.Behavior import Bahavior
class Act(Bahavior):
print_name_prefix = "act "
type = 'Act'
all_place = {'Bar', 'Bar2', 'WaterTable', 'CoffeeTable', 'Table1', 'Table2', 'Table3'}
all_object = {'Coffee', 'Water', 'Dessert', 'Softdrink', 'BottledDrink', 'Yogurt', 'ADMilk', 'MilkDrink', 'Milk',
'VacuumCup'}
place_xyz_dic={
'Bar': (247.0, 520.0, 100.0),
'Bar2': (240.0, 40.0, 70.0),
'WaterTable':(-70.0, 500.0, 107),
'CoffeeTable':(247.0, 520.0, 100.0), # 位置需要更改!!!
'Table1': (247.0, 520.0, 100.0),# 位置需要更改!!!
'Table2': (-55.0, 0.0, 107),
'Table3':(-55.0, 150.0, 107)
}
def __init__(self,*args):
super().__init__(*args)
self.info = self.get_info(*args)

View File

@ -13,7 +13,19 @@ class Bahavior(ptree.behaviour.Behaviour):
'''
scene = None
print_name_prefix = ""
all_place = {'Bar', 'Bar2', 'WaterTable', 'CoffeeTable', 'Table1', 'Table2', 'Table3'}
all_object = {'Coffee', 'Water', 'Dessert', 'Softdrink', 'BottledDrink', 'Yogurt', 'ADMilk', 'MilkDrink', 'Milk',
'VacuumCup'}
place_xyz_dic={
'Bar': (247.0, 520.0, 100.0),
'Bar2': (240.0, 40.0, 70.0),
'WaterTable':(-70.0, 500.0, 107),
'CoffeeTable':(247.0, 520.0, 100.0), # 位置需要更改!!!
'Table1': (247.0, 520.0, 100.0),# 位置需要更改!!!
'Table2': (-55.0, 0.0, 107),
'Table3':(-55.0, 150.0, 107)
}
@classmethod
def get_ins_name(cls,*args):
name = cls.__name__
@ -24,10 +36,7 @@ class Bahavior(ptree.behaviour.Behaviour):
return ins_name
def __init__(self,*args):
name = self.__class__.__name__
if len(args)>0:
name = f'{name}({",".join(list(args))})'
self.name = name
self.name = Bahavior.get_ins_name(*args)
#get valid args
# self.valid_arg_list = []
# lines = self.valid_params.strip().splitlines()

View File

@ -28,13 +28,13 @@ class Clean(Act):
info["pre"]= {f'Holding(Nothing)'}
if arg == "Table1":
info["add"]= {f'Is(Table1,Clean)'}
info["del"] = {f'Is(Table1,Dirty)'}
info["del_set"] = {f'Is(Table1,Dirty)'}
elif arg == "Floor":
info["add"] = {f'Is(Floor,Clean)'}
info["del"] = {f'Is(Floor,Dirty)'}
info["del_set"] = {f'Is(Floor,Dirty)'}
elif arg == "Chairs":
info["add"] = {f'Is(Chairs,Clean)'}
info["del"] = {f'Is(Chairs,Dirty)'}
info["del_set"] = {f'Is(Chairs,Dirty)'}
return info
def _update(self) -> ptree.common.Status:
@ -43,5 +43,5 @@ class Clean(Act):
self.scene.op_task_execute(self.op_type)
self.scene.state["condition_set"].union(self.info["add"])
self.scene.state["condition_set"] -= self.info["del"]
self.scene.state["condition_set"] -= self.info["del_set"]
return Status.RUNNING

View File

@ -26,7 +26,7 @@ class Make(Act):
def get_info(cls,arg):
info = {}
info["pre"]= {f'Holding(Nothing)'}
info['del'] = set()
info['del_set'] = set()
if arg == "Coffee":
info["add"]= {f'On(Coffee,CoffeeTable)'}
elif arg == "Water":
@ -41,5 +41,5 @@ class Make(Act):
self.scene.op_task_execute(self.op_type)
self.scene.state["condition_set"].union(self.info["add"])
self.scene.state["condition_set"] -= self.info["del"]
self.scene.state["condition_set"] -= self.info["del_set"]
return Status.RUNNING

View File

@ -14,23 +14,39 @@ class MoveTo(Act):
@classmethod
def get_info(self,arg):
def get_info(cls,arg):
info = {}
info['pre'] = set()
info["add"] = {f'At(Robot,{arg})'}
info["del"] = {f'At(Robot,{place})' for place in self.valid_args if place != arg}
info["del_set"] = {f'At(Robot,{place})' for place in cls.valid_args if place != arg}
return info
def _update(self) -> ptree.common.Status:
# self.scene.test_move()
navigator = Navigator(scene=self.scene, area_range=[-350, 600, -400, 1450], map=self.scene.state["map"]["2d"])
goal = self.scene.state['map']['obj_pos'][self.args[0]]
navigator.navigate(goal, animation=False)
# navigator = Navigator(scene=self.scene, area_range=[-350, 600, -400, 1450], map=self.scene.state["map"]["2d"])
# goal = self.scene.state['map']['obj_pos'][self.args[0]]
# navigator.navigate(goal, animation=False)
self.scene.state['condition_set'].add('At(Robot,Table)')
if self.target_place in Act.place_xyz_dic:
goal = Act.place_xyz_dic[self.target_place]
self.scene.walk_to(goal[0],goal[1])
else:
# 获取obj_id
for id,obj in enumerate(self.scene.objects):
if obj.name == self.target_place:
obj_id = id
break
obj_info = self.scene.objects[obj_id]
obj_x, obj_y, obj_z = obj_info.location.X, obj_info.location.Y, obj_info.location.Z
self.scene.walk_to(obj_x,obj_y)
# goal = self.scene.state['map']['obj_pos'][self.args[0]]
# self.scene.walk_to(goal[0],goal[1])
# self.scene.walk_to(goal[0],goal[1]) # X, Y, Yaw=None, velocity=200, dis_limit=0
self.scene.state["condition_set"] |= (self.info["add"])
self.scene.state["condition_set"] -= self.info["del_set"]
return ptree.common.Status.RUNNING

View File

@ -6,18 +6,18 @@ from robowaiter.behavior_lib._base.Behavior import Status
class PickUp(Act):
can_be_expanded = True
num_args = 1
valid_args = Act.all_object
def __init__(self, *args):
super().__init__(*args)
self.target_obj = self.args[0]
@classmethod
def get_info(self,arg):
def get_info(cls,arg):
info = {}
info["pre"] = {f'At(Robot,{arg})','Holding(Nothing)'}
info["add"] = {f'Holding({arg})'}
info["del"] = {f'Holding(Nothing)'}
info["del_set"] = {f'Holding(Nothing)'}
return info
@ -30,5 +30,5 @@ class PickUp(Act):
self.scene.op_task_execute(op_type, obj_id=obj_id)
self.scene.state["condition_set"].union(self.info["add"])
self.scene.state["condition_set"] -= self.info["del"]
self.scene.state["condition_set"] -= self.info["del_set"]
return Status.RUNNING

View File

@ -2,10 +2,13 @@ import py_trees as ptree
from typing import Any
from robowaiter.behavior_lib._base.Act import Act
from robowaiter.behavior_lib._base.Behavior import Status
import itertools
class PutDown(Act):
can_be_expanded = True
num_args = 1
num_args = 2
valid_args = tuple(itertools.product(Act.all_object, Act.all_place))
def __init__(self, *args):
super().__init__(*args)
@ -14,11 +17,11 @@ class PutDown(Act):
@classmethod
def get_info(self,arg):
def get_info(cls,*arg):
info = {}
info["pre"] = {f'Holding({arg[0]})',f'At(Robot,{arg[1]})'}
info["add"] = {f'Holding(Nothing)',f'At({arg[0]},{arg[1]})'}
info["del"] = {f'Holding(Nothing)'}
info["del_set"] = {f'Holding(Nothing)'}
return info
@ -31,5 +34,5 @@ class PutDown(Act):
self.scene.op_task_execute(op_type, release_pos=release_pos)
self.scene.state["condition_set"].union(self.info["add"])
self.scene.state["condition_set"] -= self.info["del"]
self.scene.state["condition_set"] -= self.info["del_set"]
return Status.RUNNING

View File

@ -2,12 +2,19 @@ import py_trees as ptree
from typing import Any
from robowaiter.behavior_lib._base.Act import Act
from robowaiter.behavior_lib._base.Behavior import Status
import itertools
class Clean(Act):
class Turn(Act):
can_be_expanded = True
num_args = 1
valid_args = [('AC','ACTemperature','TubeLight','HallLight','Curtain'),
('Off','On','Up','Down','Clean','Dirty')]
num_args = 2
valid_args = [('AC','TubeLight','HallLight','Curtain'),
('On','Off')]
valid_args = list(itertools.product(valid_args[0], valid_args[1]))
valid_args.extend([('ACTemperature','Up'),('ACTemperature','Down')])
valid_args = tuple(valid_args)
def __init__(self, *args):
super().__init__(*args)
@ -39,19 +46,26 @@ class Clean(Act):
self.op_type = 12
@classmethod
def get_info(cls,arg):
def get_info(cls,*arg):
info = {}
# 明天写
# info["pre"]= {f'Holding(Nothing)'}
# if arg == "Table1":
# info["add"]= {f'Is(Table1,Clean)'}
# info["del"] = {f'Is(Table1,Dirty)'}
# elif arg == "Floor":
# info["add"] = {f'Is(Floor,Clean)'}
# info["del"] = {f'Is(Floor,Dirty)'}
# elif arg == "Chairs":
# info["add"] = {f'Is(Chairs,Clean)'}
# info["del"] = {f'Is(Chairs,Dirty)'}
if arg[0]=="TubeLight" or arg[0]=="HallLight" or arg[0]=="Curtain" or arg[0]=='AC':
if arg[1]=="On":
info["pre"] = {f'Is({arg[0]},Off)'}
info["add"] = {f'Is({arg[0]},On)'}
info["del_set"] = {f'Is({arg[0]},Off)'}
elif arg[1]=="Off":
info["pre"] = {f'Is({arg[0]},On)'}
info["add"] = {f'Is({arg[0]},Off)'}
info["del_set"] = {f'Is({arg[0]},On)'}
elif arg[0]=='ACTemperature':
if arg[1]=="Up":
info["pre"] = {f'Is({arg[0]},Down)'}
info["add"] = {f'Is({arg[0]},Up)'}
info["del_set"] = {f'Is({arg[0]},Down)'}
elif arg[1]=="Donw":
info["pre"] = {f'Is({arg[0]},Up)'}
info["add"] = {f'Is({arg[0]},Down)'}
info["del_set"] = {f'Is({arg[0]},Up)'}
return info
def _update(self) -> ptree.common.Status:
@ -60,5 +74,5 @@ class Clean(Act):
self.scene.op_task_execute(self.op_type)
self.scene.state["condition_set"].union(self.info["add"])
self.scene.state["condition_set"] -= self.info["del"]
self.scene.state["condition_set"] -= self.info["del_set"]
return Status.RUNNING

View File

@ -1,13 +1,15 @@
import py_trees as ptree
from typing import Any
from robowaiter.behavior_lib._base.Cond import Cond
import itertools
class At(Cond):
can_be_expanded = True
num_params = 2
valid_params = '''
Robot, Bar
'''
valid_args = list(itertools.product(('Robot','Customer'), tuple(Cond.all_object | Cond.all_place | {'Customer'})))
valid_args.remove(('Customer','Customer'))
valid_args = tuple(valid_args)
def __init__(self,*args):
super().__init__(*args)
@ -15,9 +17,8 @@ class At(Cond):
def _update(self) -> ptree.common.Status:
# if self.scene.status?
arg_str = self.arg_str
if f'At({arg_str})' in self.scene.state["condition_set"]:
if self.name in self.scene.state["condition_set"]:
return ptree.common.Status.SUCCESS
else:
return ptree.common.Status.FAILURE

View File

@ -0,0 +1,25 @@
import py_trees as ptree
from typing import Any
from robowaiter.behavior_lib._base.Cond import Cond
class Holding(Cond):
can_be_expanded = True
num_params = 2
valid_args = [tuple(Cond.all_object|{'Nothing'})]
def __init__(self,*args):
super().__init__(*args)
def _update(self) -> ptree.common.Status:
# if self.scene.status?
if self.name in self.scene.state["condition_set"]:
return ptree.common.Status.SUCCESS
else:
return ptree.common.Status.FAILURE
# if self.scene.state['chat_list'] == []:
# return ptree.common.Status.FAILURE
# else:
# return ptree.common.Status.SUCCESS

View File

@ -0,0 +1,36 @@
import py_trees as ptree
from typing import Any
from robowaiter.behavior_lib._base.Cond import Cond
import itertools
class Is(Cond):
can_be_expanded = True
num_params = 2
valid_params1 = [('AC','TubeLight','HallLight','Curtain'),
('On','Off')]
valid_params2 = [('Table1','Floor','Chairs'),
('Clean','Dirty')]
valid_params3 = [('ACTemperature'),
('Up','Down')]
valid_args = list(itertools.product(valid_params1[0], valid_params1[1]))
valid_args.extend(list(itertools.product(valid_params2[0], valid_params2[1])))
valid_args.extend(list(itertools.product(valid_params3[0], valid_params3[1])))
valid_args = tuple(valid_args)
def __init__(self,*args):
super().__init__(*args)
def _update(self) -> ptree.common.Status:
# if self.scene.status?
if self.name in self.scene.state["condition_set"]:
return ptree.common.Status.SUCCESS
else:
return ptree.common.Status.FAILURE
# if self.scene.state['chat_list'] == []:
# return ptree.common.Status.FAILURE
# else:
# return ptree.common.Status.SUCCESS

View File

@ -5,9 +5,9 @@ from robowaiter.behavior_lib._base.Cond import Cond
class On(Cond):
can_be_expanded = True
num_params = 2
valid_params = '''
Robot, Bar
'''
valid_params = [tuple(Cond.all_object),
tuple(Cond.all_place)]
def __init__(self,*args):
super().__init__(*args)
@ -15,9 +15,8 @@ class On(Cond):
def _update(self) -> ptree.common.Status:
# if self.scene.status?
arg_str = self.arg_str
if f'At({arg_str})' in self.scene.state["condition_set"]:
if self.name in self.scene.state["condition_set"]:
return ptree.common.Status.SUCCESS
else:
return ptree.common.Status.FAILURE

View File

@ -1,2 +1 @@
{"测试VLM做一杯咖啡": {"Answer": "测试VLM做一杯咖啡", "Goal": "{\"At(Coffee,Bar)\"}"}, "测试VLN前往桌子": {"Answer": "测试VLN前往桌子", "Goal": "{\"At(Robot,Table)\"}"}, "测试VLM倒一杯水": {"Answer": "测试VLM倒一杯水", "Goal": "{\"At(Water,WaterTable)\"}"}}
{"测试VLM做一杯咖啡": {"Answer": "测试VLM做一杯咖啡", "Goal": "{\"At(Coffee,Bar)\"}"}, "测试VLM前往桌子2": {"Answer": " 测试VLM前往桌子2", "Goal": "{\"At(Robot,Table2)\"}"}, "测试AEM": {"Answer": "测试AEM", "Goal": "{\"EnvExplored()\"}"}, "测试VLM倒一杯水": {"Answer": "测试VLM倒一杯水", "Goal": "{\"On(Water,WaterTable)\"}"}, "测试VLM开空调": {"Answer": "测试VLM开空调", "Goal": "{\"Is(AC,On)\"}"}, "测试VLM关大厅灯": {"Answer": "测试VLM关大厅灯", "Goal": "{\"Is(HallLight,Off)\"}"}, "测试VLM关筒灯": {"Answer": "测试VLM关筒灯", "Goal": "{\"Is(TubeLight,Off)\"}"}, "测试VLM关窗帘": {"Answer": "测试VLM关窗", "Goal": "{\"Is(Curtain,Off)\"}"}, "测试VLM拖地": {"Answer": "测试VLM拖地", "Goal": "{\"Is(Floor,Clean)\"}"}, "测试VLM擦桌子": {"Answer": "测试VLM擦桌子", "Goal": "{\"Is(Table1,Clean)\"}"}, "测试VLM把冰红茶放到Table2": {"Answer": "测试VLM把冰红茶放到Table2", "Goal": "{\"On(BottledDrink,Table2)\"}"}}

View File

@ -1,5 +1,12 @@
Question,Answer,Goal
测试VLM做一杯咖啡,测试VLM做一杯咖啡,"{""At(Coffee,Bar)""}"
测试VLN前往桌子,测试VLN前往桌子,"{""At(Robot,Table)""}"
测试VLM前往桌子2, 测试VLM前往桌子2,"{""At(Robot,Table2)""}"
测试AEM,测试AEM,"{""EnvExplored()""}"
测试VLM倒一杯水,测试VLM倒一杯水,"{""At(Water,WaterTable)""}"
测试VLM倒一杯水,测试VLM倒一杯水,"{""On(Water,WaterTable)""}"
测试VLM开空调,测试VLM开空调,"{""Is(AC,On)""}"
测试VLM关大厅灯,测试VLM关大厅灯,"{""Is(HallLight,Off)""}"
测试VLM关筒灯,测试VLM关筒灯,"{""Is(TubeLight,Off)""}"
测试VLM关窗帘,测试VLM关窗,"{""Is(Curtain,Off)""}"
测试VLM拖地,测试VLM拖地,"{""Is(Floor,Clean)""}"
测试VLM擦桌子,测试VLM擦桌子,"{""Is(Table1,Clean)""}"
测试VLM把冰红茶放到Table2,测试VLM把冰红茶放到Table2,"{""On(BottledDrink,Table2)""}"

1 Question Answer Goal
2 测试VLM:做一杯咖啡 测试VLM:做一杯咖啡 {"At(Coffee,Bar)"}
3 测试VLN:前往桌子 测试VLM:前往桌子2 测试VLN:前往桌子 测试VLM:前往桌子2 {"At(Robot,Table)"} {"At(Robot,Table2)"}
4 测试AEM 测试AEM {"EnvExplored()"}
5 测试VLM:倒一杯水 测试VLM:倒一杯水 {"At(Water,WaterTable)"} {"On(Water,WaterTable)"}
6 测试VLM:开空调 测试VLM:开空调 {"Is(AC,On)"}
7 测试VLM:关大厅灯 测试VLM:关大厅灯 {"Is(HallLight,Off)"}
8 测试VLM:关筒灯 测试VLM:关筒灯 {"Is(TubeLight,Off)"}
9 测试VLM:关窗帘 测试VLM:关窗 {"Is(Curtain,Off)"}
10 测试VLM:拖地 测试VLM:拖地 {"Is(Floor,Clean)"}
11 测试VLM:擦桌子 测试VLM:擦桌子 {"Is(Table1,Clean)"}
12 测试VLM:把冰红茶放到Table2 测试VLM:把冰红茶放到Table2 {"On(BottledDrink,Table2)"}

View File

@ -0,0 +1,67 @@
# 使用curl命令测试返回
# curl -X POST "http://127.0.0.1:8000/v1/chat/completions" \
# -H "Content-Type: application/json" \
# -d "{\"model\": \"chatglm3-6b\", \"messages\": [{\"role\": \"system\", \"content\": \"You are ChatGLM3, a large language model trained by Zhipu.AI. Follow the user's instructions carefully. Respond using markdown.\"}, {\"role\": \"user\", \"content\": \"你好给我讲一个故事大概100字\"}], \"stream\": false, \"max_tokens\": 100, \"temperature\": 0.8, \"top_p\": 0.8}"
# 使用Python代码测返回
import requests
import json
import urllib3
########################################
# 该文件实现了与大模型的简单通信
########################################
# 忽略https的安全性警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
base_url = "https://45.125.46.134:25344" # 本地部署的地址,或者使用你访问模型的API地址
def create_chat_completion(model, messages, use_stream=False):
data = {
"model": model, # 模型名称
"messages": messages, # 会话历史
"stream": use_stream, # 是否流式响应
"max_tokens": 100, # 最多生成字数
"temperature": 0.8, # 温度
"top_p": 0.8, # 采样概率
}
response = requests.post(f"{base_url}/v1/chat/completions", json=data, stream=use_stream, verify=False)
if response.status_code == 200:
if use_stream:
# 处理流式响应
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')[6:]
try:
response_json = json.loads(decoded_line)
content = response_json.get("choices", [{}])[0].get("delta", {}).get("content", "")
print(content)
except:
print("Special Token:", decoded_line)
else:
# 处理非流式响应
decoded_line = response.json()
print(decoded_line)
content = decoded_line.get("choices", [{}])[0].get("message", "").get("content", "")
print(content)
else:
print("Error:", response.status_code)
return None
if __name__ == "__main__":
chat_messages = [
{
"role": "system",
"content": "You are ChatGLM3, a large language model trained by Zhipu.AI. Follow the user's instructions carefully. Respond using markdown.",
},
{
"role": "user",
"content": "你好给我讲一个故事大概100字"
}
]
create_chat_completion("chatglm3-6b", chat_messages, use_stream=False)

View File

@ -0,0 +1,84 @@
import json
import openai
from colorama import init, Fore
from loguru import logger
from tool_register import get_tools, dispatch_tool
import requests
import json
import urllib3
init(autoreset=True)
########################################
# 该文件实现了与大模型的通信以及工具调用
########################################
# 忽略https的安全性警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
base_url = "https://45.125.46.134:25344" # 本地部署的地址,或者使用你访问模型的API地址
def get_response(**kwargs):
data = kwargs
response = requests.post(f"{base_url}/v1/chat/completions", json=data, stream=data["stream"], verify=False)
decoded_line = response.json()
return decoded_line
functions = get_tools()
def run_conversation(query: str, stream=False, functions=None, max_retry=5):
params = dict(model="chatglm3", messages=[{"role": "user", "content": query}], stream=stream)
if functions:
params["functions"] = functions
response = get_response(**params)
for _ in range(max_retry):
if response["choices"][0]["message"].get("function_call"):
function_call = response["choices"][0]["message"]["function_call"]
logger.info(f"Function Call Response: {function_call}")
function_args = json.loads(function_call["arguments"])
tool_response = dispatch_tool(function_call["name"], function_args)
logger.info(f"Tool Call Response: {tool_response}")
params["messages"].append(response["choices"][0]["message"])
params["messages"].append(
{
"role": "function",
"name": function_call["name"],
"content": tool_response, # 调用函数返回结果
}
)
else:
reply = response["choices"][0]["message"]["content"]
logger.info(f"Final Reply: \n{reply}")
return
response = get_response(**params)
if __name__ == "__main__":
# chat_messages = [
# {
# "role": "system",
# "content": "You are ChatGLM3, a large language model trained by Zhipu.AI. Follow the user's instructions carefully. Respond using markdown.",
# },
# {
# "role": "user",
# "content": "你好给我讲一个故事大概100字"
# }
# ]
# create_chat_completion("chatglm3-6b", chat_messages, use_stream=False)
# query = "你是谁"
# run_conversation(query, stream=False)
#
# logger.info("\n=========== next conversation ===========")
query = "洗手间在哪儿"
run_conversation(query, functions=functions, stream=False)

View File

@ -0,0 +1,156 @@
import inspect
import traceback
from copy import deepcopy
from pprint import pformat
from types import GenericAlias
from typing import get_origin, Annotated
_TOOL_HOOKS = {}
_TOOL_DESCRIPTIONS = {}
def register_tool(func: callable):
tool_name = func.__name__
tool_description = inspect.getdoc(func).strip()
python_params = inspect.signature(func).parameters
tool_params = []
for name, param in python_params.items():
annotation = param.annotation
if annotation is inspect.Parameter.empty:
raise TypeError(f"Parameter `{name}` missing type annotation")
if get_origin(annotation) != Annotated:
raise TypeError(f"Annotation type for `{name}` must be typing.Annotated")
typ, (description, required) = annotation.__origin__, annotation.__metadata__
typ: str = str(typ) if isinstance(typ, GenericAlias) else typ.__name__
if not isinstance(description, str):
raise TypeError(f"Description for `{name}` must be a string")
if not isinstance(required, bool):
raise TypeError(f"Required for `{name}` must be a bool")
tool_params.append({
"name": name,
"description": description,
"type": typ,
"required": required
})
tool_def = {
"name": tool_name,
"description": tool_description,
"params": tool_params
}
print("[registered tool] " + pformat(tool_def))
_TOOL_HOOKS[tool_name] = func
_TOOL_DESCRIPTIONS[tool_name] = tool_def
return func
def dispatch_tool(tool_name: str, tool_params: dict) -> str:
if tool_name not in _TOOL_HOOKS:
return f"Tool `{tool_name}` not found. Please use a provided tool."
tool_call = _TOOL_HOOKS[tool_name]
try:
ret = tool_call(**tool_params)
except:
ret = traceback.format_exc()
return str(ret)
def get_tools() -> dict:
return deepcopy(_TOOL_DESCRIPTIONS)
# Tool Definitions
# @register_tool
# def random_number_generator(
# seed: Annotated[int, 'The random seed used by the generator', True],
# range: Annotated[tuple[int, int], 'The range of the generated numbers', True],
# ) -> int:
# """
# Generates a random number x, s.t. range[0] <= x < range[1]
# """
# if not isinstance(seed, int):
# raise TypeError("Seed must be an integer")
# if not isinstance(range, tuple):
# raise TypeError("Range must be a tuple")
# if not isinstance(range[0], int) or not isinstance(range[1], int):
# raise TypeError("Range must be a tuple of integers")
#
# import random
# return random.Random(seed).randint(*range)
# @register_tool
# def get_weather(
# city_name: Annotated[str, 'The name of the city to be queried', True],
# ) -> str:
# """
# Get the current weather for `city_name`
# """
#
# if not isinstance(city_name, str):
# raise TypeError("City name must be a string")
#
# key_selection = {
# "current_condition": ["temp_C", "FeelsLikeC", "humidity", "weatherDesc", "observation_time"],
# }
# import requests
# try:
# resp = requests.get(f"https://wttr.in/{city_name}?format=j1")
# resp.raise_for_status()
# resp = resp.json()
# ret = {k: {_v: resp[k][0][_v] for _v in v} for k, v in key_selection.items()}
# except:
# import traceback
# ret = "Error encountered while fetching weather data!\n" + traceback.format_exc()
#
# return str(ret)
# @register_tool
# def add(
# a: Annotated[int, '需要相加的第1个数', True],
# b: Annotated[int, '需要相加的第2个数', True]
# ) -> int:
# """
# 获取 `a` + `b` 的值
# """
#
# if (not isinstance(a, int)) or (not isinstance(b, int)):
# raise TypeError("相加的数必须为整数")
#
# return int(a+b)
@register_tool
def create_sub_task(
goal: Annotated[str, '用于子任务的目标状态集合', True]
) -> str:
"""
当需要完成具身任务如做咖啡拿放物体扫地前往某位置调用该函数根据用户的提示进行意图理解生成子任务的目标状态集合以一阶逻辑的形式来表示例如前往桌子的目标状态为{At(Robot,Table)}做咖啡的目标状态为{On(Coffee,Bar)}
"""
return goal
@register_tool
def find_near_object(
object: Annotated[str, '需要判断所在位置的物体', True]
) -> str:
"""
在场景中找到相邻的物体并说出 `object` 在输出物体的附近
"""
near_object = None
if object == "Table":
near_object = "Bar"
if object == "洗手间":
near_object = "大门"
return near_object
if __name__ == "__main__":
print(dispatch_tool("get_weather", {"city_name": "beijing"}))
print(get_tools())

View File

@ -418,10 +418,12 @@ class Scene:
def gen_obj(self,h=100):
# 4;冰红(盒) 5;酸奶 7:保温杯 9;冰红(瓶) 13:代语词典 14:cake 61:甜牛奶
type= 9 #9
scene = stub.Observe(GrabSim_pb2.SceneID(value=self.sceneID))
ginger_loc = [scene.location.X, scene.location.Y, scene.location.Z]
obj_list = [GrabSim_pb2.ObjectList.Object(x=ginger_loc[0] - 50, y=ginger_loc[1] - 40, z = h, roll=0, pitch=0, yaw=0, type=type)]
obj_list = [GrabSim_pb2.ObjectList.Object(x=ginger_loc[0] - 55, y=ginger_loc[1] - 40, z = 95, roll=0, pitch=0, yaw=0, type=9),
# GrabSim_pb2.ObjectList.Object(x=ginger_loc[0] - 50, y=ginger_loc[1] - 40, z=h, roll=0, pitch=0, yaw=0, type=9),
GrabSim_pb2.ObjectList.Object(x=340, y=960, z = 88, roll=0, pitch=0, yaw=0, type=7),
]
scene = stub.AddObjects(GrabSim_pb2.ObjectList(objects=obj_list, scene=self.sceneID))
time.sleep(1.0)
@ -434,11 +436,17 @@ class Scene:
# Robot
obj_x, obj_y, obj_z = obj_info.location.X, obj_info.location.Y, obj_info.location.Z
walk_v = [obj_x+50, obj_y] + [180, 180, 0]
if obj_y>=820 and obj_y<= 1200 and obj_x>=240 and obj_x<= 500: # 物品位于斜的抹布桌上 ([240,500],[820,1200])
walk_v = [obj_x+40, obj_y-35, 130, 180, 0]
obj_x += 3
obj_y += 2.5
# walk_v = [obj_x,obj_y-30,130, 180, 0]
action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
scene = stub.Do(action)
time.sleep(1.0)
# Finger
self.ik_control_joints(2, obj_x-9, obj_y+0.5, obj_z) # -10, 0, 0
self.ik_control_joints(2, obj_x-9, obj_y, obj_z) # -10, 0, 0
time.sleep(3.0)
# Grasp Obj
print('------------------grasp_obj----------------------')
@ -461,9 +469,6 @@ class Scene:
angle[0] = 15
angle[19] = -15
angle[20] = -30
for i in range(18,21):
print("name:",scene.joints[i].name,"angle:",scene.joints[i].angle)
# print("angle:",angle)
action = GrabSim_pb2.Action(scene=self.sceneID,action=GrabSim_pb2.Action.ActionType.RotateJoints, # 弯腰
values=angle)
scene = stub.Do(action)
@ -471,13 +476,19 @@ class Scene:
def release_obj(self,release_pos):
print("------------------Move to Realese Position----------------------")
walk_v = [release_pos[i] for i in range(2)]
action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v + [180,180,0])
walk_v = [release_pos[i] for i in range(2)] + [180,180,0]
if release_pos==[340.0, 900.0, 99.0]:
walk_v[2] = 130
action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.WalkTo, values=walk_v)
scene = stub.Do(action)
print("------------------release_obj----------------------")
self.ik_control_joints(2, release_pos[0] - 80, release_pos[1], release_pos[2])
time.sleep(2.0)
self.robo_stoop_parallel()
if release_pos==[340.0, 900.0, 99.0]:
self.ik_control_joints(2, 300.0, 935, release_pos[2])
time.sleep(2.0)
else:
self.ik_control_joints(2, release_pos[0] - 80, release_pos[1], release_pos[2])
time.sleep(2.0)
self.robo_stoop_parallel()
action = GrabSim_pb2.Action(scene=self.sceneID, action=GrabSim_pb2.Action.ActionType.Release, values=[1])
scene = stub.Do(action)
@ -487,7 +498,7 @@ class Scene:
return True
# 执行过程:输出"开始(任务名)" -> 按步骤数执行任务 -> Robot输出成功或失败的对话
def op_task_execute(self,op_type,obj_id=0,yaw=180,release_pos=[240,-140]):
def op_task_execute(self,op_type,obj_id=0,release_pos=[240,-140]):
self.control_robot_action(0, 1, "开始"+self.op_dialog[op_type]) # 开始制作咖啡
if op_type in [13,14,15]: # 调整空调:13代表按开关,14升温,15降温
result = self.adjust_kongtiao(op_type)

View File

@ -11,11 +11,13 @@ class SceneVLM(Scene):
super().__init__(robot)
# 在这里加入场景中发生的事件, (事件发生的时间,事件函数)
self.event_list = [
(5, self.create_chat_event("测试VLM做一杯咖啡")),
# (5, self.create_chat_event("测试VLM做一杯咖啡")),
# (5, self.create_chat_event("测试VLM倒一杯水")),
(5, self.create_chat_event("测试VLM开空调")),
]
def _reset(self):
self.state["condition_set"] = {'At(Robot,Bar)','Holding(Nothing)','Is(AC,Off)'}
pass
def _run(self, op_type=7):
@ -30,9 +32,9 @@ class SceneVLM(Scene):
# self.gen_obj()
# self.op_task_execute(op_type, obj_id=0)
# # 原始吧台处:[247.0, 520.0, 100.0], 空调开关旁吧台:[240.0, 40.0, 70.0], 水杯桌:[-70.0, 500.0, 107]
# # 桌子1:[-55.0, 0.0, 107],桌子2:[-55.0, 150.0, 107]
# elif op_type == 17: self.op_task_execute(op_type, release_pos=[247.0, 520.0, 100.0])#[-55.0, 150.0, 107]
# else:
# # 桌子1:[-55.0, 0.0, 107],桌子2:[-55.0, 150.0, 107], 抹布桌:[340.0, 900.0, 98.0]
# if op_type == 17: self.op_task_execute(op_type, release_pos=[340.0, 900.0, 99.0]) #[325.0, 860.0, 100]
# if op_type not in [16,17]:
# self.move_task_area(op_type)
# self.op_task_execute(op_type)
pass

View File

@ -1,7 +1,7 @@
import os
from robowaiter import Robot, task_map
TASK_NAME = 'VLN'
TASK_NAME = 'VLM'
# create robot
project_path = "./robowaiter"

View File

@ -1,4 +1,7 @@
selector{
cond At(Robot,Table)
act MoveTo(Table)
cond Is(AC,On)
sequence{
cond Is(AC,Off)
act Turn(AC,On)
}
}