fix(agent): 修复任务取消和LLM流式处理的多项问题

修复任务取消后仍可能标记为完成的问题,增加全局取消检查回调
移除事件管理器的人为延迟,防止队列堆积
为LLM流式调用增加超时机制和安全检查
增加验证阶段超时时间至10分钟
This commit is contained in:
lintsinghua 2025-12-16 22:47:04 +08:00
parent be0bdcbbdc
commit a980fa34e1
5 changed files with 111 additions and 38 deletions

View File

@@ -72,8 +72,7 @@ jobs:
- name: 构建前端项目
working-directory: ./frontend
run: pnpm build
env:
VITE_USE_LOCAL_DB: 'true'
# 8. 设置 Python 环境(用于后端)
- name: 设置 Python
@@ -164,6 +163,7 @@ jobs:
echo "- 🧠 **RAG 知识库增强**: 代码语义理解 + CWE/CVE 漏洞知识库" >> CHANGELOG.md
echo "- 🔒 **沙箱漏洞验证**: Docker 安全容器自动执行 PoC" >> CHANGELOG.md
echo "- 🛠️ **专业安全工具集成**: Semgrep, Bandit, Gitleaks, OSV-Scanner" >> CHANGELOG.md
echo "- 🐛 **稳定性增强**: 修复多智能体工具调用循环、UI 显示及 Docker 环境兼容性问题" >> CHANGELOG.md
echo "" >> CHANGELOG.md
echo "## 📦 下载说明" >> CHANGELOG.md
echo "" >> CHANGELOG.md

View File

@@ -364,6 +364,17 @@ async def _execute_agent_task(task_id: str):
},
)
# 🔥 设置外部取消检查回调
# 这确保即使 runner.cancel() 失败,Agent 也能通过检查全局标志感知取消
def check_global_cancel():
return is_task_cancelled(task_id)
orchestrator.set_cancel_callback(check_global_cancel)
# 同时也为子 Agent 设置(虽然 Orchestrator 会传播)
recon_agent.set_cancel_callback(check_global_cancel)
analysis_agent.set_cancel_callback(check_global_cancel)
verification_agent.set_cancel_callback(check_global_cancel)
# 注册到全局
_running_orchestrators[task_id] = orchestrator
_running_tasks[task_id] = orchestrator # 兼容旧的取消逻辑
@@ -437,7 +448,13 @@ async def _execute_agent_task(task_id: str):
await _save_findings(db, task_id, findings)
# 更新任务统计
task.status = AgentTaskStatus.COMPLETED
# 🔥 CRITICAL FIX: 在设置完成前再次检查取消状态
# 避免 "取消后后端继续运行并最终标记为完成" 的问题
if is_task_cancelled(task_id):
logger.info(f"[AgentTask] Task {task_id} was cancelled, overriding success result")
task.status = AgentTaskStatus.CANCELLED
else:
task.status = AgentTaskStatus.COMPLETED
task.completed_at = datetime.now(timezone.utc)
task.current_phase = AgentTaskPhase.REPORTING
task.findings_count = len(findings)

View File

