完成了完全最优行为树扩展算法

初始化传入 actions,运行时传入 goal,返回完全最优行为树的 ptml 表示
This commit is contained in:
Caiyishuai 2023-11-06 17:31:43 +08:00
parent 838eda13cc
commit effa653e30
8 changed files with 947 additions and 0 deletions

View File

@ -0,0 +1,95 @@
#叶结点
class Leaf:
def __init__(self,type,content,mincost):
self.type=type
self.content=content #conditionset or action
self.parent=None
self.parent_index=0
self.mincost=mincost
# tick 叶节点返回返回值以及对应的条件或行动对象self.content
def tick(self,state):
if self.type=='cond':
if self.content <= state:
return 'success',self.content
else:
return 'failure',self.content
if self.type=='act':
if self.content.pre<=state:
return 'running',self.content #action
else:
return 'failure',self.content
def __str__(self):
print( self.content)
return ''
def print_nodes(self):
print(self.content)
def count_size(self):
return 1
#可能包含控制结点的行为树
class ControlBT:
def __init__(self,type):
self.type=type
self.children=[]
self.parent=None
self.parent_index=0
def add_child(self,subtree_list):
for subtree in subtree_list:
self.children.append(subtree)
subtree.parent=self
subtree.parent_index=len(self.children)-1
# tick行为树根据不同控制结点逻辑tick子结点
def tick(self,state):
if len(self.children) < 1:
print("error,no child")
if self.type =='?':#选择结点,即或结点
for child in self.children:
val,obj=child.tick(state)
if val=='success':
return val,obj
if val=='running':
return val,obj
return 'failure','?fails'
if self.type =='>':#顺序结点,即与结点
for child in self.children:
val,obj=child.tick(state)
if val=='failure':
return val,obj
if val=='running':
return val,obj
return 'success', '>success'
if self.type =='act':#行动结点
return self.children[0].tick(state)
if self.type =='cond':#条件结点
return self.children[0].tick(state)
def getFirstChild(self):
return self.children[0]
def __str__(self):
print(self.type+'\n')
for child in self.children:
print (child)
return ''
def print_nodes(self):
print(self.type)
for child in self.children:
child.print_nodes()
# 递归统计树中结点数
def count_size(self):
result=1
for child in self.children:
result+= child.count_size()
return result

View File

@ -0,0 +1,59 @@
selector{
cond At(Table,Coffee)
selector{
cond At(Robot,Table), Holding(Coffee)
act PutDown(Table,Coffee)
}
selector{
cond At(Robot,Coffee), NotHolding, At(Robot,Table)
act PickUp(Coffee)
}
selector{
cond Available(Table), Holding(Coffee)
act MoveTo(Table)
}
selector{
cond At(Robot,Coffee), At(Robot,Table), Holding(VacuumCup)
act PutDown(Table,VacuumCup)
}
selector{
cond At(Robot,CoffeeMachine), NotHolding, At(Robot,Table)
act OpCoffeeMachine
}
selector{
cond At(Robot,Coffee), Available(Table), NotHolding
act PickUp(Coffee)
}
selector{
cond At(Robot,CoffeeMachine), At(Robot,Table), Holding(VacuumCup)
act PutDown(Table,VacuumCup)
}
selector{
cond Available(Table), Available(Coffee), NotHolding
act MoveTo(Coffee)
}
selector{
cond Available(Table), At(Robot,CoffeeMachine), NotHolding
act OpCoffeeMachine
}
selector{
cond Available(Table), Available(Coffee), At(Robot,Table), Holding(VacuumCup)
act PutDown(Table,VacuumCup)
}
selector{
cond Available(Table), NotHolding, Available(CoffeeMachine)
act MoveTo(CoffeeMachine)
}
selector{
cond Available(Table), Available(Coffee), Holding(VacuumCup)
act MoveTo(Table)
}
selector{
cond Available(Table), Available(CoffeeMachine), At(Robot,Table), Holding(VacuumCup)
act PutDown(Table,VacuumCup)
}
selector{
cond Available(Table), Available(CoffeeMachine), Holding(VacuumCup)
act MoveTo(Table)
}
}

View File

