CodeReview/docs/AGENT_AUDIT_ARCHITECTURE.md

39 KiB
Raw Permalink Blame History

DeepAudit Agent 审计架构文档

目录

  1. 系统概述
  2. 核心设计理念
  3. 后端架构
  4. 前端架构
  5. API接口
  6. 审计任务执行流程
  7. 事件流与遥测
  8. 配置与部署
  9. 关键设计模式
  10. 安全与健壮性
  11. 扩展指南

1. 系统概述

DeepAudit Agent 审计系统是一个基于大语言模型(LLM)驱动的自主化安全代码分析系统。该系统采用动态多Agent层级架构,实现了从代码侦察、漏洞分析到验证确认的完整安全审计流程。

核心特性

特性 描述
LLM中心化决策 LLM作为系统的"大脑",在各个层级自主做出决策
动态Agent树 根据任务需求动态构建层级化的多Agent系统
ReAct模式 实现思考-行动-观察(Thought-Action-Observation)循环
事件驱动架构 通过SSE实时推送事件到前端
丰富的工具生态 20+专业工具覆盖代码分析、模式匹配、漏洞验证

技术栈

  • 后端: Python 3.11+, FastAPI, LangChain/LiteLLM
  • 前端: React 18, TypeScript, Ant Design
  • 数据库: PostgreSQL (Supabase)
  • 通信: SSE (Server-Sent Events), REST API

2. 核心设计理念

2.1 LLM自主决策

与传统的规则驱动自动化不同,本系统中LLM做出所有战略决策

传统方式: 规则 → 固定流程 → 结果
DeepAudit: LLM分析 → 自主决策 → 动态调整 → 结果

LLM负责决定

  • 何时派发哪个子Agent
  • 使用哪些工具进行分析
  • 何时认为分析已足够充分
  • 如何解读和关联发现的问题

2.2 ReAct模式实现

每个Agent的执行遵循ReActReasoning + Acting模式

┌─────────────────────────────────────────────────┐
│                  LLM 输出                        │
├─────────────────────────────────────────────────┤
│ Thought: 我应该先了解项目结构...                  │
│ Action: list_files                              │
│ Action Input: {"directory": ".", "pattern": "*"}│
└─────────────────────────────────────────────────┘
                      ↓
┌─────────────────────────────────────────────────┐
│                系统执行工具                       │
├─────────────────────────────────────────────────┤
│ Observation: [找到 app/, config/, tests/...]     │
└─────────────────────────────────────────────────┘
                      ↓
         反馈给LLM继续下一轮循环

2.3 层级化Agent协作

                    ┌───────────────────┐
                    │  OrchestratorAgent │
                    │    (策略编排者)     │
                    └─────────┬─────────┘
                              │
            ┌─────────────────┼─────────────────┐
            ↓                 ↓                 ↓
    ┌───────────────┐ ┌───────────────┐ ┌───────────────┐
    │  ReconAgent   │ │ AnalysisAgent │ │VerificationAgent│
    │  (情报收集)    │ │  (漏洞猎手)    │ │   (验证确认)   │
    └───────────────┘ └───────────────┘ └───────────────┘

3. 后端架构

3.1 Agent层级结构

所有Agent代码位于 backend/app/services/agent/agents/ 目录。

3.1.1 BaseAgent 基类

文件: base.py

BaseAgent是所有Agent的抽象基类提供核心功能

class BaseAgent:
    """Agent基类 - 提供通用功能"""

    # 核心配置
    config: AgentConfig  # 名称、类型、模式、最大token数、最大迭代次数
    state: AgentState    # 状态管理(status, messages, findings)

    # 关键方法
    async def run(self, input_data: dict) -> AgentResult  # 抽象方法
    async def stream_llm_call()    # 统一的LLM调用(支持流式+自动压缩)
    async def execute_tool()       # 工具执行(带错误处理)

    # 事件发射
    def emit_thinking_token()      # 流式输出LLM思考过程
    def emit_thinking_start/end()  # 思考生命周期
    def emit_tool_call/result()    # 工具执行生命周期
    def emit_finding()             # 漏洞发现事件

关键特性

  • 消息历史压缩超过100k token时自动截断
  • 取消支持(is_cancelled属性)
  • 知识模块动态加载
  • 动态父子关系管理

