feat(agent): 增强文件路径验证防止幻觉报告

添加文件路径验证规则和检查逻辑,确保漏洞报告中的文件真实存在
移除LLM响应中的Markdown格式标记,统一解析处理
更新报告工具和端点以支持项目根目录验证
This commit is contained in:
lintsinghua 2025-12-19 19:08:36 +08:00
parent 4d7abae245
commit 80704fdcb4
9 changed files with 362 additions and 57 deletions

View File

@ -511,7 +511,9 @@ async def _execute_agent_task(task_id: str):
if isinstance(f, dict): if isinstance(f, dict):
logger.debug(f"[AgentTask] Finding {i+1}: {f.get('title', 'N/A')[:50]} - {f.get('severity', 'N/A')}") logger.debug(f"[AgentTask] Finding {i+1}: {f.get('title', 'N/A')[:50]} - {f.get('severity', 'N/A')}")
await _save_findings(db, task_id, findings) # 🔥 v2.1: 传递 project_root 用于文件路径验证
saved_count = await _save_findings(db, task_id, findings, project_root=project_root)
logger.info(f"[AgentTask] Saved {saved_count}/{len(findings)} findings (filtered {len(findings) - saved_count} hallucinations)")
# 更新任务统计 # 更新任务统计
# 🔥 CRITICAL FIX: 在设置完成前再次检查取消状态 # 🔥 CRITICAL FIX: 在设置完成前再次检查取消状态
@ -523,7 +525,7 @@ async def _execute_agent_task(task_id: str):
task.status = AgentTaskStatus.COMPLETED task.status = AgentTaskStatus.COMPLETED
task.completed_at = datetime.now(timezone.utc) task.completed_at = datetime.now(timezone.utc)
task.current_phase = AgentTaskPhase.REPORTING task.current_phase = AgentTaskPhase.REPORTING
task.findings_count = len(findings) task.findings_count = saved_count # 🔥 v2.1: 使用实际保存的数量(排除幻觉)
task.total_iterations = result.iterations task.total_iterations = result.iterations
task.tool_calls_count = result.tool_calls task.tool_calls_count = result.tool_calls
task.tokens_used = result.tokens_used task.tokens_used = result.tokens_used
@ -982,8 +984,8 @@ async def _initialize_tools(
"run_code": RunCodeTool(sandbox_manager, project_root), "run_code": RunCodeTool(sandbox_manager, project_root),
"extract_function": ExtractFunctionTool(project_root), "extract_function": ExtractFunctionTool(project_root),
# 报告工具 # 报告工具 - 🔥 v2.1: 传递 project_root 用于文件验证
"create_vulnerability_report": CreateVulnerabilityReportTool(), "create_vulnerability_report": CreateVulnerabilityReportTool(project_root),
} }
# Orchestrator 工具(主要是思考工具) # Orchestrator 工具(主要是思考工具)
@ -1117,11 +1119,26 @@ async def _collect_project_info(
return info return info
async def _save_findings(db: AsyncSession, task_id: str, findings: List[Dict]) -> None: async def _save_findings(
db: AsyncSession,
task_id: str,
findings: List[Dict],
project_root: Optional[str] = None,
) -> int:
""" """
保存发现到数据库 保存发现到数据库
🔥 增强版支持多种 Agent 输出格式健壮的字段映射 🔥 增强版支持多种 Agent 输出格式健壮的字段映射
🔥 v2.1: 添加文件路径验证过滤幻觉发现
Args:
db: 数据库会话
task_id: 任务ID
findings: 发现列表
project_root: 项目根目录用于验证文件路径
Returns:
int: 实际保存的发现数量
""" """
from app.models.agent_task import VulnerabilityType from app.models.agent_task import VulnerabilityType
@ -1129,7 +1146,7 @@ async def _save_findings(db: AsyncSession, task_id: str, findings: List[Dict]) -
if not findings: if not findings:
logger.warning(f"[SaveFindings] No findings to save for task {task_id}") logger.warning(f"[SaveFindings] No findings to save for task {task_id}")
return return 0
# 🔥 Case-insensitive mapping preparation # 🔥 Case-insensitive mapping preparation
severity_map = { severity_map = {
@ -1216,6 +1233,21 @@ async def _save_findings(db: AsyncSession, task_id: str, findings: List[Dict]) -
finding.get("location", "").split(":")[0] if ":" in finding.get("location", "") else finding.get("location") finding.get("location", "").split(":")[0] if ":" in finding.get("location", "") else finding.get("location")
) )
# 🔥 v2.1: 文件路径验证 - 过滤幻觉发现
if project_root and file_path:
# 清理路径(移除可能的行号)
clean_path = file_path.split(":")[0].strip() if ":" in file_path else file_path.strip()
full_path = os.path.join(project_root, clean_path)
if not os.path.isfile(full_path):
# 尝试作为绝对路径
if not (os.path.isabs(clean_path) and os.path.isfile(clean_path)):
logger.warning(
f"[SaveFindings] 🚫 跳过幻觉发现: 文件不存在 '{file_path}' "
f"(title: {finding.get('title', 'N/A')[:50]})"
)
continue # 跳过这个发现
# 🔥 Handle line numbers (support multiple formats) # 🔥 Handle line numbers (support multiple formats)
line_start = finding.get("line_start") or finding.get("line") line_start = finding.get("line_start") or finding.get("line")
if not line_start and ":" in finding.get("location", ""): if not line_start and ":" in finding.get("location", ""):
@ -1346,6 +1378,8 @@ async def _save_findings(db: AsyncSession, task_id: str, findings: List[Dict]) -
logger.error(f"Failed to commit findings: {e}") logger.error(f"Failed to commit findings: {e}")
await db.rollback() await db.rollback()
return saved_count
def _calculate_security_score(findings: List[Dict]) -> float: def _calculate_security_score(findings: List[Dict]) -> float:
"""计算安全评分""" """计算安全评分"""
@ -3154,15 +3188,53 @@ async def generate_audit_report(
md_lines.append("") md_lines.append("")
if f.code_snippet: if f.code_snippet:
# Detect language from file extension # 🔥 v2.1: 增强语言检测,避免默认 python 标记错误
lang = "python" lang = "text" # 默认使用 text 而非 python
if f.file_path: if f.file_path:
ext = f.file_path.split('.')[-1].lower() ext = f.file_path.split('.')[-1].lower()
lang_map = { lang_map = {
'py': 'python', 'js': 'javascript', 'ts': 'typescript', # Python
'jsx': 'jsx', 'tsx': 'tsx', 'java': 'java', 'go': 'go', 'py': 'python', 'pyw': 'python', 'pyi': 'python',
'rs': 'rust', 'rb': 'ruby', 'php': 'php', 'c': 'c', # JavaScript/TypeScript
'cpp': 'cpp', 'cs': 'csharp', 'sol': 'solidity' 'js': 'javascript', 'mjs': 'javascript', 'cjs': 'javascript',
'ts': 'typescript', 'mts': 'typescript',
'jsx': 'jsx', 'tsx': 'tsx',
# Web
'html': 'html', 'htm': 'html',
'css': 'css', 'scss': 'scss', 'sass': 'sass', 'less': 'less',
'vue': 'vue', 'svelte': 'svelte',
# Backend
'java': 'java', 'kt': 'kotlin', 'kts': 'kotlin',
'go': 'go', 'rs': 'rust',
'rb': 'ruby', 'erb': 'erb',
'php': 'php', 'phtml': 'php',
# C-family
'c': 'c', 'h': 'c',
'cpp': 'cpp', 'cc': 'cpp', 'cxx': 'cpp', 'hpp': 'cpp',
'cs': 'csharp',
# Shell/Script
'sh': 'bash', 'bash': 'bash', 'zsh': 'zsh',
'ps1': 'powershell', 'psm1': 'powershell',
# Config
'json': 'json', 'yaml': 'yaml', 'yml': 'yaml',
'toml': 'toml', 'ini': 'ini', 'cfg': 'ini',
'xml': 'xml', 'xhtml': 'xml',
# Database
'sql': 'sql',
# Other
'md': 'markdown', 'markdown': 'markdown',
'sol': 'solidity',
'swift': 'swift',
'r': 'r', 'R': 'r',
'lua': 'lua',
'pl': 'perl', 'pm': 'perl',
'ex': 'elixir', 'exs': 'elixir',
'erl': 'erlang',
'hs': 'haskell',
'scala': 'scala', 'sc': 'scala',
'clj': 'clojure', 'cljs': 'clojure',
'dart': 'dart',
'groovy': 'groovy', 'gradle': 'groovy',
} }
lang = lang_map.get(ext, 'text') lang = lang_map.get(ext, 'text')
md_lines.append("**漏洞代码:**") md_lines.append("**漏洞代码:**")

View File

@ -155,6 +155,24 @@ Thought: [总结所有发现]
Final Answer: [JSON 格式的漏洞报告] Final Answer: [JSON 格式的漏洞报告]
``` ```
## ⚠️ 输出格式要求(严格遵守)
**禁止使用 Markdown 格式标记** 你的输出必须是纯文本格式
正确
```
Thought: 我需要使用 semgrep 扫描代码
Action: semgrep_scan
Action Input: {"target_path": ".", "rules": "auto"}
```
错误禁止
```
**Thought:** 我需要扫描
**Action:** semgrep_scan
**Action Input:** {...}
```
## Final Answer 格式 ## Final Answer 格式
```json ```json
{ {
@ -265,13 +283,21 @@ class AnalysisAgent(BaseAgent):
"""解析 LLM 响应 - 增强版,更健壮地提取思考内容""" """解析 LLM 响应 - 增强版,更健壮地提取思考内容"""
step = AnalysisStep(thought="") step = AnalysisStep(thought="")
# 🔥 v2.1: 预处理 - 移除 Markdown 格式标记LLM 有时会输出 **Action:** 而非 Action:
cleaned_response = response
cleaned_response = re.sub(r'\*\*Action:\*\*', 'Action:', cleaned_response)
cleaned_response = re.sub(r'\*\*Action Input:\*\*', 'Action Input:', cleaned_response)
cleaned_response = re.sub(r'\*\*Thought:\*\*', 'Thought:', cleaned_response)
cleaned_response = re.sub(r'\*\*Final Answer:\*\*', 'Final Answer:', cleaned_response)
cleaned_response = re.sub(r'\*\*Observation:\*\*', 'Observation:', cleaned_response)
# 🔥 首先尝试提取明确的 Thought 标记 # 🔥 首先尝试提取明确的 Thought 标记
thought_match = re.search(r'Thought:\s*(.*?)(?=Action:|Final Answer:|$)', response, re.DOTALL) thought_match = re.search(r'Thought:\s*(.*?)(?=Action:|Final Answer:|$)', cleaned_response, re.DOTALL)
if thought_match: if thought_match:
step.thought = thought_match.group(1).strip() step.thought = thought_match.group(1).strip()
# 🔥 检查是否是最终答案 # 🔥 检查是否是最终答案
final_match = re.search(r'Final Answer:\s*(.*?)$', response, re.DOTALL) final_match = re.search(r'Final Answer:\s*(.*?)$', cleaned_response, re.DOTALL)
if final_match: if final_match:
step.is_final = True step.is_final = True
answer_text = final_match.group(1).strip() answer_text = final_match.group(1).strip()
@ -291,7 +317,7 @@ class AnalysisAgent(BaseAgent):
# 🔥 如果没有提取到 thought使用 Final Answer 前的内容作为思考 # 🔥 如果没有提取到 thought使用 Final Answer 前的内容作为思考
if not step.thought: if not step.thought:
before_final = response[:response.find('Final Answer:')].strip() before_final = cleaned_response[:cleaned_response.find('Final Answer:')].strip()
if before_final: if before_final:
before_final = re.sub(r'^Thought:\s*', '', before_final) before_final = re.sub(r'^Thought:\s*', '', before_final)
step.thought = before_final[:500] if len(before_final) > 500 else before_final step.thought = before_final[:500] if len(before_final) > 500 else before_final
@ -299,21 +325,21 @@ class AnalysisAgent(BaseAgent):
return step return step
# 🔥 提取 Action # 🔥 提取 Action
action_match = re.search(r'Action:\s*(\w+)', response) action_match = re.search(r'Action:\s*(\w+)', cleaned_response)
if action_match: if action_match:
step.action = action_match.group(1).strip() step.action = action_match.group(1).strip()
# 🔥 如果没有提取到 thought提取 Action 之前的内容作为思考 # 🔥 如果没有提取到 thought提取 Action 之前的内容作为思考
if not step.thought: if not step.thought:
action_pos = response.find('Action:') action_pos = cleaned_response.find('Action:')
if action_pos > 0: if action_pos > 0:
before_action = response[:action_pos].strip() before_action = cleaned_response[:action_pos].strip()
before_action = re.sub(r'^Thought:\s*', '', before_action) before_action = re.sub(r'^Thought:\s*', '', before_action)
if before_action: if before_action:
step.thought = before_action[:500] if len(before_action) > 500 else before_action step.thought = before_action[:500] if len(before_action) > 500 else before_action
# 🔥 提取 Action Input # 🔥 提取 Action Input
input_match = re.search(r'Action Input:\s*(.*?)(?=Thought:|Action:|Observation:|$)', response, re.DOTALL) input_match = re.search(r'Action Input:\s*(.*?)(?=Thought:|Action:|Observation:|$)', cleaned_response, re.DOTALL)
if input_match: if input_match:
input_text = input_match.group(1).strip() input_text = input_match.group(1).strip()
input_text = re.sub(r'```json\s*', '', input_text) input_text = re.sub(r'```json\s*', '', input_text)

View File

@ -13,6 +13,7 @@ LLM 是真正的大脑,全程参与决策!
import asyncio import asyncio
import json import json
import logging import logging
import os
import re import re
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from dataclasses import dataclass from dataclasses import dataclass
@ -534,18 +535,25 @@ Action Input: {{"参数": "值"}}
def _parse_llm_response(self, response: str) -> Optional[AgentStep]: def _parse_llm_response(self, response: str) -> Optional[AgentStep]:
"""解析 LLM 响应""" """解析 LLM 响应"""
# 🔥 v2.1: 预处理 - 移除 Markdown 格式标记LLM 有时会输出 **Action:** 而非 Action:
cleaned_response = response
cleaned_response = re.sub(r'\*\*Action:\*\*', 'Action:', cleaned_response)
cleaned_response = re.sub(r'\*\*Action Input:\*\*', 'Action Input:', cleaned_response)
cleaned_response = re.sub(r'\*\*Thought:\*\*', 'Thought:', cleaned_response)
cleaned_response = re.sub(r'\*\*Observation:\*\*', 'Observation:', cleaned_response)
# 提取 Thought # 提取 Thought
thought_match = re.search(r'Thought:\s*(.*?)(?=Action:|$)', response, re.DOTALL) thought_match = re.search(r'Thought:\s*(.*?)(?=Action:|$)', cleaned_response, re.DOTALL)
thought = thought_match.group(1).strip() if thought_match else "" thought = thought_match.group(1).strip() if thought_match else ""
# 提取 Action # 提取 Action
action_match = re.search(r'Action:\s*(\w+)', response) action_match = re.search(r'Action:\s*(\w+)', cleaned_response)
if not action_match: if not action_match:
return None return None
action = action_match.group(1).strip() action = action_match.group(1).strip()
# 提取 Action Input # 提取 Action Input
input_match = re.search(r'Action Input:\s*(.*?)(?=Thought:|Observation:|$)', response, re.DOTALL) input_match = re.search(r'Action Input:\s*(.*?)(?=Thought:|Observation:|$)', cleaned_response, re.DOTALL)
if not input_match: if not input_match:
return None return None
@ -1000,11 +1008,46 @@ Action Input: {{"参数": "值"}}
logger.error(f"Sub-agent dispatch failed: {e}", exc_info=True) logger.error(f"Sub-agent dispatch failed: {e}", exc_info=True)
return f"## 调度失败\n\n错误: {str(e)}" return f"## 调度失败\n\n错误: {str(e)}"
def _normalize_finding(self, finding: Dict[str, Any]) -> Dict[str, Any]: def _validate_file_path(self, file_path: str) -> bool:
"""
🔥 v2.1: 验证文件路径是否真实存在
Args:
file_path: 相对或绝对文件路径可能包含行号 "app.py:36"
Returns:
bool: 文件是否存在
"""
if not file_path or not file_path.strip():
return False
# 获取项目根目录
project_root = self._runtime_context.get("project_root", "")
if not project_root:
# 没有项目根目录时,无法验证,返回 True 以避免误判
return True
# 清理路径(移除可能的行号)
clean_path = file_path.split(":")[0].strip() if ":" in file_path else file_path.strip()
# 尝试相对路径
full_path = os.path.join(project_root, clean_path)
if os.path.isfile(full_path):
return True
# 尝试绝对路径
if os.path.isabs(clean_path) and os.path.isfile(clean_path):
return True
return False
def _normalize_finding(self, finding: Dict[str, Any]) -> Optional[Dict[str, Any]]:
""" """
标准化发现格式 标准化发现格式
不同 Agent 可能返回不同格式的发现这个方法将它们标准化为统一格式 不同 Agent 可能返回不同格式的发现这个方法将它们标准化为统一格式
🔥 v2.1: 添加文件路径验证返回 None 表示发现无效幻觉
""" """
normalized = dict(finding) # 复制原始数据 normalized = dict(finding) # 复制原始数据
@ -1086,6 +1129,15 @@ Action Input: {{"参数": "值"}}
if "impact" not in normalized["description"].lower(): if "impact" not in normalized["description"].lower():
normalized["description"] += f"\n\nImpact: {normalized['impact']}" normalized["description"] += f"\n\nImpact: {normalized['impact']}"
# 🔥 v2.1: 验证文件路径存在性
file_path = normalized.get("file_path", "")
if file_path and not self._validate_file_path(file_path):
logger.warning(
f"[Orchestrator] 🚫 过滤幻觉发现: 文件不存在 '{file_path}' "
f"(title: {normalized.get('title', 'N/A')[:50]})"
)
return None # 返回 None 表示发现无效
return normalized return normalized
def _summarize_findings(self) -> str: def _summarize_findings(self) -> str:

View File

@ -80,6 +80,29 @@ Thought: [总结收集到的所有信息]
Final Answer: [JSON 格式的结果] Final Answer: [JSON 格式的结果]
``` ```
## ⚠️ 输出格式要求(严格遵守)
**禁止使用 Markdown 格式标记** 你的输出必须是纯文本格式
正确格式
```
Thought: 我需要查看项目结构来了解项目组成
Action: list_files
Action Input: {"directory": "."}
```
错误格式禁止使用
```
**Thought:** 我需要查看项目结构
**Action:** list_files
**Action Input:** {"directory": "."}
```
规则
1. 不要在 Thought:Action:Action Input:Final Answer: 前后添加 `**`
2. 不要使用其他 Markdown 格式 `###`、`*斜体*` 等)
3. Action Input 必须是完整的 JSON 对象不能为空或截断
## 输出格式 ## 输出格式
``` ```
@ -208,13 +231,21 @@ class ReconAgent(BaseAgent):
"""解析 LLM 响应 - 增强版,更健壮地提取思考内容""" """解析 LLM 响应 - 增强版,更健壮地提取思考内容"""
step = ReconStep(thought="") step = ReconStep(thought="")
# 🔥 v2.1: 预处理 - 移除 Markdown 格式标记LLM 有时会输出 **Action:** 而非 Action:
cleaned_response = response
cleaned_response = re.sub(r'\*\*Action:\*\*', 'Action:', cleaned_response)
cleaned_response = re.sub(r'\*\*Action Input:\*\*', 'Action Input:', cleaned_response)
cleaned_response = re.sub(r'\*\*Thought:\*\*', 'Thought:', cleaned_response)
cleaned_response = re.sub(r'\*\*Final Answer:\*\*', 'Final Answer:', cleaned_response)
cleaned_response = re.sub(r'\*\*Observation:\*\*', 'Observation:', cleaned_response)
# 🔥 首先尝试提取明确的 Thought 标记 # 🔥 首先尝试提取明确的 Thought 标记
thought_match = re.search(r'Thought:\s*(.*?)(?=Action:|Final Answer:|$)', response, re.DOTALL) thought_match = re.search(r'Thought:\s*(.*?)(?=Action:|Final Answer:|$)', cleaned_response, re.DOTALL)
if thought_match: if thought_match:
step.thought = thought_match.group(1).strip() step.thought = thought_match.group(1).strip()
# 🔥 检查是否是最终答案 # 🔥 检查是否是最终答案
final_match = re.search(r'Final Answer:\s*(.*?)$', response, re.DOTALL) final_match = re.search(r'Final Answer:\s*(.*?)$', cleaned_response, re.DOTALL)
if final_match: if final_match:
step.is_final = True step.is_final = True
answer_text = final_match.group(1).strip() answer_text = final_match.group(1).strip()
@ -234,7 +265,7 @@ class ReconAgent(BaseAgent):
# 🔥 如果没有提取到 thought使用 Final Answer 前的内容作为思考 # 🔥 如果没有提取到 thought使用 Final Answer 前的内容作为思考
if not step.thought: if not step.thought:
before_final = response[:response.find('Final Answer:')].strip() before_final = cleaned_response[:cleaned_response.find('Final Answer:')].strip()
if before_final: if before_final:
# 移除可能的 Thought: 前缀 # 移除可能的 Thought: 前缀
before_final = re.sub(r'^Thought:\s*', '', before_final) before_final = re.sub(r'^Thought:\s*', '', before_final)
@ -243,22 +274,22 @@ class ReconAgent(BaseAgent):
return step return step
# 🔥 提取 Action # 🔥 提取 Action
action_match = re.search(r'Action:\s*(\w+)', response) action_match = re.search(r'Action:\s*(\w+)', cleaned_response)
if action_match: if action_match:
step.action = action_match.group(1).strip() step.action = action_match.group(1).strip()
# 🔥 如果没有提取到 thought提取 Action 之前的内容作为思考 # 🔥 如果没有提取到 thought提取 Action 之前的内容作为思考
if not step.thought: if not step.thought:
action_pos = response.find('Action:') action_pos = cleaned_response.find('Action:')
if action_pos > 0: if action_pos > 0:
before_action = response[:action_pos].strip() before_action = cleaned_response[:action_pos].strip()
# 移除可能的 Thought: 前缀 # 移除可能的 Thought: 前缀
before_action = re.sub(r'^Thought:\s*', '', before_action) before_action = re.sub(r'^Thought:\s*', '', before_action)
if before_action: if before_action:
step.thought = before_action[:500] if len(before_action) > 500 else before_action step.thought = before_action[:500] if len(before_action) > 500 else before_action
# 🔥 提取 Action Input # 🔥 提取 Action Input
input_match = re.search(r'Action Input:\s*(.*?)(?=Thought:|Action:|Observation:|$)', response, re.DOTALL) input_match = re.search(r'Action Input:\s*(.*?)(?=Thought:|Action:|Observation:|$)', cleaned_response, re.DOTALL)
if input_match: if input_match:
input_text = input_match.group(1).strip() input_text = input_match.group(1).strip()
input_text = re.sub(r'```json\s*', '', input_text) input_text = re.sub(r'```json\s*', '', input_text)

View File

@ -223,6 +223,29 @@ Thought: [总结验证结果]
Final Answer: [JSON 格式的验证报告] Final Answer: [JSON 格式的验证报告]
``` ```
## ⚠️ 输出格式要求(严格遵守)
**禁止使用 Markdown 格式标记** 你的输出必须是纯文本格式
正确格式
```
Thought: 我需要读取 search.php 文件来验证 SQL 注入漏洞
Action: read_file
Action Input: {"file_path": "search.php"}
```
错误格式禁止使用
```
**Thought:** 我需要读取文件
**Action:** read_file
**Action Input:** {"file_path": "search.php"}
```
规则
1. 不要在 Thought:Action:Action Input:Final Answer: 前后添加 `**`
2. 不要使用其他 Markdown 格式 `###`、`*斜体*` 等)
3. Action Input 必须是完整的 JSON 对象不能为空或截断
## Final Answer 格式 ## Final Answer 格式
```json ```json
{ {
@ -323,13 +346,21 @@ class VerificationAgent(BaseAgent):
"""解析 LLM 响应 - 增强版,更健壮地提取思考内容""" """解析 LLM 响应 - 增强版,更健壮地提取思考内容"""
step = VerificationStep(thought="") step = VerificationStep(thought="")
# 🔥 v2.1: 预处理 - 移除 Markdown 格式标记LLM 有时会输出 **Action:** 而非 Action:
cleaned_response = response
cleaned_response = re.sub(r'\*\*Action:\*\*', 'Action:', cleaned_response)
cleaned_response = re.sub(r'\*\*Action Input:\*\*', 'Action Input:', cleaned_response)
cleaned_response = re.sub(r'\*\*Thought:\*\*', 'Thought:', cleaned_response)
cleaned_response = re.sub(r'\*\*Final Answer:\*\*', 'Final Answer:', cleaned_response)
cleaned_response = re.sub(r'\*\*Observation:\*\*', 'Observation:', cleaned_response)
# 🔥 首先尝试提取明确的 Thought 标记 # 🔥 首先尝试提取明确的 Thought 标记
thought_match = re.search(r'Thought:\s*(.*?)(?=Action:|Final Answer:|$)', response, re.DOTALL) thought_match = re.search(r'Thought:\s*(.*?)(?=Action:|Final Answer:|$)', cleaned_response, re.DOTALL)
if thought_match: if thought_match:
step.thought = thought_match.group(1).strip() step.thought = thought_match.group(1).strip()
# 🔥 检查是否是最终答案 # 🔥 检查是否是最终答案
final_match = re.search(r'Final Answer:\s*(.*?)$', response, re.DOTALL) final_match = re.search(r'Final Answer:\s*(.*?)$', cleaned_response, re.DOTALL)
if final_match: if final_match:
step.is_final = True step.is_final = True
answer_text = final_match.group(1).strip() answer_text = final_match.group(1).strip()
@ -349,7 +380,7 @@ class VerificationAgent(BaseAgent):
# 🔥 如果没有提取到 thought使用 Final Answer 前的内容作为思考 # 🔥 如果没有提取到 thought使用 Final Answer 前的内容作为思考
if not step.thought: if not step.thought:
before_final = response[:response.find('Final Answer:')].strip() before_final = cleaned_response[:cleaned_response.find('Final Answer:')].strip()
if before_final: if before_final:
before_final = re.sub(r'^Thought:\s*', '', before_final) before_final = re.sub(r'^Thought:\s*', '', before_final)
step.thought = before_final[:500] if len(before_final) > 500 else before_final step.thought = before_final[:500] if len(before_final) > 500 else before_final
@ -357,30 +388,40 @@ class VerificationAgent(BaseAgent):
return step return step
# 🔥 提取 Action # 🔥 提取 Action
action_match = re.search(r'Action:\s*(\w+)', response) action_match = re.search(r'Action:\s*(\w+)', cleaned_response)
if action_match: if action_match:
step.action = action_match.group(1).strip() step.action = action_match.group(1).strip()
# 🔥 如果没有提取到 thought提取 Action 之前的内容作为思考 # 🔥 如果没有提取到 thought提取 Action 之前的内容作为思考
if not step.thought: if not step.thought:
action_pos = response.find('Action:') action_pos = cleaned_response.find('Action:')
if action_pos > 0: if action_pos > 0:
before_action = response[:action_pos].strip() before_action = cleaned_response[:action_pos].strip()
before_action = re.sub(r'^Thought:\s*', '', before_action) before_action = re.sub(r'^Thought:\s*', '', before_action)
if before_action: if before_action:
step.thought = before_action[:500] if len(before_action) > 500 else before_action step.thought = before_action[:500] if len(before_action) > 500 else before_action
# 🔥 提取 Action Input # 🔥 提取 Action Input - 增强版,处理多种格式
input_match = re.search(r'Action Input:\s*(.*?)(?=Thought:|Action:|Observation:|$)', response, re.DOTALL) input_match = re.search(r'Action Input:\s*(.*?)(?=Thought:|Action:|Observation:|$)', cleaned_response, re.DOTALL)
if input_match: if input_match:
input_text = input_match.group(1).strip() input_text = input_match.group(1).strip()
input_text = re.sub(r'```json\s*', '', input_text) input_text = re.sub(r'```json\s*', '', input_text)
input_text = re.sub(r'```\s*', '', input_text) input_text = re.sub(r'```\s*', '', input_text)
# 使用增强的 JSON 解析器
step.action_input = AgentJsonParser.parse( # 🔥 v2.1: 如果 Action Input 为空或只有 **,记录警告
input_text, if not input_text or input_text == '**' or input_text.strip() == '':
default={"raw_input": input_text} logger.warning(f"[Verification] Action Input is empty or malformed: '{input_text}'")
) step.action_input = {}
else:
# 使用增强的 JSON 解析器
step.action_input = AgentJsonParser.parse(
input_text,
default={"raw_input": input_text}
)
elif step.action:
# 🔥 v2.1: 有 Action 但没有 Action Input记录警告
logger.warning(f"[Verification] Action '{step.action}' found but no Action Input")
step.action_input = {}
# 🔥 最后的 fallback如果整个响应没有任何标记整体作为思考 # 🔥 最后的 fallback如果整个响应没有任何标记整体作为思考
if not step.thought and not step.action and not step.is_final: if not step.thought and not step.action and not step.is_final:

View File

@ -331,8 +331,8 @@ class AgentRunner:
self.verification_tools = { self.verification_tools = {
**base_tools, **base_tools,
# 验证工具 - 移除旧的 vulnerability_validation 和 dataflow_analysis强制使用沙箱 # 验证工具 - 移除旧的 vulnerability_validation 和 dataflow_analysis强制使用沙箱
# 🔥 新增漏洞报告工具仅Verification可用 # 🔥 新增漏洞报告工具仅Verification可用- v2.1: 传递 project_root
"create_vulnerability_report": CreateVulnerabilityReportTool(), "create_vulnerability_report": CreateVulnerabilityReportTool(self.project_root),
# 🔥 新增:反思工具 # 🔥 新增:反思工具
"reflect": ReflectTool(), "reflect": ReflectTool(),
} }

View File

@ -216,6 +216,7 @@ def build_specialized_prompt(
# 导入系统提示词 # 导入系统提示词
from .system_prompts import ( from .system_prompts import (
CORE_SECURITY_PRINCIPLES, CORE_SECURITY_PRINCIPLES,
FILE_VALIDATION_RULES, # 🔥 v2.1
VULNERABILITY_PRIORITIES, VULNERABILITY_PRIORITIES,
TOOL_USAGE_GUIDE, TOOL_USAGE_GUIDE,
MULTI_AGENT_RULES, MULTI_AGENT_RULES,
@ -234,6 +235,7 @@ __all__ = [
"build_specialized_prompt", "build_specialized_prompt",
# 系统提示词 # 系统提示词
"CORE_SECURITY_PRINCIPLES", "CORE_SECURITY_PRINCIPLES",
"FILE_VALIDATION_RULES", # 🔥 v2.1
"VULNERABILITY_PRIORITIES", "VULNERABILITY_PRIORITIES",
"TOOL_USAGE_GUIDE", "TOOL_USAGE_GUIDE",
"MULTI_AGENT_RULES", "MULTI_AGENT_RULES",

View File

@ -36,6 +36,60 @@ CORE_SECURITY_PRINCIPLES = """
</core_security_principles> </core_security_principles>
""" """
# v2.1: File path validation rules - guard against hallucinated findings.
# Prompt fragment (kept in Chinese because it is sent to the LLM verbatim).
# It instructs the agent to confirm a file exists with `read_file` /
# `list_files` before reporting a vulnerability, to quote only code it has
# actually read, and to keep line numbers within the real file.
# build_enhanced_prompt appends this block when include_validation is true.
FILE_VALIDATION_RULES = """
<file_validation_rules>
## 🔒 文件路径验证规则(强制执行)
### ⚠️ 严禁幻觉行为
在报告任何漏洞之前**必须**遵守以下规则
1. **先验证文件存在**
- 在报告漏洞前必须使用 `read_file` `list_files` 工具确认文件存在
- 禁止基于"典型项目结构""常见框架模式"猜测文件路径
- 禁止假设 `config/database.py``app/api.py` 等文件存在
2. **引用真实代码**
- `code_snippet` 必须来自 `read_file` 工具的实际输出
- 禁止凭记忆或推测编造代码片段
- 行号必须在文件实际行数范围内
3. **验证行号准确性**
- 报告的 `line_start` `line_end` 必须基于实际读取的文件
- 如果不确定行号使用 `read_file` 重新确认
4. **匹配项目技术栈**
- Rust 项目不会有 `.py` 文件除非明确存在
- 前端项目不会有后端数据库配置
- 仔细观察 Recon Agent 返回的技术栈信息
### ✅ 正确做法示例
```
# 错误 ❌:直接报告未验证的文件
Action: create_vulnerability_report
Action Input: {"file_path": "config/database.py", ...}
# 正确 ✅:先读取验证,再报告
Action: read_file
Action Input: {"file_path": "config/database.py"}
# 如果文件存在且包含漏洞代码,再报告
Action: create_vulnerability_report
Action Input: {"file_path": "config/database.py", "code_snippet": "实际读取的代码", ...}
```
### 🚫 违规后果
如果报告的文件路径不存在系统会
1. 拒绝创建漏洞报告
2. 记录违规行为
3. 要求重新验证
**记住宁可漏报不可误报质量优于数量**
</file_validation_rules>
"""
# 漏洞优先级和检测策略 # 漏洞优先级和检测策略
VULNERABILITY_PRIORITIES = """ VULNERABILITY_PRIORITIES = """
<vulnerability_priorities> <vulnerability_priorities>
@ -313,6 +367,7 @@ def build_enhanced_prompt(
include_principles: bool = True, include_principles: bool = True,
include_priorities: bool = True, include_priorities: bool = True,
include_tools: bool = True, include_tools: bool = True,
include_validation: bool = True, # 🔥 v2.1: 默认包含文件验证规则
) -> str: ) -> str:
""" """
构建增强的提示词 构建增强的提示词
@ -322,6 +377,7 @@ def build_enhanced_prompt(
include_principles: 是否包含核心原则 include_principles: 是否包含核心原则
include_priorities: 是否包含漏洞优先级 include_priorities: 是否包含漏洞优先级
include_tools: 是否包含工具指南 include_tools: 是否包含工具指南
include_validation: 是否包含文件验证规则
Returns: Returns:
增强后的提示词 增强后的提示词
@ -331,6 +387,10 @@ def build_enhanced_prompt(
if include_principles: if include_principles:
parts.append(CORE_SECURITY_PRINCIPLES) parts.append(CORE_SECURITY_PRINCIPLES)
# 🔥 v2.1: 添加文件验证规则
if include_validation:
parts.append(FILE_VALIDATION_RULES)
if include_priorities: if include_priorities:
parts.append(VULNERABILITY_PRIORITIES) parts.append(VULNERABILITY_PRIORITIES)
@ -342,6 +402,7 @@ def build_enhanced_prompt(
__all__ = [ __all__ = [
"CORE_SECURITY_PRINCIPLES", "CORE_SECURITY_PRINCIPLES",
"FILE_VALIDATION_RULES", # 🔥 v2.1
"VULNERABILITY_PRIORITIES", "VULNERABILITY_PRIORITIES",
"TOOL_USAGE_GUIDE", "TOOL_USAGE_GUIDE",
"MULTI_AGENT_RULES", "MULTI_AGENT_RULES",

View File

@ -5,6 +5,7 @@
""" """
import logging import logging
import os
import uuid import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Optional, List, Dict, Any from typing import Optional, List, Dict, Any
@ -50,14 +51,17 @@ class CreateVulnerabilityReportTool(AgentTool):
通常只有专门的报告Agent或验证Agent才会调用这个工具 通常只有专门的报告Agent或验证Agent才会调用这个工具
确保漏洞在被正式报告之前已经经过了充分的验证 确保漏洞在被正式报告之前已经经过了充分的验证
🔥 v2.1: 添加文件路径验证拒绝报告不存在的文件
""" """
# 存储所有报告的漏洞 # 存储所有报告的漏洞
_vulnerability_reports: List[Dict[str, Any]] = [] _vulnerability_reports: List[Dict[str, Any]] = []
def __init__(self): def __init__(self, project_root: Optional[str] = None):
super().__init__() super().__init__()
self._reports: List[Dict[str, Any]] = [] self._reports: List[Dict[str, Any]] = []
self.project_root = project_root # 🔥 v2.1: 用于文件验证
@property @property
def name(self) -> str: def name(self) -> str:
@ -126,6 +130,22 @@ class CreateVulnerabilityReportTool(AgentTool):
if not file_path or not file_path.strip(): if not file_path or not file_path.strip():
return ToolResult(success=False, error="文件路径不能为空") return ToolResult(success=False, error="文件路径不能为空")
# 🔥 v2.1: 验证文件路径存在性 - 防止幻觉
if self.project_root:
# 清理路径(移除可能的行号,如 "app.py:36"
clean_path = file_path.split(":")[0].strip() if ":" in file_path else file_path.strip()
full_path = os.path.join(self.project_root, clean_path)
if not os.path.isfile(full_path):
# 尝试作为绝对路径
if not (os.path.isabs(clean_path) and os.path.isfile(clean_path)):
logger.warning(f"[ReportTool] 🚫 拒绝报告: 文件不存在 '{file_path}'")
return ToolResult(
success=False,
error=f"无法创建报告:文件 '{file_path}' 在项目中不存在。"
f"请先使用 read_file 工具验证文件存在,然后再报告漏洞。"
)
# 验证严重程度 # 验证严重程度
valid_severities = ["critical", "high", "medium", "low", "info"] valid_severities = ["critical", "high", "medium", "low", "info"]
severity = severity.lower() severity = severity.lower()