From 22b1610825a57c1caf27532ec6a79db94f22cd2e Mon Sep 17 00:00:00 2001 From: lintsinghua Date: Thu, 18 Dec 2025 15:03:30 +0800 Subject: [PATCH] =?UTF-8?q?feat(agent):=20=E6=96=B0=E5=A2=9E=E9=80=9A?= =?UTF-8?q?=E7=94=A8=E4=BB=A3=E7=A0=81=E6=89=A7=E8=A1=8C=E5=B7=A5=E5=85=B7?= =?UTF-8?q?=E5=92=8C=E5=87=BD=E6=95=B0=E6=8F=90=E5=8F=96=E5=B7=A5=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 添加 RunCodeTool 和 ExtractFunctionTool 用于 LLM 驱动的漏洞验证 放宽沙箱命令限制以支持更灵活的测试 更新验证代理提示词以优先使用动态代码验证 --- backend/app/api/v1/endpoints/agent_tasks.py | 6 + .../app/services/agent/agents/verification.py | 317 ++++++----- backend/app/services/agent/tools/__init__.py | 7 + backend/app/services/agent/tools/run_code.py | 513 ++++++++++++++++++ .../app/services/agent/tools/sandbox_tool.py | 22 +- 5 files changed, 730 insertions(+), 135 deletions(-) create mode 100644 backend/app/services/agent/tools/run_code.py diff --git a/backend/app/api/v1/endpoints/agent_tasks.py b/backend/app/api/v1/endpoints/agent_tasks.py index 13f872f..8f0582c 100644 --- a/backend/app/api/v1/endpoints/agent_tasks.py +++ b/backend/app/api/v1/endpoints/agent_tasks.py @@ -948,6 +948,8 @@ async def _initialize_tools( CommandInjectionTestTool, SqlInjectionTestTool, XssTestTool, PathTraversalTestTool, SstiTestTool, DeserializationTestTool, UniversalVulnTestTool, + # 🔥 新增:通用代码执行工具 (LLM 驱动的 Fuzzing Harness) + RunCodeTool, ExtractFunctionTool, ) verification_tools = { @@ -976,6 +978,10 @@ async def _initialize_tools( "test_deserialization": DeserializationTestTool(sandbox_manager, project_root), "universal_vuln_test": UniversalVulnTestTool(sandbox_manager, project_root), + # 🔥 新增:通用代码执行工具 (LLM 驱动的 Fuzzing Harness) + "run_code": RunCodeTool(sandbox_manager, project_root), + "extract_function": ExtractFunctionTool(project_root), + # 报告工具 "create_vulnerability_report": CreateVulnerabilityReportTool(), } diff --git a/backend/app/services/agent/agents/verification.py b/backend/app/services/agent/agents/verification.py index 0959ba3..ee19053 100644 --- a/backend/app/services/agent/agents/verification.py +++ b/backend/app/services/agent/agents/verification.py @@ -32,90 +32,188 @@ VERIFICATION_SYSTEM_PROMPT = """你是 DeepAudit 的漏洞验证 Agent,一个* 你是漏洞验证的**大脑**,不是机械验证器。你需要: 1. 理解每个漏洞的上下文 2. 设计合适的验证策略 -3. 使用工具获取更多信息 +3. **编写测试代码进行动态验证** 4. 判断漏洞是否真实存在 -5. 评估实际影响 +5. 评估实际影响并生成 PoC + +## 核心理念:Fuzzing Harness +即使整个项目无法运行,你也应该能够验证漏洞!方法是: +1. **提取目标函数** - 从代码中提取存在漏洞的函数 +2. **构建 Mock** - 模拟函数依赖(数据库、HTTP、文件系统等) +3. **编写测试脚本** - 构造各种恶意输入测试函数 +4. **分析执行结果** - 判断是否触发漏洞 ## 你可以使用的工具 +### 🔥 核心验证工具(优先使用) +- **run_code**: 执行你编写的测试代码(支持 Python/PHP/JS/Ruby/Go/Java/Bash) + - 用于运行 Fuzzing Harness、PoC 脚本 + - 你可以完全控制测试逻辑 + - 参数: code (str), language (str), timeout (int), description (str) + +- **extract_function**: 从源文件提取指定函数代码 + - 用于获取目标函数,构建 Fuzzing Harness + - 参数: file_path (str), function_name (str), include_imports (bool) + ### 文件操作 -- **read_file**: 读取更多代码上下文 +- **read_file**: 读取代码文件获取上下文 参数: file_path (str), start_line (int), end_line (int) -- **list_files**: ⚠️ 仅用于确认文件是否存在,严禁遍历 - 参数: directory (str), pattern (str) -### 沙箱核心工具 -- **sandbox_exec**: 在沙箱中执行命令 - 参数: command (str), timeout (int) -- **sandbox_http**: 发送 HTTP 请求测试 - 参数: method (str), url (str), data (dict), headers (dict) -- **verify_vulnerability**: 自动化漏洞验证 - 参数: vulnerability_type (str), target_url (str), payload (str), expected_pattern (str) +### 沙箱工具 +- **sandbox_exec**: 在沙箱中执行命令(用于验证命令执行类漏洞) +- **sandbox_http**: 发送 HTTP 请求(如果有运行的服务) -### 🔥 多语言代码测试工具 (按语言选择) -- **php_test**: 测试 PHP 代码,支持模拟 GET/POST 参数 - 参数: file_path (str), php_code (str), get_params (dict), post_params (dict), timeout (int) - 示例: {"file_path": "vuln.php", "get_params": {"cmd": "whoami"}} +## 🔥 Fuzzing Harness 编写指南 -- **python_test**: 测试 Python 代码,支持模拟 Flask/Django 请求 - 参数: file_path (str), code (str), request_params (dict), form_data (dict), timeout (int) - 示例: {"code": "import os; os.system(params['cmd'])", "request_params": {"cmd": "id"}} +### 原则 +1. **你是大脑** - 你决定测试策略、payload、检测方法 +2. **不依赖完整项目** - 提取函数,mock 依赖,隔离测试 +3. **多种 payload** - 设计多种恶意输入,不要只测一个 +4. **检测漏洞特征** - 根据漏洞类型设计检测逻辑 -- **javascript_test**: 测试 JavaScript/Node.js 代码 - 参数: file_path (str), code (str), req_query (dict), req_body (dict), timeout (int) - 示例: {"code": "exec(req.query.cmd)", "req_query": {"cmd": "id"}} +### 命令注入 Fuzzing Harness 示例 (Python) +```python +import os +import subprocess -- **java_test**: 测试 Java 代码,支持模拟 Servlet 请求 - 参数: file_path (str), code (str), request_params (dict), timeout (int) +# === Mock 危险函数来检测调用 === +executed_commands = [] +original_system = os.system -- **go_test**: 测试 Go 代码 - 参数: file_path (str), code (str), args (list), timeout (int) +def mock_system(cmd): + print(f"[DETECTED] os.system called: {cmd}") + executed_commands.append(cmd) + return 0 -- **ruby_test**: 测试 Ruby 代码,支持模拟 Rails 请求 - 参数: file_path (str), code (str), params (dict), timeout (int) +os.system = mock_system -- **shell_test**: 测试 Shell/Bash 脚本 - 参数: file_path (str), code (str), args (list), env (dict), timeout (int) +# === 目标函数(从项目代码复制) === +def vulnerable_function(user_input): + os.system(f"echo {user_input}") -- **universal_code_test**: 通用多语言测试工具 (自动检测语言) - 参数: language (str), file_path (str), code (str), params (dict), timeout (int) +# === Fuzzing 测试 === +payloads = [ + "test", # 正常输入 + "; id", # 命令连接符 + "| whoami", # 管道 + "$(cat /etc/passwd)", # 命令替换 + "`id`", # 反引号 + "&& ls -la", # AND 连接 +] -### 🔥 漏洞验证专用工具 (按漏洞类型选择,推荐使用) -- **test_command_injection**: 专门测试命令注入漏洞 - 参数: target_file (str), param_name (str), test_command (str), language (str) - 示例: {"target_file": "vuln.php", "param_name": "cmd", "test_command": "whoami"} +print("=== Fuzzing Start ===") +for payload in payloads: + print(f"\\nPayload: {payload}") + executed_commands.clear() + try: + vulnerable_function(payload) + if executed_commands: + print(f"[VULN] Detected! Commands: {executed_commands}") + except Exception as e: + print(f"[ERROR] {e}") +``` -- **test_sql_injection**: 专门测试 SQL 注入漏洞 - 参数: target_file (str), param_name (str), db_type (str), injection_type (str) - 示例: {"target_file": "login.php", "param_name": "username", "db_type": "mysql"} +### SQL 注入 Fuzzing Harness 示例 (Python) +```python +# === Mock 数据库 === +class MockCursor: + def __init__(self): + self.queries = [] -- **test_xss**: 专门测试 XSS 漏洞 - 参数: target_file (str), param_name (str), xss_type (str), context (str) - 示例: {"target_file": "search.php", "param_name": "q", "xss_type": "reflected"} + def execute(self, query, params=None): + print(f"[SQL] Query: {query}") + print(f"[SQL] Params: {params}") + self.queries.append((query, params)) -- **test_path_traversal**: 专门测试路径遍历漏洞 - 参数: target_file (str), param_name (str), target_path (str) - 示例: {"target_file": "download.php", "param_name": "file", "target_path": "/etc/passwd"} + # 检测 SQL 注入特征 + if params is None and ("'" in query or "OR" in query.upper() or "--" in query): + print("[VULN] Possible SQL injection - no parameterized query!") -- **test_ssti**: 专门测试模板注入漏洞 - 参数: target_file (str), param_name (str), template_engine (str) - 示例: {"target_file": "render.py", "param_name": "name", "template_engine": "jinja2"} +class MockDB: + def cursor(self): + return MockCursor() -- **test_deserialization**: 专门测试反序列化漏洞 - 参数: target_file (str), language (str), serialization_format (str) - 示例: {"target_file": "api.php", "language": "php", "serialization_format": "php_serialize"} +# === 目标函数 === +def get_user(db, user_id): + cursor = db.cursor() + cursor.execute(f"SELECT * FROM users WHERE id = '{user_id}'") # 漏洞! -- **universal_vuln_test**: 通用漏洞测试工具 (自动选择测试策略) - 参数: vuln_type (str), target_file (str), param_name (str), additional_params (dict) - 支持: command_injection, sql_injection, xss, path_traversal, ssti, deserialization +# === Fuzzing === +db = MockDB() +payloads = ["1", "1'", "1' OR '1'='1", "1'; DROP TABLE users--", "1 UNION SELECT * FROM admin"] -## 工作方式 -你将收到一批待验证的漏洞发现。对于每个发现,你需要: +for p in payloads: + print(f"\\n=== Testing: {p} ===") + get_user(db, p) +``` + +### PHP 命令注入 Fuzzing Harness 示例 +```php +// 注意:php -r 不需要 Hello, {user_input}!" + +payloads = [ + "test", + "", + "", + "{{7*7}}", # SSTI +] + +for p in payloads: + output = vulnerable_render(p) + print(f"Input: {p}") + print(f"Output: {output}") + # 检测:payload 是否原样出现在输出中 + if p in output and ("<" in p or "{{" in p): + print("[VULN] XSS - input not escaped!") +``` + +## 验证策略 + +### 对于可执行的漏洞(命令注入、代码注入等) +1. 使用 `extract_function` 或 `read_file` 获取目标代码 +2. 编写 Fuzzing Harness,mock 危险函数来检测调用 +3. 使用 `run_code` 执行 Harness +4. 分析输出,确认漏洞是否触发 + +### 对于数据泄露型漏洞(SQL注入、路径遍历等) +1. 获取目标代码 +2. 编写 Harness,mock 数据库/文件系统 +3. 检查是否能构造恶意查询/路径 +4. 分析输出 + +### 对于配置类漏洞(硬编码密钥等) +1. 使用 `read_file` 直接读取配置文件 +2. 验证敏感信息是否存在 +3. 评估影响(密钥是否有效、权限范围等) + +## 工作流程 +你将收到一批待验证的漏洞发现。对于每个发现: ``` -Thought: [分析这个漏洞,思考如何验证] +Thought: [分析漏洞类型,设计验证策略] Action: [工具名称] -Action Input: [JSON 格式的参数] +Action Input: [参数] ``` 验证完所有发现后,输出: @@ -139,7 +237,8 @@ Final Answer: [JSON 格式的验证报告] "poc": { "description": "PoC 描述", "steps": ["步骤1", "步骤2"], - "payload": "curl 'http://target/vuln.php?cmd=id' 或完整利用代码" + "payload": "完整可执行的 PoC 代码或命令", + "harness_code": "Fuzzing Harness 代码(如果使用)" }, "impact": "实际影响分析", "recommendation": "修复建议" @@ -155,82 +254,22 @@ Final Answer: [JSON 格式的验证报告] ``` ## 验证判定标准 -- **confirmed**: 漏洞确认存在且可利用,有明确证据 -- **likely**: 高度可能存在漏洞,但无法完全确认 +- **confirmed**: 漏洞确认存在且可利用,有明确证据(如 Harness 成功触发) +- **likely**: 高度可能存在漏洞,代码分析明确但无法动态验证 - **uncertain**: 需要更多信息才能判断 - **false_positive**: 确认是误报,有明确理由 -## 验证策略建议 - -### 对于命令注入漏洞 -1. 使用 **test_command_injection** 工具,它会自动构建测试环境 -2. 或使用对应语言的测试工具 (php_test, python_test 等) -3. 检查命令输出是否包含 uid=, root, www-data 等特征 - -### 对于 SQL 注入漏洞 -1. 使用 **test_sql_injection** 工具 -2. 提供数据库类型 (mysql, postgresql, sqlite) -3. 检查是否能执行 UNION 查询或提取数据 - -### 对于 XSS 漏洞 -1. 使用 **test_xss** 工具 -2. 指定 XSS 类型 (reflected, stored, dom) -3. 检查 payload 是否在输出中未转义 - -### 对于路径遍历漏洞 -1. 使用 **test_path_traversal** 工具 -2. 尝试读取 /etc/passwd 或其他已知文件 -3. 检查是否能访问目标文件 - -### 对于模板注入 (SSTI) 漏洞 -1. 使用 **test_ssti** 工具 -2. 指定模板引擎 (jinja2, twig, freemarker 等) -3. 检查数学表达式是否被执行 - -### 对于反序列化漏洞 -1. 使用 **test_deserialization** 工具 -2. 指定语言和序列化格式 -3. 检查是否能执行任意代码 - -### 对于其他漏洞 -1. **上下文分析**: 用 read_file 获取更多代码上下文 -2. **通用测试**: 使用 universal_vuln_test 或 universal_code_test -3. **沙箱测试**: 对高危漏洞用沙箱进行安全测试 +## ⚠️ 关键约束 +1. **必须先调用工具验证** - 不允许仅凭已知信息直接判断 +2. **优先使用 run_code** - 编写 Harness 进行动态验证 +3. **PoC 必须完整可执行** - poc.payload 应该是可直接运行的代码 +4. **不要假设环境** - 沙箱中没有运行的服务,需要 mock ## 重要原则 -1. **质量优先** - 宁可漏报也不要误报太多 -2. **深入理解** - 理解代码逻辑,不要表面判断 -3. **证据支撑** - 判定要有依据 -4. **安全第一** - 沙箱测试要谨慎 -5. **🔥 PoC 生成** - 对于 confirmed 和 likely 的漏洞,**必须**生成完整的 PoC: - - poc.description: 简要描述这个 PoC 的作用 - - poc.steps: 详细的复现步骤列表 - - poc.payload: **完整的**利用代码或命令,例如: - - Web漏洞: 完整URL如 `http://target/path?param=` - - 命令注入: 完整的 curl 命令或 HTTP 请求 - - SQL注入: 完整的利用语句或请求 - - 代码执行: 可直接运行的利用脚本 - - ⚠️ payload 字段必须是**可直接复制执行**的完整利用代码,不要只写参数值 - -## ⚠️ 关键约束 - 必须遵守! -1. **禁止直接输出 Final Answer** - 你必须先调用至少一个工具来验证漏洞 -2. **每个漏洞至少调用一次工具** - 使用 read_file 读取代码,或使用 test_* 工具测试 -3. **没有工具调用的验证无效** - 不允许仅凭已知信息直接判断 -4. **先 Action 后 Final Answer** - 必须先执行工具,获取 Observation,再输出最终结论 - -错误示例(禁止): -``` -Thought: 根据已有信息,我认为这是漏洞 -Final Answer: {...} ❌ 没有调用任何工具! -``` - -正确示例(必须): -``` -Thought: 我需要先读取 config.php 文件来验证硬编码凭据 -Action: read_file -Action Input: {"file_path": "config.php"} -``` -然后等待 Observation,再继续验证其他发现或输出 Final Answer。 +1. **你是验证的大脑** - 你决定如何测试,工具只提供执行能力 +2. **动态验证优先** - 能运行代码验证的就不要仅靠静态分析 +3. **质量优先** - 宁可漏报也不要误报太多 +4. **证据支撑** - 每个判定都需要有依据 现在开始验证漏洞发现!""" @@ -583,6 +622,24 @@ class VerificationAgent(BaseAgent): # 检查是否完成 if step.is_final: + # 🔥 强制检查:必须至少调用过一次工具才能完成 + if self._tool_calls == 0: + logger.warning(f"[{self.name}] LLM tried to finish without any tool calls! Forcing tool usage.") + await self.emit_thinking("⚠️ 拒绝过早完成:必须先使用工具验证漏洞") + self._conversation_history.append({ + "role": "user", + "content": ( + "⚠️ **系统拒绝**: 你必须先使用工具验证漏洞!\n\n" + "不允许在没有调用任何工具的情况下直接输出 Final Answer。\n\n" + "请立即使用以下工具之一进行验证:\n" + "1. `read_file` - 读取漏洞所在文件的代码\n" + "2. `run_code` - 编写并执行 Fuzzing Harness 验证漏洞\n" + "3. `extract_function` - 提取目标函数进行分析\n\n" + "现在请输出 Thought 和 Action,开始验证第一个漏洞。" + ), + }) + continue + await self.emit_llm_decision("完成漏洞验证", "LLM 判断验证已充分") final_result = step.final_answer diff --git a/backend/app/services/agent/tools/__init__.py b/backend/app/services/agent/tools/__init__.py index 9690918..2bacadb 100644 --- a/backend/app/services/agent/tools/__init__.py +++ b/backend/app/services/agent/tools/__init__.py @@ -82,6 +82,9 @@ from .smart_scan_tool import SmartScanTool, QuickAuditTool # 🔥 新增:Kunlun-M 静态代码分析工具 (MIT License) from .kunlun_tool import KunlunMTool, KunlunRuleListTool, KunlunPluginTool +# 🔥 新增:通用代码执行工具 (LLM 驱动的 Fuzzing Harness) +from .run_code import RunCodeTool, ExtractFunctionTool + __all__ = [ # 基础 "AgentTool", @@ -164,4 +167,8 @@ __all__ = [ "KunlunMTool", "KunlunRuleListTool", "KunlunPluginTool", + + # 🔥 通用代码执行工具 (LLM 驱动的 Fuzzing Harness) + "RunCodeTool", + "ExtractFunctionTool", ] diff --git a/backend/app/services/agent/tools/run_code.py b/backend/app/services/agent/tools/run_code.py new file mode 100644 index 0000000..98d70a8 --- /dev/null +++ b/backend/app/services/agent/tools/run_code.py @@ -0,0 +1,513 @@ +""" +通用代码执行工具 - LLM 驱动的漏洞验证 + +核心理念: +- LLM 是验证的大脑,工具只提供执行能力 +- 不硬编码 payload、检测规则 +- LLM 自己决定测试策略、编写测试代码、分析结果 + +使用场景: +- LLM 编写 Fuzzing Harness 进行局部测试 +- LLM 构造 PoC 验证漏洞 +- LLM 编写 mock 代码隔离测试函数 +""" + +import asyncio +import logging +import os +import tempfile +from typing import Optional, Dict, Any +from pydantic import BaseModel, Field + +from .base import AgentTool, ToolResult +from .sandbox_tool import SandboxManager, SandboxConfig + +logger = logging.getLogger(__name__) + + +class RunCodeInput(BaseModel): + """代码执行输入""" + code: str = Field(..., description="要执行的代码") + language: str = Field(default="python", description="编程语言: python, php, javascript, ruby, go, java, bash") + timeout: int = Field(default=60, description="超时时间(秒),复杂测试可设置更长") + description: str = Field(default="", description="简短描述这段代码的目的(用于日志)") + + +class RunCodeTool(AgentTool): + """ + 通用代码执行工具 + + 让 LLM 自由编写测试代码,在沙箱中执行。 + + LLM 可以: + - 编写 Fuzzing Harness 隔离测试单个函数 + - 构造 mock 对象模拟依赖 + - 设计各种 payload 进行测试 + - 分析执行结果判断漏洞 + + 工具不做任何假设,完全由 LLM 控制测试逻辑。 + """ + + def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."): + super().__init__() + # 使用更宽松的沙箱配置 + config = SandboxConfig( + timeout=120, + memory_limit="1g", # 更大内存 + ) + self.sandbox_manager = sandbox_manager or SandboxManager(config) + self.project_root = project_root + + @property + def name(self) -> str: + return "run_code" + + @property + def description(self) -> str: + return """🔥 通用代码执行工具 - 在沙箱中运行你编写的测试代码 + +这是你进行漏洞验证的核心工具。你可以: +1. 编写 Fuzzing Harness 隔离测试单个函数 +2. 构造 mock 对象模拟数据库、HTTP 请求等依赖 +3. 设计各种 payload 进行漏洞测试 +4. 编写完整的 PoC 验证脚本 + +输入: +- code: 你编写的测试代码(完整可执行) +- language: python, php, javascript, ruby, go, java, bash +- timeout: 超时秒数(默认60,复杂测试可设更长) +- description: 简短描述代码目的 + +支持的语言和执行方式: +- python: python3 -c 'code' +- php: php -r 'code' (注意:不需要 ToolResult: + """执行用户编写的代码""" + + # 初始化沙箱 + try: + await self.sandbox_manager.initialize() + except Exception as e: + logger.warning(f"Sandbox init failed: {e}") + + if not self.sandbox_manager.is_available: + return ToolResult( + success=False, + error="沙箱环境不可用 (Docker 未运行)", + data="请确保 Docker 已启动。如果无法使用沙箱,你可以通过静态分析代码来验证漏洞。" + ) + + # 构建执行命令 + language = language.lower().strip() + command = self._build_command(code, language) + + if command is None: + return ToolResult( + success=False, + error=f"不支持的语言: {language}", + data=f"支持的语言: python, php, javascript, ruby, go, java, bash" + ) + + # 在沙箱中执行 + result = await self.sandbox_manager.execute_command( + command=command, + timeout=timeout, + ) + + # 格式化输出 + output_parts = [f"🔬 代码执行结果"] + if description: + output_parts.append(f"目的: {description}") + output_parts.append(f"语言: {language}") + output_parts.append(f"退出码: {result['exit_code']}") + + if result.get("stdout"): + stdout = result["stdout"] + if len(stdout) > 5000: + stdout = stdout[:5000] + f"\n... (截断,共 {len(result['stdout'])} 字符)" + output_parts.append(f"\n输出:\n```\n{stdout}\n```") + + if result.get("stderr"): + stderr = result["stderr"] + if len(stderr) > 2000: + stderr = stderr[:2000] + "\n... (截断)" + output_parts.append(f"\n错误输出:\n```\n{stderr}\n```") + + if result.get("error"): + output_parts.append(f"\n执行错误: {result['error']}") + + # 提示 LLM 分析结果 + output_parts.append("\n---") + output_parts.append("请根据上述输出分析漏洞是否存在。") + + return ToolResult( + success=result.get("success", False), + data="\n".join(output_parts), + error=result.get("error"), + metadata={ + "language": language, + "exit_code": result.get("exit_code", -1), + "stdout_length": len(result.get("stdout", "")), + "stderr_length": len(result.get("stderr", "")), + } + ) + + def _build_command(self, code: str, language: str) -> Optional[str]: + """根据语言构建执行命令""" + + # 转义单引号的通用方法 + def escape_for_shell(s: str) -> str: + return s.replace("'", "'\"'\"'") + + if language == "python": + escaped = escape_for_shell(code) + return f"python3 -c '{escaped}'" + + elif language == "php": + # PHP: php -r 不需要 "): + clean_code = clean_code[:-2].strip() + escaped = escape_for_shell(clean_code) + return f"php -r '{escaped}'" + + elif language in ["javascript", "js", "node"]: + escaped = escape_for_shell(code) + return f"node -e '{escaped}'" + + elif language == "ruby": + escaped = escape_for_shell(code) + return f"ruby -e '{escaped}'" + + elif language == "bash": + escaped = escape_for_shell(code) + return f"bash -c '{escaped}'" + + elif language == "go": + # Go 需要完整的 package main + escaped = escape_for_shell(code).replace("\\", "\\\\") + return f"echo '{escaped}' > /tmp/main.go && go run /tmp/main.go" + + elif language == "java": + # Java 需要完整的 class + escaped = escape_for_shell(code).replace("\\", "\\\\") + # 提取类名 + import re + class_match = re.search(r'public\s+class\s+(\w+)', code) + class_name = class_match.group(1) if class_match else "Test" + return f"echo '{escaped}' > /tmp/{class_name}.java && javac /tmp/{class_name}.java && java -cp /tmp {class_name}" + + return None + + +class ExtractFunctionInput(BaseModel): + """函数提取输入""" + file_path: str = Field(..., description="源文件路径") + function_name: str = Field(..., description="要提取的函数名") + include_imports: bool = Field(default=True, description="是否包含 import 语句") + + +class ExtractFunctionTool(AgentTool): + """ + 函数提取工具 + + 从源文件中提取指定函数及其依赖,用于构建 Fuzzing Harness + """ + + def __init__(self, project_root: str = "."): + super().__init__() + self.project_root = project_root + + @property + def name(self) -> str: + return "extract_function" + + @property + def description(self) -> str: + return """从源文件中提取指定函数的代码 + +用于构建 Fuzzing Harness 时获取目标函数代码。 + +输入: +- file_path: 源文件路径 +- function_name: 要提取的函数名 +- include_imports: 是否包含文件开头的 import 语句(默认 true) + +返回: +- 函数代码 +- 相关的 import 语句 +- 函数参数列表 + +示例: +{"file_path": "app/api.py", "function_name": "process_command"}""" + + @property + def args_schema(self): + return ExtractFunctionInput + + async def _execute( + self, + file_path: str, + function_name: str, + include_imports: bool = True, + **kwargs + ) -> ToolResult: + """提取函数代码""" + import ast + import re + + full_path = os.path.join(self.project_root, file_path) + if not os.path.exists(full_path): + return ToolResult(success=False, error=f"文件不存在: {file_path}") + + with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: + code = f.read() + + # 检测语言 + ext = os.path.splitext(file_path)[1].lower() + + if ext == ".py": + result = self._extract_python(code, function_name, include_imports) + elif ext == ".php": + result = self._extract_php(code, function_name) + elif ext in [".js", ".ts"]: + result = self._extract_javascript(code, function_name) + else: + result = self._extract_generic(code, function_name) + + if result["success"]: + output_parts = [f"📦 函数提取结果\n"] + output_parts.append(f"文件: {file_path}") + output_parts.append(f"函数: {function_name}") + + if result.get("imports"): + output_parts.append(f"\n相关 imports:\n```\n{result['imports']}\n```") + + if result.get("parameters"): + output_parts.append(f"\n参数: {', '.join(result['parameters'])}") + + output_parts.append(f"\n函数代码:\n```\n{result['code']}\n```") + + output_parts.append("\n---") + output_parts.append("你现在可以使用这段代码构建 Fuzzing Harness") + + return ToolResult( + success=True, + data="\n".join(output_parts), + metadata=result + ) + else: + return ToolResult( + success=False, + error=result.get("error", "提取失败"), + data=f"无法提取函数 '{function_name}'。你可以使用 read_file 工具直接读取文件,手动定位函数代码。" + ) + + def _extract_python(self, code: str, function_name: str, include_imports: bool) -> Dict: + """提取 Python 函数""" + import ast + + try: + tree = ast.parse(code) + except SyntaxError: + # 降级到正则提取 + return self._extract_generic(code, function_name) + + # 收集 imports + imports = [] + if include_imports: + for node in ast.walk(tree): + if isinstance(node, ast.Import): + imports.append(ast.unparse(node)) + elif isinstance(node, ast.ImportFrom): + imports.append(ast.unparse(node)) + + # 查找函数 + for node in ast.walk(tree): + if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): + if node.name == function_name: + lines = code.split('\n') + func_code = '\n'.join(lines[node.lineno - 1:node.end_lineno]) + params = [arg.arg for arg in node.args.args] + + return { + "success": True, + "code": func_code, + "imports": '\n'.join(imports) if imports else None, + "parameters": params, + "line_start": node.lineno, + "line_end": node.end_lineno, + } + + return {"success": False, "error": f"未找到函数 '{function_name}'"} + + def _extract_php(self, code: str, function_name: str) -> Dict: + """提取 PHP 函数""" + import re + + pattern = rf'function\s+{re.escape(function_name)}\s*\([^)]*\)\s*\{{' + match = re.search(pattern, code) + + if not match: + return {"success": False, "error": f"未找到函数 '{function_name}'"} + + start_pos = match.start() + brace_count = 0 + end_pos = match.end() - 1 + + for i, char in enumerate(code[match.end() - 1:], start=match.end() - 1): + if char == '{': + brace_count += 1 + elif char == '}': + brace_count -= 1 + if brace_count == 0: + end_pos = i + 1 + break + + func_code = code[start_pos:end_pos] + + # 提取参数 + param_match = re.search(r'function\s+\w+\s*\(([^)]*)\)', func_code) + params = [] + if param_match: + params_str = param_match.group(1) + params = [p.strip().split('=')[0].strip().replace('$', '') + for p in params_str.split(',') if p.strip()] + + return { + "success": True, + "code": func_code, + "parameters": params, + } + + def _extract_javascript(self, code: str, function_name: str) -> Dict: + """提取 JavaScript 函数""" + import re + + patterns = [ + rf'function\s+{re.escape(function_name)}\s*\([^)]*\)\s*\{{', + rf'(?:const|let|var)\s+{re.escape(function_name)}\s*=\s*function\s*\([^)]*\)\s*\{{', + rf'(?:const|let|var)\s+{re.escape(function_name)}\s*=\s*\([^)]*\)\s*=>\s*\{{', + rf'async\s+function\s+{re.escape(function_name)}\s*\([^)]*\)\s*\{{', + ] + + for pattern in patterns: + match = re.search(pattern, code) + if match: + start_pos = match.start() + brace_count = 0 + end_pos = match.end() - 1 + + for i, char in enumerate(code[match.end() - 1:], start=match.end() - 1): + if char == '{': + brace_count += 1 + elif char == '}': + brace_count -= 1 + if brace_count == 0: + end_pos = i + 1 + break + + func_code = code[start_pos:end_pos] + + return { + "success": True, + "code": func_code, + } + + return {"success": False, "error": f"未找到函数 '{function_name}'"} + + def _extract_generic(self, code: str, function_name: str) -> Dict: + """通用函数提取(正则)""" + import re + + # 尝试多种模式 + patterns = [ + rf'def\s+{re.escape(function_name)}\s*\([^)]*\)\s*:', # Python + rf'function\s+{re.escape(function_name)}\s*\([^)]*\)', # PHP/JS + rf'func\s+{re.escape(function_name)}\s*\([^)]*\)', # Go + ] + + for pattern in patterns: + match = re.search(pattern, code, re.MULTILINE) + if match: + start_line = code[:match.start()].count('\n') + lines = code.split('\n') + + # 尝试找到函数结束 + end_line = start_line + 1 + indent = len(lines[start_line]) - len(lines[start_line].lstrip()) + + for i in range(start_line + 1, min(start_line + 100, len(lines))): + line = lines[i] + if line.strip() and not line.startswith(' ' * (indent + 1)): + if not line.strip().startswith('#'): + end_line = i + break + end_line = i + 1 + + func_code = '\n'.join(lines[start_line:end_line]) + + return { + "success": True, + "code": func_code, + } + + return {"success": False, "error": f"未找到函数 '{function_name}'"} diff --git a/backend/app/services/agent/tools/sandbox_tool.py b/backend/app/services/agent/tools/sandbox_tool.py index 55b375e..781bf94 100644 --- a/backend/app/services/agent/tools/sandbox_tool.py +++ b/backend/app/services/agent/tools/sandbox_tool.py @@ -514,12 +514,24 @@ class SandboxTool(AgentTool): 在安全隔离的环境中执行代码和命令 """ - # 允许的命令前缀 + # 允许的命令前缀 - 放宽限制以支持更灵活的测试 ALLOWED_COMMANDS = [ - "python", "python3", "node", "curl", "wget", - "cat", "head", "tail", "grep", "find", "ls", - "echo", "printf", "test", "id", "whoami", - "php", # 🔥 添加 PHP 支持 + # 编程语言解释器 + "python", "python3", "node", "php", "ruby", "perl", + "go", "java", "javac", "bash", "sh", + # 网络工具 + "curl", "wget", "nc", "netcat", + # 文件操作 + "cat", "head", "tail", "grep", "find", "ls", "wc", + "sed", "awk", "cut", "sort", "uniq", "tr", "xargs", + # 系统信息(用于验证命令执行) + "echo", "printf", "test", "id", "whoami", "uname", + "env", "printenv", "pwd", "hostname", + # 编码/解码工具 + "base64", "xxd", "od", "hexdump", + # 其他实用工具 + "timeout", "time", "sleep", "true", "false", + "md5sum", "sha256sum", "strings", ] def __init__(self, sandbox_manager: Optional[SandboxManager] = None):