# Source: CodeReview/backend/app/services/llm/service.py
# (web-viewer metadata removed: line/size counts, "Raw Normal View History")

"""
LLM服务 - 代码分析核心服务
支持中英文双语输出
"""
import json
import re
import logging
from typing import Dict, Any, Optional, List
from .types import LLMConfig, LLMProvider, LLMMessage, LLMRequest, DEFAULT_MODELS
from .factory import LLMFactory, NATIVE_ONLY_PROVIDERS
from app.core.config import settings
# json-repair 库用于修复损坏的 JSON
try:
from json_repair import repair_json
JSON_REPAIR_AVAILABLE = True
except ImportError:
JSON_REPAIR_AVAILABLE = False
logger = logging.getLogger(__name__)
class LLMService:
"""LLM服务类"""
def __init__(self, user_config: Optional[Dict[str, Any]] = None):
    """Initialize the LLM service.

    Args:
        user_config: Optional user configuration dict; its ``otherConfig``
            section (if any) is consulted for output-language selection.
    """
    # The effective LLMConfig is built lazily on first access of ``config``.
    self._config: Optional["LLMConfig"] = None
    self._user_config = user_config or {}
@property
def config(self) -> LLMConfig:
    """Return the effective LLM configuration (locked mode).

    Configuration is read exclusively from environment-backed ``settings``
    (never merged with per-user database values), which keeps behaviour
    consistent and avoids trusting user-supplied credentials. The result
    is cached on first access.
    """
    if self._config is not None:
        return self._config

    provider = self._parse_provider(settings.LLM_PROVIDER)
    # API key precedence: provider-specific env var, then generic LLM_API_KEY.
    api_key = self._get_provider_api_key(provider) or settings.LLM_API_KEY
    # Base URL precedence: generic LLM_BASE_URL, then the provider default.
    base_url = settings.LLM_BASE_URL or self._get_provider_base_url(provider)
    model = settings.LLM_MODEL or DEFAULT_MODELS.get(provider, 'gpt-4o-mini')

    self._config = LLMConfig(
        provider=provider,
        api_key=api_key,
        model=model,
        base_url=base_url,
        timeout=int(settings.LLM_TIMEOUT),           # seconds
        temperature=float(settings.LLM_TEMPERATURE),
        max_tokens=int(settings.LLM_MAX_TOKENS),
    )
    return self._config
def _get_provider_api_key(self, provider: LLMProvider) -> str:
    """Return the provider-specific API key from settings ('' when absent)."""
    # Map each provider to the name of its dedicated settings attribute.
    key_attr = {
        LLMProvider.OPENAI: 'OPENAI_API_KEY',
        LLMProvider.GEMINI: 'GEMINI_API_KEY',
        LLMProvider.CLAUDE: 'CLAUDE_API_KEY',
        LLMProvider.QWEN: 'QWEN_API_KEY',
        LLMProvider.DEEPSEEK: 'DEEPSEEK_API_KEY',
        LLMProvider.ZHIPU: 'ZHIPU_API_KEY',
        LLMProvider.MOONSHOT: 'MOONSHOT_API_KEY',
        LLMProvider.BAIDU: 'BAIDU_API_KEY',
        LLMProvider.MINIMAX: 'MINIMAX_API_KEY',
        LLMProvider.DOUBAO: 'DOUBAO_API_KEY',
    }.get(provider)
    if key_attr is None:
        return ''
    return getattr(settings, key_attr, '') or ''
def _get_provider_base_url(self, provider: LLMProvider) -> Optional[str]:
    """Return the default base URL for *provider* (None = adapter default)."""
    if provider == LLMProvider.OLLAMA:
        # Local Ollama exposes an OpenAI-compatible endpoint by default.
        return getattr(settings, 'OLLAMA_BASE_URL', 'http://localhost:11434/v1')
    if provider == LLMProvider.OPENAI:
        return getattr(settings, 'OPENAI_BASE_URL', None)
    return None