@ -0,0 +1,210 @@
import copy
import random
from opt_bt_expansion.BehaviorTree import Leaf,ControlBT
class CondActPair:
def __init__(self, cond_leaf,act_leaf):
self.cond_leaf = cond_leaf
self.act_leaf = act_leaf
#定义行动类,行动包括前提、增加和删除影响
class Action:
def __init__(self,name='anonymous action',pre=set(),add=set(),del_set=set(),cost=1):
self.pre=copy.deepcopy(pre)
self.add=copy.deepcopy(add)
self.del_set=copy.deepcopy(del_set)
self.name=name
self.cost=cost
def __str__(self):
return self.name
#生成随机状态
def generate_random_state(num):
result = set()
for i in range(0,num):
if random.random()>0.5:
result.add(i)
return result
#从状态和行动生成后继状态
def state_transition(state,action):
if not action.pre <= state:
print ('error: action not applicable')
return state
new_state=(state | action.add) - action.del_set
return new_state
#本文所提出的完备规划算法
class OptBTExpAlgorithm:
def __init__(self,verbose=False):
self.bt = None
self.nodes=[]
self.traversed=[]
self.mounted=[]
self.conditions=[]
self.conditions_index=[]
self.verbose=verbose
def clear(self):
self.bt = None
self.nodes = []
self.traversed = []
self.conditions = []
self.conditions_index = []
#运行规划算法从初始状态、目标状态和可用行动计算行为树self.bt
def run_algorithm(self,goal,actions):
if self.verbose:
print("\n算法开始!")
self.bt = ControlBT(type='cond')
# 初始行为树只包含目标条件
gc_node = Leaf(type='cond', content=goal,mincost=0) # 为了统一,都成对出现
ga_node = Leaf(type='act', content=None, mincost=0)
subtree = ControlBT(type='?')
subtree.add_child([copy.deepcopy(gc_node)]) # 子树首先保留所扩展结
self.bt.add_child([subtree])
# self.conditions.append(goal)
cond_anc_pair = CondActPair(cond_leaf=gc_node,act_leaf=ga_node)
self.nodes.append(copy.deepcopy(cond_anc_pair)) # the set of explored but unexpanded conditions
self.traversed = [goal] # the set of expanded conditions
while len(self.nodes)!=0:
# Find the condition for the shortest cost path
pair_node = None
min_cost = float ('inf')
index= -1
for i,cond_anc_pair in enumerate(self.nodes):
if cond_anc_pair.cond_leaf.mincost < min_cost:
min_cost = cond_anc_pair.cond_leaf.mincost
pair_node = copy.deepcopy(cond_anc_pair)
index = i
break
if self.verbose:
print("选择扩展条件结点:",pair_node.cond_leaf.content)
# Update self.nodes and self.traversed
self.nodes.pop(index) # the set of explored but unexpanded conditions. self.nodes.remove(pair_node)
c = pair_node.cond_leaf.content # 子树所扩展结点对应的条件一个文字的set
# Mount the action node and extend BT. T = Eapand(T,c,A(c))
if c!=goal:
sequence_structure = ControlBT(type='>')
sequence_structure.add_child(
[copy.deepcopy(pair_node.cond_leaf), copy.deepcopy(pair_node.act_leaf)])
subtree.add_child([copy.deepcopy(sequence_structure)]) # subtree 是回不断变化的它的父亲是self.bt
if self.verbose:
print("完成扩展 a_node= %s,对应的新条件 c_attr= %s,mincost=%d" \
% (cond_anc_pair.act_leaf.content.name, cond_anc_pair.cond_leaf.content,
cond_anc_pair.cond_leaf.mincost))
if self.verbose:
print("遍历所有动作, 寻找符合条件的动作")
# 遍历所有动作, 寻找符合条件的动作
current_mincost = pair_node.cond_leaf.mincost # 当前的最短路径是多少
for i in range(0, len(actions)):
if not c & ((actions[i].pre | actions[i].add) - actions[i].del_set) <= set():
if (c - actions[i].del_set) == c:
if self.verbose:
print("———— 满足条件可以扩展")
c_attr = (actions[i].pre | c) - actions[i].add
# 剪枝操作,现在的条件是以前扩展过的条件的超集
valid = True
for j in self.traversed: # 剪枝操作
if j <= c_attr:
valid = False
if self.verbose:
print("———— --被剪枝")
break
if valid:
# 把符合条件的动作节点都放到列表里
if self.verbose:
print("———— -- %s 符合条件放入列表" % actions[i].name)
c_attr_node = Leaf(type='cond', content=c_attr, mincost=current_mincost + actions[i].cost)
a_attr_node = Leaf(type='act', content=actions[i], mincost=current_mincost + actions[i].cost)
cond_anc_pair = CondActPair(cond_leaf=c_attr_node, act_leaf=a_attr_node)
self.nodes.append(copy.deepcopy(cond_anc_pair)) # condition node list
self.traversed.append(c_attr) # 重点 the set of expanded conditions
if self.verbose:
print("算法结束!\n")
return True
def print_solution(self):
print("========= BT ==========") # 树的bfs遍历
nodes_ls = []
nodes_ls.append(self.bt)
while len(nodes_ls) != 0:
parnode = nodes_ls[0]
print("Parrent:", parnode.type)
for child in parnode.children:
if isinstance(child, Leaf):
print("---- Leaf:", child.content)
elif isinstance(child, ControlBT):
print("---- ControlBT:", child.type)
nodes_ls.append(child)
print()
nodes_ls.pop(0)
print("========= BT ==========\n")
# 返回所有能到达目标状态的初始状态
def get_all_state_leafs(self):
state_leafs=[]
nodes_ls = []
nodes_ls.append(self.bt)
while len(nodes_ls) != 0:
parnode = nodes_ls[0]
for child in parnode.children:
if isinstance(child, Leaf):
if child.type == "cond":
state_leafs.append(child.content)
elif isinstance(child, ControlBT):
nodes_ls.append(child)
nodes_ls.pop(0)
return state_leafs
# 树的dfs
def dfs_ptml(self,parnode):
for child in parnode.children:
if isinstance(child, Leaf):
if child.type == 'cond':
self.ptml_string += "cond "
c_set_str = ', '.join(map(str, child.content)) + "\n"
self.ptml_string += c_set_str
elif child.type == 'act':
self.ptml_string += 'act '+child.content.name+"\n"
elif isinstance(child, ControlBT):
if parnode.type == '?':
self.ptml_string += "selector{\n"
self.dfs_ptml(parnode=child)
elif parnode.type == '>':
self.ptml_string += "sequence{\n"
self.dfs_ptml( parnode=child)
self.ptml_string += '}\n'
def get_ptml(self):
self.ptml_string = "selector{\n"
self.dfs_ptml(self.bt.children[0])
self.ptml_string += '}\n'
return self.ptml_string
def save_ptml_file(self,file_name):
self.ptml_string = "selector{\n"
self.dfs_ptml(self.bt.children[0])
self.ptml_string += '}\n'
with open(f'./{file_name}.ptml', 'w') as file:
file.write(self.ptml_string)
return self.ptml_string

