39 KiB
DeepAudit Agent 审计架构文档
目录
1. 系统概述
DeepAudit Agent 审计系统是一个基于大语言模型(LLM)驱动的自主化安全代码分析系统。该系统采用动态多Agent层级架构,实现了从代码侦察、漏洞分析到验证确认的完整安全审计流程。
核心特性
| 特性 | 描述 |
|---|---|
| LLM中心化决策 | LLM作为系统的"大脑",在各个层级自主做出决策 |
| 动态Agent树 | 根据任务需求动态构建层级化的多Agent系统 |
| ReAct模式 | 实现思考-行动-观察(Thought-Action-Observation)循环 |
| 事件驱动架构 | 通过SSE实时推送事件到前端 |
| 丰富的工具生态 | 20+专业工具覆盖代码分析、模式匹配、漏洞验证 |
技术栈
- 后端: Python 3.11+, FastAPI, LangChain/LiteLLM
- 前端: React 18, TypeScript, Ant Design
- 数据库: PostgreSQL (Supabase)
- 通信: SSE (Server-Sent Events), REST API
2. 核心设计理念
2.1 LLM自主决策
与传统的规则驱动自动化不同,本系统中LLM做出所有战略决策:
传统方式: 规则 → 固定流程 → 结果
DeepAudit: LLM分析 → 自主决策 → 动态调整 → 结果
LLM负责决定:
- 何时派发哪个子Agent
- 使用哪些工具进行分析
- 何时认为分析已足够充分
- 如何解读和关联发现的问题
2.2 ReAct模式实现
每个Agent的执行遵循ReAct(Reasoning + Acting)模式:
┌─────────────────────────────────────────────────┐
│ LLM 输出 │
├─────────────────────────────────────────────────┤
│ Thought: 我应该先了解项目结构... │
│ Action: list_files │
│ Action Input: {"directory": ".", "pattern": "*"}│
└─────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────┐
│ 系统执行工具 │
├─────────────────────────────────────────────────┤
│ Observation: [找到 app/, config/, tests/...] │
└─────────────────────────────────────────────────┘
↓
反馈给LLM,继续下一轮循环
2.3 层级化Agent协作
┌───────────────────┐
│ OrchestratorAgent │
│ (策略编排者) │
└─────────┬─────────┘
│
┌─────────────────┼─────────────────┐
↓ ↓ ↓
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ ReconAgent │ │ AnalysisAgent │ │VerificationAgent│
│ (情报收集) │ │ (漏洞猎手) │ │ (验证确认) │
└───────────────┘ └───────────────┘ └───────────────┘
3. 后端架构
3.1 Agent层级结构
所有Agent代码位于 backend/app/services/agent/agents/ 目录。
3.1.1 BaseAgent 基类
文件: base.py
BaseAgent是所有Agent的抽象基类,提供核心功能:
class BaseAgent:
"""Agent基类 - 提供通用功能"""
# 核心配置
config: AgentConfig # 名称、类型、模式、最大token数、最大迭代次数
state: AgentState # 状态管理(status, messages, findings)
# 关键方法
async def run(self, input_data: dict) -> AgentResult # 抽象方法
async def stream_llm_call() # 统一的LLM调用(支持流式+自动压缩)
async def execute_tool() # 工具执行(带错误处理)
# 事件发射
def emit_thinking_token() # 流式输出LLM思考过程
def emit_thinking_start/end() # 思考生命周期
def emit_tool_call/result() # 工具执行生命周期
def emit_finding() # 漏洞发现事件
关键特性:
- 消息历史压缩(超过100k token时自动截断)
- 取消支持(
is_cancelled属性) - 知识模块动态加载
- 动态父子关系管理
3.1.2 OrchestratorAgent 编排器
文件: orchestrator.py
角色: 战略编排与动态任务调度
最大迭代次数: 20
class OrchestratorAgent(BaseAgent):
"""编排器Agent - 协调整体审计流程"""
async def run(self, input_data: dict) -> AgentResult:
"""主ReAct循环 - LLM决定下一步行动"""
# dispatch_agent: 派发子Agent
# summarize: 汇总当前进度
# finish: 完成审计
async def _dispatch_agent(self, agent_name: str, task: str):
"""派发ReconAgent、AnalysisAgent或VerificationAgent"""
def _parse_llm_response(self, response: str):
"""从LLM输出中提取Thought/Action/Action Input"""
决策点:
- 决定派发哪个子Agent
- 判断何时完成审计
- 汇总所有子Agent的发现
3.1.3 ReconAgent 侦察员
文件: recon.py
角色: 项目分析与情报收集
最大迭代次数: 15
主要任务:
- 发现项目结构
- 识别技术栈和框架
- 定位入口点
- 标记高风险区域
可用工具: list_files, read_file, search_code
输出格式:
{
"project_structure": {...},
"tech_stack": {"languages": ["Python"], "frameworks": ["Django"]},
"entry_points": ["app/views.py", "api/routes.py"],
"high_risk_areas": ["auth/", "payment/"],
"initial_findings": [...]
}
3.1.4 AnalysisAgent 分析员
文件: analysis.py
角色: 深度代码漏洞分析
最大迭代次数: 30
主要工具:
| 工具 | 描述 | 优先级 |
|---|---|---|
smart_scan |
智能批量安全扫描 | 推荐首选 |
quick_audit |
快速文件审计 | 二级 |
pattern_match |
危险模式检测 | 三级 |
dataflow_analysis |
数据流追踪 | 深度分析 |
semgrep_scan |
Semgrep静态分析 | 外部工具 |
bandit_scan |
Python安全检测 | 外部工具 |
gitleaks_scan |
密钥泄露检测 | 外部工具 |
关注的漏洞类型:
- SQL注入
- XSS跨站脚本
- 命令注入
- 路径遍历
- SSRF服务端请求伪造
3.1.5 VerificationAgent 验证员
文件: verification.py
角色: 确认发现并生成PoC
最大迭代次数: 15
主要职责:
- 减少误报
- 验证可利用性
- 生成概念验证(PoC)
输出格式:
{
"is_verified": true,
"poc": {
"description": "...",
"steps": [...],
"payload": "..."
},
"exploitability": "high",
"confidence": 0.95
}
3.1.6 TaskHandoff 任务交接协议
Agent之间通过结构化的TaskHandoff进行协作:
@dataclass
class TaskHandoff:
"""Agent间的结构化通信协议"""
work_completed: str # 已完成的工作
key_findings: List[dict] # 关键发现
insights: List[str] # 洞察
suggested_actions: List[str] # 建议行动
attention_points: List[str] # 需要关注的点
priority_areas: List[str] # 优先区域
context_data: dict # 上下文数据
def to_prompt_context(self) -> str:
"""转换为LLM可读格式"""
3.2 核心基础设施
核心基础设施位于 backend/app/services/agent/core/ 目录。
3.2.1 状态管理 (state.py)
class AgentState:
"""Agent状态跟踪"""
status: Literal["created", "running", "waiting", "completed", "failed"]
messages: List[Message] # 对话历史
findings: List[Finding] # 发现列表
iterations: int # 当前迭代次数
tool_calls: int # 工具调用次数
3.2.2 执行上下文 (context.py)
class ExecutionContext:
"""分布式追踪的执行上下文"""
task_id: str # 任务ID
agent_id: str # Agent ID
trace_path: str # 追踪路径 (如: Orchestrator > Analysis)
depth: int # 嵌套深度
iteration: int # 当前迭代
metadata: dict # 附加元数据
def child_context(self, agent_id: str) -> ExecutionContext:
"""创建子上下文"""
def with_iteration(self, iteration: int) -> ExecutionContext:
"""带迭代信息的上下文"""
3.2.3 熔断器 (circuit_breaker.py)
class CircuitBreaker:
"""熔断器 - 防止级联故障"""
# 状态转换: CLOSED -> OPEN -> HALF_OPEN -> CLOSED
failure_threshold: int = 5 # 失败阈值
recovery_timeout: float = 30.0 # 恢复超时(秒)
async def call(self, func: Callable) -> Any:
"""受保护的调用"""
if self.state == CircuitState.OPEN:
raise CircuitOpenError()
try:
result = await func()
self._record_success()
return result
except Exception as e:
self._record_failure()
raise
3.2.4 重试机制 (retry.py)
class RetryStrategy:
"""指数退避重试"""
base_delay: float = 1.0 # 基础延迟(秒)
max_delay: float = 60.0 # 最大延迟(秒)
max_retries: int = 3 # 最大重试次数
def get_delay(self, attempt: int) -> float:
"""计算延迟: min(base * (2 ** attempt) + jitter, max)"""
3.2.5 速率限制器 (rate_limiter.py)
class RateLimiter:
"""令牌桶算法速率限制"""
# 预设限制:
# - 外部工具: 0.2 calls/second (每5秒1次)
# - LLM调用: 60 calls/minute
# - 突发支持: 最多3个并发调用
async def acquire(self, tool_name: str):
"""获取执行许可"""
3.2.6 输入验证 (validation.py)
class InputValidator:
"""输入安全验证"""
def validate_file_path(self, path: str) -> bool:
"""文件路径安全检查"""
# - 检查路径遍历攻击
# - 验证文件扩展名
# - 检查符号链接
def validate_file_size(self, path: str, max_size: int = 10_000_000):
"""文件大小限制(默认10MB)"""
3.2.7 错误处理 (errors.py)
# 自定义异常层级
class AgentError(Exception): pass
class CircuitOpenError(AgentError): pass # 熔断器打开
class AgentExecutionError(AgentError): pass # Agent执行失败
class ToolExecutionError(AgentError): pass # 工具执行失败
class ValidationError(AgentError): pass # 输入验证失败
class TokenLimitError(AgentError): pass # Token超限
3.3 工具生态系统
工具代码位于 backend/app/services/agent/tools/ 目录。
3.3.1 文件操作工具
| 工具 | 文件 | 描述 |
|---|---|---|
FileReadTool |
- | 读取文件内容(支持行范围) |
ListFilesTool |
- | 列出目录内容(支持glob模式) |
FileSearchTool |
- | 按关键词搜索代码 |
3.3.2 代码分析工具
SmartScanTool (smart_scan_tool.py)
class SmartScanTool:
"""智能批量安全扫描 - 推荐首选工具"""
# 功能:
# - 自动检测漏洞模式
# - 聚焦高风险文件
# - 批量处理提高效率
async def execute(self, target: str) -> dict:
return {
"vulnerabilities": [...],
"risk_areas": [...],
"recommendations": [...]
}
PatternMatchTool (pattern_tool.py)
class PatternMatchTool:
"""基于正则的模式检测"""
# 内置模式:
# - SQL注入模式
# - XSS模式
# - 命令注入模式
# - 可作为Semgrep的后备
DataFlowAnalysisTool (code_analysis_tool.py)
class DataFlowAnalysisTool:
"""数据流分析工具"""
# 功能:
# - 源点到汇点追踪
# - 变量污点分析
# - LLM辅助的数据流理解
3.3.3 外部安全工具
文件: external_tools.py
| 工具 | 描述 | 超时 | 后备方案 |
|---|---|---|---|
SemgrepTool |
静态分析规则引擎 | 120s | PatternMatchTool |
BanditTool |
Python安全linter | 60s | PatternMatchTool |
GitleaksTool |
密钥/凭证检测 | 60s | - |
3.3.4 完成工具
FinishTool (finish_tool.py)
class FinishTool:
"""任务完成工具"""
async def execute(self, conclusion: str, findings: List[dict]):
"""标记任务完成并返回最终结果"""
3.4 知识库与提示词
3.4.1 漏洞知识库
位于 backend/app/services/agent/knowledge/vulnerabilities/
按漏洞类型组织:
| 文件 | 漏洞类型 |
|---|---|
sql_injection.py |
SQL注入 |
xss.py |
跨站脚本 |
csrf.py |
跨站请求伪造 |
auth.py |
认证漏洞 |
ssrf.py |
服务端请求伪造 |
path_traversal.py |
路径遍历 |
deserialization.py |
不安全反序列化 |
xxe.py |
XML外部实体注入 |
race_condition.py |
竞态条件 |
crypto.py |
弱加密 |
injection.py |
代码/命令注入 |
business_logic.py |
业务逻辑漏洞 |
open_redirect.py |
开放重定向 |
框架特定知识:
| 文件 | 框架 |
|---|---|
FastAPI.py |
FastAPI安全模式 |
Django.py |
Django安全问题 |
Flask.py |
Flask漏洞 |
Express.js |
Node.js/Express模式 |
React.js |
前端安全 |
Supabase.py |
BaaS安全 |
3.4.2 系统提示词
文件: backend/app/services/agent/prompts/system_prompts.py
# 核心安全原则
CORE_SECURITY_PRINCIPLES = """
- 深度分析优于广度覆盖
- 重视数据流追踪
- 上下文感知分析
- 假阳性需验证确认
"""
# 漏洞优先级
VULNERABILITY_PRIORITIES = {
"critical": ["sql_injection", "command_injection", "code_injection"],
"high": ["path_traversal", "ssrf", "auth_bypass"],
"medium": ["xss", "information_disclosure", "xxe"],
"low": ["csrf", "weak_crypto", "unsafe_transport"]
}
# 多Agent协作规则
MULTI_AGENT_RULES = """
- 避免重复工作
- 共享上下文信息
- 聚焦自身职责
- 及时交接发现
"""
4. 前端架构
前端代码位于 frontend/src/pages/AgentAudit/ 目录。
4.1 目录结构
AgentAudit/
├── index.tsx # 主页面组件
├── types.ts # TypeScript类型定义
├── utils.ts # 工具函数
├── constants.tsx # 常量配置
├── hooks/
│ ├── useAgentAuditState.ts # 状态管理(useReducer)
│ ├── useResilientStream.ts # SSE流式连接
│ └── index.ts
└── components/
├── Header.tsx # 标题、状态、控制按钮
├── StatsPanel.tsx # 统计面板
├── AgentTreeNode.tsx # Agent树节点渲染
├── AgentDetailPanel.tsx # Agent详情侧边栏
├── LogEntry.tsx # 单条日志条目
├── ConnectionStatus.tsx # 连接状态指示器
├── StatusBadge.tsx # 状态徽章
├── SplashScreen.tsx # 初始屏幕
├── AgentErrorBoundary.tsx # 错误边界
├── ReportExportDialog.tsx # 导出对话框
└── index.ts
4.2 状态管理
文件: hooks/useAgentAuditState.ts
interface AgentAuditState {
task: AgentTask | null; // 当前任务
findings: AgentFinding[]; // 发现列表
agentTree: AgentTreeResponse | null; // Agent树
logs: LogItem[]; // 日志列表
selectedAgentId: string | null; // 选中的Agent
connectionStatus: ConnectionStatus; // 连接状态
isAutoScroll: boolean; // 自动滚动
expandedLogIds: Set<string>; // 展开的日志ID
}
type ConnectionStatus = 'disconnected' | 'connecting' | 'connected' | 'error';
// Reducer Actions
type Action =
| { type: 'SET_TASK'; payload: AgentTask }
| { type: 'SET_FINDINGS'; payload: AgentFinding[] }
| { type: 'ADD_LOG'; payload: LogItem }
| { type: 'UPDATE_LOG'; payload: { id: string; updates: Partial<LogItem> } }
| { type: 'SET_CONNECTION_STATUS'; payload: ConnectionStatus }
| { type: 'SELECT_AGENT'; payload: string | null }
| { type: 'TOGGLE_LOG_EXPANDED'; payload: string }
| { type: 'RESET' };
4.3 事件流处理
文件: hooks/useResilientStream.ts
function useResilientStream(taskId: string, options: StreamOptions) {
// 功能:
// - 弹性SSE连接(自动重连)
// - Thinking token逐字符流式更新
// - 按类型过滤事件
// - 组件卸载时自动清理
// 事件处理器
onThinkingToken: (token: string) => void; // 累积thinking内容
onToolStart: (tool: ToolEvent) => void; // 工具开始
onToolEnd: (tool: ToolEvent) => void; // 工具结束
onFinding: (finding: Finding) => void; // 新发现
onComplete: () => void; // 任务完成
onError: (error: Error) => void; // 错误处理
}
4.4 日志类型
type LogType =
| 'thinking' // LLM思考过程(支持流式)
| 'tool' // 工具执行(含耗时和状态)
| 'phase' // 工作流阶段
| 'finding' // 漏洞发现
| 'info' // 信息消息
| 'error' // 错误消息
| 'dispatch' // 子Agent派发
| 'user'; // 用户操作
interface LogItem {
id: string;
type: LogType;
timestamp: Date;
content: string;
metadata?: Record<string, any>;
isExpanded?: boolean;
isStreaming?: boolean; // thinking类型专用
}
4.5 实时更新流程
SSE Stream → useResilientStream → dispatch() → reducer → UI更新
↓
thinking_token事件
↓
onThinkingToken回调
↓
dispatch({type: 'ADD_LOG', payload: {type: 'thinking', content: accumulated}})
↓
UI渲染LogEntry(流式效果)
5. API接口
API端点位于 backend/app/api/v1/endpoints/agent_tasks.py
5.1 任务管理
| 方法 | 端点 | 描述 |
|---|---|---|
| POST | /agent-tasks/ |
创建并启动审计任务 |
| GET | /agent-tasks/ |
列出任务(支持过滤) |
| GET | /agent-tasks/{task_id} |
获取任务详情 |
| POST | /agent-tasks/{task_id}/cancel |
取消运行中的任务 |
5.2 事件流
| 方法 | 端点 | 描述 |
|---|---|---|
| GET | /agent-tasks/{task_id}/events |
基础事件轮询(SSE) |
| GET | /agent-tasks/{task_id}/stream |
增强事件流(含thinking tokens) |
查询参数:
include_thinking: 是否包含thinking tokeninclude_tool_calls: 是否包含工具调用after_sequence: 从指定序列号后开始
5.3 结果查询
| 方法 | 端点 | 描述 |
|---|---|---|
| GET | /agent-tasks/{task_id}/findings |
获取发现列表(支持分页) |
| GET | /agent-tasks/{task_id}/summary |
任务摘要与统计 |
| PATCH | /agent-tasks/{task_id}/findings/{finding_id}/status |
更新发现状态 |
| GET | /agent-tasks/{task_id}/report |
生成报告(markdown/json) |
5.4 高级功能
| 方法 | 端点 | 描述 |
|---|---|---|
| GET | /agent-tasks/{task_id}/agent-tree |
Agent层级与执行统计 |
| GET | /agent-tasks/{task_id}/checkpoints |
执行检查点列表 |
| GET | /agent-tasks/{task_id}/checkpoints/{checkpoint_id} |
检查点详情 |
5.5 创建任务Schema
class AgentTaskCreate(BaseModel):
project_id: str # 项目ID
name: Optional[str] # 任务名称
target_vulnerabilities: List[str] = [
"sql_injection", "xss", "command_injection",
"path_traversal", "ssrf"
]
verification_level: str = "sandbox" # 验证级别
exclude_patterns: List[str] = [ # 排除模式
"node_modules", "venv", "__pycache__", ".git"
]
target_files: Optional[List[str]] # 限定扫描文件
max_iterations: int = 50 # 最大迭代(1-200)
timeout_seconds: int = 1800 # 超时(60-7200秒)
6. 审计任务执行流程
6.1 完整流程图
┌─────────────────────────────────────────────────────────────────────┐
│ 1. 任务创建 │
│ 用户 → POST /agent-tasks/ → 创建AgentTask(PENDING) → 入队后台任务 │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ 2. 初始化 │
│ 加载项目 → 收集项目信息 → 初始化工具 → 创建子Agent → 创建Orchestrator │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ 3. Orchestrator循环 (最多20轮) │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ 迭代 N: │ │
│ │ a) 调用LLM获取下一步决策 │ │
│ │ b) 解析: Thought / Action / Action Input │ │
│ │ c) 执行Action: │ │
│ │ - dispatch_agent → 派发子Agent │ │
│ │ - finish → 完成审计 │ │
│ │ d) 发射事件到前端 │ │
│ └──────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ 4. 子Agent执行 (以Analysis为例) │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ 循环 (最多30轮): │ │
│ │ a) LLM思考下一步 │ │
│ │ b) 选择并执行工具 (smart_scan, pattern_match, ...) │ │
│ │ c) 将观察结果加入历史 │ │
│ │ d) 检测是否输出"Final Answer" → 解析发现,返回 │ │
│ └──────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ 5. 结果汇总 │
│ 合并所有发现 → 去重 → 更新验证结果 → 构建findings列表 │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ 6. 持久化 │
│ 更新AgentTask状态 → 保存到AgentFinding表 → 计算安全评分 │
└─────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────┐
│ 7. 前端展示 │
│ 实时SSE事件 → 日志累积 → Agent树生长 → 统计更新 → 最终结果展示 │
└─────────────────────────────────────────────────────────────────────┘
6.2 详细执行示例
场景: 用户上传Python项目,执行Agent审计
1. 用户请求: POST /agent-tasks/
{
"project_id": "123",
"target_files": ["app/views.py"]
}
2. Orchestrator迭代1 - LLM输出:
Thought: 我应该先用recon了解项目结构
Action: dispatch_agent
Action Input: {"agent": "recon", "task": "分析项目结构"}
3. ReconAgent执行:
迭代1: list_files → 找到 app/views.py, models.py...
迭代2: read_file → 读取app/views.py内容
迭代N: Final Answer → 返回项目结构信息
4. Orchestrator迭代2 - LLM输出:
Thought: 这是一个Django项目,让我派发Analysis进行漏洞分析
Action: dispatch_agent
Action Input: {"agent": "analysis", "task": "深度漏洞分析"}
5. AnalysisAgent执行:
迭代1: smart_scan → 发现第42行可能存在SQL注入
迭代2: read_file → 确认问题代码
迭代M: Final Answer → 返回结构化的漏洞发现
6. Orchestrator迭代3 - LLM输出:
Thought: 发现高危SQL注入,需要验证
Action: dispatch_agent
Action Input: {"agent": "verification", "task": "验证SQL注入"}
7. VerificationAgent确认:
返回: {"is_verified": true, "poc": {...}}
8. Orchestrator完成:
Thought: 审计完成,共发现1个已验证漏洞
Action: finish
Action Input: {"conclusion": "...", "findings": [...]}
9. 后端持久化:
- AgentTask.status = COMPLETED
- 写入AgentFinding表
10. 前端实时展示:
- 日志流式更新
- Agent树动态生长
- 统计数据刷新
- 最终展示发现列表
7. 事件流与遥测
7.1 事件管理器
文件: backend/app/services/agent/event_manager.py
class EventManager:
"""事件管理器 - 内存队列 + 流式迭代"""
max_events: int = 1000 # 最大事件数
async def emit(self, event_type: str, message: str, **metadata):
"""发射事件"""
async def stream_events(self):
"""异步迭代器: async for event in manager.stream_events()"""
7.2 事件类型
| 类型 | 描述 |
|---|---|
thinking_start |
LLM开始思考 |
thinking_token |
思考token(逐字符) |
thinking_end |
LLM思考结束 |
tool_call |
工具调用开始 |
tool_result |
工具执行结果 |
finding |
漏洞发现 |
phase_start |
阶段开始 |
phase_complete |
阶段完成 |
dispatch |
子Agent派发 |
llm_thought |
LLM思考内容 |
llm_decision |
LLM决策 |
error |
错误 |
warning |
警告 |
info |
信息 |
7.3 遥测追踪
文件: backend/app/services/agent/telemetry/tracer.py
class Tracer:
"""分布式追踪"""
def create_span(self, name: str, parent_span: Span = None) -> Span:
"""创建追踪Span"""
# 追踪路径示例: Orchestrator > Analysis > (Verification)
# 记录的指标:
# - 执行时间
# - Token使用量
# - 工具调用次数
# - 关联ID
8. 配置与部署
8.1 Agent配置
文件: backend/app/services/agent/config.py
class AgentConfig:
"""Agent配置 (环境变量前缀: AGENT_)"""
# LLM设置
llm_max_retries: int = 3
llm_retry_base_delay: float = 1.0
llm_timeout_seconds: int = 120
llm_max_tokens_per_call: int = 4096
llm_temperature: float = 0.1
llm_stream_enabled: bool = True
# Agent迭代限制
orchestrator_max_iterations: int = 20
recon_max_iterations: int = 15
analysis_max_iterations: int = 30
verification_max_iterations: int = 15
# 工具设置
tool_timeout_seconds: int = 60
tool_max_retries: int = 2
semgrep_enabled: bool = True
bandit_enabled: bool = True
gitleaks_enabled: bool = True
# 资源限制
max_file_size_bytes: int = 10_000_000 # 10MB
max_files_per_scan: int = 1000
max_total_findings: int = 500
max_context_messages: int = 50
# 熔断器
circuit_breaker_enabled: bool = True
circuit_failure_threshold: int = 5
circuit_recovery_timeout_seconds: float = 30.0
# 检查点
checkpoint_enabled: bool = True
checkpoint_interval_iterations: int = 5
max_checkpoints_per_task: int = 50
8.2 环境预设
def apply_development_preset():
"""开发环境预设"""
# 更宽松的限制,更多日志
def apply_production_preset():
"""生产环境预设"""
# 更严格的限制,优化性能
def apply_testing_preset():
"""测试环境预设"""
# 快速迭代,最小资源
9. 关键设计模式
9.1 LLM中心化决策
┌─────────────────────────────────────────────┐
│ 传统自动化 │
│ 规则引擎 → 固定流程 → 预定义行为 │
└─────────────────────────────────────────────┘
┌─────────────────────────────────────────────┐
│ DeepAudit │
│ LLM分析 → 自主决策 → 动态适应 → 智能输出 │
└─────────────────────────────────────────────┘
9.2 ReAct模式
Loop:
1. Thought: LLM思考当前状态和下一步
2. Action: 选择要执行的动作/工具
3. Action Input: 提供参数
4. Observation: 获取执行结果
5. 重复直到达成目标或Final Answer
9.3 任务交接协议
# Agent A完成工作后
handoff = TaskHandoff(
work_completed="扫描了10个文件,发现3个可疑点",
key_findings=[finding1, finding2, finding3],
suggested_actions=["验证SQL注入", "检查认证逻辑"],
priority_areas=["auth/", "api/"]
)
# 转换为下一个Agent的输入
context = handoff.to_prompt_context()
# Agent B使用此上下文继续工作
9.4 优雅降级
┌───────────────────────────────────────────────────┐
│ 优雅降级策略 │
├───────────────────────────────────────────────────┤
│ • 工具失败不中断执行 (continue_on_tool_failure) │
│ • 后备工具链 (Semgrep → PatternMatch) │
│ • 超时时返回部分结果 │
│ • 熔断器防止级联故障 │
│ • 消息压缩避免token超限 │
└───────────────────────────────────────────────────┘
9.5 范围限定审计
# 支持的范围限定:
AgentTaskCreate(
exclude_patterns=["node_modules", "*.min.js"], # 排除模式
target_files=["src/auth/", "api/views.py"] # 限定文件
)
# 减少噪音,聚焦LLM注意力
10. 安全与健壮性
10.1 安全特性
| 特性 | 描述 |
|---|---|
| 速率限制 | 防止资源耗尽 |
| 熔断器 | 阻止级联故障 |
| 输入验证 | 文件路径、大小、扩展名检查 |
| 资源限制 | 最大发现数(500)、最大文件大小(10MB) |
| 范围过滤 | 可限定扫描范围 |
10.2 健壮性特性
| 特性 | 描述 |
|---|---|
| 指数退避重试 | 处理瞬态故障 |
| 上下文压缩 | 自动截断长消息历史 |
| 取消支持 | 可随时中断运行任务 |
| 检查点 | 长任务恢复点 |
| 结构化异常 | 分层错误处理 |
11. 扩展指南
11.1 添加新Agent类型
# 1. 创建新Agent类
class CustomAgent(BaseAgent):
def __init__(self, config: AgentConfig):
super().__init__(config)
self.config.name = "custom"
self.config.max_iterations = 20
async def run(self, input_data: dict) -> AgentResult:
# 实现ReAct循环
for iteration in range(self.config.max_iterations):
response = await self.stream_llm_call(messages)
parsed = self._parse_response(response)
if parsed.action == "finish":
return AgentResult(data=parsed.data)
observation = await self.execute_tool(
parsed.action,
parsed.action_input
)
messages.append({"role": "user", "content": f"Observation: {observation}"})
# 2. 在Orchestrator中注册
orchestrator.register_agent("custom", CustomAgent(config))
11.2 添加新工具
# 1. 实现工具接口
class CustomTool:
name: str = "custom_tool"
description: str = "自定义工具描述"
async def execute(self, **kwargs) -> dict:
# 实现工具逻辑
return {"result": "..."}
# 2. 在工具注册表中添加
TOOL_REGISTRY["custom_tool"] = CustomTool()
11.3 添加漏洞知识
# 创建新的知识模块: knowledge/vulnerabilities/custom_vuln.py
CUSTOM_VULN_KNOWLEDGE = {
"name": "Custom Vulnerability",
"description": "...",
"patterns": [
r"pattern1",
r"pattern2"
],
"examples": [...],
"remediation": "..."
}
11.4 自定义提示词
# 在system_prompts.py中添加
CUSTOM_AGENT_PROMPT = """
你是一个专门的安全分析Agent。
职责:
- ...
可用工具:
- ...
输出格式:
- ...
"""
11.5 添加新事件类型
# 在Agent中发射自定义事件
self.emit_event(
event_type="custom_event",
message="自定义消息",
custom_field="value"
)
# 在前端处理
switch (event.type) {
case 'custom_event':
handleCustomEvent(event);
break;
}
附录: 关键文件索引
核心Agent逻辑:
backend/app/services/agent/agents/base.py # BaseAgent基类
backend/app/services/agent/agents/orchestrator.py # 编排器
backend/app/services/agent/agents/analysis.py # 分析Agent
backend/app/services/agent/agents/recon.py # 侦察Agent
backend/app/services/agent/agents/verification.py # 验证Agent
核心基础设施:
backend/app/services/agent/core/state.py # 状态管理
backend/app/services/agent/core/context.py # 执行上下文
backend/app/services/agent/core/circuit_breaker.py # 熔断器
backend/app/services/agent/core/retry.py # 重试机制
backend/app/services/agent/core/rate_limiter.py # 速率限制
backend/app/services/agent/core/validation.py # 输入验证
backend/app/services/agent/core/errors.py # 错误处理
工具:
backend/app/services/agent/tools/code_analysis_tool.py # 代码分析
backend/app/services/agent/tools/pattern_tool.py # 模式匹配
backend/app/services/agent/tools/smart_scan_tool.py # 智能扫描
backend/app/services/agent/tools/external_tools.py # 外部工具
backend/app/services/agent/tools/finish_tool.py # 完成工具
事件与遥测:
backend/app/services/agent/event_manager.py # 事件管理
backend/app/services/agent/telemetry/tracer.py # 分布式追踪
配置:
backend/app/services/agent/config.py # 集中配置
API:
backend/app/api/v1/endpoints/agent_tasks.py # REST端点
前端:
frontend/src/pages/AgentAudit/index.tsx # 主页面
frontend/src/pages/AgentAudit/hooks/useAgentAuditState.ts # 状态管理
frontend/src/pages/AgentAudit/hooks/useResilientStream.ts # SSE流
frontend/src/pages/AgentAudit/types.ts # 类型定义
frontend/src/pages/AgentAudit/components/* # UI组件
文档版本: 1.0 最后更新: 2025-12-13