def _parse_provider(self, provider_str: str) -> LLMProvider:
    """Map a provider name string to an LLMProvider member.

    Matching is case-insensitive; unknown names fall back to OPENAI.
    """
    known = {
        'gemini': LLMProvider.GEMINI,
        'openai': LLMProvider.OPENAI,
        'claude': LLMProvider.CLAUDE,
        'qwen': LLMProvider.QWEN,
        'deepseek': LLMProvider.DEEPSEEK,
        'zhipu': LLMProvider.ZHIPU,
        'moonshot': LLMProvider.MOONSHOT,
        'baidu': LLMProvider.BAIDU,
        'minimax': LLMProvider.MINIMAX,
        'doubao': LLMProvider.DOUBAO,
        'ollama': LLMProvider.OLLAMA,
    }
    normalized = provider_str.lower()
    return known.get(normalized, LLMProvider.OPENAI)
def _get_output_language(self) -> str:
    """Return the output language code, preferring user config over settings."""
    preferred = self._user_config.get('otherConfig', {}).get('outputLanguage')
    return preferred or getattr(settings, 'OUTPUT_LANGUAGE', 'zh-CN')
def _build_system_prompt(self, is_chinese: bool) -> str:
    """Build the system prompt for code analysis (bilingual: zh-CN / en).

    Args:
        is_chinese: emit the Simplified-Chinese prompt variant when True,
            otherwise the English variant.

    Returns:
        A system prompt instructing the model to reply with a pure JSON
        object matching the embedded schema (issues, quality_score,
        summary, metrics).
    """
    # Response schema embedded verbatim into both language variants.
    schema = """{
"issues": [
{
"type": "security|bug|performance|style|maintainability",
"severity": "critical|high|medium|low",
"title": "string",
"description": "string",
"suggestion": "string",
"line": 1,
"column": 1,
"code_snippet": "string",
"ai_explanation": "string"
}
],
"quality_score": 0-100,
"summary": {
"total_issues": number,
"critical_issues": number,
"high_issues": number,
"medium_issues": number,
"low_issues": number
},
"metrics": {
"complexity": 0-100,
"maintainability": 0-100,
"security": 0-100,
"performance": 0-100
}
}"""
    if is_chinese:
        # Chinese variant: JSON-only output, all field text in Simplified
        # Chinese, line numbers read from the "行号|" prefix the caller adds.
        return f"""⚠️⚠️⚠️ 只输出JSON禁止输出其他任何格式禁止markdown禁止文本分析
你是一个专业的代码审计助手你的任务是分析代码并返回严格符合JSON Schema的结果
最重要输出格式要求
1. 必须只输出纯JSON对象{{开始}}结束
2. 禁止在JSON前后添加任何文字说明markdown标记
3. 禁止输出```json或###等markdown语法
4. 如果是文档文件如README也必须以JSON格式输出分析结果
内容要求
1. 所有文本内容必须统一使用简体中文
2. JSON字符串值中的特殊字符必须正确转义换行用\\n双引号用\\"
3. code_snippet字段必须使用\\n表示换行
4. 保持分析简明扼要重点关注关键问题
请从以下维度全面分析代码找出所有问题
- 安全漏洞SQL注入XSS命令注入路径遍历SSRFXXE反序列化硬编码密钥等
- 潜在的 Bug 和逻辑错误
- 性能问题和优化建议
- 编码规范和代码风格
- 可维护性和可读性
重要请尽可能多地找出代码中的问题但只需报告确定的问题避免误报
输出格式必须严格符合以下 JSON Schema
{schema}
注意
- title: 问题的简短标题中文
- description: 详细描述问题中文
- suggestion: 具体的修复建议中文
- line: 问题所在的行号从1开始计数必须准确对应代码中的行号
- column: 问题所在的列号从1开始计数指向问题代码的起始位置
- code_snippet: 包含问题的代码片段包含前后1行上下文保持缩进
- ai_explanation: AI 的补充解释中文简短说明原因和危害
重要关于行号和代码片段
1. line 必须是问题代码的行号代码左侧有"行号|"标注
2. column 是问题代码在该行中的起始列位置
3. code_snippet 应该包含问题代码及其上下文去掉"行号|"前缀
4. 如果代码片段包含多行必须使用 \\n 表示换行符
严格禁止
- 禁止在任何字段中使用英文所有内容必须是简体中文
- 禁止在JSON字符串值中使用真实换行符必须用\\n转义
- 禁止输出markdown代码块标记```json
重要提醒line字段必须从代码左侧的行号标注中读取不要猜测或填0"""
    else:
        # English variant: same contract, all field text in English.
        return f"""⚠️⚠️⚠️ OUTPUT JSON ONLY! NO OTHER FORMAT! NO MARKDOWN! NO TEXT ANALYSIS! ⚠️⚠️⚠️
You are a professional code auditing assistant. Your task is to analyze code and return results in strict JSON Schema format.
MOST IMPORTANTOutput format requirements:
1. MUST output pure JSON object only, starting with {{ and ending with }}
2. NO text, explanation, or markdown markers before or after JSON
3. NO ```json or ### markdown syntax
4. Even for document files (like README), output analysis in JSON format
Content requirements:
1. All text content MUST be in English ONLY
2. Special characters in JSON strings must be properly escaped (\\n for newlines, \\" for quotes)
3. code_snippet field MUST use \\n for newlines
4. Keep analysis concise and focused on key issues
Please comprehensively analyze the code, finding issues from the following dimensions:
- Security vulnerabilities (SQL injection, XSS, command injection, hardcoded secrets, etc.)
- Potential bugs and logical errors
- Performance issues
- Coding standards and style
- Maintainability
IMPORTANTFind as many issues as possible, but avoid false positives!
The output format MUST strictly conform to the following JSON Schema:
{schema}
Note:
- title: Brief title of the issue (in English)
- description: Detailed description of the issue (in English)
- suggestion: Specific fix suggestions (in English)
- line: Line number where the issue occurs (1-indexed)
- column: Column number where the issue starts
- code_snippet: Code snippet containing the issue (1 line context, preserve indentation)
- ai_explanation: AI's supplementary explanation (Briefly explain cause and impact)
IMPORTANTAbout line numbers and code snippets:
1. 'line' MUST be the line number from code!!! Code has "lineNumber|" prefix
2. 'column' is the starting column position
3. 'code_snippet' should include the problematic code with context, remove "lineNumber|" prefix
4. Use \\n for newlines in code snippets
STRICTLY PROHIBITED:
- NO Chinese characters in any field - English ONLY
- NO real newline characters in JSON string values
- NO markdown code block markers
CRITICAL: Read line numbers from the "lineNumber|" prefix. Do NOT guess or use 0!"""
async def analyze_code(self, code: str, language: str) -> Dict[str, Any]:
    """Analyze source code with the configured LLM and return structured issues.

    Args:
        code: Raw source code to analyze.
        language: Programming-language name passed into the prompt.

    Returns:
        Parsed analysis dict (issues, quality_score, summary, metrics).

    Raises:
        Exception: when the LLM call fails, returns an empty body, or the
            response cannot be parsed into a valid analysis result. The
            original error is chained as ``__cause__`` for debugging.
    """
    output_language = self._get_output_language()
    is_chinese = output_language == 'zh-CN'
    # Prefix every line with "N| " so the model can report accurate line numbers.
    code_with_lines = '\n'.join(
        f"{i+1}| {line}" for i, line in enumerate(code.split('\n'))
    )
    system_prompt = self._build_system_prompt(is_chinese)
    if is_chinese:
        user_prompt = f"""编程语言: {language}
代码已标注行号格式行号| 代码内容请根据行号准确填写 line 字段
请分析以下代码:
{code_with_lines}"""
    else:
        user_prompt = f"""Programming Language: {language}
Code is annotated with line numbers (format: lineNumber| code), please fill the 'line' field accurately based on these numbers!
Please analyze the following code:
{code_with_lines}"""
    try:
        adapter = LLMFactory.create_adapter(self.config)
        request = LLMRequest(
            messages=[
                LLMMessage(role="system", content=system_prompt),
                LLMMessage(role="user", content=user_prompt)
            ],
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens,
        )
        response = await adapter.complete(request)
        content = response.content
        # Log the raw response to aid debugging of parse failures.
        logger.info(f"📥 LLM 原始响应长度: {len(content) if content else 0} 字符")
        logger.info(f"📥 LLM 原始响应内容:\n{content}")
        if not content or not content.strip():
            error_msg = f"LLM返回空响应 - Provider: {self.config.provider.value}, Model: {self.config.model}"
            logger.error(error_msg)
            logger.error(f"响应详情 - Finish Reason: {response.finish_reason}, Usage: {response.usage}")
            raise Exception(error_msg)
        result = self._parse_json(content)
        issues_count = len(result.get("issues", []))
        logger.info(f"📊 LLM 分析结果: 发现 {issues_count} 个问题, 质量评分: {result.get('quality_score', 'N/A')}")
        # A result equal to the default placeholder means parsing produced
        # nothing useful; treat that as a failure rather than a clean report.
        if result == self._get_default_response():
            error_msg = f"无法解析LLM响应为有效的分析结果 - Provider: {self.config.provider.value}"
            logger.error(error_msg)
            raise Exception(error_msg)
        return result
    except Exception as e:
        logger.error(f"LLM Analysis failed: {e}", exc_info=True)
        logger.error(f"Provider: {self.config.provider.value}, Model: {self.config.model}")
        # Map common HTTP error codes onto friendlier, actionable messages.
        error_str = str(e)
        if "401" in error_str:
            error_msg = f"LLM 认证失败 (401): 请检查 {self.config.provider.value} API Key 是否正确"
        elif "404" in error_str:
            error_msg = f"LLM 模型不存在 (404): 请检查模型名称 '{self.config.model}' 是否正确"
        elif "429" in error_str:
            error_msg = f"LLM 额度不足或频率限制 (429): 请检查账户余额或稍后重试"
        elif "timeout" in error_str.lower():
            error_msg = f"LLM 请求超时: 请检查网络连接或增加超时时间"
        else:
            error_msg = f"LLM 分析失败: {error_str[:200]}"
        # FIX: chain the original exception so the root cause and traceback
        # survive the re-wrap (was a bare `raise Exception(error_msg)`).
        raise Exception(error_msg) from e