3.1.2 OrchestratorAgent 编排器

文件: orchestrator.py 角色: 战略编排与动态任务调度 最大迭代次数: 20

class OrchestratorAgent(BaseAgent):
    """编排器Agent - 协调整体审计流程"""

    async def run(self, input_data: dict) -> AgentResult:
        """主ReAct循环 - LLM决定下一步行动"""
        # dispatch_agent: 派发子Agent
        # summarize: 汇总当前进度
        # finish: 完成审计

    async def _dispatch_agent(self, agent_name: str, task: str):
        """派发ReconAgent、AnalysisAgent或VerificationAgent"""

    def _parse_llm_response(self, response: str):
        """从LLM输出中提取Thought/Action/Action Input"""

决策点

  • 决定派发哪个子Agent
  • 判断何时完成审计
  • 汇总所有子Agent的发现

3.1.3 ReconAgent 侦察员

文件: recon.py 角色: 项目分析与情报收集 最大迭代次数: 15

主要任务

  • 发现项目结构
  • 识别技术栈和框架
  • 定位入口点
  • 标记高风险区域

可用工具: list_files, read_file, search_code

输出格式

{
  "project_structure": {...},
  "tech_stack": {"languages": ["Python"], "frameworks": ["Django"]},
  "entry_points": ["app/views.py", "api/routes.py"],
  "high_risk_areas": ["auth/", "payment/"],
  "initial_findings": [...]
}

3.1.4 AnalysisAgent 分析员

文件: analysis.py 角色: 深度代码漏洞分析 最大迭代次数: 30

主要工具

工具 描述 优先级
smart_scan 智能批量安全扫描 推荐首选
quick_audit 快速文件审计 二级
pattern_match 危险模式检测 三级
dataflow_analysis 数据流追踪 深度分析
semgrep_scan Semgrep静态分析 外部工具
bandit_scan Python安全检测 外部工具
gitleaks_scan 密钥泄露检测 外部工具

关注的漏洞类型

  • SQL注入
  • XSS跨站脚本
  • 命令注入
  • 路径遍历
  • SSRF服务端请求伪造

3.1.5 VerificationAgent 验证员

文件: verification.py 角色: 确认发现并生成PoC 最大迭代次数: 15

主要职责

  • 减少误报
  • 验证可利用性
  • 生成概念验证(PoC)

输出格式

{
  "is_verified": true,
  "poc": {
    "description": "...",
    "steps": [...],
    "payload": "..."
  },
  "exploitability": "high",
  "confidence": 0.95
}

3.1.6 TaskHandoff 任务交接协议

Agent之间通过结构化的TaskHandoff进行协作:

@dataclass
class TaskHandoff:
    """Agent间的结构化通信协议"""
    work_completed: str          # 已完成的工作
    key_findings: List[dict]     # 关键发现
    insights: List[str]          # 洞察
    suggested_actions: List[str] # 建议行动
    attention_points: List[str]  # 需要关注的点
    priority_areas: List[str]    # 优先区域
    context_data: dict           # 上下文数据

    def to_prompt_context(self) -> str:
        """转换为LLM可读格式"""

3.2 核心基础设施

核心基础设施位于 backend/app/services/agent/core/ 目录。

3.2.1 状态管理 (state.py)

class AgentState:
    """Agent状态跟踪"""
    status: Literal["created", "running", "waiting", "completed", "failed"]
    messages: List[Message]  # 对话历史
    findings: List[Finding]  # 发现列表
    iterations: int          # 当前迭代次数
    tool_calls: int          # 工具调用次数

3.2.2 执行上下文 (context.py)

class ExecutionContext:
    """分布式追踪的执行上下文"""
    task_id: str           # 任务ID
    agent_id: str          # Agent ID
    trace_path: str        # 追踪路径 (如: Orchestrator > Analysis)
    depth: int             # 嵌套深度
    iteration: int         # 当前迭代
    metadata: dict         # 附加元数据

    def child_context(self, agent_id: str) -> ExecutionContext:
        """创建子上下文"""

    def with_iteration(self, iteration: int) -> ExecutionContext:
        """带迭代信息的上下文"""

3.2.3 熔断器 (circuit_breaker.py)

