RoboWaiter/dataset/bt_expansion/bt_expansion.py

467 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import numpy as np
import copy
import time
# Action class: an action consists of preconditions, add effects and delete effects.
class Action:
    """A planning action described by three sets of literals.

    Attributes:
        pre: literals that must be contained in a state for the action to apply.
        add: literals the action adds to the state.
        del_set: literals the action removes from the state.
        name: label returned by ``str(action)``.
    """

    def __init__(self, name='anonymous action'):
        self.name = name
        self.pre = set()
        self.add = set()
        self.del_set = set()

    def __str__(self):
        return self.name

    # Randomly generate an action from a state.
    def generate_from_state(self, state, num):
        """Fill ``pre``/``add``/``del_set`` at random for literals ``0 .. num-1``.

        For each literal, in order:
          * if it is in ``state`` and a draw succeeds, it becomes a
            precondition, and on a second successful draw also a delete
            effect; nothing else is decided for that literal;
          * otherwise a successful draw makes it an add effect;
          * otherwise one more successful draw makes it a delete effect.

        A draw "succeeds" when ``random.random() > 0.5``.
        """
        for literal in range(num):
            # Short-circuit keeps the draw order: no draw when literal not in state.
            if literal in state and random.random() > 0.5:
                self.pre.add(literal)
                if random.random() > 0.5:
                    self.del_set.add(literal)
                continue
            if random.random() > 0.5:
                self.add.add(literal)
            elif random.random() > 0.5:
                self.del_set.add(literal)

    def print_action(self):
        """Print the precondition, add and delete sets, one per line."""
        for literals in (self.pre, self.add, self.del_set):
            print(literals)
# Generate a random state.
def generate_random_state(num):
    """Return a random subset of the literals ``0 .. num-1``.

    Each literal is tested in increasing order and kept when
    ``random.random() > 0.5``.
    """
    return {literal for literal in range(num) if random.random() > 0.5}
# Compute the successor state from a state and an action.
def state_transition(state, action):
    """Apply ``action`` to ``state`` and return the resulting state.

    ``action`` must expose ``pre``, ``add`` and ``del_set`` sets.  When the
    preconditions are not contained in ``state`` an error line is printed and
    the very same ``state`` object is returned; otherwise a new set
    ``(state | add) - del_set`` is returned and ``state`` is left untouched.
    """
    if action.pre <= state:
        return (state | action.add) - action.del_set
    print ('error: action not applicable')
    return state
# Leaf node.
class Leaf:
    """Leaf of a behavior tree: either a condition check or an action.

    Attributes:
        type: ``'cond'`` for a condition leaf, ``'act'`` for an action leaf.
        content: for ``'cond'`` a set of literals; for ``'act'`` an action
            object exposing a ``pre`` set.
        parent: the control node holding this leaf (``None`` until attached).
        parent_index: position of this leaf in ``parent.children``.
    """

    def __init__(self, type, content):
        self.type = type
        self.content = content  # condition set or action
        self.parent = None
        self.parent_index = 0

    # Ticking a leaf returns the status plus the condition/action object.
    def tick(self, state):
        """Evaluate the leaf in ``state``.

        Returns:
            ``(status, self.content)`` where ``status`` is
            ``'success'``/``'failure'`` for a condition leaf (content is a
            subset of ``state`` or not) and ``'running'``/``'failure'`` for an
            action leaf (its ``pre`` is a subset of ``state`` or not).
            Any other ``type`` falls through and returns ``None``.
        """
        if self.type == 'cond':
            if self.content <= state:
                return 'success', self.content
            return 'failure', self.content
        if self.type == 'act':
            if self.content.pre <= state:
                return 'running', self.content  # action
            return 'failure', self.content

    def __str__(self):
        """Return the textual form of the content.

        The previous implementation printed the content and returned an empty
        string, so ``str(leaf)`` was always ``''``; ``__str__`` must return the
        representation instead of writing it to stdout.
        """
        return str(self.content)

    def print_nodes(self):
        """Print the leaf content on its own line."""
        print(self.content)

    def count_size(self):
        """A leaf counts as exactly one node."""
        return 1
