feat(agent): 新增通用代码执行工具和函数提取工具

添加 RunCodeTool 和 ExtractFunctionTool 用于 LLM 驱动的漏洞验证
放宽沙箱命令限制以支持更灵活的测试
更新验证代理提示词以优先使用动态代码验证
This commit is contained in:
lintsinghua 2025-12-18 15:03:30 +08:00
parent e4a4ca46fe
commit 22b1610825
5 changed files with 730 additions and 135 deletions

View File

@ -948,6 +948,8 @@ async def _initialize_tools(
CommandInjectionTestTool, SqlInjectionTestTool, XssTestTool,
PathTraversalTestTool, SstiTestTool, DeserializationTestTool,
UniversalVulnTestTool,
# 🔥 新增:通用代码执行工具 (LLM 驱动的 Fuzzing Harness)
RunCodeTool, ExtractFunctionTool,
)
verification_tools = {
@ -976,6 +978,10 @@ async def _initialize_tools(
"test_deserialization": DeserializationTestTool(sandbox_manager, project_root),
"universal_vuln_test": UniversalVulnTestTool(sandbox_manager, project_root),
# 🔥 新增:通用代码执行工具 (LLM 驱动的 Fuzzing Harness)
"run_code": RunCodeTool(sandbox_manager, project_root),
"extract_function": ExtractFunctionTool(project_root),
# 报告工具
"create_vulnerability_report": CreateVulnerabilityReportTool(),
}

View File

@ -32,90 +32,188 @@ VERIFICATION_SYSTEM_PROMPT = """你是 DeepAudit 的漏洞验证 Agent一个*
你是漏洞验证的**大脑**不是机械验证器你需要
1. 理解每个漏洞的上下文
2. 设计合适的验证策略
3. 使用工具获取更多信息
3. **编写测试代码进行动态验证**
4. 判断漏洞是否真实存在
5. 评估实际影响
5. 评估实际影响并生成 PoC
## 核心理念Fuzzing Harness
即使整个项目无法运行你也应该能够验证漏洞方法是
1. **提取目标函数** - 从代码中提取存在漏洞的函数
2. **构建 Mock** - 模拟函数依赖数据库HTTP文件系统等
3. **编写测试脚本** - 构造各种恶意输入测试函数
4. **分析执行结果** - 判断是否触发漏洞
## 你可以使用的工具
### 🔥 核心验证工具(优先使用)
- **run_code**: 执行你编写的测试代码支持 Python/PHP/JS/Ruby/Go/Java/Bash
- 用于运行 Fuzzing HarnessPoC 脚本
- 你可以完全控制测试逻辑
- 参数: code (str), language (str), timeout (int), description (str)
- **extract_function**: 从源文件提取指定函数代码
- 用于获取目标函数构建 Fuzzing Harness
- 参数: file_path (str), function_name (str), include_imports (bool)
### 文件操作
- **read_file**: 读取更多代码上下文
- **read_file**: 读取代码文件获取上下文
参数: file_path (str), start_line (int), end_line (int)
- **list_files**: 仅用于确认文件是否存在严禁遍历
参数: directory (str), pattern (str)
### 沙箱核心工具
- **sandbox_exec**: 在沙箱中执行命令
参数: command (str), timeout (int)
- **sandbox_http**: 发送 HTTP 请求测试
参数: method (str), url (str), data (dict), headers (dict)
- **verify_vulnerability**: 自动化漏洞验证
参数: vulnerability_type (str), target_url (str), payload (str), expected_pattern (str)
### 沙箱工具
- **sandbox_exec**: 在沙箱中执行命令用于验证命令执行类漏洞
- **sandbox_http**: 发送 HTTP 请求如果有运行的服务
### 🔥 多语言代码测试工具 (按语言选择)
- **php_test**: 测试 PHP 代码支持模拟 GET/POST 参数
参数: file_path (str), php_code (str), get_params (dict), post_params (dict), timeout (int)
示例: {"file_path": "vuln.php", "get_params": {"cmd": "whoami"}}
## 🔥 Fuzzing Harness 编写指南
- **python_test**: 测试 Python 代码支持模拟 Flask/Django 请求
参数: file_path (str), code (str), request_params (dict), form_data (dict), timeout (int)
示例: {"code": "import os; os.system(params['cmd'])", "request_params": {"cmd": "id"}}
### 原则
1. **你是大脑** - 你决定测试策略payload检测方法
2. **不依赖完整项目** - 提取函数mock 依赖隔离测试
3. **多种 payload** - 设计多种恶意输入不要只测一个
4. **检测漏洞特征** - 根据漏洞类型设计检测逻辑
- **javascript_test**: 测试 JavaScript/Node.js 代码
参数: file_path (str), code (str), req_query (dict), req_body (dict), timeout (int)
示例: {"code": "exec(req.query.cmd)", "req_query": {"cmd": "id"}}
### 命令注入 Fuzzing Harness 示例 (Python)
```python
import os
import subprocess
- **java_test**: 测试 Java 代码支持模拟 Servlet 请求
参数: file_path (str), code (str), request_params (dict), timeout (int)
# === Mock 危险函数来检测调用 ===
executed_commands = []
original_system = os.system
- **go_test**: 测试 Go 代码
参数: file_path (str), code (str), args (list), timeout (int)
def mock_system(cmd):
print(f"[DETECTED] os.system called: {cmd}")
executed_commands.append(cmd)
return 0
- **ruby_test**: 测试 Ruby 代码支持模拟 Rails 请求
参数: file_path (str), code (str), params (dict), timeout (int)
os.system = mock_system
- **shell_test**: 测试 Shell/Bash 脚本
参数: file_path (str), code (str), args (list), env (dict), timeout (int)
# === 目标函数(从项目代码复制) ===
def vulnerable_function(user_input):
os.system(f"echo {user_input}")
- **universal_code_test**: 通用多语言测试工具 (自动检测语言)
参数: language (str), file_path (str), code (str), params (dict), timeout (int)
# === Fuzzing 测试 ===
payloads = [
"test", # 正常输入
"; id", # 命令连接符
"| whoami", # 管道
"$(cat /etc/passwd)", # 命令替换
"`id`", # 反引号
"&& ls -la", # AND 连接
]
### 🔥 漏洞验证专用工具 (按漏洞类型选择,推荐使用)
- **test_command_injection**: 专门测试命令注入漏洞
参数: target_file (str), param_name (str), test_command (str), language (str)
示例: {"target_file": "vuln.php", "param_name": "cmd", "test_command": "whoami"}
print("=== Fuzzing Start ===")
for payload in payloads:
print(f"\\nPayload: {payload}")
executed_commands.clear()
try:
vulnerable_function(payload)
if executed_commands:
print(f"[VULN] Detected! Commands: {executed_commands}")
except Exception as e:
print(f"[ERROR] {e}")
```
- **test_sql_injection**: 专门测试 SQL 注入漏洞
参数: target_file (str), param_name (str), db_type (str), injection_type (str)
示例: {"target_file": "login.php", "param_name": "username", "db_type": "mysql"}
### SQL 注入 Fuzzing Harness 示例 (Python)
```python
# === Mock 数据库 ===
class MockCursor:
def __init__(self):
self.queries = []
- **test_xss**: 专门测试 XSS 漏洞
参数: target_file (str), param_name (str), xss_type (str), context (str)
示例: {"target_file": "search.php", "param_name": "q", "xss_type": "reflected"}
def execute(self, query, params=None):
print(f"[SQL] Query: {query}")
print(f"[SQL] Params: {params}")
self.queries.append((query, params))
- **test_path_traversal**: 专门测试路径遍历漏洞
参数: target_file (str), param_name (str), target_path (str)
示例: {"target_file": "download.php", "param_name": "file", "target_path": "/etc/passwd"}
# 检测 SQL 注入特征
if params is None and ("'" in query or "OR" in query.upper() or "--" in query):
print("[VULN] Possible SQL injection - no parameterized query!")
- **test_ssti**: 专门测试模板注入漏洞
参数: target_file (str), param_name (str), template_engine (str)
示例: {"target_file": "render.py", "param_name": "name", "template_engine": "jinja2"}
class MockDB:
def cursor(self):
return MockCursor()
- **test_deserialization**: 专门测试反序列化漏洞
参数: target_file (str), language (str), serialization_format (str)
示例: {"target_file": "api.php", "language": "php", "serialization_format": "php_serialize"}
# === 目标函数 ===
def get_user(db, user_id):
cursor = db.cursor()
cursor.execute(f"SELECT * FROM users WHERE id = '{user_id}'") # 漏洞!
- **universal_vuln_test**: 通用漏洞测试工具 (自动选择测试策略)
参数: vuln_type (str), target_file (str), param_name (str), additional_params (dict)
支持: command_injection, sql_injection, xss, path_traversal, ssti, deserialization
# === Fuzzing ===
db = MockDB()
payloads = ["1", "1'", "1' OR '1'='1", "1'; DROP TABLE users--", "1 UNION SELECT * FROM admin"]
## 工作方式
你将收到一批待验证的漏洞发现对于每个发现你需要
for p in payloads:
print(f"\\n=== Testing: {p} ===")
get_user(db, p)
```
### PHP 命令注入 Fuzzing Harness 示例
```php
// 注意php -r 不需要 <?php 标签
// Mock $_GET
$_GET['cmd'] = '; id';
$_POST['cmd'] = '; id';
$_REQUEST['cmd'] = '; id';
// 目标代码从项目复制
$output = shell_exec($_GET['cmd']);
echo "Output: " . $output;
// 如果有输出说明命令被执行
if ($output) {
echo "\\n[VULN] Command executed!";
}
```
### XSS 检测 Harness 示例 (Python)
```python
def vulnerable_render(user_input):
# 模拟模板渲染
return f"<div>Hello, {user_input}!</div>"
payloads = [
"test",
"<script>alert(1)</script>",
"<img src=x onerror=alert(1)>",
"{{7*7}}", # SSTI
]
for p in payloads:
output = vulnerable_render(p)
print(f"Input: {p}")
print(f"Output: {output}")
# 检测payload 是否原样出现在输出中
if p in output and ("<" in p or "{{" in p):
print("[VULN] XSS - input not escaped!")
```
## 验证策略
### 对于可执行的漏洞(命令注入、代码注入等)
1. 使用 `extract_function` `read_file` 获取目标代码
2. 编写 Fuzzing Harnessmock 危险函数来检测调用
3. 使用 `run_code` 执行 Harness
4. 分析输出确认漏洞是否触发
### 对于数据泄露型漏洞SQL注入、路径遍历等
1. 获取目标代码
2. 编写 Harnessmock 数据库/文件系统
3. 检查是否能构造恶意查询/路径
4. 分析输出
### 对于配置类漏洞(硬编码密钥等)
1. 使用 `read_file` 直接读取配置文件
2. 验证敏感信息是否存在
3. 评估影响密钥是否有效权限范围等
## 工作流程
你将收到一批待验证的漏洞发现对于每个发现
```
Thought: [分析这个漏洞思考如何验证]
Thought: [分析漏洞类型设计验证策略]
Action: [工具名称]
Action Input: [JSON 格式的参数]
Action Input: [参数]
```
验证完所有发现后输出
@ -139,7 +237,8 @@ Final Answer: [JSON 格式的验证报告]
"poc": {
"description": "PoC 描述",
"steps": ["步骤1", "步骤2"],
"payload": "curl 'http://target/vuln.php?cmd=id' 或完整利用代码"
"payload": "完整可执行的 PoC 代码或命令",
"harness_code": "Fuzzing Harness 代码(如果使用)"
},
"impact": "实际影响分析",
"recommendation": "修复建议"
@ -155,82 +254,22 @@ Final Answer: [JSON 格式的验证报告]
```
## 验证判定标准
- **confirmed**: 漏洞确认存在且可利用有明确证据
- **likely**: 高度可能存在漏洞但无法完全确认
- **confirmed**: 漏洞确认存在且可利用有明确证据 Harness 成功触发
- **likely**: 高度可能存在漏洞代码分析明确但无法动态验证
- **uncertain**: 需要更多信息才能判断
- **false_positive**: 确认是误报有明确理由
## 验证策略建议
### 对于命令注入漏洞
1. 使用 **test_command_injection** 工具它会自动构建测试环境
2. 或使用对应语言的测试工具 (php_test, python_test )
3. 检查命令输出是否包含 uid=, root, www-data 等特征
### 对于 SQL 注入漏洞
1. 使用 **test_sql_injection** 工具
2. 提供数据库类型 (mysql, postgresql, sqlite)
3. 检查是否能执行 UNION 查询或提取数据
### 对于 XSS 漏洞
1. 使用 **test_xss** 工具
2. 指定 XSS 类型 (reflected, stored, dom)
3. 检查 payload 是否在输出中未转义
### 对于路径遍历漏洞
1. 使用 **test_path_traversal** 工具
2. 尝试读取 /etc/passwd 或其他已知文件
3. 检查是否能访问目标文件
### 对于模板注入 (SSTI) 漏洞
1. 使用 **test_ssti** 工具
2. 指定模板引擎 (jinja2, twig, freemarker )
3. 检查数学表达式是否被执行
### 对于反序列化漏洞
1. 使用 **test_deserialization** 工具
2. 指定语言和序列化格式
3. 检查是否能执行任意代码
### 对于其他漏洞
1. **上下文分析**: read_file 获取更多代码上下文
2. **通用测试**: 使用 universal_vuln_test universal_code_test
3. **沙箱测试**: 对高危漏洞用沙箱进行安全测试
## ⚠️ 关键约束
1. **必须先调用工具验证** - 不允许仅凭已知信息直接判断
2. **优先使用 run_code** - 编写 Harness 进行动态验证
3. **PoC 必须完整可执行** - poc.payload 应该是可直接运行的代码
4. **不要假设环境** - 沙箱中没有运行的服务需要 mock
## 重要原则
1. **质量优先** - 宁可漏报也不要误报太多
2. **深入理解** - 理解代码逻辑不要表面判断
3. **证据支撑** - 判定要有依据
4. **安全第一** - 沙箱测试要谨慎
5. **🔥 PoC 生成** - 对于 confirmed likely 的漏洞**必须**生成完整的 PoC:
- poc.description: 简要描述这个 PoC 的作用
- poc.steps: 详细的复现步骤列表
- poc.payload: **完整的**利用代码或命令例如:
- Web漏洞: 完整URL如 `http://target/path?param=<payload>`
- 命令注入: 完整的 curl 命令或 HTTP 请求
- SQL注入: 完整的利用语句或请求
- 代码执行: 可直接运行的利用脚本
- payload 字段必须是**可直接复制执行**的完整利用代码不要只写参数值
## ⚠️ 关键约束 - 必须遵守!
1. **禁止直接输出 Final Answer** - 你必须先调用至少一个工具来验证漏洞
2. **每个漏洞至少调用一次工具** - 使用 read_file 读取代码或使用 test_* 工具测试
3. **没有工具调用的验证无效** - 不允许仅凭已知信息直接判断
4. ** Action Final Answer** - 必须先执行工具获取 Observation再输出最终结论
错误示例禁止
```
Thought: 根据已有信息我认为这是漏洞
Final Answer: {...} 没有调用任何工具
```
正确示例必须
```
Thought: 我需要先读取 config.php 文件来验证硬编码凭据
Action: read_file
Action Input: {"file_path": "config.php"}
```
然后等待 Observation再继续验证其他发现或输出 Final Answer
1. **你是验证的大脑** - 你决定如何测试工具只提供执行能力
2. **动态验证优先** - 能运行代码验证的就不要仅靠静态分析
3. **质量优先** - 宁可漏报也不要误报太多
4. **证据支撑** - 每个判定都需要有依据
现在开始验证漏洞发现"""
@ -583,6 +622,24 @@ class VerificationAgent(BaseAgent):
# 检查是否完成
if step.is_final:
# 🔥 强制检查:必须至少调用过一次工具才能完成
if self._tool_calls == 0:
logger.warning(f"[{self.name}] LLM tried to finish without any tool calls! Forcing tool usage.")
await self.emit_thinking("⚠️ 拒绝过早完成:必须先使用工具验证漏洞")
self._conversation_history.append({
"role": "user",
"content": (
"⚠️ **系统拒绝**: 你必须先使用工具验证漏洞!\n\n"
"不允许在没有调用任何工具的情况下直接输出 Final Answer。\n\n"
"请立即使用以下工具之一进行验证:\n"
"1. `read_file` - 读取漏洞所在文件的代码\n"
"2. `run_code` - 编写并执行 Fuzzing Harness 验证漏洞\n"
"3. `extract_function` - 提取目标函数进行分析\n\n"
"现在请输出 Thought 和 Action开始验证第一个漏洞。"
),
})
continue
await self.emit_llm_decision("完成漏洞验证", "LLM 判断验证已充分")
final_result = step.final_answer

View File

@ -82,6 +82,9 @@ from .smart_scan_tool import SmartScanTool, QuickAuditTool
# 🔥 新增Kunlun-M 静态代码分析工具 (MIT License)
from .kunlun_tool import KunlunMTool, KunlunRuleListTool, KunlunPluginTool
# 🔥 新增:通用代码执行工具 (LLM 驱动的 Fuzzing Harness)
from .run_code import RunCodeTool, ExtractFunctionTool
__all__ = [
# 基础
"AgentTool",
@ -164,4 +167,8 @@ __all__ = [
"KunlunMTool",
"KunlunRuleListTool",
"KunlunPluginTool",
# 🔥 通用代码执行工具 (LLM 驱动的 Fuzzing Harness)
"RunCodeTool",
"ExtractFunctionTool",
]

View File

@ -0,0 +1,513 @@
"""
通用代码执行工具 - LLM 驱动的漏洞验证
核心理念
- LLM 是验证的大脑工具只提供执行能力
- 不硬编码 payload检测规则
- LLM 自己决定测试策略编写测试代码分析结果
使用场景
- LLM 编写 Fuzzing Harness 进行局部测试
- LLM 构造 PoC 验证漏洞
- LLM 编写 mock 代码隔离测试函数
"""
import asyncio
import logging
import os
import tempfile
from typing import Optional, Dict, Any
from pydantic import BaseModel, Field
from .base import AgentTool, ToolResult
from .sandbox_tool import SandboxManager, SandboxConfig
logger = logging.getLogger(__name__)
class RunCodeInput(BaseModel):
"""代码执行输入"""
code: str = Field(..., description="要执行的代码")
language: str = Field(default="python", description="编程语言: python, php, javascript, ruby, go, java, bash")
timeout: int = Field(default=60, description="超时时间(秒),复杂测试可设置更长")
description: str = Field(default="", description="简短描述这段代码的目的(用于日志)")
class RunCodeTool(AgentTool):
"""
通用代码执行工具
LLM 自由编写测试代码在沙箱中执行
LLM 可以
- 编写 Fuzzing Harness 隔离测试单个函数
- 构造 mock 对象模拟依赖
- 设计各种 payload 进行测试
- 分析执行结果判断漏洞
工具不做任何假设完全由 LLM 控制测试逻辑
"""
def __init__(self, sandbox_manager: Optional[SandboxManager] = None, project_root: str = "."):
super().__init__()
# 使用更宽松的沙箱配置
config = SandboxConfig(
timeout=120,
memory_limit="1g", # 更大内存
)
self.sandbox_manager = sandbox_manager or SandboxManager(config)
self.project_root = project_root
@property
def name(self) -> str:
return "run_code"
@property
def description(self) -> str:
return """🔥 通用代码执行工具 - 在沙箱中运行你编写的测试代码
这是你进行漏洞验证的核心工具你可以
1. 编写 Fuzzing Harness 隔离测试单个函数
2. 构造 mock 对象模拟数据库HTTP 请求等依赖
3. 设计各种 payload 进行漏洞测试
4. 编写完整的 PoC 验证脚本
输入
- code: 你编写的测试代码完整可执行
- language: python, php, javascript, ruby, go, java, bash
- timeout: 超时秒数默认60复杂测试可设更长
- description: 简短描述代码目的
支持的语言和执行方式
- python: python3 -c 'code'
- php: php -r 'code' (注意不需要 <?php 标签)
- javascript: node -e 'code'
- ruby: ruby -e 'code'
- go: go run (需写完整 package main)
- java: javac + java (需写完整 class)
- bash: bash -c 'code'
示例 - 命令注入 Fuzzing Harness:
```python
# 提取目标函数并构造测试
import os
# Mock os.system 来检测是否被调用
executed_commands = []
original_system = os.system
def mock_system(cmd):
print(f"[DETECTED] os.system called: {cmd}")
executed_commands.append(cmd)
return 0
os.system = mock_system
# 目标函数(从项目代码复制)
def vulnerable_function(user_input):
os.system(f"echo {user_input}")
# Fuzzing 测试
payloads = ["; id", "| whoami", "$(cat /etc/passwd)", "`id`"]
for payload in payloads:
print(f"\\nTesting payload: {payload}")
executed_commands.clear()
try:
vulnerable_function(payload)
if executed_commands:
print(f"[VULN] Command injection detected!")
except Exception as e:
print(f"Error: {e}")
```
重要提示
- 代码在 Docker 沙箱中执行与真实环境隔离
- 你需要自己 mock 依赖数据库HTTP文件系统等
- 你需要自己设计 payload 和检测逻辑
- 你需要自己分析输出判断漏洞是否存在"""
@property
def args_schema(self):
return RunCodeInput
async def _execute(
self,
code: str,
language: str = "python",
timeout: int = 60,
description: str = "",
**kwargs
) -> ToolResult:
"""执行用户编写的代码"""
# 初始化沙箱
try:
await self.sandbox_manager.initialize()
except Exception as e:
logger.warning(f"Sandbox init failed: {e}")
if not self.sandbox_manager.is_available:
return ToolResult(
success=False,
error="沙箱环境不可用 (Docker 未运行)",
data="请确保 Docker 已启动。如果无法使用沙箱,你可以通过静态分析代码来验证漏洞。"
)
# 构建执行命令
language = language.lower().strip()
command = self._build_command(code, language)
if command is None:
return ToolResult(
success=False,
error=f"不支持的语言: {language}",
data=f"支持的语言: python, php, javascript, ruby, go, java, bash"
)
# 在沙箱中执行
result = await self.sandbox_manager.execute_command(
command=command,
timeout=timeout,
)
# 格式化输出
output_parts = [f"🔬 代码执行结果"]
if description:
output_parts.append(f"目的: {description}")
output_parts.append(f"语言: {language}")
output_parts.append(f"退出码: {result['exit_code']}")
if result.get("stdout"):
stdout = result["stdout"]
if len(stdout) > 5000:
stdout = stdout[:5000] + f"\n... (截断,共 {len(result['stdout'])} 字符)"
output_parts.append(f"\n输出:\n```\n{stdout}\n```")
if result.get("stderr"):
stderr = result["stderr"]
if len(stderr) > 2000:
stderr = stderr[:2000] + "\n... (截断)"
output_parts.append(f"\n错误输出:\n```\n{stderr}\n```")
if result.get("error"):
output_parts.append(f"\n执行错误: {result['error']}")
# 提示 LLM 分析结果
output_parts.append("\n---")
output_parts.append("请根据上述输出分析漏洞是否存在。")
return ToolResult(
success=result.get("success", False),
data="\n".join(output_parts),
error=result.get("error"),
metadata={
"language": language,
"exit_code": result.get("exit_code", -1),
"stdout_length": len(result.get("stdout", "")),
"stderr_length": len(result.get("stderr", "")),
}
)
def _build_command(self, code: str, language: str) -> Optional[str]:
"""根据语言构建执行命令"""
# 转义单引号的通用方法
def escape_for_shell(s: str) -> str:
return s.replace("'", "'\"'\"'")
if language == "python":
escaped = escape_for_shell(code)
return f"python3 -c '{escaped}'"
elif language == "php":
# PHP: php -r 不需要 <?php 标签
clean_code = code.strip()
if clean_code.startswith("<?php"):
clean_code = clean_code[5:].strip()
if clean_code.startswith("<?"):
clean_code = clean_code[2:].strip()
if clean_code.endswith("?>"):
clean_code = clean_code[:-2].strip()
escaped = escape_for_shell(clean_code)
return f"php -r '{escaped}'"
elif language in ["javascript", "js", "node"]:
escaped = escape_for_shell(code)
return f"node -e '{escaped}'"
elif language == "ruby":
escaped = escape_for_shell(code)
return f"ruby -e '{escaped}'"
elif language == "bash":
escaped = escape_for_shell(code)
return f"bash -c '{escaped}'"
elif language == "go":
# Go 需要完整的 package main
escaped = escape_for_shell(code).replace("\\", "\\\\")
return f"echo '{escaped}' > /tmp/main.go && go run /tmp/main.go"
elif language == "java":
# Java 需要完整的 class
escaped = escape_for_shell(code).replace("\\", "\\\\")
# 提取类名
import re
class_match = re.search(r'public\s+class\s+(\w+)', code)
class_name = class_match.group(1) if class_match else "Test"
return f"echo '{escaped}' > /tmp/{class_name}.java && javac /tmp/{class_name}.java && java -cp /tmp {class_name}"
return None
class ExtractFunctionInput(BaseModel):
"""函数提取输入"""
file_path: str = Field(..., description="源文件路径")
function_name: str = Field(..., description="要提取的函数名")
include_imports: bool = Field(default=True, description="是否包含 import 语句")
class ExtractFunctionTool(AgentTool):
"""
函数提取工具
从源文件中提取指定函数及其依赖用于构建 Fuzzing Harness
"""
def __init__(self, project_root: str = "."):
super().__init__()
self.project_root = project_root
@property
def name(self) -> str:
return "extract_function"
@property
def description(self) -> str:
return """从源文件中提取指定函数的代码
用于构建 Fuzzing Harness 时获取目标函数代码
输入
- file_path: 源文件路径
- function_name: 要提取的函数名
- include_imports: 是否包含文件开头的 import 语句默认 true
返回
- 函数代码
- 相关的 import 语句
- 函数参数列表
示例
{"file_path": "app/api.py", "function_name": "process_command"}"""
@property
def args_schema(self):
return ExtractFunctionInput
async def _execute(
self,
file_path: str,
function_name: str,
include_imports: bool = True,
**kwargs
) -> ToolResult:
"""提取函数代码"""
import ast
import re
full_path = os.path.join(self.project_root, file_path)
if not os.path.exists(full_path):
return ToolResult(success=False, error=f"文件不存在: {file_path}")
with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
code = f.read()
# 检测语言
ext = os.path.splitext(file_path)[1].lower()
if ext == ".py":
result = self._extract_python(code, function_name, include_imports)
elif ext == ".php":
result = self._extract_php(code, function_name)
elif ext in [".js", ".ts"]:
result = self._extract_javascript(code, function_name)
else:
result = self._extract_generic(code, function_name)
if result["success"]:
output_parts = [f"📦 函数提取结果\n"]
output_parts.append(f"文件: {file_path}")
output_parts.append(f"函数: {function_name}")
if result.get("imports"):
output_parts.append(f"\n相关 imports:\n```\n{result['imports']}\n```")
if result.get("parameters"):
output_parts.append(f"\n参数: {', '.join(result['parameters'])}")
output_parts.append(f"\n函数代码:\n```\n{result['code']}\n```")
output_parts.append("\n---")
output_parts.append("你现在可以使用这段代码构建 Fuzzing Harness")
return ToolResult(
success=True,
data="\n".join(output_parts),
metadata=result
)
else:
return ToolResult(
success=False,
error=result.get("error", "提取失败"),
data=f"无法提取函数 '{function_name}'。你可以使用 read_file 工具直接读取文件,手动定位函数代码。"
)
def _extract_python(self, code: str, function_name: str, include_imports: bool) -> Dict:
"""提取 Python 函数"""
import ast
try:
tree = ast.parse(code)
except SyntaxError:
# 降级到正则提取
return self._extract_generic(code, function_name)
# 收集 imports
imports = []
if include_imports:
for node in ast.walk(tree):
if isinstance(node, ast.Import):
imports.append(ast.unparse(node))
elif isinstance(node, ast.ImportFrom):
imports.append(ast.unparse(node))
# 查找函数
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
if node.name == function_name:
lines = code.split('\n')
func_code = '\n'.join(lines[node.lineno - 1:node.end_lineno])
params = [arg.arg for arg in node.args.args]
return {
"success": True,
"code": func_code,
"imports": '\n'.join(imports) if imports else None,
"parameters": params,
"line_start": node.lineno,
"line_end": node.end_lineno,
}
return {"success": False, "error": f"未找到函数 '{function_name}'"}
def _extract_php(self, code: str, function_name: str) -> Dict:
"""提取 PHP 函数"""
import re
pattern = rf'function\s+{re.escape(function_name)}\s*\([^)]*\)\s*\{{'
match = re.search(pattern, code)
if not match:
return {"success": False, "error": f"未找到函数 '{function_name}'"}
start_pos = match.start()
brace_count = 0
end_pos = match.end() - 1
for i, char in enumerate(code[match.end() - 1:], start=match.end() - 1):
if char == '{':
brace_count += 1
elif char == '}':
brace_count -= 1
if brace_count == 0:
end_pos = i + 1
break
func_code = code[start_pos:end_pos]
# 提取参数
param_match = re.search(r'function\s+\w+\s*\(([^)]*)\)', func_code)
params = []
if param_match:
params_str = param_match.group(1)
params = [p.strip().split('=')[0].strip().replace('$', '')
for p in params_str.split(',') if p.strip()]
return {
"success": True,
"code": func_code,
"parameters": params,
}
def _extract_javascript(self, code: str, function_name: str) -> Dict:
"""提取 JavaScript 函数"""
import re
patterns = [
rf'function\s+{re.escape(function_name)}\s*\([^)]*\)\s*\{{',
rf'(?:const|let|var)\s+{re.escape(function_name)}\s*=\s*function\s*\([^)]*\)\s*\{{',
rf'(?:const|let|var)\s+{re.escape(function_name)}\s*=\s*\([^)]*\)\s*=>\s*\{{',
rf'async\s+function\s+{re.escape(function_name)}\s*\([^)]*\)\s*\{{',
]
for pattern in patterns:
match = re.search(pattern, code)
if match:
start_pos = match.start()
brace_count = 0
end_pos = match.end() - 1
for i, char in enumerate(code[match.end() - 1:], start=match.end() - 1):
if char == '{':
brace_count += 1
elif char == '}':
brace_count -= 1
if brace_count == 0:
end_pos = i + 1
break
func_code = code[start_pos:end_pos]
return {
"success": True,
"code": func_code,
}
return {"success": False, "error": f"未找到函数 '{function_name}'"}
def _extract_generic(self, code: str, function_name: str) -> Dict:
"""通用函数提取(正则)"""
import re
# 尝试多种模式
patterns = [
rf'def\s+{re.escape(function_name)}\s*\([^)]*\)\s*:', # Python
rf'function\s+{re.escape(function_name)}\s*\([^)]*\)', # PHP/JS
rf'func\s+{re.escape(function_name)}\s*\([^)]*\)', # Go
]
for pattern in patterns:
match = re.search(pattern, code, re.MULTILINE)
if match:
start_line = code[:match.start()].count('\n')
lines = code.split('\n')
# 尝试找到函数结束
end_line = start_line + 1
indent = len(lines[start_line]) - len(lines[start_line].lstrip())
for i in range(start_line + 1, min(start_line + 100, len(lines))):
line = lines[i]
if line.strip() and not line.startswith(' ' * (indent + 1)):
if not line.strip().startswith('#'):
end_line = i
break
end_line = i + 1
func_code = '\n'.join(lines[start_line:end_line])
return {
"success": True,
"code": func_code,
}
return {"success": False, "error": f"未找到函数 '{function_name}'"}

View File

@ -514,12 +514,24 @@ class SandboxTool(AgentTool):
在安全隔离的环境中执行代码和命令
"""
# 允许的命令前缀
# 允许的命令前缀 - 放宽限制以支持更灵活的测试
ALLOWED_COMMANDS = [
"python", "python3", "node", "curl", "wget",
"cat", "head", "tail", "grep", "find", "ls",
"echo", "printf", "test", "id", "whoami",
"php", # 🔥 添加 PHP 支持
# 编程语言解释器
"python", "python3", "node", "php", "ruby", "perl",
"go", "java", "javac", "bash", "sh",
# 网络工具
"curl", "wget", "nc", "netcat",
# 文件操作
"cat", "head", "tail", "grep", "find", "ls", "wc",
"sed", "awk", "cut", "sort", "uniq", "tr", "xargs",
# 系统信息(用于验证命令执行)
"echo", "printf", "test", "id", "whoami", "uname",
"env", "printenv", "pwd", "hostname",
# 编码/解码工具
"base64", "xxd", "od", "hexdump",
# 其他实用工具
"timeout", "time", "sleep", "true", "false",
"md5sum", "sha256sum", "strings",
]
def __init__(self, sandbox_manager: Optional[SandboxManager] = None):