Binary file not shown.

After

Width:  |  Height:  |  Size: 54 KiB

121
opt_bt_expansion/README.md Normal file
View File

@ -0,0 +1,121 @@
## 代码说明
### 1. `BehaviorTree.py` 实现行为树叶子结点和非叶子结点的定义
- **Leaf **:表示叶节点,可以是动作(`act`)或条件(`cond`)。
- **ControlBT**:代表可能包含控制节点的行为树。它们可以是选择器(`?`)、序列(`>`)、动作节点(`act`)或条件节点(`cond`)。
- 上述两个类都包含 `tick` 方法。
### 2. `OptimalBTExpansionAlgorithm.py` 实现最优行为树扩展算法
![image-20231103191141047](README.assets/image-20231103191141047.png)
定义行动类
```python
#定义行动类,行动包括前提、增加和删除影响
class Action:
def __init__(self,name='anonymous action',pre=set(),add=set(),del_set=set(),cost=1):
self.pre=copy.deepcopy(pre)
self.add=copy.deepcopy(add)
self.del_set=copy.deepcopy(del_set)
self.name=name
self.cost=cost
def __str__(self):
return self.name
```
调用算法
```python
algo = OptBTExpAlgorithm(verbose=True)
algo.clear()
algo.run_algorithm(start, goal, actions) # 使用算法得到行为树在 algo.bt
algo.print_solution() # 打印行为树
val, obj = algo.bt.tick(state) # 执行行为树
algo.save_ptml_file("bt.ptml") # 保存行为树为 ptml 文件
```
### 3. **`tools.py`** 实现打印数据、行为树测试等模块
使用方法
```python
print_action_data_table(goal,start,actions) # 打印所有变量
# 行为树鲁棒性测试,随机生成规划问题
# 设置生成规划问题集的超参数:文字数、解深度、迭代次数
seed=1
literals_num=10
depth = 10
iters= 10
BTTest(seed=seed,literals_num=literals_num,depth=depth,iters=iters)
```
### 4. `example.py` 中设计规划案例 goals, start,actions
```python
def MoveBtoB ():
actions=[]
a = Action(name="Move(b,ab)")
a.pre={'Free(ab)','WayClear'}
a.add={'At(b,ab)'}
a.del_set= {'Free(ab)','At(b,pb)'}
a.cost = 1
actions.append(a)
a=Action(name="Move(s,ab)")
a.pre={'Free(ab)'}
a.add={'Free(ab)','WayClear'}
a.del_set={'Free(ab)','At(s,ps)'}
a.cost = 1
actions.append(a)
a=Action(name="Move(s,as)")
a.pre={'Free(as)'}
a.add={'At(s,ps)','WayClear'}
a.del_set={'Free(as)','At(s,ps)'}
a.cost = 1
actions.append(a)
start = {'Free(ab)','Free(as)','At(b,pb)','At(s,ps)'}
goal= {'At(b,ab)'}
return goal,start,actions
```
### 5. `opt_bt_exp_main.py` 为主函数,在此演示如何调用最优行为树扩展算法得到完全扩展最优行为树
```python
actions=[
Action(name='PutDown(Table,Coffee)', pre={'Holding(Coffee)','At(Robot,Table)'}, add={'At(Table,Coffee)','NotHolding'}, del_set={'Holding(Coffee)'}, cost=1)
…………
]
algo = BTOptExpInterface(actions)
goal = {'At(Table,Coffee)'}
ptml_string = algo.process(goal,start)
print(ptml_string)
```
两种检测方法,用于检测当前状态 `start` 能否到达目标状态 `goal`
```python
# 判断初始状态能否到达目标状态
start = {'At(Robot,Bar)', 'Holding(VacuumCup)', 'Available(Table)', 'Available(CoffeeMachine)','Available(FrontDesk)'}
# 方法一:算法返回所有可能的初始状态,在里面看看有没有对应的初始状态
right_bt = algo.find_all_leaf_states_contain_start(start)
if not right_bt:
print("ERROR1: The current state cannot reach the goal state!")
else:
print("Right1: The current state can reach the goal state!")
# 方法二:预先跑一边行为树,看能否到达目标状态
right_bt2 = algo.run_bt_from_start(goal,start)
if not right_bt2:
print("ERROR2: The current state cannot reach the goal state!")
else:
print("Right2: The current state can reach the goal state!")
```

