feat(sandbox): 重构安全工具以使用沙箱环境执行
重构所有安全工具(Semgrep, Bandit等)使用Docker沙箱环境执行,提升安全隔离性 更新Dockerfile基础镜像并集成常用安全扫描工具 添加沙箱管理器的工具命令执行方法,支持资源限制和临时文件系统
This commit is contained in:
parent
3d4f90c547
commit
189274fd56
|
|
@ -14,6 +14,7 @@ from pydantic import BaseModel, Field
|
|||
from dataclasses import dataclass
|
||||
|
||||
from .base import AgentTool, ToolResult
|
||||
from .sandbox_tool import SandboxManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -70,6 +71,7 @@ class SemgrepTool(AgentTool):
|
|||
def __init__(self, project_root: str):
|
||||
super().__init__()
|
||||
self.project_root = project_root
|
||||
self.sandbox_manager = SandboxManager()
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
|
|
@ -107,26 +109,15 @@ Semgrep 是业界领先的静态分析工具,支持 30+ 种编程语言。
|
|||
**kwargs
|
||||
) -> ToolResult:
|
||||
"""执行 Semgrep 扫描"""
|
||||
# 检查 semgrep 是否可用
|
||||
if not await self._check_semgrep():
|
||||
# 尝试自动安装
|
||||
logger.info("Semgrep 未安装,尝试自动安装...")
|
||||
install_success = await self._try_install_semgrep()
|
||||
if not install_success:
|
||||
return ToolResult(
|
||||
success=False,
|
||||
error="Semgrep 未安装。请使用 'pip install semgrep' 安装,或联系管理员安装。",
|
||||
)
|
||||
# 确保 Docker 可用
|
||||
await self.sandbox_manager.initialize()
|
||||
if not self.sandbox_manager.is_available:
|
||||
return ToolResult(success=False, error="Docker 沙箱不可用,无法执行 Semgrep")
|
||||
|
||||
# 构建命令 (相对于 /workspace)
|
||||
# 注意: target_path 是相对于 project_root 的
|
||||
safe_target_path = target_path if not target_path.startswith("/") else target_path.lstrip("/")
|
||||
|
||||
# 构建完整路径
|
||||
full_path = os.path.normpath(os.path.join(self.project_root, target_path))
|
||||
if not full_path.startswith(os.path.normpath(self.project_root)):
|
||||
return ToolResult(
|
||||
success=False,
|
||||
error="安全错误:不允许扫描项目目录外的路径",
|
||||
)
|
||||
|
||||
# 构建命令
|
||||
cmd = ["semgrep", "--json", "--quiet"]
|
||||
|
||||
if rules == "auto":
|
||||
|
|
@ -139,30 +130,36 @@ Semgrep 是业界领先的静态分析工具,支持 30+ 种编程语言。
|
|||
if severity:
|
||||
cmd.extend(["--severity", severity])
|
||||
|
||||
cmd.append(full_path)
|
||||
# 在容器内,路径相对于 /workspace
|
||||
cmd.append(safe_target_path)
|
||||
|
||||
cmd_str = " ".join(cmd)
|
||||
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
cwd=self.project_root,
|
||||
result = await self.sandbox_manager.execute_tool_command(
|
||||
command=cmd_str,
|
||||
host_workdir=self.project_root,
|
||||
timeout=300
|
||||
)
|
||||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=300)
|
||||
|
||||
if proc.returncode not in [0, 1]: # 1 means findings were found
|
||||
if not result["success"] and result["exit_code"] != 1: # 1 means findings were found
|
||||
return ToolResult(
|
||||
success=False,
|
||||
error=f"Semgrep 执行失败: {stderr.decode()[:500]}",
|
||||
error=f"Semgrep 执行失败: {result['stderr'][:500] or result['error']}",
|
||||
)
|
||||
|
||||
# 解析结果
|
||||
try:
|
||||
results = json.loads(stdout.decode())
|
||||
# 尝试从 stdout 查找 JSON
|
||||
json_start = result['stdout'].find('{')
|
||||
if json_start >= 0:
|
||||
results = json.loads(result['stdout'][json_start:])
|
||||
else:
|
||||
results = {}
|
||||
except json.JSONDecodeError:
|
||||
return ToolResult(
|
||||
success=False,
|
||||
error="无法解析 Semgrep 输出",
|
||||
error=f"无法解析 Semgrep 输出: {result['stdout'][:200]}",
|
||||
)
|
||||
|
||||
findings = results.get("results", [])[:max_results]
|
||||
|
|
@ -203,60 +200,11 @@ Semgrep 是业界领先的静态分析工具,支持 30+ 种编程语言。
|
|||
}
|
||||
)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
# 🔥 超时时提供更有用的信息
|
||||
except Exception as e:
|
||||
return ToolResult(
|
||||
success=False,
|
||||
error=f"Semgrep 扫描超时(超过300秒)。可能原因:\n"
|
||||
f"1. 规则集 '{rules}' 需要从网络下载,网络较慢\n"
|
||||
f"2. 扫描目标过大\n"
|
||||
f"建议:尝试使用 pattern_match 或 smart_scan 工具进行快速扫描"
|
||||
error=f"Semgrep 执行错误: {str(e)}"
|
||||
)
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
# 🔥 提供更详细的错误诊断
|
||||
return ToolResult(
|
||||
success=False,
|
||||
error=f"Semgrep 执行错误: {error_msg[:300]}\n"
|
||||
f"建议:使用 pattern_match 或 smart_scan 工具作为替代"
|
||||
)
|
||||
|
||||
async def _check_semgrep(self) -> bool:
|
||||
"""检查 Semgrep 是否可用"""
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"semgrep", "--version",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
await proc.communicate()
|
||||
return proc.returncode == 0
|
||||
except:
|
||||
return False
|
||||
|
||||
async def _try_install_semgrep(self) -> bool:
|
||||
"""尝试自动安装 Semgrep"""
|
||||
try:
|
||||
logger.info("正在安装 Semgrep...")
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"pip", "install", "semgrep",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=120)
|
||||
if proc.returncode == 0:
|
||||
logger.info("Semgrep 安装成功")
|
||||
# 验证安装
|
||||
return await self._check_semgrep()
|
||||
else:
|
||||
logger.warning(f"Semgrep 安装失败: {stderr.decode()[:200]}")
|
||||
return False
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("Semgrep 安装超时")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.warning(f"Semgrep 安装出错: {e}")
|
||||
return False
|
||||
|
||||
|
||||
# ============ Bandit 工具 (Python) ============
|
||||
|
|
@ -285,6 +233,7 @@ class BanditTool(AgentTool):
|
|||
def __init__(self, project_root: str):
|
||||
super().__init__()
|
||||
self.project_root = project_root
|
||||
self.sandbox_manager = SandboxManager()
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
|
|
@ -319,15 +268,12 @@ Bandit 是 Python 专用的安全分析工具,由 OpenStack 安全团队开发
|
|||
**kwargs
|
||||
) -> ToolResult:
|
||||
"""执行 Bandit 扫描"""
|
||||
if not await self._check_bandit():
|
||||
return ToolResult(
|
||||
success=False,
|
||||
error="Bandit 未安装。请使用 'pip install bandit' 安装。",
|
||||
)
|
||||
|
||||
full_path = os.path.normpath(os.path.join(self.project_root, target_path))
|
||||
if not full_path.startswith(os.path.normpath(self.project_root)):
|
||||
return ToolResult(success=False, error="安全错误:路径越界")
|
||||
# 确保 Docker 可用
|
||||
await self.sandbox_manager.initialize()
|
||||
if not self.sandbox_manager.is_available:
|
||||
return ToolResult(success=False, error="Docker 沙箱不可用")
|
||||
|
||||
safe_target_path = target_path if not target_path.startswith("/") else target_path.lstrip("/")
|
||||
|
||||
# 构建命令
|
||||
severity_map = {"low": "l", "medium": "m", "high": "h"}
|
||||
|
|
@ -337,21 +283,27 @@ Bandit 是 Python 专用的安全分析工具,由 OpenStack 安全团队开发
|
|||
"bandit", "-r", "-f", "json",
|
||||
"-ll" if severity == "low" else f"-l{severity_map.get(severity, 'm')}",
|
||||
f"-i{confidence_map.get(confidence, 'm')}",
|
||||
full_path
|
||||
safe_target_path
|
||||
]
|
||||
|
||||
cmd_str = " ".join(cmd)
|
||||
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
result = await self.sandbox_manager.execute_tool_command(
|
||||
command=cmd_str,
|
||||
host_workdir=self.project_root,
|
||||
timeout=120
|
||||
)
|
||||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=120)
|
||||
|
||||
try:
|
||||
results = json.loads(stdout.decode())
|
||||
# find json in output
|
||||
json_start = result['stdout'].find('{')
|
||||
if json_start >= 0:
|
||||
results = json.loads(result['stdout'][json_start:])
|
||||
else:
|
||||
results = {}
|
||||
except json.JSONDecodeError:
|
||||
return ToolResult(success=False, error="无法解析 Bandit 输出")
|
||||
return ToolResult(success=False, error=f"无法解析 Bandit 输出: {result['stdout'][:200]}")
|
||||
|
||||
findings = results.get("results", [])[:max_results]
|
||||
|
||||
|
|
@ -382,22 +334,8 @@ Bandit 是 Python 专用的安全分析工具,由 OpenStack 安全团队开发
|
|||
metadata={"findings_count": len(findings), "findings": findings[:10]}
|
||||
)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
return ToolResult(success=False, error="Bandit 扫描超时")
|
||||
except Exception as e:
|
||||
return ToolResult(success=False, error=f"Bandit 执行错误: {str(e)}")
|
||||
|
||||
async def _check_bandit(self) -> bool:
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"bandit", "--version",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
await proc.communicate()
|
||||
return proc.returncode == 0
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
# ============ Gitleaks 工具 ============
|
||||
|
|
@ -425,6 +363,7 @@ class GitleaksTool(AgentTool):
|
|||
def __init__(self, project_root: str):
|
||||
super().__init__()
|
||||
self.project_root = project_root
|
||||
self.sandbox_manager = SandboxManager()
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
|
|
@ -460,35 +399,35 @@ Gitleaks 是专业的密钥检测工具,支持 150+ 种密钥类型。
|
|||
**kwargs
|
||||
) -> ToolResult:
|
||||
"""执行 Gitleaks 扫描"""
|
||||
if not await self._check_gitleaks():
|
||||
return ToolResult(
|
||||
success=False,
|
||||
error="Gitleaks 未安装。Gitleaks 需要手动安装,请参考: https://github.com/gitleaks/gitleaks/releases\n"
|
||||
"安装方法:\n"
|
||||
"- macOS: brew install gitleaks\n"
|
||||
"- Linux: 下载二进制文件并添加到 PATH\n"
|
||||
"- Windows: 下载二进制文件并添加到 PATH",
|
||||
)
|
||||
# 确保 Docker 可用
|
||||
await self.sandbox_manager.initialize()
|
||||
if not self.sandbox_manager.is_available:
|
||||
return ToolResult(success=False, error="Docker 沙箱不可用")
|
||||
|
||||
safe_target_path = target_path if not target_path.startswith("/") else target_path.lstrip("/")
|
||||
|
||||
full_path = os.path.normpath(os.path.join(self.project_root, target_path))
|
||||
if not full_path.startswith(os.path.normpath(self.project_root)):
|
||||
return ToolResult(success=False, error="安全错误:路径越界")
|
||||
# 构建命令 using . as source because we are mounted to /workspace
|
||||
# But if user specified a subdirectory, we append it.
|
||||
# Actually gitleaks detects pwd by default if source is .
|
||||
|
||||
cmd = ["gitleaks", "detect", "--source", full_path, "-f", "json"]
|
||||
cmd = ["gitleaks", "detect", "--source", safe_target_path, "-f", "json"]
|
||||
if no_git:
|
||||
cmd.append("--no-git")
|
||||
|
||||
cmd_str = " ".join(cmd)
|
||||
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
result = await self.sandbox_manager.execute_tool_command(
|
||||
command=cmd_str,
|
||||
host_workdir=self.project_root,
|
||||
timeout=120
|
||||
)
|
||||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=120)
|
||||
|
||||
# Gitleaks returns 1 if secrets found
|
||||
if proc.returncode not in [0, 1]:
|
||||
return ToolResult(success=False, error=f"Gitleaks 执行失败: {stderr.decode()[:300]}")
|
||||
if result['exit_code'] not in [0, 1]:
|
||||
return ToolResult(success=False, error=f"Gitleaks 执行失败: {result['stderr'][:300]}")
|
||||
|
||||
stdout = result['stdout']
|
||||
|
||||
if not stdout.strip():
|
||||
return ToolResult(
|
||||
|
|
@ -498,12 +437,17 @@ Gitleaks 是专业的密钥检测工具,支持 150+ 种密钥类型。
|
|||
)
|
||||
|
||||
try:
|
||||
findings = json.loads(stdout.decode())
|
||||
# Find JSON start
|
||||
json_start = stdout.find('[')
|
||||
if json_start >= 0:
|
||||
findings = json.loads(stdout[json_start:])
|
||||
else:
|
||||
findings = []
|
||||
except json.JSONDecodeError:
|
||||
findings = []
|
||||
|
||||
if not findings:
|
||||
return ToolResult(
|
||||
return ToolResult(
|
||||
success=True,
|
||||
data="🔐 Gitleaks 扫描完成,未发现密钥泄露",
|
||||
metadata={"findings_count": 0}
|
||||
|
|
@ -539,22 +483,8 @@ Gitleaks 是专业的密钥检测工具,支持 150+ 种密钥类型。
|
|||
}
|
||||
)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
return ToolResult(success=False, error="Gitleaks 扫描超时")
|
||||
except Exception as e:
|
||||
return ToolResult(success=False, error=f"Gitleaks 执行错误: {str(e)}")
|
||||
|
||||
async def _check_gitleaks(self) -> bool:
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"gitleaks", "version",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
await proc.communicate()
|
||||
return proc.returncode == 0
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
# ============ npm audit 工具 ============
|
||||
|
|
@ -575,6 +505,7 @@ class NpmAuditTool(AgentTool):
|
|||
def __init__(self, project_root: str):
|
||||
super().__init__()
|
||||
self.project_root = project_root
|
||||
self.sandbox_manager = SandboxManager()
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
|
|
@ -602,9 +533,20 @@ class NpmAuditTool(AgentTool):
|
|||
**kwargs
|
||||
) -> ToolResult:
|
||||
"""执行 npm audit"""
|
||||
# 确保 Docker 可用
|
||||
await self.sandbox_manager.initialize()
|
||||
if not self.sandbox_manager.is_available:
|
||||
return ToolResult(success=False, error="Docker 沙箱不可用")
|
||||
|
||||
# 这里的 target_path 是相对于 project_root 的
|
||||
# 防止空路径
|
||||
safe_target_path = target_path if not target_path.startswith("/") else target_path.lstrip("/")
|
||||
if not safe_target_path:
|
||||
safe_target_path = "."
|
||||
|
||||
full_path = os.path.normpath(os.path.join(self.project_root, target_path))
|
||||
|
||||
# 检查 package.json
|
||||
# 宿主机预检查
|
||||
package_json = os.path.join(full_path, "package.json")
|
||||
if not os.path.exists(package_json):
|
||||
return ToolResult(
|
||||
|
|
@ -616,19 +558,35 @@ class NpmAuditTool(AgentTool):
|
|||
if production_only:
|
||||
cmd.append("--production")
|
||||
|
||||
# 组合命令: cd 到目标目录然后执行
|
||||
cmd_str = f"cd {safe_target_path} && {' '.join(cmd)}"
|
||||
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
cwd=full_path,
|
||||
# 清除代理设置,避免容器内网络问题
|
||||
proxy_env = {
|
||||
"HTTPS_PROXY": "",
|
||||
"HTTP_PROXY": "",
|
||||
"https_proxy": "",
|
||||
"http_proxy": ""
|
||||
}
|
||||
|
||||
result = await self.sandbox_manager.execute_tool_command(
|
||||
command=cmd_str,
|
||||
host_workdir=self.project_root,
|
||||
timeout=120,
|
||||
network_mode="bridge",
|
||||
env=proxy_env
|
||||
)
|
||||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=120)
|
||||
|
||||
try:
|
||||
results = json.loads(stdout.decode())
|
||||
# npm audit json starts with {
|
||||
json_start = result['stdout'].find('{')
|
||||
if json_start >= 0:
|
||||
results = json.loads(result['stdout'][json_start:])
|
||||
else:
|
||||
return ToolResult(success=True, data=f"npm audit 输出为空或格式错误: {result['stdout'][:100]}")
|
||||
except json.JSONDecodeError:
|
||||
return ToolResult(success=True, data="npm audit 输出为空或格式错误")
|
||||
return ToolResult(success=True, data=f"npm audit 输出格式错误")
|
||||
|
||||
vulnerabilities = results.get("vulnerabilities", {})
|
||||
|
||||
|
|
@ -669,8 +627,6 @@ class NpmAuditTool(AgentTool):
|
|||
}
|
||||
)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
return ToolResult(success=False, error="npm audit 超时")
|
||||
except Exception as e:
|
||||
return ToolResult(success=False, error=f"npm audit 错误: {str(e)}")
|
||||
|
||||
|
|
@ -692,6 +648,7 @@ class SafetyTool(AgentTool):
|
|||
def __init__(self, project_root: str):
|
||||
super().__init__()
|
||||
self.project_root = project_root
|
||||
self.sandbox_manager = SandboxManager()
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
|
|
@ -717,32 +674,55 @@ class SafetyTool(AgentTool):
|
|||
**kwargs
|
||||
) -> ToolResult:
|
||||
"""执行 Safety 扫描"""
|
||||
# 确保 Docker 可用
|
||||
await self.sandbox_manager.initialize()
|
||||
if not self.sandbox_manager.is_available:
|
||||
return ToolResult(success=False, error="Docker 沙箱不可用")
|
||||
|
||||
full_path = os.path.join(self.project_root, requirements_file)
|
||||
|
||||
if not os.path.exists(full_path):
|
||||
return ToolResult(success=False, error=f"未找到依赖文件: {requirements_file}")
|
||||
|
||||
# commands
|
||||
# requirements_file relative path inside container is just requirements_file (assuming it's relative to root)
|
||||
# If requirements_file is absolute, we need to make it relative.
|
||||
# But for security, `requirements_file` should be relative to project_root.
|
||||
safe_req_file = requirements_file if not requirements_file.startswith("/") else requirements_file.lstrip("/")
|
||||
|
||||
cmd = ["safety", "check", "-r", safe_req_file, "--json"]
|
||||
cmd_str = " ".join(cmd)
|
||||
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"safety", "check", "-r", full_path, "--json",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
result = await self.sandbox_manager.execute_tool_command(
|
||||
command=cmd_str,
|
||||
host_workdir=self.project_root,
|
||||
timeout=120
|
||||
)
|
||||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=60)
|
||||
|
||||
stdout = result['stdout']
|
||||
try:
|
||||
# Safety 输出的 JSON 格式可能不同版本有差异
|
||||
output = stdout.decode()
|
||||
if "No known security" in output:
|
||||
return ToolResult(
|
||||
success=True,
|
||||
data="🐍 Safety 扫描完成,未发现 Python 依赖漏洞",
|
||||
metadata={"findings_count": 0}
|
||||
)
|
||||
# find first { or [
|
||||
start_idx = -1
|
||||
for i, char in enumerate(stdout):
|
||||
if char in ['{', '[']:
|
||||
start_idx = i
|
||||
break
|
||||
|
||||
results = json.loads(output)
|
||||
if start_idx >= 0:
|
||||
output_json = stdout[start_idx:]
|
||||
if "No known security" in output_json:
|
||||
return ToolResult(
|
||||
success=True,
|
||||
data="🐍 Safety 扫描完成,未发现 Python 依赖漏洞",
|
||||
metadata={"findings_count": 0}
|
||||
)
|
||||
results = json.loads(output_json)
|
||||
else:
|
||||
return ToolResult(success=True, data=f"Safety 输出:\n{stdout[:1000]}")
|
||||
|
||||
except:
|
||||
return ToolResult(success=True, data=f"Safety 输出:\n{stdout.decode()[:1000]}")
|
||||
return ToolResult(success=True, data=f"Safety 输出解析失败:\n{stdout[:1000]}")
|
||||
|
||||
vulnerabilities = results if isinstance(results, list) else results.get("vulnerabilities", [])
|
||||
|
||||
|
|
@ -791,6 +771,7 @@ class TruffleHogTool(AgentTool):
|
|||
def __init__(self, project_root: str):
|
||||
super().__init__()
|
||||
self.project_root = project_root
|
||||
self.sandbox_manager = SandboxManager()
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
|
|
@ -820,19 +801,27 @@ TruffleHog 可以扫描代码和 Git 历史,并验证密钥是否有效。
|
|||
**kwargs
|
||||
) -> ToolResult:
|
||||
"""执行 TruffleHog 扫描"""
|
||||
full_path = os.path.normpath(os.path.join(self.project_root, target_path))
|
||||
# 确保 Docker 可用
|
||||
await self.sandbox_manager.initialize()
|
||||
if not self.sandbox_manager.is_available:
|
||||
return ToolResult(success=False, error="Docker 沙箱不可用")
|
||||
|
||||
safe_target_path = target_path if not target_path.startswith("/") else target_path.lstrip("/")
|
||||
|
||||
cmd = ["trufflehog", "filesystem", full_path, "--json"]
|
||||
cmd = ["trufflehog", "filesystem", safe_target_path, "--json"]
|
||||
if only_verified:
|
||||
cmd.append("--only-verified")
|
||||
|
||||
cmd_str = " ".join(cmd)
|
||||
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
result = await self.sandbox_manager.execute_tool_command(
|
||||
command=cmd_str,
|
||||
host_workdir=self.project_root,
|
||||
timeout=180
|
||||
)
|
||||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=180)
|
||||
|
||||
stdout = result['stdout']
|
||||
|
||||
if not stdout.strip():
|
||||
return ToolResult(
|
||||
|
|
@ -843,7 +832,7 @@ TruffleHog 可以扫描代码和 Git 历史,并验证密钥是否有效。
|
|||
|
||||
# TruffleHog 输出每行一个 JSON 对象
|
||||
findings = []
|
||||
for line in stdout.decode().strip().split('\n'):
|
||||
for line in stdout.strip().split('\n'):
|
||||
if line.strip():
|
||||
try:
|
||||
findings.append(json.loads(line))
|
||||
|
|
@ -871,8 +860,6 @@ TruffleHog 可以扫描代码和 Git 历史,并验证密钥是否有效。
|
|||
metadata={"findings_count": len(findings)}
|
||||
)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
return ToolResult(success=False, error="TruffleHog 扫描超时")
|
||||
except Exception as e:
|
||||
return ToolResult(success=False, error=f"TruffleHog 执行错误: {str(e)}")
|
||||
|
||||
|
|
@ -895,6 +882,7 @@ class OSVScannerTool(AgentTool):
|
|||
def __init__(self, project_root: str):
|
||||
super().__init__()
|
||||
self.project_root = project_root
|
||||
self.sandbox_manager = SandboxManager()
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
|
|
@ -929,22 +917,32 @@ Google 开源的漏洞扫描工具,使用 OSV (Open Source Vulnerabilities)
|
|||
**kwargs
|
||||
) -> ToolResult:
|
||||
"""执行 OSV-Scanner"""
|
||||
full_path = os.path.normpath(os.path.join(self.project_root, target_path))
|
||||
# 确保 Docker 可用
|
||||
await self.sandbox_manager.initialize()
|
||||
if not self.sandbox_manager.is_available:
|
||||
return ToolResult(success=False, error="Docker 沙箱不可用")
|
||||
|
||||
safe_target_path = target_path if not target_path.startswith("/") else target_path.lstrip("/")
|
||||
|
||||
# OSV-Scanner
|
||||
cmd = ["osv-scanner", "--json", "-r", safe_target_path]
|
||||
cmd_str = " ".join(cmd)
|
||||
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"osv-scanner", "--json", "-r", full_path,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
result = await self.sandbox_manager.execute_tool_command(
|
||||
command=cmd_str,
|
||||
host_workdir=self.project_root,
|
||||
timeout=120
|
||||
)
|
||||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=120)
|
||||
|
||||
stdout = result['stdout']
|
||||
|
||||
try:
|
||||
results = json.loads(stdout.decode())
|
||||
results = json.loads(stdout)
|
||||
except:
|
||||
if "no package sources found" in stdout.decode().lower():
|
||||
if "no package sources found" in stdout.lower():
|
||||
return ToolResult(success=True, data="OSV-Scanner: 未找到可扫描的包文件")
|
||||
return ToolResult(success=True, data=f"OSV-Scanner 输出:\n{stdout.decode()[:1000]}")
|
||||
return ToolResult(success=True, data=f"OSV-Scanner 输出:\n{stdout[:1000]}")
|
||||
|
||||
vulns = results.get("results", [])
|
||||
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ logger = logging.getLogger(__name__)
|
|||
@dataclass
|
||||
class SandboxConfig:
|
||||
"""沙箱配置"""
|
||||
image: str = "python:3.11-slim"
|
||||
image: str = "deepaudit/sandbox:latest"
|
||||
memory_limit: str = "512m"
|
||||
cpu_limit: float = 1.0
|
||||
timeout: int = 60
|
||||
|
|
@ -109,6 +109,9 @@ class SandboxManager:
|
|||
"volumes": {
|
||||
temp_dir: {"bind": "/workspace", "mode": "rw"},
|
||||
},
|
||||
"tmpfs": {
|
||||
"/home/sandbox": "rw,size=100m,mode=1777"
|
||||
},
|
||||
"working_dir": working_dir or "/workspace",
|
||||
"environment": env or {},
|
||||
# 安全配置
|
||||
|
|
@ -169,6 +172,114 @@ class SandboxManager:
|
|||
"exit_code": -1,
|
||||
}
|
||||
|
||||
async def execute_tool_command(
|
||||
self,
|
||||
command: str,
|
||||
host_workdir: str,
|
||||
timeout: Optional[int] = None,
|
||||
env: Optional[Dict[str, str]] = None,
|
||||
network_mode: str = "none",
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
在沙箱中对指定目录执行工具命令
|
||||
|
||||
Args:
|
||||
command: 要执行的命令
|
||||
host_workdir: 宿主机上的工作目录(将被挂载到 /workspace)
|
||||
timeout: 超时时间
|
||||
env: 环境变量
|
||||
network_mode: 网络模式 (none, bridge, host)
|
||||
|
||||
Returns:
|
||||
执行结果
|
||||
"""
|
||||
if not self.is_available:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Docker 不可用",
|
||||
"stdout": "",
|
||||
"stderr": "",
|
||||
"exit_code": -1,
|
||||
}
|
||||
|
||||
timeout = timeout or self.config.timeout
|
||||
|
||||
try:
|
||||
# 准备容器配置
|
||||
container_config = {
|
||||
"image": self.config.image,
|
||||
"command": ["sh", "-c", command],
|
||||
"detach": True,
|
||||
"mem_limit": self.config.memory_limit,
|
||||
"cpu_period": 100000,
|
||||
"cpu_quota": int(100000 * self.config.cpu_limit),
|
||||
"network_mode": network_mode,
|
||||
"user": self.config.user,
|
||||
"read_only": self.config.read_only,
|
||||
"volumes": {
|
||||
host_workdir: {"bind": "/workspace", "mode": "ro"}, # 只读挂载项目代码
|
||||
},
|
||||
"tmpfs": {
|
||||
"/home/sandbox": "rw,size=100m,mode=1777"
|
||||
},
|
||||
"working_dir": "/workspace",
|
||||
"environment": env or {},
|
||||
"cap_drop": ["ALL"],
|
||||
"security_opt": ["no-new-privileges:true"],
|
||||
}
|
||||
|
||||
# 创建并启动容器
|
||||
container = await asyncio.to_thread(
|
||||
self._docker_client.containers.run,
|
||||
**container_config
|
||||
)
|
||||
|
||||
try:
|
||||
# 等待执行完成
|
||||
result = await asyncio.wait_for(
|
||||
asyncio.to_thread(container.wait),
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
# 获取日志
|
||||
stdout = await asyncio.to_thread(
|
||||
container.logs, stdout=True, stderr=False
|
||||
)
|
||||
stderr = await asyncio.to_thread(
|
||||
container.logs, stdout=False, stderr=True
|
||||
)
|
||||
|
||||
return {
|
||||
"success": result["StatusCode"] == 0,
|
||||
"stdout": stdout.decode('utf-8', errors='ignore')[:50000], # 增大日志限制
|
||||
"stderr": stderr.decode('utf-8', errors='ignore')[:5000],
|
||||
"exit_code": result["StatusCode"],
|
||||
"error": None,
|
||||
}
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
await asyncio.to_thread(container.kill)
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"执行超时 ({timeout}秒)",
|
||||
"stdout": "",
|
||||
"stderr": "",
|
||||
"exit_code": -1,
|
||||
}
|
||||
|
||||
finally:
|
||||
# 清理容器
|
||||
await asyncio.to_thread(container.remove, force=True)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Tool execution error: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
"stdout": "",
|
||||
"stderr": "",
|
||||
"exit_code": -1,
|
||||
}
|
||||
async def execute_python(
|
||||
self,
|
||||
code: str,
|
||||
|
|
|
|||
|
|
@ -1,13 +1,19 @@
|
|||
# DeepAudit Agent Sandbox
|
||||
# 安全沙箱环境用于漏洞验证和 PoC 执行
|
||||
# 集成各类安全扫描工具 (Semgrep, Bandit, Gitleaks, etc.)
|
||||
|
||||
FROM python:3.11-slim-bookworm
|
||||
FROM python:3.11-bullseye
|
||||
|
||||
LABEL maintainer="XCodeReviewer Team"
|
||||
LABEL description="Secure sandbox environment for vulnerability verification"
|
||||
LABEL description="Secure sandbox environment for vulnerability verification and security scanning"
|
||||
|
||||
# 安装基本工具
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
# 安装基本工具
|
||||
#Configuring mirrors for CN and unsetting broken proxy
|
||||
RUN sed -i 's/deb.debian.org/mirrors.aliyun.com/g' /etc/apt/sources.list && \
|
||||
sed -i 's/security.debian.org/mirrors.aliyun.com/g' /etc/apt/sources.list && \
|
||||
unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY && \
|
||||
apt-get update && apt-get install -y --no-install-recommends \
|
||||
curl \
|
||||
wget \
|
||||
netcat-openbsd \
|
||||
|
|
@ -15,15 +21,19 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
|||
iputils-ping \
|
||||
ca-certificates \
|
||||
git \
|
||||
unzip \
|
||||
jq \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# 安装 Node.js (用于 JavaScript/TypeScript 代码执行)
|
||||
RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \
|
||||
# 安装 Node.js (用于 JavaScript/TypeScript 代码执行 和 npm audit)
|
||||
RUN unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY && \
|
||||
curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \
|
||||
&& apt-get install -y nodejs \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# 安装常用的安全测试 Python 库
|
||||
RUN pip install --no-cache-dir \
|
||||
# 安装 Python 安全工具
|
||||
RUN unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY && \
|
||||
pip install --no-cache-dir -i https://pypi.tuna.tsinghua.edu.cn/simple \
|
||||
requests \
|
||||
httpx \
|
||||
aiohttp \
|
||||
|
|
@ -33,7 +43,26 @@ RUN pip install --no-cache-dir \
|
|||
paramiko \
|
||||
pyjwt \
|
||||
python-jose \
|
||||
sqlparse
|
||||
sqlparse \
|
||||
semgrep \
|
||||
bandit \
|
||||
safety
|
||||
|
||||
# 安装 Gitleaks
|
||||
RUN unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY && \
|
||||
curl -L https://github.com/gitleaks/gitleaks/releases/download/v8.18.2/gitleaks_8.18.2_linux_x64.tar.gz -o gitleaks.tar.gz && \
|
||||
tar -xzf gitleaks.tar.gz && \
|
||||
mv gitleaks /usr/local/bin/ && \
|
||||
rm gitleaks.tar.gz
|
||||
|
||||
# 安装 TruffleHog
|
||||
RUN unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY && \
|
||||
curl -sSfL https://raw.githubusercontent.com/trufflesecurity/trufflehog/main/scripts/install.sh | sh -s -- -b /usr/local/bin
|
||||
|
||||
# 安装 OSV-Scanner
|
||||
RUN unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY && \
|
||||
curl -L https://github.com/google/osv-scanner/releases/download/v1.6.2/osv-scanner_1.6.2_linux_amd64 -o /usr/local/bin/osv-scanner && \
|
||||
chmod +x /usr/local/bin/osv-scanner
|
||||
|
||||
# 创建非 root 用户
|
||||
RUN groupadd -g 1000 sandbox && \
|
||||
|
|
|
|||
Loading…
Reference in New Issue