class CircuitBreaker:
    """熔断器 - 防止级联故障"""

    # 状态转换: CLOSED -> OPEN -> HALF_OPEN -> CLOSED

    failure_threshold: int = 5     # 失败阈值
    recovery_timeout: float = 30.0 # 恢复超时(秒)

    async def call(self, func: Callable) -> Any:
        """受保护的调用"""
        if self.state == CircuitState.OPEN:
            raise CircuitOpenError()
        try:
            result = await func()
            self._record_success()
            return result
        except Exception as e:
            self._record_failure()
            raise

3.2.4 重试机制 (retry.py)

class RetryStrategy:
    """指数退避重试"""
    base_delay: float = 1.0   # 基础延迟(秒)
    max_delay: float = 60.0   # 最大延迟(秒)
    max_retries: int = 3      # 最大重试次数

    def get_delay(self, attempt: int) -> float:
        """计算延迟: min(base * (2 ** attempt) + jitter, max)"""

3.2.5 速率限制器 (rate_limiter.py)

class RateLimiter:
    """令牌桶算法速率限制"""

    # 预设限制:
    # - 外部工具: 0.2 calls/second (每5秒1次)
    # - LLM调用: 60 calls/minute
    # - 突发支持: 最多3个并发调用

    async def acquire(self, tool_name: str):
        """获取执行许可"""

3.2.6 输入验证 (validation.py)

class InputValidator:
    """输入安全验证"""

    def validate_file_path(self, path: str) -> bool:
        """文件路径安全检查"""
        # - 检查路径遍历攻击
        # - 验证文件扩展名
        # - 检查符号链接

    def validate_file_size(self, path: str, max_size: int = 10_000_000):
        """文件大小限制(默认10MB)"""

3.2.7 错误处理 (errors.py)

# 自定义异常层级
class AgentError(Exception): pass
class CircuitOpenError(AgentError): pass      # 熔断器打开
class AgentExecutionError(AgentError): pass   # Agent执行失败
class ToolExecutionError(AgentError): pass    # 工具执行失败
class ValidationError(AgentError): pass       # 输入验证失败
class TokenLimitError(AgentError): pass       # Token超限

3.3 工具生态系统

工具代码位于 backend/app/services/agent/tools/ 目录。

3.3.1 文件操作工具

工具 文件 描述
FileReadTool - 读取文件内容(支持行范围)
ListFilesTool - 列出目录内容(支持glob模式)
FileSearchTool - 按关键词搜索代码

3.3.2 代码分析工具

SmartScanTool (smart_scan_tool.py)

class SmartScanTool:
    """智能批量安全扫描 - 推荐首选工具"""

    # 功能:
    # - 自动检测漏洞模式
    # - 聚焦高风险文件
    # - 批量处理提高效率

    async def execute(self, target: str) -> dict:
        return {
            "vulnerabilities": [...],
            "risk_areas": [...],
            "recommendations": [...]
        }

PatternMatchTool (pattern_tool.py)

class PatternMatchTool:
    """基于正则的模式检测"""

    # 内置模式:
    # - SQL注入模式
    # - XSS模式
    # - 命令注入模式
    # - 可作为Semgrep的后备

DataFlowAnalysisTool (code_analysis_tool.py)

class DataFlowAnalysisTool:
    """数据流分析工具"""

    # 功能:
    # - 源点到汇点追踪
    # - 变量污点分析
    # - LLM辅助的数据流理解

3.3.3 外部安全工具

文件: external_tools.py

工具 描述 超时 后备方案
SemgrepTool 静态分析规则引擎 120s PatternMatchTool
BanditTool Python安全linter 60s PatternMatchTool
GitleaksTool 密钥/凭证检测 60s -

3.3.4 完成工具

FinishTool (finish_tool.py)

class FinishTool:
    """任务完成工具"""

    async def execute(self, conclusion: str, findings: List[dict]):
        """标记任务完成并返回最终结果"""

3.4 知识库与提示词

3.4.1 漏洞知识库

位于 backend/app/services/agent/knowledge/vulnerabilities/

按漏洞类型组织