View File

@ -0,0 +1,174 @@
from opt_bt_expansion.OptimalBTExpansionAlgorithm import Action
def MakeCoffee():
actions=[
Action(name='Put(Table,Coffee)', pre={'Holding(Coffee)','At(Table)'}, add={'At(Table,Coffee)','NotHolding'}, del_set={'Holding(Coffee)'}, cost=1),
Action(name='Put(Table,VacuumCup)', pre={'Holding(VacuumCup)','At(Table)'}, add={'At(Table,VacuumCup)','NotHolding'}, del_set={'Holding(VacuumCup)'}, cost=1),
Action(name='Grasp(Coffee)', pre={'NotHolding','At(Coffee)'}, add={'Holding(Coffee)'}, del_set={'NotHolding'}, cost=1),
Action(name='MoveTo(Table)', pre={'Exist(Table)'}, add={'At(Table)'}, del_set={'At(FrontDesk)','At(Coffee)','At(CoffeeMachine)'}, cost=1),
Action(name='MoveTo(Coffee)', pre={'Exist(Coffee)'}, add={'At(Coffee)'}, del_set={'At(FrontDesk)','At(Table)','At(CoffeeMachine)'}, cost=1),
Action(name='MoveTo(CoffeeMachine)', pre={'Exist(CoffeeMachine)'}, add={'At(CoffeeMachine)'}, del_set={'At(FrontDesk)','At(Coffee)','At(Table)'}, cost=1),
Action(name='OpCoffeeMachine', pre={'At(CoffeeMachine)','NotHolding'}, add={'Exist(Coffee)','At(Coffee)'}, del_set=set(), cost=1),
]
start = {'At(FrontDesk)','Holding(VacuumCup)','Exist(Table)','Exist(CoffeeMachine)','Exist(FrontDesk)'}
goal = {'At(Table,Coffee)'}
return goal,start,actions
# 本例子中,将 VacuumCup 放到 FrontDesk比 MoveTo(Table) 再 Put(Table,VacuumCup) 的 cost 要小
def MakeCoffeeCost():
actions=[
Action(name='PutDown(Table,Coffee)', pre={'Holding(Coffee)','At(Robot,Table)'}, add={'At(Table,Coffee)','NotHolding'}, del_set={'Holding(Coffee)'}, cost=1),
Action(name='PutDown(Table,VacuumCup)', pre={'Holding(VacuumCup)','At(Robot,Table)'}, add={'At(Table,VacuumCup)','NotHolding'}, del_set={'Holding(VacuumCup)'}, cost=1),
Action(name='PickUp(Coffee)', pre={'NotHolding','At(Robot,Coffee)'}, add={'Holding(Coffee)'}, del_set={'NotHolding'}, cost=1),
Action(name='MoveTo(Table)', pre={'Available(Table)'}, add={'At(Robot,Table)'}, del_set={'At(Robot,FrontDesk)','At(Robot,Coffee)','At(Robot,CoffeeMachine)'}, cost=1),
Action(name='MoveTo(Coffee)', pre={'Available(Coffee)'}, add={'At(Robot,Coffee)'}, del_set={'At(Robot,FrontDesk)','At(Robot,Table)','At(Robot,CoffeeMachine)'}, cost=1),
Action(name='MoveTo(CoffeeMachine)', pre={'Available(CoffeeMachine)'}, add={'At(Robot,CoffeeMachine)'}, del_set={'At(Robot,FrontDesk)','At(Robot,Coffee)','At(Robot,Table)'}, cost=1),
Action(name='OpCoffeeMachine', pre={'At(Robot,CoffeeMachine)','NotHolding'}, add={'Available(Coffee)','At(Robot,Coffee)'}, del_set=set(), cost=1),
]
start = {'At(Robot,Bar)','Holding(VacuumCup)','Available(Table)','Available(CoffeeMachine)','Available(FrontDesk)'}
goal = {'At(Table,Coffee)'}
return goal,start,actions
# test
def Test():
actions=[
Action(name='a1', pre={6}, add={0,2,4}, del_set={1,5}, cost=1),
Action(name='a2', pre=set(), add={0,1}, del_set=set(), cost=1),
Action(name='a3', pre={1,6}, add={0,2,3,5}, del_set={1,6}, cost=1),
Action(name='a4', pre={0,2,3}, add={4,5}, del_set={0,6}, cost=1),
Action(name='a5', pre={0,1,4}, add={2,3,6}, del_set={0}, cost=1),
]
start = {1,2,6}
goal={0,1,2,4,6}
return goal,start,actions
# def Test():
# actions=[
# Action(name='a1', pre={2}, add={1}, del_set=set(), cost=1),
# Action(name='a2', pre=set(), add={1}, del_set={0,2}, cost=1),
# Action(name='a3', pre={1}, add=set(), del_set={0,2}, cost=1),
# Action(name='a4', pre=set(), add={0}, del_set=set(), cost=1),
# Action(name='a5', pre={1}, add={0,2}, del_set={1}, cost=1),
# Action(name='a6', pre={1}, add=set(), del_set={0,1,2}, cost=1),
# Action(name='a7', pre={1}, add={2}, del_set={0, 2}, cost=1),
# ]
#
# start = {1,2}
# goal={0,1}
# return goal,start,actions
# todo: 最原始的例子
def MoveBtoB_num ():
actions=[]
a = Action(name='a1')
a.pre={1,4}
a.add={"c_goal"}
a.del_set={1,4}
a.cost = 1
actions.append(a)
a=Action(name='a2')
a.pre={1,2,3}
a.add={"c_goal"}
a.del_set={1,2,3}
a.cost = 1
actions.append(a)
a=Action(name='a3')
a.pre={1,2}
a.add={4}
a.del_set={2}
a.cost = 1
actions.append(a)
a=Action(name='a4')
a.pre={"c_start"}
a.add={1,2,3}
a.del_set={"c_start",4}
a.cost = 1
actions.append(a)
start = {"c_start"}
goal={"c_goal"}
return goal,start,actions
# todo: 最原始的例子
def MoveBtoB ():
actions=[]
a = Action(name="Move(b,ab)") #'movebtob'
a.pre={'Free(ab)','WayClear'} #{1,2}
a.add={'At(b,ab)'} #{3}
a.del_set= {'Free(ab)','At(b,pb)'} #{1,4}
a.cost = 1
actions.append(a)
a=Action(name="Move(s,ab)") #'moveatob'
a.pre={'Free(ab)'} #{1}
a.add={'Free(ab)','WayClear'} #{5,2}
a.del_set={'Free(ab)','At(s,ps)'} #{1,6}
a.cost = 1
actions.append(a)
a=Action(name="Move(s,as)") #'moveatoa'
a.pre={'Free(as)'} #{7}
a.add={'At(s,ps)','WayClear'} #{8,2}
a.del_set={'Free(as)','At(s,ps)'} #{7,6}
a.cost = 1
actions.append(a)
start = {'Free(ab)','Free(as)','At(b,pb)','At(s,ps)'} #{1,7,4,6}
goal= {'At(b,ab)'} #{3}
return goal,start,actions
# 小蔡师兄论文里的例子
def Cond2BelongsToCond3():
actions=[]
a = Action(name='a1')
a.pre={1,4}
a.add={"c_goal"}
a.del_set={1,4}
a.cost = 1
actions.append(a)
a=Action(name='a2')
a.pre={1,2,3}
a.add={"c_goal"}
a.del_set={1,2,3}
a.cost = 100
actions.append(a)
a=Action(name='a3')
a.pre={1,2}
a.add={4}
a.del_set={2}
a.cost = 1
actions.append(a)
a=Action(name='a4')
a.pre={"c_start"}
a.add={1,2,3}
a.del_set={"c_start",4}
a.cost = 1
actions.append(a)
start = {"c_start"}
goal={"c_goal"}
return goal,start,actions

