diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index a42179f..eafba48 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -72,8 +72,7 @@ jobs: - name: 构建前端项目 working-directory: ./frontend run: pnpm build - env: - VITE_USE_LOCAL_DB: 'true' + # 8. 设置 Python 环境(用于后端) - name: 设置 Python @@ -164,6 +163,7 @@ jobs: echo "- 🧠 **RAG 知识库增强**: 代码语义理解 + CWE/CVE 漏洞知识库" >> CHANGELOG.md echo "- 🔒 **沙箱漏洞验证**: Docker 安全容器自动执行 PoC" >> CHANGELOG.md echo "- 🛠️ **专业安全工具集成**: Semgrep, Bandit, Gitleaks, OSV-Scanner" >> CHANGELOG.md + echo "- 🐛 **稳定性增强**: 修复多智能体工具调用循环、UI 显示及 Docker 环境兼容性问题" >> CHANGELOG.md echo "" >> CHANGELOG.md echo "## 📦 下载说明" >> CHANGELOG.md echo "" >> CHANGELOG.md diff --git a/backend/app/api/v1/endpoints/agent_tasks.py b/backend/app/api/v1/endpoints/agent_tasks.py index 814bc82..affc960 100644 --- a/backend/app/api/v1/endpoints/agent_tasks.py +++ b/backend/app/api/v1/endpoints/agent_tasks.py @@ -364,6 +364,17 @@ async def _execute_agent_task(task_id: str): }, ) + # 🔥 设置外部取消检查回调 + # 这确保即使 runner.cancel() 失败,Agent 也能通过 checking 全局标志感知取消 + def check_global_cancel(): + return is_task_cancelled(task_id) + + orchestrator.set_cancel_callback(check_global_cancel) + # 同时也为子 Agent 设置(虽然 Orchestrator 会传播) + recon_agent.set_cancel_callback(check_global_cancel) + analysis_agent.set_cancel_callback(check_global_cancel) + verification_agent.set_cancel_callback(check_global_cancel) + # 注册到全局 _running_orchestrators[task_id] = orchestrator _running_tasks[task_id] = orchestrator # 兼容旧的取消逻辑 @@ -437,7 +448,13 @@ async def _execute_agent_task(task_id: str): await _save_findings(db, task_id, findings) # 更新任务统计 - task.status = AgentTaskStatus.COMPLETED + # 🔥 CRITICAL FIX: 在设置完成前再次检查取消状态 + # 避免 "取消后后端继续运行并最终标记为完成" 的问题 + if is_task_cancelled(task_id): + logger.info(f"[AgentTask] Task {task_id} was cancelled, overriding success result") + task.status = AgentTaskStatus.CANCELLED + else: + task.status = AgentTaskStatus.COMPLETED task.completed_at = datetime.now(timezone.utc) task.current_phase = AgentTaskPhase.REPORTING task.findings_count = len(findings) diff --git a/backend/app/services/agent/agents/base.py b/backend/app/services/agent/agents/base.py index 1ad5347..a294568 100644 --- a/backend/app/services/agent/agents/base.py +++ b/backend/app/services/agent/agents/base.py @@ -485,9 +485,24 @@ class BaseAgent(ABC): self._cancelled = True logger.info(f"[{self.name}] Cancel requested") + # 🔥 外部取消检查回调 + self._cancel_callback = None + + def set_cancel_callback(self, callback) -> None: + """设置外部取消检查回调""" + self._cancel_callback = callback + @property def is_cancelled(self) -> bool: - return self._cancelled + """检查是否已取消(包含内部标志和外部回调)""" + if self._cancelled: + return True + # 检查外部回调 + if self._cancel_callback and self._cancel_callback(): + self._cancelled = True + logger.info(f"[{self.name}] Detected cancellation from callback") + return True + return False # ============ 协作方法 ============ @@ -949,41 +964,83 @@ class BaseAgent(ABC): logger.info(f"[{self.name}] ✅ thinking_start emitted, starting LLM stream...") try: - async for chunk in self.llm_service.chat_completion_stream( + # 获取流式迭代器 + stream = self.llm_service.chat_completion_stream( messages=messages, temperature=temperature, max_tokens=max_tokens, - ): + ) + # 兼容不同版本的 python async generator + iterator = stream.__aiter__() + + import time + first_token_received = False + last_activity = time.time() + + while True: # 检查取消 if self.is_cancelled: - logger.info(f"[{self.name}] Cancelled during LLM streaming") + logger.info(f"[{self.name}] Cancelled during LLM streaming loop") break - if chunk["type"] == "token": - token = chunk["content"] - accumulated = chunk["accumulated"] - await self.emit_thinking_token(token, accumulated) - # 🔥 CRITICAL: 让出控制权给事件循环,让 SSE 有机会发送事件 - # 如果不这样做,所有 token 会在循环结束后一起发送 - await asyncio.sleep(0) + try: + # 🔥 第一個 token 30秒超时,后续 token 60秒超时 + # 这是一个应用层的安全网,防止底层 LLM 客户端挂死 + timeout = 30.0 if not first_token_received else 60.0 - elif chunk["type"] == "done": - accumulated = chunk["content"] - if chunk.get("usage"): - total_tokens = chunk["usage"].get("total_tokens", 0) + chunk = await asyncio.wait_for(iterator.__anext__(), timeout=timeout) + + last_activity = time.time() + + if chunk["type"] == "token": + first_token_received = True + token = chunk["content"] + # 🔥 累积 content,确保 accumulated 变量更新 + # 注意:某些 adapter 返回的 chunk["accumulated"] 可能已经包含了累积值, + # 但为了安全起见,如果不一致,我们自己累积 + if "accumulated" in chunk: + accumulated = chunk["accumulated"] + else: + # 如果 adapter 没返回 accumulated,我们自己拼 + # 注意:如果是 token 类型,content 是增量 + # 如果 accumulated 被覆盖了,需要小心。 + # 实际上 service.py 中 chat_completion_stream 保证了 accumulated 存在 + # 这里我们信任 service 层的 accumulated + pass + + # Double check if accumulated is empty but we have token + if not accumulated and token: + accumulated += token # Fallback + + await self.emit_thinking_token(token, accumulated) + # 🔥 CRITICAL: 让出控制权给事件循环,让 SSE 有机会发送事件 + await asyncio.sleep(0) + + elif chunk["type"] == "done": + accumulated = chunk["content"] + if chunk.get("usage"): + total_tokens = chunk["usage"].get("total_tokens", 0) + break + + elif chunk["type"] == "error": + accumulated = chunk.get("accumulated", "") + error_msg = chunk.get("error", "Unknown error") + logger.error(f"[{self.name}] Stream error: {error_msg}") + if accumulated: + total_tokens = chunk.get("usage", {}).get("total_tokens", 0) + else: + accumulated = f"[系统错误: {error_msg}] 请重新思考并输出你的决策。" + break + + except StopAsyncIteration: break - - elif chunk["type"] == "error": - accumulated = chunk.get("accumulated", "") - error_msg = chunk.get("error", "Unknown error") - logger.error(f"[{self.name}] Stream error: {error_msg}") - # 🔥 如果有部分累积内容,尝试使用它 - if accumulated: - logger.warning(f"[{self.name}] Using partial accumulated content ({len(accumulated)} chars)") - total_tokens = chunk.get("usage", {}).get("total_tokens", 0) - else: - # 🔥 返回一个提示 LLM 继续的消息,而不是空字符串 - accumulated = f"[系统错误: {error_msg}] 请重新思考并输出你的决策。" + except asyncio.TimeoutError: + timeout_type = "First Token" if not first_token_received else "Stream" + logger.error(f"[{self.name}] LLM {timeout_type} Timeout ({timeout}s)") + error_msg = f"LLM 响应超时 ({timeout_type}, {timeout}s)" + await self.emit_event("error", error_msg) + if not accumulated: + accumulated = f"[超时错误: {timeout}s 无响应] 请尝试简化请求或重试。" break except asyncio.CancelledError: @@ -993,7 +1050,6 @@ class BaseAgent(ABC): # 🔥 增强异常处理,避免吞掉错误 logger.error(f"[{self.name}] Unexpected error in stream_llm_call: {e}", exc_info=True) await self.emit_event("error", f"LLM 调用错误: {str(e)}") - # 返回错误提示,让 Agent 知道发生了什么 accumulated = f"[LLM调用错误: {str(e)}] 请重试。" finally: await self.emit_thinking_end(accumulated) diff --git a/backend/app/services/agent/agents/orchestrator.py b/backend/app/services/agent/agents/orchestrator.py index 41f0b4f..d527d6a 100644 --- a/backend/app/services/agent/agents/orchestrator.py +++ b/backend/app/services/agent/agents/orchestrator.py @@ -657,7 +657,7 @@ Action Input: {{"参数": "值"}} agent_timeouts = { "recon": 300, # 5 分钟 "analysis": 600, # 10 分钟 - "verification": 300, # 5 分钟 + "verification": 600, # 10 分钟 } timeout = agent_timeouts.get(agent_name, 300) diff --git a/backend/app/services/agent/event_manager.py b/backend/app/services/agent/event_manager.py index c2d2afb..827fd40 100644 --- a/backend/app/services/agent/event_manager.py +++ b/backend/app/services/agent/event_manager.py @@ -473,10 +473,10 @@ class EventManager: buffered_count += 1 yield buffered_event - # 🔥 为缓存事件添加小延迟,但比之前少很多(避免拖慢) + # 🔥 取消人为延迟,防止队列堆积 event_type = buffered_event.get("event_type") - if event_type == "thinking_token": - await asyncio.sleep(0.005) # 5ms for tokens (reduced from 15ms) + # if event_type == "thinking_token": + # await asyncio.sleep(0.005) # 其他事件不加延迟,快速发送 # 检查是否是结束事件 @@ -513,9 +513,9 @@ class EventManager: yield event - # 🔥 为 thinking_token 添加微延迟确保流式效果 - if event_type == "thinking_token": - await asyncio.sleep(0.01) # 10ms + # 🔥 取消人为延迟,防止队列堆积 + # if event_type == "thinking_token": + # await asyncio.sleep(0.01) # 检查是否是结束事件 if event.get("event_type") in ["task_complete", "task_error", "task_cancel"]: