查询函数融入场景中

This commit is contained in:
ChenXL97 2023-11-18 22:30:14 +08:00
parent 1033b95781
commit 8bdf5a96b4
6 changed files with 81 additions and 37 deletions

View File

@ -9,7 +9,8 @@ class DealChat(Act):
self.chat_history = "" self.chat_history = ""
self.function_success = False self.function_success = False
self.func_map = { self.func_map = {
"create_sub_task": self.create_sub_task "create_sub_task": self.create_sub_task,
"get_object_info": self.get_object_info
} }
def _update(self) -> ptree.common.Status: def _update(self) -> ptree.common.Status:
@ -53,3 +54,19 @@ class DealChat(Act):
print("参数解析错误") print("参数解析错误")
self.scene.robot.expand_sub_task_tree(goal_set) self.scene.robot.expand_sub_task_tree(goal_set)
def get_object_info(self,**args):
try:
obj = args['obj']
self.function_success = True
except:
obj = None
print("参数解析错误")
near_object = "None"
if obj == "洗手间":
near_object = "大门"
return near_object

View File

@ -36,3 +36,10 @@ create_sub_task
抱歉,我这就去开空调和关窗帘。 抱歉,我这就去开空调和关窗帘。
create_sub_task create_sub_task
{"goal":"Is(AC,On),Is(Curtain,Off)"} {"goal":"Is(AC,On),Is(Curtain,Off)"}
请问洗手间在哪里?
洗手间在这附近
get_object_info
{"obj":"洗手间"}

View File