async def chat_completion(
    self,
    messages: List[Dict[str, str]],
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    tools: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Chat-completion entry point used by the agent (supports tool calls).

    Args:
        messages: Chat history as ``[{"role": ..., "content": ...}]`` dicts.
        temperature: Sampling temperature; falls back to the service config.
        max_tokens: Output token cap; falls back to the service config.
        tools: Optional tool descriptions forwarded to the model.

    Returns:
        Dict with ``content``, ``usage`` and, when the model requested any,
        ``tool_calls``.
    """
    cfg = self.config
    request = LLMRequest(
        messages=[LLMMessage(role=m["role"], content=m["content"]) for m in messages],
        temperature=cfg.temperature if temperature is None else temperature,
        max_tokens=cfg.max_tokens if max_tokens is None else max_tokens,
        tools=tools,
    )
    response = await LLMFactory.create_adapter(cfg).complete(request)
    usage = response.usage
    result: Dict[str, Any] = {
        "content": response.content,
        "usage": {
            "prompt_tokens": usage.prompt_tokens if usage else 0,
            "completion_tokens": usage.completion_tokens if usage else 0,
            "total_tokens": usage.total_tokens if usage else 0,
        },
    }
    # Only expose tool calls when the model actually produced some.
    if response.tool_calls:
        result["tool_calls"] = response.tool_calls
    return result
async def chat_completion_raw(
    self,
    messages: List[Dict[str, str]],
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
) -> Dict[str, Any]:
    """Plain chat-completion interface (legacy-compatible, no tool calls).

    Args:
        messages: Chat history as ``[{"role": ..., "content": ...}]`` dicts.
        temperature: Sampling temperature; falls back to the service config.
        max_tokens: Output token cap; falls back to the service config.

    Returns:
        Dict with ``content`` and ``usage`` token counts.
    """
    cfg = self.config
    request = LLMRequest(
        messages=[LLMMessage(role=m["role"], content=m["content"]) for m in messages],
        temperature=cfg.temperature if temperature is None else temperature,
        max_tokens=cfg.max_tokens if max_tokens is None else max_tokens,
    )
    response = await LLMFactory.create_adapter(cfg).complete(request)
    usage = response.usage
    return {
        "content": response.content,
        "usage": {
            "prompt_tokens": usage.prompt_tokens if usage else 0,
            "completion_tokens": usage.completion_tokens if usage else 0,
            "total_tokens": usage.total_tokens if usage else 0,
        },
    }
async def chat_completion_stream(
    self,
    messages: List[Dict[str, str]],
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
):
    """Streaming chat completion, yielding results token by token.

    Args:
        messages: Chat history as ``[{"role": ..., "content": ...}]`` dicts.
        temperature: Sampling temperature; falls back to the service config.
        max_tokens: Output token cap; falls back to the service config.

    Yields:
        dict: ``{"type": "token", "content": str, "accumulated": str}`` chunks
        followed by a final ``{"type": "done", ...}`` event carrying the full
        content, usage counts and finish reason.
    """
    # Fall back to the service-level configuration when not overridden.
    actual_temperature = temperature if temperature is not None else self.config.temperature
    actual_max_tokens = max_tokens if max_tokens is not None else self.config.max_tokens
    llm_messages = [
        LLMMessage(role=msg["role"], content=msg["content"])
        for msg in messages
    ]
    request = LLMRequest(
        messages=llm_messages,
        temperature=actual_temperature,
        max_tokens=actual_max_tokens,
    )
    if self.config.provider in NATIVE_ONLY_PROVIDERS:
        # Native-only providers: do one blocking completion and replay the
        # full content as fixed-size pseudo-token chunks.
        adapter = LLMFactory.create_adapter(self.config)
        response = await adapter.complete(request)
        content = response.content or ""
        usage = None
        if response.usage:
            usage = {
                "prompt_tokens": response.usage.prompt_tokens or 0,
                "completion_tokens": response.usage.completion_tokens or 0,
                "total_tokens": response.usage.total_tokens or 0,
            }
        if not content:
            # Empty completion: emit only the terminal event.
            yield {
                "type": "done",
                "content": "",
                "usage": usage,
                "finish_reason": response.finish_reason or "stop",
            }
        else:
            accumulated = ""
            chunk_size = 20  # characters per simulated "token" event
            for i in range(0, len(content), chunk_size):
                part = content[i:i + chunk_size]
                accumulated += part
                yield {
                    "type": "token",
                    "content": part,
                    "accumulated": accumulated,
                }
            yield {
                "type": "done",
                "content": content,
                "usage": usage,
                "finish_reason": response.finish_reason or "stop",
            }
    else:
        # Other providers stream natively through the LiteLLM adapter
        # (imported lazily to avoid a hard dependency on this path).
        from .adapters.litellm_adapter import LiteLLMAdapter
        adapter = LiteLLMAdapter(self.config)
        async for chunk in adapter.stream_complete(request):
            yield chunk
def _parse_json(self, text: str) -> Dict[str, Any]:
    """Parse JSON out of an LLM response, trying progressively looser strategies.

    Seven attempts run in order: direct parse, cleaned parse, markdown-fence
    extraction, balanced-object extraction, truncation repair, aggressive
    normalization, and finally the optional ``json_repair`` library.

    Raises:
        ValueError: when *text* is empty or no strategy yields a dict.
    """
    if not text or not text.strip():
        logger.error("LLM响应内容为空无法解析JSON")
        raise ValueError("LLM响应内容为空")
    def clean_text(s: str) -> str:
        """Strip BOM and zero-width characters that break json.loads."""
        s = s.replace('\ufeff', '').replace('\u200b', '').replace('\u200c', '').replace('\u200d', '')
        return s
    def fix_json_format(s: str) -> str:
        """Fix frequent formatting mistakes (trailing commas, raw newlines)."""
        s = s.strip()
        # Remove trailing commas before a closing brace/bracket.
        s = re.sub(r',(\s*[}\]])', r'\1', s)
        # Escape an unescaped newline inside a simple string value.
        s = re.sub(r':\s*"([^"]*)\n([^"]*)"', r': "\1\\n\2"', s)
        return s
    def aggressive_fix_json(s: str) -> str:
        """Aggressive repair: trim to the outermost braces and scrub the text."""
        s = clean_text(s)
        s = s.strip()
        # Locate the first '{' and cut at the last '}' when one follows it.
        start_idx = s.find('{')
        if start_idx == -1:
            raise ValueError("No JSON object found")
        last_brace = s.rfind('}')
        if last_brace > start_idx:
            s = s[start_idx:last_brace + 1]
        # 1. Remove trailing commas.
        s = re.sub(r',(\s*[}\]])', r'\1', s)
        # 2. Convert single-quoted keys to double-quoted (keys only).
        s = re.sub(r"'(\w+)'\s*:", r'"\1":', s)
        # 3. Drop control characters outside string literals; characters
        #    inside strings are kept (assumed already escaped).
        lines = []
        in_string = False
        escape_next = False
        for char in s:
            if escape_next:
                escape_next = False
                lines.append(char)
                continue
            if char == '\\':
                escape_next = True
                lines.append(char)
                continue
            if char == '"':
                in_string = not in_string
                lines.append(char)
                continue
            # Outside strings: skip stray control characters (keep \n \t \r).
            if not in_string and ord(char) < 32 and char not in ['\n', '\t', '\r']:
                continue
            lines.append(char)
        s = ''.join(lines)
        return s
    # Parsing strategies, ordered strict -> permissive; first dict wins.
    attempts = [
        # 1. Direct parse.
        lambda: json.loads(text),
        # 2. Parse after cleaning + light fixes.
        lambda: json.loads(fix_json_format(clean_text(text))),
        # 3. Extract from a markdown code fence.
        lambda: self._extract_from_markdown(text),
        # 4. Balanced-brace object extraction.
        lambda: self._extract_json_object(clean_text(text)),
        # 5. Repair a truncated object.
        lambda: self._fix_truncated_json(clean_text(text)),
        # 6. Aggressive normalization then parse.
        lambda: json.loads(aggressive_fix_json(text)),
        # 7. json-repair library as the last resort.
        lambda: self._repair_json_with_library(text),
    ]
    last_error = None
    for i, attempt in enumerate(attempts):
        try:
            result = attempt()
            if result and isinstance(result, dict):
                if i > 0:
                    logger.info(f"✅ JSON解析成功方法 {i + 1}/{len(attempts)}")
                return result
        except Exception as e:
            last_error = e
            if i == 0:
                logger.debug(f"直接解析失败,尝试其他方法... {e}")
    # Every strategy failed: log context and surface the last error.
    logger.error("❌ 无法解析LLM响应为JSON")
    logger.error(f"原始内容长度: {len(text)} 字符")
    logger.error(f"原始内容前500字符: {text[:500]}")
    logger.error(f"原始内容后500字符: {text[-500:] if len(text) > 500 else text}")
    if last_error:
        logger.error(f"最后错误: {type(last_error).__name__}: {str(last_error)}")
    # Raise instead of silently returning the default response.
    raise ValueError(f"无法解析LLM响应为有效的JSON格式: {str(last_error) if last_error else '未知错误'}")
def _extract_from_markdown(self, text: str) -> Dict[str, Any]:
    """Extract and parse a JSON object wrapped in a markdown code fence."""
    fence = re.search(r'```(?:json)?\s*(\{[\s\S]*?\})\s*```', text)
    if fence is None:
        raise ValueError("No markdown code block found")
    return json.loads(fence.group(1))
def _extract_json_object(self, text: str) -> Dict[str, Any]:
    """Locate the first balanced JSON object in *text* and parse it.

    Walks the text with a small state machine that is aware of string
    literals and escape sequences, so braces inside strings do not affect
    the balance count. Falls back to the last ``}`` when no balanced end is
    found, then repairs trailing commas and unclosed brackets before parsing.
    """
    start = text.find('{')
    if start == -1:
        raise ValueError("No JSON object found")

    depth_obj = 0
    depth_arr = 0
    inside_str = False
    escaped = False
    end = -1
    for pos in range(start, len(text)):
        ch = text[pos]
        if escaped:
            escaped = False
        elif ch == '\\':
            escaped = True
        elif ch == '"':
            inside_str = not inside_str
        elif not inside_str:
            if ch == '{':
                depth_obj += 1
            elif ch == '}':
                depth_obj -= 1
                if depth_obj == 0 and depth_arr == 0:
                    end = pos + 1
                    break
            elif ch == '[':
                depth_arr += 1
            elif ch == ']':
                depth_arr -= 1

    if end == -1:
        # No balanced object found: best-effort cut at the last closing brace.
        tail = text.rfind('}')
        if tail <= start:
            raise ValueError("Incomplete JSON object")
        end = tail + 1

    candidate = text[start:end]
    # Drop trailing commas before a closing brace/bracket.
    candidate = re.sub(r',(\s*[}\]])', r'\1', candidate)
    # Append any missing closers (handles truncated model output).
    missing_braces = candidate.count('{') - candidate.count('}')
    missing_brackets = candidate.count('[') - candidate.count(']')
    if missing_braces > 0:
        candidate += '}' * missing_braces
    if missing_brackets > 0:
        candidate += ']' * missing_brackets
    return json.loads(candidate)
def _fix_truncated_json(self, text: str) -> Dict[str, Any]:
    """Repair JSON that was cut off mid-output by appending missing closers."""
    brace_at = text.find('{')
    if brace_at == -1:
        raise ValueError("Cannot fix truncated JSON")
    fragment = text[brace_at:]
    # Append missing closers: brackets first, then braces — matches the
    # common truncation shape of an unfinished "issues" array.
    fragment += ']' * max(0, fragment.count('[') - fragment.count(']'))
    fragment += '}' * max(0, fragment.count('{') - fragment.count('}'))
    # Then strip trailing commas before a closing brace/bracket.
    fragment = re.sub(r',(\s*[}\]])', r'\1', fragment)
    return json.loads(fragment)
def _repair_json_with_library(self, text: str) -> Dict[str, Any]:
    """Last-resort JSON repair via the optional ``json_repair`` package.

    Raises:
        ValueError: when the library is unavailable, no object is found,
            or the repair does not produce a dict.
    """
    if not JSON_REPAIR_AVAILABLE:
        raise ValueError("json-repair library not available")
    head = text.find('{')
    if head == -1:
        raise ValueError("No JSON object found for repair")
    # Trim to the outermost braces when a closing brace exists.
    tail = text.rfind('}')
    snippet = text[head:tail + 1] if tail > head else text[head:]
    repaired = repair_json(snippet, return_objects=True)
    if isinstance(repaired, dict):
        logger.info("✅ json-repair 库成功修复 JSON")
        return repaired
    raise ValueError(f"json-repair returned unexpected type: {type(repaired)}")
def _get_default_response(self) -> Dict[str, Any]:
    """Build the neutral fallback analysis result (no issues, score 80)."""
    summary = {f"{level}_issues": 0
               for level in ("total", "critical", "high", "medium", "low")}
    metrics = {dimension: 80
               for dimension in ("complexity", "maintainability", "security", "performance")}
    return {
        "issues": [],
        "quality_score": 80,
        "summary": summary,
        "metrics": metrics,
    }
async def analyze_code_with_custom_prompt(
    self,
    code: str,
    language: str,
    custom_prompt: str,
    rules: Optional[list] = None,
    output_language: Optional[str] = None
) -> Dict[str, Any]:
    """Analyze code using a caller-supplied system prompt.

    Args:
        code: Source code to analyze.
        language: Programming-language name.
        custom_prompt: Custom system prompt; format instructions and the
            rendered rules are appended to it.
        rules: Optional audit-rule dicts to emphasise in the prompt.
        output_language: 'zh' or 'en'; defaults to the system configuration.

    Raises:
        Exception: when the LLM returns an empty or unparsable response.
    """
    # NOTE(review): this path compares against 'zh' while the system-level
    # setting uses 'zh-CN' — confirm callers pass the short code here.
    if output_language:
        is_chinese = output_language == 'zh'
    else:
        system_output_language = self._get_output_language()
        is_chinese = system_output_language == 'zh-CN'
    # Prefix each line with "N| " so the model can fill the 'line' field.
    code_with_lines = '\n'.join(
        f"{i+1}| {line}" for i, line in enumerate(code.split('\n'))
    )
    # Render enabled audit rules into an extra prompt section.
    rules_prompt = ""
    if rules:
        rules_prompt = "\n\n【审计规则】请特别关注以下规则:\n"
        for rule in rules:
            if rule.get('enabled', True):
                rules_prompt += f"- [{rule.get('rule_code', '')}] {rule.get('name', '')}: {rule.get('description', '')}\n"
                if rule.get('custom_prompt'):
                    rules_prompt += f" 检测要点: {rule.get('custom_prompt')}\n"
    # JSON Schema for the custom-prompt analysis result (no metrics block,
    # optional rule_code on each issue).
    schema = """{
"issues": [
{
"type": "security|bug|performance|style|maintainability",
"severity": "critical|high|medium|low",
"title": "string",
"description": "string",
"suggestion": "string",
"line": 1,
"column": 1,
"code_snippet": "string",
"rule_code": "string (optional, if matched a specific rule)"
}
],
"quality_score": 0-100,
"summary": {
"total_issues": number,
"critical_issues": number,
"high_issues": number,
"medium_issues": number,
"low_issues": number
}
}"""
    # Append output-format instructions in the selected language.
    if is_chinese:
        format_instruction = f"""