View File

@ -0,0 +1,121 @@
from opt_bt_expansion.BehaviorTree import Leaf,ControlBT # 行为结点类:叶子结点和非叶子节点
from opt_bt_expansion.OptimalBTExpansionAlgorithm import Action,OptBTExpAlgorithm,state_transition # 调用最优行为树扩展算法
from opt_bt_expansion.tools import print_action_data_table,BTTest
from opt_bt_expansion.examples import MoveBtoB_num,MoveBtoB,Cond2BelongsToCond3 # 导入三个例子
from opt_bt_expansion.examples import *
# 封装好的主接口
class BTOptExpInterface:
def __init__(self, action_list):
"""
Initialize the BTOptExpansion with a list of actions.
:param action_list: A list of actions to be used in the behavior tree.
"""
# self.actions = []
# for act in action_list:
# a = Action(name=act.name)
# a.pre=act['pre']
# a.add=act['add']
# a.del_set= act['del_set']
# a.cost = 1
# self.actions.append(a)
self.actions = action_list
self.has_processed = False
def process(self, goal):
"""
Process the input sets and return a string result.
:param input_set: The set of goal states and the set of initial states.
:return: A PTML string representing the outcome of the behavior tree.
"""
self.goal = goal
self.algo = OptBTExpAlgorithm(verbose=False)
self.algo.clear()
self.algo.run_algorithm(self.goal, self.actions) # 调用算法得到行为树保存至 algo.bt
self.ptml_string = self.algo.get_ptml()
self.has_processed = True
# algo.print_solution() # print behavior tree
return self.ptml_string
# 方法一:查找所有初始状态是否包含当前状态
def find_all_leaf_states_contain_start(self,start):
if not self.has_processed:
raise RuntimeError("The process method must be called before find_all_leaf_states_contain_start!")
# 返回所有能到达目标状态的初始状态
state_leafs = self.algo.get_all_state_leafs()
for state in state_leafs:
if start >= state:
return True
return False
# 方法二:模拟跑一遍行为树,看 start 能够通过执行一系列动作到达 goal
def run_bt_from_start(self,goal,start):
if not self.has_processed:
raise RuntimeError("The process method must be called before run_bt_from_start!")
# 检查是否能到达目标
right_bt = True
state = start
steps = 0
val, obj = self.algo.bt.tick(state)
while val != 'success' and val != 'failure':
state = state_transition(state, obj)
val, obj = self.algo.bt.tick(state)
if (val == 'failure'):
# print("bt fails at step", steps)
right_bt = False
steps += 1
if not goal <= state:
# print("wrong solution", steps)
right_bt = False
else:
pass
# print("right solution", steps)
return right_bt
if __name__ == '__main__' :
# todo: Example Cafe
# todo: Define goal, start, actions
actions=[
Action(name='PutDown(Table,Coffee)', pre={'Holding(Coffee)','At(Robot,Table)'}, add={'At(Table,Coffee)','NotHolding'}, del_set={'Holding(Coffee)'}, cost=1),
Action(name='PutDown(Table,VacuumCup)', pre={'Holding(VacuumCup)','At(Robot,Table)'}, add={'At(Table,VacuumCup)','NotHolding'}, del_set={'Holding(VacuumCup)'}, cost=1),
Action(name='PickUp(Coffee)', pre={'NotHolding','At(Robot,Coffee)'}, add={'Holding(Coffee)'}, del_set={'NotHolding'}, cost=1),
Action(name='MoveTo(Table)', pre={'Available(Table)'}, add={'At(Robot,Table)'}, del_set={'At(Robot,FrontDesk)','At(Robot,Coffee)','At(Robot,CoffeeMachine)'}, cost=1),
Action(name='MoveTo(Coffee)', pre={'Available(Coffee)'}, add={'At(Robot,Coffee)'}, del_set={'At(Robot,FrontDesk)','At(Robot,Table)','At(Robot,CoffeeMachine)'}, cost=1),
Action(name='MoveTo(CoffeeMachine)', pre={'Available(CoffeeMachine)'}, add={'At(Robot,CoffeeMachine)'}, del_set={'At(Robot,FrontDesk)','At(Robot,Coffee)','At(Robot,Table)'}, cost=1),
Action(name='OpCoffeeMachine', pre={'At(Robot,CoffeeMachine)','NotHolding'}, add={'Available(Coffee)','At(Robot,Coffee)'}, del_set=set(), cost=1),
]
algo = BTOptExpInterface(actions)
goal = {'At(Table,Coffee)'}
ptml_string = algo.process(goal)
print(ptml_string)
file_name = "MakeCoffee"
with open(f'./{file_name}.ptml', 'w') as file:
file.write(ptml_string)
# 判断初始状态能否到达目标状态
start = {'At(Robot,Bar)', 'Holding(VacuumCup)', 'Available(Table)', 'Available(CoffeeMachine)','Available(FrontDesk)'}
# 方法一:算法返回所有可能的初始状态,在里面看看有没有对应的初始状态
right_bt = algo.find_all_leaf_states_contain_start(start)
if not right_bt:
print("ERROR1: The current state cannot reach the goal state!")
else:
print("Right1: The current state can reach the goal state!")
# 方法二:预先跑一边行为树,看能否到达目标状态
right_bt2 = algo.run_bt_from_start(goal,start)
if not right_bt2:
print("ERROR2: The current state cannot reach the goal state!")
else:
print("Right2: The current state can reach the goal state!")

