CodeReview/backend/app/services/agent/agents/verification.py

1015 lines
42 KiB
Python
Raw Normal View History

"""
Verification Agent (漏洞验证层) - LLM 驱动版
LLM 是验证的大脑
- LLM 决定如何验证每个漏洞
- LLM 构造验证策略
- LLM 分析验证结果
- LLM 判断是否为真实漏洞
类型: ReAct (真正的!)
"""
import asyncio
import json
import logging
import re
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timezone
from .base import BaseAgent, AgentConfig, AgentResult, AgentType, AgentPattern
from ..json_parser import AgentJsonParser
from ..prompts import CORE_SECURITY_PRINCIPLES, VULNERABILITY_PRIORITIES
logger = logging.getLogger(__name__)
VERIFICATION_SYSTEM_PROMPT = """你是 DeepAudit 的漏洞验证 Agent一个**自主**的安全验证专家。
## 你的角色
你是漏洞验证的**大脑**不是机械验证器你需要
1. 理解每个漏洞的上下文
2. 设计合适的验证策略
3. **编写测试代码进行动态验证**
4. 判断漏洞是否真实存在
5. 评估实际影响并生成 PoC
## 核心理念Fuzzing Harness
即使整个项目无法运行你也应该能够验证漏洞方法是
1. **提取目标函数** - 从代码中提取存在漏洞的函数
2. **构建 Mock** - 模拟函数依赖数据库HTTP文件系统等
3. **编写测试脚本** - 构造各种恶意输入测试函数
4. **分析执行结果** - 判断是否触发漏洞
## 你可以使用的工具
### 🔥 核心验证工具(优先使用)
- **run_code**: 执行你编写的测试代码支持 Python/PHP/JS/Ruby/Go/Java/Bash
- 用于运行 Fuzzing HarnessPoC 脚本
- 你可以完全控制测试逻辑
- 参数: code (str), language (str), timeout (int), description (str)
- **extract_function**: 从源文件提取指定函数代码
- 用于获取目标函数构建 Fuzzing Harness
- 参数: file_path (str), function_name (str), include_imports (bool)
### 文件操作
- **read_file**: 读取代码文件获取上下文
参数: file_path (str), start_line (int), end_line (int)
### 沙箱工具
- **sandbox_exec**: 在沙箱中执行命令用于验证命令执行类漏洞
- **sandbox_http**: 发送 HTTP 请求如果有运行的服务
## 🔥 Fuzzing Harness 编写指南
### 原则
1. **你是大脑** - 你决定测试策略payload检测方法
2. **不依赖完整项目** - 提取函数mock 依赖隔离测试
3. **多种 payload** - 设计多种恶意输入不要只测一个
4. **检测漏洞特征** - 根据漏洞类型设计检测逻辑
### 命令注入 Fuzzing Harness 示例 (Python)
```python
import os
import subprocess
# === Mock 危险函数来检测调用 ===
executed_commands = []
original_system = os.system
def mock_system(cmd):
print(f"[DETECTED] os.system called: {cmd}")
executed_commands.append(cmd)
return 0
os.system = mock_system
# === 目标函数(从项目代码复制) ===
def vulnerable_function(user_input):
os.system(f"echo {user_input}")
# === Fuzzing 测试 ===
payloads = [
"test", # 正常输入
"; id", # 命令连接符
"| whoami", # 管道
"$(cat /etc/passwd)", # 命令替换
"`id`", # 反引号
"&& ls -la", # AND 连接
]
print("=== Fuzzing Start ===")
for payload in payloads:
print(f"\\nPayload: {payload}")
executed_commands.clear()
try:
vulnerable_function(payload)
if executed_commands:
print(f"[VULN] Detected! Commands: {executed_commands}")
except Exception as e:
print(f"[ERROR] {e}")
```
### SQL 注入 Fuzzing Harness 示例 (Python)
```python
# === Mock 数据库 ===
class MockCursor:
def __init__(self):
self.queries = []
def execute(self, query, params=None):
print(f"[SQL] Query: {query}")
print(f"[SQL] Params: {params}")
self.queries.append((query, params))
# 检测 SQL 注入特征
if params is None and ("'" in query or "OR" in query.upper() or "--" in query):
print("[VULN] Possible SQL injection - no parameterized query!")
class MockDB:
def cursor(self):
return MockCursor()
# === 目标函数 ===
def get_user(db, user_id):
cursor = db.cursor()
cursor.execute(f"SELECT * FROM users WHERE id = '{user_id}'") # 漏洞!
# === Fuzzing ===
db = MockDB()
payloads = ["1", "1'", "1' OR '1'='1", "1'; DROP TABLE users--", "1 UNION SELECT * FROM admin"]
for p in payloads:
print(f"\\n=== Testing: {p} ===")
get_user(db, p)
```
### PHP 命令注入 Fuzzing Harness 示例
```php
// 注意php -r 不需要 <?php 标签
// Mock $_GET
$_GET['cmd'] = '; id';
$_POST['cmd'] = '; id';
$_REQUEST['cmd'] = '; id';
// 目标代码从项目复制
$output = shell_exec($_GET['cmd']);
echo "Output: " . $output;
// 如果有输出说明命令被执行
if ($output) {
echo "\\n[VULN] Command executed!";
}
```
### XSS 检测 Harness 示例 (Python)
```python
def vulnerable_render(user_input):
# 模拟模板渲染
return f"<div>Hello, {user_input}!</div>"
payloads = [
"test",
"<script>alert(1)</script>",
"<img src=x onerror=alert(1)>",
"{{7*7}}", # SSTI
]
for p in payloads:
output = vulnerable_render(p)
print(f"Input: {p}")
print(f"Output: {output}")
# 检测payload 是否原样出现在输出中
if p in output and ("<" in p or "{{" in p):
print("[VULN] XSS - input not escaped!")
```
## 验证策略
### 对于可执行的漏洞(命令注入、代码注入等)
1. 使用 `extract_function` `read_file` 获取目标代码
2. 编写 Fuzzing Harnessmock 危险函数来检测调用
3. 使用 `run_code` 执行 Harness
4. 分析输出确认漏洞是否触发
### 对于数据泄露型漏洞SQL注入、路径遍历等
1. 获取目标代码
2. 编写 Harnessmock 数据库/文件系统
3. 检查是否能构造恶意查询/路径
4. 分析输出
### 对于配置类漏洞(硬编码密钥等)
1. 使用 `read_file` 直接读取配置文件
2. 验证敏感信息是否存在
3. 评估影响密钥是否有效权限范围等
## 工作流程
你将收到一批待验证的漏洞发现对于每个发现
```
Thought: [分析漏洞类型设计验证策略]
Action: [工具名称]
Action Input: [参数]
```
验证完所有发现后输出
```
Thought: [总结验证结果]
Final Answer: [JSON 格式的验证报告]
```
## 🚨🚨🚨 输出格式强制要求(违反将导致系统无法解析!)
**关键规则完成验证时必须使用 "Final Answer:" 前缀**
### ✅ 正确的完成格式(必须遵守):
```
Thought: 验证完成共确认2个漏洞
Final Answer: {"findings": [...], "summary": {...}}
```
### ❌ 错误格式(系统无法识别,会导致无限循环!):
```json
{"findings": [...], "summary": {...}}
```
**禁止**
1. 直接输出 JSON 而不带 "Final Answer:" 前缀
2. 使用 ```json 代码块而不带 "Final Answer:" 前缀
3. 省略 "Final Answer:" 直接给出结论
4. 重复输出相同的 JSON 内容
**禁止使用 Markdown 格式标记**
正确格式
```
Thought: 我需要读取 search.php 文件来验证 SQL 注入漏洞
Action: read_file
Action Input: {"file_path": "search.php"}
```
错误格式禁止使用
```
**Thought:** 我需要读取文件
**Action:** read_file
**Action Input:** {"file_path": "search.php"}
```
规则
1. 不要在 Thought:Action:Action Input:Final Answer: 前后添加 `**`
2. 不要使用其他 Markdown 格式 `###`、`*斜体*` 等)
3. Action Input 必须是完整的 JSON 对象不能为空或截断
## Final Answer 格式
```json
{
"findings": [
{
...原始发现字段...,
"verdict": "confirmed/likely/uncertain/false_positive",
"confidence": 0.0-1.0,
"is_verified": true/false,
"verification_method": "描述验证方法",
"verification_details": "验证过程和结果详情",
"poc": {
"description": "PoC 描述",
"steps": ["步骤1", "步骤2"],
"payload": "完整可执行的 PoC 代码或命令",
"harness_code": "Fuzzing Harness 代码(如果使用)"
},
"impact": "实际影响分析",
"recommendation": "修复建议"
}
],
"summary": {
"total": 数量,
"confirmed": 数量,
"likely": 数量,
"false_positive": 数量
}
}
```
## 验证判定标准
- **confirmed**: 漏洞确认存在且可利用有明确证据 Harness 成功触发
- **likely**: 高度可能存在漏洞代码分析明确但无法动态验证
- **uncertain**: 需要更多信息才能判断
- **false_positive**: 确认是误报有明确理由
## 🚨 防止幻觉验证(关键!)
**Analysis Agent 可能报告不存在的文件** 你必须验证
1. **文件必须存在** - 使用 read_file 读取发现中指定的文件
- 如果 read_file 返回"文件不存在"该发现是 **false_positive**
- 不要尝试"猜测"正确的文件路径
2. **代码必须匹配** - 发现中的 code_snippet 必须在文件中真实存在
- 如果文件内容与描述不符该发现是 **false_positive**
3. **不要"填补"缺失信息** - 如果发现缺少关键信息如文件路径为空标记为 uncertain
错误做法
```
发现: "SQL注入在 api/database.py:45"
read_file 返回: "文件不存在"
判定: confirmed <- 这是错误的
```
正确做法
```
发现: "SQL注入在 api/database.py:45"
read_file 返回: "文件不存在"
判定: false_positive理由: "文件 api/database.py 不存在"
```
## ⚠️ 关键约束
1. **必须先调用工具验证** - 不允许仅凭已知信息直接判断
2. **优先使用 run_code** - 编写 Harness 进行动态验证
3. **PoC 必须完整可执行** - poc.payload 应该是可直接运行的代码
4. **不要假设环境** - 沙箱中没有运行的服务需要 mock
## 重要原则
1. **你是验证的大脑** - 你决定如何测试工具只提供执行能力
2. **动态验证优先** - 能运行代码验证的就不要仅靠静态分析
3. **质量优先** - 宁可漏报也不要误报太多
4. **证据支撑** - 每个判定都需要有依据
现在开始验证漏洞发现"""
@dataclass
class VerificationStep:
"""验证步骤"""
thought: str
action: Optional[str] = None
action_input: Optional[Dict] = None
observation: Optional[str] = None
is_final: bool = False
final_answer: Optional[Dict] = None
class VerificationAgent(BaseAgent):
"""
漏洞验证 Agent - LLM 驱动版
LLM 全程参与自主决定
1. 如何验证每个漏洞
2. 使用什么工具
3. 判断真假
"""
def __init__(
self,
llm_service,
tools: Dict[str, Any],
event_emitter=None,
):
# 组合增强的系统提示词
full_system_prompt = f"{VERIFICATION_SYSTEM_PROMPT}\n\n{CORE_SECURITY_PRINCIPLES}\n\n{VULNERABILITY_PRIORITIES}"
config = AgentConfig(
name="Verification",
agent_type=AgentType.VERIFICATION,
pattern=AgentPattern.REACT,
max_iterations=25,
system_prompt=full_system_prompt,
)
super().__init__(config, llm_service, tools, event_emitter)
self._conversation_history: List[Dict[str, str]] = []
self._steps: List[VerificationStep] = []
def _parse_llm_response(self, response: str) -> VerificationStep:
"""解析 LLM 响应 - 增强版,更健壮地提取思考内容"""
step = VerificationStep(thought="")
# 🔥 v2.1: 预处理 - 移除 Markdown 格式标记LLM 有时会输出 **Action:** 而非 Action:
cleaned_response = response
cleaned_response = re.sub(r'\*\*Action:\*\*', 'Action:', cleaned_response)
cleaned_response = re.sub(r'\*\*Action Input:\*\*', 'Action Input:', cleaned_response)
cleaned_response = re.sub(r'\*\*Thought:\*\*', 'Thought:', cleaned_response)
cleaned_response = re.sub(r'\*\*Final Answer:\*\*', 'Final Answer:', cleaned_response)
cleaned_response = re.sub(r'\*\*Observation:\*\*', 'Observation:', cleaned_response)
# 🔥 首先尝试提取明确的 Thought 标记
thought_match = re.search(r'Thought:\s*(.*?)(?=Action:|Final Answer:|$)', cleaned_response, re.DOTALL)
if thought_match:
step.thought = thought_match.group(1).strip()
# 🔥 检查是否是最终答案
final_match = re.search(r'Final Answer:\s*(.*?)$', cleaned_response, re.DOTALL)
if final_match:
step.is_final = True
answer_text = final_match.group(1).strip()
answer_text = re.sub(r'```json\s*', '', answer_text)
answer_text = re.sub(r'```\s*', '', answer_text)
# 使用增强的 JSON 解析器
step.final_answer = AgentJsonParser.parse(
answer_text,
default={"findings": [], "raw_answer": answer_text}
)
# 确保 findings 格式正确
if "findings" in step.final_answer:
step.final_answer["findings"] = [
f for f in step.final_answer["findings"]
if isinstance(f, dict)
]
# 🔥 如果没有提取到 thought使用 Final Answer 前的内容作为思考
if not step.thought:
before_final = cleaned_response[:cleaned_response.find('Final Answer:')].strip()
if before_final:
before_final = re.sub(r'^Thought:\s*', '', before_final)
step.thought = before_final[:500] if len(before_final) > 500 else before_final
return step
# 🔥 提取 Action
action_match = re.search(r'Action:\s*(\w+)', cleaned_response)
if action_match:
step.action = action_match.group(1).strip()
# 🔥 如果没有提取到 thought提取 Action 之前的内容作为思考
if not step.thought:
action_pos = cleaned_response.find('Action:')
if action_pos > 0:
before_action = cleaned_response[:action_pos].strip()
before_action = re.sub(r'^Thought:\s*', '', before_action)
if before_action:
step.thought = before_action[:500] if len(before_action) > 500 else before_action
# 🔥 提取 Action Input - 增强版,处理多种格式
input_match = re.search(r'Action Input:\s*(.*?)(?=Thought:|Action:|Observation:|$)', cleaned_response, re.DOTALL)
if input_match:
input_text = input_match.group(1).strip()
input_text = re.sub(r'```json\s*', '', input_text)
input_text = re.sub(r'```\s*', '', input_text)
# 🔥 v2.1: 如果 Action Input 为空或只有 **,记录警告
if not input_text or input_text == '**' or input_text.strip() == '':
logger.warning(f"[Verification] Action Input is empty or malformed: '{input_text}'")
step.action_input = {}
else:
# 使用增强的 JSON 解析器
step.action_input = AgentJsonParser.parse(
input_text,
default={"raw_input": input_text}
)
elif step.action:
# 🔥 v2.1: 有 Action 但没有 Action Input记录警告
logger.warning(f"[Verification] Action '{step.action}' found but no Action Input")
step.action_input = {}
# 🔥 最后的 fallback如果整个响应没有任何标记整体作为思考
if not step.thought and not step.action and not step.is_final:
if response.strip():
step.thought = response.strip()[:500]
return step
async def run(self, input_data: Dict[str, Any]) -> AgentResult:
"""
执行漏洞验证 - LLM 全程参与
"""
import time
start_time = time.time()
previous_results = input_data.get("previous_results", {})
config = input_data.get("config", {})
task = input_data.get("task", "")
task_context = input_data.get("task_context", "")
# 🔥 处理交接信息
handoff = input_data.get("handoff")
if handoff:
from .base import TaskHandoff
if isinstance(handoff, dict):
handoff = TaskHandoff.from_dict(handoff)
self.receive_handoff(handoff)
# 收集所有待验证的发现
findings_to_verify = []
# 🔥 优先从交接信息获取发现
if self._incoming_handoff and self._incoming_handoff.key_findings:
findings_to_verify = self._incoming_handoff.key_findings.copy()
logger.info(f"[Verification] 从交接信息获取 {len(findings_to_verify)} 个发现")
else:
# 🔥 修复:处理 Orchestrator 传递的多种数据格式
# 格式1: Orchestrator 直接传递 {"findings": [...]}
if isinstance(previous_results, dict) and "findings" in previous_results:
direct_findings = previous_results.get("findings", [])
if isinstance(direct_findings, list):
for f in direct_findings:
if isinstance(f, dict):
# 🔥 Always verify Critical/High findings to generate PoC, even if Analysis sets needs_verification=False
severity = str(f.get("severity", "")).lower()
needs_verify = f.get("needs_verification", True)
if needs_verify or severity in ["critical", "high"]:
findings_to_verify.append(f)
logger.info(f"[Verification] 从 previous_results.findings 获取 {len(findings_to_verify)} 个发现")
# 格式2: 传统格式 {"phase_name": {"data": {"findings": [...]}}}
if not findings_to_verify:
for phase_name, result in previous_results.items():
if phase_name == "findings":
continue # 已处理
if isinstance(result, dict):
data = result.get("data", {})
else:
data = result.data if hasattr(result, 'data') else {}
if isinstance(data, dict):
phase_findings = data.get("findings", [])
for f in phase_findings:
if isinstance(f, dict):
severity = str(f.get("severity", "")).lower()
needs_verify = f.get("needs_verification", True)
if needs_verify or severity in ["critical", "high"]:
findings_to_verify.append(f)
if findings_to_verify:
logger.info(f"[Verification] 从传统格式获取 {len(findings_to_verify)} 个发现")
# 🔥 如果仍然没有发现,尝试从 input_data 的其他字段提取
if not findings_to_verify:
# 尝试从 task 或 task_context 中提取描述的漏洞
if task and ("发现" in task or "漏洞" in task or "findings" in task.lower()):
logger.warning(f"[Verification] 无法从结构化数据获取发现,任务描述: {task[:200]}")
# 创建一个提示 LLM 从任务描述中理解漏洞的特殊处理
await self.emit_event("warning", f"无法从结构化数据获取发现列表,将基于任务描述进行验证")
# 去重
findings_to_verify = self._deduplicate(findings_to_verify)
# 🔥 FIX: 优先处理有明确文件路径的发现,将没有文件路径的发现放到后面
# 这确保 Analysis 的具体发现优先于 Recon 的泛化描述
def has_valid_file_path(finding: Dict) -> bool:
file_path = finding.get("file_path", "")
return bool(file_path and file_path.strip() and file_path.lower() not in ["unknown", "n/a", ""])
findings_with_path = [f for f in findings_to_verify if has_valid_file_path(f)]
findings_without_path = [f for f in findings_to_verify if not has_valid_file_path(f)]
# 合并:有路径的在前,没路径的在后
findings_to_verify = findings_with_path + findings_without_path
if findings_with_path:
logger.info(f"[Verification] 优先处理 {len(findings_with_path)} 个有明确文件路径的发现")
if findings_without_path:
logger.info(f"[Verification] 还有 {len(findings_without_path)} 个发现需要自行定位文件")
if not findings_to_verify:
logger.warning(f"[Verification] 没有需要验证的发现! previous_results keys: {list(previous_results.keys()) if isinstance(previous_results, dict) else 'not dict'}")
await self.emit_event("warning", "没有需要验证的发现 - 可能是数据格式问题")
return AgentResult(
success=True,
data={"findings": [], "verified_count": 0, "note": "未收到待验证的发现"},
)
# 限制数量
findings_to_verify = findings_to_verify[:20]
await self.emit_event(
"info",
f"开始验证 {len(findings_to_verify)} 个发现"
)
# 🔥 记录工作开始
self.record_work(f"开始验证 {len(findings_to_verify)} 个漏洞发现")
# 🔥 构建包含交接上下文的初始消息
handoff_context = self.get_handoff_context()
findings_summary = []
for i, f in enumerate(findings_to_verify):
# 🔥 FIX: 正确处理 file_path 格式,可能包含行号 (如 "app.py:36")
file_path = f.get('file_path', 'unknown')
line_start = f.get('line_start', 0)
# 如果 file_path 已包含行号,提取出来
if isinstance(file_path, str) and ':' in file_path:
parts = file_path.split(':', 1)
if len(parts) == 2 and parts[1].split()[0].isdigit():
file_path = parts[0]
try:
line_start = int(parts[1].split()[0])
except ValueError:
pass
findings_summary.append(f"""
### 发现 {i+1}: {f.get('title', 'Unknown')}
- 类型: {f.get('vulnerability_type', 'unknown')}
- 严重度: {f.get('severity', 'medium')}
- 文件: {file_path} ( {line_start})
- 代码:
```
{f.get('code_snippet', 'N/A')[:500]}
```
- 描述: {f.get('description', 'N/A')[:300]}
""")
initial_message = f"""请验证以下 {len(findings_to_verify)} 个安全发现。
{handoff_context if handoff_context else ''}
## 待验证发现
{''.join(findings_summary)}
## ⚠️ 重要验证指南
1. **直接使用上面列出的文件路径** - 不要猜测或搜索其他路径
2. **如果文件路径包含冒号和行号** ( "app.py:36"), 请提取文件名 "app.py" 并使用 read_file 读取
3. **先读取文件内容再判断漏洞是否存在**
4. **不要假设文件在子目录中** - 使用发现中提供的精确路径
## 验证要求
- 验证级别: {config.get('verification_level', 'standard')}
## 可用工具
{self.get_tools_description()}
请开始验证对于每个发现
1. 首先使用 read_file 读取发现中指定的文件使用精确路径
2. 分析代码上下文
3. 判断是否为真实漏洞
{f"特别注意 Analysis Agent 提到的关注点。" if handoff_context else ""}"""
# 初始化对话历史
self._conversation_history = [
{"role": "system", "content": self.config.system_prompt},
{"role": "user", "content": initial_message},
]
self._steps = []
final_result = None
await self.emit_thinking("🔐 Verification Agent 启动LLM 开始自主验证漏洞...")
try:
for iteration in range(self.config.max_iterations):
if self.is_cancelled:
break
self._iteration = iteration + 1
# 🔥 再次检查取消标志在LLM调用之前
if self.is_cancelled:
await self.emit_thinking("🛑 任务已取消,停止执行")
break
# 调用 LLM 进行思考和决策(流式输出)
try:
llm_output, tokens_this_round = await self.stream_llm_call(
self._conversation_history,
# 🔥 不传递 temperature 和 max_tokens使用用户配置
)
except asyncio.CancelledError:
logger.info(f"[{self.name}] LLM call cancelled")
break
self._total_tokens += tokens_this_round
# 🔥 Handle empty LLM response to prevent loops
if not llm_output or not llm_output.strip():
logger.warning(f"[{self.name}] Empty LLM response in iteration {self._iteration}")
await self.emit_llm_decision("收到空响应", "LLM 返回内容为空,尝试重试通过提示")
self._conversation_history.append({
"role": "user",
"content": "Received empty response. Please output your Thought and Action.",
})
continue
# 解析 LLM 响应
step = self._parse_llm_response(llm_output)
self._steps.append(step)
# 🔥 发射 LLM 思考内容事件 - 展示验证的思考过程
if step.thought:
await self.emit_llm_thought(step.thought, iteration + 1)
# 添加 LLM 响应到历史
self._conversation_history.append({
"role": "assistant",
"content": llm_output,
})
# 检查是否完成
if step.is_final:
# 🔥 强制检查:必须至少调用过一次工具才能完成
if self._tool_calls == 0:
logger.warning(f"[{self.name}] LLM tried to finish without any tool calls! Forcing tool usage.")
await self.emit_thinking("⚠️ 拒绝过早完成:必须先使用工具验证漏洞")
self._conversation_history.append({
"role": "user",
"content": (
"⚠️ **系统拒绝**: 你必须先使用工具验证漏洞!\n\n"
"不允许在没有调用任何工具的情况下直接输出 Final Answer。\n\n"
"请立即使用以下工具之一进行验证:\n"
"1. `read_file` - 读取漏洞所在文件的代码\n"
"2. `run_code` - 编写并执行 Fuzzing Harness 验证漏洞\n"
"3. `extract_function` - 提取目标函数进行分析\n\n"
"现在请输出 Thought 和 Action开始验证第一个漏洞。"
),
})
continue
await self.emit_llm_decision("完成漏洞验证", "LLM 判断验证已充分")
final_result = step.final_answer
# 🔥 记录洞察和工作
if final_result and "findings" in final_result:
verified_count = len([f for f in final_result["findings"] if f.get("is_verified")])
fp_count = len([f for f in final_result["findings"] if f.get("verdict") == "false_positive"])
confirmed_count = len([f for f in final_result["findings"] if f.get("verdict") == "confirmed"])
likely_count = len([f for f in final_result["findings"] if f.get("verdict") == "likely"])
self.add_insight(f"验证了 {len(final_result['findings'])} 个发现,{verified_count} 个确认,{fp_count} 个误报")
self.record_work(f"完成漏洞验证: {verified_count} 个确认, {fp_count} 个误报")
# 🔥 FIX: 发送验证结果到 Activity Log让用户看到完成状态
await self.emit_event(
"info",
f"验证结果: {confirmed_count} 确认, {likely_count} 可能, {fp_count} 误报"
)
# 🔥 FIX: 为每个高危发现单独发送事件
for f in final_result["findings"]:
if isinstance(f, dict):
severity = f.get("severity", "medium")
verdict = f.get("verdict", "uncertain")
title = f.get("title", "未命名发现")[:60]
# 只为高危发现发送单独事件
if severity in ["critical", "high"] and verdict in ["confirmed", "likely"]:
await self.emit_finding(
title=title,
severity=severity,
vuln_type=f.get("vulnerability_type", "unknown"),
file_path=f.get("file_path", ""),
is_verified=(verdict == "confirmed")
)
await self.emit_llm_complete(
f"验证完成",
self._total_tokens
)
break
# 执行工具
if step.action:
# 🔥 发射 LLM 动作决策事件
await self.emit_llm_action(step.action, step.action_input or {})
start_tool_time = time.time()
# 🔥 智能循环检测: 追踪重复调用 (无论成功与否)
tool_call_key = f"{step.action}:{json.dumps(step.action_input or {}, sort_keys=True)}"
if not hasattr(self, '_tool_call_counts'):
self._tool_call_counts = {}
self._tool_call_counts[tool_call_key] = self._tool_call_counts.get(tool_call_key, 0) + 1
# 如果同一操作重复尝试超过3次强制干预
if self._tool_call_counts[tool_call_key] > 3:
logger.warning(f"[{self.name}] Detected repetitive tool call loop: {tool_call_key}")
observation = (
f"⚠️ **系统干预**: 你已经使用完全相同的参数调用了工具 '{step.action}' 超过3次。\n"
"请**不要**重复尝试相同的操作。这是无效的。\n"
"请尝试:\n"
"1. 修改参数 (例如改变 input payload)\n"
"2. 使用不同的工具 (例如从 sandbox_exec 换到 php_test)\n"
"3. 如果之前的尝试都失败了,请尝试 analyze_file 重新分析代码\n"
"4. 如果无法验证,请输出 Final Answer 并标记为 uncertain"
)
# 模拟观察结果,跳过实际执行
step.observation = observation
await self.emit_llm_observation(observation)
self._conversation_history.append({
"role": "user",
"content": f"Observation:\n{observation}",
})
continue
# 🔥 循环检测:追踪工具调用失败历史 (保留原有逻辑用于错误追踪)
if not hasattr(self, '_failed_tool_calls'):
self._failed_tool_calls = {}
observation = await self.execute_tool(
step.action,
step.action_input or {}
)
# 🔥 检测工具调用失败并追踪
is_tool_error = (
"失败" in observation or
"错误" in observation or
"不存在" in observation or
"文件过大" in observation or
"Error" in observation
)
if is_tool_error:
self._failed_tool_calls[tool_call_key] = self._failed_tool_calls.get(tool_call_key, 0) + 1
fail_count = self._failed_tool_calls[tool_call_key]
# 🔥 如果同一调用连续失败3次添加强制跳过提示
if fail_count >= 3:
logger.warning(f"[{self.name}] Tool call failed {fail_count} times: {tool_call_key}")
observation += f"\n\n⚠️ **系统提示**: 此工具调用已连续失败 {fail_count} 次。请:\n"
observation += "1. 尝试使用不同的参数(如指定较小的行范围)\n"
observation += "2. 使用 search_code 工具定位关键代码片段\n"
observation += "3. 跳过此发现的验证,继续验证其他发现\n"
observation += "4. 如果已有足够验证结果,直接输出 Final Answer"
# 重置计数器
self._failed_tool_calls[tool_call_key] = 0
else:
# 成功调用,重置失败计数
if tool_call_key in self._failed_tool_calls:
del self._failed_tool_calls[tool_call_key]
# 🔥 工具执行后检查取消状态
if self.is_cancelled:
logger.info(f"[{self.name}] Cancelled after tool execution")
break
step.observation = observation
# 🔥 发射 LLM 观察事件
await self.emit_llm_observation(observation)
# 添加观察结果到历史
self._conversation_history.append({
"role": "user",
"content": f"Observation:\n{observation}",
})
else:
# LLM 没有选择工具,提示它继续
await self.emit_llm_decision("继续验证", "LLM 需要更多验证")
self._conversation_history.append({
"role": "user",
"content": "如果验证完成,请立即输出 Final Answer 汇总所有验证结果。如果验证没有完成:你现在输出了 Thought 但没有输出 Action。请**立即**选择一个工具执行",
})
# 处理结果
duration_ms = int((time.time() - start_time) * 1000)
# 🔥 如果被取消,返回取消结果
if self.is_cancelled:
await self.emit_event(
"info",
f"🛑 Verification Agent 已取消: {self._iteration} 轮迭代"
)
return AgentResult(
success=False,
error="任务已取消",
data={"findings": findings_to_verify},
iterations=self._iteration,
tool_calls=self._tool_calls,
tokens_used=self._total_tokens,
duration_ms=duration_ms,
)
# 处理最终结果
verified_findings = []
# 🔥 Robustness: If LLM returns empty findings but we had input, fallback to original
llm_findings = []
if final_result and "findings" in final_result:
llm_findings = final_result["findings"]
if not llm_findings and findings_to_verify:
logger.warning(f"[{self.name}] LLM returned empty findings despite {len(findings_to_verify)} inputs. Falling back to originals.")
# Fallback to logic below (else branch)
final_result = None
if final_result and "findings" in final_result:
# 🔥 DEBUG: Log what LLM returned for verdict diagnosis
verdicts_debug = [(f.get("file_path", "?"), f.get("verdict"), f.get("confidence")) for f in final_result["findings"]]
logger.info(f"[{self.name}] LLM returned verdicts: {verdicts_debug}")
for f in final_result["findings"]:
# 🔥 FIX: Normalize verdict - handle missing/empty verdict
verdict = f.get("verdict")
if not verdict or verdict not in ["confirmed", "likely", "uncertain", "false_positive"]:
# Try to infer verdict from other fields
if f.get("is_verified") is True:
verdict = "confirmed"
elif f.get("confidence", 0) >= 0.8:
verdict = "likely"
elif f.get("confidence", 0) <= 0.3:
verdict = "false_positive"
else:
verdict = "uncertain"
logger.warning(f"[{self.name}] Missing/invalid verdict for {f.get('file_path', '?')}, inferred as: {verdict}")
verified = {
**f,
"verdict": verdict, # 🔥 Ensure verdict is set
"is_verified": verdict == "confirmed" or (
verdict == "likely" and f.get("confidence", 0) >= 0.8
),
"verified_at": datetime.now(timezone.utc).isoformat() if verdict in ["confirmed", "likely"] else None,
}
# 添加修复建议
if not verified.get("recommendation"):
verified["recommendation"] = self._get_recommendation(f.get("vulnerability_type", ""))
verified_findings.append(verified)
else:
# 如果没有最终结果,使用原始发现
for f in findings_to_verify:
verified_findings.append({
**f,
"verdict": "uncertain",
"confidence": 0.5,
"is_verified": False,
})
# 统计
confirmed_count = len([f for f in verified_findings if f.get("verdict") == "confirmed"])
likely_count = len([f for f in verified_findings if f.get("verdict") == "likely"])
false_positive_count = len([f for f in verified_findings if f.get("verdict") == "false_positive"])
await self.emit_event(
"info",
f"Verification Agent 完成: {confirmed_count} 确认, {likely_count} 可能, {false_positive_count} 误报"
)
# 🔥 CRITICAL: Log final findings count before returning
logger.info(f"[{self.name}] Returning {len(verified_findings)} verified findings")
return AgentResult(
success=True,
data={
"findings": verified_findings,
"verified_count": confirmed_count,
"likely_count": likely_count,
"false_positive_count": false_positive_count,
},
iterations=self._iteration,
tool_calls=self._tool_calls,
tokens_used=self._total_tokens,
duration_ms=duration_ms,
)
except Exception as e:
logger.error(f"Verification Agent failed: {e}", exc_info=True)
return AgentResult(success=False, error=str(e))
def _get_recommendation(self, vuln_type: str) -> str:
"""获取修复建议"""
recommendations = {
"sql_injection": "使用参数化查询或 ORM避免字符串拼接构造 SQL",
"xss": "对用户输入进行 HTML 转义,使用 CSP避免 innerHTML",
"command_injection": "避免使用 shell=True使用参数列表传递命令",
"path_traversal": "验证和规范化路径,使用白名单,避免直接使用用户输入",
"ssrf": "验证和限制目标 URL使用白名单禁止内网访问",
"deserialization": "避免反序列化不可信数据,使用 JSON 替代 pickle/yaml",
"hardcoded_secret": "使用环境变量或密钥管理服务存储敏感信息",
"weak_crypto": "使用强加密算法AES-256, SHA-256+),避免 MD5/SHA1",
}
return recommendations.get(vuln_type, "请根据具体情况修复此安全问题")
def _deduplicate(self, findings: List[Dict]) -> List[Dict]:
"""去重"""
seen = set()
unique = []
for f in findings:
key = (
f.get("file_path", ""),
f.get("line_start", 0),
f.get("vulnerability_type", ""),
)
if key not in seen:
seen.add(key)
unique.append(f)
return unique
def get_conversation_history(self) -> List[Dict[str, str]]:
"""获取对话历史"""
return self._conversation_history
def get_steps(self) -> List[VerificationStep]:
"""获取执行步骤"""
return self._steps