文件 漏洞类型
sql_injection.py SQL注入
xss.py 跨站脚本
csrf.py 跨站请求伪造
auth.py 认证漏洞
ssrf.py 服务端请求伪造
path_traversal.py 路径遍历
deserialization.py 不安全反序列化
xxe.py XML外部实体注入
race_condition.py 竞态条件
crypto.py 弱加密
injection.py 代码/命令注入
business_logic.py 业务逻辑漏洞
open_redirect.py 开放重定向

框架特定知识

文件 框架
FastAPI.py FastAPI安全模式
Django.py Django安全问题
Flask.py Flask漏洞
Express.js Node.js/Express模式
React.js 前端安全
Supabase.py BaaS安全

3.4.2 系统提示词

文件: backend/app/services/agent/prompts/system_prompts.py

# 核心安全原则
CORE_SECURITY_PRINCIPLES = """
- 深度分析优于广度覆盖
- 重视数据流追踪
- 上下文感知分析
- 假阳性需验证确认
"""

# 漏洞优先级
VULNERABILITY_PRIORITIES = {
    "critical": ["sql_injection", "command_injection", "code_injection"],
    "high": ["path_traversal", "ssrf", "auth_bypass"],
    "medium": ["xss", "information_disclosure", "xxe"],
    "low": ["csrf", "weak_crypto", "unsafe_transport"]
}

# 多Agent协作规则
MULTI_AGENT_RULES = """
- 避免重复工作
- 共享上下文信息
- 聚焦自身职责
- 及时交接发现
"""

4. 前端架构

前端代码位于 frontend/src/pages/AgentAudit/ 目录。

4.1 目录结构

AgentAudit/
├── index.tsx              # 主页面组件
├── types.ts               # TypeScript类型定义
├── utils.ts               # 工具函数
├── constants.tsx          # 常量配置
├── hooks/
│   ├── useAgentAuditState.ts  # 状态管理(useReducer)
│   ├── useResilientStream.ts  # SSE流式连接
│   └── index.ts
└── components/
    ├── Header.tsx             # 标题、状态、控制按钮
    ├── StatsPanel.tsx         # 统计面板
    ├── AgentTreeNode.tsx      # Agent树节点渲染
    ├── AgentDetailPanel.tsx   # Agent详情侧边栏
    ├── LogEntry.tsx           # 单条日志条目
    ├── ConnectionStatus.tsx   # 连接状态指示器
    ├── StatusBadge.tsx        # 状态徽章
    ├── SplashScreen.tsx       # 初始屏幕
    ├── AgentErrorBoundary.tsx # 错误边界
    ├── ReportExportDialog.tsx # 导出对话框
    └── index.ts

4.2 状态管理

文件: hooks/useAgentAuditState.ts

interface AgentAuditState {
  task: AgentTask | null;              // 当前任务
  findings: AgentFinding[];            // 发现列表
  agentTree: AgentTreeResponse | null; // Agent树
  logs: LogItem[];                     // 日志列表
  selectedAgentId: string | null;      // 选中的Agent
  connectionStatus: ConnectionStatus;   // 连接状态
  isAutoScroll: boolean;               // 自动滚动
  expandedLogIds: Set<string>;         // 展开的日志ID
}

type ConnectionStatus = 'disconnected' | 'connecting' | 'connected' | 'error';

// Reducer Actions
type Action =
  | { type: 'SET_TASK'; payload: AgentTask }
  | { type: 'SET_FINDINGS'; payload: AgentFinding[] }
  | { type: 'ADD_LOG'; payload: LogItem }
  | { type: 'UPDATE_LOG'; payload: { id: string; updates: Partial<LogItem> } }
  | { type: 'SET_CONNECTION_STATUS'; payload: ConnectionStatus }
  | { type: 'SELECT_AGENT'; payload: string | null }
  | { type: 'TOGGLE_LOG_EXPANDED'; payload: string }
  | { type: 'RESET' };

4.3 事件流处理

文件: hooks/useResilientStream.ts

function useResilientStream(taskId: string, options: StreamOptions) {
  // 功能:
  // - 弹性SSE连接(自动重连)
  // - Thinking token逐字符流式更新
  // - 按类型过滤事件
  // - 组件卸载时自动清理

  // 事件处理器
  onThinkingToken: (token: string) => void;  // 累积thinking内容
  onToolStart: (tool: ToolEvent) => void;    // 工具开始
  onToolEnd: (tool: ToolEvent) => void;      // 工具结束
  onFinding: (finding: Finding) => void;     // 新发现
  onComplete: () => void;                     // 任务完成
  onError: (error: Error) => void;           // 错误处理
}

