CodeReview/backend/app/services/llm/service.py

587 lines
23 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
LLM服务 - 代码分析核心服务
支持中英文双语输出
"""
import json
import re
import logging
from typing import Dict, Any, Optional
from .types import LLMConfig, LLMProvider, LLMMessage, LLMRequest, DEFAULT_MODELS
from .factory import LLMFactory
from app.core.config import settings
logger = logging.getLogger(__name__)
class LLMService:
"""LLM服务类"""
def __init__(self, user_config: Optional[Dict[str, Any]] = None):
"""
初始化LLM服务
Args:
user_config: 用户配置字典包含llmConfig字段
"""
self._config: Optional[LLMConfig] = None
self._user_config = user_config or {}
@property
def config(self) -> LLMConfig:
"""获取LLM配置优先使用用户配置然后使用系统配置"""
if self._config is None:
user_llm_config = self._user_config.get('llmConfig', {})
# 优先使用用户配置的provider否则使用系统配置
provider_str = user_llm_config.get('llmProvider') or getattr(settings, 'LLM_PROVIDER', 'openai')
provider = self._parse_provider(provider_str)
# 获取API Key - 优先级:用户配置 > 系统通用配置 > 系统平台专属配置
api_key = (
user_llm_config.get('llmApiKey') or
getattr(settings, 'LLM_API_KEY', '') or
self._get_provider_api_key_from_user_config(provider, user_llm_config) or
self._get_provider_api_key(provider)
)
# 获取Base URL
base_url = (
user_llm_config.get('llmBaseUrl') or
getattr(settings, 'LLM_BASE_URL', None) or
self._get_provider_base_url(provider)
)
# 获取模型
model = (
user_llm_config.get('llmModel') or
getattr(settings, 'LLM_MODEL', '') or
DEFAULT_MODELS.get(provider, 'gpt-4o-mini')
)
# 获取超时时间(用户配置是毫秒,系统配置是秒)
timeout_ms = user_llm_config.get('llmTimeout')
if timeout_ms:
# 用户配置是毫秒,转换为秒
timeout = int(timeout_ms / 1000) if timeout_ms > 1000 else int(timeout_ms)
else:
# 系统配置是秒
timeout = int(getattr(settings, 'LLM_TIMEOUT', 150))
# 获取温度
temperature = user_llm_config.get('llmTemperature') if user_llm_config.get('llmTemperature') is not None else float(getattr(settings, 'LLM_TEMPERATURE', 0.1))
# 获取最大token数
max_tokens = user_llm_config.get('llmMaxTokens') or int(getattr(settings, 'LLM_MAX_TOKENS', 4096))
self._config = LLMConfig(
provider=provider,
api_key=api_key,
model=model,
base_url=base_url,
timeout=timeout,
temperature=temperature,
max_tokens=max_tokens,
)
return self._config
def _get_provider_api_key_from_user_config(self, provider: LLMProvider, user_llm_config: Dict[str, Any]) -> Optional[str]:
"""从用户配置中获取平台专属API Key"""
provider_key_map = {
LLMProvider.OPENAI: 'openaiApiKey',
LLMProvider.GEMINI: 'geminiApiKey',
LLMProvider.CLAUDE: 'claudeApiKey',
LLMProvider.QWEN: 'qwenApiKey',
LLMProvider.DEEPSEEK: 'deepseekApiKey',
LLMProvider.ZHIPU: 'zhipuApiKey',
LLMProvider.MOONSHOT: 'moonshotApiKey',
LLMProvider.BAIDU: 'baiduApiKey',
LLMProvider.MINIMAX: 'minimaxApiKey',
LLMProvider.DOUBAO: 'doubaoApiKey',
}
key_name = provider_key_map.get(provider)
if key_name:
return user_llm_config.get(key_name)
return None
def _get_provider_api_key(self, provider: LLMProvider) -> str:
"""根据提供商获取API Key"""
provider_key_map = {
LLMProvider.OPENAI: 'OPENAI_API_KEY',
LLMProvider.GEMINI: 'GEMINI_API_KEY',
LLMProvider.CLAUDE: 'CLAUDE_API_KEY',
LLMProvider.QWEN: 'QWEN_API_KEY',
LLMProvider.DEEPSEEK: 'DEEPSEEK_API_KEY',
LLMProvider.ZHIPU: 'ZHIPU_API_KEY',
LLMProvider.MOONSHOT: 'MOONSHOT_API_KEY',
LLMProvider.BAIDU: 'BAIDU_API_KEY',
LLMProvider.MINIMAX: 'MINIMAX_API_KEY',
LLMProvider.DOUBAO: 'DOUBAO_API_KEY',
LLMProvider.OLLAMA: None, # Ollama 不需要 API Key
}
key_name = provider_key_map.get(provider)
if key_name:
return getattr(settings, key_name, '') or ''
return 'ollama' # Ollama的默认值
def _get_provider_base_url(self, provider: LLMProvider) -> Optional[str]:
"""根据提供商获取Base URL"""
if provider == LLMProvider.OPENAI:
return getattr(settings, 'OPENAI_BASE_URL', None)
elif provider == LLMProvider.OLLAMA:
return getattr(settings, 'OLLAMA_BASE_URL', 'http://localhost:11434/v1')
return None
def _parse_provider(self, provider_str: str) -> LLMProvider:
"""解析provider字符串"""
provider_map = {
'gemini': LLMProvider.GEMINI,
'openai': LLMProvider.OPENAI,
'claude': LLMProvider.CLAUDE,
'qwen': LLMProvider.QWEN,
'deepseek': LLMProvider.DEEPSEEK,
'zhipu': LLMProvider.ZHIPU,
'moonshot': LLMProvider.MOONSHOT,
'baidu': LLMProvider.BAIDU,
'minimax': LLMProvider.MINIMAX,
'doubao': LLMProvider.DOUBAO,
'ollama': LLMProvider.OLLAMA,
}
return provider_map.get(provider_str.lower(), LLMProvider.OPENAI)
def _get_output_language(self) -> str:
"""获取输出语言配置(优先使用用户配置)"""
user_other_config = self._user_config.get('otherConfig', {})
return user_other_config.get('outputLanguage') or getattr(settings, 'OUTPUT_LANGUAGE', 'zh-CN')
def _build_system_prompt(self, is_chinese: bool) -> str:
"""构建系统提示词(支持中英文)"""
schema = """{
"issues": [
{
"type": "security|bug|performance|style|maintainability",
"severity": "critical|high|medium|low",
"title": "string",
"description": "string",
"suggestion": "string",
"line": 1,
"column": 1,
"code_snippet": "string",
"ai_explanation": "string",
"xai": {
"what": "string",
"why": "string",
"how": "string",
"learn_more": "string(optional)"
}
}
],
"quality_score": 0-100,
"summary": {
"total_issues": number,
"critical_issues": number,
"high_issues": number,
"medium_issues": number,
"low_issues": number
},
"metrics": {
"complexity": 0-100,
"maintainability": 0-100,
"security": 0-100,
"performance": 0-100
}
}"""
if is_chinese:
return f"""⚠️⚠️⚠️ 只输出JSON禁止输出其他任何格式禁止markdown禁止文本分析
你是一个专业的代码审计助手。你的任务是分析代码并返回严格符合JSON Schema的结果。
【最重要】输出格式要求:
1. 必须只输出纯JSON对象{{开始,到}}结束
2. 禁止在JSON前后添加任何文字、说明、markdown标记
3. 禁止输出```json或###等markdown语法
4. 如果是文档文件如README也必须以JSON格式输出分析结果
【内容要求】:
1. 所有文本内容必须统一使用简体中文
2. JSON字符串值中的特殊字符必须正确转义换行用\\n双引号用\\",反斜杠用\\\\
3. code_snippet字段必须使用\\n表示换行
请从以下维度全面分析代码:
- 编码规范和代码风格
- 潜在的 Bug 和逻辑错误
- 性能问题和优化建议
- 安全漏洞和风险
- 可维护性和可读性
- 最佳实践和设计模式
输出格式必须严格符合以下 JSON Schema
{schema}
注意:
- title: 问题的简短标题(中文)
- description: 详细描述问题(中文)
- suggestion: 具体的修复建议(中文)
- line: 问题所在的行号从1开始计数必须准确对应代码中的行号
- column: 问题所在的列号从1开始计数指向问题代码的起始位置
- code_snippet: 包含问题的代码片段建议包含问题行及其前后1-2行作为上下文保持原始缩进格式
- ai_explanation: AI 的深入解释(中文)
- xai.what: 这是什么问题(中文)
- xai.why: 为什么会有这个问题(中文)
- xai.how: 如何修复这个问题(中文)
【重要】关于行号和代码片段:
1. line 必须是问题代码的行号!!!代码左侧有"行号|"标注,例如"25| const x = 1"表示第25行line字段必须填25
2. column 是问题代码在该行中的起始列位置从1开始不包括"行号|"前缀部分)
3. code_snippet 应该包含问题代码及其上下文前后各1-2行去掉"行号|"前缀,保持原始代码的缩进
4. 如果代码片段包含多行,必须使用 \\n 表示换行符这是JSON的要求
5. 如果无法确定准确的行号不要填写line和column字段不要填0
【严格禁止】:
- 禁止在任何字段中使用英文,所有内容必须是简体中文
- 禁止在JSON字符串值中使用真实换行符必须用\\n转义
- 禁止输出markdown代码块标记如```json
⚠️ 重要提醒line字段必须从代码左侧的行号标注中读取不要猜测或填0"""
else:
return f"""⚠️⚠️⚠️ OUTPUT JSON ONLY! NO OTHER FORMAT! NO MARKDOWN! NO TEXT ANALYSIS! ⚠️⚠️⚠️
You are a professional code auditing assistant. Your task is to analyze code and return results in strict JSON Schema format.
【MOST IMPORTANT】Output format requirements:
1. MUST output pure JSON object only, starting with {{ and ending with }}
2. NO text, explanation, or markdown markers before or after JSON
3. NO ```json or ### markdown syntax
4. Even for document files (like README), output analysis in JSON format
【Content requirements】:
1. All text content MUST be in English ONLY
2. Special characters in JSON strings must be properly escaped (\\n for newlines, \\" for quotes, \\\\ for backslashes)
3. code_snippet field MUST use \\n for newlines
Please comprehensively analyze the code from the following dimensions:
- Coding standards and code style
- Potential bugs and logical errors
- Performance issues and optimization suggestions
- Security vulnerabilities and risks
- Maintainability and readability
- Best practices and design patterns
The output format MUST strictly conform to the following JSON Schema:
{schema}
Note:
- title: Brief title of the issue (in English)
- description: Detailed description of the issue (in English)
- suggestion: Specific fix suggestions (in English)
- line: Line number where the issue occurs (1-indexed, must accurately correspond to the line in the code)
- column: Column number where the issue starts (1-indexed, pointing to the start position of the problematic code)
- code_snippet: Code snippet containing the issue (should include the problem line plus 1-2 lines before and after for context, preserve original indentation)
- ai_explanation: AI's in-depth explanation (in English)
- xai.what: What is this issue (in English)
- xai.why: Why does this issue exist (in English)
- xai.how: How to fix this issue (in English)
【IMPORTANT】About line numbers and code snippets:
1. 'line' MUST be the line number from code!!! Code has "lineNumber|" prefix, e.g. "25| const x = 1" means line 25, you MUST set line to 25
2. 'column' is the starting column position in that line (1-indexed, excluding the "lineNumber|" prefix)
3. 'code_snippet' should include the problematic code with context (1-2 lines before/after), remove "lineNumber|" prefix, preserve indentation
4. If code snippet has multiple lines, use \\n for newlines (JSON requirement)
5. If you cannot determine the exact line number, do NOT fill line and column fields (don't use 0)
【STRICTLY PROHIBITED】:
- NO Chinese characters in any field - English ONLY
- NO real newline characters in JSON string values - must use \\n
- NO markdown code block markers (like ```json)
⚠️ CRITICAL: Read line numbers from the "lineNumber|" prefix on the left of each code line. Do NOT guess or use 0!"""
async def analyze_code(self, code: str, language: str) -> Dict[str, Any]:
"""
分析代码并返回结构化问题
支持中英文输出
"""
# 获取输出语言配置
output_language = self._get_output_language()
is_chinese = output_language == 'zh-CN'
# 添加行号帮助LLM定位问题
code_with_lines = '\n'.join(
f"{i+1}| {line}" for i, line in enumerate(code.split('\n'))
)
# 构建系统提示词
system_prompt = self._build_system_prompt(is_chinese)
# 构建用户提示词
if is_chinese:
user_prompt = f"""编程语言: {language}
⚠️ 代码已标注行号(格式:行号| 代码内容),请根据行号准确填写 line 字段!
请分析以下代码:
{code_with_lines}"""
else:
user_prompt = f"""Programming Language: {language}
⚠️ Code is annotated with line numbers (format: lineNumber| code), please fill the 'line' field accurately based on these numbers!
Please analyze the following code:
{code_with_lines}"""
try:
adapter = LLMFactory.create_adapter(self.config)
request = LLMRequest(
messages=[
LLMMessage(role="system", content=system_prompt),
LLMMessage(role="user", content=user_prompt)
],
temperature=0.1,
)
response = await adapter.complete(request)
content = response.content
# 检查响应内容是否为空
if not content or not content.strip():
logger.warning(f"LLM返回空响应 - Provider: {self.config.provider.value}, Model: {self.config.model}")
logger.warning(f"响应详情 - Finish Reason: {response.finish_reason}, Usage: {response.usage}")
return self._get_default_response()
# 尝试从响应中提取JSON
result = self._parse_json(content)
return result
except Exception as e:
logger.error(f"LLM Analysis failed: {e}", exc_info=True)
logger.error(f"Provider: {self.config.provider.value}, Model: {self.config.model}")
return self._get_default_response()
def _parse_json(self, text: str) -> Dict[str, Any]:
"""从LLM响应中解析JSON增强版"""
# 检查输入是否为空
if not text or not text.strip():
logger.warning("LLM响应内容为空无法解析JSON")
return self._get_default_response()
def clean_text(s: str) -> str:
"""清理文本中的控制字符"""
# 移除BOM和零宽字符
s = s.replace('\ufeff', '').replace('\u200b', '').replace('\u200c', '').replace('\u200d', '')
return s
def fix_json_format(s: str) -> str:
"""修复常见的JSON格式问题"""
s = s.strip()
# 移除尾部逗号
s = re.sub(r',(\s*[}\]])', r'\1', s)
# 修复未转义的换行符(在字符串值中)
s = re.sub(r':\s*"([^"]*)\n([^"]*)"', r': "\1\\n\2"', s)
return s
def aggressive_fix_json(s: str) -> str:
"""激进的JSON修复尝试修复更多格式问题"""
s = clean_text(s)
s = s.strip()
# 找到第一个 { 和最后一个 }
start_idx = s.find('{')
if start_idx == -1:
raise ValueError("No JSON object found")
# 尝试找到最后一个 }
last_brace = s.rfind('}')
if last_brace > start_idx:
s = s[start_idx:last_brace + 1]
# 修复常见的JSON问题
# 1. 移除尾部逗号
s = re.sub(r',(\s*[}\]])', r'\1', s)
# 2. 修复单引号为双引号(仅在键名中,小心处理)
s = re.sub(r"'(\w+)'\s*:", r'"\1":', s)
# 3. 修复未转义的控制字符(在字符串值中,但不在键名中)
# 只移除不在引号内的控制字符,或未转义的换行符/制表符
lines = []
in_string = False
escape_next = False
for char in s:
if escape_next:
escape_next = False
lines.append(char)
continue
if char == '\\':
escape_next = True
lines.append(char)
continue
if char == '"':
in_string = not in_string
lines.append(char)
continue
# 如果在字符串外,移除控制字符;如果在字符串内,保留(假设已转义)
if not in_string and ord(char) < 32 and char not in ['\n', '\t', '\r']:
continue # 跳过控制字符
lines.append(char)
s = ''.join(lines)
return s
# 尝试多种方式解析
attempts = [
# 1. 直接解析
lambda: json.loads(text),
# 2. 清理后解析
lambda: json.loads(fix_json_format(clean_text(text))),
# 3. 从markdown代码块提取
lambda: self._extract_from_markdown(text),
# 4. 智能提取JSON对象
lambda: self._extract_json_object(clean_text(text)),
# 5. 修复截断的JSON
lambda: self._fix_truncated_json(clean_text(text)),
# 6. 激进修复后解析
lambda: json.loads(aggressive_fix_json(text)),
]
last_error = None
for i, attempt in enumerate(attempts):
try:
result = attempt()
if result and isinstance(result, dict):
if i > 0:
logger.info(f"✅ JSON解析成功方法 {i + 1}/{len(attempts)}")
return result
except Exception as e:
last_error = e
if i == 0:
logger.debug(f"直接解析失败,尝试其他方法... {e}")
# 所有尝试都失败
logger.warning("⚠️ 无法解析LLM响应为JSON")
logger.warning(f"原始内容长度: {len(text)} 字符")
logger.warning(f"原始内容前500字符: {text[:500]}")
logger.warning(f"原始内容后500字符: {text[-500:] if len(text) > 500 else text}")
if last_error:
logger.warning(f"最后错误: {type(last_error).__name__}: {str(last_error)}")
return self._get_default_response()
def _extract_from_markdown(self, text: str) -> Dict[str, Any]:
"""从markdown代码块提取JSON"""
match = re.search(r'```(?:json)?\s*(\{[\s\S]*?\})\s*```', text)
if match:
return json.loads(match.group(1))
raise ValueError("No markdown code block found")
def _extract_json_object(self, text: str) -> Dict[str, Any]:
"""智能提取JSON对象"""
start_idx = text.find('{')
if start_idx == -1:
raise ValueError("No JSON object found")
# 考虑字符串内的花括号和转义字符
brace_count = 0
bracket_count = 0
in_string = False
escape_next = False
end_idx = -1
for i in range(start_idx, len(text)):
char = text[i]
if escape_next:
escape_next = False
continue
if char == '\\':
escape_next = True
continue
if char == '"' and not escape_next:
in_string = not in_string
continue
if not in_string:
if char == '{':
brace_count += 1
elif char == '}':
brace_count -= 1
if brace_count == 0 and bracket_count == 0:
end_idx = i + 1
break
elif char == '[':
bracket_count += 1
elif char == ']':
bracket_count -= 1
if end_idx == -1:
# 如果找不到完整的JSON尝试使用最后一个 }
last_brace = text.rfind('}')
if last_brace > start_idx:
end_idx = last_brace + 1
else:
raise ValueError("Incomplete JSON object")
json_str = text[start_idx:end_idx]
# 修复格式问题
json_str = re.sub(r',(\s*[}\]])', r'\1', json_str)
# 尝试修复未闭合的括号
open_braces = json_str.count('{') - json_str.count('}')
open_brackets = json_str.count('[') - json_str.count(']')
if open_braces > 0:
json_str += '}' * open_braces
if open_brackets > 0:
json_str += ']' * open_brackets
return json.loads(json_str)
def _fix_truncated_json(self, text: str) -> Dict[str, Any]:
"""修复截断的JSON"""
start_idx = text.find('{')
if start_idx == -1:
raise ValueError("Cannot fix truncated JSON")
json_str = text[start_idx:]
# 计算缺失的闭合符号
open_braces = json_str.count('{')
close_braces = json_str.count('}')
open_brackets = json_str.count('[')
close_brackets = json_str.count(']')
# 补全缺失的闭合符号
json_str += ']' * max(0, open_brackets - close_brackets)
json_str += '}' * max(0, open_braces - close_braces)
# 修复格式
json_str = re.sub(r',(\s*[}\]])', r'\1', json_str)
return json.loads(json_str)
def _get_default_response(self) -> Dict[str, Any]:
"""返回默认响应"""
return {
"issues": [],
"quality_score": 80,
"summary": {
"total_issues": 0,
"critical_issues": 0,
"high_issues": 0,
"medium_issues": 0,
"low_issues": 0
},
"metrics": {
"complexity": 80,
"maintainability": 80,
"security": 80,
"performance": 80
}
}
# 全局服务实例
llm_service = LLMService()