diff --git a/backend/app/api/v1/endpoints/agent_tasks.py b/backend/app/api/v1/endpoints/agent_tasks.py
index 13f872f..8f0582c 100644
--- a/backend/app/api/v1/endpoints/agent_tasks.py
+++ b/backend/app/api/v1/endpoints/agent_tasks.py
@@ -948,6 +948,8 @@ async def _initialize_tools(
CommandInjectionTestTool, SqlInjectionTestTool, XssTestTool,
PathTraversalTestTool, SstiTestTool, DeserializationTestTool,
UniversalVulnTestTool,
+ # 🔥 新增:通用代码执行工具 (LLM 驱动的 Fuzzing Harness)
+ RunCodeTool, ExtractFunctionTool,
)
verification_tools = {
@@ -976,6 +978,10 @@ async def _initialize_tools(
"test_deserialization": DeserializationTestTool(sandbox_manager, project_root),
"universal_vuln_test": UniversalVulnTestTool(sandbox_manager, project_root),
+ # 🔥 新增:通用代码执行工具 (LLM 驱动的 Fuzzing Harness)
+ "run_code": RunCodeTool(sandbox_manager, project_root),
+ "extract_function": ExtractFunctionTool(project_root),
+
# 报告工具
"create_vulnerability_report": CreateVulnerabilityReportTool(),
}
diff --git a/backend/app/services/agent/agents/verification.py b/backend/app/services/agent/agents/verification.py
index 0959ba3..ee19053 100644
--- a/backend/app/services/agent/agents/verification.py
+++ b/backend/app/services/agent/agents/verification.py
@@ -32,90 +32,188 @@ VERIFICATION_SYSTEM_PROMPT = """你是 DeepAudit 的漏洞验证 Agent,一个*
你是漏洞验证的**大脑**,不是机械验证器。你需要:
1. 理解每个漏洞的上下文
2. 设计合适的验证策略
-3. 使用工具获取更多信息
+3. **编写测试代码进行动态验证**
4. 判断漏洞是否真实存在
-5. 评估实际影响
+5. 评估实际影响并生成 PoC
+
+## 核心理念:Fuzzing Harness
+即使整个项目无法运行,你也应该能够验证漏洞!方法是:
+1. **提取目标函数** - 从代码中提取存在漏洞的函数
+2. **构建 Mock** - 模拟函数依赖(数据库、HTTP、文件系统等)
+3. **编写测试脚本** - 构造各种恶意输入测试函数
+4. **分析执行结果** - 判断是否触发漏洞
## 你可以使用的工具
+### 🔥 核心验证工具(优先使用)
+- **run_code**: 执行你编写的测试代码(支持 Python/PHP/JS/Ruby/Go/Java/Bash)
+ - 用于运行 Fuzzing Harness、PoC 脚本
+ - 你可以完全控制测试逻辑
+ - 参数: code (str), language (str), timeout (int), description (str)
+
+- **extract_function**: 从源文件提取指定函数代码
+ - 用于获取目标函数,构建 Fuzzing Harness
+ - 参数: file_path (str), function_name (str), include_imports (bool)
+
### 文件操作
-- **read_file**: 读取更多代码上下文
+- **read_file**: 读取代码文件获取上下文
参数: file_path (str), start_line (int), end_line (int)
-- **list_files**: ⚠️ 仅用于确认文件是否存在,严禁遍历
- 参数: directory (str), pattern (str)
-### 沙箱核心工具
-- **sandbox_exec**: 在沙箱中执行命令
- 参数: command (str), timeout (int)
-- **sandbox_http**: 发送 HTTP 请求测试
- 参数: method (str), url (str), data (dict), headers (dict)
-- **verify_vulnerability**: 自动化漏洞验证
- 参数: vulnerability_type (str), target_url (str), payload (str), expected_pattern (str)
+### 沙箱工具
+- **sandbox_exec**: 在沙箱中执行命令(用于验证命令执行类漏洞)
+- **sandbox_http**: 发送 HTTP 请求(如果有运行的服务)
-### 🔥 多语言代码测试工具 (按语言选择)
-- **php_test**: 测试 PHP 代码,支持模拟 GET/POST 参数
- 参数: file_path (str), php_code (str), get_params (dict), post_params (dict), timeout (int)
- 示例: {"file_path": "vuln.php", "get_params": {"cmd": "whoami"}}
+## 🔥 Fuzzing Harness 编写指南
-- **python_test**: 测试 Python 代码,支持模拟 Flask/Django 请求
- 参数: file_path (str), code (str), request_params (dict), form_data (dict), timeout (int)
- 示例: {"code": "import os; os.system(params['cmd'])", "request_params": {"cmd": "id"}}
+### 原则
+1. **你是大脑** - 你决定测试策略、payload、检测方法
+2. **不依赖完整项目** - 提取函数,mock 依赖,隔离测试
+3. **多种 payload** - 设计多种恶意输入,不要只测一个
+4. **检测漏洞特征** - 根据漏洞类型设计检测逻辑
-- **javascript_test**: 测试 JavaScript/Node.js 代码
- 参数: file_path (str), code (str), req_query (dict), req_body (dict), timeout (int)
- 示例: {"code": "exec(req.query.cmd)", "req_query": {"cmd": "id"}}
+### 命令注入 Fuzzing Harness 示例 (Python)
+```python
+import os
+import subprocess
-- **java_test**: 测试 Java 代码,支持模拟 Servlet 请求
- 参数: file_path (str), code (str), request_params (dict), timeout (int)
+# === Mock 危险函数来检测调用 ===
+executed_commands = []
+original_system = os.system
-- **go_test**: 测试 Go 代码
- 参数: file_path (str), code (str), args (list), timeout (int)
+def mock_system(cmd):
+ print(f"[DETECTED] os.system called: {cmd}")
+ executed_commands.append(cmd)
+ return 0
-- **ruby_test**: 测试 Ruby 代码,支持模拟 Rails 请求
- 参数: file_path (str), code (str), params (dict), timeout (int)
+os.system = mock_system
-- **shell_test**: 测试 Shell/Bash 脚本
- 参数: file_path (str), code (str), args (list), env (dict), timeout (int)
+# === 目标函数(从项目代码复制) ===
+def vulnerable_function(user_input):
+ os.system(f"echo {user_input}")
-- **universal_code_test**: 通用多语言测试工具 (自动检测语言)
- 参数: language (str), file_path (str), code (str), params (dict), timeout (int)
+# === Fuzzing 测试 ===
+payloads = [
+ "test", # 正常输入
+ "; id", # 命令连接符
+ "| whoami", # 管道
+ "$(cat /etc/passwd)", # 命令替换
+ "`id`", # 反引号
+ "&& ls -la", # AND 连接
+]
-### 🔥 漏洞验证专用工具 (按漏洞类型选择,推荐使用)
-- **test_command_injection**: 专门测试命令注入漏洞
- 参数: target_file (str), param_name (str), test_command (str), language (str)
- 示例: {"target_file": "vuln.php", "param_name": "cmd", "test_command": "whoami"}
+print("=== Fuzzing Start ===")
+for payload in payloads:
+ print(f"\\nPayload: {payload}")
+ executed_commands.clear()
+ try:
+ vulnerable_function(payload)
+ if executed_commands:
+ print(f"[VULN] Detected! Commands: {executed_commands}")
+ except Exception as e:
+ print(f"[ERROR] {e}")
+```
-- **test_sql_injection**: 专门测试 SQL 注入漏洞
- 参数: target_file (str), param_name (str), db_type (str), injection_type (str)
- 示例: {"target_file": "login.php", "param_name": "username", "db_type": "mysql"}
+### SQL 注入 Fuzzing Harness 示例 (Python)
+```python
+# === Mock 数据库 ===
+class MockCursor:
+ def __init__(self):
+ self.queries = []
-- **test_xss**: 专门测试 XSS 漏洞
- 参数: target_file (str), param_name (str), xss_type (str), context (str)
- 示例: {"target_file": "search.php", "param_name": "q", "xss_type": "reflected"}
+ def execute(self, query, params=None):
+ print(f"[SQL] Query: {query}")
+ print(f"[SQL] Params: {params}")
+ self.queries.append((query, params))
-- **test_path_traversal**: 专门测试路径遍历漏洞
- 参数: target_file (str), param_name (str), target_path (str)
- 示例: {"target_file": "download.php", "param_name": "file", "target_path": "/etc/passwd"}
+ # 检测 SQL 注入特征
+ if params is None and ("'" in query or "OR" in query.upper() or "--" in query):
+ print("[VULN] Possible SQL injection - no parameterized query!")
-- **test_ssti**: 专门测试模板注入漏洞
- 参数: target_file (str), param_name (str), template_engine (str)
- 示例: {"target_file": "render.py", "param_name": "name", "template_engine": "jinja2"}
+class MockDB:
+ def cursor(self):
+ return MockCursor()
-- **test_deserialization**: 专门测试反序列化漏洞
- 参数: target_file (str), language (str), serialization_format (str)
- 示例: {"target_file": "api.php", "language": "php", "serialization_format": "php_serialize"}
+# === 目标函数 ===
+def get_user(db, user_id):
+ cursor = db.cursor()
+ cursor.execute(f"SELECT * FROM users WHERE id = '{user_id}'") # 漏洞!
-- **universal_vuln_test**: 通用漏洞测试工具 (自动选择测试策略)
- 参数: vuln_type (str), target_file (str), param_name (str), additional_params (dict)
- 支持: command_injection, sql_injection, xss, path_traversal, ssti, deserialization
+# === Fuzzing ===
+db = MockDB()
+payloads = ["1", "1'", "1' OR '1'='1", "1'; DROP TABLE users--", "1 UNION SELECT * FROM admin"]
-## 工作方式
-你将收到一批待验证的漏洞发现。对于每个发现,你需要:
+for p in payloads:
+ print(f"\\n=== Testing: {p} ===")
+ get_user(db, p)
+```
+
+### PHP 命令注入 Fuzzing Harness 示例
+```php
+// 注意:php -r 不需要 Hello, {user_input}!"
+
+payloads = [
+ "test",
+ "",
+ "
",
+ "{{7*7}}", # SSTI
+]
+
+for p in payloads:
+ output = vulnerable_render(p)
+ print(f"Input: {p}")
+ print(f"Output: {output}")
+ # 检测:payload 是否原样出现在输出中
+ if p in output and ("<" in p or "{{" in p):
+ print("[VULN] XSS - input not escaped!")
+```
+
+## 验证策略
+
+### 对于可执行的漏洞(命令注入、代码注入等)
+1. 使用 `extract_function` 或 `read_file` 获取目标代码
+2. 编写 Fuzzing Harness,mock 危险函数来检测调用
+3. 使用 `run_code` 执行 Harness
+4. 分析输出,确认漏洞是否触发
+
+### 对于数据泄露型漏洞(SQL注入、路径遍历等)
+1. 获取目标代码
+2. 编写 Harness,mock 数据库/文件系统
+3. 检查是否能构造恶意查询/路径
+4. 分析输出
+
+### 对于配置类漏洞(硬编码密钥等)
+1. 使用 `read_file` 直接读取配置文件
+2. 验证敏感信息是否存在
+3. 评估影响(密钥是否有效、权限范围等)
+
+## 工作流程
+你将收到一批待验证的漏洞发现。对于每个发现:
```
-Thought: [分析这个漏洞,思考如何验证]
+Thought: [分析漏洞类型,设计验证策略]
Action: [工具名称]
-Action Input: [JSON 格式的参数]
+Action Input: [参数]
```
验证完所有发现后,输出:
@@ -139,7 +237,8 @@ Final Answer: [JSON 格式的验证报告]
"poc": {
"description": "PoC 描述",
"steps": ["步骤1", "步骤2"],
- "payload": "curl 'http://target/vuln.php?cmd=id' 或完整利用代码"
+ "payload": "完整可执行的 PoC 代码或命令",
+ "harness_code": "Fuzzing Harness 代码(如果使用)"
},
"impact": "实际影响分析",
"recommendation": "修复建议"
@@ -155,82 +254,22 @@ Final Answer: [JSON 格式的验证报告]
```
## 验证判定标准
-- **confirmed**: 漏洞确认存在且可利用,有明确证据
-- **likely**: 高度可能存在漏洞,但无法完全确认
+- **confirmed**: 漏洞确认存在且可利用,有明确证据(如 Harness 成功触发)
+- **likely**: 高度可能存在漏洞,代码分析明确但无法动态验证
- **uncertain**: 需要更多信息才能判断
- **false_positive**: 确认是误报,有明确理由
-## 验证策略建议
-
-### 对于命令注入漏洞
-1. 使用 **test_command_injection** 工具,它会自动构建测试环境
-2. 或使用对应语言的测试工具 (php_test, python_test 等)
-3. 检查命令输出是否包含 uid=, root, www-data 等特征
-
-### 对于 SQL 注入漏洞
-1. 使用 **test_sql_injection** 工具
-2. 提供数据库类型 (mysql, postgresql, sqlite)
-3. 检查是否能执行 UNION 查询或提取数据
-
-### 对于 XSS 漏洞
-1. 使用 **test_xss** 工具
-2. 指定 XSS 类型 (reflected, stored, dom)
-3. 检查 payload 是否在输出中未转义
-
-### 对于路径遍历漏洞
-1. 使用 **test_path_traversal** 工具
-2. 尝试读取 /etc/passwd 或其他已知文件
-3. 检查是否能访问目标文件
-
-### 对于模板注入 (SSTI) 漏洞
-1. 使用 **test_ssti** 工具
-2. 指定模板引擎 (jinja2, twig, freemarker 等)
-3. 检查数学表达式是否被执行
-
-### 对于反序列化漏洞
-1. 使用 **test_deserialization** 工具
-2. 指定语言和序列化格式
-3. 检查是否能执行任意代码
-
-### 对于其他漏洞
-1. **上下文分析**: 用 read_file 获取更多代码上下文
-2. **通用测试**: 使用 universal_vuln_test 或 universal_code_test
-3. **沙箱测试**: 对高危漏洞用沙箱进行安全测试
+## ⚠️ 关键约束
+1. **必须先调用工具验证** - 不允许仅凭已知信息直接判断
+2. **优先使用 run_code** - 编写 Harness 进行动态验证
+3. **PoC 必须完整可执行** - poc.payload 应该是可直接运行的代码
+4. **不要假设环境** - 沙箱中没有运行的服务,需要 mock
## 重要原则
-1. **质量优先** - 宁可漏报也不要误报太多
-2. **深入理解** - 理解代码逻辑,不要表面判断
-3. **证据支撑** - 判定要有依据
-4. **安全第一** - 沙箱测试要谨慎
-5. **🔥 PoC 生成** - 对于 confirmed 和 likely 的漏洞,**必须**生成完整的 PoC:
- - poc.description: 简要描述这个 PoC 的作用
- - poc.steps: 详细的复现步骤列表
- - poc.payload: **完整的**利用代码或命令,例如:
- - Web漏洞: 完整URL如 `http://target/path?param=`
- - 命令注入: 完整的 curl 命令或 HTTP 请求
- - SQL注入: 完整的利用语句或请求
- - 代码执行: 可直接运行的利用脚本
- - ⚠️ payload 字段必须是**可直接复制执行**的完整利用代码,不要只写参数值
-
-## ⚠️ 关键约束 - 必须遵守!
-1. **禁止直接输出 Final Answer** - 你必须先调用至少一个工具来验证漏洞
-2. **每个漏洞至少调用一次工具** - 使用 read_file 读取代码,或使用 test_* 工具测试
-3. **没有工具调用的验证无效** - 不允许仅凭已知信息直接判断
-4. **先 Action 后 Final Answer** - 必须先执行工具,获取 Observation,再输出最终结论
-
-错误示例(禁止):
-```
-Thought: 根据已有信息,我认为这是漏洞
-Final Answer: {...} ❌ 没有调用任何工具!
-```
-
-正确示例(必须):
-```
-Thought: 我需要先读取 config.php 文件来验证硬编码凭据
-Action: read_file
-Action Input: {"file_path": "config.php"}
-```
-然后等待 Observation,再继续验证其他发现或输出 Final Answer。
+1. **你是验证的大脑** - 你决定如何测试,工具只提供执行能力
+2. **动态验证优先** - 能运行代码验证的就不要仅靠静态分析
+3. **质量优先** - 宁可漏报也不要误报太多
+4. **证据支撑** - 每个判定都需要有依据
现在开始验证漏洞发现!"""
@@ -583,6 +622,24 @@ class VerificationAgent(BaseAgent):
# 检查是否完成
if step.is_final:
+ # 🔥 强制检查:必须至少调用过一次工具才能完成
+ if self._tool_calls == 0:
+ logger.warning(f"[{self.name}] LLM tried to finish without any tool calls! Forcing tool usage.")
+ await self.emit_thinking("⚠️ 拒绝过早完成:必须先使用工具验证漏洞")
+ self._conversation_history.append({
+ "role": "user",
+ "content": (
+ "⚠️ **系统拒绝**: 你必须先使用工具验证漏洞!\n\n"
+ "不允许在没有调用任何工具的情况下直接输出 Final Answer。\n\n"
+ "请立即使用以下工具之一进行验证:\n"
+ "1. `read_file` - 读取漏洞所在文件的代码\n"
+ "2. `run_code` - 编写并执行 Fuzzing Harness 验证漏洞\n"
+ "3. `extract_function` - 提取目标函数进行分析\n\n"
+ "现在请输出 Thought 和 Action,开始验证第一个漏洞。"
+ ),
+ })
+ continue
+
await self.emit_llm_decision("完成漏洞验证", "LLM 判断验证已充分")
final_result = step.final_answer
diff --git a/backend/app/services/agent/tools/__init__.py b/backend/app/services/agent/tools/__init__.py
index 9690918..2bacadb 100644
--- a/backend/app/services/agent/tools/__init__.py
+++ b/backend/app/services/agent/tools/__init__.py
@@ -82,6 +82,9 @@ from .smart_scan_tool import SmartScanTool, QuickAuditTool
# 🔥 新增:Kunlun-M 静态代码分析工具 (MIT License)
from .kunlun_tool import KunlunMTool, KunlunRuleListTool, KunlunPluginTool
+# 🔥 新增:通用代码执行工具 (LLM 驱动的 Fuzzing Harness)
+from .run_code import RunCodeTool, ExtractFunctionTool
+
__all__ = [
# 基础
"AgentTool",
@@ -164,4 +167,8 @@ __all__ = [
"KunlunMTool",
"KunlunRuleListTool",
"KunlunPluginTool",
+
+ # 🔥 通用代码执行工具 (LLM 驱动的 Fuzzing Harness)
+ "RunCodeTool",
+ "ExtractFunctionTool",
]
diff --git a/backend/app/services/agent/tools/run_code.py b/backend/app/services/agent/tools/run_code.py
new file mode 100644
index 0000000..98d70a8
--- /dev/null
+++ b/backend/app/services/agent/tools/run_code.py
@@ -0,0 +1,513 @@
+"""
+通用代码执行工具 - LLM 驱动的漏洞验证
+
+核心理念:
+- LLM 是验证的大脑,工具只提供执行能力
+- 不硬编码 payload、检测规则
+- LLM 自己决定测试策略、编写测试代码、分析结果
+
+使用场景:
+- LLM 编写 Fuzzing Harness 进行局部测试
+- LLM 构造 PoC 验证漏洞
+- LLM 编写 mock 代码隔离测试函数
+"""
+
+import asyncio
+import logging
+import os
+import tempfile
+from typing import Optional, Dict, Any
+from pydantic import BaseModel, Field
+
+from .base import AgentTool, ToolResult
+from .sandbox_tool import SandboxManager, SandboxConfig
+
+logger = logging.getLogger(__name__)
+
+
+class RunCodeInput(BaseModel):
+ """代码执行输入"""
+ code: str = Field(..., description="要执行的代码")
+ language: str = Field(default="python", description="编程语言: python, php, javascript, ruby, go, java, bash")
+ timeout: int = Field(default=60, description="超时时间(秒),复杂测试可设置更长")
+ description: str = Field(default="", description="简短描述这段代码的目的(用于日志)")
+
+
+class RunCodeTool(AgentTool):
+ """
+ 通用代码执行工具
+
+ 让 LLM 自由编写测试代码,在沙箱中执行。
+
+ LLM 可以:
+ - 编写 Fuzzing Harness 隔离测试单个函数
+ - 构造 mock 对象模拟依赖
+ - 设计各种 payload 进行测试
+ - 分析执行结果判断漏洞
+
+ 工具不做任何假设,完全由 LLM 控制测试逻辑。
+ """
+
+ def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."):
+ super().__init__()
+ # 使用更宽松的沙箱配置
+ config = SandboxConfig(
+ timeout=120,
+ memory_limit="1g", # 更大内存
+ )
+ self.sandbox_manager = sandbox_manager or SandboxManager(config)
+ self.project_root = project_root
+
+ @property
+ def name(self) -> str:
+ return "run_code"
+
+ @property
+ def description(self) -> str:
+ return """🔥 通用代码执行工具 - 在沙箱中运行你编写的测试代码
+
+这是你进行漏洞验证的核心工具。你可以:
+1. 编写 Fuzzing Harness 隔离测试单个函数
+2. 构造 mock 对象模拟数据库、HTTP 请求等依赖
+3. 设计各种 payload 进行漏洞测试
+4. 编写完整的 PoC 验证脚本
+
+输入:
+- code: 你编写的测试代码(完整可执行)
+- language: python, php, javascript, ruby, go, java, bash
+- timeout: 超时秒数(默认60,复杂测试可设更长)
+- description: 简短描述代码目的
+
+支持的语言和执行方式:
+- python: python3 -c 'code'
+- php: php -r 'code' (注意:不需要 ToolResult:
+ """执行用户编写的代码"""
+
+ # 初始化沙箱
+ try:
+ await self.sandbox_manager.initialize()
+ except Exception as e:
+ logger.warning(f"Sandbox init failed: {e}")
+
+ if not self.sandbox_manager.is_available:
+ return ToolResult(
+ success=False,
+ error="沙箱环境不可用 (Docker 未运行)",
+ data="请确保 Docker 已启动。如果无法使用沙箱,你可以通过静态分析代码来验证漏洞。"
+ )
+
+ # 构建执行命令
+ language = language.lower().strip()
+ command = self._build_command(code, language)
+
+ if command is None:
+ return ToolResult(
+ success=False,
+ error=f"不支持的语言: {language}",
+ data=f"支持的语言: python, php, javascript, ruby, go, java, bash"
+ )
+
+ # 在沙箱中执行
+ result = await self.sandbox_manager.execute_command(
+ command=command,
+ timeout=timeout,
+ )
+
+ # 格式化输出
+ output_parts = [f"🔬 代码执行结果"]
+ if description:
+ output_parts.append(f"目的: {description}")
+ output_parts.append(f"语言: {language}")
+ output_parts.append(f"退出码: {result['exit_code']}")
+
+ if result.get("stdout"):
+ stdout = result["stdout"]
+ if len(stdout) > 5000:
+ stdout = stdout[:5000] + f"\n... (截断,共 {len(result['stdout'])} 字符)"
+ output_parts.append(f"\n输出:\n```\n{stdout}\n```")
+
+ if result.get("stderr"):
+ stderr = result["stderr"]
+ if len(stderr) > 2000:
+ stderr = stderr[:2000] + "\n... (截断)"
+ output_parts.append(f"\n错误输出:\n```\n{stderr}\n```")
+
+ if result.get("error"):
+ output_parts.append(f"\n执行错误: {result['error']}")
+
+ # 提示 LLM 分析结果
+ output_parts.append("\n---")
+ output_parts.append("请根据上述输出分析漏洞是否存在。")
+
+ return ToolResult(
+ success=result.get("success", False),
+ data="\n".join(output_parts),
+ error=result.get("error"),
+ metadata={
+ "language": language,
+ "exit_code": result.get("exit_code", -1),
+ "stdout_length": len(result.get("stdout", "")),
+ "stderr_length": len(result.get("stderr", "")),
+ }
+ )
+
+ def _build_command(self, code: str, language: str) -> Optional[str]:
+ """根据语言构建执行命令"""
+
+ # 转义单引号的通用方法
+ def escape_for_shell(s: str) -> str:
+ return s.replace("'", "'\"'\"'")
+
+ if language == "python":
+ escaped = escape_for_shell(code)
+ return f"python3 -c '{escaped}'"
+
+ elif language == "php":
+ # PHP: php -r 不需要 "):
+ clean_code = clean_code[:-2].strip()
+ escaped = escape_for_shell(clean_code)
+ return f"php -r '{escaped}'"
+
+ elif language in ["javascript", "js", "node"]:
+ escaped = escape_for_shell(code)
+ return f"node -e '{escaped}'"
+
+ elif language == "ruby":
+ escaped = escape_for_shell(code)
+ return f"ruby -e '{escaped}'"
+
+ elif language == "bash":
+ escaped = escape_for_shell(code)
+ return f"bash -c '{escaped}'"
+
+ elif language == "go":
+ # Go 需要完整的 package main
+ escaped = escape_for_shell(code).replace("\\", "\\\\")
+ return f"echo '{escaped}' > /tmp/main.go && go run /tmp/main.go"
+
+ elif language == "java":
+ # Java 需要完整的 class
+ escaped = escape_for_shell(code).replace("\\", "\\\\")
+ # 提取类名
+ import re
+ class_match = re.search(r'public\s+class\s+(\w+)', code)
+ class_name = class_match.group(1) if class_match else "Test"
+ return f"echo '{escaped}' > /tmp/{class_name}.java && javac /tmp/{class_name}.java && java -cp /tmp {class_name}"
+
+ return None
+
+
+class ExtractFunctionInput(BaseModel):
+ """函数提取输入"""
+ file_path: str = Field(..., description="源文件路径")
+ function_name: str = Field(..., description="要提取的函数名")
+ include_imports: bool = Field(default=True, description="是否包含 import 语句")
+
+
+class ExtractFunctionTool(AgentTool):
+ """
+ 函数提取工具
+
+ 从源文件中提取指定函数及其依赖,用于构建 Fuzzing Harness
+ """
+
+ def __init__(self, project_root: str = "."):
+ super().__init__()
+ self.project_root = project_root
+
+ @property
+ def name(self) -> str:
+ return "extract_function"
+
+ @property
+ def description(self) -> str:
+ return """从源文件中提取指定函数的代码
+
+用于构建 Fuzzing Harness 时获取目标函数代码。
+
+输入:
+- file_path: 源文件路径
+- function_name: 要提取的函数名
+- include_imports: 是否包含文件开头的 import 语句(默认 true)
+
+返回:
+- 函数代码
+- 相关的 import 语句
+- 函数参数列表
+
+示例:
+{"file_path": "app/api.py", "function_name": "process_command"}"""
+
+ @property
+ def args_schema(self):
+ return ExtractFunctionInput
+
+ async def _execute(
+ self,
+ file_path: str,
+ function_name: str,
+ include_imports: bool = True,
+ **kwargs
+ ) -> ToolResult:
+ """提取函数代码"""
+ import ast
+ import re
+
+ full_path = os.path.join(self.project_root, file_path)
+ if not os.path.exists(full_path):
+ return ToolResult(success=False, error=f"文件不存在: {file_path}")
+
+ with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
+ code = f.read()
+
+ # 检测语言
+ ext = os.path.splitext(file_path)[1].lower()
+
+ if ext == ".py":
+ result = self._extract_python(code, function_name, include_imports)
+ elif ext == ".php":
+ result = self._extract_php(code, function_name)
+ elif ext in [".js", ".ts"]:
+ result = self._extract_javascript(code, function_name)
+ else:
+ result = self._extract_generic(code, function_name)
+
+ if result["success"]:
+ output_parts = [f"📦 函数提取结果\n"]
+ output_parts.append(f"文件: {file_path}")
+ output_parts.append(f"函数: {function_name}")
+
+ if result.get("imports"):
+ output_parts.append(f"\n相关 imports:\n```\n{result['imports']}\n```")
+
+ if result.get("parameters"):
+ output_parts.append(f"\n参数: {', '.join(result['parameters'])}")
+
+ output_parts.append(f"\n函数代码:\n```\n{result['code']}\n```")
+
+ output_parts.append("\n---")
+ output_parts.append("你现在可以使用这段代码构建 Fuzzing Harness")
+
+ return ToolResult(
+ success=True,
+ data="\n".join(output_parts),
+ metadata=result
+ )
+ else:
+ return ToolResult(
+ success=False,
+ error=result.get("error", "提取失败"),
+ data=f"无法提取函数 '{function_name}'。你可以使用 read_file 工具直接读取文件,手动定位函数代码。"
+ )
+
+ def _extract_python(self, code: str, function_name: str, include_imports: bool) -> Dict:
+ """提取 Python 函数"""
+ import ast
+
+ try:
+ tree = ast.parse(code)
+ except SyntaxError:
+ # 降级到正则提取
+ return self._extract_generic(code, function_name)
+
+ # 收集 imports
+ imports = []
+ if include_imports:
+ for node in ast.walk(tree):
+ if isinstance(node, ast.Import):
+ imports.append(ast.unparse(node))
+ elif isinstance(node, ast.ImportFrom):
+ imports.append(ast.unparse(node))
+
+ # 查找函数
+ for node in ast.walk(tree):
+ if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
+ if node.name == function_name:
+ lines = code.split('\n')
+ func_code = '\n'.join(lines[node.lineno - 1:node.end_lineno])
+ params = [arg.arg for arg in node.args.args]
+
+ return {
+ "success": True,
+ "code": func_code,
+ "imports": '\n'.join(imports) if imports else None,
+ "parameters": params,
+ "line_start": node.lineno,
+ "line_end": node.end_lineno,
+ }
+
+ return {"success": False, "error": f"未找到函数 '{function_name}'"}
+
+ def _extract_php(self, code: str, function_name: str) -> Dict:
+ """提取 PHP 函数"""
+ import re
+
+ pattern = rf'function\s+{re.escape(function_name)}\s*\([^)]*\)\s*\{{'
+ match = re.search(pattern, code)
+
+ if not match:
+ return {"success": False, "error": f"未找到函数 '{function_name}'"}
+
+ start_pos = match.start()
+ brace_count = 0
+ end_pos = match.end() - 1
+
+ for i, char in enumerate(code[match.end() - 1:], start=match.end() - 1):
+ if char == '{':
+ brace_count += 1
+ elif char == '}':
+ brace_count -= 1
+ if brace_count == 0:
+ end_pos = i + 1
+ break
+
+ func_code = code[start_pos:end_pos]
+
+ # 提取参数
+ param_match = re.search(r'function\s+\w+\s*\(([^)]*)\)', func_code)
+ params = []
+ if param_match:
+ params_str = param_match.group(1)
+ params = [p.strip().split('=')[0].strip().replace('$', '')
+ for p in params_str.split(',') if p.strip()]
+
+ return {
+ "success": True,
+ "code": func_code,
+ "parameters": params,
+ }
+
+ def _extract_javascript(self, code: str, function_name: str) -> Dict:
+ """提取 JavaScript 函数"""
+ import re
+
+ patterns = [
+ rf'function\s+{re.escape(function_name)}\s*\([^)]*\)\s*\{{',
+ rf'(?:const|let|var)\s+{re.escape(function_name)}\s*=\s*function\s*\([^)]*\)\s*\{{',
+ rf'(?:const|let|var)\s+{re.escape(function_name)}\s*=\s*\([^)]*\)\s*=>\s*\{{',
+ rf'async\s+function\s+{re.escape(function_name)}\s*\([^)]*\)\s*\{{',
+ ]
+
+ for pattern in patterns:
+ match = re.search(pattern, code)
+ if match:
+ start_pos = match.start()
+ brace_count = 0
+ end_pos = match.end() - 1
+
+ for i, char in enumerate(code[match.end() - 1:], start=match.end() - 1):
+ if char == '{':
+ brace_count += 1
+ elif char == '}':
+ brace_count -= 1
+ if brace_count == 0:
+ end_pos = i + 1
+ break
+
+ func_code = code[start_pos:end_pos]
+
+ return {
+ "success": True,
+ "code": func_code,
+ }
+
+ return {"success": False, "error": f"未找到函数 '{function_name}'"}
+
+ def _extract_generic(self, code: str, function_name: str) -> Dict:
+ """通用函数提取(正则)"""
+ import re
+
+ # 尝试多种模式
+ patterns = [
+ rf'def\s+{re.escape(function_name)}\s*\([^)]*\)\s*:', # Python
+ rf'function\s+{re.escape(function_name)}\s*\([^)]*\)', # PHP/JS
+ rf'func\s+{re.escape(function_name)}\s*\([^)]*\)', # Go
+ ]
+
+ for pattern in patterns:
+ match = re.search(pattern, code, re.MULTILINE)
+ if match:
+ start_line = code[:match.start()].count('\n')
+ lines = code.split('\n')
+
+ # 尝试找到函数结束
+ end_line = start_line + 1
+ indent = len(lines[start_line]) - len(lines[start_line].lstrip())
+
+ for i in range(start_line + 1, min(start_line + 100, len(lines))):
+ line = lines[i]
+ if line.strip() and not line.startswith(' ' * (indent + 1)):
+ if not line.strip().startswith('#'):
+ end_line = i
+ break
+ end_line = i + 1
+
+ func_code = '\n'.join(lines[start_line:end_line])
+
+ return {
+ "success": True,
+ "code": func_code,
+ }
+
+ return {"success": False, "error": f"未找到函数 '{function_name}'"}
diff --git a/backend/app/services/agent/tools/sandbox_tool.py b/backend/app/services/agent/tools/sandbox_tool.py
index 55b375e..781bf94 100644
--- a/backend/app/services/agent/tools/sandbox_tool.py
+++ b/backend/app/services/agent/tools/sandbox_tool.py
@@ -514,12 +514,24 @@ class SandboxTool(AgentTool):
在安全隔离的环境中执行代码和命令
"""
- # 允许的命令前缀
+ # 允许的命令前缀 - 放宽限制以支持更灵活的测试
ALLOWED_COMMANDS = [
- "python", "python3", "node", "curl", "wget",
- "cat", "head", "tail", "grep", "find", "ls",
- "echo", "printf", "test", "id", "whoami",
- "php", # 🔥 添加 PHP 支持
+ # 编程语言解释器
+ "python", "python3", "node", "php", "ruby", "perl",
+ "go", "java", "javac", "bash", "sh",
+ # 网络工具
+ "curl", "wget", "nc", "netcat",
+ # 文件操作
+ "cat", "head", "tail", "grep", "find", "ls", "wc",
+ "sed", "awk", "cut", "sort", "uniq", "tr", "xargs",
+ # 系统信息(用于验证命令执行)
+ "echo", "printf", "test", "id", "whoami", "uname",
+ "env", "printenv", "pwd", "hostname",
+ # 编码/解码工具
+ "base64", "xxd", "od", "hexdump",
+ # 其他实用工具
+ "timeout", "time", "sleep", "true", "false",
+ "md5sum", "sha256sum", "strings",
]
def __init__(self, sandbox_manager: Optional[SandboxManager] = None):