4.4 日志类型

type LogType =
  | 'thinking'  // LLM思考过程(支持流式)
  | 'tool'      // 工具执行(含耗时和状态)
  | 'phase'     // 工作流阶段
  | 'finding'   // 漏洞发现
  | 'info'      // 信息消息
  | 'error'     // 错误消息
  | 'dispatch'  // 子Agent派发
  | 'user';     // 用户操作

interface LogItem {
  id: string;
  type: LogType;
  timestamp: Date;
  content: string;
  metadata?: Record<string, any>;
  isExpanded?: boolean;
  isStreaming?: boolean;  // thinking类型专用
}

4.5 实时更新流程

SSE Stream → useResilientStream → dispatch() → reducer → UI更新
     ↓
thinking_token事件
     ↓
onThinkingToken回调
     ↓
dispatch({type: 'ADD_LOG', payload: {type: 'thinking', content: accumulated}})
     ↓
UI渲染LogEntry(流式效果)

5. API接口

API端点位于 backend/app/api/v1/endpoints/agent_tasks.py

5.1 任务管理

方法 端点 描述
POST /agent-tasks/ 创建并启动审计任务
GET /agent-tasks/ 列出任务(支持过滤)
GET /agent-tasks/{task_id} 获取任务详情
POST /agent-tasks/{task_id}/cancel 取消运行中的任务

5.2 事件流

方法 端点 描述
GET /agent-tasks/{task_id}/events 基础事件轮询(SSE)
GET /agent-tasks/{task_id}/stream 增强事件流(含thinking tokens)

查询参数

  • include_thinking: 是否包含thinking token
  • include_tool_calls: 是否包含工具调用
  • after_sequence: 从指定序列号后开始

5.3 结果查询

方法 端点 描述
GET /agent-tasks/{task_id}/findings 获取发现列表(支持分页)
GET /agent-tasks/{task_id}/summary 任务摘要与统计
PATCH /agent-tasks/{task_id}/findings/{finding_id}/status 更新发现状态
GET /agent-tasks/{task_id}/report 生成报告(markdown/json)

5.4 高级功能

方法 端点 描述
GET /agent-tasks/{task_id}/agent-tree Agent层级与执行统计
GET /agent-tasks/{task_id}/checkpoints 执行检查点列表
GET /agent-tasks/{task_id}/checkpoints/{checkpoint_id} 检查点详情

5.5 创建任务Schema

class AgentTaskCreate(BaseModel):
    project_id: str                    # 项目ID
    name: Optional[str]                # 任务名称
    target_vulnerabilities: List[str] = [
        "sql_injection", "xss", "command_injection",
        "path_traversal", "ssrf"
    ]
    verification_level: str = "sandbox"  # 验证级别
    exclude_patterns: List[str] = [      # 排除模式
        "node_modules", "venv", "__pycache__", ".git"
    ]
    target_files: Optional[List[str]]    # 限定扫描文件
    max_iterations: int = 50             # 最大迭代(1-200)
    timeout_seconds: int = 1800          # 超时(60-7200秒)

6. 审计任务执行流程

6.1 完整流程图

┌─────────────────────────────────────────────────────────────────────┐
│                        1. 任务创建                                   │
│  用户 → POST /agent-tasks/ → 创建AgentTask(PENDING) → 入队后台任务    │
└─────────────────────────────────────────────────────────────────────┘
                                    ↓
┌─────────────────────────────────────────────────────────────────────┐
│                        2. 初始化                                     │
│  加载项目 → 收集项目信息 → 初始化工具 → 创建子Agent → 创建Orchestrator │
└─────────────────────────────────────────────────────────────────────┘
                                    ↓
┌─────────────────────────────────────────────────────────────────────┐
│                   3. Orchestrator循环 (最多20轮)                     │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │ 迭代 N:                                                       │   │
│  │  a) 调用LLM获取下一步决策                                      │   │
│  │  b) 解析: Thought / Action / Action Input                    │   │
│  │  c) 执行Action:                                              │   │
│  │     - dispatch_agent → 派发子Agent                           │   │
│  │     - finish → 完成审计                                       │   │
│  │  d) 发射事件到前端                                            │   │
│  └──────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘
                                    ↓