# Behavior tree that may contain control nodes.
class ControlBT:
    """Inner node of a behavior tree.

    Attributes:
        type: ``'?'`` (selector / OR), ``'>'`` (sequence / AND), or
            ``'act'`` / ``'cond'`` (wrappers that delegate to the first child).
        children: ordered list of child nodes (leaves or other ``ControlBT``).
        parent: the node holding this one (``None`` until attached).
        parent_index: position of this node in ``parent.children``.
    """

    def __init__(self, type):
        self.type = type
        self.children = []
        self.parent = None
        self.parent_index = 0

    def add_child(self, subtree_list):
        """Append every node in ``subtree_list`` and set its parent links."""
        for subtree in subtree_list:
            subtree.parent = self
            subtree.parent_index = len(self.children)
            self.children.append(subtree)

    # Tick the tree: children are ticked according to the control-node logic.
    def tick(self, state):
        """Tick this node in ``state`` and return ``(status, object)``.

        * ``'?'``: returns the first child result that is ``'success'`` or
          ``'running'``; otherwise ``('failure', '?fails')``.
        * ``'>'``: returns the first child result that is ``'failure'`` or
          ``'running'``; otherwise ``('success', '>success')``.
        * ``'act'`` / ``'cond'``: returns the first child's result; raises
          ``IndexError`` if there is no child.
        * any other type: returns ``None``.

        A node without children prints an error line before proceeding.
        """
        if len(self.children) < 1:
            print("error,no child")
        if self.type == '?':  # selector node, i.e. OR node
            for child in self.children:
                val, obj = child.tick(state)
                if val in ('success', 'running'):
                    return val, obj
            return 'failure', '?fails'
        if self.type == '>':  # sequence node, i.e. AND node
            for child in self.children:
                val, obj = child.tick(state)
                if val in ('failure', 'running'):
                    return val, obj
            return 'success', '>success'
        if self.type in ('act', 'cond'):  # action / condition wrapper node
            return self.children[0].tick(state)

    def getFirstChild(self):
        """Return the first child (raises ``IndexError`` when there is none)."""
        return self.children[0]

    def __str__(self):
        """Return a multi-line rendering of the subtree.

        The node type is on the first line and every child's rendering follows,
        indented by two spaces per level.  The previous implementation printed
        the tree as a side effect and returned ``''``; ``__str__`` must return
        the text instead.
        """
        lines = [self.type]
        for child in self.children:
            lines.extend('  ' + line for line in str(child).splitlines())
        return '\n'.join(lines)

    def print_nodes(self):
        """Print this node's type, then recursively every descendant."""
        print(self.type)
        for child in self.children:
            child.print_nodes()

    # Recursively count the nodes in the tree.
    def count_size(self):
        """Return the number of nodes in this subtree, this node included."""
        return 1 + sum(child.count_size() for child in self.children)
