#!/usr/bin/env python3 """ 创建 Agent 审计任务演示数据 用于生成 HTML 报告示例展示 运行方式: cd backend && python -m scripts.create_agent_demo_data """ import asyncio import json import uuid import sys import os from datetime import datetime, timedelta, timezone # 添加backend目录到路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker from sqlalchemy.future import select from app.core.config import settings from app.models.user import User from app.models.project import Project from app.models.agent_task import ( AgentTask, AgentEvent, AgentFinding, AgentTreeNode, AgentCheckpoint, AgentTaskStatus, AgentTaskPhase, AgentEventType, VulnerabilitySeverity, VulnerabilityType, FindingStatus ) # 演示数据配置 DEMO_PROJECT_NAME = "VulnWebApp - 安全演示项目" DEMO_TASK_NAME = "智能漏洞挖掘审计 - 完整示例" async def get_or_create_demo_project(db: AsyncSession, user_id: str) -> Project: """获取或创建演示项目""" result = await db.execute( select(Project).where(Project.name == DEMO_PROJECT_NAME) ) project = result.scalars().first() if not project: project = Project( name=DEMO_PROJECT_NAME, description="用于演示 Agent 智能审计功能的示例 Web 应用项目,包含多种常见安全漏洞", source_type="zip", owner_id=user_id, is_active=True, default_branch="main", programming_languages=json.dumps(["Python", "JavaScript", "SQL"]), created_at=datetime.now(timezone.utc) - timedelta(days=7), ) db.add(project) await db.flush() print(f"✓ 创建演示项目: {project.name}") else: print(f"演示项目已存在: {project.name}") return project async def create_agent_demo_task(db: AsyncSession, project: Project, user_id: str) -> AgentTask: """创建 Agent 审计任务演示数据""" # 检查是否已存在 result = await db.execute( select(AgentTask).where(AgentTask.name == DEMO_TASK_NAME) ) existing = result.scalars().first() if existing: print(f"删除已存在的演示任务: {existing.id}") await db.delete(existing) await db.flush() now = datetime.now(timezone.utc) task_start = now - timedelta(minutes=15) task_end = now - timedelta(minutes=2) # 创建 Agent 任务 task = AgentTask( id=str(uuid.uuid4()), project_id=project.id, created_by=user_id, name=DEMO_TASK_NAME, description="对 VulnWebApp 进行全面的安全漏洞扫描,包括 SQL 注入、XSS、命令注入等常见漏洞类型的检测与验证", task_type="agent_audit", # 配置 audit_scope={"include": ["**/*.py", "**/*.js", "**/*.html"], "exclude": ["tests/*", "node_modules/*"]}, target_vulnerabilities=["sql_injection", "xss", "command_injection", "path_traversal", "ssrf", "hardcoded_secret"], verification_level="sandbox", branch_name="main", exclude_patterns=["*.test.py", "*.spec.js", "__pycache__/*"], # LLM 配置 llm_config={"provider": "openai", "model": "gpt-4", "temperature": 0.1}, agent_config={"max_depth": 3, "enable_verification": True, "enable_poc_generation": True}, max_iterations=50, token_budget=100000, timeout_seconds=1800, # 状态 status=AgentTaskStatus.COMPLETED, current_phase=AgentTaskPhase.REPORTING, current_step="报告生成完成", # 进度统计 total_files=48, indexed_files=48, analyzed_files=48, total_chunks=156, # Agent 统计 total_iterations=32, tool_calls_count=87, tokens_used=45680, # 发现统计 findings_count=8, verified_count=6, false_positive_count=1, # 严重程度统计 critical_count=2, high_count=3, medium_count=2, low_count=1, # 评分 quality_score=72.5, security_score=35.8, # 审计计划 audit_plan={ "phases": [ {"name": "代码索引", "description": "建立代码向量索引,支持语义检索"}, {"name": "入口点识别", "description": "识别用户输入入口点和敏感API"}, {"name": "漏洞模式匹配", "description": "基于已知漏洞模式进行检测"}, {"name": "数据流分析", "description": "追踪污点数据流,验证漏洞可达性"}, {"name": "沙箱验证", "description": "在隔离环境中验证漏洞可利用性"}, {"name": "PoC 生成", "description": "为已验证漏洞生成概念验证代码"}, ], "focus_areas": ["用户认证模块", "数据库查询接口", "文件上传功能", "API 端点"], }, # 时间戳 created_at=task_start - timedelta(minutes=1), started_at=task_start, completed_at=task_end, ) db.add(task) await db.flush() print(f"✓ 创建 Agent 任务: {task.id}") return task async def create_agent_events(db: AsyncSession, task: AgentTask) -> list: """创建 Agent 事件流""" events = [] base_time = task.started_at sequence = 0 def add_event(event_type: str, message: str, phase: str = None, tool_name: str = None, tool_input: dict = None, tool_output: dict = None, tool_duration_ms: int = None, finding_id: str = None, tokens_used: int = 0, metadata: dict = None, time_offset_seconds: int = 0): nonlocal sequence sequence += 1 event = AgentEvent( id=str(uuid.uuid4()), task_id=task.id, event_type=event_type, phase=phase, message=message, tool_name=tool_name, tool_input=tool_input, tool_output=tool_output, tool_duration_ms=tool_duration_ms, finding_id=finding_id, tokens_used=tokens_used, event_metadata=metadata, sequence=sequence, created_at=base_time + timedelta(seconds=time_offset_seconds), ) events.append(event) return event # ========== 任务启动 ========== add_event( AgentEventType.TASK_START, "Agent 审计任务启动,开始智能漏洞挖掘", metadata={"target_vulnerabilities": task.target_vulnerabilities}, time_offset_seconds=0 ) # ========== 规划阶段 ========== add_event( AgentEventType.PHASE_START, "进入规划阶段 - 分析项目结构,制定审计策略", phase=AgentTaskPhase.PLANNING, time_offset_seconds=5 ) add_event( AgentEventType.THINKING, "分析项目结构:检测到 Flask Web 应用框架,包含用户认证、数据库操作、文件处理等模块。重点关注 SQL 注入、XSS、命令注入等高危漏洞。", phase=AgentTaskPhase.PLANNING, tokens_used=450, time_offset_seconds=10 ) add_event( AgentEventType.PLANNING, "制定审计计划:1) 索引代码库 2) 识别入口点 3) 模式匹配检测 4) 数据流分析 5) 沙箱验证 6) 生成报告", phase=AgentTaskPhase.PLANNING, tokens_used=380, time_offset_seconds=15 ) add_event( AgentEventType.PHASE_COMPLETE, "规划阶段完成,识别出 12 个高优先级检查点", phase=AgentTaskPhase.PLANNING, time_offset_seconds=20 ) # ========== 索引阶段 ========== add_event( AgentEventType.PHASE_START, "进入索引阶段 - 构建代码向量索引", phase=AgentTaskPhase.INDEXING, time_offset_seconds=25 ) add_event( AgentEventType.TOOL_CALL, "调用 RAG 索引工具,处理源代码文件", phase=AgentTaskPhase.INDEXING, tool_name="rag_index", tool_input={"paths": ["app/", "routes/", "models/", "utils/"], "chunk_size": 1500}, time_offset_seconds=30 ) add_event( AgentEventType.RAG_RESULT, "代码索引完成:48 个文件,156 个代码块,向量维度 1536", phase=AgentTaskPhase.INDEXING, tool_name="rag_index", tool_output={"files_indexed": 48, "chunks_created": 156, "vector_dim": 1536}, tool_duration_ms=8500, time_offset_seconds=45 ) add_event( AgentEventType.PHASE_COMPLETE, "索引阶段完成", phase=AgentTaskPhase.INDEXING, time_offset_seconds=50 ) # ========== 分析阶段 ========== add_event( AgentEventType.PHASE_START, "进入分析阶段 - 执行漏洞检测", phase=AgentTaskPhase.ANALYSIS, time_offset_seconds=55 ) # SQL 注入检测 add_event( AgentEventType.THINKING, "开始检测 SQL 注入漏洞:搜索数据库查询相关代码,识别用户输入拼接到 SQL 语句的模式", phase=AgentTaskPhase.ANALYSIS, tokens_used=320, time_offset_seconds=60 ) add_event( AgentEventType.RAG_QUERY, "语义检索:查找 SQL 查询和用户输入处理代码", phase=AgentTaskPhase.ANALYSIS, tool_name="rag_search", tool_input={"query": "SQL query user input parameter database execute", "top_k": 10}, time_offset_seconds=65 ) add_event( AgentEventType.TOOL_CALL, "读取文件: app/routes/user.py", phase=AgentTaskPhase.ANALYSIS, tool_name="read_file", tool_input={"path": "app/routes/user.py", "start_line": 45, "end_line": 80}, time_offset_seconds=70 ) add_event( AgentEventType.FINDING_NEW, "发现 SQL 注入漏洞 [Critical]", phase=AgentTaskPhase.ANALYSIS, metadata={"vulnerability_type": "sql_injection", "severity": "critical", "file": "app/routes/user.py", "line": 52}, time_offset_seconds=80 ) # XSS 检测 add_event( AgentEventType.THINKING, "开始检测 XSS 漏洞:搜索 HTML 渲染和用户输入输出相关代码", phase=AgentTaskPhase.ANALYSIS, tokens_used=280, time_offset_seconds=120 ) add_event( AgentEventType.TOOL_CALL, "读取文件: app/templates/comment.html", phase=AgentTaskPhase.ANALYSIS, tool_name="read_file", tool_input={"path": "app/templates/comment.html"}, time_offset_seconds=130 ) add_event( AgentEventType.FINDING_NEW, "发现存储型 XSS 漏洞 [High]", phase=AgentTaskPhase.ANALYSIS, metadata={"vulnerability_type": "xss", "severity": "high", "file": "app/templates/comment.html", "line": 28}, time_offset_seconds=145 ) # 命令注入检测 add_event( AgentEventType.RAG_QUERY, "语义检索:查找系统命令执行相关代码", phase=AgentTaskPhase.ANALYSIS, tool_name="rag_search", tool_input={"query": "os.system subprocess shell command execute", "top_k": 10}, time_offset_seconds=180 ) add_event( AgentEventType.FINDING_NEW, "发现命令注入漏洞 [Critical]", phase=AgentTaskPhase.ANALYSIS, metadata={"vulnerability_type": "command_injection", "severity": "critical", "file": "app/utils/backup.py", "line": 34}, time_offset_seconds=210 ) # 路径遍历检测 add_event( AgentEventType.TOOL_CALL, "分析文件操作代码", phase=AgentTaskPhase.ANALYSIS, tool_name="analyze_code", tool_input={"pattern": "file path user input", "scope": "app/routes/"}, time_offset_seconds=250 ) add_event( AgentEventType.FINDING_NEW, "发现路径遍历漏洞 [High]", phase=AgentTaskPhase.ANALYSIS, metadata={"vulnerability_type": "path_traversal", "severity": "high", "file": "app/routes/download.py", "line": 18}, time_offset_seconds=280 ) # SSRF 检测 add_event( AgentEventType.FINDING_NEW, "发现 SSRF 漏洞 [High]", phase=AgentTaskPhase.ANALYSIS, metadata={"vulnerability_type": "ssrf", "severity": "high", "file": "app/routes/proxy.py", "line": 42}, time_offset_seconds=320 ) # 硬编码密钥检测 add_event( AgentEventType.TOOL_CALL, "扫描硬编码密钥和敏感信息", phase=AgentTaskPhase.ANALYSIS, tool_name="secret_scan", tool_input={"patterns": ["api_key", "password", "secret", "token"]}, time_offset_seconds=360 ) add_event( AgentEventType.FINDING_NEW, "发现硬编码 API 密钥 [Medium]", phase=AgentTaskPhase.ANALYSIS, metadata={"vulnerability_type": "hardcoded_secret", "severity": "medium", "file": "app/config.py", "line": 15}, time_offset_seconds=380 ) add_event( AgentEventType.FINDING_NEW, "发现弱加密配置 [Medium]", phase=AgentTaskPhase.ANALYSIS, metadata={"vulnerability_type": "weak_crypto", "severity": "medium", "file": "app/utils/crypto.py", "line": 8}, time_offset_seconds=400 ) add_event( AgentEventType.FINDING_NEW, "发现调试模式未关闭 [Low]", phase=AgentTaskPhase.ANALYSIS, metadata={"vulnerability_type": "security_misconfiguration", "severity": "low", "file": "app/__init__.py", "line": 25}, time_offset_seconds=420 ) add_event( AgentEventType.PHASE_COMPLETE, "分析阶段完成,发现 8 个潜在漏洞", phase=AgentTaskPhase.ANALYSIS, time_offset_seconds=450 ) # ========== 验证阶段 ========== add_event( AgentEventType.PHASE_START, "进入验证阶段 - 在沙箱环境中验证漏洞", phase=AgentTaskPhase.VERIFICATION, time_offset_seconds=460 ) # SQL 注入验证 add_event( AgentEventType.SANDBOX_START, "启动沙箱环境验证 SQL 注入漏洞", phase=AgentTaskPhase.VERIFICATION, tool_name="sandbox", time_offset_seconds=470 ) add_event( AgentEventType.SANDBOX_EXEC, "执行 SQL 注入 PoC:' OR '1'='1' --", phase=AgentTaskPhase.VERIFICATION, tool_name="sandbox", tool_input={"payload": "' OR '1'='1' --", "target": "/api/user/search?name="}, time_offset_seconds=480 ) add_event( AgentEventType.SANDBOX_RESULT, "SQL 注入验证成功 - 成功绕过认证获取所有用户数据", phase=AgentTaskPhase.VERIFICATION, tool_name="sandbox", tool_output={"success": True, "response_code": 200, "data_leaked": True}, tool_duration_ms=1200, time_offset_seconds=490 ) add_event( AgentEventType.FINDING_VERIFIED, "SQL 注入漏洞已验证 [Critical]", phase=AgentTaskPhase.VERIFICATION, time_offset_seconds=495 ) # 命令注入验证 add_event( AgentEventType.SANDBOX_EXEC, "执行命令注入 PoC:; id; whoami", phase=AgentTaskPhase.VERIFICATION, tool_name="sandbox", tool_input={"payload": "; id; whoami", "target": "/api/backup?filename="}, time_offset_seconds=520 ) add_event( AgentEventType.SANDBOX_RESULT, "命令注入验证成功 - 成功执行任意系统命令", phase=AgentTaskPhase.VERIFICATION, tool_name="sandbox", tool_output={"success": True, "output": "uid=1000(www-data) gid=1000(www-data)"}, tool_duration_ms=800, time_offset_seconds=535 ) add_event( AgentEventType.FINDING_VERIFIED, "命令注入漏洞已验证 [Critical]", phase=AgentTaskPhase.VERIFICATION, time_offset_seconds=540 ) # XSS 验证 add_event( AgentEventType.SANDBOX_EXEC, "执行 XSS PoC:", phase=AgentTaskPhase.VERIFICATION, tool_name="sandbox", tool_input={"payload": "", "target": "/api/comment"}, time_offset_seconds=560 ) add_event( AgentEventType.FINDING_VERIFIED, "存储型 XSS 漏洞已验证 [High]", phase=AgentTaskPhase.VERIFICATION, time_offset_seconds=580 ) # 路径遍历验证 add_event( AgentEventType.SANDBOX_EXEC, "执行路径遍历 PoC:../../../etc/passwd", phase=AgentTaskPhase.VERIFICATION, tool_name="sandbox", tool_input={"payload": "../../../etc/passwd", "target": "/api/download?file="}, time_offset_seconds=600 ) add_event( AgentEventType.FINDING_VERIFIED, "路径遍历漏洞已验证 [High]", phase=AgentTaskPhase.VERIFICATION, time_offset_seconds=620 ) # SSRF 验证 add_event( AgentEventType.SANDBOX_EXEC, "执行 SSRF PoC:http://169.254.169.254/latest/meta-data/", phase=AgentTaskPhase.VERIFICATION, tool_name="sandbox", tool_input={"payload": "http://169.254.169.254/latest/meta-data/", "target": "/api/proxy?url="}, time_offset_seconds=640 ) add_event( AgentEventType.FINDING_VERIFIED, "SSRF 漏洞已验证 [High]", phase=AgentTaskPhase.VERIFICATION, time_offset_seconds=660 ) # 误报排除 add_event( AgentEventType.THINKING, "验证硬编码密钥:检查是否为测试/示例配置", phase=AgentTaskPhase.VERIFICATION, tokens_used=180, time_offset_seconds=680 ) add_event( AgentEventType.FINDING_FALSE_POSITIVE, "硬编码密钥为误报 - 该文件为示例配置模板", phase=AgentTaskPhase.VERIFICATION, metadata={"reason": "File is example configuration template, not production code"}, time_offset_seconds=700 ) add_event( AgentEventType.PHASE_COMPLETE, "验证阶段完成:6 个漏洞已验证,1 个误报已排除", phase=AgentTaskPhase.VERIFICATION, time_offset_seconds=720 ) # ========== 报告阶段 ========== add_event( AgentEventType.PHASE_START, "进入报告阶段 - 生成安全审计报告", phase=AgentTaskPhase.REPORTING, time_offset_seconds=730 ) add_event( AgentEventType.TOOL_CALL, "生成漏洞详情和修复建议", phase=AgentTaskPhase.REPORTING, tool_name="generate_report", tool_input={"format": "html", "include_poc": True, "include_fix": True}, time_offset_seconds=740 ) add_event( AgentEventType.INFO, "报告生成完成:包含 8 个发现、6 个已验证漏洞、详细修复建议和 PoC 代码", phase=AgentTaskPhase.REPORTING, time_offset_seconds=760 ) add_event( AgentEventType.PHASE_COMPLETE, "报告阶段完成", phase=AgentTaskPhase.REPORTING, time_offset_seconds=770 ) # ========== 任务完成 ========== add_event( AgentEventType.TASK_COMPLETE, "Agent 审计任务完成!发现 8 个安全问题,其中 2 个严重、3 个高危、2 个中危、1 个低危", metadata={ "total_findings": 8, "verified": 6, "false_positives": 1, "severity_distribution": {"critical": 2, "high": 3, "medium": 2, "low": 1}, "duration_seconds": 780, "tokens_used": 45680, }, time_offset_seconds=780 ) # 批量保存事件 for event in events: db.add(event) await db.flush() print(f"✓ 创建了 {len(events)} 个 Agent 事件") return events async def create_agent_findings(db: AsyncSession, task: AgentTask) -> list: """创建 Agent 发现的漏洞""" findings_data = [ { "vulnerability_type": VulnerabilityType.SQL_INJECTION, "severity": VulnerabilitySeverity.CRITICAL, "title": "用户搜索接口存在 SQL 注入漏洞", "description": "在 /api/user/search 接口中,用户输入的 name 参数直接拼接到 SQL 查询语句中,未经过任何过滤或参数化处理,攻击者可以通过构造恶意输入执行任意 SQL 语句。", "file_path": "app/routes/user.py", "line_start": 52, "line_end": 58, "function_name": "search_user", "code_snippet": '''@app.route('/api/user/search') def search_user(): name = request.args.get('name', '') # 危险:直接拼接用户输入到SQL语句 query = f"SELECT * FROM users WHERE name LIKE '%{name}%'" result = db.execute(query) return jsonify(result.fetchall())''', "source": "request.args.get('name')", "sink": "db.execute(query)", "dataflow_path": [ {"step": 1, "location": "line 54", "description": "用户输入从 request.args.get() 获取"}, {"step": 2, "location": "line 56", "description": "用户输入直接拼接到 SQL 字符串"}, {"step": 3, "location": "line 57", "description": "拼接后的 SQL 被执行"}, ], "status": FindingStatus.VERIFIED, "is_verified": True, "verification_method": "沙箱验证 - 成功执行 SQL 注入攻击", "verification_result": {"success": True, "payload": "' OR '1'='1' --", "impact": "绕过认证,获取所有用户数据"}, "has_poc": True, "poc_code": '''import requests # SQL 注入 PoC target_url = "http://target.com/api/user/search" # Payload: 绕过认证获取所有用户 payload = "' OR '1'='1' --" response = requests.get(target_url, params={"name": payload}) print(f"Status: {response.status_code}") print(f"Data: {response.json()}") # 预期结果:返回所有用户数据,而非仅匹配搜索条件的用户''', "poc_description": "通过在 name 参数中注入 SQL 语句,绕过查询条件获取数据库中所有用户信息", "poc_steps": [ "访问目标 URL: /api/user/search?name=' OR '1'='1' --", "观察响应:应返回所有用户数据", "进一步利用:可尝试 UNION 注入获取其他表数据", ], "suggestion": "使用参数化查询或 ORM 框架来防止 SQL 注入", "fix_code": '''@app.route('/api/user/search') def search_user(): name = request.args.get('name', '') # 修复:使用参数化查询 query = "SELECT * FROM users WHERE name LIKE :name" result = db.execute(query, {"name": f"%{name}%"}) return jsonify(result.fetchall())''', "fix_description": "使用 SQLAlchemy 的参数化查询功能,将用户输入作为参数传递,而非直接拼接到 SQL 语句中", "references": [ {"type": "CWE", "id": "CWE-89", "url": "https://cwe.mitre.org/data/definitions/89.html"}, {"type": "OWASP", "id": "A03:2021", "url": "https://owasp.org/Top10/A03_2021-Injection/"}, ], "ai_explanation": "这是一个典型的 SQL 注入漏洞。代码直接将用户输入拼接到 SQL 查询字符串中,没有进行任何转义或参数化处理。攻击者可以通过特殊字符(如单引号)闭合原有的 SQL 语句,然后注入自己的 SQL 代码。", "ai_confidence": 0.98, "xai_what": "SQL 注入是一种代码注入技术,攻击者通过在输入字段中插入恶意 SQL 代码来操纵数据库查询。", "xai_why": "该漏洞存在是因为开发者直接将用户输入拼接到 SQL 语句中,没有使用参数化查询或进行输入验证。", "xai_how": "攻击者可以在 name 参数中输入 ' OR '1'='1' -- 来绕过查询条件,或使用 UNION SELECT 来获取其他表的数据。", "xai_impact": "攻击者可以:1) 绕过认证 2) 读取敏感数据 3) 修改或删除数据 4) 在某些情况下执行系统命令。", "cvss_score": 9.8, "cvss_vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", "tags": ["owasp-top10", "injection", "database", "authentication-bypass"], }, { "vulnerability_type": VulnerabilityType.COMMAND_INJECTION, "severity": VulnerabilitySeverity.CRITICAL, "title": "备份功能存在命令注入漏洞", "description": "在备份功能中,用户提供的文件名参数直接传递给 os.system() 函数执行,攻击者可以通过命令分隔符(如 ; 或 |)注入任意系统命令。", "file_path": "app/utils/backup.py", "line_start": 34, "line_end": 40, "function_name": "create_backup", "code_snippet": '''def create_backup(filename): """创建备份文件""" # 危险:直接将用户输入传递给系统命令 backup_path = f"/backups/{filename}.tar.gz" cmd = f"tar -czf {backup_path} /data/" os.system(cmd) # 命令注入风险 return backup_path''', "source": "filename 参数", "sink": "os.system(cmd)", "dataflow_path": [ {"step": 1, "location": "line 34", "description": "filename 参数从外部传入"}, {"step": 2, "location": "line 36", "description": "filename 拼接到 shell 命令"}, {"step": 3, "location": "line 37", "description": "命令通过 os.system() 执行"}, ], "status": FindingStatus.VERIFIED, "is_verified": True, "verification_method": "沙箱验证 - 成功执行任意命令", "verification_result": {"success": True, "payload": "; id; whoami", "output": "uid=1000(www-data)"}, "has_poc": True, "poc_code": '''import requests # 命令注入 PoC target_url = "http://target.com/api/backup" # Payload: 注入系统命令 payload = "test; id; cat /etc/passwd" response = requests.post(target_url, json={"filename": payload}) print(f"Response: {response.text}") # 预期结果:服务器执行 id 和 cat /etc/passwd 命令''', "poc_description": "通过在 filename 参数中注入分号和系统命令,在服务器上执行任意代码", "poc_steps": [ "构造恶意 filename: test; id; cat /etc/passwd", "发送请求到 /api/backup 接口", "观察服务器响应或日志中的命令执行结果", ], "suggestion": "避免使用 os.system(),改用 subprocess 模块并禁用 shell=True,对用户输入进行严格的白名单验证", "fix_code": '''import subprocess import re def create_backup(filename): """创建备份文件 - 安全版本""" # 修复:验证文件名只包含安全字符 if not re.match(r'^[a-zA-Z0-9_-]+$', filename): raise ValueError("Invalid filename") backup_path = f"/backups/{filename}.tar.gz" # 修复:使用 subprocess 并传递参数列表 subprocess.run( ["tar", "-czf", backup_path, "/data/"], check=True, shell=False # 禁用shell ) return backup_path''', "fix_description": "1) 使用正则表达式验证文件名只包含安全字符 2) 使用 subprocess.run() 替代 os.system() 3) 禁用 shell 模式,将参数作为列表传递", "references": [ {"type": "CWE", "id": "CWE-78", "url": "https://cwe.mitre.org/data/definitions/78.html"}, {"type": "OWASP", "id": "A03:2021", "url": "https://owasp.org/Top10/A03_2021-Injection/"}, ], "ai_explanation": "这是一个严重的命令注入漏洞。os.system() 函数会通过 shell 执行命令,当用户输入被直接拼接到命令字符串中时,攻击者可以使用 shell 的特殊字符(如 ;、|、&&)来注入额外的命令。", "ai_confidence": 0.99, "xai_what": "命令注入允许攻击者在目标系统上执行任意操作系统命令。", "xai_why": "该漏洞存在是因为用户输入直接拼接到 shell 命令中,没有进行任何过滤或转义。", "xai_how": "攻击者可以在 filename 参数中输入 ; rm -rf / 来删除服务器文件,或执行反弹 shell 获取服务器控制权。", "xai_impact": "完全的服务器控制权,包括:读取敏感文件、安装后门、横向移动、数据窃取、服务中断等。", "cvss_score": 10.0, "cvss_vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H", "tags": ["owasp-top10", "injection", "rce", "critical"], }, { "vulnerability_type": VulnerabilityType.XSS, "severity": VulnerabilitySeverity.HIGH, "title": "评论功能存在存储型 XSS 漏洞", "description": "用户提交的评论内容在展示时未经 HTML 转义直接渲染,攻击者可以在评论中注入恶意 JavaScript 代码,当其他用户查看评论时会执行这些代码。", "file_path": "app/templates/comment.html", "line_start": 28, "line_end": 32, "function_name": None, "code_snippet": '''
{{ comment.content }}
{{ comment.content | safe }}