┌─────────────────────────────────────────────────────────────────────┐
│                   4. 子Agent执行 (以Analysis为例)                    │
│  ┌──────────────────────────────────────────────────────────────┐   │
│  │ 循环 (最多30轮):                                              │   │
│  │  a) LLM思考下一步                                             │   │
│  │  b) 选择并执行工具 (smart_scan, pattern_match, ...)          │   │
│  │  c) 将观察结果加入历史                                        │   │
│  │  d) 检测是否输出"Final Answer" → 解析发现,返回               │   │
│  └──────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘
                                    ↓
┌─────────────────────────────────────────────────────────────────────┐
│                        5. 结果汇总                                   │
│  合并所有发现 → 去重 → 更新验证结果 → 构建findings列表               │
└─────────────────────────────────────────────────────────────────────┘
                                    ↓
┌─────────────────────────────────────────────────────────────────────┐
│                        6. 持久化                                     │
│  更新AgentTask状态 → 保存到AgentFinding表 → 计算安全评分              │
└─────────────────────────────────────────────────────────────────────┘
                                    ↓
┌─────────────────────────────────────────────────────────────────────┐
│                        7. 前端展示                                   │
│  实时SSE事件 → 日志累积 → Agent树生长 → 统计更新 → 最终结果展示        │
└─────────────────────────────────────────────────────────────────────┘

6.2 详细执行示例

场景: 用户上传Python项目执行Agent审计

1. 用户请求: POST /agent-tasks/
   {
     "project_id": "123",
     "target_files": ["app/views.py"]
   }

2. Orchestrator迭代1 - LLM输出:
   Thought: 我应该先用recon了解项目结构
   Action: dispatch_agent
   Action Input: {"agent": "recon", "task": "分析项目结构"}

3. ReconAgent执行:
   迭代1: list_files → 找到 app/views.py, models.py...
   迭代2: read_file → 读取app/views.py内容
   迭代N: Final Answer → 返回项目结构信息

4. Orchestrator迭代2 - LLM输出:
   Thought: 这是一个Django项目让我派发Analysis进行漏洞分析
   Action: dispatch_agent
   Action Input: {"agent": "analysis", "task": "深度漏洞分析"}

5. AnalysisAgent执行:
   迭代1: smart_scan → 发现第42行可能存在SQL注入
   迭代2: read_file → 确认问题代码
   迭代M: Final Answer → 返回结构化的漏洞发现

6. Orchestrator迭代3 - LLM输出:
   Thought: 发现高危SQL注入需要验证
   Action: dispatch_agent
   Action Input: {"agent": "verification", "task": "验证SQL注入"}

7. VerificationAgent确认:
   返回: {"is_verified": true, "poc": {...}}

8. Orchestrator完成:
   Thought: 审计完成共发现1个已验证漏洞
   Action: finish
   Action Input: {"conclusion": "...", "findings": [...]}

9. 后端持久化:
   - AgentTask.status = COMPLETED
   - 写入AgentFinding表

10. 前端实时展示:
    - 日志流式更新
    - Agent树动态生长
    - 统计数据刷新
    - 最终展示发现列表

7. 事件流与遥测

7.1 事件管理器

文件: backend/app/services/agent/event_manager.py

class EventManager:
    """事件管理器 - 内存队列 + 流式迭代"""

    max_events: int = 1000  # 最大事件数

    async def emit(self, event_type: str, message: str, **metadata):
        """发射事件"""

    async def stream_events(self):
        """异步迭代器: async for event in manager.stream_events()"""

7.2 事件类型

类型 描述
thinking_start LLM开始思考
thinking_token 思考token(逐字符)
thinking_end LLM思考结束
tool_call 工具调用开始
tool_result 工具执行结果
finding 漏洞发现
phase_start 阶段开始
phase_complete 阶段完成
dispatch 子Agent派发
llm_thought LLM思考内容
llm_decision LLM决策
error 错误
warning 警告
info 信息

7.3 遥测追踪

文件: backend/app/services/agent/telemetry/tracer.py