# The complete planning algorithm proposed in the paper.
class BTalgorithm:
    """Behavior-tree expansion planner.

    Starting from a tree that only checks the goal condition, condition
    leaves are expanded one at a time into ``?`` subtrees until the tree can
    be ticked to ``'success'`` or ``'running'`` in the start state.

    Attributes:
        bt: root ``ControlBT`` of the planned tree (``None`` before planning).
        nodes: condition leaves created so far, in creation order.
        traversed: condition sets that have already been expanded.
        conditions: holds the goal condition after ``run_algorithm``.
        conditions_index: reset by ``clear``; not otherwise used here.
    """

    def __init__(self):
        self.bt = None
        self.nodes = []
        self.traversed = []
        self.conditions = []
        self.conditions_index = []

    def clear(self):
        """Reset the planner so it can be reused for another problem."""
        self.bt = None
        self.nodes = []
        self.traversed = []
        self.conditions = []
        self.conditions_index = []

    def _can_run(self, start):
        """Return True when ticking the tree in ``start`` succeeds or runs."""
        val, _ = self.bt.tick(start)
        return val == 'success' or val == 'running'

    def _next_unexpanded(self):
        """Return the first condition leaf not yet expanded, or ``None``."""
        for node in self.nodes:
            if node.content not in self.traversed:
                return node
        return None

    # Run the planner: compute self.bt from start state, goal and actions.
    def run_algorithm(self, start, goal, actions):
        """Plan a behavior tree that reaches ``goal`` from ``start``.

        Args:
            start: set of literals holding initially.
            goal: set of literals to achieve.
            actions: sequence of action objects exposing ``pre``, ``add`` and
                ``del_set`` sets.

        Returns:
            ``True`` once the tree can run in ``start``; ``False`` (after
            printing ``'Failure'``) when every condition has been expanded and
            the tree still cannot run.  The tree is left in ``self.bt``.
        """
        # The initial tree only contains the goal condition.
        self.bt = ControlBT(type='cond')
        g_node = Leaf(type='cond', content=goal)
        self.bt.add_child([g_node])
        self.conditions.append(goal)
        self.nodes.append(g_node)  # condition node list

        # Keep expanding until the tree can run in the start state.
        while not self._can_run(start):
            c_node = self._next_unexpanded()
            if c_node is None:
                # Everything is expanded and the tree still cannot run.
                print('Failure')
                return False

            c = c_node.content  # condition (a set of literals) being expanded

            # The subtree first keeps the expanded condition itself.  A fresh
            # leaf is built instead of copy.deepcopy(c_node): deep-copying a
            # node follows its parent/children links and therefore cloned the
            # whole tree (and every action) on each expansion.  Condition sets
            # are never mutated in place, so sharing ``c`` is safe.
            subtree = ControlBT(type='?')
            subtree.add_child([Leaf(type='cond', content=c)])

            for action in actions:
                # The action must contribute at least one literal of c ...
                if not c & ((action.pre | action.add) - action.del_set):
                    continue
                # ... and must not delete any literal of c.
                if c & action.del_set:
                    continue
                c_attr = (action.pre | c) - action.add
                # Pruning: skip if an already expanded condition is contained
                # in the new one.
                if any(done <= c_attr for done in self.traversed):
                    continue
                # Build the sequence "new condition, then action".
                sequence_structure = ControlBT(type='>')
                c_attr_node = Leaf(type='cond', content=c_attr)
                a_node = Leaf(type='act', content=action)
                sequence_structure.add_child([c_attr_node, a_node])
                subtree.add_child([sequence_structure])
                self.nodes.append(c_attr_node)

            # Replace the original condition leaf with the expanded subtree.
            # Condition leaves always sit at index 0 here, so parent_index is
            # equivalent to the former hard-coded 0.
            parent_of_c = c_node.parent
            p_index = c_node.parent_index
            parent_of_c.children[p_index] = subtree
            subtree.parent = parent_of_c
            subtree.parent_index = p_index

            # Record the expanded condition.
            self.traversed.append(c)
        return True

    def print_solution(self):
        """Print the number of condition leaves created during planning."""
        print(len(self.nodes))
