236 lines
9.0 KiB
Python
236 lines
9.0 KiB
Python
|
|
"""
|
|||
|
|
漏洞报告工具
|
|||
|
|
|
|||
|
|
正式记录漏洞的唯一方式,确保漏洞报告的规范性和完整性。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import logging
|
|||
|
|
import uuid
|
|||
|
|
from datetime import datetime, timezone
|
|||
|
|
from typing import Optional, List, Dict, Any
|
|||
|
|
from pydantic import BaseModel, Field
|
|||
|
|
|
|||
|
|
from .base import AgentTool, ToolResult
|
|||
|
|
|
|||
|
|
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class VulnerabilityReportInput(BaseModel):
|
|||
|
|
"""漏洞报告输入参数"""
|
|||
|
|
title: str = Field(..., description="漏洞标题")
|
|||
|
|
vulnerability_type: str = Field(
|
|||
|
|
...,
|
|||
|
|
description="漏洞类型: sql_injection, xss, ssrf, command_injection, path_traversal, idor, auth_bypass, etc."
|
|||
|
|
)
|
|||
|
|
severity: str = Field(
|
|||
|
|
...,
|
|||
|
|
description="严重程度: critical, high, medium, low, info"
|
|||
|
|
)
|
|||
|
|
description: str = Field(..., description="漏洞详细描述")
|
|||
|
|
file_path: str = Field(..., description="漏洞所在文件路径")
|
|||
|
|
line_start: Optional[int] = Field(default=None, description="起始行号")
|
|||
|
|
line_end: Optional[int] = Field(default=None, description="结束行号")
|
|||
|
|
code_snippet: Optional[str] = Field(default=None, description="相关代码片段")
|
|||
|
|
source: Optional[str] = Field(default=None, description="污点来源(用户输入点)")
|
|||
|
|
sink: Optional[str] = Field(default=None, description="危险函数(漏洞触发点)")
|
|||
|
|
poc: Optional[str] = Field(default=None, description="概念验证/利用方法")
|
|||
|
|
impact: Optional[str] = Field(default=None, description="影响分析")
|
|||
|
|
recommendation: Optional[str] = Field(default=None, description="修复建议")
|
|||
|
|
confidence: float = Field(default=0.8, description="置信度 0.0-1.0")
|
|||
|
|
cwe_id: Optional[str] = Field(default=None, description="CWE编号")
|
|||
|
|
cvss_score: Optional[float] = Field(default=None, description="CVSS评分")
|
|||
|
|
|
|||
|
|
|
|||
|
|
class CreateVulnerabilityReportTool(AgentTool):
|
|||
|
|
"""
|
|||
|
|
创建漏洞报告工具
|
|||
|
|
|
|||
|
|
这是正式记录漏洞的唯一方式。只有通过这个工具创建的漏洞才会被计入最终报告。
|
|||
|
|
这个设计确保了漏洞报告的规范性和完整性。
|
|||
|
|
|
|||
|
|
通常只有专门的报告Agent或验证Agent才会调用这个工具,
|
|||
|
|
确保漏洞在被正式报告之前已经经过了充分的验证。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
# 存储所有报告的漏洞
|
|||
|
|
_vulnerability_reports: List[Dict[str, Any]] = []
|
|||
|
|
|
|||
|
|
def __init__(self):
|
|||
|
|
super().__init__()
|
|||
|
|
self._reports: List[Dict[str, Any]] = []
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def name(self) -> str:
|
|||
|
|
return "create_vulnerability_report"
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def description(self) -> str:
|
|||
|
|
return """创建正式的漏洞报告。这是记录已确认漏洞的唯一方式。
|
|||
|
|
|
|||
|
|
只有在以下情况下才应该使用此工具:
|
|||
|
|
1. 漏洞已经过充分分析和验证
|
|||
|
|
2. 有明确的证据支持漏洞存在
|
|||
|
|
3. 已经评估了漏洞的影响
|
|||
|
|
|
|||
|
|
必需参数:
|
|||
|
|
- title: 漏洞标题
|
|||
|
|
- vulnerability_type: 漏洞类型
|
|||
|
|
- severity: 严重程度 (critical/high/medium/low/info)
|
|||
|
|
- description: 详细描述
|
|||
|
|
- file_path: 文件路径
|
|||
|
|
|
|||
|
|
可选参数:
|
|||
|
|
- line_start/line_end: 行号范围
|
|||
|
|
- code_snippet: 代码片段
|
|||
|
|
- source/sink: 数据流信息
|
|||
|
|
- poc: 概念验证
|
|||
|
|
- impact: 影响分析
|
|||
|
|
- recommendation: 修复建议
|
|||
|
|
- confidence: 置信度
|
|||
|
|
- cwe_id: CWE编号
|
|||
|
|
- cvss_score: CVSS评分"""
|
|||
|
|
|
|||
|
|
@property
|
|||
|
|
def args_schema(self):
|
|||
|
|
return VulnerabilityReportInput
|
|||
|
|
|
|||
|
|
async def _execute(
|
|||
|
|
self,
|
|||
|
|
title: str,
|
|||
|
|
vulnerability_type: str,
|
|||
|
|
severity: str,
|
|||
|
|
description: str,
|
|||
|
|
file_path: str,
|
|||
|
|
line_start: Optional[int] = None,
|
|||
|
|
line_end: Optional[int] = None,
|
|||
|
|
code_snippet: Optional[str] = None,
|
|||
|
|
source: Optional[str] = None,
|
|||
|
|
sink: Optional[str] = None,
|
|||
|
|
poc: Optional[str] = None,
|
|||
|
|
impact: Optional[str] = None,
|
|||
|
|
recommendation: Optional[str] = None,
|
|||
|
|
confidence: float = 0.8,
|
|||
|
|
cwe_id: Optional[str] = None,
|
|||
|
|
cvss_score: Optional[float] = None,
|
|||
|
|
**kwargs
|
|||
|
|
) -> ToolResult:
|
|||
|
|
"""创建漏洞报告"""
|
|||
|
|
|
|||
|
|
# 验证必需字段
|
|||
|
|
if not title or not title.strip():
|
|||
|
|
return ToolResult(success=False, error="标题不能为空")
|
|||
|
|
|
|||
|
|
if not description or not description.strip():
|
|||
|
|
return ToolResult(success=False, error="描述不能为空")
|
|||
|
|
|
|||
|
|
if not file_path or not file_path.strip():
|
|||
|
|
return ToolResult(success=False, error="文件路径不能为空")
|
|||
|
|
|
|||
|
|
# 验证严重程度
|
|||
|
|
valid_severities = ["critical", "high", "medium", "low", "info"]
|
|||
|
|
severity = severity.lower()
|
|||
|
|
if severity not in valid_severities:
|
|||
|
|
return ToolResult(
|
|||
|
|
success=False,
|
|||
|
|
error=f"无效的严重程度 '{severity}',必须是: {', '.join(valid_severities)}"
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 验证漏洞类型
|
|||
|
|
valid_types = [
|
|||
|
|
"sql_injection", "nosql_injection", "xss", "ssrf",
|
|||
|
|
"command_injection", "code_injection", "path_traversal",
|
|||
|
|
"file_inclusion", "idor", "auth_bypass", "broken_auth",
|
|||
|
|
"sensitive_data_exposure", "hardcoded_secret", "weak_crypto",
|
|||
|
|
"xxe", "deserialization", "race_condition", "business_logic",
|
|||
|
|
"csrf", "open_redirect", "mass_assignment", "other"
|
|||
|
|
]
|
|||
|
|
vulnerability_type = vulnerability_type.lower()
|
|||
|
|
if vulnerability_type not in valid_types:
|
|||
|
|
# 允许未知类型,但记录警告
|
|||
|
|
logger.warning(f"Unknown vulnerability type: {vulnerability_type}")
|
|||
|
|
|
|||
|
|
# 验证置信度
|
|||
|
|
confidence = max(0.0, min(1.0, confidence))
|
|||
|
|
|
|||
|
|
# 生成报告ID
|
|||
|
|
report_id = f"vuln_{uuid.uuid4().hex[:8]}"
|
|||
|
|
|
|||
|
|
# 构建报告
|
|||
|
|
report = {
|
|||
|
|
"id": report_id,
|
|||
|
|
"title": title.strip(),
|
|||
|
|
"vulnerability_type": vulnerability_type,
|
|||
|
|
"severity": severity,
|
|||
|
|
"description": description.strip(),
|
|||
|
|
"file_path": file_path.strip(),
|
|||
|
|
"line_start": line_start,
|
|||
|
|
"line_end": line_end,
|
|||
|
|
"code_snippet": code_snippet,
|
|||
|
|
"source": source,
|
|||
|
|
"sink": sink,
|
|||
|
|
"poc": poc,
|
|||
|
|
"impact": impact,
|
|||
|
|
"recommendation": recommendation or self._get_default_recommendation(vulnerability_type),
|
|||
|
|
"confidence": confidence,
|
|||
|
|
"cwe_id": cwe_id,
|
|||
|
|
"cvss_score": cvss_score,
|
|||
|
|
"created_at": datetime.now(timezone.utc).isoformat(),
|
|||
|
|
"is_verified": True, # 通过此工具创建的都视为已验证
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 存储报告
|
|||
|
|
self._reports.append(report)
|
|||
|
|
CreateVulnerabilityReportTool._vulnerability_reports.append(report)
|
|||
|
|
|
|||
|
|
logger.info(f"Created vulnerability report: [{severity.upper()}] {title}")
|
|||
|
|
|
|||
|
|
# 返回结果
|
|||
|
|
severity_emoji = {
|
|||
|
|
"critical": "🔴",
|
|||
|
|
"high": "🟠",
|
|||
|
|
"medium": "🟡",
|
|||
|
|
"low": "🟢",
|
|||
|
|
"info": "🔵",
|
|||
|
|
}.get(severity, "⚪")
|
|||
|
|
|
|||
|
|
return ToolResult(
|
|||
|
|
success=True,
|
|||
|
|
data={
|
|||
|
|
"message": f"漏洞报告已创建: {severity_emoji} [{severity.upper()}] {title}",
|
|||
|
|
"report_id": report_id,
|
|||
|
|
"severity": severity,
|
|||
|
|
},
|
|||
|
|
metadata=report,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
def _get_default_recommendation(self, vuln_type: str) -> str:
|
|||
|
|
"""获取默认修复建议"""
|
|||
|
|
recommendations = {
|
|||
|
|
"sql_injection": "使用参数化查询或ORM,避免字符串拼接构造SQL语句",
|
|||
|
|
"xss": "对用户输入进行HTML实体编码,使用CSP策略,避免innerHTML",
|
|||
|
|
"ssrf": "验证和限制目标URL,使用白名单,禁止访问内网地址",
|
|||
|
|
"command_injection": "避免使用shell执行,使用参数列表传递命令,严格验证输入",
|
|||
|
|
"path_traversal": "规范化路径后验证,使用白名单,限制访问目录",
|
|||
|
|
"idor": "实现细粒度访问控制,验证资源所有权,使用UUID替代自增ID",
|
|||
|
|
"auth_bypass": "加强认证逻辑,实现多因素认证,定期审计认证代码",
|
|||
|
|
"hardcoded_secret": "使用环境变量或密钥管理服务存储敏感信息",
|
|||
|
|
"weak_crypto": "使用强加密算法(AES-256, SHA-256+),避免MD5/SHA1",
|
|||
|
|
"xxe": "禁用外部实体解析,使用安全的XML解析器配置",
|
|||
|
|
"deserialization": "避免反序列化不可信数据,使用JSON替代pickle/yaml",
|
|||
|
|
}
|
|||
|
|
return recommendations.get(vuln_type, "请根据具体情况修复此安全问题")
|
|||
|
|
|
|||
|
|
def get_reports(self) -> List[Dict[str, Any]]:
|
|||
|
|
"""获取所有报告"""
|
|||
|
|
return self._reports.copy()
|
|||
|
|
|
|||
|
|
@classmethod
|
|||
|
|
def get_all_reports(cls) -> List[Dict[str, Any]]:
|
|||
|
|
"""获取所有实例的报告"""
|
|||
|
|
return cls._vulnerability_reports.copy()
|
|||
|
|
|
|||
|
|
@classmethod
|
|||
|
|
def clear_all_reports(cls) -> None:
|
|||
|
|
"""清空所有报告"""
|
|||
|
|
cls._vulnerability_reports.clear()
|