CodeReview/backend/app/services/agent/tools/sandbox_vuln.py

1566 lines
54 KiB
Python

"""
漏洞验证专用工具
支持各类经典漏洞的沙箱验证测试
"""
import asyncio
import json
import logging
import os
import re
import tempfile
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field
from dataclasses import dataclass
from enum import Enum
from .base import AgentTool, ToolResult
from .sandbox_tool import SandboxManager
logger = logging.getLogger(__name__)
class VulnType(str, Enum):
"""漏洞类型枚举"""
SQL_INJECTION = "sql_injection"
COMMAND_INJECTION = "command_injection"
CODE_INJECTION = "code_injection"
XSS = "xss"
PATH_TRAVERSAL = "path_traversal"
SSRF = "ssrf"
XXE = "xxe"
DESERIALIZATION = "deserialization"
SSTI = "ssti"
LDAP_INJECTION = "ldap_injection"
NOSQL_INJECTION = "nosql_injection"
XPATH_INJECTION = "xpath_injection"
# ============ 命令注入测试工具 ============
class CommandInjectionTestInput(BaseModel):
"""命令注入测试输入"""
target_file: str = Field(..., description="目标文件路径")
param_name: str = Field(default="cmd", description="注入参数名")
test_command: str = Field(default="id", description="测试命令: id, whoami, echo test, cat /etc/passwd")
language: str = Field(default="auto", description="语言: auto, php, python, javascript, java, go, ruby, shell")
injection_point: Optional[str] = Field(default=None, description="注入点描述,如 'shell_exec($_GET[cmd])'")
class CommandInjectionTestTool(AgentTool):
"""
命令注入漏洞测试工具
支持多种语言和框架,自动构建测试环境
"""
def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."):
super().__init__()
self.sandbox_manager = sandbox_manager or SandboxManager()
self.project_root = project_root
@property
def name(self) -> str:
return "test_command_injection"
@property
def description(self) -> str:
return """专门测试命令注入漏洞的工具。
支持语言: PHP, Python, JavaScript, Java, Go, Ruby, Shell
输入:
- target_file: 目标文件路径
- param_name: 注入参数名 (默认 'cmd')
- test_command: 测试命令 (默认 'id')
- 'id' - 显示用户ID
- 'whoami' - 显示用户名
- 'cat /etc/passwd' - 读取密码文件
- 'echo VULN_TEST' - 输出测试字符串
- language: 语言 (auto 自动检测)
示例:
1. PHP: {"target_file": "vuln.php", "param_name": "cmd", "test_command": "whoami"}
2. Python: {"target_file": "app.py", "param_name": "cmd", "language": "python"}
3. 自定义: {"target_file": "api.js", "test_command": "echo PWNED"}
漏洞确认条件:
- 命令输出包含预期结果 (uid=, root, www-data 等)
- 或自定义 echo 内容出现在输出中"""
@property
def args_schema(self):
return CommandInjectionTestInput
def _detect_language(self, file_path: str, code: str) -> str:
"""自动检测语言"""
ext = os.path.splitext(file_path)[1].lower()
ext_map = {
".php": "php",
".py": "python",
".js": "javascript",
".ts": "javascript",
".java": "java",
".go": "go",
".rb": "ruby",
".sh": "shell",
".bash": "shell",
}
if ext in ext_map:
return ext_map[ext]
# 基于内容检测
if "<?php" in code or "<?=" in code:
return "php"
if "import " in code and ("os." in code or "subprocess" in code):
return "python"
if "require(" in code or "import " in code:
return "javascript"
if "package main" in code:
return "go"
if "class " in code and "public " in code:
return "java"
if "#!/bin/bash" in code or "#!/bin/sh" in code:
return "shell"
return "shell" # 默认
async def _execute(
self,
target_file: str,
param_name: str = "cmd",
test_command: str = "id",
language: str = "auto",
injection_point: Optional[str] = None,
**kwargs
) -> ToolResult:
"""执行命令注入测试"""
try:
await self.sandbox_manager.initialize()
except Exception as e:
logger.warning(f"Sandbox init failed: {e}")
if not self.sandbox_manager.is_available:
return ToolResult(success=False, error="沙箱环境不可用")
# 读取目标文件
full_path = os.path.join(self.project_root, target_file)
if not os.path.exists(full_path):
return ToolResult(success=False, error=f"文件不存在: {target_file}")
with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
code = f.read()
# 检测语言
if language == "auto":
language = self._detect_language(target_file, code)
# 根据语言构建测试
result = await self._test_by_language(language, code, param_name, test_command)
# 分析结果
is_vulnerable = False
evidence = None
poc = None
if result["exit_code"] == 0 and result.get("stdout"):
stdout = result["stdout"].strip()
# 检测命令执行特征
if test_command in ["id", "whoami"]:
patterns = ["uid=", "root", "www-data", "nobody", "daemon", "sandbox"]
for pattern in patterns:
if pattern in stdout.lower():
is_vulnerable = True
evidence = f"命令 '{test_command}' 执行成功,输出包含 '{pattern}'"
break
# 如果有任何输出且包含典型格式
if not is_vulnerable and stdout:
is_vulnerable = True
evidence = f"命令 '{test_command}' 有输出: {stdout[:100]}"
elif test_command.startswith("echo "):
expected = test_command[5:]
if expected.lower() in stdout.lower():
is_vulnerable = True
evidence = f"Echo 命令执行成功,输出包含 '{expected}'"
elif test_command.startswith("cat "):
if ":" in stdout or "root" in stdout.lower() or "bin" in stdout.lower():
is_vulnerable = True
evidence = f"文件读取成功: {stdout[:100]}"
else:
# 通用检测
if len(stdout) > 0:
is_vulnerable = True
evidence = f"命令可能执行成功,输出: {stdout[:200]}"
if is_vulnerable:
poc = f"curl 'http://target/{target_file}?{param_name}={test_command.replace(' ', '+')}"
# 格式化输出
output_parts = ["🎯 命令注入测试结果\n"]
output_parts.append(f"目标文件: {target_file}")
output_parts.append(f"语言: {language}")
output_parts.append(f"注入参数: {param_name}")
output_parts.append(f"测试命令: {test_command}")
output_parts.append(f"\n退出码: {result['exit_code']}")
if result.get("stdout"):
output_parts.append(f"\n命令输出:\n```\n{result['stdout'][:2000]}\n```")
if result.get("stderr"):
output_parts.append(f"\n错误输出:\n```\n{result['stderr'][:500]}\n```")
if is_vulnerable:
output_parts.append(f"\n\n🔴 **漏洞已确认!**")
output_parts.append(f"证据: {evidence}")
if poc:
output_parts.append(f"\nPoC: `{poc}`")
else:
output_parts.append(f"\n\n🟡 未能确认漏洞")
return ToolResult(
success=True,
data="\n".join(output_parts),
metadata={
"vulnerability_type": "command_injection",
"is_vulnerable": is_vulnerable,
"evidence": evidence,
"poc": poc,
"language": language,
}
)
async def _test_by_language(self, language: str, code: str, param_name: str, test_command: str) -> Dict:
"""根据语言执行测试"""
if language == "php":
return await self._test_php(code, param_name, test_command)
elif language == "python":
return await self._test_python(code, param_name, test_command)
elif language in ["javascript", "js", "node"]:
return await self._test_javascript(code, param_name, test_command)
elif language == "java":
return await self._test_java(code, param_name, test_command)
elif language in ["go", "golang"]:
return await self._test_go(code, param_name, test_command)
elif language in ["ruby", "rb"]:
return await self._test_ruby(code, param_name, test_command)
else:
return await self._test_shell(code, param_name, test_command)
async def _test_php(self, code: str, param_name: str, test_command: str) -> Dict:
"""测试 PHP 命令注入
注意: php -r 不需要 <?php 标签,直接执行纯 PHP 代码
"""
# 模拟超全局变量(不需要 <?php 标签)
wrapper = f"""$_GET['{param_name}'] = '{test_command}';
$_POST['{param_name}'] = '{test_command}';
$_REQUEST['{param_name}'] = '{test_command}';
"""
# 清理原代码的 PHP 标签
clean_code = code.strip()
if clean_code.startswith("<?php"):
clean_code = clean_code[5:]
elif clean_code.startswith("<?"):
clean_code = clean_code[2:]
if clean_code.endswith("?>"):
clean_code = clean_code[:-2]
full_code = wrapper + clean_code.strip()
escaped = full_code.replace("'", "'\"'\"'")
return await self.sandbox_manager.execute_command(f"php -r '{escaped}'", timeout=30)
async def _test_python(self, code: str, param_name: str, test_command: str) -> Dict:
"""测试 Python 命令注入"""
wrapper = f"""
import sys, os
class MockArgs:
def get(self, key, default=None):
if key == '{param_name}':
return '{test_command}'
return default
class MockRequest:
args = MockArgs()
form = MockArgs()
values = MockArgs()
request = MockRequest()
sys.argv = ['script.py', '{test_command}']
os.environ['{param_name.upper()}'] = '{test_command}'
"""
full_code = wrapper + code
escaped = full_code.replace("'", "'\"'\"'")
return await self.sandbox_manager.execute_command(f"python3 -c '{escaped}'", timeout=30)
async def _test_javascript(self, code: str, param_name: str, test_command: str) -> Dict:
"""测试 JavaScript 命令注入"""
wrapper = f"""
const req = {{
query: {{ '{param_name}': '{test_command}' }},
body: {{ '{param_name}': '{test_command}' }},
params: {{ '{param_name}': '{test_command}' }},
}};
process.argv = ['node', 'script.js', '{test_command}'];
process.env['{param_name.upper()}'] = '{test_command}';
"""
full_code = wrapper + code
escaped = full_code.replace("'", "'\"'\"'")
return await self.sandbox_manager.execute_command(f"node -e '{escaped}'", timeout=30)
async def _test_java(self, code: str, param_name: str, test_command: str) -> Dict:
"""测试 Java 命令注入"""
# 简化处理 - Java 需要完整类结构
wrapper = f"""
import java.io.*;
import java.util.*;
public class Test {{
public static void main(String[] args) throws Exception {{
Map<String, String> params = new HashMap<>();
params.put("{param_name}", "{test_command}");
String[] argv = new String[]{{"{test_command}"}};
{code}
}}
}}
"""
escaped = wrapper.replace("'", "'\"'\"'").replace("\\", "\\\\")
return await self.sandbox_manager.execute_command(
f"echo '{escaped}' > /tmp/Test.java && javac /tmp/Test.java 2>&1 && java -cp /tmp Test 2>&1",
timeout=60
)
async def _test_go(self, code: str, param_name: str, test_command: str) -> Dict:
"""测试 Go 命令注入"""
if "package main" not in code:
code = f"""package main
import (
"fmt"
"os"
"os/exec"
)
func main() {{
os.Args = []string{{"program", "{test_command}"}}
os.Setenv("{param_name.upper()}", "{test_command}")
params := map[string]string{{"{param_name}": "{test_command}"}}
_ = params
{code}
}}
"""
escaped = code.replace("'", "'\"'\"'").replace("\\", "\\\\")
return await self.sandbox_manager.execute_command(
f"echo '{escaped}' > /tmp/main.go && go run /tmp/main.go 2>&1",
timeout=60
)
async def _test_ruby(self, code: str, param_name: str, test_command: str) -> Dict:
"""测试 Ruby 命令注入"""
wrapper = f"""
ARGV[0] = "{test_command}"
ENV["{param_name.upper()}"] = "{test_command}"
def params
@params ||= {{ "{param_name}" => "{test_command}" }}
end
"""
full_code = wrapper + code
escaped = full_code.replace("'", "'\"'\"'")
return await self.sandbox_manager.execute_command(f"ruby -e '{escaped}'", timeout=30)
async def _test_shell(self, code: str, param_name: str, test_command: str) -> Dict:
"""测试 Shell 命令注入"""
wrapper = f"""#!/bin/bash
export {param_name.upper()}="{test_command}"
set -- "{test_command}"
"""
full_code = wrapper + code
escaped = full_code.replace("'", "'\"'\"'")
return await self.sandbox_manager.execute_command(f"bash -c '{escaped}'", timeout=30)
# ============ SQL 注入测试工具 ============
class SqlInjectionTestInput(BaseModel):
"""SQL 注入测试输入"""
target_file: str = Field(..., description="目标文件路径")
param_name: str = Field(default="id", description="注入参数名")
payload: str = Field(default="1' OR '1'='1", description="SQL 注入 payload")
language: str = Field(default="auto", description="语言: auto, php, python, javascript, java, go, ruby")
db_type: str = Field(default="mysql", description="数据库类型: mysql, postgresql, sqlite, oracle, mssql")
class SqlInjectionTestTool(AgentTool):
"""SQL 注入漏洞测试工具"""
# SQL 错误特征
SQL_ERROR_PATTERNS = {
"mysql": [
r"SQL syntax.*MySQL",
r"Warning.*mysql_",
r"MySQLSyntaxErrorException",
r"valid MySQL result",
r"check the manual that corresponds to your MySQL",
r"mysql_fetch",
r"mysqli_",
],
"postgresql": [
r"PostgreSQL.*ERROR",
r"Warning.*pg_",
r"valid PostgreSQL result",
r"Npgsql\.",
r"PSQLException",
],
"sqlite": [
r"SQLite.*error",
r"sqlite3\.OperationalError",
r"SQLITE_ERROR",
r"SQLite3::SQLException",
],
"oracle": [
r"ORA-\d{5}",
r"Oracle error",
r"Oracle.*Driver",
r"Warning.*oci_",
],
"mssql": [
r"ODBC Driver.*SQL Server",
r"SqlException",
r"Unclosed quotation mark",
r"SQL Server.*Error",
],
"generic": [
r"SQL syntax",
r"unclosed quotation",
r"quoted string not properly terminated",
r"sql error",
r"database error",
r"query failed",
],
}
def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."):
super().__init__()
self.sandbox_manager = sandbox_manager or SandboxManager()
self.project_root = project_root
@property
def name(self) -> str:
return "test_sql_injection"
@property
def description(self) -> str:
return """专门测试 SQL 注入漏洞的工具。
支持数据库: MySQL, PostgreSQL, SQLite, Oracle, MSSQL
输入:
- target_file: 目标文件路径
- param_name: 注入参数名 (默认 'id')
- payload: SQL 注入 payload (默认 "1' OR '1'='1")
- language: 语言 (auto 自动检测)
- db_type: 数据库类型 (默认 mysql)
常用 Payload:
- 布尔盲注: "1' AND '1'='1"
- 联合查询: "1' UNION SELECT 1,2,3--"
- 报错注入: "1' AND extractvalue(1,concat(0x7e,version()))--"
- 时间盲注: "1' AND SLEEP(5)--"
示例:
{"target_file": "login.php", "param_name": "username", "payload": "admin'--"}"""
@property
def args_schema(self):
return SqlInjectionTestInput
def _detect_sql_error(self, output: str, db_type: str = "mysql") -> Optional[str]:
"""检测 SQL 错误特征"""
output_lower = output.lower()
# 先检测特定数据库
patterns = self.SQL_ERROR_PATTERNS.get(db_type, [])
for pattern in patterns:
if re.search(pattern, output, re.IGNORECASE):
return f"检测到 {db_type.upper()} 错误: {pattern}"
# 通用检测
for pattern in self.SQL_ERROR_PATTERNS["generic"]:
if re.search(pattern, output, re.IGNORECASE):
return f"检测到 SQL 错误: {pattern}"
return None
async def _execute(
self,
target_file: str,
param_name: str = "id",
payload: str = "1' OR '1'='1",
language: str = "auto",
db_type: str = "mysql",
**kwargs
) -> ToolResult:
"""执行 SQL 注入测试"""
try:
await self.sandbox_manager.initialize()
except Exception as e:
logger.warning(f"Sandbox init failed: {e}")
if not self.sandbox_manager.is_available:
return ToolResult(success=False, error="沙箱环境不可用")
# 读取目标文件
full_path = os.path.join(self.project_root, target_file)
if not os.path.exists(full_path):
return ToolResult(success=False, error=f"文件不存在: {target_file}")
with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
code = f.read()
# 检测语言
if language == "auto":
ext = os.path.splitext(target_file)[1].lower()
language = {".php": "php", ".py": "python", ".js": "javascript"}.get(ext, "php")
# 执行测试
result = await self._test_sql_injection(language, code, param_name, payload)
# 分析结果
is_vulnerable = False
evidence = None
if result.get("stdout") or result.get("stderr"):
output = (result.get("stdout", "") + result.get("stderr", ""))
error_detected = self._detect_sql_error(output, db_type)
if error_detected:
is_vulnerable = True
evidence = error_detected
# 检测数据泄露
if not is_vulnerable:
leak_patterns = [
r"\d+\s*\|\s*\d+", # 表格输出
r"admin|root|user", # 用户名泄露
r"password|passwd|pwd", # 密码相关
]
for pattern in leak_patterns:
if re.search(pattern, output, re.IGNORECASE):
is_vulnerable = True
evidence = f"可能存在数据泄露: {pattern}"
break
# 构建 PoC
poc = None
if is_vulnerable:
encoded_payload = payload.replace("'", "%27").replace(" ", "+")
poc = f"curl 'http://target/{target_file}?{param_name}={encoded_payload}'"
# 格式化输出
output_parts = ["💉 SQL 注入测试结果\n"]
output_parts.append(f"目标文件: {target_file}")
output_parts.append(f"数据库类型: {db_type}")
output_parts.append(f"注入参数: {param_name}")
output_parts.append(f"Payload: {payload}")
output_parts.append(f"\n退出码: {result.get('exit_code', -1)}")
if result.get("stdout"):
output_parts.append(f"\n输出:\n```\n{result['stdout'][:2000]}\n```")
if result.get("stderr"):
output_parts.append(f"\n错误:\n```\n{result['stderr'][:1000]}\n```")
if is_vulnerable:
output_parts.append(f"\n\n🔴 **SQL 注入漏洞确认!**")
output_parts.append(f"证据: {evidence}")
if poc:
output_parts.append(f"\nPoC: `{poc}`")
else:
output_parts.append(f"\n\n🟡 未能确认 SQL 注入漏洞")
return ToolResult(
success=True,
data="\n".join(output_parts),
metadata={
"vulnerability_type": "sql_injection",
"is_vulnerable": is_vulnerable,
"evidence": evidence,
"poc": poc,
"db_type": db_type,
}
)
async def _test_sql_injection(self, language: str, code: str, param_name: str, payload: str) -> Dict:
"""根据语言测试 SQL 注入"""
# 使用安全的 payload 转义
safe_payload = payload.replace("'", "\\'")
if language == "php":
# php -r 不需要 <?php 标签
wrapper = f"""$_GET['{param_name}'] = '{safe_payload}';
$_POST['{param_name}'] = '{safe_payload}';
$_REQUEST['{param_name}'] = '{safe_payload}';
error_reporting(E_ALL);
ini_set('display_errors', 1);
"""
clean_code = code.strip()
if clean_code.startswith("<?php"):
clean_code = clean_code[5:]
elif clean_code.startswith("<?"):
clean_code = clean_code[2:]
if clean_code.endswith("?>"):
clean_code = clean_code[:-2]
full_code = wrapper + clean_code.strip()
escaped = full_code.replace("'", "'\"'\"'")
return await self.sandbox_manager.execute_command(f"php -r '{escaped}'", timeout=30)
elif language == "python":
wrapper = f"""
import sys
class MockArgs:
def get(self, key, default=None):
if key == '{param_name}':
return '''{safe_payload}'''
return default
class MockRequest:
args = MockArgs()
form = MockArgs()
request = MockRequest()
"""
full_code = wrapper + code
escaped = full_code.replace("'", "'\"'\"'")
return await self.sandbox_manager.execute_command(f"python3 -c '{escaped}'", timeout=30)
else:
return {"exit_code": -1, "stdout": "", "stderr": f"不支持的语言: {language}"}
# ============ XSS 测试工具 ============
class XssTestInput(BaseModel):
"""XSS 测试输入"""
target_file: str = Field(..., description="目标文件路径")
param_name: str = Field(default="input", description="注入参数名")
payload: str = Field(default="<script>alert('XSS')</script>", description="XSS payload")
xss_type: str = Field(default="reflected", description="XSS 类型: reflected, stored, dom")
language: str = Field(default="auto", description="语言: auto, php, python, javascript")
class XssTestTool(AgentTool):
"""XSS 漏洞测试工具"""
XSS_PAYLOADS = [
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
"<svg onload=alert('XSS')>",
"javascript:alert('XSS')",
"'><script>alert('XSS')</script>",
"\"><script>alert('XSS')</script>",
"<body onload=alert('XSS')>",
]
def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."):
super().__init__()
self.sandbox_manager = sandbox_manager or SandboxManager()
self.project_root = project_root
@property
def name(self) -> str:
return "test_xss"
@property
def description(self) -> str:
return """专门测试 XSS (跨站脚本) 漏洞的工具。
支持类型: Reflected XSS, Stored XSS, DOM XSS
输入:
- target_file: 目标文件路径
- param_name: 注入参数名 (默认 'input')
- payload: XSS payload (默认 "<script>alert('XSS')</script>")
- xss_type: XSS 类型 (reflected, stored, dom)
常用 Payload:
- Script 标签: <script>alert('XSS')</script>
- 事件处理: <img src=x onerror=alert('XSS')>
- SVG: <svg onload=alert('XSS')>
示例:
{"target_file": "search.php", "param_name": "q", "payload": "<script>alert(1)</script>"}"""
@property
def args_schema(self):
return XssTestInput
async def _execute(
self,
target_file: str,
param_name: str = "input",
payload: str = "<script>alert('XSS')</script>",
xss_type: str = "reflected",
language: str = "auto",
**kwargs
) -> ToolResult:
"""执行 XSS 测试"""
try:
await self.sandbox_manager.initialize()
except Exception as e:
logger.warning(f"Sandbox init failed: {e}")
if not self.sandbox_manager.is_available:
return ToolResult(success=False, error="沙箱环境不可用")
# 读取目标文件
full_path = os.path.join(self.project_root, target_file)
if not os.path.exists(full_path):
return ToolResult(success=False, error=f"文件不存在: {target_file}")
with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
code = f.read()
# 检测语言
if language == "auto":
ext = os.path.splitext(target_file)[1].lower()
language = {".php": "php", ".py": "python", ".js": "javascript"}.get(ext, "php")
# 执行测试
result = await self._test_xss(language, code, param_name, payload)
# 分析结果 - 检查 payload 是否被反射
is_vulnerable = False
evidence = None
if result.get("stdout"):
output = result["stdout"]
# 检查 payload 是否原样出现在输出中
if payload in output:
is_vulnerable = True
evidence = "XSS payload 被原样反射到输出中"
# 检查关键字符是否被编码
elif "<script>" in payload and "<script>" not in output:
if "&lt;script&gt;" in output:
evidence = "Payload 被 HTML 编码 (部分防护)"
else:
evidence = "Payload 未出现在输出中"
# 检查事件处理器
elif "onerror=" in payload or "onload=" in payload:
if "onerror=" in output or "onload=" in output:
is_vulnerable = True
evidence = "事件处理器 payload 被反射"
# 构建 PoC
poc = None
if is_vulnerable:
encoded_payload = payload.replace("<", "%3C").replace(">", "%3E").replace("'", "%27")
poc = f"curl 'http://target/{target_file}?{param_name}={encoded_payload}'"
# 格式化输出
output_parts = ["🔍 XSS 测试结果\n"]
output_parts.append(f"目标文件: {target_file}")
output_parts.append(f"XSS 类型: {xss_type}")
output_parts.append(f"注入参数: {param_name}")
output_parts.append(f"Payload: {payload}")
output_parts.append(f"\n退出码: {result.get('exit_code', -1)}")
if result.get("stdout"):
output_parts.append(f"\n输出:\n```html\n{result['stdout'][:2000]}\n```")
if is_vulnerable:
output_parts.append(f"\n\n🔴 **XSS 漏洞确认!**")
output_parts.append(f"证据: {evidence}")
if poc:
output_parts.append(f"\nPoC: `{poc}`")
else:
output_parts.append(f"\n\n🟡 未能确认 XSS 漏洞")
if evidence:
output_parts.append(f"备注: {evidence}")
return ToolResult(
success=True,
data="\n".join(output_parts),
metadata={
"vulnerability_type": "xss",
"xss_type": xss_type,
"is_vulnerable": is_vulnerable,
"evidence": evidence,
"poc": poc,
}
)
async def _test_xss(self, language: str, code: str, param_name: str, payload: str) -> Dict:
"""测试 XSS"""
# 转义 payload 中的特殊字符
safe_payload = payload.replace("'", "\\'").replace('"', '\\"')
if language == "php":
# php -r 不需要 <?php 标签
wrapper = f"""$_GET['{param_name}'] = '{safe_payload}';
$_POST['{param_name}'] = '{safe_payload}';
$_REQUEST['{param_name}'] = '{safe_payload}';
"""
clean_code = code.strip()
if clean_code.startswith("<?php"):
clean_code = clean_code[5:]
elif clean_code.startswith("<?"):
clean_code = clean_code[2:]
if clean_code.endswith("?>"):
clean_code = clean_code[:-2]
full_code = wrapper + clean_code.strip()
escaped = full_code.replace("'", "'\"'\"'")
return await self.sandbox_manager.execute_command(f"php -r '{escaped}'", timeout=30)
elif language == "python":
wrapper = f"""
class MockArgs:
def get(self, key, default=None):
if key == '{param_name}':
return '''{safe_payload}'''
return default
class MockRequest:
args = MockArgs()
form = MockArgs()
request = MockRequest()
"""
full_code = wrapper + code
escaped = full_code.replace("'", "'\"'\"'")
return await self.sandbox_manager.execute_command(f"python3 -c '{escaped}'", timeout=30)
else:
return {"exit_code": -1, "stdout": "", "stderr": f"不支持的语言: {language}"}
# ============ 路径遍历测试工具 ============
class PathTraversalTestInput(BaseModel):
"""路径遍历测试输入"""
target_file: str = Field(..., description="目标文件路径")
param_name: str = Field(default="file", description="文件参数名")
payload: str = Field(default="../../../etc/passwd", description="路径遍历 payload")
language: str = Field(default="auto", description="语言")
class PathTraversalTestTool(AgentTool):
"""路径遍历漏洞测试工具"""
TRAVERSAL_PAYLOADS = [
"../../../etc/passwd",
"....//....//....//etc/passwd",
"..%2f..%2f..%2fetc/passwd",
"..%252f..%252f..%252fetc/passwd",
"/etc/passwd",
"....\\....\\....\\windows\\win.ini",
"..\\..\\..\\windows\\win.ini",
]
SENSITIVE_FILES = {
"unix": [
"/etc/passwd",
"/etc/shadow",
"/etc/hosts",
"/proc/self/environ",
"/var/log/apache2/access.log",
],
"windows": [
"C:\\Windows\\win.ini",
"C:\\Windows\\System32\\drivers\\etc\\hosts",
"C:\\boot.ini",
]
}
def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."):
super().__init__()
self.sandbox_manager = sandbox_manager or SandboxManager()
self.project_root = project_root
@property
def name(self) -> str:
return "test_path_traversal"
@property
def description(self) -> str:
return """专门测试路径遍历/LFI/RFI 漏洞的工具。
输入:
- target_file: 目标文件路径
- param_name: 文件参数名 (默认 'file')
- payload: 路径遍历 payload (默认 "../../../etc/passwd")
常用 Payload:
- Unix: ../../../etc/passwd
- 编码绕过: ..%2f..%2f..%2fetc/passwd
- 双写绕过: ....//....//....//etc/passwd
- Windows: ..\\..\\..\\windows\\win.ini
示例:
{"target_file": "download.php", "param_name": "file", "payload": "../../../etc/passwd"}"""
@property
def args_schema(self):
return PathTraversalTestInput
async def _execute(
self,
target_file: str,
param_name: str = "file",
payload: str = "../../../etc/passwd",
language: str = "auto",
**kwargs
) -> ToolResult:
"""执行路径遍历测试"""
try:
await self.sandbox_manager.initialize()
except Exception as e:
logger.warning(f"Sandbox init failed: {e}")
if not self.sandbox_manager.is_available:
return ToolResult(success=False, error="沙箱环境不可用")
# 读取目标文件
full_path = os.path.join(self.project_root, target_file)
if not os.path.exists(full_path):
return ToolResult(success=False, error=f"文件不存在: {target_file}")
with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
code = f.read()
# 检测语言
if language == "auto":
ext = os.path.splitext(target_file)[1].lower()
language = {".php": "php", ".py": "python", ".js": "javascript"}.get(ext, "php")
# 执行测试
result = await self._test_traversal(language, code, param_name, payload)
# 分析结果
is_vulnerable = False
evidence = None
if result.get("stdout"):
output = result["stdout"]
# 检测敏感文件内容特征
passwd_patterns = [
r"root:.*:0:0:",
r"daemon:.*:",
r"nobody:.*:",
r"www-data:",
]
for pattern in passwd_patterns:
if re.search(pattern, output):
is_vulnerable = True
evidence = "成功读取 /etc/passwd 文件内容"
break
# Windows 特征
if not is_vulnerable:
win_patterns = [
r"\[fonts\]",
r"\[extensions\]",
r"for 16-bit app support",
]
for pattern in win_patterns:
if re.search(pattern, output, re.IGNORECASE):
is_vulnerable = True
evidence = "成功读取 Windows 系统文件"
break
# 构建 PoC
poc = None
if is_vulnerable:
encoded_payload = payload.replace("../", "..%2f")
poc = f"curl 'http://target/{target_file}?{param_name}={encoded_payload}'"
# 格式化输出
output_parts = ["📁 路径遍历测试结果\n"]
output_parts.append(f"目标文件: {target_file}")
output_parts.append(f"参数名: {param_name}")
output_parts.append(f"Payload: {payload}")
output_parts.append(f"\n退出码: {result.get('exit_code', -1)}")
if result.get("stdout"):
output_parts.append(f"\n输出:\n```\n{result['stdout'][:2000]}\n```")
if is_vulnerable:
output_parts.append(f"\n\n🔴 **路径遍历漏洞确认!**")
output_parts.append(f"证据: {evidence}")
if poc:
output_parts.append(f"\nPoC: `{poc}`")
else:
output_parts.append(f"\n\n🟡 未能确认路径遍历漏洞")
return ToolResult(
success=True,
data="\n".join(output_parts),
metadata={
"vulnerability_type": "path_traversal",
"is_vulnerable": is_vulnerable,
"evidence": evidence,
"poc": poc,
}
)
async def _test_traversal(self, language: str, code: str, param_name: str, payload: str) -> Dict:
"""测试路径遍历"""
if language == "php":
# php -r 不需要 <?php 标签
wrapper = f"""$_GET['{param_name}'] = '{payload}';
$_POST['{param_name}'] = '{payload}';
$_REQUEST['{param_name}'] = '{payload}';
"""
clean_code = code.strip()
if clean_code.startswith("<?php"):
clean_code = clean_code[5:]
elif clean_code.startswith("<?"):
clean_code = clean_code[2:]
if clean_code.endswith("?>"):
clean_code = clean_code[:-2]
full_code = wrapper + clean_code.strip()
escaped = full_code.replace("'", "'\"'\"'")
return await self.sandbox_manager.execute_command(f"php -r '{escaped}'", timeout=30)
elif language == "python":
wrapper = f"""
class MockArgs:
def get(self, key, default=None):
if key == '{param_name}':
return '{payload}'
return default
class MockRequest:
args = MockArgs()
request = MockRequest()
"""
full_code = wrapper + code
escaped = full_code.replace("'", "'\"'\"'")
return await self.sandbox_manager.execute_command(f"python3 -c '{escaped}'", timeout=30)
else:
return {"exit_code": -1, "stdout": "", "stderr": f"不支持的语言: {language}"}
# ============ SSTI (服务端模板注入) 测试工具 ============
class SstiTestInput(BaseModel):
"""SSTI 测试输入"""
target_file: str = Field(..., description="目标文件路径")
param_name: str = Field(default="name", description="注入参数名")
payload: str = Field(default="{{7*7}}", description="SSTI payload")
template_engine: str = Field(default="auto", description="模板引擎: auto, jinja2, twig, freemarker, velocity, smarty")
class SstiTestTool(AgentTool):
"""SSTI (服务端模板注入) 漏洞测试工具"""
SSTI_PAYLOADS = {
"jinja2": [
"{{7*7}}",
"{{config}}",
"{{''.__class__.__mro__[2].__subclasses__()}}",
],
"twig": [
"{{7*7}}",
"{{_self.env.getFilter('id')}}",
],
"freemarker": [
"${7*7}",
"<#assign ex=\"freemarker.template.utility.Execute\"?new()>${ex(\"id\")}",
],
"velocity": [
"#set($x=7*7)$x",
"#set($str=$class.inspect(\"java.lang.Runtime\").type.getRuntime().exec(\"id\"))",
],
"smarty": [
"{7*7}",
"{php}echo `id`;{/php}",
],
}
def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."):
super().__init__()
self.sandbox_manager = sandbox_manager or SandboxManager()
self.project_root = project_root
@property
def name(self) -> str:
return "test_ssti"
@property
def description(self) -> str:
return """专门测试 SSTI (服务端模板注入) 漏洞的工具。
支持模板引擎: Jinja2, Twig, Freemarker, Velocity, Smarty
输入:
- target_file: 目标文件路径
- param_name: 注入参数名
- payload: SSTI payload (默认 "{{7*7}}")
- template_engine: 模板引擎类型
常用 Payload:
- Jinja2/Twig: {{7*7}}, {{config}}
- Freemarker: ${7*7}
- Velocity: #set($x=7*7)$x
- Smarty: {7*7}
示例:
{"target_file": "render.py", "param_name": "name", "payload": "{{7*7}}", "template_engine": "jinja2"}"""
@property
def args_schema(self):
return SstiTestInput
async def _execute(
self,
target_file: str,
param_name: str = "name",
payload: str = "{{7*7}}",
template_engine: str = "auto",
**kwargs
) -> ToolResult:
"""执行 SSTI 测试"""
try:
await self.sandbox_manager.initialize()
except Exception as e:
logger.warning(f"Sandbox init failed: {e}")
if not self.sandbox_manager.is_available:
return ToolResult(success=False, error="沙箱环境不可用")
# 读取目标文件
full_path = os.path.join(self.project_root, target_file)
if not os.path.exists(full_path):
return ToolResult(success=False, error=f"文件不存在: {target_file}")
with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
code = f.read()
# 检测语言和模板引擎
ext = os.path.splitext(target_file)[1].lower()
language = {".php": "php", ".py": "python", ".js": "javascript", ".java": "java"}.get(ext, "python")
if template_engine == "auto":
if "jinja" in code.lower() or "render_template" in code:
template_engine = "jinja2"
elif "twig" in code.lower():
template_engine = "twig"
elif "freemarker" in code.lower():
template_engine = "freemarker"
else:
template_engine = "jinja2"
# 执行测试
result = await self._test_ssti(language, code, param_name, payload)
# 分析结果
is_vulnerable = False
evidence = None
if result.get("stdout"):
output = result["stdout"]
# 检测数学表达式计算结果
if "{{7*7}}" in payload or "${7*7}" in payload or "{7*7}" in payload:
if "49" in output:
is_vulnerable = True
evidence = "模板表达式 7*7 被计算为 49"
# 检测配置泄露
if "{{config}}" in payload:
if "secret" in output.lower() or "debug" in output.lower():
is_vulnerable = True
evidence = "模板可以访问配置对象"
# 检测命令执行
if "id" in payload or "whoami" in payload:
if "uid=" in output or "root" in output.lower():
is_vulnerable = True
evidence = "SSTI 导致远程代码执行"
# 构建 PoC
poc = None
if is_vulnerable:
encoded_payload = payload.replace("{", "%7B").replace("}", "%7D")
poc = f"curl 'http://target/{target_file}?{param_name}={encoded_payload}'"
# 格式化输出
output_parts = ["🎭 SSTI 测试结果\n"]
output_parts.append(f"目标文件: {target_file}")
output_parts.append(f"模板引擎: {template_engine}")
output_parts.append(f"参数名: {param_name}")
output_parts.append(f"Payload: {payload}")
output_parts.append(f"\n退出码: {result.get('exit_code', -1)}")
if result.get("stdout"):
output_parts.append(f"\n输出:\n```\n{result['stdout'][:2000]}\n```")
if is_vulnerable:
output_parts.append(f"\n\n🔴 **SSTI 漏洞确认!**")
output_parts.append(f"证据: {evidence}")
if poc:
output_parts.append(f"\nPoC: `{poc}`")
else:
output_parts.append(f"\n\n🟡 未能确认 SSTI 漏洞")
return ToolResult(
success=True,
data="\n".join(output_parts),
metadata={
"vulnerability_type": "ssti",
"template_engine": template_engine,
"is_vulnerable": is_vulnerable,
"evidence": evidence,
"poc": poc,
}
)
async def _test_ssti(self, language: str, code: str, param_name: str, payload: str) -> Dict:
"""测试 SSTI"""
safe_payload = payload.replace("'", "\\'")
if language == "python":
wrapper = f"""
class MockArgs:
def get(self, key, default=None):
if key == '{param_name}':
return '''{safe_payload}'''
return default
class MockRequest:
args = MockArgs()
form = MockArgs()
request = MockRequest()
"""
full_code = wrapper + code
escaped = full_code.replace("'", "'\"'\"'")
return await self.sandbox_manager.execute_command(f"python3 -c '{escaped}'", timeout=30)
elif language == "php":
# php -r 不需要 <?php 标签
wrapper = f"""$_GET['{param_name}'] = '{safe_payload}';
$_POST['{param_name}'] = '{safe_payload}';
"""
clean_code = code.strip()
if clean_code.startswith("<?php"):
clean_code = clean_code[5:]
elif clean_code.startswith("<?"):
clean_code = clean_code[2:]
if clean_code.endswith("?>"):
clean_code = clean_code[:-2]
full_code = wrapper + clean_code.strip()
escaped = full_code.replace("'", "'\"'\"'")
return await self.sandbox_manager.execute_command(f"php -r '{escaped}'", timeout=30)
else:
return {"exit_code": -1, "stdout": "", "stderr": f"不支持的语言: {language}"}
# ============ 反序列化测试工具 ============
class DeserializationTestInput(BaseModel):
"""反序列化测试输入"""
target_file: str = Field(..., description="目标文件路径")
language: str = Field(default="auto", description="语言: auto, php, python, java, ruby")
payload_type: str = Field(default="detect", description="payload 类型: detect, pickle, yaml, php_serialize")
class DeserializationTestTool(AgentTool):
"""反序列化漏洞测试工具"""
def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."):
super().__init__()
self.sandbox_manager = sandbox_manager or SandboxManager()
self.project_root = project_root
@property
def name(self) -> str:
return "test_deserialization"
@property
def description(self) -> str:
return """测试不安全反序列化漏洞的工具。
支持语言: PHP (unserialize), Python (pickle, yaml), Java, Ruby (Marshal)
输入:
- target_file: 目标文件路径
- language: 语言
- payload_type: payload 类型 (detect 自动检测)
检测模式:
- 分析代码中是否存在危险的反序列化调用
- 检测用户可控数据是否进入反序列化函数
危险函数:
- PHP: unserialize()
- Python: pickle.loads(), yaml.load(), eval()
- Java: ObjectInputStream.readObject()
- Ruby: Marshal.load()
示例:
{"target_file": "api.py", "language": "python"}"""
@property
def args_schema(self):
return DeserializationTestInput
async def _execute(
self,
target_file: str,
language: str = "auto",
payload_type: str = "detect",
**kwargs
) -> ToolResult:
"""执行反序列化漏洞检测"""
# 读取目标文件
full_path = os.path.join(self.project_root, target_file)
if not os.path.exists(full_path):
return ToolResult(success=False, error=f"文件不存在: {target_file}")
with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
code = f.read()
# 检测语言
if language == "auto":
ext = os.path.splitext(target_file)[1].lower()
language = {
".php": "php",
".py": "python",
".java": "java",
".rb": "ruby",
}.get(ext, "unknown")
# 分析代码中的反序列化调用
is_vulnerable = False
evidence = None
dangerous_calls = []
if language == "php":
# PHP 反序列化
patterns = [
(r"unserialize\s*\(\s*\$_(GET|POST|REQUEST|COOKIE)", "直接反序列化用户输入"),
(r"unserialize\s*\(", "使用 unserialize"),
]
for pattern, desc in patterns:
if re.search(pattern, code, re.IGNORECASE):
dangerous_calls.append(desc)
if "$_" in pattern:
is_vulnerable = True
evidence = desc
elif language == "python":
# Python 反序列化
patterns = [
(r"pickle\.loads?\s*\(", "使用 pickle"),
(r"yaml\.load\s*\([^)]*Loader\s*=\s*None", "yaml.load 不安全调用"),
(r"yaml\.unsafe_load", "yaml.unsafe_load"),
(r"marshal\.loads?\s*\(", "使用 marshal"),
(r"shelve\.open", "使用 shelve"),
]
for pattern, desc in patterns:
if re.search(pattern, code):
dangerous_calls.append(desc)
# 检查是否用户可控
if "request" in code.lower() or "input" in code.lower():
is_vulnerable = True
evidence = f"{desc} 且可能接受用户输入"
elif language == "java":
# Java 反序列化
patterns = [
(r"ObjectInputStream", "使用 ObjectInputStream"),
(r"readObject\s*\(\s*\)", "调用 readObject"),
(r"XMLDecoder", "使用 XMLDecoder"),
]
for pattern, desc in patterns:
if re.search(pattern, code):
dangerous_calls.append(desc)
elif language == "ruby":
# Ruby 反序列化
patterns = [
(r"Marshal\.load", "使用 Marshal.load"),
(r"YAML\.load", "使用 YAML.load"),
]
for pattern, desc in patterns:
if re.search(pattern, code):
dangerous_calls.append(desc)
# 格式化输出
output_parts = ["🔓 反序列化漏洞检测结果\n"]
output_parts.append(f"目标文件: {target_file}")
output_parts.append(f"语言: {language}")
if dangerous_calls:
output_parts.append(f"\n发现的危险调用:")
for call in dangerous_calls:
output_parts.append(f" - {call}")
if is_vulnerable:
output_parts.append(f"\n\n🔴 **存在反序列化漏洞风险!**")
output_parts.append(f"证据: {evidence}")
output_parts.append(f"\n建议: 避免反序列化不可信数据,使用 JSON 等安全格式")
elif dangerous_calls:
output_parts.append(f"\n\n🟡 存在潜在风险")
output_parts.append(f"建议: 检查反序列化数据来源是否可信")
else:
output_parts.append(f"\n\n🟢 未发现明显的反序列化漏洞")
return ToolResult(
success=True,
data="\n".join(output_parts),
metadata={
"vulnerability_type": "deserialization",
"language": language,
"is_vulnerable": is_vulnerable,
"evidence": evidence,
"dangerous_calls": dangerous_calls,
}
)
# ============ 通用漏洞测试工具 ============
class UniversalVulnTestInput(BaseModel):
"""通用漏洞测试输入"""
target_file: str = Field(..., description="目标文件路径")
vuln_type: str = Field(..., description="漏洞类型: command_injection, sql_injection, xss, path_traversal, ssti, deserialization")
param_name: str = Field(default="input", description="参数名")
payload: Optional[str] = Field(default=None, description="自定义 payload")
language: str = Field(default="auto", description="语言")
class UniversalVulnTestTool(AgentTool):
"""通用漏洞测试工具 - 自动选择合适的测试器"""
def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."):
super().__init__()
self.sandbox_manager = sandbox_manager or SandboxManager()
self.project_root = project_root
# 初始化所有漏洞测试器
self._testers = {
"command_injection": CommandInjectionTestTool(sandbox_manager, project_root),
"cmd": CommandInjectionTestTool(sandbox_manager, project_root),
"rce": CommandInjectionTestTool(sandbox_manager, project_root),
"sql_injection": SqlInjectionTestTool(sandbox_manager, project_root),
"sqli": SqlInjectionTestTool(sandbox_manager, project_root),
"xss": XssTestTool(sandbox_manager, project_root),
"path_traversal": PathTraversalTestTool(sandbox_manager, project_root),
"lfi": PathTraversalTestTool(sandbox_manager, project_root),
"rfi": PathTraversalTestTool(sandbox_manager, project_root),
"ssti": SstiTestTool(sandbox_manager, project_root),
"deserialization": DeserializationTestTool(sandbox_manager, project_root),
}
# 默认 payloads
self._default_payloads = {
"command_injection": "id",
"sql_injection": "1' OR '1'='1",
"xss": "<script>alert('XSS')</script>",
"path_traversal": "../../../etc/passwd",
"ssti": "{{7*7}}",
}
@property
def name(self) -> str:
return "vuln_test"
@property
def description(self) -> str:
return """通用漏洞测试工具,支持多种漏洞类型的自动化测试。
支持的漏洞类型:
- command_injection (cmd/rce): 命令注入
- sql_injection (sqli): SQL 注入
- xss: 跨站脚本
- path_traversal (lfi/rfi): 路径遍历
- ssti: 服务端模板注入
- deserialization: 不安全反序列化
输入:
- target_file: 目标文件路径
- vuln_type: 漏洞类型
- param_name: 参数名
- payload: 自定义 payload (可选)
- language: 语言 (auto 自动检测)
示例:
1. 命令注入: {"target_file": "api.php", "vuln_type": "command_injection", "param_name": "cmd"}
2. SQL 注入: {"target_file": "login.php", "vuln_type": "sql_injection", "param_name": "username", "payload": "admin'--"}
3. XSS: {"target_file": "search.php", "vuln_type": "xss", "param_name": "q"}"""
@property
def args_schema(self):
return UniversalVulnTestInput
async def _execute(
self,
target_file: str,
vuln_type: str,
param_name: str = "input",
payload: Optional[str] = None,
language: str = "auto",
**kwargs
) -> ToolResult:
"""执行通用漏洞测试"""
vuln_type = vuln_type.lower().strip()
tester = self._testers.get(vuln_type)
if not tester:
return ToolResult(
success=False,
error=f"不支持的漏洞类型: {vuln_type}。支持: {list(self._testers.keys())}",
)
# 使用默认 payload
if not payload:
payload = self._default_payloads.get(vuln_type, "test")
# 构建测试参数
test_kwargs = {
"target_file": target_file,
"param_name": param_name,
"language": language,
}
# 根据漏洞类型添加特定参数
if vuln_type in ["command_injection", "cmd", "rce"]:
test_kwargs["test_command"] = payload
elif vuln_type in ["sql_injection", "sqli"]:
test_kwargs["payload"] = payload
elif vuln_type == "xss":
test_kwargs["payload"] = payload
elif vuln_type in ["path_traversal", "lfi", "rfi"]:
test_kwargs["payload"] = payload
elif vuln_type == "ssti":
test_kwargs["payload"] = payload
return await tester._execute(**test_kwargs)