class Tracer:
    """分布式追踪"""

    def create_span(self, name: str, parent_span: Span = None) -> Span:
        """创建追踪Span"""

    # 追踪路径示例: Orchestrator > Analysis > (Verification)

    # 记录的指标:
    # - 执行时间
    # - Token使用量
    # - 工具调用次数
    # - 关联ID

8. 配置与部署

8.1 Agent配置

文件: backend/app/services/agent/config.py

class AgentConfig:
    """Agent配置 (环境变量前缀: AGENT_)"""

    # LLM设置
    llm_max_retries: int = 3
    llm_retry_base_delay: float = 1.0
    llm_timeout_seconds: int = 120
    llm_max_tokens_per_call: int = 4096
    llm_temperature: float = 0.1
    llm_stream_enabled: bool = True

    # Agent迭代限制
    orchestrator_max_iterations: int = 20
    recon_max_iterations: int = 15
    analysis_max_iterations: int = 30
    verification_max_iterations: int = 15

    # 工具设置
    tool_timeout_seconds: int = 60
    tool_max_retries: int = 2
    semgrep_enabled: bool = True
    bandit_enabled: bool = True
    gitleaks_enabled: bool = True

    # 资源限制
    max_file_size_bytes: int = 10_000_000  # 10MB
    max_files_per_scan: int = 1000
    max_total_findings: int = 500
    max_context_messages: int = 50

    # 熔断器
    circuit_breaker_enabled: bool = True
    circuit_failure_threshold: int = 5
    circuit_recovery_timeout_seconds: float = 30.0

    # 检查点
    checkpoint_enabled: bool = True
    checkpoint_interval_iterations: int = 5
    max_checkpoints_per_task: int = 50

8.2 环境预设

def apply_development_preset():
    """开发环境预设"""
    # 更宽松的限制,更多日志

def apply_production_preset():
    """生产环境预设"""
    # 更严格的限制,优化性能

def apply_testing_preset():
    """测试环境预设"""
    # 快速迭代,最小资源

9. 关键设计模式

9.1 LLM中心化决策

┌─────────────────────────────────────────────┐
│              传统自动化                      │
│  规则引擎 → 固定流程 → 预定义行为           │
└─────────────────────────────────────────────┘

┌─────────────────────────────────────────────┐
│              DeepAudit                       │
│  LLM分析 → 自主决策 → 动态适应 → 智能输出   │
└─────────────────────────────────────────────┘

9.2 ReAct模式

Loop:
  1. Thought: LLM思考当前状态和下一步
  2. Action: 选择要执行的动作/工具
  3. Action Input: 提供参数
  4. Observation: 获取执行结果
  5. 重复直到达成目标或Final Answer

9.3 任务交接协议

# Agent A完成工作后
handoff = TaskHandoff(
    work_completed="扫描了10个文件发现3个可疑点",
    key_findings=[finding1, finding2, finding3],
    suggested_actions=["验证SQL注入", "检查认证逻辑"],
    priority_areas=["auth/", "api/"]
)

# 转换为下一个Agent的输入
context = handoff.to_prompt_context()
# Agent B使用此上下文继续工作

9.4 优雅降级

┌───────────────────────────────────────────────────┐
│                 优雅降级策略                       │
├───────────────────────────────────────────────────┤
│ • 工具失败不中断执行 (continue_on_tool_failure)   │
│ • 后备工具链 (Semgrep → PatternMatch)             │
│ • 超时时返回部分结果                              │
│ • 熔断器防止级联故障                              │
│ • 消息压缩避免token超限                           │
└───────────────────────────────────────────────────┘

9.5 范围限定审计

# 支持的范围限定:
AgentTaskCreate(
    exclude_patterns=["node_modules", "*.min.js"],  # 排除模式
    target_files=["src/auth/", "api/views.py"]      # 限定文件
)
# 减少噪音聚焦LLM注意力

10. 安全与健壮性

10.1 安全特性

特性 描述
速率限制 防止资源耗尽
熔断器 阻止级联故障
输入验证 文件路径、大小、扩展名检查
资源限制 最大发现数(500)、最大文件大小(10MB)
范围过滤 可限定扫描范围

10.2 健壮性特性