167
opt_bt_expansion/tools.py Normal file
View File

@ -0,0 +1,167 @@
from tabulate import tabulate
import numpy as np
import random
from opt_bt_expansion.OptimalBTExpansionAlgorithm import Action,OptBTExpAlgorithm
import time
def print_action_data_table(goal,start,actions):
data = []
for a in actions:
data.append([a.name ,a.pre ,a.add ,a.del_set ,a.cost])
data.append(["Goal" ,goal ," " ,"Start" ,start])
print(tabulate(data, headers=["Name", "Pre", "Add" ,"Del" ,"Cost"], tablefmt="fancy_grid")) # grid plain simple github fancy_grid
# 从状态随机生成一个行动
def generate_from_state(act,state,num):
for i in range(0,num):
if i in state:
if random.random() >0.5:
act.pre.add(i)
if random.random() >0.5:
act.del_set.add(i)
continue
if random.random() > 0.5:
act.add.add(i)
continue
if random.random() >0.5:
act.del_set.add(i)
def print_action(act):
print (act.pre)
print(act.add)
print(act.del_set)
#行为树测试代码
def BTTest(seed=1,literals_num=10,depth=10,iters=10,total_count=1000):
print("============= BT Test ==============")
random.seed(seed)
# 设置生成规划问题集的超参数:文字数、解深度、迭代次数
literals_num=literals_num
depth = depth
iters= iters
total_tree_size = []
total_action_num = []
total_state_num = []
total_steps_num=[]
#fail_count=0
#danger_count=0
success_count =0
failure_count = 0
planning_time_total = 0.0
# 实验1000次
for count in range (total_count):
action_num = 1
# 生成一个规划问题,包括随机的状态和行动,以及目标状态
states = []
actions = []
start = generate_random_state(literals_num)
state = start
states.append(state)
#print (state)
for i in range (0,depth):
a = Action()
generate_from_state(a,state,literals_num)
if not a in actions:
a.name = "a"+str(action_num)
action_num+=1
actions.append(a)
state = state_transition(state,a)
if state in states:
pass
else:
states.append(state)
#print(state)
goal = states[-1]
state = start
for i in range (0,iters):
a = Action()
generate_from_state(a,state,literals_num)
if not a in actions:
a.name = "a"+str(action_num)
action_num+=1
actions.append(a)
state = state_transition(state,a)
if state in states:
pass
else:
states.append(state)
state = random.sample(states,1)[0]
# 选择测试本文算法btalgorithm或对比算法weakalgorithm
algo = OptBTExpAlgorithm()
#algo = Weakalgorithm()
start_time = time.time()
# print_action_data_table(goal, start, list(actions))
if algo.run_algorithm(start, goal, actions):#运行算法规划后行为树为algo.bt
total_tree_size.append( algo.bt.count_size()-1)
# algo.print_solution() # 打印行为树
else:
print ("error")
end_time = time.time()
planning_time_total += (end_time-start_time)
#开始从初始状态运行行为树,测试
state=start
steps=0
val, obj = algo.bt.tick(state)#tick行为树obj为所运行的行动
while val !='success' and val !='failure':#运行直到行为树成功或失败
state = state_transition(state,obj)
val, obj = algo.bt.tick(state)
if(val == 'failure'):
print("bt fails at step",steps)
steps+=1
if(steps>=500):#至多运行500步
break
if not goal <= state:#错误解,目标条件不在执行后状态满足
#print ("wrong solution",steps)
failure_count+=1
else:#正确解,满足目标条件
#print ("right solution",steps)
success_count+=1
total_steps_num.append(steps)
algo.clear()
total_action_num.append(len(actions))
total_state_num.append(len(states))
print ("success:",success_count,"failure:",failure_count)#算法成功和失败次数
print("Total Tree Size: mean=",np.mean(total_tree_size), "std=",np.std(total_tree_size, ddof=1))#1000次测试树大小
print ("Total Steps Num: mean=",np.mean(total_steps_num),"std=",np.std(total_steps_num,ddof=1))
print ("Average number of states:",np.mean(total_state_num))#1000次问题的平均状态数
print ("Average number of actions",np.mean(total_action_num))#1000次问题的平均行动数
print("Planning Time Total:",planning_time_total,planning_time_total/1000.0)
print("============ End BT Test ===========")
# xiao cai
# success: 1000 failure: 0
# Total Tree Size: mean= 35.303 std= 29.71336526001515
# Total Steps Num: mean= 1.898 std= 0.970844240101644
# Average number of states: 20.678
# Average number of actions 20.0
# Planning Time Total: 0.6280641555786133 0.0006280641555786133
# our start
# success: 1000 failure: 0
# Total Tree Size: mean= 17.945 std= 12.841997192488865
# Total Steps Num: mean= 1.785 std= 0.8120556843187752
# Average number of states: 20.678
# Average number of actions 20.0
# Planning Time Total: 1.4748523235321045 0.0014748523235321046
# our
# success: 1000 failure: 0
# Total Tree Size: mean= 48.764 std= 20.503626574406358
# Total Steps Num: mean= 1.785 std= 0.8120556843187752
# Average number of states: 20.678
# Average number of actions 20.0
# Planning Time Total: 3.3271877765655518 0.0033271877765655516