@ -29,16 +29,30 @@ root_path = get_root_path()
# load test questions # load test questions
file_path = os.path.join(root_path,"robowaiter/llm_client/data/fix_questions.txt") file_path = os.path.join(root_path,"robowaiter/llm_client/data/fix_questions.txt")
functions = get_tools()
fix_questions_dict = {} fix_questions_dict = {}
no_reply_functions = ["create_sub_task"]
with open(file_path,'r',encoding="utf-8") as f: with open(file_path,'r',encoding="utf-8") as f:
#读取所有行 #读取所有行
lines = f.read().strip() lines = f.read().strip()
sections = re.split(r'\n\s*\n', lines) sections = re.split(r'\n\s*\n', lines)
for s in sections: for s in sections:
x = s.split() x = s.split()
fix_questions_dict[x[0]] = x[1:] if len(x) == 2:
fix_questions_dict[x[0]] = {
"answer": x[1],
"function": None
}
else:
fix_questions_dict[x[0]] = {
"answer": x[1],
"function": x[2],
"args": x[3]
}
functions = get_tools()
role_system = [{ role_system = [{
"role": "system", "role": "system",
"content": "你是RoboWaiter,一个由HPCL团队开发的机器人服务员你在咖啡厅工作。接受顾客的指令并调用工具函数来完成各种服务任务。如果顾客问你们这里有什么或者想要点单你说我们咖啡厅提供咖啡点心酸奶等食物。如果顾客不需要你了你就回到吧台招待。如果顾客叫你去做某事你回复好的我马上去做这件事。", "content": "你是RoboWaiter,一个由HPCL团队开发的机器人服务员你在咖啡厅工作。接受顾客的指令并调用工具函数来完成各种服务任务。如果顾客问你们这里有什么或者想要点单你说我们咖啡厅提供咖啡点心酸奶等食物。如果顾客不需要你了你就回到吧台招待。如果顾客叫你去做某事你回复好的我马上去做这件事。",
@ -55,11 +69,12 @@ def new_response():
def parse_fix_question(question): def parse_fix_question(question):
response = new_response() response = new_response()
fix_ans = fix_questions_dict[question] fix_ans = fix_questions_dict[question]
if len(fix_ans)<=1: #简单对话 if not fix_ans['function']: #简单对话
message = {'role': 'assistant', 'content': fix_ans[0], 'name': None, message = {'role': 'assistant', 'content': fix_ans["answer"], 'name': None,
'function_call': None} 'function_call': None}
else: else:
reply, func,args = fix_ans func = fix_ans["function"]
args = fix_ans["args"]
# tool_response = dispatch_tool(function_call["name"], json.loads(args)) # tool_response = dispatch_tool(function_call["name"], json.loads(args))
# logger.info(f"Tool Call Response: {tool_response}") # logger.info(f"Tool Call Response: {tool_response}")
message = {'role': 'assistant', message = {'role': 'assistant',
@ -115,7 +130,7 @@ def deal_response(response, history, func_map=None ):
} }
history.append(t) history.append(t)
return function_call["name"], return_message return function_call["name"], tool_response
else: else:
return_message = response["choices"][0]["message"] return_message = response["choices"][0]["message"]
@ -133,8 +148,14 @@ def ask_llm(question,history, func_map=None, retry=3):
function_call,result = deal_response(response, history, func_map) function_call,result = deal_response(response, history, func_map)
if function_call: if function_call:
if question in fix_questions_dict: if question in fix_questions_dict:
reply = fix_questions_dict[question][0] if fix_questions_dict[question]['function'] in no_reply_functions:
result = single_round(reply,"你是机器人服务员,请把以下句子换一种表述方式对顾客说,但是意思不变,尽量简短:\n") reply = fix_questions_dict[question]["answer"]
result = single_round(reply,
"你是机器人服务员,请把以下句子换一种表述方式对顾客说,但是意思不变,尽量简短:\n")
else:
reply = fix_questions_dict[question]["answer"]
result = single_round(f"你是机器人服务员,顾客想知道{question}, 你的具身场景查询返回的是{result},请把按照以下句子对顾客说,{reply}, 尽量简短。\n")
message = {'role': 'assistant', 'content': result, 'name': None, message = {'role': 'assistant', 'content': result, 'name': None,
'function_call': None} 'function_call': None}
history.append(message) history.append(message)

View File

@ -158,31 +158,31 @@ def stop_serve(
# @register_tool @register_tool
# def get_object_info( def get_object_info(
# obj: Annotated[str, '需要获取信息的物体名称', True] obj: Annotated[str, '需要获取信息的物体名称', True]
# ) -> str: ) -> str:
# """ """
# 获取场景中指定物体 `object` 在哪里,不涉及到具体的执行任务 获取场景中指定物体 `object` 在哪里不涉及到具体的执行任务
# 如果`object` 是一个地点,例如洗手间,则输出大门。 如果`object` 是一个地点例如洗手间则输出大门
# 如果`object`是咖啡,则输出桌子,咖啡在桌子上。 如果`object`是咖啡则输出桌子咖啡在桌子上
# 如果`object` 是空桌子,则输出一号桌 如果`object` 是空桌子则输出一号桌
# """ """
# near_object = None near_object = None
# # if obj == "Table": # if obj == "Table":
# # near_object = "Bar" # near_object = "Bar"
# # if obj == "洗手间": if obj == "洗手间":
# # near_object = "大门" near_object = "大门"
# # if obj == "空桌子": # if obj == "空桌子":
# # near_object = "一号桌" # near_object = "一号桌"
# if obj in find_obj_utils.all_loc: # object是一个地点 if obj in find_obj_utils.all_loc: # object是一个地点
# mp = list(find_obj_utils.loc_map[obj]) mp = list(find_obj_utils.loc_map[obj])
# # near_object = random.choice(mp) # near_object = random.choice(mp)
# near_object = mp near_object = mp
# if obj in find_obj_utils.all_obj: # object是一个物品 if obj in find_obj_utils.all_obj: # object是一个物品
# near_ls = find_obj_utils.all_loc + find_obj_utils.all_obj near_ls = find_obj_utils.all_loc + find_obj_utils.all_obj
# near_object = random.choices(near_ls,k=5) near_object = random.choices(near_ls,k=5)
# return near_object return near_object
# @register_tool # @register_tool
# def find_location( # def find_location(

View File

@ -15,9 +15,8 @@ class SceneGQA(Scene):
def __init__(self, robot): def __init__(self, robot):
super().__init__(robot) super().__init__(robot)
# 在这里加入场景中发生的事件, (事件发生的时间,事件函数) # 在这里加入场景中发生的事件, (事件发生的时间,事件函数)
self.event_list = [ self.new_event_list = [
(5, self.create_chat_event("哪里有空桌子")), (3, self.customer_say, ("System","请问洗手间在哪里?"))
(12, self.create_chat_event("可以带我去吗")),
] ]
def _reset(self): def _reset(self):

View File