输出格式要求
1. 必须只输出纯JSON对象
2. 禁止在JSON前后添加任何文字说明markdown标记
3. 所有文本字段title, description, suggestion等必须使用中文输出
4. 输出格式必须符合以下 JSON Schema
{schema}
{rules_prompt}"""
    else:
        format_instruction = f"""
Output Format Requirements
1. Must output pure JSON object only
2. Do not add any text, explanation, or markdown markers before or after JSON
3. All text fields (title, description, suggestion, etc.) must be in English
4. Output format must conform to the following JSON Schema:
{schema}
{rules_prompt}"""
    full_system_prompt = custom_prompt + format_instruction
    # Build the user prompt in the matching language.
    if is_chinese:
        user_prompt = f"""编程语言: {language}
代码已标注行号格式行号| 代码内容请根据行号准确填写 line 字段
请分析以下代码:
{code_with_lines}"""
    else:
        user_prompt = f"""Programming Language: {language}
Code is annotated with line numbers (format: lineNumber| code), please fill the 'line' field accurately.
Please analyze the following code:
{code_with_lines}"""
    try:
        adapter = LLMFactory.create_adapter(self.config)
        request = LLMRequest(
            messages=[
                LLMMessage(role="system", content=full_system_prompt),
                LLMMessage(role="user", content=user_prompt)
            ],
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens,
        )
        response = await adapter.complete(request)
        content = response.content
        if not content or not content.strip():
            raise Exception("LLM返回空响应")
        result = self._parse_json(content)
        return result
    except Exception as e:
        logger.error(f"Custom prompt analysis failed: {e}", exc_info=True)
        raise
async def analyze_code_with_rules(
    self,
    code: str,
    language: str,
    rule_set_id: Optional[str] = None,
    prompt_template_id: Optional[str] = None,
    db_session = None,
    use_default_template: bool = True
) -> Dict[str, Any]:
    """Analyze code using a rule set and/or prompt template loaded from the DB.

    Args:
        code: Source code to analyze.
        language: Programming-language name.
        rule_set_id: Optional AuditRuleSet id to load rules from.
        prompt_template_id: Optional PromptTemplate id; when absent and
            ``use_default_template`` is True, the active default 'system'
            template is used instead.
        db_session: Async SQLAlchemy session; no DB lookups happen when None.
        use_default_template: Whether to fall back to the DB default template.
    """
    custom_prompt = None
    rules = None
    if db_session:
        # Imported lazily so importing this module never requires the ORM.
        from sqlalchemy.future import select
        from sqlalchemy.orm import selectinload
        from app.models.prompt_template import PromptTemplate
        # Resolve the prompt template: explicit id first, then the default.
        if prompt_template_id:
            result = await db_session.execute(
                select(PromptTemplate).where(PromptTemplate.id == prompt_template_id)
            )
            template = result.scalar_one_or_none()
            if template:
                # Pick the template body matching the configured language.
                output_language = self._get_output_language()
                custom_prompt = template.content_zh if output_language == 'zh-CN' else template.content_en
        elif use_default_template:
            # No explicit template: use the active default 'system' template.
            result = await db_session.execute(
                select(PromptTemplate).where(
                    PromptTemplate.is_default == True,
                    PromptTemplate.is_active == True,
                    PromptTemplate.template_type == 'system'
                )
            )
            template = result.scalar_one_or_none()
            if template:
                output_language = self._get_output_language()
                custom_prompt = template.content_zh if output_language == 'zh-CN' else template.content_en
                logger.info(f"📋 使用默认提示词模板: {template.name}")
        # Load the rule set with its rules eagerly (selectinload avoids
        # lazy-load awaits on the async session).
        if rule_set_id:
            from app.models.audit_rule import AuditRuleSet
            result = await db_session.execute(
                select(AuditRuleSet)
                .options(selectinload(AuditRuleSet.rules))
                .where(AuditRuleSet.id == rule_set_id)
            )
            rule_set = result.scalar_one_or_none()
            if rule_set and rule_set.rules:
                # Serialize only the enabled rules for prompt rendering.
                rules = [
                    {
                        "rule_code": r.rule_code,
                        "name": r.name,
                        "description": r.description,
                        "category": r.category,
                        "severity": r.severity,
                        "custom_prompt": r.custom_prompt,
                        "enabled": r.enabled,
                    }
                    for r in rule_set.rules if r.enabled
                ]
    # Prefer the custom prompt when one was resolved; otherwise fall back
    # to the built-in bilingual analysis prompt.
    if custom_prompt:
        return await self.analyze_code_with_custom_prompt(code, language, custom_prompt, rules)
    return await self.analyze_code(code, language)
# Module-level default instance (no per-user config); the underlying
# LLMConfig is built lazily from settings on first `config` access.
llm_service = LLMService()