""" 漏洞验证专用工具 支持各类经典漏洞的沙箱验证测试 """ import asyncio import json import logging import os import re import tempfile from typing import Optional, Dict, Any, List from pydantic import BaseModel, Field from dataclasses import dataclass from enum import Enum from .base import AgentTool, ToolResult from .sandbox_tool import SandboxManager logger = logging.getLogger(__name__) class VulnType(str, Enum): """漏洞类型枚举""" SQL_INJECTION = "sql_injection" COMMAND_INJECTION = "command_injection" CODE_INJECTION = "code_injection" XSS = "xss" PATH_TRAVERSAL = "path_traversal" SSRF = "ssrf" XXE = "xxe" DESERIALIZATION = "deserialization" SSTI = "ssti" LDAP_INJECTION = "ldap_injection" NOSQL_INJECTION = "nosql_injection" XPATH_INJECTION = "xpath_injection" # ============ 命令注入测试工具 ============ class CommandInjectionTestInput(BaseModel): """命令注入测试输入""" target_file: str = Field(..., description="目标文件路径") param_name: str = Field(default="cmd", description="注入参数名") test_command: str = Field(default="id", description="测试命令: id, whoami, echo test, cat /etc/passwd") language: str = Field(default="auto", description="语言: auto, php, python, javascript, java, go, ruby, shell") injection_point: Optional[str] = Field(default=None, description="注入点描述,如 'shell_exec($_GET[cmd])'") class CommandInjectionTestTool(AgentTool): """ 命令注入漏洞测试工具 支持多种语言和框架,自动构建测试环境 """ def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."): super().__init__() self.sandbox_manager = sandbox_manager or SandboxManager() self.project_root = project_root @property def name(self) -> str: return "test_command_injection" @property def description(self) -> str: return """专门测试命令注入漏洞的工具。 支持语言: PHP, Python, JavaScript, Java, Go, Ruby, Shell 输入: - target_file: 目标文件路径 - param_name: 注入参数名 (默认 'cmd') - test_command: 测试命令 (默认 'id') - 'id' - 显示用户ID - 'whoami' - 显示用户名 - 'cat /etc/passwd' - 读取密码文件 - 'echo VULN_TEST' - 输出测试字符串 - language: 语言 (auto 自动检测) 示例: 1. PHP: {"target_file": "vuln.php", "param_name": "cmd", "test_command": "whoami"} 2. Python: {"target_file": "app.py", "param_name": "cmd", "language": "python"} 3. 自定义: {"target_file": "api.js", "test_command": "echo PWNED"} 漏洞确认条件: - 命令输出包含预期结果 (uid=, root, www-data 等) - 或自定义 echo 内容出现在输出中""" @property def args_schema(self): return CommandInjectionTestInput def _detect_language(self, file_path: str, code: str) -> str: """自动检测语言""" ext = os.path.splitext(file_path)[1].lower() ext_map = { ".php": "php", ".py": "python", ".js": "javascript", ".ts": "javascript", ".java": "java", ".go": "go", ".rb": "ruby", ".sh": "shell", ".bash": "shell", } if ext in ext_map: return ext_map[ext] # 基于内容检测 if " ToolResult: """执行命令注入测试""" try: await self.sandbox_manager.initialize() except Exception as e: logger.warning(f"Sandbox init failed: {e}") if not self.sandbox_manager.is_available: return ToolResult(success=False, error="沙箱环境不可用") # 读取目标文件 full_path = os.path.join(self.project_root, target_file) if not os.path.exists(full_path): return ToolResult(success=False, error=f"文件不存在: {target_file}") with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: code = f.read() # 检测语言 if language == "auto": language = self._detect_language(target_file, code) # 根据语言构建测试 result = await self._test_by_language(language, code, param_name, test_command) # 分析结果 is_vulnerable = False evidence = None poc = None if result["exit_code"] == 0 and result.get("stdout"): stdout = result["stdout"].strip() # 检测命令执行特征 if test_command in ["id", "whoami"]: patterns = ["uid=", "root", "www-data", "nobody", "daemon", "sandbox"] for pattern in patterns: if pattern in stdout.lower(): is_vulnerable = True evidence = f"命令 '{test_command}' 执行成功,输出包含 '{pattern}'" break # 如果有任何输出且包含典型格式 if not is_vulnerable and stdout: is_vulnerable = True evidence = f"命令 '{test_command}' 有输出: {stdout[:100]}" elif test_command.startswith("echo "): expected = test_command[5:] if expected.lower() in stdout.lower(): is_vulnerable = True evidence = f"Echo 命令执行成功,输出包含 '{expected}'" elif test_command.startswith("cat "): if ":" in stdout or "root" in stdout.lower() or "bin" in stdout.lower(): is_vulnerable = True evidence = f"文件读取成功: {stdout[:100]}" else: # 通用检测 if len(stdout) > 0: is_vulnerable = True evidence = f"命令可能执行成功,输出: {stdout[:200]}" if is_vulnerable: poc = f"curl 'http://target/{target_file}?{param_name}={test_command.replace(' ', '+')}" # 格式化输出 output_parts = ["🎯 命令注入测试结果\n"] output_parts.append(f"目标文件: {target_file}") output_parts.append(f"语言: {language}") output_parts.append(f"注入参数: {param_name}") output_parts.append(f"测试命令: {test_command}") output_parts.append(f"\n退出码: {result['exit_code']}") if result.get("stdout"): output_parts.append(f"\n命令输出:\n```\n{result['stdout'][:2000]}\n```") if result.get("stderr"): output_parts.append(f"\n错误输出:\n```\n{result['stderr'][:500]}\n```") if is_vulnerable: output_parts.append(f"\n\n🔴 **漏洞已确认!**") output_parts.append(f"证据: {evidence}") if poc: output_parts.append(f"\nPoC: `{poc}`") else: output_parts.append(f"\n\n🟡 未能确认漏洞") return ToolResult( success=True, data="\n".join(output_parts), metadata={ "vulnerability_type": "command_injection", "is_vulnerable": is_vulnerable, "evidence": evidence, "poc": poc, "language": language, } ) async def _test_by_language(self, language: str, code: str, param_name: str, test_command: str) -> Dict: """根据语言执行测试""" if language == "php": return await self._test_php(code, param_name, test_command) elif language == "python": return await self._test_python(code, param_name, test_command) elif language in ["javascript", "js", "node"]: return await self._test_javascript(code, param_name, test_command) elif language == "java": return await self._test_java(code, param_name, test_command) elif language in ["go", "golang"]: return await self._test_go(code, param_name, test_command) elif language in ["ruby", "rb"]: return await self._test_ruby(code, param_name, test_command) else: return await self._test_shell(code, param_name, test_command) async def _test_php(self, code: str, param_name: str, test_command: str) -> Dict: """测试 PHP 命令注入 注意: php -r 不需要 "): clean_code = clean_code[:-2] full_code = wrapper + clean_code.strip() escaped = full_code.replace("'", "'\"'\"'") return await self.sandbox_manager.execute_command(f"php -r '{escaped}'", timeout=30) async def _test_python(self, code: str, param_name: str, test_command: str) -> Dict: """测试 Python 命令注入""" wrapper = f""" import sys, os class MockArgs: def get(self, key, default=None): if key == '{param_name}': return '{test_command}' return default class MockRequest: args = MockArgs() form = MockArgs() values = MockArgs() request = MockRequest() sys.argv = ['script.py', '{test_command}'] os.environ['{param_name.upper()}'] = '{test_command}' """ full_code = wrapper + code escaped = full_code.replace("'", "'\"'\"'") return await self.sandbox_manager.execute_command(f"python3 -c '{escaped}'", timeout=30) async def _test_javascript(self, code: str, param_name: str, test_command: str) -> Dict: """测试 JavaScript 命令注入""" wrapper = f""" const req = {{ query: {{ '{param_name}': '{test_command}' }}, body: {{ '{param_name}': '{test_command}' }}, params: {{ '{param_name}': '{test_command}' }}, }}; process.argv = ['node', 'script.js', '{test_command}']; process.env['{param_name.upper()}'] = '{test_command}'; """ full_code = wrapper + code escaped = full_code.replace("'", "'\"'\"'") return await self.sandbox_manager.execute_command(f"node -e '{escaped}'", timeout=30) async def _test_java(self, code: str, param_name: str, test_command: str) -> Dict: """测试 Java 命令注入""" # 简化处理 - Java 需要完整类结构 wrapper = f""" import java.io.*; import java.util.*; public class Test {{ public static void main(String[] args) throws Exception {{ Map params = new HashMap<>(); params.put("{param_name}", "{test_command}"); String[] argv = new String[]{{"{test_command}"}}; {code} }} }} """ escaped = wrapper.replace("'", "'\"'\"'").replace("\\", "\\\\") return await self.sandbox_manager.execute_command( f"echo '{escaped}' > /tmp/Test.java && javac /tmp/Test.java 2>&1 && java -cp /tmp Test 2>&1", timeout=60 ) async def _test_go(self, code: str, param_name: str, test_command: str) -> Dict: """测试 Go 命令注入""" if "package main" not in code: code = f"""package main import ( "fmt" "os" "os/exec" ) func main() {{ os.Args = []string{{"program", "{test_command}"}} os.Setenv("{param_name.upper()}", "{test_command}") params := map[string]string{{"{param_name}": "{test_command}"}} _ = params {code} }} """ escaped = code.replace("'", "'\"'\"'").replace("\\", "\\\\") return await self.sandbox_manager.execute_command( f"echo '{escaped}' > /tmp/main.go && go run /tmp/main.go 2>&1", timeout=60 ) async def _test_ruby(self, code: str, param_name: str, test_command: str) -> Dict: """测试 Ruby 命令注入""" wrapper = f""" ARGV[0] = "{test_command}" ENV["{param_name.upper()}"] = "{test_command}" def params @params ||= {{ "{param_name}" => "{test_command}" }} end """ full_code = wrapper + code escaped = full_code.replace("'", "'\"'\"'") return await self.sandbox_manager.execute_command(f"ruby -e '{escaped}'", timeout=30) async def _test_shell(self, code: str, param_name: str, test_command: str) -> Dict: """测试 Shell 命令注入""" wrapper = f"""#!/bin/bash export {param_name.upper()}="{test_command}" set -- "{test_command}" """ full_code = wrapper + code escaped = full_code.replace("'", "'\"'\"'") return await self.sandbox_manager.execute_command(f"bash -c '{escaped}'", timeout=30) # ============ SQL 注入测试工具 ============ class SqlInjectionTestInput(BaseModel): """SQL 注入测试输入""" target_file: str = Field(..., description="目标文件路径") param_name: str = Field(default="id", description="注入参数名") payload: str = Field(default="1' OR '1'='1", description="SQL 注入 payload") language: str = Field(default="auto", description="语言: auto, php, python, javascript, java, go, ruby") db_type: str = Field(default="mysql", description="数据库类型: mysql, postgresql, sqlite, oracle, mssql") class SqlInjectionTestTool(AgentTool): """SQL 注入漏洞测试工具""" # SQL 错误特征 SQL_ERROR_PATTERNS = { "mysql": [ r"SQL syntax.*MySQL", r"Warning.*mysql_", r"MySQLSyntaxErrorException", r"valid MySQL result", r"check the manual that corresponds to your MySQL", r"mysql_fetch", r"mysqli_", ], "postgresql": [ r"PostgreSQL.*ERROR", r"Warning.*pg_", r"valid PostgreSQL result", r"Npgsql\.", r"PSQLException", ], "sqlite": [ r"SQLite.*error", r"sqlite3\.OperationalError", r"SQLITE_ERROR", r"SQLite3::SQLException", ], "oracle": [ r"ORA-\d{5}", r"Oracle error", r"Oracle.*Driver", r"Warning.*oci_", ], "mssql": [ r"ODBC Driver.*SQL Server", r"SqlException", r"Unclosed quotation mark", r"SQL Server.*Error", ], "generic": [ r"SQL syntax", r"unclosed quotation", r"quoted string not properly terminated", r"sql error", r"database error", r"query failed", ], } def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."): super().__init__() self.sandbox_manager = sandbox_manager or SandboxManager() self.project_root = project_root @property def name(self) -> str: return "test_sql_injection" @property def description(self) -> str: return """专门测试 SQL 注入漏洞的工具。 支持数据库: MySQL, PostgreSQL, SQLite, Oracle, MSSQL 输入: - target_file: 目标文件路径 - param_name: 注入参数名 (默认 'id') - payload: SQL 注入 payload (默认 "1' OR '1'='1") - language: 语言 (auto 自动检测) - db_type: 数据库类型 (默认 mysql) 常用 Payload: - 布尔盲注: "1' AND '1'='1" - 联合查询: "1' UNION SELECT 1,2,3--" - 报错注入: "1' AND extractvalue(1,concat(0x7e,version()))--" - 时间盲注: "1' AND SLEEP(5)--" 示例: {"target_file": "login.php", "param_name": "username", "payload": "admin'--"}""" @property def args_schema(self): return SqlInjectionTestInput def _detect_sql_error(self, output: str, db_type: str = "mysql") -> Optional[str]: """检测 SQL 错误特征""" output_lower = output.lower() # 先检测特定数据库 patterns = self.SQL_ERROR_PATTERNS.get(db_type, []) for pattern in patterns: if re.search(pattern, output, re.IGNORECASE): return f"检测到 {db_type.upper()} 错误: {pattern}" # 通用检测 for pattern in self.SQL_ERROR_PATTERNS["generic"]: if re.search(pattern, output, re.IGNORECASE): return f"检测到 SQL 错误: {pattern}" return None async def _execute( self, target_file: str, param_name: str = "id", payload: str = "1' OR '1'='1", language: str = "auto", db_type: str = "mysql", **kwargs ) -> ToolResult: """执行 SQL 注入测试""" try: await self.sandbox_manager.initialize() except Exception as e: logger.warning(f"Sandbox init failed: {e}") if not self.sandbox_manager.is_available: return ToolResult(success=False, error="沙箱环境不可用") # 读取目标文件 full_path = os.path.join(self.project_root, target_file) if not os.path.exists(full_path): return ToolResult(success=False, error=f"文件不存在: {target_file}") with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: code = f.read() # 检测语言 if language == "auto": ext = os.path.splitext(target_file)[1].lower() language = {".php": "php", ".py": "python", ".js": "javascript"}.get(ext, "php") # 执行测试 result = await self._test_sql_injection(language, code, param_name, payload) # 分析结果 is_vulnerable = False evidence = None if result.get("stdout") or result.get("stderr"): output = (result.get("stdout", "") + result.get("stderr", "")) error_detected = self._detect_sql_error(output, db_type) if error_detected: is_vulnerable = True evidence = error_detected # 检测数据泄露 if not is_vulnerable: leak_patterns = [ r"\d+\s*\|\s*\d+", # 表格输出 r"admin|root|user", # 用户名泄露 r"password|passwd|pwd", # 密码相关 ] for pattern in leak_patterns: if re.search(pattern, output, re.IGNORECASE): is_vulnerable = True evidence = f"可能存在数据泄露: {pattern}" break # 构建 PoC poc = None if is_vulnerable: encoded_payload = payload.replace("'", "%27").replace(" ", "+") poc = f"curl 'http://target/{target_file}?{param_name}={encoded_payload}'" # 格式化输出 output_parts = ["💉 SQL 注入测试结果\n"] output_parts.append(f"目标文件: {target_file}") output_parts.append(f"数据库类型: {db_type}") output_parts.append(f"注入参数: {param_name}") output_parts.append(f"Payload: {payload}") output_parts.append(f"\n退出码: {result.get('exit_code', -1)}") if result.get("stdout"): output_parts.append(f"\n输出:\n```\n{result['stdout'][:2000]}\n```") if result.get("stderr"): output_parts.append(f"\n错误:\n```\n{result['stderr'][:1000]}\n```") if is_vulnerable: output_parts.append(f"\n\n🔴 **SQL 注入漏洞确认!**") output_parts.append(f"证据: {evidence}") if poc: output_parts.append(f"\nPoC: `{poc}`") else: output_parts.append(f"\n\n🟡 未能确认 SQL 注入漏洞") return ToolResult( success=True, data="\n".join(output_parts), metadata={ "vulnerability_type": "sql_injection", "is_vulnerable": is_vulnerable, "evidence": evidence, "poc": poc, "db_type": db_type, } ) async def _test_sql_injection(self, language: str, code: str, param_name: str, payload: str) -> Dict: """根据语言测试 SQL 注入""" # 使用安全的 payload 转义 safe_payload = payload.replace("'", "\\'") if language == "php": # php -r 不需要 "): clean_code = clean_code[:-2] full_code = wrapper + clean_code.strip() escaped = full_code.replace("'", "'\"'\"'") return await self.sandbox_manager.execute_command(f"php -r '{escaped}'", timeout=30) elif language == "python": wrapper = f""" import sys class MockArgs: def get(self, key, default=None): if key == '{param_name}': return '''{safe_payload}''' return default class MockRequest: args = MockArgs() form = MockArgs() request = MockRequest() """ full_code = wrapper + code escaped = full_code.replace("'", "'\"'\"'") return await self.sandbox_manager.execute_command(f"python3 -c '{escaped}'", timeout=30) else: return {"exit_code": -1, "stdout": "", "stderr": f"不支持的语言: {language}"} # ============ XSS 测试工具 ============ class XssTestInput(BaseModel): """XSS 测试输入""" target_file: str = Field(..., description="目标文件路径") param_name: str = Field(default="input", description="注入参数名") payload: str = Field(default="", description="XSS payload") xss_type: str = Field(default="reflected", description="XSS 类型: reflected, stored, dom") language: str = Field(default="auto", description="语言: auto, php, python, javascript") class XssTestTool(AgentTool): """XSS 漏洞测试工具""" XSS_PAYLOADS = [ "", "", "", "javascript:alert('XSS')", "'>", "\">", "", ] def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."): super().__init__() self.sandbox_manager = sandbox_manager or SandboxManager() self.project_root = project_root @property def name(self) -> str: return "test_xss" @property def description(self) -> str: return """专门测试 XSS (跨站脚本) 漏洞的工具。 支持类型: Reflected XSS, Stored XSS, DOM XSS 输入: - target_file: 目标文件路径 - param_name: 注入参数名 (默认 'input') - payload: XSS payload (默认 "") - xss_type: XSS 类型 (reflected, stored, dom) 常用 Payload: - Script 标签: - 事件处理: - SVG: 示例: {"target_file": "search.php", "param_name": "q", "payload": ""}""" @property def args_schema(self): return XssTestInput async def _execute( self, target_file: str, param_name: str = "input", payload: str = "", xss_type: str = "reflected", language: str = "auto", **kwargs ) -> ToolResult: """执行 XSS 测试""" try: await self.sandbox_manager.initialize() except Exception as e: logger.warning(f"Sandbox init failed: {e}") if not self.sandbox_manager.is_available: return ToolResult(success=False, error="沙箱环境不可用") # 读取目标文件 full_path = os.path.join(self.project_root, target_file) if not os.path.exists(full_path): return ToolResult(success=False, error=f"文件不存在: {target_file}") with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: code = f.read() # 检测语言 if language == "auto": ext = os.path.splitext(target_file)[1].lower() language = {".php": "php", ".py": "python", ".js": "javascript"}.get(ext, "php") # 执行测试 result = await self._test_xss(language, code, param_name, payload) # 分析结果 - 检查 payload 是否被反射 is_vulnerable = False evidence = None if result.get("stdout"): output = result["stdout"] # 检查 payload 是否原样出现在输出中 if payload in output: is_vulnerable = True evidence = "XSS payload 被原样反射到输出中" # 检查关键字符是否被编码 elif "