@@ -485,9 +485,24 @@ class BaseAgent(ABC):
self._cancelled = True
logger.info(f"[{self.name}] Cancel requested")
# 🔥 外部取消检查回调
self._cancel_callback = None
def set_cancel_callback(self, callback) -> None:
"""设置外部取消检查回调"""
self._cancel_callback = callback
@property
def is_cancelled(self) -> bool:
return self._cancelled
"""检查是否已取消(包含内部标志和外部回调)"""
if self._cancelled:
return True
# 检查外部回调
if self._cancel_callback and self._cancel_callback():
self._cancelled = True
logger.info(f"[{self.name}] Detected cancellation from callback")
return True
return False
# ============ 协作方法 ============
@@ -949,41 +964,83 @@ class BaseAgent(ABC):
logger.info(f"[{self.name}] ✅ thinking_start emitted, starting LLM stream...")
try:
async for chunk in self.llm_service.chat_completion_stream(
# 获取流式迭代器
stream = self.llm_service.chat_completion_stream(
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
):
)
# 兼容不同版本的 python async generator
iterator = stream.__aiter__()
import time
first_token_received = False
last_activity = time.time()
while True:
# 检查取消
if self.is_cancelled:
logger.info(f"[{self.name}] Cancelled during LLM streaming")
logger.info(f"[{self.name}] Cancelled during LLM streaming loop")
break
if chunk["type"] == "token":
token = chunk["content"]
accumulated = chunk["accumulated"]
await self.emit_thinking_token(token, accumulated)
# 🔥 CRITICAL: 让出控制权给事件循环,让 SSE 有机会发送事件
# 如果不这样做,所有 token 会在循环结束后一起发送
await asyncio.sleep(0)
try:
# 🔥 第一个 token 30秒超时,后续 token 60秒超时
# 这是一个应用层的安全网,防止底层 LLM 客户端挂死
timeout = 30.0 if not first_token_received else 60.0
elif chunk["type"] == "done":
accumulated = chunk["content"]
if chunk.get("usage"):
total_tokens = chunk["usage"].get("total_tokens", 0)
chunk = await asyncio.wait_for(iterator.__anext__(), timeout=timeout)
last_activity = time.time()
if chunk["type"] == "token":
first_token_received = True
token = chunk["content"]
# 🔥 累积 content确保 accumulated 变量更新
# 注意:某些 adapter 返回的 chunk["accumulated"] 可能已经包含了累积值,
# 但为了安全起见,如果不一致,我们自己累积
if "accumulated" in chunk:
accumulated = chunk["accumulated"]
else:
# 如果 adapter 没返回 accumulated我们自己拼
# 注意:如果是 token 类型content 是增量
# 如果 accumulated 被覆盖了,需要小心。
# 实际上 service.py 中 chat_completion_stream 保证了 accumulated 存在
# 这里我们信任 service 层的 accumulated
pass
# Double check if accumulated is empty but we have token
if not accumulated and token:
accumulated += token # Fallback
await self.emit_thinking_token(token, accumulated)
# 🔥 CRITICAL: 让出控制权给事件循环,让 SSE 有机会发送事件
await asyncio.sleep(0)
elif chunk["type"] == "done":
accumulated = chunk["content"]
if chunk.get("usage"):
total_tokens = chunk["usage"].get("total_tokens", 0)
break
elif chunk["type"] == "error":
accumulated = chunk.get("accumulated", "")
error_msg = chunk.get("error", "Unknown error")
logger.error(f"[{self.name}] Stream error: {error_msg}")
if accumulated:
total_tokens = chunk.get("usage", {}).get("total_tokens", 0)
else:
accumulated = f"[系统错误: {error_msg}] 请重新思考并输出你的决策。"
break
except StopAsyncIteration:
break
elif chunk["type"] == "error":
accumulated = chunk.get("accumulated", "")
error_msg = chunk.get("error", "Unknown error")
logger.error(f"[{self.name}] Stream error: {error_msg}")
# 🔥 如果有部分累积内容,尝试使用它
if accumulated:
logger.warning(f"[{self.name}] Using partial accumulated content ({len(accumulated)} chars)")
total_tokens = chunk.get("usage", {}).get("total_tokens", 0)
else:
# 🔥 返回一个提示 LLM 继续的消息,而不是空字符串
accumulated = f"[系统错误: {error_msg}] 请重新思考并输出你的决策。"
except asyncio.TimeoutError:
timeout_type = "First Token" if not first_token_received else "Stream"
logger.error(f"[{self.name}] LLM {timeout_type} Timeout ({timeout}s)")
error_msg = f"LLM 响应超时 ({timeout_type}, {timeout}s)"
await self.emit_event("error", error_msg)
if not accumulated:
accumulated = f"[超时错误: {timeout}s 无响应] 请尝试简化请求或重试。"
break
except asyncio.CancelledError:
@@ -993,7 +1050,6 @@ class BaseAgent(ABC):
# 🔥 增强异常处理,避免吞掉错误
logger.error(f"[{self.name}] Unexpected error in stream_llm_call: {e}", exc_info=True)
await self.emit_event("error", f"LLM 调用错误: {str(e)}")
# 返回错误提示,让 Agent 知道发生了什么
accumulated = f"[LLM调用错误: {str(e)}] 请重试。"
finally:
await self.emit_thinking_end(accumulated)

View File

@@ -657,7 +657,7 @@ Action Input: {{"参数": "值"}}
agent_timeouts = {
"recon": 300, # 5 分钟
"analysis": 600, # 10 分钟
"verification": 300, # 5 分钟
"verification": 600, # 10 分钟
}
timeout = agent_timeouts.get(agent_name, 300)

View File

@@ -473,10 +473,10 @@ class EventManager:
buffered_count += 1
yield buffered_event
# 🔥 为缓存事件添加小延迟,但比之前少很多(避免拖慢)
# 🔥 取消人为延迟,防止队列堆积
event_type = buffered_event.get("event_type")
if event_type == "thinking_token":
await asyncio.sleep(0.005) # 5ms for tokens (reduced from 15ms)
# if event_type == "thinking_token":
# await asyncio.sleep(0.005)
# 其他事件不加延迟,快速发送
# 检查是否是结束事件
@@ -513,9 +513,9 @@ class EventManager:
yield event
# 🔥 为 thinking_token 添加微延迟确保流式效果
if event_type == "thinking_token":
await asyncio.sleep(0.01) # 10ms
# 🔥 取消人为延迟,防止队列堆积
# if event_type == "thinking_token":
# await asyncio.sleep(0.01)
# 检查是否是结束事件
if event.get("event_type") in ["task_complete", "task_error", "task_cancel"]: