From 189274fd5624098400deeebc7ae9c750fb632eca Mon Sep 17 00:00:00 2001 From: lintsinghua Date: Sun, 14 Dec 2025 00:07:04 +0800 Subject: [PATCH] =?UTF-8?q?feat(sandbox):=20=E9=87=8D=E6=9E=84=E5=AE=89?= =?UTF-8?q?=E5=85=A8=E5=B7=A5=E5=85=B7=E4=BB=A5=E4=BD=BF=E7=94=A8=E6=B2=99?= =?UTF-8?q?=E7=AE=B1=E7=8E=AF=E5=A2=83=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 重构所有安全工具(Semgrep, Bandit等)使用Docker沙箱环境执行,提升安全隔离性 更新Dockerfile基础镜像并集成常用安全扫描工具 添加沙箱管理器的工具命令执行方法,支持资源限制和临时文件系统 --- .../services/agent/tools/external_tools.py | 384 +++++++++--------- .../app/services/agent/tools/sandbox_tool.py | 113 +++++- docker/sandbox/Dockerfile | 45 +- 3 files changed, 340 insertions(+), 202 deletions(-) diff --git a/backend/app/services/agent/tools/external_tools.py b/backend/app/services/agent/tools/external_tools.py index 4010d05..777127a 100644 --- a/backend/app/services/agent/tools/external_tools.py +++ b/backend/app/services/agent/tools/external_tools.py @@ -14,6 +14,7 @@ from pydantic import BaseModel, Field from dataclasses import dataclass from .base import AgentTool, ToolResult +from .sandbox_tool import SandboxManager logger = logging.getLogger(__name__) @@ -70,6 +71,7 @@ class SemgrepTool(AgentTool): def __init__(self, project_root: str): super().__init__() self.project_root = project_root + self.sandbox_manager = SandboxManager() @property def name(self) -> str: @@ -107,26 +109,15 @@ Semgrep 是业界领先的静态分析工具,支持 30+ 种编程语言。 **kwargs ) -> ToolResult: """执行 Semgrep 扫描""" - # 检查 semgrep 是否可用 - if not await self._check_semgrep(): - # 尝试自动安装 - logger.info("Semgrep 未安装,尝试自动安装...") - install_success = await self._try_install_semgrep() - if not install_success: - return ToolResult( - success=False, - error="Semgrep 未安装。请使用 'pip install semgrep' 安装,或联系管理员安装。", - ) + # 确保 Docker 可用 + await self.sandbox_manager.initialize() + if not self.sandbox_manager.is_available: + return ToolResult(success=False, error="Docker 沙箱不可用,无法执行 Semgrep") + + # 构建命令 (相对于 /workspace) + # 注意: target_path 是相对于 project_root 的 + safe_target_path = target_path if not target_path.startswith("/") else target_path.lstrip("/") - # 构建完整路径 - full_path = os.path.normpath(os.path.join(self.project_root, target_path)) - if not full_path.startswith(os.path.normpath(self.project_root)): - return ToolResult( - success=False, - error="安全错误:不允许扫描项目目录外的路径", - ) - - # 构建命令 cmd = ["semgrep", "--json", "--quiet"] if rules == "auto": @@ -139,30 +130,36 @@ Semgrep 是业界领先的静态分析工具,支持 30+ 种编程语言。 if severity: cmd.extend(["--severity", severity]) - cmd.append(full_path) + # 在容器内,路径相对于 /workspace + cmd.append(safe_target_path) + + cmd_str = " ".join(cmd) try: - proc = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - cwd=self.project_root, + result = await self.sandbox_manager.execute_tool_command( + command=cmd_str, + host_workdir=self.project_root, + timeout=300 ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=300) - if proc.returncode not in [0, 1]: # 1 means findings were found + if not result["success"] and result["exit_code"] != 1: # 1 means findings were found return ToolResult( success=False, - error=f"Semgrep 执行失败: {stderr.decode()[:500]}", + error=f"Semgrep 执行失败: {result['stderr'][:500] or result['error']}", ) # 解析结果 try: - results = json.loads(stdout.decode()) + # 尝试从 stdout 查找 JSON + json_start = result['stdout'].find('{') + if json_start >= 0: + results = json.loads(result['stdout'][json_start:]) + else: + results = {} except json.JSONDecodeError: return ToolResult( success=False, - error="无法解析 Semgrep 输出", + error=f"无法解析 Semgrep 输出: {result['stdout'][:200]}", ) findings = results.get("results", [])[:max_results] @@ -203,60 +200,11 @@ Semgrep 是业界领先的静态分析工具,支持 30+ 种编程语言。 } ) - except asyncio.TimeoutError: - # 🔥 超时时提供更有用的信息 + except Exception as e: return ToolResult( success=False, - error=f"Semgrep 扫描超时(超过300秒)。可能原因:\n" - f"1. 规则集 '{rules}' 需要从网络下载,网络较慢\n" - f"2. 扫描目标过大\n" - f"建议:尝试使用 pattern_match 或 smart_scan 工具进行快速扫描" + error=f"Semgrep 执行错误: {str(e)}" ) - except Exception as e: - error_msg = str(e) - # 🔥 提供更详细的错误诊断 - return ToolResult( - success=False, - error=f"Semgrep 执行错误: {error_msg[:300]}\n" - f"建议:使用 pattern_match 或 smart_scan 工具作为替代" - ) - - async def _check_semgrep(self) -> bool: - """检查 Semgrep 是否可用""" - try: - proc = await asyncio.create_subprocess_exec( - "semgrep", "--version", - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - await proc.communicate() - return proc.returncode == 0 - except: - return False - - async def _try_install_semgrep(self) -> bool: - """尝试自动安装 Semgrep""" - try: - logger.info("正在安装 Semgrep...") - proc = await asyncio.create_subprocess_exec( - "pip", "install", "semgrep", - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=120) - if proc.returncode == 0: - logger.info("Semgrep 安装成功") - # 验证安装 - return await self._check_semgrep() - else: - logger.warning(f"Semgrep 安装失败: {stderr.decode()[:200]}") - return False - except asyncio.TimeoutError: - logger.warning("Semgrep 安装超时") - return False - except Exception as e: - logger.warning(f"Semgrep 安装出错: {e}") - return False # ============ Bandit 工具 (Python) ============ @@ -285,6 +233,7 @@ class BanditTool(AgentTool): def __init__(self, project_root: str): super().__init__() self.project_root = project_root + self.sandbox_manager = SandboxManager() @property def name(self) -> str: @@ -319,15 +268,12 @@ Bandit 是 Python 专用的安全分析工具,由 OpenStack 安全团队开发 **kwargs ) -> ToolResult: """执行 Bandit 扫描""" - if not await self._check_bandit(): - return ToolResult( - success=False, - error="Bandit 未安装。请使用 'pip install bandit' 安装。", - ) - - full_path = os.path.normpath(os.path.join(self.project_root, target_path)) - if not full_path.startswith(os.path.normpath(self.project_root)): - return ToolResult(success=False, error="安全错误:路径越界") + # 确保 Docker 可用 + await self.sandbox_manager.initialize() + if not self.sandbox_manager.is_available: + return ToolResult(success=False, error="Docker 沙箱不可用") + + safe_target_path = target_path if not target_path.startswith("/") else target_path.lstrip("/") # 构建命令 severity_map = {"low": "l", "medium": "m", "high": "h"} @@ -337,21 +283,27 @@ Bandit 是 Python 专用的安全分析工具,由 OpenStack 安全团队开发 "bandit", "-r", "-f", "json", "-ll" if severity == "low" else f"-l{severity_map.get(severity, 'm')}", f"-i{confidence_map.get(confidence, 'm')}", - full_path + safe_target_path ] + cmd_str = " ".join(cmd) + try: - proc = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + result = await self.sandbox_manager.execute_tool_command( + command=cmd_str, + host_workdir=self.project_root, + timeout=120 ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=120) try: - results = json.loads(stdout.decode()) + # find json in output + json_start = result['stdout'].find('{') + if json_start >= 0: + results = json.loads(result['stdout'][json_start:]) + else: + results = {} except json.JSONDecodeError: - return ToolResult(success=False, error="无法解析 Bandit 输出") + return ToolResult(success=False, error=f"无法解析 Bandit 输出: {result['stdout'][:200]}") findings = results.get("results", [])[:max_results] @@ -382,22 +334,8 @@ Bandit 是 Python 专用的安全分析工具,由 OpenStack 安全团队开发 metadata={"findings_count": len(findings), "findings": findings[:10]} ) - except asyncio.TimeoutError: - return ToolResult(success=False, error="Bandit 扫描超时") except Exception as e: return ToolResult(success=False, error=f"Bandit 执行错误: {str(e)}") - - async def _check_bandit(self) -> bool: - try: - proc = await asyncio.create_subprocess_exec( - "bandit", "--version", - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - await proc.communicate() - return proc.returncode == 0 - except: - return False # ============ Gitleaks 工具 ============ @@ -425,6 +363,7 @@ class GitleaksTool(AgentTool): def __init__(self, project_root: str): super().__init__() self.project_root = project_root + self.sandbox_manager = SandboxManager() @property def name(self) -> str: @@ -460,35 +399,35 @@ Gitleaks 是专业的密钥检测工具,支持 150+ 种密钥类型。 **kwargs ) -> ToolResult: """执行 Gitleaks 扫描""" - if not await self._check_gitleaks(): - return ToolResult( - success=False, - error="Gitleaks 未安装。Gitleaks 需要手动安装,请参考: https://github.com/gitleaks/gitleaks/releases\n" - "安装方法:\n" - "- macOS: brew install gitleaks\n" - "- Linux: 下载二进制文件并添加到 PATH\n" - "- Windows: 下载二进制文件并添加到 PATH", - ) + # 确保 Docker 可用 + await self.sandbox_manager.initialize() + if not self.sandbox_manager.is_available: + return ToolResult(success=False, error="Docker 沙箱不可用") + + safe_target_path = target_path if not target_path.startswith("/") else target_path.lstrip("/") - full_path = os.path.normpath(os.path.join(self.project_root, target_path)) - if not full_path.startswith(os.path.normpath(self.project_root)): - return ToolResult(success=False, error="安全错误:路径越界") + # 构建命令 using . as source because we are mounted to /workspace + # But if user specified a subdirectory, we append it. + # Actually gitleaks detects pwd by default if source is . - cmd = ["gitleaks", "detect", "--source", full_path, "-f", "json"] + cmd = ["gitleaks", "detect", "--source", safe_target_path, "-f", "json"] if no_git: cmd.append("--no-git") + cmd_str = " ".join(cmd) + try: - proc = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + result = await self.sandbox_manager.execute_tool_command( + command=cmd_str, + host_workdir=self.project_root, + timeout=120 ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=120) # Gitleaks returns 1 if secrets found - if proc.returncode not in [0, 1]: - return ToolResult(success=False, error=f"Gitleaks 执行失败: {stderr.decode()[:300]}") + if result['exit_code'] not in [0, 1]: + return ToolResult(success=False, error=f"Gitleaks 执行失败: {result['stderr'][:300]}") + + stdout = result['stdout'] if not stdout.strip(): return ToolResult( @@ -498,12 +437,17 @@ Gitleaks 是专业的密钥检测工具,支持 150+ 种密钥类型。 ) try: - findings = json.loads(stdout.decode()) + # Find JSON start + json_start = stdout.find('[') + if json_start >= 0: + findings = json.loads(stdout[json_start:]) + else: + findings = [] except json.JSONDecodeError: findings = [] if not findings: - return ToolResult( + return ToolResult( success=True, data="🔐 Gitleaks 扫描完成,未发现密钥泄露", metadata={"findings_count": 0} @@ -539,22 +483,8 @@ Gitleaks 是专业的密钥检测工具,支持 150+ 种密钥类型。 } ) - except asyncio.TimeoutError: - return ToolResult(success=False, error="Gitleaks 扫描超时") except Exception as e: return ToolResult(success=False, error=f"Gitleaks 执行错误: {str(e)}") - - async def _check_gitleaks(self) -> bool: - try: - proc = await asyncio.create_subprocess_exec( - "gitleaks", "version", - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - await proc.communicate() - return proc.returncode == 0 - except: - return False # ============ npm audit 工具 ============ @@ -575,6 +505,7 @@ class NpmAuditTool(AgentTool): def __init__(self, project_root: str): super().__init__() self.project_root = project_root + self.sandbox_manager = SandboxManager() @property def name(self) -> str: @@ -602,9 +533,20 @@ class NpmAuditTool(AgentTool): **kwargs ) -> ToolResult: """执行 npm audit""" + # 确保 Docker 可用 + await self.sandbox_manager.initialize() + if not self.sandbox_manager.is_available: + return ToolResult(success=False, error="Docker 沙箱不可用") + + # 这里的 target_path 是相对于 project_root 的 + # 防止空路径 + safe_target_path = target_path if not target_path.startswith("/") else target_path.lstrip("/") + if not safe_target_path: + safe_target_path = "." + full_path = os.path.normpath(os.path.join(self.project_root, target_path)) - # 检查 package.json + # 宿主机预检查 package_json = os.path.join(full_path, "package.json") if not os.path.exists(package_json): return ToolResult( @@ -616,19 +558,35 @@ class NpmAuditTool(AgentTool): if production_only: cmd.append("--production") + # 组合命令: cd 到目标目录然后执行 + cmd_str = f"cd {safe_target_path} && {' '.join(cmd)}" + try: - proc = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - cwd=full_path, + # 清除代理设置,避免容器内网络问题 + proxy_env = { + "HTTPS_PROXY": "", + "HTTP_PROXY": "", + "https_proxy": "", + "http_proxy": "" + } + + result = await self.sandbox_manager.execute_tool_command( + command=cmd_str, + host_workdir=self.project_root, + timeout=120, + network_mode="bridge", + env=proxy_env ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=120) try: - results = json.loads(stdout.decode()) + # npm audit json starts with { + json_start = result['stdout'].find('{') + if json_start >= 0: + results = json.loads(result['stdout'][json_start:]) + else: + return ToolResult(success=True, data=f"npm audit 输出为空或格式错误: {result['stdout'][:100]}") except json.JSONDecodeError: - return ToolResult(success=True, data="npm audit 输出为空或格式错误") + return ToolResult(success=True, data=f"npm audit 输出格式错误") vulnerabilities = results.get("vulnerabilities", {}) @@ -669,8 +627,6 @@ class NpmAuditTool(AgentTool): } ) - except asyncio.TimeoutError: - return ToolResult(success=False, error="npm audit 超时") except Exception as e: return ToolResult(success=False, error=f"npm audit 错误: {str(e)}") @@ -692,6 +648,7 @@ class SafetyTool(AgentTool): def __init__(self, project_root: str): super().__init__() self.project_root = project_root + self.sandbox_manager = SandboxManager() @property def name(self) -> str: @@ -717,32 +674,55 @@ class SafetyTool(AgentTool): **kwargs ) -> ToolResult: """执行 Safety 扫描""" + # 确保 Docker 可用 + await self.sandbox_manager.initialize() + if not self.sandbox_manager.is_available: + return ToolResult(success=False, error="Docker 沙箱不可用") + full_path = os.path.join(self.project_root, requirements_file) - if not os.path.exists(full_path): return ToolResult(success=False, error=f"未找到依赖文件: {requirements_file}") + + # commands + # requirements_file relative path inside container is just requirements_file (assuming it's relative to root) + # If requirements_file is absolute, we need to make it relative. + # But for security, `requirements_file` should be relative to project_root. + safe_req_file = requirements_file if not requirements_file.startswith("/") else requirements_file.lstrip("/") + + cmd = ["safety", "check", "-r", safe_req_file, "--json"] + cmd_str = " ".join(cmd) try: - proc = await asyncio.create_subprocess_exec( - "safety", "check", "-r", full_path, "--json", - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + result = await self.sandbox_manager.execute_tool_command( + command=cmd_str, + host_workdir=self.project_root, + timeout=120 ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=60) + stdout = result['stdout'] try: # Safety 输出的 JSON 格式可能不同版本有差异 - output = stdout.decode() - if "No known security" in output: - return ToolResult( - success=True, - data="🐍 Safety 扫描完成,未发现 Python 依赖漏洞", - metadata={"findings_count": 0} - ) + # find first { or [ + start_idx = -1 + for i, char in enumerate(stdout): + if char in ['{', '[']: + start_idx = i + break - results = json.loads(output) + if start_idx >= 0: + output_json = stdout[start_idx:] + if "No known security" in output_json: + return ToolResult( + success=True, + data="🐍 Safety 扫描完成,未发现 Python 依赖漏洞", + metadata={"findings_count": 0} + ) + results = json.loads(output_json) + else: + return ToolResult(success=True, data=f"Safety 输出:\n{stdout[:1000]}") + except: - return ToolResult(success=True, data=f"Safety 输出:\n{stdout.decode()[:1000]}") + return ToolResult(success=True, data=f"Safety 输出解析失败:\n{stdout[:1000]}") vulnerabilities = results if isinstance(results, list) else results.get("vulnerabilities", []) @@ -791,6 +771,7 @@ class TruffleHogTool(AgentTool): def __init__(self, project_root: str): super().__init__() self.project_root = project_root + self.sandbox_manager = SandboxManager() @property def name(self) -> str: @@ -820,19 +801,27 @@ TruffleHog 可以扫描代码和 Git 历史,并验证密钥是否有效。 **kwargs ) -> ToolResult: """执行 TruffleHog 扫描""" - full_path = os.path.normpath(os.path.join(self.project_root, target_path)) + # 确保 Docker 可用 + await self.sandbox_manager.initialize() + if not self.sandbox_manager.is_available: + return ToolResult(success=False, error="Docker 沙箱不可用") + + safe_target_path = target_path if not target_path.startswith("/") else target_path.lstrip("/") - cmd = ["trufflehog", "filesystem", full_path, "--json"] + cmd = ["trufflehog", "filesystem", safe_target_path, "--json"] if only_verified: cmd.append("--only-verified") + cmd_str = " ".join(cmd) + try: - proc = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + result = await self.sandbox_manager.execute_tool_command( + command=cmd_str, + host_workdir=self.project_root, + timeout=180 ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=180) + + stdout = result['stdout'] if not stdout.strip(): return ToolResult( @@ -843,7 +832,7 @@ TruffleHog 可以扫描代码和 Git 历史,并验证密钥是否有效。 # TruffleHog 输出每行一个 JSON 对象 findings = [] - for line in stdout.decode().strip().split('\n'): + for line in stdout.strip().split('\n'): if line.strip(): try: findings.append(json.loads(line)) @@ -871,8 +860,6 @@ TruffleHog 可以扫描代码和 Git 历史,并验证密钥是否有效。 metadata={"findings_count": len(findings)} ) - except asyncio.TimeoutError: - return ToolResult(success=False, error="TruffleHog 扫描超时") except Exception as e: return ToolResult(success=False, error=f"TruffleHog 执行错误: {str(e)}") @@ -895,6 +882,7 @@ class OSVScannerTool(AgentTool): def __init__(self, project_root: str): super().__init__() self.project_root = project_root + self.sandbox_manager = SandboxManager() @property def name(self) -> str: @@ -929,22 +917,32 @@ Google 开源的漏洞扫描工具,使用 OSV (Open Source Vulnerabilities) **kwargs ) -> ToolResult: """执行 OSV-Scanner""" - full_path = os.path.normpath(os.path.join(self.project_root, target_path)) + # 确保 Docker 可用 + await self.sandbox_manager.initialize() + if not self.sandbox_manager.is_available: + return ToolResult(success=False, error="Docker 沙箱不可用") + + safe_target_path = target_path if not target_path.startswith("/") else target_path.lstrip("/") + + # OSV-Scanner + cmd = ["osv-scanner", "--json", "-r", safe_target_path] + cmd_str = " ".join(cmd) try: - proc = await asyncio.create_subprocess_exec( - "osv-scanner", "--json", "-r", full_path, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + result = await self.sandbox_manager.execute_tool_command( + command=cmd_str, + host_workdir=self.project_root, + timeout=120 ) - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=120) + + stdout = result['stdout'] try: - results = json.loads(stdout.decode()) + results = json.loads(stdout) except: - if "no package sources found" in stdout.decode().lower(): + if "no package sources found" in stdout.lower(): return ToolResult(success=True, data="OSV-Scanner: 未找到可扫描的包文件") - return ToolResult(success=True, data=f"OSV-Scanner 输出:\n{stdout.decode()[:1000]}") + return ToolResult(success=True, data=f"OSV-Scanner 输出:\n{stdout[:1000]}") vulns = results.get("results", []) diff --git a/backend/app/services/agent/tools/sandbox_tool.py b/backend/app/services/agent/tools/sandbox_tool.py index 91a0257..3ad0f53 100644 --- a/backend/app/services/agent/tools/sandbox_tool.py +++ b/backend/app/services/agent/tools/sandbox_tool.py @@ -21,7 +21,7 @@ logger = logging.getLogger(__name__) @dataclass class SandboxConfig: """沙箱配置""" - image: str = "python:3.11-slim" + image: str = "deepaudit/sandbox:latest" memory_limit: str = "512m" cpu_limit: float = 1.0 timeout: int = 60 @@ -109,6 +109,9 @@ class SandboxManager: "volumes": { temp_dir: {"bind": "/workspace", "mode": "rw"}, }, + "tmpfs": { + "/home/sandbox": "rw,size=100m,mode=1777" + }, "working_dir": working_dir or "/workspace", "environment": env or {}, # 安全配置 @@ -169,6 +172,114 @@ class SandboxManager: "exit_code": -1, } + async def execute_tool_command( + self, + command: str, + host_workdir: str, + timeout: Optional[int] = None, + env: Optional[Dict[str, str]] = None, + network_mode: str = "none", + ) -> Dict[str, Any]: + """ + 在沙箱中对指定目录执行工具命令 + + Args: + command: 要执行的命令 + host_workdir: 宿主机上的工作目录(将被挂载到 /workspace) + timeout: 超时时间 + env: 环境变量 + network_mode: 网络模式 (none, bridge, host) + + Returns: + 执行结果 + """ + if not self.is_available: + return { + "success": False, + "error": "Docker 不可用", + "stdout": "", + "stderr": "", + "exit_code": -1, + } + + timeout = timeout or self.config.timeout + + try: + # 准备容器配置 + container_config = { + "image": self.config.image, + "command": ["sh", "-c", command], + "detach": True, + "mem_limit": self.config.memory_limit, + "cpu_period": 100000, + "cpu_quota": int(100000 * self.config.cpu_limit), + "network_mode": network_mode, + "user": self.config.user, + "read_only": self.config.read_only, + "volumes": { + host_workdir: {"bind": "/workspace", "mode": "ro"}, # 只读挂载项目代码 + }, + "tmpfs": { + "/home/sandbox": "rw,size=100m,mode=1777" + }, + "working_dir": "/workspace", + "environment": env or {}, + "cap_drop": ["ALL"], + "security_opt": ["no-new-privileges:true"], + } + + # 创建并启动容器 + container = await asyncio.to_thread( + self._docker_client.containers.run, + **container_config + ) + + try: + # 等待执行完成 + result = await asyncio.wait_for( + asyncio.to_thread(container.wait), + timeout=timeout + ) + + # 获取日志 + stdout = await asyncio.to_thread( + container.logs, stdout=True, stderr=False + ) + stderr = await asyncio.to_thread( + container.logs, stdout=False, stderr=True + ) + + return { + "success": result["StatusCode"] == 0, + "stdout": stdout.decode('utf-8', errors='ignore')[:50000], # 增大日志限制 + "stderr": stderr.decode('utf-8', errors='ignore')[:5000], + "exit_code": result["StatusCode"], + "error": None, + } + + except asyncio.TimeoutError: + await asyncio.to_thread(container.kill) + return { + "success": False, + "error": f"执行超时 ({timeout}秒)", + "stdout": "", + "stderr": "", + "exit_code": -1, + } + + finally: + # 清理容器 + await asyncio.to_thread(container.remove, force=True) + + except Exception as e: + logger.error(f"Tool execution error: {e}") + return { + "success": False, + "error": str(e), + "stdout": "", + "stderr": "", + "exit_code": -1, + } async def execute_python( self, code: str, diff --git a/docker/sandbox/Dockerfile b/docker/sandbox/Dockerfile index 530ae4e..782984c 100644 --- a/docker/sandbox/Dockerfile +++ b/docker/sandbox/Dockerfile @@ -1,13 +1,19 @@ # DeepAudit Agent Sandbox # 安全沙箱环境用于漏洞验证和 PoC 执行 +# 集成各类安全扫描工具 (Semgrep, Bandit, Gitleaks, etc.) -FROM python:3.11-slim-bookworm +FROM python:3.11-bullseye LABEL maintainer="XCodeReviewer Team" -LABEL description="Secure sandbox environment for vulnerability verification" +LABEL description="Secure sandbox environment for vulnerability verification and security scanning" # 安装基本工具 -RUN apt-get update && apt-get install -y --no-install-recommends \ +# 安装基本工具 +#Configuring mirrors for CN and unsetting broken proxy +RUN sed -i 's/deb.debian.org/mirrors.aliyun.com/g' /etc/apt/sources.list && \ + sed -i 's/security.debian.org/mirrors.aliyun.com/g' /etc/apt/sources.list && \ + unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY && \ + apt-get update && apt-get install -y --no-install-recommends \ curl \ wget \ netcat-openbsd \ @@ -15,15 +21,19 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ iputils-ping \ ca-certificates \ git \ + unzip \ + jq \ && rm -rf /var/lib/apt/lists/* -# 安装 Node.js (用于 JavaScript/TypeScript 代码执行) -RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \ +# 安装 Node.js (用于 JavaScript/TypeScript 代码执行 和 npm audit) +RUN unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY && \ + curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \ && apt-get install -y nodejs \ && rm -rf /var/lib/apt/lists/* -# 安装常用的安全测试 Python 库 -RUN pip install --no-cache-dir \ +# 安装 Python 安全工具 +RUN unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY && \ + pip install --no-cache-dir -i https://pypi.tuna.tsinghua.edu.cn/simple \ requests \ httpx \ aiohttp \ @@ -33,7 +43,26 @@ RUN pip install --no-cache-dir \ paramiko \ pyjwt \ python-jose \ - sqlparse + sqlparse \ + semgrep \ + bandit \ + safety + +# 安装 Gitleaks +RUN unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY && \ + curl -L https://github.com/gitleaks/gitleaks/releases/download/v8.18.2/gitleaks_8.18.2_linux_x64.tar.gz -o gitleaks.tar.gz && \ + tar -xzf gitleaks.tar.gz && \ + mv gitleaks /usr/local/bin/ && \ + rm gitleaks.tar.gz + +# 安装 TruffleHog +RUN unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY && \ + curl -sSfL https://raw.githubusercontent.com/trufflesecurity/trufflehog/main/scripts/install.sh | sh -s -- -b /usr/local/bin + +# 安装 OSV-Scanner +RUN unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY && \ + curl -L https://github.com/google/osv-scanner/releases/download/v1.6.2/osv-scanner_1.6.2_linux_amd64 -o /usr/local/bin/osv-scanner && \ + chmod +x /usr/local/bin/osv-scanner # 创建非 root 用户 RUN groupadd -g 1000 sandbox && \