CodeReview/backend/app/services/agent/json_parser.py

332 lines
11 KiB
Python
Raw Normal View History

"""
Agent JSON 解析工具
LLM 响应中安全地解析 JSON优先使用 json-repair
"""
import json
import re
import logging
from typing import Dict, Any, List, Optional, Union
logger = logging.getLogger(__name__)
# 尝试导入 json-repair 库
try:
from json_repair import repair_json
JSON_REPAIR_AVAILABLE = True
logger.info("✅ json-repair 库已加载")
except ImportError:
JSON_REPAIR_AVAILABLE = False
logger.warning("⚠️ json-repair 库未安装,将使用备用解析方法")
class AgentJsonParser:
"""Agent 专用的 JSON 解析器 - 优先使用 json-repair"""
@staticmethod
def clean_text(text: str) -> str:
"""清理文本中的控制字符"""
if not text:
return ""
# 移除 BOM 和零宽字符
text = text.replace('\ufeff', '').replace('\u200b', '').replace('\u200c', '').replace('\u200d', '')
return text
@staticmethod
def fix_json_format(text: str) -> str:
"""修复常见的 JSON 格式问题"""
text = text.strip()
# 移除尾部逗号
text = re.sub(r',(\s*[}\]])', r'\1', text)
# 修复未转义的换行符(在字符串值中)
text = re.sub(r':\s*"([^"]*)\n([^"]*)"', r': "\1\\n\2"', text)
return text
@classmethod
def extract_json_string(cls, text: str) -> str:
"""从文本中提取 JSON 字符串部分"""
# 先尝试从 markdown 代码块提取
md_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', text)
if md_match:
return md_match.group(1).strip()
# 找到第一个 { 或 [
start_brace = text.find('{')
start_bracket = text.find('[')
if start_brace == -1 and start_bracket == -1:
return text # 没有找到,返回原文
if start_brace == -1:
start_idx = start_bracket
elif start_bracket == -1:
start_idx = start_brace
else:
start_idx = min(start_brace, start_bracket)
# 找到最后一个 } 或 ]
end_brace = text.rfind('}')
end_bracket = text.rfind(']')
if end_brace == -1 and end_bracket == -1:
return text[start_idx:] # 没有找到结尾,返回从开始到末尾
end_idx = max(end_brace, end_bracket) + 1
return text[start_idx:end_idx]
@classmethod
def repair_with_library(cls, text: str) -> Dict[str, Any]:
"""使用 json-repair 库修复并解析 JSON"""
if not JSON_REPAIR_AVAILABLE:
raise ValueError("json-repair library not available")
# 提取 JSON 字符串
json_str = cls.extract_json_string(text)
if not json_str.strip():
raise ValueError("No JSON content found")
# 使用 json-repair 修复并解析
repaired = repair_json(json_str, return_objects=True)
if isinstance(repaired, dict):
return repaired
elif isinstance(repaired, list):
# 如果返回列表,包装为字典
return {"items": repaired}
elif isinstance(repaired, str):
# 如果返回字符串,尝试再次解析
return json.loads(repaired)
raise ValueError(f"json-repair returned unexpected type: {type(repaired)}")
@classmethod
def extract_from_markdown(cls, text: str) -> Dict[str, Any]:
"""从 markdown 代码块提取 JSON"""
match = re.search(r'```(?:json)?\s*(\{[\s\S]*?\})\s*```', text)
if match:
return json.loads(match.group(1))
raise ValueError("No markdown code block found")
@classmethod
def extract_json_object(cls, text: str) -> Dict[str, Any]:
"""智能提取 JSON 对象"""
start_idx = text.find('{')
if start_idx == -1:
raise ValueError("No JSON object found")
# 考虑字符串内的花括号和转义字符
brace_count = 0
in_string = False
escape_next = False
end_idx = -1
for i in range(start_idx, len(text)):
char = text[i]
if escape_next:
escape_next = False
continue
if char == '\\':
escape_next = True
continue
if char == '"' and not escape_next:
in_string = not in_string
continue
if not in_string:
if char == '{':
brace_count += 1
elif char == '}':
brace_count -= 1
if brace_count == 0:
end_idx = i + 1
break
if end_idx == -1:
# 如果找不到完整的 JSON尝试使用最后一个 }
last_brace = text.rfind('}')
if last_brace > start_idx:
end_idx = last_brace + 1
else:
raise ValueError("Incomplete JSON object")
json_str = text[start_idx:end_idx]
# 修复格式问题
json_str = re.sub(r',(\s*[}\]])', r'\1', json_str)
return json.loads(json_str)
@classmethod
def fix_truncated_json(cls, text: str) -> Dict[str, Any]:
"""修复截断的 JSON"""
start_idx = text.find('{')
if start_idx == -1:
raise ValueError("Cannot fix truncated JSON")
json_str = text[start_idx:]
# 计算缺失的闭合符号
open_braces = json_str.count('{')
close_braces = json_str.count('}')
open_brackets = json_str.count('[')
close_brackets = json_str.count(']')
# 补全缺失的闭合符号
json_str += ']' * max(0, open_brackets - close_brackets)
json_str += '}' * max(0, open_braces - close_braces)
# 修复格式
json_str = re.sub(r',(\s*[}\]])', r'\1', json_str)
return json.loads(json_str)
@classmethod
def parse(cls, text: str, default: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
LLM 响应中解析 JSON优先使用 json-repair
Args:
text: LLM 响应文本
default: 解析失败时返回的默认值如果为 None 则抛出异常
Returns:
解析后的字典
"""
if not text or not text.strip():
if default is not None:
logger.warning("LLM 响应为空,返回默认值")
return default
raise ValueError("LLM 响应内容为空")
clean = cls.clean_text(text)
# 🔥 优先使用 json-repair它能处理大多数格式问题
attempts = []
# 如果 json-repair 可用,优先使用它
if JSON_REPAIR_AVAILABLE:
attempts.append(("json-repair", lambda: cls.repair_with_library(text)))
# 然后尝试其他方法作为后备
attempts.extend([
("直接解析", lambda: json.loads(text)),
("清理后解析", lambda: json.loads(cls.fix_json_format(clean))),
("Markdown 提取", lambda: cls.extract_from_markdown(text)),
("智能提取", lambda: cls.extract_json_object(clean)),
("截断修复", lambda: cls.fix_truncated_json(clean)),
])
last_error = None
for name, attempt in attempts:
try:
result = attempt()
if result and isinstance(result, dict):
if name != "直接解析":
logger.debug(f"✅ JSON 解析成功(方法: {name}")
return result
except Exception as e:
last_error = e
logger.debug(f"JSON 解析方法 '{name}' 失败: {e}")
# 所有尝试都失败
if default is not None:
logger.warning(f"JSON 解析失败,返回默认值。原始内容: {text[:200]}...")
return default
logger.error(f"❌ 无法解析 JSON原始内容: {text[:500]}...")
raise ValueError(f"无法解析 JSON: {last_error}")
@classmethod
def parse_findings(cls, text: str) -> List[Dict[str, Any]]:
"""
专门解析 findings 列表
Args:
text: LLM 响应文本
Returns:
findings 列表每个元素都是字典
"""
try:
result = cls.parse(text, default={"findings": []})
findings = result.get("findings", [])
# 确保每个 finding 都是字典
valid_findings = []
for f in findings:
if isinstance(f, dict):
valid_findings.append(f)
elif isinstance(f, str):
# 尝试将字符串解析为 JSON
try:
# 优先使用 json-repair
if JSON_REPAIR_AVAILABLE:
parsed = repair_json(f, return_objects=True)
else:
parsed = json.loads(f)
if isinstance(parsed, dict):
valid_findings.append(parsed)
except Exception:
logger.warning(f"跳过无效的 finding字符串: {f[:100]}...")
else:
logger.warning(f"跳过无效的 finding类型: {type(f)}")
return valid_findings
except Exception as e:
logger.error(f"解析 findings 失败: {e}")
return []
@classmethod
def safe_get(cls, data: Union[Dict, str, Any], key: str, default: Any = None) -> Any:
"""
安全地从数据中获取值
Args:
data: 可能是字典或其他类型
key: 要获取的键
default: 默认值
Returns:
获取的值或默认值
"""
if isinstance(data, dict):
return data.get(key, default)
return default
@classmethod
def parse_any(cls, text: str, default: Any = None) -> Any:
"""
解析任意 JSON 类型对象数组字符串等
Args:
text: LLM 响应文本
default: 解析失败时返回的默认值
Returns:
解析后的 Python 对象
"""
if not text or not text.strip():
return default
clean = cls.clean_text(text)
json_str = cls.extract_json_string(clean)
# 优先使用 json-repair
if JSON_REPAIR_AVAILABLE:
try:
return repair_json(json_str, return_objects=True)
except Exception as e:
logger.debug(f"json-repair 解析失败: {e}")
# 后备方法
try:
return json.loads(json_str)
except Exception as e:
logger.debug(f"标准 JSON 解析失败: {e}")
return default