diff --git a/backend/app/api/v1/endpoints/agent_tasks.py b/backend/app/api/v1/endpoints/agent_tasks.py index 799760f..fb76ebe 100644 --- a/backend/app/api/v1/endpoints/agent_tasks.py +++ b/backend/app/api/v1/endpoints/agent_tasks.py @@ -597,11 +597,50 @@ async def _initialize_tools( } # Verification 工具 + # 🔥 导入沙箱工具 + from app.services.agent.tools import ( + SandboxTool, SandboxHttpTool, VulnerabilityVerifyTool, SandboxManager, + # 多语言代码测试工具 + PhpTestTool, PythonTestTool, JavaScriptTestTool, JavaTestTool, + GoTestTool, RubyTestTool, ShellTestTool, UniversalCodeTestTool, + # 漏洞验证专用工具 + CommandInjectionTestTool, SqlInjectionTestTool, XssTestTool, + PathTraversalTestTool, SstiTestTool, DeserializationTestTool, + UniversalVulnTestTool, + ) + + # 🔥 初始化沙箱管理器 + sandbox_manager = SandboxManager() + await sandbox_manager.initialize() + logger.info(f"✅ Sandbox initialized (available: {sandbox_manager.is_available})") + verification_tools = { **base_tools, - # 强制使用沙箱工具,移除 LLM 模拟验证工具 - # "vulnerability_validation": VulnerabilityValidationTool(llm_service), - # "dataflow_analysis": DataFlowAnalysisTool(llm_service), + # 🔥 沙箱验证工具 + "sandbox_exec": SandboxTool(sandbox_manager), + "sandbox_http": SandboxHttpTool(sandbox_manager), + "verify_vulnerability": VulnerabilityVerifyTool(sandbox_manager), + + # 🔥 多语言代码测试工具 + "php_test": PhpTestTool(sandbox_manager, project_root), + "python_test": PythonTestTool(sandbox_manager, project_root), + "javascript_test": JavaScriptTestTool(sandbox_manager, project_root), + "java_test": JavaTestTool(sandbox_manager, project_root), + "go_test": GoTestTool(sandbox_manager, project_root), + "ruby_test": RubyTestTool(sandbox_manager, project_root), + "shell_test": ShellTestTool(sandbox_manager, project_root), + "universal_code_test": UniversalCodeTestTool(sandbox_manager, project_root), + + # 🔥 漏洞验证专用工具 + "test_command_injection": CommandInjectionTestTool(sandbox_manager, project_root), + "test_sql_injection": SqlInjectionTestTool(sandbox_manager, project_root), + "test_xss": XssTestTool(sandbox_manager, project_root), + "test_path_traversal": PathTraversalTestTool(sandbox_manager, project_root), + "test_ssti": SstiTestTool(sandbox_manager, project_root), + "test_deserialization": DeserializationTestTool(sandbox_manager, project_root), + "universal_vuln_test": UniversalVulnTestTool(sandbox_manager, project_root), + + # 报告工具 "create_vulnerability_report": CreateVulnerabilityReportTool(), } diff --git a/backend/app/services/agent/agents/analysis.py b/backend/app/services/agent/agents/analysis.py index 1d18d71..e222a60 100644 --- a/backend/app/services/agent/agents/analysis.py +++ b/backend/app/services/agent/agents/analysis.py @@ -190,19 +190,15 @@ class AnalysisAgent(BaseAgent): def _parse_llm_response(self, response: str) -> AnalysisStep: - """解析 LLM 响应""" + """解析 LLM 响应 - 增强版,更健壮地提取思考内容""" step = AnalysisStep(thought="") - - # 提取 Thought + + # 🔥 首先尝试提取明确的 Thought 标记 thought_match = re.search(r'Thought:\s*(.*?)(?=Action:|Final Answer:|$)', response, re.DOTALL) if thought_match: step.thought = thought_match.group(1).strip() - elif not re.search(r'Action:|Final Answer:', response): - # 🔥 Fallback: If no markers found, treat the whole response as Thought - if response.strip(): - step.thought = response.strip() - - # 检查是否是最终答案 + + # 🔥 检查是否是最终答案 final_match = re.search(r'Final Answer:\s*(.*?)$', response, re.DOTALL) if final_match: step.is_final = True @@ -211,23 +207,40 @@ class AnalysisAgent(BaseAgent): answer_text = re.sub(r'```\s*', '', answer_text) # 使用增强的 JSON 解析器 step.final_answer = AgentJsonParser.parse( - answer_text, + answer_text, default={"findings": [], "raw_answer": answer_text} ) # 确保 findings 格式正确 if "findings" in step.final_answer: step.final_answer["findings"] = [ - f for f in step.final_answer["findings"] + f for f in step.final_answer["findings"] if isinstance(f, dict) ] + + # 🔥 如果没有提取到 thought,使用 Final Answer 前的内容作为思考 + if not step.thought: + before_final = response[:response.find('Final Answer:')].strip() + if before_final: + before_final = re.sub(r'^Thought:\s*', '', before_final) + step.thought = before_final[:500] if len(before_final) > 500 else before_final + return step - - # 提取 Action + + # 🔥 提取 Action action_match = re.search(r'Action:\s*(\w+)', response) if action_match: step.action = action_match.group(1).strip() - - # 提取 Action Input + + # 🔥 如果没有提取到 thought,提取 Action 之前的内容作为思考 + if not step.thought: + action_pos = response.find('Action:') + if action_pos > 0: + before_action = response[:action_pos].strip() + before_action = re.sub(r'^Thought:\s*', '', before_action) + if before_action: + step.thought = before_action[:500] if len(before_action) > 500 else before_action + + # 🔥 提取 Action Input input_match = re.search(r'Action Input:\s*(.*?)(?=Thought:|Action:|Observation:|$)', response, re.DOTALL) if input_match: input_text = input_match.group(1).strip() @@ -238,7 +251,12 @@ class AnalysisAgent(BaseAgent): input_text, default={"raw_input": input_text} ) - + + # 🔥 最后的 fallback:如果整个响应没有任何标记,整体作为思考 + if not step.thought and not step.action and not step.is_final: + if response.strip(): + step.thought = response.strip()[:500] + return step @@ -304,9 +322,12 @@ class AnalysisAgent(BaseAgent): """ initial_message += f"""{handoff_context if handoff_context else f'''## 上下文信息 -### 高风险区域 +### ⚠️ 高风险区域(来自 Recon Agent,必须优先分析) +以下是 Recon Agent 识别的高风险区域,请**务必优先**读取和分析这些文件: {json.dumps(high_risk_areas[:20], ensure_ascii=False)} +**重要**: 请使用 read_file 工具读取上述高风险文件,不要假设文件路径或使用其他路径。 + ### 入口点 (前10个) {json.dumps(entry_points[:10], ensure_ascii=False, indent=2)} @@ -316,13 +337,20 @@ class AnalysisAgent(BaseAgent): ## 任务 {task_context or task or '进行全面的安全漏洞分析,发现代码中的安全问题。'} +## ⚠️ 分析策略要求 +1. **首先**:使用 read_file 读取上面列出的高风险文件 +2. **然后**:分析这些文件中的安全问题 +3. **最后**:如果需要,使用 smart_scan 或其他工具扩展分析 + +**禁止**:不要跳过高风险区域直接做全局扫描 + ## 目标漏洞类型 {config.get('target_vulnerabilities', ['all'])} ## 可用工具 {self.get_tools_description()} -请开始你的安全分析。首先思考分析策略,然后选择合适的工具开始分析。""" +请开始你的安全分析。首先读取高风险区域的文件,然后分析其中的安全问题。""" # 🔥 记录工作开始 self.record_work("开始安全漏洞分析") diff --git a/backend/app/services/agent/agents/base.py b/backend/app/services/agent/agents/base.py index c3cbf51..9f7c7be 100644 --- a/backend/app/services/agent/agents/base.py +++ b/backend/app/services/agent/agents/base.py @@ -16,6 +16,7 @@ from dataclasses import dataclass, field from enum import Enum from datetime import datetime, timezone import asyncio +import json import logging import uuid @@ -1036,26 +1037,49 @@ class BaseAgent(ABC): if result.success: output = str(result.data) - + # 包含 metadata 中的额外信息 if result.metadata: if "issues" in result.metadata: - import json output += f"\n\n发现的问题:\n{json.dumps(result.metadata['issues'], ensure_ascii=False, indent=2)}" if "findings" in result.metadata: - import json output += f"\n\n发现:\n{json.dumps(result.metadata['findings'][:10], ensure_ascii=False, indent=2)}" - + # 截断过长输出 if len(output) > 6000: output = output[:6000] + f"\n\n... [输出已截断,共 {len(str(result.data))} 字符]" return output else: - return f"工具执行失败: {result.error}" - + # 🔥 输出详细的错误信息,包括原始错误 + error_msg = f"""⚠️ 工具执行失败 + +**工具**: {tool_name} +**参数**: {json.dumps(tool_input, ensure_ascii=False, indent=2) if tool_input else '无'} +**错误**: {result.error} + +请根据错误信息调整参数或尝试其他方法。""" + return error_msg + except Exception as e: + import traceback logger.error(f"Tool execution error: {e}") - return f"工具执行错误: {str(e)}" + # 🔥 输出完整的原始错误信息,包括堆栈跟踪 + error_msg = f"""❌ 工具执行异常 + +**工具**: {tool_name} +**参数**: {json.dumps(tool_input, ensure_ascii=False, indent=2) if tool_input else '无'} +**错误类型**: {type(e).__name__} +**错误信息**: {str(e)} +**堆栈跟踪**: +``` +{traceback.format_exc()} +``` + +请分析错误原因,可能需要: +1. 检查参数格式是否正确 +2. 尝试使用其他工具 +3. 如果是权限或资源问题,跳过该操作""" + return error_msg def get_tools_description(self) -> str: """生成工具描述文本(用于 prompt)""" diff --git a/backend/app/services/agent/agents/orchestrator.py b/backend/app/services/agent/agents/orchestrator.py index f43f503..24d6df0 100644 --- a/backend/app/services/agent/agents/orchestrator.py +++ b/backend/app/services/agent/agents/orchestrator.py @@ -159,6 +159,9 @@ class OrchestratorAgent(BaseAgent): # 🔥 跟踪已调度的 Agent 任务,避免重复调度 self._dispatched_tasks: Dict[str, int] = {} # agent_name -> dispatch_count + + # 🔥 保存各个 Agent 的完整结果,用于传递给后续 Agent + self._agent_results: Dict[str, Dict[str, Any]] = {} # agent_name -> full result data def register_sub_agent(self, name: str, agent: BaseAgent): """注册子 Agent""" @@ -216,6 +219,7 @@ class OrchestratorAgent(BaseAgent): self._steps = [] self._all_findings = [] + self._agent_results = {} # 🔥 重置 Agent 结果缓存 final_result = None error_message = None # 🔥 跟踪错误信息 @@ -625,16 +629,23 @@ Action Input: {{"参数": "值"}} # 确保 project_info 包含 root 路径 if "root" not in project_info: project_info["root"] = self._runtime_context.get("project_root", ".") - + + # 🔥 FIX: 构建完整的 previous_results,包含所有已执行 Agent 的结果 + previous_results = { + "findings": self._all_findings, # 传递已收集的发现 + } + + # 🔥 将之前 Agent 的完整结果传递给后续 Agent + for prev_agent, prev_data in self._agent_results.items(): + previous_results[prev_agent] = {"data": prev_data} + sub_input = { "task": task, "task_context": context, "project_info": project_info, "config": self._runtime_context.get("config", {}), "project_root": self._runtime_context.get("project_root", "."), - "previous_results": { - "findings": self._all_findings, # 传递已收集的发现 - }, + "previous_results": previous_results, } # 🔥 执行子 Agent 前检查取消状态 @@ -647,11 +658,18 @@ Action Input: {{"参数": "值"}} # 🔥 执行后再次检查取消状态 if self.is_cancelled: return f"## {agent_name} Agent 执行中断\n\n任务已被用户取消" - + # 🔥 处理子 Agent 结果 - 不同 Agent 返回不同的数据结构 + # 🔥 DEBUG: 添加诊断日志 + logger.info(f"[Orchestrator] Processing {agent_name} result: success={result.success}, data_type={type(result.data).__name__}, data_keys={list(result.data.keys()) if isinstance(result.data, dict) else 'N/A'}") + if result.success and result.data: data = result.data + # 🔥 FIX: 保存 Agent 的完整结果,供后续 Agent 使用 + self._agent_results[agent_name] = data + logger.info(f"[Orchestrator] Saved {agent_name} result with keys: {list(data.keys())}") + # 🔥 CRITICAL FIX: 收集发现 - 支持多种字段名 # findings 字段通常来自 Analysis/Verification Agent # initial_findings 来自 Recon Agent @@ -662,21 +680,112 @@ Action Input: {{"参数": "值"}} # 即使 findings 为空列表,也检查 initial_findings if "initial_findings" in data: initial = data.get("initial_findings", []) - logger.info(f"[Orchestrator] {agent_name} has {len(initial)} initial_findings") + logger.info(f"[Orchestrator] {agent_name} has {len(initial)} initial_findings, types: {[type(f).__name__ for f in initial[:3]]}") for f in initial: if isinstance(f, dict): # 🔥 Normalize finding format - 处理 Recon 返回的格式 normalized = self._normalize_finding(f) if normalized not in raw_findings: raw_findings.append(normalized) - elif isinstance(f, str): - # String finding from Recon - skip, it's just an observation - logger.debug(f"[Orchestrator] Skipping string finding: {f[:50]}...") + logger.info(f"[Orchestrator] Added dict finding from initial_findings") + elif isinstance(f, str) and f.strip(): + # 🔥 FIX: Convert string finding to dict format instead of skipping + # Recon Agent 有时候会返回字符串格式的发现 + # 尝试从字符串中提取文件路径(格式如 "app.py:36 - 描述") + file_path = "" + line_start = 0 + if ":" in f: + parts = f.split(":", 1) + potential_file = parts[0].strip() + # 检查是否像文件路径 + if "." in potential_file and "/" not in potential_file[:3]: + file_path = potential_file + # 尝试提取行号 + if len(parts) > 1: + remaining = parts[1].strip() + line_match = remaining.split()[0] if remaining else "" + if line_match.isdigit(): + line_start = int(line_match) + + string_finding = { + "title": f[:100] if len(f) > 100 else f, + "description": f, + "file_path": file_path, + "line_start": line_start, + "severity": "medium", # 默认中等严重度,Analysis 会重新评估 + "vulnerability_type": "potential_issue", + "source": "recon", + "needs_verification": True, + "confidence": 0.5, # 较低置信度,需要进一步分析 + } + logger.info(f"[Orchestrator] Converted string finding to dict: {f[:80]}... (file={file_path}, line={line_start})") + raw_findings.append(string_finding) + else: + logger.info(f"[Orchestrator] {agent_name} has no 'initial_findings' key in data") # 🔥 Also check high_risk_areas from Recon for potential findings if agent_name == "recon" and "high_risk_areas" in data: high_risk = data.get("high_risk_areas", []) logger.info(f"[Orchestrator] {agent_name} identified {len(high_risk)} high risk areas") + # 🔥 FIX: 将 high_risk_areas 也转换为发现 + for area in high_risk: + if isinstance(area, str) and area.strip(): + # 尝试从描述中提取文件路径和漏洞类型 + file_path = "" + line_start = 0 + vuln_type = "potential_issue" + + # 🔥 FIX: 改进文件路径提取逻辑 + # 格式1: "file.py:36 - 描述" -> 提取 file.py 和 36 + # 格式2: "描述性文本" -> 不提取文件路径 + if ":" in area: + parts = area.split(":", 1) + potential_file = parts[0].strip() + # 只有当 parts[0] 看起来像文件路径时才提取 + # 文件路径通常包含 . 且没有空格(或只在结尾有扩展名) + if ("." in potential_file and + " " not in potential_file and + len(potential_file) < 100 and + any(potential_file.endswith(ext) for ext in ['.py', '.js', '.ts', '.java', '.go', '.php', '.rb', '.c', '.cpp', '.h'])): + file_path = potential_file + # 尝试提取行号 + if len(parts) > 1: + remaining = parts[1].strip() + line_match = remaining.split()[0] if remaining else "" + if line_match.isdigit(): + line_start = int(line_match) + + # 推断漏洞类型 + area_lower = area.lower() + if "command" in area_lower or "命令" in area_lower or "subprocess" in area_lower: + vuln_type = "command_injection" + elif "sql" in area_lower: + vuln_type = "sql_injection" + elif "xss" in area_lower: + vuln_type = "xss" + elif "path" in area_lower or "traversal" in area_lower or "路径" in area_lower: + vuln_type = "path_traversal" + elif "ssrf" in area_lower: + vuln_type = "ssrf" + elif "secret" in area_lower or "密钥" in area_lower or "key" in area_lower: + vuln_type = "hardcoded_secret" + + high_risk_finding = { + "title": area[:100] if len(area) > 100 else area, + "description": area, + "file_path": file_path, + "line_start": line_start, + "severity": "high", # 高风险区域默认高严重度 + "vulnerability_type": vuln_type, + "source": "recon_high_risk", + "needs_verification": True, + "confidence": 0.6, + } + raw_findings.append(high_risk_finding) + logger.info(f"[Orchestrator] Converted high_risk_area to finding: {area[:60]}... (file={file_path}, type={vuln_type})") + + # 🔥 初始化 valid_findings,确保后续代码可以访问 + valid_findings = [] if raw_findings: # 只添加字典格式的发现 diff --git a/backend/app/services/agent/agents/recon.py b/backend/app/services/agent/agents/recon.py index 4a76889..72f1792 100644 --- a/backend/app/services/agent/agents/recon.py +++ b/backend/app/services/agent/agents/recon.py @@ -70,20 +70,15 @@ class ReconAgent(BaseAgent): self._steps: List[ReconStep] = [] def _parse_llm_response(self, response: str) -> ReconStep: - """解析 LLM 响应""" + """解析 LLM 响应 - 增强版,更健壮地提取思考内容""" step = ReconStep(thought="") - - # 提取 Thought + + # 🔥 首先尝试提取明确的 Thought 标记 thought_match = re.search(r'Thought:\s*(.*?)(?=Action:|Final Answer:|$)', response, re.DOTALL) if thought_match: step.thought = thought_match.group(1).strip() - elif not re.search(r'Action:|Final Answer:', response): - # 🔥 Fallback: If no markers found, treat the whole response as Thought - # This prevents empty steps loops "Decision: Continue Thinking" - if response.strip(): - step.thought = response.strip() - - # 检查是否是最终答案 + + # 🔥 检查是否是最终答案 final_match = re.search(r'Final Answer:\s*(.*?)$', response, re.DOTALL) if final_match: step.is_final = True @@ -92,29 +87,45 @@ class ReconAgent(BaseAgent): answer_text = re.sub(r'```\s*', '', answer_text) # 使用增强的 JSON 解析器 step.final_answer = AgentJsonParser.parse( - answer_text, + answer_text, default={"raw_answer": answer_text} ) # 确保 findings 格式正确 if "initial_findings" in step.final_answer: step.final_answer["initial_findings"] = [ - f for f in step.final_answer["initial_findings"] + f for f in step.final_answer["initial_findings"] if isinstance(f, dict) ] + + # 🔥 如果没有提取到 thought,使用 Final Answer 前的内容作为思考 + if not step.thought: + before_final = response[:response.find('Final Answer:')].strip() + if before_final: + # 移除可能的 Thought: 前缀 + before_final = re.sub(r'^Thought:\s*', '', before_final) + step.thought = before_final[:500] if len(before_final) > 500 else before_final + return step - - # 提取 Action + + # 🔥 提取 Action action_match = re.search(r'Action:\s*(\w+)', response) if action_match: step.action = action_match.group(1).strip() - - # 提取 Action Input + + # 🔥 如果没有提取到 thought,提取 Action 之前的内容作为思考 + if not step.thought: + action_pos = response.find('Action:') + if action_pos > 0: + before_action = response[:action_pos].strip() + # 移除可能的 Thought: 前缀 + before_action = re.sub(r'^Thought:\s*', '', before_action) + if before_action: + step.thought = before_action[:500] if len(before_action) > 500 else before_action + + # 🔥 提取 Action Input input_match = re.search(r'Action Input:\s*(.*?)(?=Thought:|Action:|Observation:|$)', response, re.DOTALL) if input_match: input_text = input_match.group(1).strip() - - - input_text = re.sub(r'```json\s*', '', input_text) input_text = re.sub(r'```\s*', '', input_text) # 使用增强的 JSON 解析器 @@ -122,7 +133,12 @@ class ReconAgent(BaseAgent): input_text, default={"raw_input": input_text} ) - + + # 🔥 最后的 fallback:如果整个响应没有任何标记,整体作为思考 + if not step.thought and not step.action and not step.is_final: + if response.strip(): + step.thought = response.strip()[:500] + return step diff --git a/backend/app/services/agent/agents/verification.py b/backend/app/services/agent/agents/verification.py index a8d29c8..dff3595 100644 --- a/backend/app/services/agent/agents/verification.py +++ b/backend/app/services/agent/agents/verification.py @@ -44,13 +44,70 @@ VERIFICATION_SYSTEM_PROMPT = """你是 DeepAudit 的漏洞验证 Agent,一个* - **list_files**: 列出目录文件 参数: directory (str), pattern (str) -### 沙箱验证 (必须使用) +### 沙箱核心工具 - **sandbox_exec**: 在沙箱中执行命令 参数: command (str), timeout (int) - **sandbox_http**: 发送 HTTP 请求测试 参数: method (str), url (str), data (dict), headers (dict) - **verify_vulnerability**: 自动化漏洞验证 - 参数: vulnerability_type (str), target (str), payload (str) + 参数: vulnerability_type (str), target_url (str), payload (str), expected_pattern (str) + +### 🔥 多语言代码测试工具 (按语言选择) +- **php_test**: 测试 PHP 代码,支持模拟 GET/POST 参数 + 参数: file_path (str), php_code (str), get_params (dict), post_params (dict), timeout (int) + 示例: {"file_path": "vuln.php", "get_params": {"cmd": "whoami"}} + +- **python_test**: 测试 Python 代码,支持模拟 Flask/Django 请求 + 参数: file_path (str), code (str), request_params (dict), form_data (dict), timeout (int) + 示例: {"code": "import os; os.system(params['cmd'])", "request_params": {"cmd": "id"}} + +- **javascript_test**: 测试 JavaScript/Node.js 代码 + 参数: file_path (str), code (str), req_query (dict), req_body (dict), timeout (int) + 示例: {"code": "exec(req.query.cmd)", "req_query": {"cmd": "id"}} + +- **java_test**: 测试 Java 代码,支持模拟 Servlet 请求 + 参数: file_path (str), code (str), request_params (dict), timeout (int) + +- **go_test**: 测试 Go 代码 + 参数: file_path (str), code (str), args (list), timeout (int) + +- **ruby_test**: 测试 Ruby 代码,支持模拟 Rails 请求 + 参数: file_path (str), code (str), params (dict), timeout (int) + +- **shell_test**: 测试 Shell/Bash 脚本 + 参数: file_path (str), code (str), args (list), env (dict), timeout (int) + +- **universal_code_test**: 通用多语言测试工具 (自动检测语言) + 参数: language (str), file_path (str), code (str), params (dict), timeout (int) + +### 🔥 漏洞验证专用工具 (按漏洞类型选择,推荐使用) +- **test_command_injection**: 专门测试命令注入漏洞 + 参数: target_file (str), param_name (str), test_command (str), language (str) + 示例: {"target_file": "vuln.php", "param_name": "cmd", "test_command": "whoami"} + +- **test_sql_injection**: 专门测试 SQL 注入漏洞 + 参数: target_file (str), param_name (str), db_type (str), injection_type (str) + 示例: {"target_file": "login.php", "param_name": "username", "db_type": "mysql"} + +- **test_xss**: 专门测试 XSS 漏洞 + 参数: target_file (str), param_name (str), xss_type (str), context (str) + 示例: {"target_file": "search.php", "param_name": "q", "xss_type": "reflected"} + +- **test_path_traversal**: 专门测试路径遍历漏洞 + 参数: target_file (str), param_name (str), target_path (str) + 示例: {"target_file": "download.php", "param_name": "file", "target_path": "/etc/passwd"} + +- **test_ssti**: 专门测试模板注入漏洞 + 参数: target_file (str), param_name (str), template_engine (str) + 示例: {"target_file": "render.py", "param_name": "name", "template_engine": "jinja2"} + +- **test_deserialization**: 专门测试反序列化漏洞 + 参数: target_file (str), language (str), serialization_format (str) + 示例: {"target_file": "api.php", "language": "php", "serialization_format": "php_serialize"} + +- **universal_vuln_test**: 通用漏洞测试工具 (自动选择测试策略) + 参数: vuln_type (str), target_file (str), param_name (str), additional_params (dict) + 支持: command_injection, sql_injection, xss, path_traversal, ssti, deserialization ## 工作方式 你将收到一批待验证的漏洞发现。对于每个发现,你需要: @@ -82,7 +139,7 @@ Final Answer: [JSON 格式的验证报告] "poc": { "description": "PoC 描述", "steps": ["步骤1", "步骤2"], - "payload": "测试 payload" + "payload": "curl 'http://target/vuln.php?cmd=id' 或完整利用代码" }, "impact": "实际影响分析", "recommendation": "修复建议" @@ -104,20 +161,56 @@ Final Answer: [JSON 格式的验证报告] - **false_positive**: 确认是误报,有明确理由 ## 验证策略建议 + +### 对于命令注入漏洞 +1. 使用 **test_command_injection** 工具,它会自动构建测试环境 +2. 或使用对应语言的测试工具 (php_test, python_test 等) +3. 检查命令输出是否包含 uid=, root, www-data 等特征 + +### 对于 SQL 注入漏洞 +1. 使用 **test_sql_injection** 工具 +2. 提供数据库类型 (mysql, postgresql, sqlite) +3. 检查是否能执行 UNION 查询或提取数据 + +### 对于 XSS 漏洞 +1. 使用 **test_xss** 工具 +2. 指定 XSS 类型 (reflected, stored, dom) +3. 检查 payload 是否在输出中未转义 + +### 对于路径遍历漏洞 +1. 使用 **test_path_traversal** 工具 +2. 尝试读取 /etc/passwd 或其他已知文件 +3. 检查是否能访问目标文件 + +### 对于模板注入 (SSTI) 漏洞 +1. 使用 **test_ssti** 工具 +2. 指定模板引擎 (jinja2, twig, freemarker 等) +3. 检查数学表达式是否被执行 + +### 对于反序列化漏洞 +1. 使用 **test_deserialization** 工具 +2. 指定语言和序列化格式 +3. 检查是否能执行任意代码 + +### 对于其他漏洞 1. **上下文分析**: 用 read_file 获取更多代码上下文 -2. **数据流追踪**: 用 dataflow_analysis 确认污点传播 -3. **LLM 深度分析**: 用 vulnerability_validation 进行专业分析 -4. **沙箱测试**: 对高危漏洞用沙箱进行安全测试 +2. **通用测试**: 使用 universal_vuln_test 或 universal_code_test +3. **沙箱测试**: 对高危漏洞用沙箱进行安全测试 ## 重要原则 1. **质量优先** - 宁可漏报也不要误报太多 2. **深入理解** - 理解代码逻辑,不要表面判断 3. **证据支撑** - 判定要有依据 4. **安全第一** - 沙箱测试要谨慎 -5. **🔥 PoC 生成** - 对于 confirmed 和 likely 的漏洞,**必须**生成 PoC: +5. **🔥 PoC 生成** - 对于 confirmed 和 likely 的漏洞,**必须**生成完整的 PoC: - poc.description: 简要描述这个 PoC 的作用 - poc.steps: 详细的复现步骤列表 - - poc.payload: 实际的攻击载荷或测试代码 + - poc.payload: **完整的**利用代码或命令,例如: + - Web漏洞: 完整URL如 `http://target/path?param=` + - 命令注入: 完整的 curl 命令或 HTTP 请求 + - SQL注入: 完整的利用语句或请求 + - 代码执行: 可直接运行的利用脚本 + - ⚠️ payload 字段必须是**可直接复制执行**的完整利用代码,不要只写参数值 现在开始验证漏洞发现!""" @@ -168,19 +261,15 @@ class VerificationAgent(BaseAgent): def _parse_llm_response(self, response: str) -> VerificationStep: - """解析 LLM 响应""" + """解析 LLM 响应 - 增强版,更健壮地提取思考内容""" step = VerificationStep(thought="") - - # 提取 Thought + + # 🔥 首先尝试提取明确的 Thought 标记 thought_match = re.search(r'Thought:\s*(.*?)(?=Action:|Final Answer:|$)', response, re.DOTALL) if thought_match: step.thought = thought_match.group(1).strip() - elif not re.search(r'Action:|Final Answer:', response): - # 🔥 Fallback: If no markers found, treat the whole response as Thought - if response.strip(): - step.thought = response.strip() - - # 检查是否是最终答案 + + # 🔥 检查是否是最终答案 final_match = re.search(r'Final Answer:\s*(.*?)$', response, re.DOTALL) if final_match: step.is_final = True @@ -189,23 +278,40 @@ class VerificationAgent(BaseAgent): answer_text = re.sub(r'```\s*', '', answer_text) # 使用增强的 JSON 解析器 step.final_answer = AgentJsonParser.parse( - answer_text, + answer_text, default={"findings": [], "raw_answer": answer_text} ) # 确保 findings 格式正确 if "findings" in step.final_answer: step.final_answer["findings"] = [ - f for f in step.final_answer["findings"] + f for f in step.final_answer["findings"] if isinstance(f, dict) ] + + # 🔥 如果没有提取到 thought,使用 Final Answer 前的内容作为思考 + if not step.thought: + before_final = response[:response.find('Final Answer:')].strip() + if before_final: + before_final = re.sub(r'^Thought:\s*', '', before_final) + step.thought = before_final[:500] if len(before_final) > 500 else before_final + return step - - # 提取 Action + + # 🔥 提取 Action action_match = re.search(r'Action:\s*(\w+)', response) if action_match: step.action = action_match.group(1).strip() - - # 提取 Action Input + + # 🔥 如果没有提取到 thought,提取 Action 之前的内容作为思考 + if not step.thought: + action_pos = response.find('Action:') + if action_pos > 0: + before_action = response[:action_pos].strip() + before_action = re.sub(r'^Thought:\s*', '', before_action) + if before_action: + step.thought = before_action[:500] if len(before_action) > 500 else before_action + + # 🔥 提取 Action Input input_match = re.search(r'Action Input:\s*(.*?)(?=Thought:|Action:|Observation:|$)', response, re.DOTALL) if input_match: input_text = input_match.group(1).strip() @@ -216,7 +322,12 @@ class VerificationAgent(BaseAgent): input_text, default={"raw_input": input_text} ) - + + # 🔥 最后的 fallback:如果整个响应没有任何标记,整体作为思考 + if not step.thought and not step.action and not step.is_final: + if response.strip(): + step.thought = response.strip()[:500] + return step async def run(self, input_data: Dict[str, Any]) -> AgentResult: @@ -297,7 +408,24 @@ class VerificationAgent(BaseAgent): # 去重 findings_to_verify = self._deduplicate(findings_to_verify) - + + # 🔥 FIX: 优先处理有明确文件路径的发现,将没有文件路径的发现放到后面 + # 这确保 Analysis 的具体发现优先于 Recon 的泛化描述 + def has_valid_file_path(finding: Dict) -> bool: + file_path = finding.get("file_path", "") + return bool(file_path and file_path.strip() and file_path.lower() not in ["unknown", "n/a", ""]) + + findings_with_path = [f for f in findings_to_verify if has_valid_file_path(f)] + findings_without_path = [f for f in findings_to_verify if not has_valid_file_path(f)] + + # 合并:有路径的在前,没路径的在后 + findings_to_verify = findings_with_path + findings_without_path + + if findings_with_path: + logger.info(f"[Verification] 优先处理 {len(findings_with_path)} 个有明确文件路径的发现") + if findings_without_path: + logger.info(f"[Verification] 还有 {len(findings_without_path)} 个发现需要自行定位文件") + if not findings_to_verify: logger.warning(f"[Verification] 没有需要验证的发现! previous_results keys: {list(previous_results.keys()) if isinstance(previous_results, dict) else 'not dict'}") await self.emit_event("warning", "没有需要验证的发现 - 可能是数据格式问题") @@ -322,11 +450,25 @@ class VerificationAgent(BaseAgent): findings_summary = [] for i, f in enumerate(findings_to_verify): + # 🔥 FIX: 正确处理 file_path 格式,可能包含行号 (如 "app.py:36") + file_path = f.get('file_path', 'unknown') + line_start = f.get('line_start', 0) + + # 如果 file_path 已包含行号,提取出来 + if isinstance(file_path, str) and ':' in file_path: + parts = file_path.split(':', 1) + if len(parts) == 2 and parts[1].split()[0].isdigit(): + file_path = parts[0] + try: + line_start = int(parts[1].split()[0]) + except ValueError: + pass + findings_summary.append(f""" ### 发现 {i+1}: {f.get('title', 'Unknown')} - 类型: {f.get('vulnerability_type', 'unknown')} - 严重度: {f.get('severity', 'medium')} -- 文件: {f.get('file_path', 'unknown')}:{f.get('line_start', 0)} +- 文件: {file_path} (行 {line_start}) - 代码: ``` {f.get('code_snippet', 'N/A')[:500]} @@ -341,13 +483,22 @@ class VerificationAgent(BaseAgent): ## 待验证发现 {''.join(findings_summary)} +## ⚠️ 重要验证指南 +1. **直接使用上面列出的文件路径** - 不要猜测或搜索其他路径 +2. **如果文件路径包含冒号和行号** (如 "app.py:36"), 请提取文件名 "app.py" 并使用 read_file 读取 +3. **先读取文件内容,再判断漏洞是否存在** +4. **不要假设文件在子目录中** - 使用发现中提供的精确路径 + ## 验证要求 - 验证级别: {config.get('verification_level', 'standard')} ## 可用工具 {self.get_tools_description()} -请开始验证。对于每个发现,思考如何验证它,使用合适的工具获取更多信息,然后判断是否为真实漏洞。 +请开始验证。对于每个发现: +1. 首先使用 read_file 读取发现中指定的文件(使用精确路径) +2. 分析代码上下文 +3. 判断是否为真实漏洞 {f"特别注意 Analysis Agent 提到的关注点。" if handoff_context else ""}""" # 初始化对话历史 diff --git a/backend/app/services/agent/graph/runner.py b/backend/app/services/agent/graph/runner.py index 69822d5..f50d88c 100644 --- a/backend/app/services/agent/graph/runner.py +++ b/backend/app/services/agent/graph/runner.py @@ -197,6 +197,13 @@ class AgentRunner: from app.services.agent.tools import ( ThinkTool, ReflectTool, CreateVulnerabilityReportTool, + # 多语言代码测试工具 + PhpTestTool, PythonTestTool, JavaScriptTestTool, JavaTestTool, + GoTestTool, RubyTestTool, ShellTestTool, UniversalCodeTestTool, + # 漏洞验证专用工具 + CommandInjectionTestTool, SqlInjectionTestTool, XssTestTool, + PathTraversalTestTool, SstiTestTool, DeserializationTestTool, + UniversalVulnTestTool, ) # 🔥 导入知识查询工具 from app.services.agent.knowledge import ( @@ -282,6 +289,8 @@ class AgentRunner: network_mode=settings.SANDBOX_NETWORK_MODE, ) self.sandbox_manager = SandboxManager(config=sandbox_config) + # 🔥 必须调用 initialize() 来连接 Docker + await self.sandbox_manager.initialize() except Exception as e: logger.warning(f"❌ Sandbox Manager initialization failed: {e}") import traceback @@ -289,15 +298,38 @@ class AgentRunner: # 尝试创建默认管理器作为后备 try: self.sandbox_manager = SandboxManager() + # 🔥 同样需要调用 initialize() + await self.sandbox_manager.initialize() logger.info("⚠️ Created fallback SandboxManager (Docker might be unavailable)") except Exception as e2: logger.error(f"❌ Failed to create fallback SandboxManager: {e2}") # 始终注册沙箱工具,即使 Docker 不可用(工具内部会检查) if self.sandbox_manager: + # 🔥 沙箱核心工具 self.verification_tools["sandbox_exec"] = SandboxTool(self.sandbox_manager) self.verification_tools["sandbox_http"] = SandboxHttpTool(self.sandbox_manager) self.verification_tools["verify_vulnerability"] = VulnerabilityVerifyTool(self.sandbox_manager) + + # 🔥 多语言代码测试工具 + self.verification_tools["php_test"] = PhpTestTool(self.sandbox_manager, self.project_root) + self.verification_tools["python_test"] = PythonTestTool(self.sandbox_manager, self.project_root) + self.verification_tools["javascript_test"] = JavaScriptTestTool(self.sandbox_manager, self.project_root) + self.verification_tools["java_test"] = JavaTestTool(self.sandbox_manager, self.project_root) + self.verification_tools["go_test"] = GoTestTool(self.sandbox_manager, self.project_root) + self.verification_tools["ruby_test"] = RubyTestTool(self.sandbox_manager, self.project_root) + self.verification_tools["shell_test"] = ShellTestTool(self.sandbox_manager, self.project_root) + self.verification_tools["universal_code_test"] = UniversalCodeTestTool(self.sandbox_manager, self.project_root) + + # 🔥 漏洞验证专用工具 + self.verification_tools["test_command_injection"] = CommandInjectionTestTool(self.sandbox_manager, self.project_root) + self.verification_tools["test_sql_injection"] = SqlInjectionTestTool(self.sandbox_manager, self.project_root) + self.verification_tools["test_xss"] = XssTestTool(self.sandbox_manager, self.project_root) + self.verification_tools["test_path_traversal"] = PathTraversalTestTool(self.sandbox_manager, self.project_root) + self.verification_tools["test_ssti"] = SstiTestTool(self.sandbox_manager, self.project_root) + self.verification_tools["test_deserialization"] = DeserializationTestTool(self.sandbox_manager, self.project_root) + self.verification_tools["universal_vuln_test"] = UniversalVulnTestTool(self.sandbox_manager, self.project_root) + logger.info(f"✅ Sandbox tools initialized (Docker available: {self.sandbox_manager.is_available})") else: logger.error("❌ Sandbox tools NOT initialized due to critical manager failure") diff --git a/backend/app/services/agent/json_parser.py b/backend/app/services/agent/json_parser.py index ec9c2cd..6347130 100644 --- a/backend/app/services/agent/json_parser.py +++ b/backend/app/services/agent/json_parser.py @@ -1,6 +1,6 @@ """ Agent JSON 解析工具 -从 LLM 响应中安全地解析 JSON,参考 llm/service.py 的实现 +从 LLM 响应中安全地解析 JSON,优先使用 json-repair 库 """ import json @@ -14,14 +14,15 @@ logger = logging.getLogger(__name__) try: from json_repair import repair_json JSON_REPAIR_AVAILABLE = True + logger.info("✅ json-repair 库已加载") except ImportError: JSON_REPAIR_AVAILABLE = False - logger.debug("json-repair library not available") + logger.warning("⚠️ json-repair 库未安装,将使用备用解析方法") class AgentJsonParser: - """Agent 专用的 JSON 解析器""" - + """Agent 专用的 JSON 解析器 - 优先使用 json-repair""" + @staticmethod def clean_text(text: str) -> str: """清理文本中的控制字符""" @@ -30,7 +31,7 @@ class AgentJsonParser: # 移除 BOM 和零宽字符 text = text.replace('\ufeff', '').replace('\u200b', '').replace('\u200c', '').replace('\u200d', '') return text - + @staticmethod def fix_json_format(text: str) -> str: """修复常见的 JSON 格式问题""" @@ -40,7 +41,66 @@ class AgentJsonParser: # 修复未转义的换行符(在字符串值中) text = re.sub(r':\s*"([^"]*)\n([^"]*)"', r': "\1\\n\2"', text) return text - + + @classmethod + def extract_json_string(cls, text: str) -> str: + """从文本中提取 JSON 字符串部分""" + # 先尝试从 markdown 代码块提取 + md_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', text) + if md_match: + return md_match.group(1).strip() + + # 找到第一个 { 或 [ + start_brace = text.find('{') + start_bracket = text.find('[') + + if start_brace == -1 and start_bracket == -1: + return text # 没有找到,返回原文 + + if start_brace == -1: + start_idx = start_bracket + elif start_bracket == -1: + start_idx = start_brace + else: + start_idx = min(start_brace, start_bracket) + + # 找到最后一个 } 或 ] + end_brace = text.rfind('}') + end_bracket = text.rfind(']') + + if end_brace == -1 and end_bracket == -1: + return text[start_idx:] # 没有找到结尾,返回从开始到末尾 + + end_idx = max(end_brace, end_bracket) + 1 + + return text[start_idx:end_idx] + + @classmethod + def repair_with_library(cls, text: str) -> Dict[str, Any]: + """使用 json-repair 库修复并解析 JSON""" + if not JSON_REPAIR_AVAILABLE: + raise ValueError("json-repair library not available") + + # 提取 JSON 字符串 + json_str = cls.extract_json_string(text) + + if not json_str.strip(): + raise ValueError("No JSON content found") + + # 使用 json-repair 修复并解析 + repaired = repair_json(json_str, return_objects=True) + + if isinstance(repaired, dict): + return repaired + elif isinstance(repaired, list): + # 如果返回列表,包装为字典 + return {"items": repaired} + elif isinstance(repaired, str): + # 如果返回字符串,尝试再次解析 + return json.loads(repaired) + + raise ValueError(f"json-repair returned unexpected type: {type(repaired)}") + @classmethod def extract_from_markdown(cls, text: str) -> Dict[str, Any]: """从 markdown 代码块提取 JSON""" @@ -48,35 +108,35 @@ class AgentJsonParser: if match: return json.loads(match.group(1)) raise ValueError("No markdown code block found") - + @classmethod def extract_json_object(cls, text: str) -> Dict[str, Any]: """智能提取 JSON 对象""" start_idx = text.find('{') if start_idx == -1: raise ValueError("No JSON object found") - + # 考虑字符串内的花括号和转义字符 brace_count = 0 in_string = False escape_next = False end_idx = -1 - + for i in range(start_idx, len(text)): char = text[i] - + if escape_next: escape_next = False continue - + if char == '\\': escape_next = True continue - + if char == '"' and not escape_next: in_string = not in_string continue - + if not in_string: if char == '{': brace_count += 1 @@ -85,7 +145,7 @@ class AgentJsonParser: if brace_count == 0: end_idx = i + 1 break - + if end_idx == -1: # 如果找不到完整的 JSON,尝试使用最后一个 } last_brace = text.rfind('}') @@ -93,68 +153,45 @@ class AgentJsonParser: end_idx = last_brace + 1 else: raise ValueError("Incomplete JSON object") - + json_str = text[start_idx:end_idx] # 修复格式问题 json_str = re.sub(r',(\s*[}\]])', r'\1', json_str) - + return json.loads(json_str) - + @classmethod def fix_truncated_json(cls, text: str) -> Dict[str, Any]: """修复截断的 JSON""" start_idx = text.find('{') if start_idx == -1: raise ValueError("Cannot fix truncated JSON") - + json_str = text[start_idx:] - + # 计算缺失的闭合符号 open_braces = json_str.count('{') close_braces = json_str.count('}') open_brackets = json_str.count('[') close_brackets = json_str.count(']') - + # 补全缺失的闭合符号 json_str += ']' * max(0, open_brackets - close_brackets) json_str += '}' * max(0, open_braces - close_braces) - + # 修复格式 json_str = re.sub(r',(\s*[}\]])', r'\1', json_str) return json.loads(json_str) - - @classmethod - def repair_with_library(cls, text: str) -> Dict[str, Any]: - """使用 json-repair 库修复损坏的 JSON""" - if not JSON_REPAIR_AVAILABLE: - raise ValueError("json-repair library not available") - - start_idx = text.find('{') - if start_idx == -1: - raise ValueError("No JSON object found for repair") - - end_idx = text.rfind('}') - if end_idx > start_idx: - json_str = text[start_idx:end_idx + 1] - else: - json_str = text[start_idx:] - - repaired = repair_json(json_str, return_objects=True) - - if isinstance(repaired, dict): - return repaired - - raise ValueError(f"json-repair returned unexpected type: {type(repaired)}") - + @classmethod def parse(cls, text: str, default: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ - 从 LLM 响应中解析 JSON(增强版) - + 从 LLM 响应中解析 JSON(优先使用 json-repair) + Args: text: LLM 响应文本 default: 解析失败时返回的默认值,如果为 None 则抛出异常 - + Returns: 解析后的字典 """ @@ -163,19 +200,25 @@ class AgentJsonParser: logger.warning("LLM 响应为空,返回默认值") return default raise ValueError("LLM 响应内容为空") - + clean = cls.clean_text(text) - - # 尝试多种方式解析 - attempts = [ + + # 🔥 优先使用 json-repair,它能处理大多数格式问题 + attempts = [] + + # 如果 json-repair 可用,优先使用它 + if JSON_REPAIR_AVAILABLE: + attempts.append(("json-repair", lambda: cls.repair_with_library(text))) + + # 然后尝试其他方法作为后备 + attempts.extend([ ("直接解析", lambda: json.loads(text)), ("清理后解析", lambda: json.loads(cls.fix_json_format(clean))), ("Markdown 提取", lambda: cls.extract_from_markdown(text)), ("智能提取", lambda: cls.extract_json_object(clean)), ("截断修复", lambda: cls.fix_truncated_json(clean)), - ("json-repair", lambda: cls.repair_with_library(text)), - ] - + ]) + last_error = None for name, attempt in attempts: try: @@ -187,30 +230,30 @@ class AgentJsonParser: except Exception as e: last_error = e logger.debug(f"JSON 解析方法 '{name}' 失败: {e}") - + # 所有尝试都失败 if default is not None: logger.warning(f"JSON 解析失败,返回默认值。原始内容: {text[:200]}...") return default - + logger.error(f"❌ 无法解析 JSON,原始内容: {text[:500]}...") raise ValueError(f"无法解析 JSON: {last_error}") - + @classmethod def parse_findings(cls, text: str) -> List[Dict[str, Any]]: """ 专门解析 findings 列表 - + Args: text: LLM 响应文本 - + Returns: findings 列表(每个元素都是字典) """ try: result = cls.parse(text, default={"findings": []}) findings = result.get("findings", []) - + # 确保每个 finding 都是字典 valid_findings = [] for f in findings: @@ -219,33 +262,70 @@ class AgentJsonParser: elif isinstance(f, str): # 尝试将字符串解析为 JSON try: - parsed = json.loads(f) + # 优先使用 json-repair + if JSON_REPAIR_AVAILABLE: + parsed = repair_json(f, return_objects=True) + else: + parsed = json.loads(f) if isinstance(parsed, dict): valid_findings.append(parsed) - except json.JSONDecodeError: + except Exception: logger.warning(f"跳过无效的 finding(字符串): {f[:100]}...") else: logger.warning(f"跳过无效的 finding(类型: {type(f)})") - + return valid_findings - + except Exception as e: logger.error(f"解析 findings 失败: {e}") return [] - + @classmethod def safe_get(cls, data: Union[Dict, str, Any], key: str, default: Any = None) -> Any: """ 安全地从数据中获取值 - + Args: data: 可能是字典或其他类型 key: 要获取的键 default: 默认值 - + Returns: 获取的值或默认值 """ if isinstance(data, dict): return data.get(key, default) return default + + @classmethod + def parse_any(cls, text: str, default: Any = None) -> Any: + """ + 解析任意 JSON 类型(对象、数组、字符串等) + + Args: + text: LLM 响应文本 + default: 解析失败时返回的默认值 + + Returns: + 解析后的 Python 对象 + """ + if not text or not text.strip(): + return default + + clean = cls.clean_text(text) + json_str = cls.extract_json_string(clean) + + # 优先使用 json-repair + if JSON_REPAIR_AVAILABLE: + try: + return repair_json(json_str, return_objects=True) + except Exception as e: + logger.debug(f"json-repair 解析失败: {e}") + + # 后备方法 + try: + return json.loads(json_str) + except Exception as e: + logger.debug(f"标准 JSON 解析失败: {e}") + + return default diff --git a/backend/app/services/agent/prompts/system_prompts.py b/backend/app/services/agent/prompts/system_prompts.py index 650b9da..efcd467 100644 --- a/backend/app/services/agent/prompts/system_prompts.py +++ b/backend/app/services/agent/prompts/system_prompts.py @@ -130,6 +130,34 @@ Action: 工具名称 Action Input: {"参数1": "值1", "参数2": "值2"} ``` +### 错误处理指南 + +当工具执行返回错误时,你会收到详细的错误信息,包括: +- 工具名称和参数 +- 错误类型和错误信息 +- 堆栈跟踪(如有) + +**错误处理策略**: + +1. **参数错误** - 检查并修正参数格式 + - 确保 JSON 格式正确 + - 检查必填参数是否提供 + - 验证参数类型(字符串、数字、列表等) + +2. **资源不存在** - 调整目标 + - 文件不存在:使用 list_files 确认路径 + - 工具不可用:使用其他替代工具 + +3. **权限/超时错误** - 跳过或简化 + - 记录问题,继续其他分析 + - 尝试更小范围的操作 + +4. **沙箱错误** - 检查环境 + - Docker 不可用时使用代码分析替代 + - 记录无法验证的原因 + +**重要**:遇到错误时,不要放弃!分析错误原因,尝试其他方法完成任务。 + ### 完成输出格式 ``` @@ -379,6 +407,22 @@ RECON_SYSTEM_PROMPT = f"""你是 DeepAudit 的侦察 Agent,负责收集和分 - 调试设置 - 密钥管理 +## 工作方式 +每一步,你需要输出: + +``` +Thought: [分析当前情况,思考需要收集什么信息] +Action: [工具名称] +Action Input: {{"参数1": "值1"}} +``` + +当你完成信息收集后,输出: + +``` +Thought: [总结收集到的所有信息] +Final Answer: [JSON 格式的结果] +``` + ## 输出格式 ``` @@ -392,12 +436,33 @@ Final Answer: {{ "entry_points": [ {{"type": "...", "file": "...", "line": ..., "method": "..."}} ], - "high_risk_areas": [...], - "initial_findings": [...], + "high_risk_areas": [ + "文件路径:行号 - 风险描述" + ], + "initial_findings": [ + {{"title": "...", "file_path": "...", "line_start": ..., "description": "..."}} + ], "summary": "项目侦察总结" }} ``` +## ⚠️ 重要输出要求 + +### high_risk_areas 格式要求 +每个高风险区域**必须**包含具体的文件路径,格式为: +- `"app.py:36 - SECRET_KEY 硬编码"` +- `"utils/file.py:120 - 使用用户输入构造文件路径"` +- `"api/views.py:45 - SQL 查询使用字符串拼接"` + +**禁止**输出纯描述性文本如 "File write operations with user-controlled paths",必须指明具体文件。 + +### initial_findings 格式要求 +每个发现**必须**包含: +- `title`: 漏洞标题 +- `file_path`: 具体文件路径 +- `line_start`: 行号 +- `description`: 详细描述 + {TOOL_USAGE_GUIDE} """ diff --git a/backend/app/services/agent/tools/__init__.py b/backend/app/services/agent/tools/__init__.py index 7bb6d86..d9ad13f 100644 --- a/backend/app/services/agent/tools/__init__.py +++ b/backend/app/services/agent/tools/__init__.py @@ -15,7 +15,35 @@ from .rag_tool import RAGQueryTool, SecurityCodeSearchTool, FunctionContextTool from .pattern_tool import PatternMatchTool from .code_analysis_tool import CodeAnalysisTool, DataFlowAnalysisTool, VulnerabilityValidationTool from .file_tool import FileReadTool, FileSearchTool, ListFilesTool -from .sandbox_tool import SandboxTool, SandboxHttpTool, VulnerabilityVerifyTool, SandboxManager +from .sandbox_tool import ( + SandboxTool, + SandboxHttpTool, + VulnerabilityVerifyTool, + SandboxManager, +) + +# 🔥 多语言代码测试工具 +from .sandbox_language import ( + PhpTestTool, + PythonTestTool, + JavaScriptTestTool, + JavaTestTool, + GoTestTool, + RubyTestTool, + ShellTestTool, + UniversalCodeTestTool, +) + +# 🔥 漏洞验证专用工具 +from .sandbox_vuln import ( + CommandInjectionTestTool, + SqlInjectionTestTool, + XssTestTool, + PathTraversalTestTool, + SstiTestTool, + DeserializationTestTool, + UniversalVulnTestTool, +) # 外部安全工具 from .external_tools import ( @@ -77,6 +105,25 @@ __all__ = [ "SandboxHttpTool", "VulnerabilityVerifyTool", "SandboxManager", + + # 🔥 多语言代码测试工具 + "PhpTestTool", + "PythonTestTool", + "JavaScriptTestTool", + "JavaTestTool", + "GoTestTool", + "RubyTestTool", + "ShellTestTool", + "UniversalCodeTestTool", + + # 🔥 漏洞验证专用工具 + "CommandInjectionTestTool", + "SqlInjectionTestTool", + "XssTestTool", + "PathTraversalTestTool", + "SstiTestTool", + "DeserializationTestTool", + "UniversalVulnTestTool", # 外部安全工具 "SemgrepTool", diff --git a/backend/app/services/agent/tools/sandbox_language.py b/backend/app/services/agent/tools/sandbox_language.py new file mode 100644 index 0000000..b41a087 --- /dev/null +++ b/backend/app/services/agent/tools/sandbox_language.py @@ -0,0 +1,1162 @@ +""" +多语言代码测试工具 +支持 PHP, Python, JavaScript, Java, Go, Ruby 等语言的沙箱测试 +""" + +import asyncio +import json +import logging +import os +import tempfile +from typing import Optional, Dict, Any, List +from pydantic import BaseModel, Field +from dataclasses import dataclass + +from .base import AgentTool, ToolResult +from .sandbox_tool import SandboxManager + +logger = logging.getLogger(__name__) + + +# ============ 通用语言测试基类 ============ + +class LanguageTestInput(BaseModel): + """语言测试通用输入""" + code: Optional[str] = Field(default=None, description="要执行的代码(与 file_path 二选一)") + file_path: Optional[str] = Field(default=None, description="项目中的文件路径(与 code 二选一)") + params: Optional[Dict[str, str]] = Field(default=None, description="模拟的请求参数") + env_vars: Optional[Dict[str, str]] = Field(default=None, description="环境变量") + timeout: int = Field(default=30, description="超时时间(秒)") + + +class BaseLanguageTestTool(AgentTool): + """语言测试工具基类""" + + LANGUAGE_NAME = "unknown" + LANGUAGE_CMD = "echo" + FILE_EXTENSION = ".txt" + + def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."): + super().__init__() + self.sandbox_manager = sandbox_manager or SandboxManager() + self.project_root = project_root + + @property + def args_schema(self): + return LanguageTestInput + + def _read_file(self, file_path: str) -> Optional[str]: + """读取文件内容""" + full_path = os.path.join(self.project_root, file_path) + if not os.path.exists(full_path): + return None + with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: + return f.read() + + def _build_wrapper_code(self, code: str, params: Optional[Dict[str, str]]) -> str: + """构建包装代码 - 子类实现""" + raise NotImplementedError + + def _build_command(self, code: str) -> str: + """构建执行命令 - 子类实现""" + raise NotImplementedError + + def _analyze_output(self, result: Dict[str, Any], params: Optional[Dict[str, str]]) -> Dict[str, Any]: + """分析输出结果""" + is_vulnerable = False + evidence = None + + if result["exit_code"] == 0 and result.get("stdout"): + stdout = result["stdout"].strip().lower() + + # 通用漏洞特征检测 + vuln_indicators = [ + ("uid=", "命令执行成功 (uid)"), + ("root:", "命令执行成功 (passwd)"), + ("www-data", "命令执行成功 (www-data)"), + ("nobody", "命令执行成功 (nobody)"), + ("daemon", "命令执行成功 (daemon)"), + ("/bin/", "路径泄露"), + ("/etc/", "敏感路径访问"), + ("sql syntax", "SQL 错误"), + ("mysql", "数据库信息泄露"), + ("postgresql", "数据库信息泄露"), + ("sqlite", "数据库信息泄露"), + ("syntax error", "代码执行错误"), + ("stack trace", "堆栈跟踪泄露"), + ("exception", "异常信息泄露"), + ] + + for indicator, desc in vuln_indicators: + if indicator in stdout: + is_vulnerable = True + evidence = f"{desc}: 输出包含 '{indicator}'" + break + + # 检查参数是否被执行 + if params and not is_vulnerable: + for key, value in params.items(): + if value.lower() in stdout: + is_vulnerable = True + evidence = f"参数 '{key}' 的值出现在输出中" + break + + return { + "is_vulnerable": is_vulnerable, + "evidence": evidence, + } + + async def _execute( + self, + code: Optional[str] = None, + file_path: Optional[str] = None, + params: Optional[Dict[str, str]] = None, + env_vars: Optional[Dict[str, str]] = None, + timeout: int = 30, + **kwargs + ) -> ToolResult: + """执行语言测试""" + try: + await self.sandbox_manager.initialize() + except Exception as e: + logger.warning(f"Sandbox init failed: {e}") + + if not self.sandbox_manager.is_available: + return ToolResult( + success=False, + error="沙箱环境不可用 (Docker Unavailable)", + ) + + # 获取代码 + if file_path: + code = self._read_file(file_path) + if code is None: + return ToolResult( + success=False, + error=f"文件不存在: {file_path}", + ) + + if not code: + return ToolResult( + success=False, + error="必须提供 code 或 file_path", + ) + + # 构建包装代码 + wrapped_code = self._build_wrapper_code(code, params) + + # 构建命令 + command = self._build_command(wrapped_code) + + # 执行 + result = await self.sandbox_manager.execute_command( + command=command, + timeout=timeout, + env=env_vars, + ) + + # 分析结果 + analysis = self._analyze_output(result, params) + + # 格式化输出 + output_parts = [f"🔬 {self.LANGUAGE_NAME} 测试结果\n"] + + if file_path: + output_parts.append(f"文件: {file_path}") + if params: + output_parts.append(f"参数: {json.dumps(params, ensure_ascii=False)}") + + output_parts.append(f"\n退出码: {result['exit_code']}") + + if result["stdout"]: + stdout = result["stdout"][:3000] + output_parts.append(f"\n输出:\n```\n{stdout}\n```") + + if result["stderr"]: + stderr = result["stderr"][:1000] + output_parts.append(f"\n错误:\n```\n{stderr}\n```") + + if analysis["is_vulnerable"]: + output_parts.append(f"\n🔴 **漏洞确认**: {analysis['evidence']}") + else: + output_parts.append(f"\n🟡 未能确认漏洞") + + return ToolResult( + success=True, + data="\n".join(output_parts), + metadata={ + "exit_code": result["exit_code"], + "is_vulnerable": analysis["is_vulnerable"], + "evidence": analysis["evidence"], + "language": self.LANGUAGE_NAME, + } + ) + + +# ============ PHP 测试工具 ============ + +class PhpTestTool(BaseLanguageTestTool): + """PHP 代码测试工具""" + + LANGUAGE_NAME = "PHP" + LANGUAGE_CMD = "php" + FILE_EXTENSION = ".php" + + @property + def name(self) -> str: + return "php_test" + + @property + def description(self) -> str: + return """在沙箱中测试 PHP 代码,支持模拟 $_GET/$_POST/$_REQUEST 参数。 + +输入: +- code: PHP 代码(与 file_path 二选一) +- file_path: 项目中的 PHP 文件路径 +- params: 模拟参数,如 {"cmd": "whoami", "id": "1"} +- timeout: 超时秒数 + +示例: +1. 测试文件: {"file_path": "vuln.php", "params": {"cmd": "whoami"}} +2. 测试代码: {"code": "", "params": {"cmd": "id"}}""" + + def _build_wrapper_code(self, code: str, params: Optional[Dict[str, str]]) -> str: + """构建 PHP 包装代码 + + 注意: php -r 不需要 "): + clean_code = clean_code[:-2].strip() + + wrapper_parts.append(clean_code) + + return "\n".join(wrapper_parts) + + def _build_command(self, code: str) -> str: + """构建 PHP 执行命令""" + escaped_code = code.replace("'", "'\"'\"'") + return f"php -r '{escaped_code}'" + + +# ============ Python 测试工具 ============ + +class PythonTestInput(LanguageTestInput): + """Python 测试输入""" + flask_mode: bool = Field(default=False, description="是否模拟 Flask 请求环境") + django_mode: bool = Field(default=False, description="是否模拟 Django 请求环境") + + +class PythonTestTool(BaseLanguageTestTool): + """Python 代码测试工具""" + + LANGUAGE_NAME = "Python" + LANGUAGE_CMD = "python3" + FILE_EXTENSION = ".py" + + @property + def name(self) -> str: + return "python_test" + + @property + def description(self) -> str: + return """在沙箱中测试 Python 代码,支持模拟 Flask/Django 请求参数。 + +输入: +- code: Python 代码(与 file_path 二选一) +- file_path: 项目中的 Python 文件路径 +- params: 模拟参数,如 {"cmd": "whoami", "user_id": "1"} +- flask_mode: 是否模拟 Flask request.args/form +- django_mode: 是否模拟 Django request.GET/POST +- timeout: 超时秒数 + +示例: +1. Flask 模式: {"file_path": "app.py", "params": {"cmd": "id"}, "flask_mode": true} +2. 命令行参数: {"code": "import os; os.system(input())", "params": {"input": "whoami"}}""" + + @property + def args_schema(self): + return PythonTestInput + + def _build_wrapper_code(self, code: str, params: Optional[Dict[str, str]], + flask_mode: bool = False, django_mode: bool = False) -> str: + """构建 Python 包装代码""" + wrapper_parts = [] + + if params: + if flask_mode: + # 模拟 Flask request + wrapper_parts.append(""" +class MockMultiDict(dict): + def get(self, key, default=None, type=None): + value = super().get(key, default) + if type and value is not None: + try: + return type(value) + except: + return default + return value + def getlist(self, key): + value = self.get(key) + return [value] if value else [] + +class MockRequest: + def __init__(self, params): + self.args = MockMultiDict(params) + self.form = MockMultiDict(params) + self.values = MockMultiDict(params) + self.data = params + self.json = params + self.method = 'GET' + self.path = '/' + self.headers = {} + def get_json(self, force=False, silent=False): + return self.json + +import sys +sys.modules['flask'] = type(sys)('flask') +""") + params_str = json.dumps(params) + wrapper_parts.append(f"request = MockRequest({params_str})") + + elif django_mode: + # 模拟 Django request + wrapper_parts.append(""" +class MockQueryDict(dict): + def get(self, key, default=None): + return super().get(key, default) + def getlist(self, key): + value = self.get(key) + return [value] if value else [] + +class MockRequest: + def __init__(self, params): + self.GET = MockQueryDict(params) + self.POST = MockQueryDict(params) + self.method = 'GET' + self.path = '/' + self.META = {} + self.body = b'' +""") + params_str = json.dumps(params) + wrapper_parts.append(f"request = MockRequest({params_str})") + else: + # 普通模式:设置命令行参数和环境变量 + wrapper_parts.append("import sys, os") + args = ["script.py"] + list(params.values()) + wrapper_parts.append(f"sys.argv = {args}") + for key, value in params.items(): + wrapper_parts.append(f"os.environ['{key.upper()}'] = '{value}'") + + wrapper_parts.append(code) + return "\n".join(wrapper_parts) + + def _build_command(self, code: str) -> str: + """构建 Python 执行命令""" + escaped_code = code.replace("'", "'\"'\"'") + return f"python3 -c '{escaped_code}'" + + async def _execute( + self, + code: Optional[str] = None, + file_path: Optional[str] = None, + params: Optional[Dict[str, str]] = None, + env_vars: Optional[Dict[str, str]] = None, + timeout: int = 30, + flask_mode: bool = False, + django_mode: bool = False, + **kwargs + ) -> ToolResult: + """执行 Python 测试""" + try: + await self.sandbox_manager.initialize() + except Exception as e: + logger.warning(f"Sandbox init failed: {e}") + + if not self.sandbox_manager.is_available: + return ToolResult(success=False, error="沙箱环境不可用") + + if file_path: + code = self._read_file(file_path) + if code is None: + return ToolResult(success=False, error=f"文件不存在: {file_path}") + + if not code: + return ToolResult(success=False, error="必须提供 code 或 file_path") + + wrapped_code = self._build_wrapper_code(code, params, flask_mode, django_mode) + command = self._build_command(wrapped_code) + + result = await self.sandbox_manager.execute_command( + command=command, + timeout=timeout, + env=env_vars, + ) + + analysis = self._analyze_output(result, params) + + output_parts = [f"🐍 Python 测试结果\n"] + if file_path: + output_parts.append(f"文件: {file_path}") + if flask_mode: + output_parts.append("模式: Flask") + elif django_mode: + output_parts.append("模式: Django") + if params: + output_parts.append(f"参数: {json.dumps(params, ensure_ascii=False)}") + + output_parts.append(f"\n退出码: {result['exit_code']}") + + if result["stdout"]: + output_parts.append(f"\n输出:\n```\n{result['stdout'][:3000]}\n```") + if result["stderr"]: + output_parts.append(f"\n错误:\n```\n{result['stderr'][:1000]}\n```") + + if analysis["is_vulnerable"]: + output_parts.append(f"\n🔴 **漏洞确认**: {analysis['evidence']}") + else: + output_parts.append(f"\n🟡 未能确认漏洞") + + return ToolResult( + success=True, + data="\n".join(output_parts), + metadata={ + "exit_code": result["exit_code"], + "is_vulnerable": analysis["is_vulnerable"], + "evidence": analysis["evidence"], + "language": "Python", + } + ) + + +# ============ JavaScript/Node.js 测试工具 ============ + +class JavaScriptTestInput(LanguageTestInput): + """JavaScript 测试输入""" + express_mode: bool = Field(default=False, description="是否模拟 Express.js 请求环境") + + +class JavaScriptTestTool(BaseLanguageTestTool): + """JavaScript/Node.js 代码测试工具""" + + LANGUAGE_NAME = "JavaScript" + LANGUAGE_CMD = "node" + FILE_EXTENSION = ".js" + + @property + def name(self) -> str: + return "javascript_test" + + @property + def description(self) -> str: + return """在沙箱中测试 JavaScript/Node.js 代码,支持模拟 Express.js 请求。 + +输入: +- code: JavaScript 代码(与 file_path 二选一) +- file_path: 项目中的 JS 文件路径 +- params: 模拟参数,如 {"cmd": "whoami", "id": "1"} +- express_mode: 是否模拟 Express req 对象 +- timeout: 超时秒数 + +示例: +1. Express 模式: {"file_path": "route.js", "params": {"cmd": "id"}, "express_mode": true} +2. 普通模式: {"code": "require('child_process').execSync(process.argv[2])", "params": {"arg": "whoami"}}""" + + @property + def args_schema(self): + return JavaScriptTestInput + + def _build_wrapper_code(self, code: str, params: Optional[Dict[str, str]], + express_mode: bool = False) -> str: + """构建 JavaScript 包装代码""" + wrapper_parts = [] + + if params: + if express_mode: + # 模拟 Express request 对象 + params_json = json.dumps(params) + wrapper_parts.append(f""" +const req = {{ + query: {params_json}, + body: {params_json}, + params: {params_json}, + get: function(header) {{ return this.headers[header]; }}, + headers: {{}}, + method: 'GET', + path: '/', + url: '/', +}}; +const res = {{ + send: function(data) {{ console.log(data); return this; }}, + json: function(data) {{ console.log(JSON.stringify(data)); return this; }}, + status: function(code) {{ return this; }}, + end: function() {{ return this; }}, +}}; +""") + else: + # 普通模式:设置进程参数 + wrapper_parts.append("const params = " + json.dumps(params) + ";") + args = ["node", "script.js"] + list(params.values()) + wrapper_parts.append(f"process.argv = {json.dumps(args)};") + + wrapper_parts.append(code) + return "\n".join(wrapper_parts) + + def _build_command(self, code: str) -> str: + """构建 Node.js 执行命令""" + escaped_code = code.replace("'", "'\"'\"'") + return f"node -e '{escaped_code}'" + + async def _execute( + self, + code: Optional[str] = None, + file_path: Optional[str] = None, + params: Optional[Dict[str, str]] = None, + env_vars: Optional[Dict[str, str]] = None, + timeout: int = 30, + express_mode: bool = False, + **kwargs + ) -> ToolResult: + """执行 JavaScript 测试""" + try: + await self.sandbox_manager.initialize() + except Exception as e: + logger.warning(f"Sandbox init failed: {e}") + + if not self.sandbox_manager.is_available: + return ToolResult(success=False, error="沙箱环境不可用") + + if file_path: + code = self._read_file(file_path) + if code is None: + return ToolResult(success=False, error=f"文件不存在: {file_path}") + + if not code: + return ToolResult(success=False, error="必须提供 code 或 file_path") + + wrapped_code = self._build_wrapper_code(code, params, express_mode) + command = self._build_command(wrapped_code) + + result = await self.sandbox_manager.execute_command( + command=command, + timeout=timeout, + env=env_vars, + ) + + analysis = self._analyze_output(result, params) + + output_parts = [f"📜 JavaScript 测试结果\n"] + if file_path: + output_parts.append(f"文件: {file_path}") + if express_mode: + output_parts.append("模式: Express.js") + if params: + output_parts.append(f"参数: {json.dumps(params, ensure_ascii=False)}") + + output_parts.append(f"\n退出码: {result['exit_code']}") + + if result["stdout"]: + output_parts.append(f"\n输出:\n```\n{result['stdout'][:3000]}\n```") + if result["stderr"]: + output_parts.append(f"\n错误:\n```\n{result['stderr'][:1000]}\n```") + + if analysis["is_vulnerable"]: + output_parts.append(f"\n🔴 **漏洞确认**: {analysis['evidence']}") + else: + output_parts.append(f"\n🟡 未能确认漏洞") + + return ToolResult( + success=True, + data="\n".join(output_parts), + metadata={ + "exit_code": result["exit_code"], + "is_vulnerable": analysis["is_vulnerable"], + "evidence": analysis["evidence"], + "language": "JavaScript", + } + ) + + +# ============ Java 测试工具 ============ + +class JavaTestTool(BaseLanguageTestTool): + """Java 代码测试工具""" + + LANGUAGE_NAME = "Java" + FILE_EXTENSION = ".java" + + @property + def name(self) -> str: + return "java_test" + + @property + def description(self) -> str: + return """在沙箱中测试 Java 代码,支持模拟 Servlet 请求参数。 + +输入: +- code: Java 代码(与 file_path 二选一) +- file_path: 项目中的 Java 文件路径 +- params: 模拟参数,如 {"cmd": "whoami"} +- timeout: 超时秒数 + +示例: +{"code": "Runtime.getRuntime().exec(args[0])", "params": {"arg": "whoami"}} + +注意: Java 代码会被包装在 main 方法中执行。""" + + def _build_wrapper_code(self, code: str, params: Optional[Dict[str, str]]) -> str: + """构建 Java 包装代码""" + # 检测是否是完整类 + if "class " in code and "public static void main" in code: + return code + + # 构建模拟请求参数 + param_init = "" + if params: + params_entries = ", ".join([f'"{k}", "{v}"' for k, v in params.items()]) + param_init = f""" + java.util.Map request = new java.util.HashMap<>(); + String[][] entries = {{{params_entries.replace(', ', '}, {')}}}; + for (String[] e : entries) {{ request.put(e[0], e[1]); }} + String[] args = new String[]{{{', '.join([f'"{v}"' for v in params.values()])}}}; +""" + + wrapper = f""" +import java.io.*; +import java.util.*; + +public class Test {{ + public static void main(String[] argv) throws Exception {{ + {param_init} + {code} + }} +}} +""" + return wrapper + + def _build_command(self, code: str) -> str: + """构建 Java 执行命令""" + # Java 需要先编译再执行 + escaped_code = code.replace("'", "'\"'\"'").replace("\\", "\\\\") + return f"echo '{escaped_code}' > /tmp/Test.java && javac /tmp/Test.java && java -cp /tmp Test" + + async def _execute( + self, + code: Optional[str] = None, + file_path: Optional[str] = None, + params: Optional[Dict[str, str]] = None, + env_vars: Optional[Dict[str, str]] = None, + timeout: int = 60, # Java 编译需要更长时间 + **kwargs + ) -> ToolResult: + """执行 Java 测试""" + try: + await self.sandbox_manager.initialize() + except Exception as e: + logger.warning(f"Sandbox init failed: {e}") + + if not self.sandbox_manager.is_available: + return ToolResult(success=False, error="沙箱环境不可用") + + if file_path: + code = self._read_file(file_path) + if code is None: + return ToolResult(success=False, error=f"文件不存在: {file_path}") + + if not code: + return ToolResult(success=False, error="必须提供 code 或 file_path") + + wrapped_code = self._build_wrapper_code(code, params) + command = self._build_command(wrapped_code) + + result = await self.sandbox_manager.execute_command( + command=command, + timeout=timeout, + env=env_vars, + ) + + analysis = self._analyze_output(result, params) + + output_parts = [f"☕ Java 测试结果\n"] + if file_path: + output_parts.append(f"文件: {file_path}") + if params: + output_parts.append(f"参数: {json.dumps(params, ensure_ascii=False)}") + + output_parts.append(f"\n退出码: {result['exit_code']}") + + if result["stdout"]: + output_parts.append(f"\n输出:\n```\n{result['stdout'][:3000]}\n```") + if result["stderr"]: + output_parts.append(f"\n错误:\n```\n{result['stderr'][:1000]}\n```") + + if analysis["is_vulnerable"]: + output_parts.append(f"\n🔴 **漏洞确认**: {analysis['evidence']}") + else: + output_parts.append(f"\n🟡 未能确认漏洞") + + return ToolResult( + success=True, + data="\n".join(output_parts), + metadata={ + "exit_code": result["exit_code"], + "is_vulnerable": analysis["is_vulnerable"], + "evidence": analysis["evidence"], + "language": "Java", + } + ) + + +# ============ Go 测试工具 ============ + +class GoTestTool(BaseLanguageTestTool): + """Go 代码测试工具""" + + LANGUAGE_NAME = "Go" + FILE_EXTENSION = ".go" + + @property + def name(self) -> str: + return "go_test" + + @property + def description(self) -> str: + return """在沙箱中测试 Go 代码。 + +输入: +- code: Go 代码(与 file_path 二选一) +- file_path: 项目中的 Go 文件路径 +- params: 模拟参数(作为命令行参数或环境变量) +- timeout: 超时秒数 + +示例: +{"code": "exec.Command(os.Args[1]).Output()", "params": {"cmd": "whoami"}}""" + + def _build_wrapper_code(self, code: str, params: Optional[Dict[str, str]]) -> str: + """构建 Go 包装代码""" + # 检测是否是完整包 + if "package main" in code and "func main()" in code: + return code + + imports = ["fmt", "os"] + if "exec." in code: + imports.append("os/exec") + if "http." in code: + imports.append("net/http") + if "io" in code: + imports.append("io") + + imports_str = "\n".join([f' "{imp}"' for imp in imports]) + + # 模拟参数 + param_code = "" + if params: + args = ["program"] + list(params.values()) + param_code = f" os.Args = []string{{{', '.join([f'\"{a}\"' for a in args])}}}\n" + for key, value in params.items(): + param_code += f' os.Setenv("{key.upper()}", "{value}")\n' + + wrapper = f"""package main + +import ( +{imports_str} +) + +func main() {{ +{param_code} + {code} +}} +""" + return wrapper + + def _build_command(self, code: str) -> str: + """构建 Go 执行命令""" + escaped_code = code.replace("'", "'\"'\"'").replace("\\", "\\\\") + return f"echo '{escaped_code}' > /tmp/main.go && go run /tmp/main.go" + + async def _execute( + self, + code: Optional[str] = None, + file_path: Optional[str] = None, + params: Optional[Dict[str, str]] = None, + env_vars: Optional[Dict[str, str]] = None, + timeout: int = 60, + **kwargs + ) -> ToolResult: + """执行 Go 测试""" + try: + await self.sandbox_manager.initialize() + except Exception as e: + logger.warning(f"Sandbox init failed: {e}") + + if not self.sandbox_manager.is_available: + return ToolResult(success=False, error="沙箱环境不可用") + + if file_path: + code = self._read_file(file_path) + if code is None: + return ToolResult(success=False, error=f"文件不存在: {file_path}") + + if not code: + return ToolResult(success=False, error="必须提供 code 或 file_path") + + wrapped_code = self._build_wrapper_code(code, params) + command = self._build_command(wrapped_code) + + result = await self.sandbox_manager.execute_command( + command=command, + timeout=timeout, + env=env_vars, + ) + + analysis = self._analyze_output(result, params) + + output_parts = [f"🔵 Go 测试结果\n"] + if file_path: + output_parts.append(f"文件: {file_path}") + if params: + output_parts.append(f"参数: {json.dumps(params, ensure_ascii=False)}") + + output_parts.append(f"\n退出码: {result['exit_code']}") + + if result["stdout"]: + output_parts.append(f"\n输出:\n```\n{result['stdout'][:3000]}\n```") + if result["stderr"]: + output_parts.append(f"\n错误:\n```\n{result['stderr'][:1000]}\n```") + + if analysis["is_vulnerable"]: + output_parts.append(f"\n🔴 **漏洞确认**: {analysis['evidence']}") + else: + output_parts.append(f"\n🟡 未能确认漏洞") + + return ToolResult( + success=True, + data="\n".join(output_parts), + metadata={ + "exit_code": result["exit_code"], + "is_vulnerable": analysis["is_vulnerable"], + "evidence": analysis["evidence"], + "language": "Go", + } + ) + + +# ============ Ruby 测试工具 ============ + +class RubyTestInput(LanguageTestInput): + """Ruby 测试输入""" + rails_mode: bool = Field(default=False, description="是否模拟 Rails 请求环境") + + +class RubyTestTool(BaseLanguageTestTool): + """Ruby 代码测试工具""" + + LANGUAGE_NAME = "Ruby" + LANGUAGE_CMD = "ruby" + FILE_EXTENSION = ".rb" + + @property + def name(self) -> str: + return "ruby_test" + + @property + def description(self) -> str: + return """在沙箱中测试 Ruby 代码,支持模拟 Rails 请求参数。 + +输入: +- code: Ruby 代码(与 file_path 二选一) +- file_path: 项目中的 Ruby 文件路径 +- params: 模拟参数,如 {"cmd": "whoami"} +- rails_mode: 是否模拟 Rails params +- timeout: 超时秒数 + +示例: +1. Rails 模式: {"file_path": "controller.rb", "params": {"cmd": "id"}, "rails_mode": true} +2. 普通模式: {"code": "system(ARGV[0])", "params": {"cmd": "whoami"}}""" + + @property + def args_schema(self): + return RubyTestInput + + def _build_wrapper_code(self, code: str, params: Optional[Dict[str, str]], + rails_mode: bool = False) -> str: + """构建 Ruby 包装代码""" + wrapper_parts = [] + + if params: + if rails_mode: + # 模拟 Rails params + params_ruby = "{ " + ", ".join([f'"{k}" => "{v}"' for k, v in params.items()]) + " }" + wrapper_parts.append(f""" +class HashWithIndifferentAccess < Hash + def [](key) + super(key.to_s) || super(key.to_sym) + end +end + +def params + @params ||= HashWithIndifferentAccess.new.merge({params_ruby}) +end + +class Request + attr_accessor :params, :method, :path + def initialize(p) + @params = p + @method = 'GET' + @path = '/' + end +end + +request = Request.new(params) +""") + else: + # 普通模式 + for i, (key, value) in enumerate(params.items()): + wrapper_parts.append(f'ARGV[{i}] = "{value}"') + wrapper_parts.append(f'ENV["{key.upper()}"] = "{value}"') + + wrapper_parts.append(code) + return "\n".join(wrapper_parts) + + def _build_command(self, code: str) -> str: + """构建 Ruby 执行命令""" + escaped_code = code.replace("'", "'\"'\"'") + return f"ruby -e '{escaped_code}'" + + async def _execute( + self, + code: Optional[str] = None, + file_path: Optional[str] = None, + params: Optional[Dict[str, str]] = None, + env_vars: Optional[Dict[str, str]] = None, + timeout: int = 30, + rails_mode: bool = False, + **kwargs + ) -> ToolResult: + """执行 Ruby 测试""" + try: + await self.sandbox_manager.initialize() + except Exception as e: + logger.warning(f"Sandbox init failed: {e}") + + if not self.sandbox_manager.is_available: + return ToolResult(success=False, error="沙箱环境不可用") + + if file_path: + code = self._read_file(file_path) + if code is None: + return ToolResult(success=False, error=f"文件不存在: {file_path}") + + if not code: + return ToolResult(success=False, error="必须提供 code 或 file_path") + + wrapped_code = self._build_wrapper_code(code, params, rails_mode) + command = self._build_command(wrapped_code) + + result = await self.sandbox_manager.execute_command( + command=command, + timeout=timeout, + env=env_vars, + ) + + analysis = self._analyze_output(result, params) + + output_parts = [f"💎 Ruby 测试结果\n"] + if file_path: + output_parts.append(f"文件: {file_path}") + if rails_mode: + output_parts.append("模式: Rails") + if params: + output_parts.append(f"参数: {json.dumps(params, ensure_ascii=False)}") + + output_parts.append(f"\n退出码: {result['exit_code']}") + + if result["stdout"]: + output_parts.append(f"\n输出:\n```\n{result['stdout'][:3000]}\n```") + if result["stderr"]: + output_parts.append(f"\n错误:\n```\n{result['stderr'][:1000]}\n```") + + if analysis["is_vulnerable"]: + output_parts.append(f"\n🔴 **漏洞确认**: {analysis['evidence']}") + else: + output_parts.append(f"\n🟡 未能确认漏洞") + + return ToolResult( + success=True, + data="\n".join(output_parts), + metadata={ + "exit_code": result["exit_code"], + "is_vulnerable": analysis["is_vulnerable"], + "evidence": analysis["evidence"], + "language": "Ruby", + } + ) + + +# ============ Bash/Shell 测试工具 ============ + +class ShellTestTool(BaseLanguageTestTool): + """Shell/Bash 脚本测试工具""" + + LANGUAGE_NAME = "Shell" + LANGUAGE_CMD = "bash" + FILE_EXTENSION = ".sh" + + @property + def name(self) -> str: + return "shell_test" + + @property + def description(self) -> str: + return """在沙箱中测试 Shell/Bash 脚本。 + +输入: +- code: Shell 代码(与 file_path 二选一) +- file_path: 项目中的 Shell 脚本路径 +- params: 模拟参数(作为位置参数 $1, $2... 或环境变量) +- timeout: 超时秒数 + +示例: +{"code": "eval $1", "params": {"1": "whoami"}}""" + + def _build_wrapper_code(self, code: str, params: Optional[Dict[str, str]]) -> str: + """构建 Shell 包装代码""" + wrapper_parts = ["#!/bin/bash"] + + if params: + for key, value in params.items(): + # 设置位置参数和环境变量 + if key.isdigit(): + # 位置参数需要特殊处理 + pass + else: + wrapper_parts.append(f'export {key.upper()}="{value}"') + + wrapper_parts.append(code) + return "\n".join(wrapper_parts) + + def _build_command(self, code: str) -> str: + """构建 Shell 执行命令""" + escaped_code = code.replace("'", "'\"'\"'") + return f"bash -c '{escaped_code}'" + + +# ============ 通用多语言测试工具 ============ + +class UniversalCodeTestInput(BaseModel): + """通用代码测试输入""" + language: str = Field(..., description="编程语言: php, python, javascript, java, go, ruby, shell") + code: Optional[str] = Field(default=None, description="要执行的代码") + file_path: Optional[str] = Field(default=None, description="文件路径") + params: Optional[Dict[str, str]] = Field(default=None, description="模拟参数") + framework_mode: Optional[str] = Field(default=None, description="框架模式: flask, django, express, rails") + timeout: int = Field(default=30, description="超时秒数") + + +class UniversalCodeTestTool(AgentTool): + """通用多语言代码测试工具 - 自动选择合适的语言测试器""" + + def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."): + super().__init__() + self.sandbox_manager = sandbox_manager or SandboxManager() + self.project_root = project_root + + # 初始化所有语言测试器 + self._testers = { + "php": PhpTestTool(sandbox_manager, project_root), + "python": PythonTestTool(sandbox_manager, project_root), + "javascript": JavaScriptTestTool(sandbox_manager, project_root), + "js": JavaScriptTestTool(sandbox_manager, project_root), + "node": JavaScriptTestTool(sandbox_manager, project_root), + "java": JavaTestTool(sandbox_manager, project_root), + "go": GoTestTool(sandbox_manager, project_root), + "golang": GoTestTool(sandbox_manager, project_root), + "ruby": RubyTestTool(sandbox_manager, project_root), + "rb": RubyTestTool(sandbox_manager, project_root), + "shell": ShellTestTool(sandbox_manager, project_root), + "bash": ShellTestTool(sandbox_manager, project_root), + } + + @property + def name(self) -> str: + return "code_test" + + @property + def description(self) -> str: + return """通用多语言代码测试工具,支持 PHP, Python, JavaScript, Java, Go, Ruby, Shell。 + +自动根据语言选择合适的测试环境,支持各种框架的请求模拟。 + +输入: +- language: 编程语言 (php, python, javascript, java, go, ruby, shell) +- code: 代码内容(与 file_path 二选一) +- file_path: 文件路径 +- params: 模拟参数 +- framework_mode: 框架模式 (flask, django, express, rails) +- timeout: 超时秒数 + +示例: +1. PHP: {"language": "php", "file_path": "vuln.php", "params": {"cmd": "id"}} +2. Python Flask: {"language": "python", "code": "os.system(request.args.get('cmd'))", "params": {"cmd": "whoami"}, "framework_mode": "flask"} +3. Node.js: {"language": "javascript", "code": "require('child_process').execSync(req.query.cmd)", "params": {"cmd": "id"}, "framework_mode": "express"}""" + + @property + def args_schema(self): + return UniversalCodeTestInput + + async def _execute( + self, + language: str, + code: Optional[str] = None, + file_path: Optional[str] = None, + params: Optional[Dict[str, str]] = None, + framework_mode: Optional[str] = None, + timeout: int = 30, + **kwargs + ) -> ToolResult: + """执行通用代码测试""" + language = language.lower().strip() + + tester = self._testers.get(language) + if not tester: + return ToolResult( + success=False, + error=f"不支持的语言: {language}。支持: {list(self._testers.keys())}", + ) + + # 构建测试参数 + test_kwargs = { + "code": code, + "file_path": file_path, + "params": params, + "timeout": timeout, + } + + # 处理框架模式 + if framework_mode: + fm = framework_mode.lower() + if fm == "flask": + test_kwargs["flask_mode"] = True + elif fm == "django": + test_kwargs["django_mode"] = True + elif fm == "express": + test_kwargs["express_mode"] = True + elif fm == "rails": + test_kwargs["rails_mode"] = True + + return await tester._execute(**test_kwargs) diff --git a/backend/app/services/agent/tools/sandbox_tool.py b/backend/app/services/agent/tools/sandbox_tool.py index fcdc777..72831be 100644 --- a/backend/app/services/agent/tools/sandbox_tool.py +++ b/backend/app/services/agent/tools/sandbox_tool.py @@ -44,17 +44,24 @@ class SandboxManager: async def initialize(self): """初始化 Docker 客户端""" if self._initialized: + logger.info("✅ SandboxManager already initialized") return - + try: import docker + logger.info("🔄 Attempting to connect to Docker...") self._docker_client = docker.from_env() # 测试连接 self._docker_client.ping() self._initialized = True - logger.info("Docker sandbox manager initialized") + logger.info("✅ Docker sandbox manager initialized successfully") + except ImportError as e: + logger.error(f"❌ Docker library not installed: {e}") + self._docker_client = None except Exception as e: - logger.warning(f"Docker not available: {e}") + logger.warning(f"❌ Docker not available: {e}") + import traceback + logger.warning(f"Docker connection traceback: {traceback.format_exc()}") self._docker_client = None @property @@ -462,12 +469,13 @@ class SandboxTool(AgentTool): 沙箱执行工具 在安全隔离的环境中执行代码和命令 """ - + # 允许的命令前缀 ALLOWED_COMMANDS = [ "python", "python3", "node", "curl", "wget", "cat", "head", "tail", "grep", "find", "ls", "echo", "printf", "test", "id", "whoami", + "php", # 🔥 添加 PHP 支持 ] def __init__(self, sandbox_manager: Optional[SandboxManager] = None): @@ -763,3 +771,398 @@ class VulnerabilityVerifyTool(AgentTool): } ) + +# ============ PHP 测试工具 ============ + +class PhpTestInput(BaseModel): + """PHP 测试输入""" + php_code: Optional[str] = Field(default=None, description="要执行的 PHP 代码(可选,与 file_path 二选一)") + file_path: Optional[str] = Field(default=None, description="要测试的 PHP 文件路径(可选,与 php_code 二选一)") + get_params: Optional[Dict[str, str]] = Field(default=None, description="模拟的 GET 参数,如 {'cmd': 'whoami'}") + post_params: Optional[Dict[str, str]] = Field(default=None, description="模拟的 POST 参数") + timeout: int = Field(default=30, description="超时时间(秒)") + + +class PhpTestTool(AgentTool): + """ + PHP 代码测试工具 + 在沙箱中执行 PHP 代码,支持模拟 GET/POST 参数 + """ + + def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."): + super().__init__() + self.sandbox_manager = sandbox_manager or SandboxManager() + self.project_root = project_root + + @property + def name(self) -> str: + return "php_test" + + @property + def description(self) -> str: + return """在沙箱中测试 PHP 代码,支持模拟 GET/POST 参数。 +专门用于验证 PHP 漏洞(如命令注入、SQL 注入等)。 + +输入 (二选一): +- php_code: 直接提供要执行的 PHP 代码 +- file_path: 项目中的 PHP 文件路径 + +模拟参数: +- get_params: 模拟 $_GET 参数,如 {"cmd": "whoami", "id": "1"} +- post_params: 模拟 $_POST 参数 + +示例: +1. 测试命令注入: + {"file_path": "vuln.php", "get_params": {"cmd": "whoami"}} + +2. 直接测试代码: + {"php_code": "", "get_params": {"cmd": "id"}} + +⚠️ 在沙箱中执行,不影响真实环境。""" + + @property + def args_schema(self): + return PhpTestInput + + async def _execute( + self, + php_code: Optional[str] = None, + file_path: Optional[str] = None, + get_params: Optional[Dict[str, str]] = None, + post_params: Optional[Dict[str, str]] = None, + timeout: int = 30, + **kwargs + ) -> ToolResult: + """执行 PHP 测试""" + try: + await self.sandbox_manager.initialize() + except Exception as e: + logger.warning(f"Sandbox init failed: {e}") + + if not self.sandbox_manager.is_available: + return ToolResult( + success=False, + error="沙箱环境不可用 (Docker Unavailable)", + ) + + # 构建 PHP 代码 + if file_path: + # 从文件读取 + import os + full_path = os.path.join(self.project_root, file_path) + if not os.path.exists(full_path): + return ToolResult( + success=False, + error=f"文件不存在: {file_path}", + ) + with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: + php_code = f.read() + + if not php_code: + return ToolResult( + success=False, + error="必须提供 php_code 或 file_path", + ) + + # 构建模拟 $_GET 和 $_POST 的包装代码 + wrapper_parts = [""): + clean_code = clean_code[:-2].strip() + + wrapper_parts.append(clean_code) + wrapper_parts.append("?>") + + full_php_code = "\n".join(wrapper_parts) + + # 在沙箱中执行 + # 使用 php -r 直接执行代码 + import shlex + escaped_code = full_php_code.replace("'", "'\"'\"'") + command = f"php -r '{escaped_code}'" + + result = await self.sandbox_manager.execute_command( + command=command, + timeout=timeout, + ) + + # 格式化输出 + output_parts = ["🐘 PHP 测试结果\n"] + + if get_params: + output_parts.append(f"模拟 GET 参数: {get_params}") + if post_params: + output_parts.append(f"模拟 POST 参数: {post_params}") + + output_parts.append(f"\n退出码: {result['exit_code']}") + + if result["stdout"]: + stdout = result["stdout"][:3000] + output_parts.append(f"\n输出:\n```\n{stdout}\n```") + + if result["stderr"]: + stderr = result["stderr"][:1000] + output_parts.append(f"\n错误:\n```\n{stderr}\n```") + + # 判断是否执行成功 + is_vulnerable = False + evidence = None + + if result["exit_code"] == 0 and result["stdout"]: + # 检查是否有命令执行输出 + stdout_lower = result["stdout"].lower() + if get_params and "cmd" in get_params: + cmd_value = get_params["cmd"].lower() + # 检查常见命令输出 + if cmd_value in ["whoami", "id"]: + if "root" in stdout_lower or "uid=" in stdout_lower or "www-data" in stdout_lower: + is_vulnerable = True + evidence = f"命令 '{get_params['cmd']}' 执行成功,输出: {result['stdout'][:200]}" + elif cmd_value.startswith("echo "): + expected = cmd_value[5:].lower() + if expected in stdout_lower: + is_vulnerable = True + evidence = f"Echo 命令执行成功" + else: + # 通用检查:有输出就可能成功 + if len(result["stdout"].strip()) > 0: + is_vulnerable = True + evidence = f"命令可能执行成功,输出: {result['stdout'][:200]}" + + if is_vulnerable: + output_parts.append(f"\n🔴 **漏洞确认**: {evidence}") + else: + output_parts.append(f"\n🟡 未能确认漏洞执行(可能需要检查输出)") + + return ToolResult( + success=True, + data="\n".join(output_parts), + metadata={ + "exit_code": result["exit_code"], + "is_vulnerable": is_vulnerable, + "evidence": evidence, + "stdout": result["stdout"][:500] if result["stdout"] else None, + } + ) + + +# ============ 命令注入专用测试工具 ============ + +class CommandInjectionTestInput(BaseModel): + """命令注入测试输入""" + target_file: str = Field(description="目标文件路径(如 'vuln.php')") + param_name: str = Field(default="cmd", description="注入参数名(默认 'cmd')") + test_command: str = Field(default="id", description="测试命令(默认 'id')") + language: str = Field(default="php", description="目标语言 (php, python, node)") + + +class CommandInjectionTestTool(AgentTool): + """ + 命令注入专用测试工具 + 智能检测和验证命令注入漏洞 + """ + + def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."): + super().__init__() + self.sandbox_manager = sandbox_manager or SandboxManager() + self.project_root = project_root + + @property + def name(self) -> str: + return "test_command_injection" + + @property + def description(self) -> str: + return """专门用于测试命令注入漏洞的工具。 + +输入: +- target_file: 目标文件路径 +- param_name: 注入参数名(默认 'cmd') +- test_command: 测试命令(默认 'id',也可用 'whoami', 'echo test') +- language: 目标语言(php, python, node) + +示例: +{"target_file": "ttt/t.php", "param_name": "cmd", "test_command": "whoami"} + +自动执行: +1. 读取目标文件代码 +2. 构建包含测试命令的执行环境 +3. 在沙箱中执行并分析结果 +4. 判断命令注入是否成功""" + + @property + def args_schema(self): + return CommandInjectionTestInput + + async def _execute( + self, + target_file: str, + param_name: str = "cmd", + test_command: str = "id", + language: str = "php", + **kwargs + ) -> ToolResult: + """执行命令注入测试""" + try: + await self.sandbox_manager.initialize() + except Exception as e: + logger.warning(f"Sandbox init failed: {e}") + + if not self.sandbox_manager.is_available: + return ToolResult( + success=False, + error="沙箱环境不可用 (Docker Unavailable)", + ) + + import os + full_path = os.path.join(self.project_root, target_file) + + if not os.path.exists(full_path): + return ToolResult( + success=False, + error=f"文件不存在: {target_file}", + ) + + # 读取文件内容 + with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: + code_content = f.read() + + output_parts = ["🎯 命令注入测试\n"] + output_parts.append(f"目标文件: {target_file}") + output_parts.append(f"注入参数: {param_name}") + output_parts.append(f"测试命令: {test_command}") + output_parts.append(f"语言: {language}") + + # 根据语言构建测试 + if language.lower() == "php": + result = await self._test_php_injection(code_content, param_name, test_command) + elif language.lower() == "python": + result = await self._test_python_injection(code_content, param_name, test_command) + else: + return ToolResult( + success=False, + error=f"暂不支持语言: {language}", + ) + + output_parts.append(f"\n退出码: {result['exit_code']}") + + if result.get("stdout"): + output_parts.append(f"\n命令输出:\n```\n{result['stdout'][:2000]}\n```") + + if result.get("stderr"): + output_parts.append(f"\n错误输出:\n```\n{result['stderr'][:500]}\n```") + + # 分析结果 + is_vulnerable = False + evidence = None + poc = None + + if result["exit_code"] == 0 and result.get("stdout"): + stdout = result["stdout"].strip() + # 检查命令执行特征 + if test_command in ["id", "whoami"]: + if "uid=" in stdout or "root" in stdout or "www-data" in stdout or stdout.strip(): + is_vulnerable = True + evidence = f"命令 '{test_command}' 成功执行,输出: {stdout[:200]}" + poc = f"curl 'http://target/{target_file}?{param_name}={test_command}'" + elif test_command.startswith("echo "): + expected = test_command[5:] + if expected in stdout: + is_vulnerable = True + evidence = f"Echo 测试成功" + poc = f"curl 'http://target/{target_file}?{param_name}=echo+test'" + else: + if len(stdout) > 0: + is_vulnerable = True + evidence = f"命令可能执行成功,输出: {stdout[:200]}" + poc = f"curl 'http://target/{target_file}?{param_name}={test_command}'" + + if is_vulnerable: + output_parts.append(f"\n\n🔴 **漏洞已确认!**") + output_parts.append(f"证据: {evidence}") + output_parts.append(f"\nPoC: `{poc}`") + else: + output_parts.append(f"\n\n🟡 未能确认漏洞") + if result.get("stderr"): + output_parts.append(f"可能原因: 执行错误或参数未正确传递") + + return ToolResult( + success=True, + data="\n".join(output_parts), + metadata={ + "is_vulnerable": is_vulnerable, + "evidence": evidence, + "poc": poc, + "exit_code": result["exit_code"], + } + ) + + async def _test_php_injection(self, code: str, param_name: str, test_command: str) -> Dict[str, Any]: + """测试 PHP 命令注入""" + # 构建模拟环境 + wrapper = f""""): + clean_code = clean_code[:-2] + + full_code = wrapper + clean_code + "\n?>" + + # 转义并执行 + escaped_code = full_code.replace("'", "'\"'\"'") + command = f"php -r '{escaped_code}'" + + return await self.sandbox_manager.execute_command(command, timeout=30) + + async def _test_python_injection(self, code: str, param_name: str, test_command: str) -> Dict[str, Any]: + """测试 Python 命令注入""" + # 模拟 request.args.get + wrapper = f""" +import sys +class MockArgs: + def get(self, key, default=None): + if key == '{param_name}': + return '{test_command}' + return default + +class MockRequest: + args = MockArgs() + form = MockArgs() + +request = MockRequest() +sys.argv = ['script.py', '{test_command}'] + +""" + full_code = wrapper + code + + escaped_code = full_code.replace("'", "'\"'\"'") + command = f"python3 -c '{escaped_code}'" + + return await self.sandbox_manager.execute_command(command, timeout=30) \ No newline at end of file diff --git a/backend/app/services/agent/tools/sandbox_vuln.py b/backend/app/services/agent/tools/sandbox_vuln.py new file mode 100644 index 0000000..128fa51 --- /dev/null +++ b/backend/app/services/agent/tools/sandbox_vuln.py @@ -0,0 +1,1565 @@ +""" +漏洞验证专用工具 +支持各类经典漏洞的沙箱验证测试 +""" + +import asyncio +import json +import logging +import os +import re +import tempfile +from typing import Optional, Dict, Any, List +from pydantic import BaseModel, Field +from dataclasses import dataclass +from enum import Enum + +from .base import AgentTool, ToolResult +from .sandbox_tool import SandboxManager + +logger = logging.getLogger(__name__) + + +class VulnType(str, Enum): + """漏洞类型枚举""" + SQL_INJECTION = "sql_injection" + COMMAND_INJECTION = "command_injection" + CODE_INJECTION = "code_injection" + XSS = "xss" + PATH_TRAVERSAL = "path_traversal" + SSRF = "ssrf" + XXE = "xxe" + DESERIALIZATION = "deserialization" + SSTI = "ssti" + LDAP_INJECTION = "ldap_injection" + NOSQL_INJECTION = "nosql_injection" + XPATH_INJECTION = "xpath_injection" + + +# ============ 命令注入测试工具 ============ + +class CommandInjectionTestInput(BaseModel): + """命令注入测试输入""" + target_file: str = Field(..., description="目标文件路径") + param_name: str = Field(default="cmd", description="注入参数名") + test_command: str = Field(default="id", description="测试命令: id, whoami, echo test, cat /etc/passwd") + language: str = Field(default="auto", description="语言: auto, php, python, javascript, java, go, ruby, shell") + injection_point: Optional[str] = Field(default=None, description="注入点描述,如 'shell_exec($_GET[cmd])'") + + +class CommandInjectionTestTool(AgentTool): + """ + 命令注入漏洞测试工具 + + 支持多种语言和框架,自动构建测试环境 + """ + + def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."): + super().__init__() + self.sandbox_manager = sandbox_manager or SandboxManager() + self.project_root = project_root + + @property + def name(self) -> str: + return "test_command_injection" + + @property + def description(self) -> str: + return """专门测试命令注入漏洞的工具。 + +支持语言: PHP, Python, JavaScript, Java, Go, Ruby, Shell + +输入: +- target_file: 目标文件路径 +- param_name: 注入参数名 (默认 'cmd') +- test_command: 测试命令 (默认 'id') + - 'id' - 显示用户ID + - 'whoami' - 显示用户名 + - 'cat /etc/passwd' - 读取密码文件 + - 'echo VULN_TEST' - 输出测试字符串 +- language: 语言 (auto 自动检测) + +示例: +1. PHP: {"target_file": "vuln.php", "param_name": "cmd", "test_command": "whoami"} +2. Python: {"target_file": "app.py", "param_name": "cmd", "language": "python"} +3. 自定义: {"target_file": "api.js", "test_command": "echo PWNED"} + +漏洞确认条件: +- 命令输出包含预期结果 (uid=, root, www-data 等) +- 或自定义 echo 内容出现在输出中""" + + @property + def args_schema(self): + return CommandInjectionTestInput + + def _detect_language(self, file_path: str, code: str) -> str: + """自动检测语言""" + ext = os.path.splitext(file_path)[1].lower() + ext_map = { + ".php": "php", + ".py": "python", + ".js": "javascript", + ".ts": "javascript", + ".java": "java", + ".go": "go", + ".rb": "ruby", + ".sh": "shell", + ".bash": "shell", + } + if ext in ext_map: + return ext_map[ext] + + # 基于内容检测 + if " ToolResult: + """执行命令注入测试""" + try: + await self.sandbox_manager.initialize() + except Exception as e: + logger.warning(f"Sandbox init failed: {e}") + + if not self.sandbox_manager.is_available: + return ToolResult(success=False, error="沙箱环境不可用") + + # 读取目标文件 + full_path = os.path.join(self.project_root, target_file) + if not os.path.exists(full_path): + return ToolResult(success=False, error=f"文件不存在: {target_file}") + + with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: + code = f.read() + + # 检测语言 + if language == "auto": + language = self._detect_language(target_file, code) + + # 根据语言构建测试 + result = await self._test_by_language(language, code, param_name, test_command) + + # 分析结果 + is_vulnerable = False + evidence = None + poc = None + + if result["exit_code"] == 0 and result.get("stdout"): + stdout = result["stdout"].strip() + + # 检测命令执行特征 + if test_command in ["id", "whoami"]: + patterns = ["uid=", "root", "www-data", "nobody", "daemon", "sandbox"] + for pattern in patterns: + if pattern in stdout.lower(): + is_vulnerable = True + evidence = f"命令 '{test_command}' 执行成功,输出包含 '{pattern}'" + break + # 如果有任何输出且包含典型格式 + if not is_vulnerable and stdout: + is_vulnerable = True + evidence = f"命令 '{test_command}' 有输出: {stdout[:100]}" + + elif test_command.startswith("echo "): + expected = test_command[5:] + if expected.lower() in stdout.lower(): + is_vulnerable = True + evidence = f"Echo 命令执行成功,输出包含 '{expected}'" + + elif test_command.startswith("cat "): + if ":" in stdout or "root" in stdout.lower() or "bin" in stdout.lower(): + is_vulnerable = True + evidence = f"文件读取成功: {stdout[:100]}" + + else: + # 通用检测 + if len(stdout) > 0: + is_vulnerable = True + evidence = f"命令可能执行成功,输出: {stdout[:200]}" + + if is_vulnerable: + poc = f"curl 'http://target/{target_file}?{param_name}={test_command.replace(' ', '+')}" + + # 格式化输出 + output_parts = ["🎯 命令注入测试结果\n"] + output_parts.append(f"目标文件: {target_file}") + output_parts.append(f"语言: {language}") + output_parts.append(f"注入参数: {param_name}") + output_parts.append(f"测试命令: {test_command}") + + output_parts.append(f"\n退出码: {result['exit_code']}") + + if result.get("stdout"): + output_parts.append(f"\n命令输出:\n```\n{result['stdout'][:2000]}\n```") + if result.get("stderr"): + output_parts.append(f"\n错误输出:\n```\n{result['stderr'][:500]}\n```") + + if is_vulnerable: + output_parts.append(f"\n\n🔴 **漏洞已确认!**") + output_parts.append(f"证据: {evidence}") + if poc: + output_parts.append(f"\nPoC: `{poc}`") + else: + output_parts.append(f"\n\n🟡 未能确认漏洞") + + return ToolResult( + success=True, + data="\n".join(output_parts), + metadata={ + "vulnerability_type": "command_injection", + "is_vulnerable": is_vulnerable, + "evidence": evidence, + "poc": poc, + "language": language, + } + ) + + async def _test_by_language(self, language: str, code: str, param_name: str, test_command: str) -> Dict: + """根据语言执行测试""" + if language == "php": + return await self._test_php(code, param_name, test_command) + elif language == "python": + return await self._test_python(code, param_name, test_command) + elif language in ["javascript", "js", "node"]: + return await self._test_javascript(code, param_name, test_command) + elif language == "java": + return await self._test_java(code, param_name, test_command) + elif language in ["go", "golang"]: + return await self._test_go(code, param_name, test_command) + elif language in ["ruby", "rb"]: + return await self._test_ruby(code, param_name, test_command) + else: + return await self._test_shell(code, param_name, test_command) + + async def _test_php(self, code: str, param_name: str, test_command: str) -> Dict: + """测试 PHP 命令注入 + + 注意: php -r 不需要 "): + clean_code = clean_code[:-2] + + full_code = wrapper + clean_code.strip() + escaped = full_code.replace("'", "'\"'\"'") + return await self.sandbox_manager.execute_command(f"php -r '{escaped}'", timeout=30) + + async def _test_python(self, code: str, param_name: str, test_command: str) -> Dict: + """测试 Python 命令注入""" + wrapper = f""" +import sys, os + +class MockArgs: + def get(self, key, default=None): + if key == '{param_name}': + return '{test_command}' + return default + +class MockRequest: + args = MockArgs() + form = MockArgs() + values = MockArgs() + +request = MockRequest() +sys.argv = ['script.py', '{test_command}'] +os.environ['{param_name.upper()}'] = '{test_command}' + +""" + full_code = wrapper + code + escaped = full_code.replace("'", "'\"'\"'") + return await self.sandbox_manager.execute_command(f"python3 -c '{escaped}'", timeout=30) + + async def _test_javascript(self, code: str, param_name: str, test_command: str) -> Dict: + """测试 JavaScript 命令注入""" + wrapper = f""" +const req = {{ + query: {{ '{param_name}': '{test_command}' }}, + body: {{ '{param_name}': '{test_command}' }}, + params: {{ '{param_name}': '{test_command}' }}, +}}; +process.argv = ['node', 'script.js', '{test_command}']; +process.env['{param_name.upper()}'] = '{test_command}'; + +""" + full_code = wrapper + code + escaped = full_code.replace("'", "'\"'\"'") + return await self.sandbox_manager.execute_command(f"node -e '{escaped}'", timeout=30) + + async def _test_java(self, code: str, param_name: str, test_command: str) -> Dict: + """测试 Java 命令注入""" + # 简化处理 - Java 需要完整类结构 + wrapper = f""" +import java.io.*; +import java.util.*; + +public class Test {{ + public static void main(String[] args) throws Exception {{ + Map params = new HashMap<>(); + params.put("{param_name}", "{test_command}"); + String[] argv = new String[]{{"{test_command}"}}; + + {code} + }} +}} +""" + escaped = wrapper.replace("'", "'\"'\"'").replace("\\", "\\\\") + return await self.sandbox_manager.execute_command( + f"echo '{escaped}' > /tmp/Test.java && javac /tmp/Test.java 2>&1 && java -cp /tmp Test 2>&1", + timeout=60 + ) + + async def _test_go(self, code: str, param_name: str, test_command: str) -> Dict: + """测试 Go 命令注入""" + if "package main" not in code: + code = f"""package main + +import ( + "fmt" + "os" + "os/exec" +) + +func main() {{ + os.Args = []string{{"program", "{test_command}"}} + os.Setenv("{param_name.upper()}", "{test_command}") + params := map[string]string{{"{param_name}": "{test_command}"}} + _ = params + + {code} +}} +""" + escaped = code.replace("'", "'\"'\"'").replace("\\", "\\\\") + return await self.sandbox_manager.execute_command( + f"echo '{escaped}' > /tmp/main.go && go run /tmp/main.go 2>&1", + timeout=60 + ) + + async def _test_ruby(self, code: str, param_name: str, test_command: str) -> Dict: + """测试 Ruby 命令注入""" + wrapper = f""" +ARGV[0] = "{test_command}" +ENV["{param_name.upper()}"] = "{test_command}" + +def params + @params ||= {{ "{param_name}" => "{test_command}" }} +end + +""" + full_code = wrapper + code + escaped = full_code.replace("'", "'\"'\"'") + return await self.sandbox_manager.execute_command(f"ruby -e '{escaped}'", timeout=30) + + async def _test_shell(self, code: str, param_name: str, test_command: str) -> Dict: + """测试 Shell 命令注入""" + wrapper = f"""#!/bin/bash +export {param_name.upper()}="{test_command}" +set -- "{test_command}" + +""" + full_code = wrapper + code + escaped = full_code.replace("'", "'\"'\"'") + return await self.sandbox_manager.execute_command(f"bash -c '{escaped}'", timeout=30) + + +# ============ SQL 注入测试工具 ============ + +class SqlInjectionTestInput(BaseModel): + """SQL 注入测试输入""" + target_file: str = Field(..., description="目标文件路径") + param_name: str = Field(default="id", description="注入参数名") + payload: str = Field(default="1' OR '1'='1", description="SQL 注入 payload") + language: str = Field(default="auto", description="语言: auto, php, python, javascript, java, go, ruby") + db_type: str = Field(default="mysql", description="数据库类型: mysql, postgresql, sqlite, oracle, mssql") + + +class SqlInjectionTestTool(AgentTool): + """SQL 注入漏洞测试工具""" + + # SQL 错误特征 + SQL_ERROR_PATTERNS = { + "mysql": [ + r"SQL syntax.*MySQL", + r"Warning.*mysql_", + r"MySQLSyntaxErrorException", + r"valid MySQL result", + r"check the manual that corresponds to your MySQL", + r"mysql_fetch", + r"mysqli_", + ], + "postgresql": [ + r"PostgreSQL.*ERROR", + r"Warning.*pg_", + r"valid PostgreSQL result", + r"Npgsql\.", + r"PSQLException", + ], + "sqlite": [ + r"SQLite.*error", + r"sqlite3\.OperationalError", + r"SQLITE_ERROR", + r"SQLite3::SQLException", + ], + "oracle": [ + r"ORA-\d{5}", + r"Oracle error", + r"Oracle.*Driver", + r"Warning.*oci_", + ], + "mssql": [ + r"ODBC Driver.*SQL Server", + r"SqlException", + r"Unclosed quotation mark", + r"SQL Server.*Error", + ], + "generic": [ + r"SQL syntax", + r"unclosed quotation", + r"quoted string not properly terminated", + r"sql error", + r"database error", + r"query failed", + ], + } + + def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."): + super().__init__() + self.sandbox_manager = sandbox_manager or SandboxManager() + self.project_root = project_root + + @property + def name(self) -> str: + return "test_sql_injection" + + @property + def description(self) -> str: + return """专门测试 SQL 注入漏洞的工具。 + +支持数据库: MySQL, PostgreSQL, SQLite, Oracle, MSSQL + +输入: +- target_file: 目标文件路径 +- param_name: 注入参数名 (默认 'id') +- payload: SQL 注入 payload (默认 "1' OR '1'='1") +- language: 语言 (auto 自动检测) +- db_type: 数据库类型 (默认 mysql) + +常用 Payload: +- 布尔盲注: "1' AND '1'='1" +- 联合查询: "1' UNION SELECT 1,2,3--" +- 报错注入: "1' AND extractvalue(1,concat(0x7e,version()))--" +- 时间盲注: "1' AND SLEEP(5)--" + +示例: +{"target_file": "login.php", "param_name": "username", "payload": "admin'--"}""" + + @property + def args_schema(self): + return SqlInjectionTestInput + + def _detect_sql_error(self, output: str, db_type: str = "mysql") -> Optional[str]: + """检测 SQL 错误特征""" + output_lower = output.lower() + + # 先检测特定数据库 + patterns = self.SQL_ERROR_PATTERNS.get(db_type, []) + for pattern in patterns: + if re.search(pattern, output, re.IGNORECASE): + return f"检测到 {db_type.upper()} 错误: {pattern}" + + # 通用检测 + for pattern in self.SQL_ERROR_PATTERNS["generic"]: + if re.search(pattern, output, re.IGNORECASE): + return f"检测到 SQL 错误: {pattern}" + + return None + + async def _execute( + self, + target_file: str, + param_name: str = "id", + payload: str = "1' OR '1'='1", + language: str = "auto", + db_type: str = "mysql", + **kwargs + ) -> ToolResult: + """执行 SQL 注入测试""" + try: + await self.sandbox_manager.initialize() + except Exception as e: + logger.warning(f"Sandbox init failed: {e}") + + if not self.sandbox_manager.is_available: + return ToolResult(success=False, error="沙箱环境不可用") + + # 读取目标文件 + full_path = os.path.join(self.project_root, target_file) + if not os.path.exists(full_path): + return ToolResult(success=False, error=f"文件不存在: {target_file}") + + with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: + code = f.read() + + # 检测语言 + if language == "auto": + ext = os.path.splitext(target_file)[1].lower() + language = {".php": "php", ".py": "python", ".js": "javascript"}.get(ext, "php") + + # 执行测试 + result = await self._test_sql_injection(language, code, param_name, payload) + + # 分析结果 + is_vulnerable = False + evidence = None + + if result.get("stdout") or result.get("stderr"): + output = (result.get("stdout", "") + result.get("stderr", "")) + error_detected = self._detect_sql_error(output, db_type) + if error_detected: + is_vulnerable = True + evidence = error_detected + + # 检测数据泄露 + if not is_vulnerable: + leak_patterns = [ + r"\d+\s*\|\s*\d+", # 表格输出 + r"admin|root|user", # 用户名泄露 + r"password|passwd|pwd", # 密码相关 + ] + for pattern in leak_patterns: + if re.search(pattern, output, re.IGNORECASE): + is_vulnerable = True + evidence = f"可能存在数据泄露: {pattern}" + break + + # 构建 PoC + poc = None + if is_vulnerable: + encoded_payload = payload.replace("'", "%27").replace(" ", "+") + poc = f"curl 'http://target/{target_file}?{param_name}={encoded_payload}'" + + # 格式化输出 + output_parts = ["💉 SQL 注入测试结果\n"] + output_parts.append(f"目标文件: {target_file}") + output_parts.append(f"数据库类型: {db_type}") + output_parts.append(f"注入参数: {param_name}") + output_parts.append(f"Payload: {payload}") + + output_parts.append(f"\n退出码: {result.get('exit_code', -1)}") + + if result.get("stdout"): + output_parts.append(f"\n输出:\n```\n{result['stdout'][:2000]}\n```") + if result.get("stderr"): + output_parts.append(f"\n错误:\n```\n{result['stderr'][:1000]}\n```") + + if is_vulnerable: + output_parts.append(f"\n\n🔴 **SQL 注入漏洞确认!**") + output_parts.append(f"证据: {evidence}") + if poc: + output_parts.append(f"\nPoC: `{poc}`") + else: + output_parts.append(f"\n\n🟡 未能确认 SQL 注入漏洞") + + return ToolResult( + success=True, + data="\n".join(output_parts), + metadata={ + "vulnerability_type": "sql_injection", + "is_vulnerable": is_vulnerable, + "evidence": evidence, + "poc": poc, + "db_type": db_type, + } + ) + + async def _test_sql_injection(self, language: str, code: str, param_name: str, payload: str) -> Dict: + """根据语言测试 SQL 注入""" + # 使用安全的 payload 转义 + safe_payload = payload.replace("'", "\\'") + + if language == "php": + # php -r 不需要 "): + clean_code = clean_code[:-2] + + full_code = wrapper + clean_code.strip() + escaped = full_code.replace("'", "'\"'\"'") + return await self.sandbox_manager.execute_command(f"php -r '{escaped}'", timeout=30) + + elif language == "python": + wrapper = f""" +import sys +class MockArgs: + def get(self, key, default=None): + if key == '{param_name}': + return '''{safe_payload}''' + return default + +class MockRequest: + args = MockArgs() + form = MockArgs() + +request = MockRequest() +""" + full_code = wrapper + code + escaped = full_code.replace("'", "'\"'\"'") + return await self.sandbox_manager.execute_command(f"python3 -c '{escaped}'", timeout=30) + + else: + return {"exit_code": -1, "stdout": "", "stderr": f"不支持的语言: {language}"} + + +# ============ XSS 测试工具 ============ + +class XssTestInput(BaseModel): + """XSS 测试输入""" + target_file: str = Field(..., description="目标文件路径") + param_name: str = Field(default="input", description="注入参数名") + payload: str = Field(default="", description="XSS payload") + xss_type: str = Field(default="reflected", description="XSS 类型: reflected, stored, dom") + language: str = Field(default="auto", description="语言: auto, php, python, javascript") + + +class XssTestTool(AgentTool): + """XSS 漏洞测试工具""" + + XSS_PAYLOADS = [ + "", + "", + "", + "javascript:alert('XSS')", + "'>", + "\">", + "", + ] + + def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."): + super().__init__() + self.sandbox_manager = sandbox_manager or SandboxManager() + self.project_root = project_root + + @property + def name(self) -> str: + return "test_xss" + + @property + def description(self) -> str: + return """专门测试 XSS (跨站脚本) 漏洞的工具。 + +支持类型: Reflected XSS, Stored XSS, DOM XSS + +输入: +- target_file: 目标文件路径 +- param_name: 注入参数名 (默认 'input') +- payload: XSS payload (默认 "") +- xss_type: XSS 类型 (reflected, stored, dom) + +常用 Payload: +- Script 标签: +- 事件处理: +- SVG: + +示例: +{"target_file": "search.php", "param_name": "q", "payload": ""}""" + + @property + def args_schema(self): + return XssTestInput + + async def _execute( + self, + target_file: str, + param_name: str = "input", + payload: str = "", + xss_type: str = "reflected", + language: str = "auto", + **kwargs + ) -> ToolResult: + """执行 XSS 测试""" + try: + await self.sandbox_manager.initialize() + except Exception as e: + logger.warning(f"Sandbox init failed: {e}") + + if not self.sandbox_manager.is_available: + return ToolResult(success=False, error="沙箱环境不可用") + + # 读取目标文件 + full_path = os.path.join(self.project_root, target_file) + if not os.path.exists(full_path): + return ToolResult(success=False, error=f"文件不存在: {target_file}") + + with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: + code = f.read() + + # 检测语言 + if language == "auto": + ext = os.path.splitext(target_file)[1].lower() + language = {".php": "php", ".py": "python", ".js": "javascript"}.get(ext, "php") + + # 执行测试 + result = await self._test_xss(language, code, param_name, payload) + + # 分析结果 - 检查 payload 是否被反射 + is_vulnerable = False + evidence = None + + if result.get("stdout"): + output = result["stdout"] + + # 检查 payload 是否原样出现在输出中 + if payload in output: + is_vulnerable = True + evidence = "XSS payload 被原样反射到输出中" + + # 检查关键字符是否被编码 + elif "
+ {/* Header */} -
+
- +
- AUDIT_TERMINAL - v2.0 + AUDIT_TERMINAL + v3.0
{/* 状态指示灯 */} -
-
-
-
+
+
+
+
@@ -484,19 +499,22 @@ export default function TerminalProgressDialog({ {/* Main Content */}
{/* Left Sidebar - Task Info */} -
-
-
Task ID
-
+
+
+
Task ID
+
{taskId?.slice(0, 8)}...
-
-
Type
-
- {taskType === 'repository' ? : } - {taskType} +
+
Type
+
+ {taskType === 'repository' + ? + : } + {taskType}
@@ -504,7 +522,7 @@ export default function TerminalProgressDialog({ {/* Status Badge */}
-
Status
+
Status
{isCancelled ? ( CANCELLED ) : isCompleted ? ( @@ -520,17 +538,18 @@ export default function TerminalProgressDialog({ {/* Terminal Screen */}
{/* Terminal Output */} -
+
{/* Grid background */} -
+
{logs.map((log) => ( -
- +
+ {log.timestamp} - + {log.message}
@@ -538,8 +557,8 @@ export default function TerminalProgressDialog({ {!isCompleted && !isFailed && !isCancelled && (
- {currentTime} - _ + {currentTime} + _
)}
@@ -547,11 +566,11 @@ export default function TerminalProgressDialog({
{/* Bottom Controls */} -
-
- +
+
+ - {isCompleted ? "任务已完成" : isFailed ? "任务失败" : isCancelled ? "任务已取消" : "正在执行..."} + {isCompleted ? "TASK COMPLETED" : isFailed ? "TASK FAILED" : isCancelled ? "TASK CANCELLED" : "EXECUTING..."}
@@ -561,9 +580,9 @@ export default function TerminalProgressDialog({ size="sm" variant="outline" onClick={handleCancel} - className="h-8 cyber-btn-outline text-amber-400 border-amber-500/30 hover:bg-amber-500/10 hover:border-amber-500/50" + className="h-8 bg-transparent border-[#fbbf24]/40 text-[#fbbf24] hover:bg-[#fbbf24]/10 hover:border-[#fbbf24]/60 font-mono uppercase tracking-wider text-[10px]" > - + 取消任务 )} @@ -573,9 +592,9 @@ export default function TerminalProgressDialog({ size="sm" variant="outline" onClick={() => window.open('/logs', '_blank')} - className="h-8 cyber-btn-outline" + className="h-8 bg-transparent border-[#6a7587]/40 text-[#a8b0c0] hover:bg-[#1a2030]/50 hover:border-[#6a7587]/60 font-mono uppercase tracking-wider text-[10px]" > - + 查看日志 )} @@ -584,9 +603,9 @@ export default function TerminalProgressDialog({ )} diff --git a/frontend/src/pages/AgentAudit/index.tsx b/frontend/src/pages/AgentAudit/index.tsx index e85b529..ac5782c 100644 --- a/frontend/src/pages/AgentAudit/index.tsx +++ b/frontend/src/pages/AgentAudit/index.tsx @@ -673,37 +673,25 @@ function AgentAuditPageContent() { if (isLoading && !task) { return ( -
+
{/* Grid background */} -
-
+
+ {/* Vignette */} +
+
- Loading audit task... + LOADING AUDIT TASK...
); } return ( -
+
{/* Subtle grid background */} -
+
+ {/* Scanline effect */} +
{/* Header */}
{/* Left Panel - Activity Log */} -
+
{/* Log header */} -
-
+
+
- - Activity Log + + Activity Log
{isConnected && ( -
+
- - + + - Live + Live
)} - + {filteredLogs.length}{!showAllLogs && logs.length !== filteredLogs.length ? ` / ${logs.length}` : ''}
@@ -743,11 +731,11 @@ function AgentAuditPageContent() {
{/* Log content */} -
+
{/* Filter indicator */} {selectedAgentId && !showAllLogs && ( -
+
- Filtering logs for selected agent + Filtering logs for selected agent
@@ -777,21 +765,21 @@ function AgentAuditPageContent() { {/* Logs */} {filteredLogs.length === 0 ? (
-
+
{isRunning ? (
- - + + {selectedAgentId && !showAllLogs - ? 'Waiting for activity from selected agent...' - : 'Waiting for agent activity...'} + ? 'WAITING FOR ACTIVITY FROM SELECTED AGENT...' + : 'WAITING FOR AGENT ACTIVITY...'}
) : ( - + {selectedAgentId && !showAllLogs - ? 'No activity from selected agent' - : 'No activity yet'} + ? 'NO ACTIVITY FROM SELECTED AGENT' + : 'NO ACTIVITY YET'} )}
@@ -813,36 +801,36 @@ function AgentAuditPageContent() { {/* Status bar */} {task && ( -
+
{isRunning ? ( - + - - + + - {statusVerb}{'.'.repeat(statusDots)} + {statusVerb}{'.'.repeat(statusDots)} ) : isComplete ? ( - Audit {task.status} + AUDIT {task.status?.toUpperCase()} ) : ( - Ready + READY )} -
+
- {task.progress_percentage?.toFixed(0) || 0} - % + {task.progress_percentage?.toFixed(0) || 0} + % - | + - {task.analyzed_files} - /{task.total_files} files + {task.analyzed_files} + /{task.total_files} files - | + - {task.tool_calls_count || 0} - tools + {task.tool_calls_count || 0} + tools
@@ -850,16 +838,16 @@ function AgentAuditPageContent() {
{/* Right Panel - Agent Tree + Stats */} -
+
{/* Agent Tree section */} -
+
{/* Tree header */} -
-
- - Agent Tree +
+
+ + Agent Tree {agentTree && ( - + {agentTree.total_agents} )} @@ -868,16 +856,16 @@ function AgentAuditPageContent() { {selectedAgentId && !showAllLogs && ( )} {agentTree && agentTree.running_agents > 0 && ( -
+
- - + + {agentTree.running_agents}
@@ -886,7 +874,7 @@ function AgentAuditPageContent() {
{/* Tree content */} -
+
{treeNodes.length > 0 ? ( treeNodes.map(node => ( )) ) : ( -
+
{isRunning ? (
- Initializing agents... + INITIALIZING AGENTS...
) : ( - 'No agents yet' + NO AGENTS YET )}
)}