特性 描述
指数退避重试 处理瞬态故障
上下文压缩 自动截断长消息历史
取消支持 可随时中断运行任务
检查点 长任务恢复点
结构化异常 分层错误处理

11. 扩展指南

11.1 添加新Agent类型

# 1. 创建新Agent类
class CustomAgent(BaseAgent):
    def __init__(self, config: AgentConfig):
        super().__init__(config)
        self.config.name = "custom"
        self.config.max_iterations = 20

    async def run(self, input_data: dict) -> AgentResult:
        # 实现ReAct循环
        for iteration in range(self.config.max_iterations):
            response = await self.stream_llm_call(messages)
            parsed = self._parse_response(response)

            if parsed.action == "finish":
                return AgentResult(data=parsed.data)

            observation = await self.execute_tool(
                parsed.action,
                parsed.action_input
            )
            messages.append({"role": "user", "content": f"Observation: {observation}"})

# 2. 在Orchestrator中注册
orchestrator.register_agent("custom", CustomAgent(config))

11.2 添加新工具

# 1. 实现工具接口
class CustomTool:
    name: str = "custom_tool"
    description: str = "自定义工具描述"

    async def execute(self, **kwargs) -> dict:
        # 实现工具逻辑
        return {"result": "..."}

# 2. 在工具注册表中添加
TOOL_REGISTRY["custom_tool"] = CustomTool()

11.3 添加漏洞知识

# 创建新的知识模块: knowledge/vulnerabilities/custom_vuln.py

CUSTOM_VULN_KNOWLEDGE = {
    "name": "Custom Vulnerability",
    "description": "...",
    "patterns": [
        r"pattern1",
        r"pattern2"
    ],
    "examples": [...],
    "remediation": "..."
}

11.4 自定义提示词

# 在system_prompts.py中添加
CUSTOM_AGENT_PROMPT = """
你是一个专门的安全分析Agent。

职责:
- ...

可用工具:
- ...

输出格式:
- ...
"""

11.5 添加新事件类型

# 在Agent中发射自定义事件
self.emit_event(
    event_type="custom_event",
    message="自定义消息",
    custom_field="value"
)

# 在前端处理
switch (event.type) {
    case 'custom_event':
        handleCustomEvent(event);
        break;
}

附录: 关键文件索引

核心Agent逻辑:
  backend/app/services/agent/agents/base.py          # BaseAgent基类
  backend/app/services/agent/agents/orchestrator.py  # 编排器
  backend/app/services/agent/agents/analysis.py      # 分析Agent
  backend/app/services/agent/agents/recon.py         # 侦察Agent
  backend/app/services/agent/agents/verification.py  # 验证Agent

核心基础设施:
  backend/app/services/agent/core/state.py           # 状态管理
  backend/app/services/agent/core/context.py         # 执行上下文
  backend/app/services/agent/core/circuit_breaker.py # 熔断器
  backend/app/services/agent/core/retry.py           # 重试机制
  backend/app/services/agent/core/rate_limiter.py    # 速率限制
  backend/app/services/agent/core/validation.py      # 输入验证
  backend/app/services/agent/core/errors.py          # 错误处理

工具:
  backend/app/services/agent/tools/code_analysis_tool.py  # 代码分析
  backend/app/services/agent/tools/pattern_tool.py        # 模式匹配
  backend/app/services/agent/tools/smart_scan_tool.py     # 智能扫描
  backend/app/services/agent/tools/external_tools.py      # 外部工具
  backend/app/services/agent/tools/finish_tool.py         # 完成工具

事件与遥测:
  backend/app/services/agent/event_manager.py        # 事件管理
  backend/app/services/agent/telemetry/tracer.py     # 分布式追踪

配置:
  backend/app/services/agent/config.py               # 集中配置

API:
  backend/app/api/v1/endpoints/agent_tasks.py        # REST端点

前端:
  frontend/src/pages/AgentAudit/index.tsx                    # 主页面
  frontend/src/pages/AgentAudit/hooks/useAgentAuditState.ts  # 状态管理
  frontend/src/pages/AgentAudit/hooks/useResilientStream.ts  # SSE流
  frontend/src/pages/AgentAudit/types.ts                     # 类型定义
  frontend/src/pages/AgentAudit/components/*                 # UI组件

文档版本: 1.0 最后更新: 2025-12-13