# Baseline algorithm used for comparison; the expansion details differ.
class Weakalgorithm:
    """Baseline behavior-tree expansion planner.

    Each expanded condition gets, for every action adding one of its
    literals, a sequence made of one single-literal condition leaf per
    precondition followed by the action.

    Attributes:
        bt: root ``ControlBT`` of the planned tree (``None`` before planning).
        nodes: condition leaves still eligible for expansion.
        traversed: condition sets that have already been expanded.
        conditions: holds the goal condition after ``run_algorithm``.
        conditions_index: reset by ``clear``; not otherwise used here.
        danger: set to ``True`` when a chosen action deletes part of the
            condition it was chosen for.
    """

    def __init__(self):
        self.bt = None
        self.nodes = []
        self.traversed = []
        self.conditions = []
        self.conditions_index = []
        self.danger = False

    def clear(self):
        """Reset the planner so it can be reused for another problem."""
        self.bt = None
        self.nodes = []
        self.traversed = []
        self.conditions = []
        self.conditions_index = []
        self.danger = False

    def _can_run(self, start):
        """Return True when ticking the tree in ``start`` succeeds or runs."""
        val, _ = self.bt.tick(start)
        return val == 'success' or val == 'running'

    def _next_unexpanded(self):
        """Return the first condition leaf not yet expanded, or ``None``."""
        for node in self.nodes:
            if node.content not in self.traversed:
                return node
        return None

    def run_algorithm(self, start, goal, actions):
        """Plan a behavior tree that reaches ``goal`` from ``start``.

        Args:
            start: set of literals holding initially.
            goal: set of literals to achieve.
            actions: sequence of action objects exposing ``pre``, ``add`` and
                ``del_set`` sets.

        Returns:
            ``True`` once the tree can run in ``start``; ``False`` (after
            printing an error line) when every condition has been expanded and
            the tree still cannot run.  The tree is left in ``self.bt``.
        """
        self.bt = ControlBT(type='cond')
        g_node = Leaf(type='cond', content=goal)
        self.bt.add_child([g_node])
        self.conditions.append(goal)
        self.nodes.append(g_node)  # condition node list

        while not self._can_run(start):
            c_node = self._next_unexpanded()
            if c_node is None:
                print('Algorithm Failure, all conditions expanded')
                return False

            c = c_node.content

            # Fresh leaf instead of copy.deepcopy(c_node): the deep copy
            # followed parent/children links and cloned the whole tree on
            # every expansion.  Condition sets are never mutated in place.
            subtree = ControlBT(type='?')
            subtree.add_child([Leaf(type='cond', content=c)])

            for action in actions:
                if not c & action.add:
                    continue  # the action adds nothing the condition needs
                if c & action.del_set:
                    # The action also deletes part of the condition; it is
                    # still used, but the situation is flagged.
                    self.danger = True
                sequence_structure = ControlBT(type='>')
                for j in action.pre:
                    # ``traversed`` stores condition *sets*, so the literal
                    # must be wrapped; the former ``j in self.traversed``
                    # compared an int with sets and could never be true.
                    # Skipped leaves would never be expanded again, and the
                    # action leaf re-checks its own preconditions on tick.
                    if {j} in self.traversed:
                        continue
                    c_attr_node = Leaf(type='cond', content={j})
                    sequence_structure.add_child([c_attr_node])
                    # Literals already true in the start state need no expansion.
                    if j not in start:
                        self.nodes.append(c_attr_node)
                a_node = Leaf(type='act', content=action)
                sequence_structure.add_child([a_node])
                subtree.add_child([sequence_structure])

            # Replace the expanded condition leaf, wherever it sits in its parent.
            parent_of_c = c_node.parent
            p_index = c_node.parent_index
            parent_of_c.children[p_index] = subtree
            subtree.parent = parent_of_c
            subtree.parent_index = p_index

            self.traversed.append(c)
        return True

    def print_solution(self):
        """Print the number of condition leaves registered for expansion."""
        print(len(self.nodes))
