786 lines
24 KiB
Python
786 lines
24 KiB
Python
|
|
"""
|
|||
|
|
Agent 协作工具
|
|||
|
|
|
|||
|
|
提供动态Agent创建、通信和管理功能
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import logging
|
|||
|
|
from typing import Optional, List, Dict, Any
|
|||
|
|
from pydantic import BaseModel, Field
|
|||
|
|
|
|||
|
|
from .base import AgentTool, ToolResult
|
|||
|
|
from ..core.registry import agent_registry
|
|||
|
|
from ..core.message import message_bus, MessageType, MessagePriority
|
|||
|
|
|
|||
|
|
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class CreateAgentInput(BaseModel):
|
|||
|
|
"""创建Agent输入参数"""
|
|||
|
|
name: str = Field(..., description="Agent名称")
|
|||
|
|
task: str = Field(..., description="任务描述")
|
|||
|
|
agent_type: str = Field(
|
|||
|
|
default="specialist",
|
|||
|
|
description="Agent类型: analysis(分析), verification(验证), specialist(专家)"
|
|||
|
|
)
|
|||
|
|
knowledge_modules: Optional[str] = Field(
|
|||
|
|
default=None,
|
|||
|
|
description="知识模块,逗号分隔,最多5个。如: sql_injection,xss,authentication"
|
|||
|
|
)
|
|||
|
|
inherit_context: bool = Field(
|
|||
|
|
default=True,
|
|||
|
|
description="是否继承父Agent的上下文"
|
|||
|
|
)
|
|||
|
|
execute_immediately: bool = Field(
|
|||
|
|
default=False,
|
|||
|
|
description="是否立即执行子Agent(否则只创建不执行)"
|
|||
|
|
)
|
|||
|
|
context: Optional[Dict[str, Any]] = Field(
|
|||
|
|
default=None,
|
|||
|
|
description="传递给子Agent的上下文数据"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class CreateSubAgentTool(AgentTool):
|
|||
|
|
"""
|
|||
|
|
创建子Agent工具
|
|||
|
|
|
|||
|
|
允许Agent动态创建专业化的子Agent来处理特定任务。
|
|||
|
|
子Agent可以加载特定的知识模块,专注于特定领域。
|
|||
|
|
|
|||
|
|
支持两种模式:
|
|||
|
|
1. 仅创建:创建Agent但不执行,后续可以批量执行
|
|||
|
|
2. 立即执行:创建并立即执行Agent,等待结果返回
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(
|
|||
|
|
self,
|
|||
|
|
parent_agent_id: str,
|
|||
|
|
llm_service=None,
|
|||
|
|
tools: Dict[str, Any] = None,
|
|||
|
|
event_emitter=None,
|
|||
|
|
):
|
|||
|
|
super().__init__()
|
|||
|
|
self.parent_agent_id = parent_agent_id
|
|||
|
|
self.llm_service = llm_service
|
|||
|
|
self.tools = tools or {}
|
|||
|
|
self.event_emitter = event_emitter
|
|||
|
|
|
|||
|
|
# 子Agent执行器(延迟初始化)
|
|||
|
|
self._sub_executor = None
|
|||
|
|
|
|||
|
|
def _get_executor(self):
|
|||
|
|
"""获取子Agent执行器"""
|
|||
|
|
if self._sub_executor is None and self.llm_service:
|
|||
|
|
from ..core.executor import SubAgentExecutor
|
|||
|
|
# 需要获取父Agent实例
|
|||
|
|
parent_agent = agent_registry.get_agent(self.parent_agent_id)
|
|||
|
|
if parent_agent:
|
|||
|
|
self._sub_executor = SubAgentExecutor(
|
|||
|
|
parent_agent=parent_agent,
|
|||
|
|
llm_service=self.llm_service,
|
|||
|
|
tools=self.tools,
|
|||
|
|
event_emitter=self.event_emitter,
|
|||
|
|
)
|
|||
|
|
return self._sub_executor
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def name(self) -> str:
|
|||
|
|
return "create_sub_agent"
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def description(self) -> str:
|
|||
|
|
return """创建专业化的子Agent来处理特定任务。
|
|||
|
|
|
|||
|
|
使用场景:
|
|||
|
|
1. 发现需要深入分析的特定漏洞类型
|
|||
|
|
2. 需要专业知识来验证某个发现
|
|||
|
|
3. 任务过于复杂需要分解
|
|||
|
|
|
|||
|
|
参数:
|
|||
|
|
- name: Agent名称(如 "SQL注入专家")
|
|||
|
|
- task: 具体任务描述
|
|||
|
|
- agent_type: Agent类型 (analysis/verification/specialist)
|
|||
|
|
- knowledge_modules: 知识模块,逗号分隔(如 "sql_injection,database_security")
|
|||
|
|
- inherit_context: 是否继承当前上下文
|
|||
|
|
- execute_immediately: 是否立即执行(默认false,仅创建)
|
|||
|
|
- context: 传递给子Agent的上下文数据
|
|||
|
|
|
|||
|
|
注意:每个Agent最多加载5个知识模块。"""
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def args_schema(self):
|
|||
|
|
return CreateAgentInput
|
|||
|
|
|
|||
|
|
async def _execute(
|
|||
|
|
self,
|
|||
|
|
name: str,
|
|||
|
|
task: str,
|
|||
|
|
agent_type: str = "specialist",
|
|||
|
|
knowledge_modules: Optional[str] = None,
|
|||
|
|
inherit_context: bool = True,
|
|||
|
|
execute_immediately: bool = False,
|
|||
|
|
context: Optional[Dict[str, Any]] = None,
|
|||
|
|
**kwargs
|
|||
|
|
) -> ToolResult:
|
|||
|
|
"""创建子Agent"""
|
|||
|
|
|
|||
|
|
if not name or not name.strip():
|
|||
|
|
return ToolResult(success=False, error="Agent名称不能为空")
|
|||
|
|
|
|||
|
|
if not task or not task.strip():
|
|||
|
|
return ToolResult(success=False, error="任务描述不能为空")
|
|||
|
|
|
|||
|
|
# 解析知识模块
|
|||
|
|
modules = []
|
|||
|
|
if knowledge_modules:
|
|||
|
|
modules = [m.strip() for m in knowledge_modules.split(",") if m.strip()]
|
|||
|
|
if len(modules) > 5:
|
|||
|
|
return ToolResult(
|
|||
|
|
success=False,
|
|||
|
|
error="知识模块数量不能超过5个"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 验证知识模块(如果有)
|
|||
|
|
if modules:
|
|||
|
|
from ..knowledge import knowledge_loader
|
|||
|
|
validation = knowledge_loader.validate_modules(modules)
|
|||
|
|
if validation["invalid"]:
|
|||
|
|
available = knowledge_loader.get_all_module_names()
|
|||
|
|
return ToolResult(
|
|||
|
|
success=False,
|
|||
|
|
error=f"无效的知识模块: {validation['invalid']}。可用模块: {', '.join(available)}"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 生成Agent ID
|
|||
|
|
from ..core.state import _generate_agent_id
|
|||
|
|
agent_id = _generate_agent_id()
|
|||
|
|
|
|||
|
|
# 注册到注册表
|
|||
|
|
node = agent_registry.register_agent(
|
|||
|
|
agent_id=agent_id,
|
|||
|
|
agent_name=name.strip(),
|
|||
|
|
agent_type=agent_type,
|
|||
|
|
task=task.strip(),
|
|||
|
|
parent_id=self.parent_agent_id,
|
|||
|
|
knowledge_modules=modules,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 创建消息队列
|
|||
|
|
message_bus.create_queue(agent_id)
|
|||
|
|
|
|||
|
|
logger.info(f"Created sub-agent: {name} ({agent_id}), parent: {self.parent_agent_id}")
|
|||
|
|
|
|||
|
|
# 如果需要立即执行
|
|||
|
|
if execute_immediately:
|
|||
|
|
executor = self._get_executor()
|
|||
|
|
if executor:
|
|||
|
|
# 准备上下文
|
|||
|
|
exec_context = context or {}
|
|||
|
|
exec_context["knowledge_modules"] = modules
|
|||
|
|
|
|||
|
|
# 执行子Agent
|
|||
|
|
exec_result = await executor.create_and_run_sub_agent(
|
|||
|
|
agent_type=agent_type if agent_type in ["analysis", "verification"] else "analysis",
|
|||
|
|
task=task.strip(),
|
|||
|
|
context=exec_context,
|
|||
|
|
knowledge_modules=modules,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 更新注册表状态
|
|||
|
|
if exec_result.get("success"):
|
|||
|
|
agent_registry.update_agent_status(agent_id, "completed", exec_result)
|
|||
|
|
else:
|
|||
|
|
agent_registry.update_agent_status(agent_id, "failed", {"error": exec_result.get("error")})
|
|||
|
|
|
|||
|
|
return ToolResult(
|
|||
|
|
success=exec_result.get("success", False),
|
|||
|
|
data={
|
|||
|
|
"message": f"子Agent '{name}' 已执行完成" if exec_result.get("success") else f"子Agent '{name}' 执行失败",
|
|||
|
|
"agent_id": agent_id,
|
|||
|
|
"execution_result": exec_result,
|
|||
|
|
"findings": exec_result.get("data", {}).get("findings", []) if exec_result.get("success") else [],
|
|||
|
|
},
|
|||
|
|
error=exec_result.get("error"),
|
|||
|
|
metadata=node,
|
|||
|
|
)
|
|||
|
|
else:
|
|||
|
|
logger.warning("SubAgentExecutor not available, agent created but not executed")
|
|||
|
|
|
|||
|
|
return ToolResult(
|
|||
|
|
success=True,
|
|||
|
|
data={
|
|||
|
|
"message": f"子Agent '{name}' 已创建",
|
|||
|
|
"agent_id": agent_id,
|
|||
|
|
"agent_info": {
|
|||
|
|
"id": agent_id,
|
|||
|
|
"name": name,
|
|||
|
|
"type": agent_type,
|
|||
|
|
"task": task[:100],
|
|||
|
|
"knowledge_modules": modules,
|
|||
|
|
"parent_id": self.parent_agent_id,
|
|||
|
|
"status": "created",
|
|||
|
|
}
|
|||
|
|
},
|
|||
|
|
metadata=node,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class SendMessageInput(BaseModel):
|
|||
|
|
"""发送消息输入参数"""
|
|||
|
|
target_agent_id: str = Field(..., description="目标Agent ID")
|
|||
|
|
message: str = Field(..., description="消息内容")
|
|||
|
|
message_type: str = Field(
|
|||
|
|
default="information",
|
|||
|
|
description="消息类型: query(查询), instruction(指令), information(信息)"
|
|||
|
|
)
|
|||
|
|
priority: str = Field(
|
|||
|
|
default="normal",
|
|||
|
|
description="优先级: low, normal, high, urgent"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class SendMessageTool(AgentTool):
|
|||
|
|
"""
|
|||
|
|
发送消息工具
|
|||
|
|
|
|||
|
|
向其他Agent发送消息,实现Agent间通信
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(self, sender_agent_id: str):
|
|||
|
|
super().__init__()
|
|||
|
|
self.sender_agent_id = sender_agent_id
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def name(self) -> str:
|
|||
|
|
return "send_message"
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def description(self) -> str:
|
|||
|
|
return """向其他Agent发送消息。
|
|||
|
|
|
|||
|
|
使用场景:
|
|||
|
|
1. 向子Agent发送指令
|
|||
|
|
2. 向父Agent报告进展
|
|||
|
|
3. 请求其他Agent提供信息
|
|||
|
|
|
|||
|
|
参数:
|
|||
|
|
- target_agent_id: 目标Agent的ID
|
|||
|
|
- message: 消息内容
|
|||
|
|
- message_type: 消息类型 (query/instruction/information)
|
|||
|
|
- priority: 优先级 (low/normal/high/urgent)"""
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def args_schema(self):
|
|||
|
|
return SendMessageInput
|
|||
|
|
|
|||
|
|
async def _execute(
|
|||
|
|
self,
|
|||
|
|
target_agent_id: str,
|
|||
|
|
message: str,
|
|||
|
|
message_type: str = "information",
|
|||
|
|
priority: str = "normal",
|
|||
|
|
**kwargs
|
|||
|
|
) -> ToolResult:
|
|||
|
|
"""发送消息"""
|
|||
|
|
|
|||
|
|
if not target_agent_id:
|
|||
|
|
return ToolResult(success=False, error="目标Agent ID不能为空")
|
|||
|
|
|
|||
|
|
if not message or not message.strip():
|
|||
|
|
return ToolResult(success=False, error="消息内容不能为空")
|
|||
|
|
|
|||
|
|
# 检查目标Agent是否存在
|
|||
|
|
target_node = agent_registry.get_agent_node(target_agent_id)
|
|||
|
|
if not target_node:
|
|||
|
|
return ToolResult(
|
|||
|
|
success=False,
|
|||
|
|
error=f"目标Agent '{target_agent_id}' 不存在"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 转换消息类型
|
|||
|
|
try:
|
|||
|
|
msg_type = MessageType(message_type)
|
|||
|
|
except ValueError:
|
|||
|
|
msg_type = MessageType.INFORMATION
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
msg_priority = MessagePriority(priority)
|
|||
|
|
except ValueError:
|
|||
|
|
msg_priority = MessagePriority.NORMAL
|
|||
|
|
|
|||
|
|
# 发送消息
|
|||
|
|
sent_message = message_bus.send_message(
|
|||
|
|
from_agent=self.sender_agent_id,
|
|||
|
|
to_agent=target_agent_id,
|
|||
|
|
content=message.strip(),
|
|||
|
|
message_type=msg_type,
|
|||
|
|
priority=msg_priority,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
return ToolResult(
|
|||
|
|
success=True,
|
|||
|
|
data={
|
|||
|
|
"message": f"消息已发送到 '{target_node['name']}'",
|
|||
|
|
"message_id": sent_message.id,
|
|||
|
|
"target_agent": {
|
|||
|
|
"id": target_agent_id,
|
|||
|
|
"name": target_node["name"],
|
|||
|
|
"status": target_node["status"],
|
|||
|
|
}
|
|||
|
|
},
|
|||
|
|
metadata=sent_message.to_dict(),
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class ViewAgentGraphTool(AgentTool):
|
|||
|
|
"""
|
|||
|
|
查看Agent图工具
|
|||
|
|
|
|||
|
|
查看当前的Agent树结构和状态
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(self, current_agent_id: str):
|
|||
|
|
super().__init__()
|
|||
|
|
self.current_agent_id = current_agent_id
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def name(self) -> str:
|
|||
|
|
return "view_agent_graph"
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def description(self) -> str:
|
|||
|
|
return """查看当前的Agent树结构和状态。
|
|||
|
|
|
|||
|
|
显示:
|
|||
|
|
- 所有Agent及其层级关系
|
|||
|
|
- 每个Agent的状态和任务
|
|||
|
|
- 加载的知识模块"""
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def args_schema(self):
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
async def _execute(self, **kwargs) -> ToolResult:
|
|||
|
|
"""查看Agent图"""
|
|||
|
|
|
|||
|
|
tree_view = agent_registry.get_agent_tree_view()
|
|||
|
|
stats = agent_registry.get_statistics()
|
|||
|
|
|
|||
|
|
return ToolResult(
|
|||
|
|
success=True,
|
|||
|
|
data={
|
|||
|
|
"graph_structure": tree_view,
|
|||
|
|
"summary": stats,
|
|||
|
|
"current_agent_id": self.current_agent_id,
|
|||
|
|
},
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class WaitForMessageTool(AgentTool):
|
|||
|
|
"""
|
|||
|
|
等待消息工具
|
|||
|
|
|
|||
|
|
让Agent进入等待状态,等待其他Agent的消息
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(self, agent_id: str, agent_state=None):
|
|||
|
|
super().__init__()
|
|||
|
|
self.agent_id = agent_id
|
|||
|
|
self.agent_state = agent_state
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def name(self) -> str:
|
|||
|
|
return "wait_for_message"
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def description(self) -> str:
|
|||
|
|
return """进入等待状态,等待其他Agent或用户的消息。
|
|||
|
|
|
|||
|
|
使用场景:
|
|||
|
|
1. 等待子Agent完成任务并报告
|
|||
|
|
2. 等待用户提供更多信息
|
|||
|
|
3. 等待其他Agent的协作响应
|
|||
|
|
|
|||
|
|
参数:
|
|||
|
|
- reason: 等待原因"""
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def args_schema(self):
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
async def _execute(
|
|||
|
|
self,
|
|||
|
|
reason: str = "等待消息",
|
|||
|
|
**kwargs
|
|||
|
|
) -> ToolResult:
|
|||
|
|
"""进入等待状态"""
|
|||
|
|
|
|||
|
|
# 更新Agent状态
|
|||
|
|
if self.agent_state:
|
|||
|
|
self.agent_state.enter_waiting_state(reason)
|
|||
|
|
|
|||
|
|
# 更新注册表
|
|||
|
|
agent_registry.update_agent_status(self.agent_id, "waiting")
|
|||
|
|
|
|||
|
|
return ToolResult(
|
|||
|
|
success=True,
|
|||
|
|
data={
|
|||
|
|
"status": "waiting",
|
|||
|
|
"message": f"Agent正在等待: {reason}",
|
|||
|
|
"agent_id": self.agent_id,
|
|||
|
|
"resume_conditions": [
|
|||
|
|
"收到其他Agent的消息",
|
|||
|
|
"收到用户消息",
|
|||
|
|
"等待超时",
|
|||
|
|
],
|
|||
|
|
},
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class AgentFinishInput(BaseModel):
|
|||
|
|
"""Agent完成输入参数"""
|
|||
|
|
result_summary: str = Field(..., description="结果摘要")
|
|||
|
|
findings: Optional[List[str]] = Field(default=None, description="发现列表")
|
|||
|
|
success: bool = Field(default=True, description="是否成功")
|
|||
|
|
recommendations: Optional[List[str]] = Field(default=None, description="建议列表")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class AgentFinishTool(AgentTool):
|
|||
|
|
"""
|
|||
|
|
Agent完成工具
|
|||
|
|
|
|||
|
|
子Agent完成任务后调用,向父Agent报告结果
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(self, agent_id: str, agent_state=None):
|
|||
|
|
super().__init__()
|
|||
|
|
self.agent_id = agent_id
|
|||
|
|
self.agent_state = agent_state
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def name(self) -> str:
|
|||
|
|
return "agent_finish"
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def description(self) -> str:
|
|||
|
|
return """完成当前Agent的任务并向父Agent报告。
|
|||
|
|
|
|||
|
|
只有子Agent才能使用此工具。根Agent应使用finish_scan。
|
|||
|
|
|
|||
|
|
参数:
|
|||
|
|
- result_summary: 结果摘要
|
|||
|
|
- findings: 发现列表
|
|||
|
|
- success: 是否成功完成
|
|||
|
|
- recommendations: 建议列表"""
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def args_schema(self):
|
|||
|
|
return AgentFinishInput
|
|||
|
|
|
|||
|
|
async def _execute(
|
|||
|
|
self,
|
|||
|
|
result_summary: str,
|
|||
|
|
findings: Optional[List[str]] = None,
|
|||
|
|
success: bool = True,
|
|||
|
|
recommendations: Optional[List[str]] = None,
|
|||
|
|
**kwargs
|
|||
|
|
) -> ToolResult:
|
|||
|
|
"""完成Agent任务"""
|
|||
|
|
|
|||
|
|
# 获取父Agent ID
|
|||
|
|
parent_id = agent_registry.get_parent(self.agent_id)
|
|||
|
|
|
|||
|
|
if not parent_id:
|
|||
|
|
return ToolResult(
|
|||
|
|
success=False,
|
|||
|
|
error="此工具只能由子Agent使用。根Agent请使用finish_scan。"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 更新状态
|
|||
|
|
result = {
|
|||
|
|
"summary": result_summary,
|
|||
|
|
"findings": findings or [],
|
|||
|
|
"success": success,
|
|||
|
|
"recommendations": recommendations or [],
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
agent_registry.update_agent_status(
|
|||
|
|
self.agent_id,
|
|||
|
|
"completed" if success else "failed",
|
|||
|
|
result,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
if self.agent_state:
|
|||
|
|
self.agent_state.set_completed(result)
|
|||
|
|
|
|||
|
|
# 向父Agent发送完成报告
|
|||
|
|
message_bus.send_completion_report(
|
|||
|
|
from_agent=self.agent_id,
|
|||
|
|
to_agent=parent_id,
|
|||
|
|
summary=result_summary,
|
|||
|
|
findings=[{"description": f} for f in (findings or [])],
|
|||
|
|
success=success,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
agent_node = agent_registry.get_agent_node(self.agent_id)
|
|||
|
|
|
|||
|
|
return ToolResult(
|
|||
|
|
success=True,
|
|||
|
|
data={
|
|||
|
|
"agent_completed": True,
|
|||
|
|
"parent_notified": True,
|
|||
|
|
"completion_summary": {
|
|||
|
|
"agent_id": self.agent_id,
|
|||
|
|
"agent_name": agent_node["name"] if agent_node else "Unknown",
|
|||
|
|
"success": success,
|
|||
|
|
"findings_count": len(findings or []),
|
|||
|
|
}
|
|||
|
|
},
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class RunSubAgentsInput(BaseModel):
|
|||
|
|
"""批量执行子Agent输入参数"""
|
|||
|
|
agent_ids: List[str] = Field(..., description="要执行的Agent ID列表")
|
|||
|
|
parallel: bool = Field(default=True, description="是否并行执行")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class RunSubAgentsTool(AgentTool):
|
|||
|
|
"""
|
|||
|
|
批量执行子Agent工具
|
|||
|
|
|
|||
|
|
执行已创建的子Agent,支持并行执行
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(
|
|||
|
|
self,
|
|||
|
|
parent_agent_id: str,
|
|||
|
|
llm_service=None,
|
|||
|
|
tools: Dict[str, Any] = None,
|
|||
|
|
event_emitter=None,
|
|||
|
|
):
|
|||
|
|
super().__init__()
|
|||
|
|
self.parent_agent_id = parent_agent_id
|
|||
|
|
self.llm_service = llm_service
|
|||
|
|
self.tools = tools or {}
|
|||
|
|
self.event_emitter = event_emitter
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def name(self) -> str:
|
|||
|
|
return "run_sub_agents"
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def description(self) -> str:
|
|||
|
|
return """批量执行已创建的子Agent。
|
|||
|
|
|
|||
|
|
使用场景:
|
|||
|
|
1. 创建多个子Agent后批量执行
|
|||
|
|
2. 并行执行多个分析任务
|
|||
|
|
|
|||
|
|
参数:
|
|||
|
|
- agent_ids: 要执行的Agent ID列表
|
|||
|
|
- parallel: 是否并行执行(默认true)"""
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def args_schema(self):
|
|||
|
|
return RunSubAgentsInput
|
|||
|
|
|
|||
|
|
async def _execute(
|
|||
|
|
self,
|
|||
|
|
agent_ids: List[str],
|
|||
|
|
parallel: bool = True,
|
|||
|
|
**kwargs
|
|||
|
|
) -> ToolResult:
|
|||
|
|
"""批量执行子Agent"""
|
|||
|
|
|
|||
|
|
if not agent_ids:
|
|||
|
|
return ToolResult(success=False, error="Agent ID列表不能为空")
|
|||
|
|
|
|||
|
|
# 验证所有Agent存在且是当前Agent的子Agent
|
|||
|
|
valid_agents = []
|
|||
|
|
for aid in agent_ids:
|
|||
|
|
node = agent_registry.get_agent_node(aid)
|
|||
|
|
if not node:
|
|||
|
|
continue
|
|||
|
|
if node.get("parent_id") != self.parent_agent_id:
|
|||
|
|
continue
|
|||
|
|
if node.get("status") not in ["created", "pending"]:
|
|||
|
|
continue
|
|||
|
|
valid_agents.append(node)
|
|||
|
|
|
|||
|
|
if not valid_agents:
|
|||
|
|
return ToolResult(
|
|||
|
|
success=False,
|
|||
|
|
error="没有找到可执行的子Agent"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 构建执行任务
|
|||
|
|
from ..core.executor import DynamicAgentExecutor, ExecutionTask
|
|||
|
|
|
|||
|
|
executor = DynamicAgentExecutor(
|
|||
|
|
llm_service=self.llm_service,
|
|||
|
|
tools=self.tools,
|
|||
|
|
event_emitter=self.event_emitter,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
tasks = []
|
|||
|
|
for node in valid_agents:
|
|||
|
|
task = ExecutionTask(
|
|||
|
|
agent_id=node["id"],
|
|||
|
|
agent_type=node["type"],
|
|||
|
|
task=node["task"],
|
|||
|
|
context={
|
|||
|
|
"knowledge_modules": node.get("knowledge_modules", []),
|
|||
|
|
},
|
|||
|
|
)
|
|||
|
|
tasks.append(task)
|
|||
|
|
|
|||
|
|
# 定义Agent工厂函数
|
|||
|
|
async def agent_factory(task: ExecutionTask) -> Dict[str, Any]:
|
|||
|
|
from ..agents import AnalysisAgent, VerificationAgent
|
|||
|
|
|
|||
|
|
agent_class_map = {
|
|||
|
|
"analysis": AnalysisAgent,
|
|||
|
|
"verification": VerificationAgent,
|
|||
|
|
"specialist": AnalysisAgent, # 默认使用分析Agent
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
agent_class = agent_class_map.get(task.agent_type, AnalysisAgent)
|
|||
|
|
|
|||
|
|
return await executor.execute_agent(
|
|||
|
|
agent_class=agent_class,
|
|||
|
|
agent_config={},
|
|||
|
|
input_data={
|
|||
|
|
"task": task.task,
|
|||
|
|
"task_context": task.context,
|
|||
|
|
},
|
|||
|
|
parent_id=self.parent_agent_id,
|
|||
|
|
knowledge_modules=task.context.get("knowledge_modules"),
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 执行
|
|||
|
|
if parallel:
|
|||
|
|
result = await executor.execute_parallel(tasks, agent_factory)
|
|||
|
|
else:
|
|||
|
|
# 顺序执行
|
|||
|
|
result = await executor.execute_parallel(tasks, agent_factory)
|
|||
|
|
|
|||
|
|
return ToolResult(
|
|||
|
|
success=result.success,
|
|||
|
|
data={
|
|||
|
|
"message": f"执行完成: {result.completed_agents}/{result.total_agents} 成功",
|
|||
|
|
"total_agents": result.total_agents,
|
|||
|
|
"completed": result.completed_agents,
|
|||
|
|
"failed": result.failed_agents,
|
|||
|
|
"findings_count": len(result.all_findings),
|
|||
|
|
"findings": result.all_findings[:20], # 限制返回数量
|
|||
|
|
"duration_ms": result.total_duration_ms,
|
|||
|
|
"tokens_used": result.total_tokens,
|
|||
|
|
},
|
|||
|
|
error="; ".join(result.errors) if result.errors else None,
|
|||
|
|
metadata={
|
|||
|
|
"agent_results": {
|
|||
|
|
aid: {
|
|||
|
|
"success": r.get("success"),
|
|||
|
|
"findings_count": len(r.get("data", {}).get("findings", [])) if r.get("success") else 0,
|
|||
|
|
}
|
|||
|
|
for aid, r in result.agent_results.items()
|
|||
|
|
}
|
|||
|
|
},
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class CollectSubAgentResultsTool(AgentTool):
|
|||
|
|
"""
|
|||
|
|
收集子Agent结果工具
|
|||
|
|
|
|||
|
|
收集所有子Agent的执行结果和发现
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(self, parent_agent_id: str):
|
|||
|
|
super().__init__()
|
|||
|
|
self.parent_agent_id = parent_agent_id
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def name(self) -> str:
|
|||
|
|
return "collect_sub_agent_results"
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def description(self) -> str:
|
|||
|
|
return """收集所有子Agent的执行结果。
|
|||
|
|
|
|||
|
|
返回:
|
|||
|
|
- 所有子Agent的状态
|
|||
|
|
- 汇总的发现列表
|
|||
|
|
- 执行统计"""
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def args_schema(self):
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
async def _execute(self, **kwargs) -> ToolResult:
|
|||
|
|
"""收集子Agent结果"""
|
|||
|
|
|
|||
|
|
# 获取所有子Agent
|
|||
|
|
children = agent_registry.get_children(self.parent_agent_id)
|
|||
|
|
|
|||
|
|
if not children:
|
|||
|
|
return ToolResult(
|
|||
|
|
success=True,
|
|||
|
|
data={
|
|||
|
|
"message": "没有子Agent",
|
|||
|
|
"children_count": 0,
|
|||
|
|
"findings": [],
|
|||
|
|
}
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
all_findings = []
|
|||
|
|
completed = 0
|
|||
|
|
failed = 0
|
|||
|
|
running = 0
|
|||
|
|
|
|||
|
|
child_summaries = []
|
|||
|
|
|
|||
|
|
for child_id in children:
|
|||
|
|
node = agent_registry.get_agent_node(child_id)
|
|||
|
|
if not node:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
status = node.get("status", "unknown")
|
|||
|
|
|
|||
|
|
if status == "completed":
|
|||
|
|
completed += 1
|
|||
|
|
# 收集发现
|
|||
|
|
result = node.get("result", {})
|
|||
|
|
if isinstance(result, dict):
|
|||
|
|
findings = result.get("findings", [])
|
|||
|
|
if isinstance(findings, list):
|
|||
|
|
all_findings.extend(findings)
|
|||
|
|
elif status == "failed":
|
|||
|
|
failed += 1
|
|||
|
|
elif status == "running":
|
|||
|
|
running += 1
|
|||
|
|
|
|||
|
|
child_summaries.append({
|
|||
|
|
"id": child_id,
|
|||
|
|
"name": node.get("name"),
|
|||
|
|
"type": node.get("type"),
|
|||
|
|
"status": status,
|
|||
|
|
"findings_count": len(node.get("result", {}).get("findings", [])) if node.get("result") else 0,
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
return ToolResult(
|
|||
|
|
success=True,
|
|||
|
|
data={
|
|||
|
|
"message": f"收集完成: {completed} 完成, {failed} 失败, {running} 运行中",
|
|||
|
|
"children_count": len(children),
|
|||
|
|
"completed": completed,
|
|||
|
|
"failed": failed,
|
|||
|
|
"running": running,
|
|||
|
|
"total_findings": len(all_findings),
|
|||
|
|
"findings": all_findings,
|
|||
|
|
"children": child_summaries,
|
|||
|
|
},
|
|||
|
|
)
|