# DeepAudit Agent 审计架构文档 ## 目录 1. [系统概述](#1-系统概述) 2. [核心设计理念](#2-核心设计理念) 3. [后端架构](#3-后端架构) - [Agent层级结构](#31-agent层级结构) - [核心基础设施](#32-核心基础设施) - [工具生态系统](#33-工具生态系统) - [知识库与提示词](#34-知识库与提示词) 4. [前端架构](#4-前端架构) 5. [API接口](#5-api接口) 6. [审计任务执行流程](#6-审计任务执行流程) 7. [事件流与遥测](#7-事件流与遥测) 8. [配置与部署](#8-配置与部署) 9. [关键设计模式](#9-关键设计模式) 10. [安全与健壮性](#10-安全与健壮性) 11. [扩展指南](#11-扩展指南) --- ## 1. 系统概述 DeepAudit Agent 审计系统是一个基于大语言模型(LLM)驱动的自主化安全代码分析系统。该系统采用**动态多Agent层级架构**,实现了从代码侦察、漏洞分析到验证确认的完整安全审计流程。 ### 核心特性 | 特性 | 描述 | |------|------| | **LLM中心化决策** | LLM作为系统的"大脑",在各个层级自主做出决策 | | **动态Agent树** | 根据任务需求动态构建层级化的多Agent系统 | | **ReAct模式** | 实现思考-行动-观察(Thought-Action-Observation)循环 | | **事件驱动架构** | 通过SSE实时推送事件到前端 | | **丰富的工具生态** | 20+专业工具覆盖代码分析、模式匹配、漏洞验证 | ### 技术栈 - **后端**: Python 3.11+, FastAPI, LangChain/LiteLLM - **前端**: React 18, TypeScript, Ant Design - **数据库**: PostgreSQL (Supabase) - **通信**: SSE (Server-Sent Events), REST API --- ## 2. 核心设计理念 ### 2.1 LLM自主决策 与传统的规则驱动自动化不同,本系统中**LLM做出所有战略决策**: ``` 传统方式: 规则 → 固定流程 → 结果 DeepAudit: LLM分析 → 自主决策 → 动态调整 → 结果 ``` LLM负责决定: - 何时派发哪个子Agent - 使用哪些工具进行分析 - 何时认为分析已足够充分 - 如何解读和关联发现的问题 ### 2.2 ReAct模式实现 每个Agent的执行遵循ReAct(Reasoning + Acting)模式: ``` ┌─────────────────────────────────────────────────┐ │ LLM 输出 │ ├─────────────────────────────────────────────────┤ │ Thought: 我应该先了解项目结构... │ │ Action: list_files │ │ Action Input: {"directory": ".", "pattern": "*"}│ └─────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────┐ │ 系统执行工具 │ ├─────────────────────────────────────────────────┤ │ Observation: [找到 app/, config/, tests/...] │ └─────────────────────────────────────────────────┘ ↓ 反馈给LLM,继续下一轮循环 ``` ### 2.3 层级化Agent协作 ``` ┌───────────────────┐ │ OrchestratorAgent │ │ (策略编排者) │ └─────────┬─────────┘ │ ┌─────────────────┼─────────────────┐ ↓ ↓ ↓ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │ ReconAgent │ │ AnalysisAgent │ │VerificationAgent│ │ (情报收集) │ │ (漏洞猎手) │ │ (验证确认) │ └───────────────┘ └───────────────┘ └───────────────┘ ``` --- ## 3. 后端架构 ### 3.1 Agent层级结构 所有Agent代码位于 `backend/app/services/agent/agents/` 目录。 #### 3.1.1 BaseAgent 基类 **文件**: `base.py` BaseAgent是所有Agent的抽象基类,提供核心功能: ```python class BaseAgent: """Agent基类 - 提供通用功能""" # 核心配置 config: AgentConfig # 名称、类型、模式、最大token数、最大迭代次数 state: AgentState # 状态管理(status, messages, findings) # 关键方法 async def run(self, input_data: dict) -> AgentResult # 抽象方法 async def stream_llm_call() # 统一的LLM调用(支持流式+自动压缩) async def execute_tool() # 工具执行(带错误处理) # 事件发射 def emit_thinking_token() # 流式输出LLM思考过程 def emit_thinking_start/end() # 思考生命周期 def emit_tool_call/result() # 工具执行生命周期 def emit_finding() # 漏洞发现事件 ``` **关键特性**: - 消息历史压缩(超过100k token时自动截断) - 取消支持(`is_cancelled`属性) - 知识模块动态加载 - 动态父子关系管理 #### 3.1.2 OrchestratorAgent 编排器 **文件**: `orchestrator.py` **角色**: 战略编排与动态任务调度 **最大迭代次数**: 20 ```python class OrchestratorAgent(BaseAgent): """编排器Agent - 协调整体审计流程""" async def run(self, input_data: dict) -> AgentResult: """主ReAct循环 - LLM决定下一步行动""" # dispatch_agent: 派发子Agent # summarize: 汇总当前进度 # finish: 完成审计 async def _dispatch_agent(self, agent_name: str, task: str): """派发ReconAgent、AnalysisAgent或VerificationAgent""" def _parse_llm_response(self, response: str): """从LLM输出中提取Thought/Action/Action Input""" ``` **决策点**: - 决定派发哪个子Agent - 判断何时完成审计 - 汇总所有子Agent的发现 #### 3.1.3 ReconAgent 侦察员 **文件**: `recon.py` **角色**: 项目分析与情报收集 **最大迭代次数**: 15 **主要任务**: - 发现项目结构 - 识别技术栈和框架 - 定位入口点 - 标记高风险区域 **可用工具**: `list_files`, `read_file`, `search_code` **输出格式**: ```json { "project_structure": {...}, "tech_stack": {"languages": ["Python"], "frameworks": ["Django"]}, "entry_points": ["app/views.py", "api/routes.py"], "high_risk_areas": ["auth/", "payment/"], "initial_findings": [...] } ``` #### 3.1.4 AnalysisAgent 分析员 **文件**: `analysis.py` **角色**: 深度代码漏洞分析 **最大迭代次数**: 30 **主要工具**: | 工具 | 描述 | 优先级 | |------|------|--------| | `smart_scan` | 智能批量安全扫描 | **推荐首选** | | `quick_audit` | 快速文件审计 | 二级 | | `pattern_match` | 危险模式检测 | 三级 | | `dataflow_analysis` | 数据流追踪 | 深度分析 | | `semgrep_scan` | Semgrep静态分析 | 外部工具 | | `bandit_scan` | Python安全检测 | 外部工具 | | `gitleaks_scan` | 密钥泄露检测 | 外部工具 | **关注的漏洞类型**: - SQL注入 - XSS跨站脚本 - 命令注入 - 路径遍历 - SSRF服务端请求伪造 #### 3.1.5 VerificationAgent 验证员 **文件**: `verification.py` **角色**: 确认发现并生成PoC **最大迭代次数**: 15 **主要职责**: - 减少误报 - 验证可利用性 - 生成概念验证(PoC) **输出格式**: ```json { "is_verified": true, "poc": { "description": "...", "steps": [...], "payload": "..." }, "exploitability": "high", "confidence": 0.95 } ``` #### 3.1.6 TaskHandoff 任务交接协议 Agent之间通过结构化的`TaskHandoff`进行协作: ```python @dataclass class TaskHandoff: """Agent间的结构化通信协议""" work_completed: str # 已完成的工作 key_findings: List[dict] # 关键发现 insights: List[str] # 洞察 suggested_actions: List[str] # 建议行动 attention_points: List[str] # 需要关注的点 priority_areas: List[str] # 优先区域 context_data: dict # 上下文数据 def to_prompt_context(self) -> str: """转换为LLM可读格式""" ``` --- ### 3.2 核心基础设施 核心基础设施位于 `backend/app/services/agent/core/` 目录。 #### 3.2.1 状态管理 (state.py) ```python class AgentState: """Agent状态跟踪""" status: Literal["created", "running", "waiting", "completed", "failed"] messages: List[Message] # 对话历史 findings: List[Finding] # 发现列表 iterations: int # 当前迭代次数 tool_calls: int # 工具调用次数 ``` #### 3.2.2 执行上下文 (context.py) ```python class ExecutionContext: """分布式追踪的执行上下文""" task_id: str # 任务ID agent_id: str # Agent ID trace_path: str # 追踪路径 (如: Orchestrator > Analysis) depth: int # 嵌套深度 iteration: int # 当前迭代 metadata: dict # 附加元数据 def child_context(self, agent_id: str) -> ExecutionContext: """创建子上下文""" def with_iteration(self, iteration: int) -> ExecutionContext: """带迭代信息的上下文""" ``` #### 3.2.3 熔断器 (circuit_breaker.py) ```python class CircuitBreaker: """熔断器 - 防止级联故障""" # 状态转换: CLOSED -> OPEN -> HALF_OPEN -> CLOSED failure_threshold: int = 5 # 失败阈值 recovery_timeout: float = 30.0 # 恢复超时(秒) async def call(self, func: Callable) -> Any: """受保护的调用""" if self.state == CircuitState.OPEN: raise CircuitOpenError() try: result = await func() self._record_success() return result except Exception as e: self._record_failure() raise ``` #### 3.2.4 重试机制 (retry.py) ```python class RetryStrategy: """指数退避重试""" base_delay: float = 1.0 # 基础延迟(秒) max_delay: float = 60.0 # 最大延迟(秒) max_retries: int = 3 # 最大重试次数 def get_delay(self, attempt: int) -> float: """计算延迟: min(base * (2 ** attempt) + jitter, max)""" ``` #### 3.2.5 速率限制器 (rate_limiter.py) ```python class RateLimiter: """令牌桶算法速率限制""" # 预设限制: # - 外部工具: 0.2 calls/second (每5秒1次) # - LLM调用: 60 calls/minute # - 突发支持: 最多3个并发调用 async def acquire(self, tool_name: str): """获取执行许可""" ``` #### 3.2.6 输入验证 (validation.py) ```python class InputValidator: """输入安全验证""" def validate_file_path(self, path: str) -> bool: """文件路径安全检查""" # - 检查路径遍历攻击 # - 验证文件扩展名 # - 检查符号链接 def validate_file_size(self, path: str, max_size: int = 10_000_000): """文件大小限制(默认10MB)""" ``` #### 3.2.7 错误处理 (errors.py) ```python # 自定义异常层级 class AgentError(Exception): pass class CircuitOpenError(AgentError): pass # 熔断器打开 class AgentExecutionError(AgentError): pass # Agent执行失败 class ToolExecutionError(AgentError): pass # 工具执行失败 class ValidationError(AgentError): pass # 输入验证失败 class TokenLimitError(AgentError): pass # Token超限 ``` --- ### 3.3 工具生态系统 工具代码位于 `backend/app/services/agent/tools/` 目录。 #### 3.3.1 文件操作工具 | 工具 | 文件 | 描述 | |------|------|------| | `FileReadTool` | - | 读取文件内容(支持行范围) | | `ListFilesTool` | - | 列出目录内容(支持glob模式) | | `FileSearchTool` | - | 按关键词搜索代码 | #### 3.3.2 代码分析工具 **SmartScanTool** (`smart_scan_tool.py`) ```python class SmartScanTool: """智能批量安全扫描 - 推荐首选工具""" # 功能: # - 自动检测漏洞模式 # - 聚焦高风险文件 # - 批量处理提高效率 async def execute(self, target: str) -> dict: return { "vulnerabilities": [...], "risk_areas": [...], "recommendations": [...] } ``` **PatternMatchTool** (`pattern_tool.py`) ```python class PatternMatchTool: """基于正则的模式检测""" # 内置模式: # - SQL注入模式 # - XSS模式 # - 命令注入模式 # - 可作为Semgrep的后备 ``` **DataFlowAnalysisTool** (`code_analysis_tool.py`) ```python class DataFlowAnalysisTool: """数据流分析工具""" # 功能: # - 源点到汇点追踪 # - 变量污点分析 # - LLM辅助的数据流理解 ``` #### 3.3.3 外部安全工具 **文件**: `external_tools.py` | 工具 | 描述 | 超时 | 后备方案 | |------|------|------|----------| | `SemgrepTool` | 静态分析规则引擎 | 120s | PatternMatchTool | | `BanditTool` | Python安全linter | 60s | PatternMatchTool | | `GitleaksTool` | 密钥/凭证检测 | 60s | - | #### 3.3.4 完成工具 **FinishTool** (`finish_tool.py`) ```python class FinishTool: """任务完成工具""" async def execute(self, conclusion: str, findings: List[dict]): """标记任务完成并返回最终结果""" ``` --- ### 3.4 知识库与提示词 #### 3.4.1 漏洞知识库 位于 `backend/app/services/agent/knowledge/vulnerabilities/` **按漏洞类型组织**: | 文件 | 漏洞类型 | |------|----------| | `sql_injection.py` | SQL注入 | | `xss.py` | 跨站脚本 | | `csrf.py` | 跨站请求伪造 | | `auth.py` | 认证漏洞 | | `ssrf.py` | 服务端请求伪造 | | `path_traversal.py` | 路径遍历 | | `deserialization.py` | 不安全反序列化 | | `xxe.py` | XML外部实体注入 | | `race_condition.py` | 竞态条件 | | `crypto.py` | 弱加密 | | `injection.py` | 代码/命令注入 | | `business_logic.py` | 业务逻辑漏洞 | | `open_redirect.py` | 开放重定向 | **框架特定知识**: | 文件 | 框架 | |------|------| | `FastAPI.py` | FastAPI安全模式 | | `Django.py` | Django安全问题 | | `Flask.py` | Flask漏洞 | | `Express.js` | Node.js/Express模式 | | `React.js` | 前端安全 | | `Supabase.py` | BaaS安全 | #### 3.4.2 系统提示词 **文件**: `backend/app/services/agent/prompts/system_prompts.py` ```python # 核心安全原则 CORE_SECURITY_PRINCIPLES = """ - 深度分析优于广度覆盖 - 重视数据流追踪 - 上下文感知分析 - 假阳性需验证确认 """ # 漏洞优先级 VULNERABILITY_PRIORITIES = { "critical": ["sql_injection", "command_injection", "code_injection"], "high": ["path_traversal", "ssrf", "auth_bypass"], "medium": ["xss", "information_disclosure", "xxe"], "low": ["csrf", "weak_crypto", "unsafe_transport"] } # 多Agent协作规则 MULTI_AGENT_RULES = """ - 避免重复工作 - 共享上下文信息 - 聚焦自身职责 - 及时交接发现 """ ``` --- ## 4. 前端架构 前端代码位于 `frontend/src/pages/AgentAudit/` 目录。 ### 4.1 目录结构 ``` AgentAudit/ ├── index.tsx # 主页面组件 ├── types.ts # TypeScript类型定义 ├── utils.ts # 工具函数 ├── constants.tsx # 常量配置 ├── hooks/ │ ├── useAgentAuditState.ts # 状态管理(useReducer) │ ├── useResilientStream.ts # SSE流式连接 │ └── index.ts └── components/ ├── Header.tsx # 标题、状态、控制按钮 ├── StatsPanel.tsx # 统计面板 ├── AgentTreeNode.tsx # Agent树节点渲染 ├── AgentDetailPanel.tsx # Agent详情侧边栏 ├── LogEntry.tsx # 单条日志条目 ├── ConnectionStatus.tsx # 连接状态指示器 ├── StatusBadge.tsx # 状态徽章 ├── SplashScreen.tsx # 初始屏幕 ├── AgentErrorBoundary.tsx # 错误边界 ├── ReportExportDialog.tsx # 导出对话框 └── index.ts ``` ### 4.2 状态管理 **文件**: `hooks/useAgentAuditState.ts` ```typescript interface AgentAuditState { task: AgentTask | null; // 当前任务 findings: AgentFinding[]; // 发现列表 agentTree: AgentTreeResponse | null; // Agent树 logs: LogItem[]; // 日志列表 selectedAgentId: string | null; // 选中的Agent connectionStatus: ConnectionStatus; // 连接状态 isAutoScroll: boolean; // 自动滚动 expandedLogIds: Set; // 展开的日志ID } type ConnectionStatus = 'disconnected' | 'connecting' | 'connected' | 'error'; // Reducer Actions type Action = | { type: 'SET_TASK'; payload: AgentTask } | { type: 'SET_FINDINGS'; payload: AgentFinding[] } | { type: 'ADD_LOG'; payload: LogItem } | { type: 'UPDATE_LOG'; payload: { id: string; updates: Partial } } | { type: 'SET_CONNECTION_STATUS'; payload: ConnectionStatus } | { type: 'SELECT_AGENT'; payload: string | null } | { type: 'TOGGLE_LOG_EXPANDED'; payload: string } | { type: 'RESET' }; ``` ### 4.3 事件流处理 **文件**: `hooks/useResilientStream.ts` ```typescript function useResilientStream(taskId: string, options: StreamOptions) { // 功能: // - 弹性SSE连接(自动重连) // - Thinking token逐字符流式更新 // - 按类型过滤事件 // - 组件卸载时自动清理 // 事件处理器 onThinkingToken: (token: string) => void; // 累积thinking内容 onToolStart: (tool: ToolEvent) => void; // 工具开始 onToolEnd: (tool: ToolEvent) => void; // 工具结束 onFinding: (finding: Finding) => void; // 新发现 onComplete: () => void; // 任务完成 onError: (error: Error) => void; // 错误处理 } ``` ### 4.4 日志类型 ```typescript type LogType = | 'thinking' // LLM思考过程(支持流式) | 'tool' // 工具执行(含耗时和状态) | 'phase' // 工作流阶段 | 'finding' // 漏洞发现 | 'info' // 信息消息 | 'error' // 错误消息 | 'dispatch' // 子Agent派发 | 'user'; // 用户操作 interface LogItem { id: string; type: LogType; timestamp: Date; content: string; metadata?: Record; isExpanded?: boolean; isStreaming?: boolean; // thinking类型专用 } ``` ### 4.5 实时更新流程 ``` SSE Stream → useResilientStream → dispatch() → reducer → UI更新 ↓ thinking_token事件 ↓ onThinkingToken回调 ↓ dispatch({type: 'ADD_LOG', payload: {type: 'thinking', content: accumulated}}) ↓ UI渲染LogEntry(流式效果) ``` --- ## 5. API接口 API端点位于 `backend/app/api/v1/endpoints/agent_tasks.py` ### 5.1 任务管理 | 方法 | 端点 | 描述 | |------|------|------| | POST | `/agent-tasks/` | 创建并启动审计任务 | | GET | `/agent-tasks/` | 列出任务(支持过滤) | | GET | `/agent-tasks/{task_id}` | 获取任务详情 | | POST | `/agent-tasks/{task_id}/cancel` | 取消运行中的任务 | ### 5.2 事件流 | 方法 | 端点 | 描述 | |------|------|------| | GET | `/agent-tasks/{task_id}/events` | 基础事件轮询(SSE) | | GET | `/agent-tasks/{task_id}/stream` | 增强事件流(含thinking tokens) | **查询参数**: - `include_thinking`: 是否包含thinking token - `include_tool_calls`: 是否包含工具调用 - `after_sequence`: 从指定序列号后开始 ### 5.3 结果查询 | 方法 | 端点 | 描述 | |------|------|------| | GET | `/agent-tasks/{task_id}/findings` | 获取发现列表(支持分页) | | GET | `/agent-tasks/{task_id}/summary` | 任务摘要与统计 | | PATCH | `/agent-tasks/{task_id}/findings/{finding_id}/status` | 更新发现状态 | | GET | `/agent-tasks/{task_id}/report` | 生成报告(markdown/json) | ### 5.4 高级功能 | 方法 | 端点 | 描述 | |------|------|------| | GET | `/agent-tasks/{task_id}/agent-tree` | Agent层级与执行统计 | | GET | `/agent-tasks/{task_id}/checkpoints` | 执行检查点列表 | | GET | `/agent-tasks/{task_id}/checkpoints/{checkpoint_id}` | 检查点详情 | ### 5.5 创建任务Schema ```python class AgentTaskCreate(BaseModel): project_id: str # 项目ID name: Optional[str] # 任务名称 target_vulnerabilities: List[str] = [ "sql_injection", "xss", "command_injection", "path_traversal", "ssrf" ] verification_level: str = "sandbox" # 验证级别 exclude_patterns: List[str] = [ # 排除模式 "node_modules", "venv", "__pycache__", ".git" ] target_files: Optional[List[str]] # 限定扫描文件 max_iterations: int = 50 # 最大迭代(1-200) timeout_seconds: int = 1800 # 超时(60-7200秒) ``` --- ## 6. 审计任务执行流程 ### 6.1 完整流程图 ``` ┌─────────────────────────────────────────────────────────────────────┐ │ 1. 任务创建 │ │ 用户 → POST /agent-tasks/ → 创建AgentTask(PENDING) → 入队后台任务 │ └─────────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────────┐ │ 2. 初始化 │ │ 加载项目 → 收集项目信息 → 初始化工具 → 创建子Agent → 创建Orchestrator │ └─────────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────────┐ │ 3. Orchestrator循环 (最多20轮) │ │ ┌──────────────────────────────────────────────────────────────┐ │ │ │ 迭代 N: │ │ │ │ a) 调用LLM获取下一步决策 │ │ │ │ b) 解析: Thought / Action / Action Input │ │ │ │ c) 执行Action: │ │ │ │ - dispatch_agent → 派发子Agent │ │ │ │ - finish → 完成审计 │ │ │ │ d) 发射事件到前端 │ │ │ └──────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────────┐ │ 4. 子Agent执行 (以Analysis为例) │ │ ┌──────────────────────────────────────────────────────────────┐ │ │ │ 循环 (最多30轮): │ │ │ │ a) LLM思考下一步 │ │ │ │ b) 选择并执行工具 (smart_scan, pattern_match, ...) │ │ │ │ c) 将观察结果加入历史 │ │ │ │ d) 检测是否输出"Final Answer" → 解析发现,返回 │ │ │ └──────────────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────────┐ │ 5. 结果汇总 │ │ 合并所有发现 → 去重 → 更新验证结果 → 构建findings列表 │ └─────────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────────┐ │ 6. 持久化 │ │ 更新AgentTask状态 → 保存到AgentFinding表 → 计算安全评分 │ └─────────────────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────────────────┐ │ 7. 前端展示 │ │ 实时SSE事件 → 日志累积 → Agent树生长 → 统计更新 → 最终结果展示 │ └─────────────────────────────────────────────────────────────────────┘ ``` ### 6.2 详细执行示例 **场景**: 用户上传Python项目,执行Agent审计 ``` 1. 用户请求: POST /agent-tasks/ { "project_id": "123", "target_files": ["app/views.py"] } 2. Orchestrator迭代1 - LLM输出: Thought: 我应该先用recon了解项目结构 Action: dispatch_agent Action Input: {"agent": "recon", "task": "分析项目结构"} 3. ReconAgent执行: 迭代1: list_files → 找到 app/views.py, models.py... 迭代2: read_file → 读取app/views.py内容 迭代N: Final Answer → 返回项目结构信息 4. Orchestrator迭代2 - LLM输出: Thought: 这是一个Django项目,让我派发Analysis进行漏洞分析 Action: dispatch_agent Action Input: {"agent": "analysis", "task": "深度漏洞分析"} 5. AnalysisAgent执行: 迭代1: smart_scan → 发现第42行可能存在SQL注入 迭代2: read_file → 确认问题代码 迭代M: Final Answer → 返回结构化的漏洞发现 6. Orchestrator迭代3 - LLM输出: Thought: 发现高危SQL注入,需要验证 Action: dispatch_agent Action Input: {"agent": "verification", "task": "验证SQL注入"} 7. VerificationAgent确认: 返回: {"is_verified": true, "poc": {...}} 8. Orchestrator完成: Thought: 审计完成,共发现1个已验证漏洞 Action: finish Action Input: {"conclusion": "...", "findings": [...]} 9. 后端持久化: - AgentTask.status = COMPLETED - 写入AgentFinding表 10. 前端实时展示: - 日志流式更新 - Agent树动态生长 - 统计数据刷新 - 最终展示发现列表 ``` --- ## 7. 事件流与遥测 ### 7.1 事件管理器 **文件**: `backend/app/services/agent/event_manager.py` ```python class EventManager: """事件管理器 - 内存队列 + 流式迭代""" max_events: int = 1000 # 最大事件数 async def emit(self, event_type: str, message: str, **metadata): """发射事件""" async def stream_events(self): """异步迭代器: async for event in manager.stream_events()""" ``` ### 7.2 事件类型 | 类型 | 描述 | |------|------| | `thinking_start` | LLM开始思考 | | `thinking_token` | 思考token(逐字符) | | `thinking_end` | LLM思考结束 | | `tool_call` | 工具调用开始 | | `tool_result` | 工具执行结果 | | `finding` | 漏洞发现 | | `phase_start` | 阶段开始 | | `phase_complete` | 阶段完成 | | `dispatch` | 子Agent派发 | | `llm_thought` | LLM思考内容 | | `llm_decision` | LLM决策 | | `error` | 错误 | | `warning` | 警告 | | `info` | 信息 | ### 7.3 遥测追踪 **文件**: `backend/app/services/agent/telemetry/tracer.py` ```python class Tracer: """分布式追踪""" def create_span(self, name: str, parent_span: Span = None) -> Span: """创建追踪Span""" # 追踪路径示例: Orchestrator > Analysis > (Verification) # 记录的指标: # - 执行时间 # - Token使用量 # - 工具调用次数 # - 关联ID ``` --- ## 8. 配置与部署 ### 8.1 Agent配置 **文件**: `backend/app/services/agent/config.py` ```python class AgentConfig: """Agent配置 (环境变量前缀: AGENT_)""" # LLM设置 llm_max_retries: int = 3 llm_retry_base_delay: float = 1.0 llm_timeout_seconds: int = 120 llm_max_tokens_per_call: int = 4096 llm_temperature: float = 0.1 llm_stream_enabled: bool = True # Agent迭代限制 orchestrator_max_iterations: int = 20 recon_max_iterations: int = 15 analysis_max_iterations: int = 30 verification_max_iterations: int = 15 # 工具设置 tool_timeout_seconds: int = 60 tool_max_retries: int = 2 semgrep_enabled: bool = True bandit_enabled: bool = True gitleaks_enabled: bool = True # 资源限制 max_file_size_bytes: int = 10_000_000 # 10MB max_files_per_scan: int = 1000 max_total_findings: int = 500 max_context_messages: int = 50 # 熔断器 circuit_breaker_enabled: bool = True circuit_failure_threshold: int = 5 circuit_recovery_timeout_seconds: float = 30.0 # 检查点 checkpoint_enabled: bool = True checkpoint_interval_iterations: int = 5 max_checkpoints_per_task: int = 50 ``` ### 8.2 环境预设 ```python def apply_development_preset(): """开发环境预设""" # 更宽松的限制,更多日志 def apply_production_preset(): """生产环境预设""" # 更严格的限制,优化性能 def apply_testing_preset(): """测试环境预设""" # 快速迭代,最小资源 ``` --- ## 9. 关键设计模式 ### 9.1 LLM中心化决策 ``` ┌─────────────────────────────────────────────┐ │ 传统自动化 │ │ 规则引擎 → 固定流程 → 预定义行为 │ └─────────────────────────────────────────────┘ ┌─────────────────────────────────────────────┐ │ DeepAudit │ │ LLM分析 → 自主决策 → 动态适应 → 智能输出 │ └─────────────────────────────────────────────┘ ``` ### 9.2 ReAct模式 ``` Loop: 1. Thought: LLM思考当前状态和下一步 2. Action: 选择要执行的动作/工具 3. Action Input: 提供参数 4. Observation: 获取执行结果 5. 重复直到达成目标或Final Answer ``` ### 9.3 任务交接协议 ```python # Agent A完成工作后 handoff = TaskHandoff( work_completed="扫描了10个文件,发现3个可疑点", key_findings=[finding1, finding2, finding3], suggested_actions=["验证SQL注入", "检查认证逻辑"], priority_areas=["auth/", "api/"] ) # 转换为下一个Agent的输入 context = handoff.to_prompt_context() # Agent B使用此上下文继续工作 ``` ### 9.4 优雅降级 ``` ┌───────────────────────────────────────────────────┐ │ 优雅降级策略 │ ├───────────────────────────────────────────────────┤ │ • 工具失败不中断执行 (continue_on_tool_failure) │ │ • 后备工具链 (Semgrep → PatternMatch) │ │ • 超时时返回部分结果 │ │ • 熔断器防止级联故障 │ │ • 消息压缩避免token超限 │ └───────────────────────────────────────────────────┘ ``` ### 9.5 范围限定审计 ```python # 支持的范围限定: AgentTaskCreate( exclude_patterns=["node_modules", "*.min.js"], # 排除模式 target_files=["src/auth/", "api/views.py"] # 限定文件 ) # 减少噪音,聚焦LLM注意力 ``` --- ## 10. 安全与健壮性 ### 10.1 安全特性 | 特性 | 描述 | |------|------| | **速率限制** | 防止资源耗尽 | | **熔断器** | 阻止级联故障 | | **输入验证** | 文件路径、大小、扩展名检查 | | **资源限制** | 最大发现数(500)、最大文件大小(10MB) | | **范围过滤** | 可限定扫描范围 | ### 10.2 健壮性特性 | 特性 | 描述 | |------|------| | **指数退避重试** | 处理瞬态故障 | | **上下文压缩** | 自动截断长消息历史 | | **取消支持** | 可随时中断运行任务 | | **检查点** | 长任务恢复点 | | **结构化异常** | 分层错误处理 | --- ## 11. 扩展指南 ### 11.1 添加新Agent类型 ```python # 1. 创建新Agent类 class CustomAgent(BaseAgent): def __init__(self, config: AgentConfig): super().__init__(config) self.config.name = "custom" self.config.max_iterations = 20 async def run(self, input_data: dict) -> AgentResult: # 实现ReAct循环 for iteration in range(self.config.max_iterations): response = await self.stream_llm_call(messages) parsed = self._parse_response(response) if parsed.action == "finish": return AgentResult(data=parsed.data) observation = await self.execute_tool( parsed.action, parsed.action_input ) messages.append({"role": "user", "content": f"Observation: {observation}"}) # 2. 在Orchestrator中注册 orchestrator.register_agent("custom", CustomAgent(config)) ``` ### 11.2 添加新工具 ```python # 1. 实现工具接口 class CustomTool: name: str = "custom_tool" description: str = "自定义工具描述" async def execute(self, **kwargs) -> dict: # 实现工具逻辑 return {"result": "..."} # 2. 在工具注册表中添加 TOOL_REGISTRY["custom_tool"] = CustomTool() ``` ### 11.3 添加漏洞知识 ```python # 创建新的知识模块: knowledge/vulnerabilities/custom_vuln.py CUSTOM_VULN_KNOWLEDGE = { "name": "Custom Vulnerability", "description": "...", "patterns": [ r"pattern1", r"pattern2" ], "examples": [...], "remediation": "..." } ``` ### 11.4 自定义提示词 ```python # 在system_prompts.py中添加 CUSTOM_AGENT_PROMPT = """ 你是一个专门的安全分析Agent。 职责: - ... 可用工具: - ... 输出格式: - ... """ ``` ### 11.5 添加新事件类型 ```python # 在Agent中发射自定义事件 self.emit_event( event_type="custom_event", message="自定义消息", custom_field="value" ) # 在前端处理 switch (event.type) { case 'custom_event': handleCustomEvent(event); break; } ``` --- ## 附录: 关键文件索引 ``` 核心Agent逻辑: backend/app/services/agent/agents/base.py # BaseAgent基类 backend/app/services/agent/agents/orchestrator.py # 编排器 backend/app/services/agent/agents/analysis.py # 分析Agent backend/app/services/agent/agents/recon.py # 侦察Agent backend/app/services/agent/agents/verification.py # 验证Agent 核心基础设施: backend/app/services/agent/core/state.py # 状态管理 backend/app/services/agent/core/context.py # 执行上下文 backend/app/services/agent/core/circuit_breaker.py # 熔断器 backend/app/services/agent/core/retry.py # 重试机制 backend/app/services/agent/core/rate_limiter.py # 速率限制 backend/app/services/agent/core/validation.py # 输入验证 backend/app/services/agent/core/errors.py # 错误处理 工具: backend/app/services/agent/tools/code_analysis_tool.py # 代码分析 backend/app/services/agent/tools/pattern_tool.py # 模式匹配 backend/app/services/agent/tools/smart_scan_tool.py # 智能扫描 backend/app/services/agent/tools/external_tools.py # 外部工具 backend/app/services/agent/tools/finish_tool.py # 完成工具 事件与遥测: backend/app/services/agent/event_manager.py # 事件管理 backend/app/services/agent/telemetry/tracer.py # 分布式追踪 配置: backend/app/services/agent/config.py # 集中配置 API: backend/app/api/v1/endpoints/agent_tasks.py # REST端点 前端: frontend/src/pages/AgentAudit/index.tsx # 主页面 frontend/src/pages/AgentAudit/hooks/useAgentAuditState.ts # 状态管理 frontend/src/pages/AgentAudit/hooks/useResilientStream.ts # SSE流 frontend/src/pages/AgentAudit/types.ts # 类型定义 frontend/src/pages/AgentAudit/components/* # UI组件 ``` --- *文档版本: 1.0* *最后更新: 2025-12-13*