if __name__ == '__main__':
    random.seed(1)
    # Hyper-parameters of the generated problem set: number of literals,
    # solution depth, and number of extra action-generating iterations.
    literals_num = 10
    depth = 10
    iters = 10
    # Previously hard-coded in both the loop bound and the final average.
    num_trials = 1000
    # Upper bound on execution steps when running a planned tree.
    max_steps = 500

    total_tree_size = []
    total_action_num = []
    total_state_num = []
    total_steps_num = []
    success_count = 0
    failure_count = 0
    planning_time_total = 0.0

    # Run the experiment num_trials times.
    for _ in range(num_trials):
        # Generate one planning problem: random states and actions plus a goal.
        states = []
        actions = []
        start = generate_random_state(literals_num)
        state = start
        states.append(state)
        # Random walk of `depth` actions from the start state.
        for _step in range(depth):
            a = Action()
            a.generate_from_state(state, literals_num)
            # NOTE: Action defines no __eq__, so this is an identity test and
            # a freshly created action is always appended.
            if a not in actions:
                actions.append(a)
            state = state_transition(state, a)
            if state not in states:
                states.append(state)
        # The goal is the most recently discovered new state.
        goal = states[-1]

        # Generate further actions from randomly chosen known states.
        state = start
        for _step in range(iters):
            a = Action()
            a.generate_from_state(state, literals_num)
            if a not in actions:
                actions.append(a)
            state = state_transition(state, a)
            if state not in states:
                states.append(state)
            state = random.sample(states, 1)[0]

        # Choose the algorithm under test: the proposed BTalgorithm or the
        # Weakalgorithm baseline.
        algo = BTalgorithm()
        #algo = Weakalgorithm()

        # perf_counter is a monotonic, high-resolution clock meant for
        # measuring intervals; time.time() can jump with wall-clock changes.
        start_time = time.perf_counter()
        if algo.run_algorithm(start, goal, list(actions)):  # planned tree is algo.bt
            total_tree_size.append(algo.bt.count_size() - 1)
        else:
            print ("error")
        end_time = time.perf_counter()
        planning_time_total += (end_time - start_time)

        # Execute the behavior tree from the start state as a test.
        state = start
        steps = 0
        val, obj = algo.bt.tick(state)  # obj is the action selected to run
        while val != 'success' and val != 'failure':  # until the tree succeeds or fails
            state = state_transition(state, obj)
            val, obj = algo.bt.tick(state)
            if val == 'failure':
                print("bt fails at step", steps)
            steps += 1
            if steps >= max_steps:  # run at most max_steps steps
                break
        if not goal <= state:
            # Wrong solution: the goal does not hold in the final state.
            failure_count += 1
        else:
            # Correct solution: the goal holds.
            success_count += 1
            total_steps_num.append(steps)
        algo.clear()
        total_action_num.append(len(actions))
        total_state_num.append(len(states))

    print (success_count, failure_count)  # number of successes and failures
    print(np.mean(total_tree_size), np.std(total_tree_size, ddof=1))  # tree size over all trials
    print (np.mean(total_steps_num), np.std(total_steps_num, ddof=1))
    print (np.mean(total_state_num))  # average number of states per problem
    print (np.mean(total_action_num))  # average number of actions per problem
    print(planning_time_total, planning_time_total / num_trials)
#print(total_state_num)
# Case study begin: corresponds to the case study in the paper, a mobile-manipulator scenario with three actions.
# actions=[]
# a = Action(name='movebtob')
# a.pre={1,2}
# a.add={3}
# a.del_set={1,4}
# actions.append(a)
# a=Action(name='moveatob')
# a.pre={1}
# a.add={5,2}
# a.del_set={1,6}
# actions.append(a)
# a=Action(name='moveatoa')
# a.pre={7}
# a.add={8,2}
# a.del_set={7,6}
# actions.append(a)
#
# start = {1,7,4,6}
# goal={3}
# algo = BTalgorithm()
# algo.clear()
# algo.run_algorithm(start, goal, list(actions))
# state = start
# steps = 0
# val, obj = algo.bt.tick(state)
# while val != 'success' and val != 'failure':
# state = state_transition(state, obj)
# print (obj.name)
# val, obj = algo.bt.tick(state)
# if (val == 'failure'):
# print("bt fails at step", steps)
# steps += 1
# if not goal <= state:
# print ("wrong solution",steps)
# else:
# print ("right solution",steps)
# #algo.bt.print_nodes()
# print (algo.bt.count_size()-1)
# algo.clear()
#case study end