feat(agent): 增强API错误处理机制

添加对 API 错误的分类处理,包括速率限制、配额用尽、认证错误和连接错误。
在 base.py 中为 API 错误添加前缀标记,在 orchestrator.py 中据此实现重试逻辑。
在 litellm_adapter.py 中完善错误类型识别和用户友好提示。
This commit is contained in:
lintsinghua 2025-12-25 17:35:31 +08:00
parent c7632afdab
commit 39e2f43210
3 changed files with 145 additions and 8 deletions

View File

@ -1024,10 +1024,18 @@ class BaseAgent(ABC):
elif chunk["type"] == "error":
accumulated = chunk.get("accumulated", "")
error_msg = chunk.get("error", "Unknown error")
logger.error(f"[{self.name}] Stream error: {error_msg}")
if accumulated:
total_tokens = chunk.get("usage", {}).get("total_tokens", 0)
else:
error_type = chunk.get("error_type", "unknown")
user_message = chunk.get("user_message", error_msg)
logger.error(f"[{self.name}] Stream error ({error_type}): {error_msg}")
if chunk.get("usage"):
total_tokens = chunk["usage"].get("total_tokens", 0)
# 使用特殊前缀标记 API 错误,让调用方能够识别
# 格式:[API_ERROR:error_type] user_message
if error_type in ("rate_limit", "quota_exceeded", "authentication", "connection"):
accumulated = f"[API_ERROR:{error_type}] {user_message}"
elif not accumulated:
accumulated = f"[系统错误: {error_msg}] 请重新思考并输出你的决策。"
break

View File

@ -285,6 +285,55 @@ Action Input: {{"参数": "值"}}
# 重置空响应计数器
self._empty_retry_count = 0
# 🔥 检查是否是 API 错误(而非格式错误)
if llm_output.startswith("[API_ERROR:"):
# 提取错误类型和消息
match = re.match(r"\[API_ERROR:(\w+)\]\s*(.*)", llm_output)
if match:
error_type = match.group(1)
error_message = match.group(2)
if error_type == "rate_limit":
# 速率限制 - 等待后重试
api_retry_count = getattr(self, '_api_retry_count', 0) + 1
self._api_retry_count = api_retry_count
if api_retry_count >= 3:
logger.error(f"[{self.name}] Too many rate limit errors, stopping")
await self.emit_event("error", f"API 速率限制重试次数过多: {error_message}")
break
logger.warning(f"[{self.name}] Rate limit hit, waiting before retry ({api_retry_count}/3)")
await self.emit_event("warning", f"API 速率限制,等待后重试 ({api_retry_count}/3)")
await asyncio.sleep(30) # 等待 30 秒后重试
continue
elif error_type == "quota_exceeded":
# 配额用尽 - 终止任务
logger.error(f"[{self.name}] API quota exceeded: {error_message}")
await self.emit_event("error", f"API 配额已用尽: {error_message}")
break
elif error_type == "authentication":
# 认证错误 - 终止任务
logger.error(f"[{self.name}] API authentication error: {error_message}")
await self.emit_event("error", f"API 认证失败: {error_message}")
break
elif error_type == "connection":
# 连接错误 - 重试
api_retry_count = getattr(self, '_api_retry_count', 0) + 1
self._api_retry_count = api_retry_count
if api_retry_count >= 3:
logger.error(f"[{self.name}] Too many connection errors, stopping")
await self.emit_event("error", f"API 连接错误重试次数过多: {error_message}")
break
logger.warning(f"[{self.name}] Connection error, retrying ({api_retry_count}/3)")
await self.emit_event("warning", f"API 连接错误,重试中 ({api_retry_count}/3)")
await asyncio.sleep(5) # 等待 5 秒后重试
continue
# 重置 API 重试计数器(成功获取响应后)
self._api_retry_count = 0
# 解析 LLM 的决策
step = self._parse_llm_response(llm_output)

View File

@ -416,13 +416,93 @@ class LiteLLMAdapter(BaseLLMAdapter):
"finish_reason": "complete",
}
except Exception as e:
# 🔥 即使出错,也尝试返回估算的 usage
logger.error(f"Stream error: {e}")
except litellm.exceptions.RateLimitError as e:
# 速率限制错误 - 需要特殊处理
logger.error(f"Stream rate limit error: {e}")
error_msg = str(e)
# 区分"余额不足"和"频率超限"
if any(keyword in error_msg.lower() for keyword in ["余额不足", "资源包", "充值", "quota", "exceeded", "billing"]):
error_type = "quota_exceeded"
user_message = "API 配额已用尽,请检查账户余额或升级计划"
else:
error_type = "rate_limit"
# 尝试从错误消息中提取重试时间
import re
retry_match = re.search(r"retry\s*(?:in|after)\s*(\d+(?:\.\d+)?)\s*s", error_msg, re.IGNORECASE)
retry_seconds = float(retry_match.group(1)) if retry_match else 60
user_message = f"API 调用频率超限,建议等待 {int(retry_seconds)} 秒后重试"
output_tokens_estimate = estimate_tokens(accumulated_content) if accumulated_content else 0
yield {
"type": "error",
"error_type": error_type,
"error": error_msg,
"user_message": user_message,
"accumulated": accumulated_content,
"usage": {
"prompt_tokens": input_tokens_estimate,
"completion_tokens": output_tokens_estimate,
"total_tokens": input_tokens_estimate + output_tokens_estimate,
} if accumulated_content else None,
}
except litellm.exceptions.AuthenticationError as e:
# 认证错误 - API Key 无效
logger.error(f"Stream authentication error: {e}")
yield {
"type": "error",
"error_type": "authentication",
"error": str(e),
"user_message": "API Key 无效或已过期,请检查配置",
"accumulated": accumulated_content,
"usage": None,
}
except litellm.exceptions.APIConnectionError as e:
# 连接错误 - 网络问题
logger.error(f"Stream connection error: {e}")
yield {
"type": "error",
"error_type": "connection",
"error": str(e),
"user_message": "无法连接到 API 服务,请检查网络连接",
"accumulated": accumulated_content,
"usage": None,
}
except Exception as e:
# 其他错误 - 检查是否是包装的速率限制错误
error_msg = str(e)
logger.error(f"Stream error: {e}")
# 检查是否是包装的速率限制错误(如 ServiceUnavailableError 包装 RateLimitError)
is_rate_limit = any(keyword in error_msg.lower() for keyword in [
"ratelimiterror", "rate limit", "429", "resource_exhausted",
"quota exceeded", "too many requests"
])
if is_rate_limit:
# 按速率限制错误处理
import re
# 检查是否是配额用尽
if any(keyword in error_msg.lower() for keyword in ["quota", "exceeded", "billing"]):
error_type = "quota_exceeded"
user_message = "API 配额已用尽,请检查账户余额或升级计划"
else:
error_type = "rate_limit"
retry_match = re.search(r"retry\s*(?:in|after)\s*(\d+(?:\.\d+)?)\s*s", error_msg, re.IGNORECASE)
retry_seconds = float(retry_match.group(1)) if retry_match else 60
user_message = f"API 调用频率超限,建议等待 {int(retry_seconds)} 秒后重试"
else:
error_type = "unknown"
user_message = "LLM 调用发生错误,请重试"
output_tokens_estimate = estimate_tokens(accumulated_content) if accumulated_content else 0
yield {
"type": "error",
"error_type": error_type,
"error": error_msg,
"user_message": user_message,
"accumulated": accumulated_content,
"usage": {
"prompt_tokens": input_tokens_estimate,