diff --git a/backend/app/services/agent/agents/base.py b/backend/app/services/agent/agents/base.py index 337b063..c3cbf51 100644 --- a/backend/app/services/agent/agents/base.py +++ b/backend/app/services/agent/agents/base.py @@ -942,7 +942,9 @@ class BaseAgent(ABC): logger.info(f"[{self.name}] Cancelled before LLM call") return "", 0 + logger.info(f"[{self.name}] 🚀 Starting stream_llm_call, emitting thinking_start...") await self.emit_thinking_start() + logger.info(f"[{self.name}] ✅ thinking_start emitted, starting LLM stream...") try: async for chunk in self.llm_service.chat_completion_stream( diff --git a/backend/app/services/agent/event_manager.py b/backend/app/services/agent/event_manager.py index 1584cde..c2d2afb 100644 --- a/backend/app/services/agent/event_manager.py +++ b/backend/app/services/agent/event_manager.py @@ -312,6 +312,13 @@ class EventManager: if task_id in self._event_queues: try: self._event_queues[task_id].put_nowait(event_data) + # 🔥 DEBUG: 记录重要事件被添加到队列 + if event_type in ["thinking_start", "thinking_end", "dispatch", "task_complete", "task_error"]: + logger.info(f"[EventQueue] Added {event_type} to queue for task {task_id}, queue size: {self._event_queues[task_id].qsize()}") + elif event_type == "thinking_token": + # 每10个token记录一次 + if sequence % 10 == 0: + logger.debug(f"[EventQueue] Added thinking_token #{sequence} to queue, size: {self._event_queues[task_id].qsize()}") except asyncio.QueueFull: logger.warning(f"Event queue full for task {task_id}, dropping event: {event_type}") @@ -438,16 +445,22 @@ class EventManager: # 获取现有队列(由 AgentRunner 在初始化时创建) queue = self._event_queues.get(task_id) - if not queue: # 如果队列不存在,创建一个新的(回退逻辑) queue = self.create_queue(task_id) logger.warning(f"Queue not found for task {task_id}, created new one") - # 🔥 先排空队列中已缓存的事件(这些是在 SSE 连接前产生的) + # 🔥 CRITICAL FIX: 记录当前队列大小,只消耗这些已存在的事件 + # 之前的 bug: while not queue.empty() 会永远循环,因为 LLM 持续添加事件 + initial_queue_size = queue.qsize() + logger.info(f"[StreamEvents] Task {task_id}: Draining {initial_queue_size} buffered events...") + + # 🔥 先排空队列中已缓存的事件(只消耗连接时已存在的事件数量) buffered_count = 0 skipped_count = 0 - while not queue.empty(): + max_drain = initial_queue_size # 只消耗这么多事件,避免无限循环 + + for _ in range(max_drain): try: buffered_event = queue.get_nowait() @@ -460,38 +473,48 @@ class EventManager: buffered_count += 1 yield buffered_event - # 🔥 为所有缓存事件添加延迟,确保不会一起输出 + # 🔥 为缓存事件添加小延迟,但比之前少很多(避免拖慢) event_type = buffered_event.get("event_type") if event_type == "thinking_token": - await asyncio.sleep(0.015) # 15ms for tokens - else: - await asyncio.sleep(0.005) # 5ms for other events + await asyncio.sleep(0.005) # 5ms for tokens (reduced from 15ms) + # 其他事件不加延迟,快速发送 # 检查是否是结束事件 if event_type in ["task_complete", "task_error", "task_cancel"]: - logger.debug(f"Task {task_id} already completed, sent {buffered_count} buffered events (skipped {skipped_count})") + logger.info(f"[StreamEvents] Task {task_id} already completed, sent {buffered_count} buffered events (skipped {skipped_count})") return except asyncio.QueueEmpty: break if buffered_count > 0 or skipped_count > 0: - logger.debug(f"Drained queue for task {task_id}: sent {buffered_count}, skipped {skipped_count} (after_sequence={after_sequence})") + logger.info(f"[StreamEvents] Task {task_id}: Drained {buffered_count} buffered events, skipped {skipped_count}") + + # 🔥 DEBUG: 记录进入实时循环 + logger.info(f"[StreamEvents] Task {task_id}: Entering real-time loop, queue size: {queue.qsize()}") # 然后实时推送新事件 try: while True: try: + logger.debug(f"[StreamEvents] Task {task_id}: Waiting for next event from queue...") event = await asyncio.wait_for(queue.get(), timeout=30) + logger.debug(f"[StreamEvents] Task {task_id}: Got event from queue: {event.get('event_type')}") # 🔥 过滤掉序列号 <= after_sequence 的事件 event_sequence = event.get("sequence", 0) if event_sequence <= after_sequence: + logger.debug(f"[StreamEvents] Task {task_id}: Skipping event seq={event_sequence} (after_sequence={after_sequence})") continue + # 🔥 DEBUG: 记录重要事件被发送 + event_type = event.get("event_type") + if event_type in ["thinking_start", "thinking_end", "dispatch", "task_complete", "task_error"]: + logger.info(f"[StreamEvents] Yielding {event_type} (seq={event_sequence}) for task {task_id}") + yield event # 🔥 为 thinking_token 添加微延迟确保流式效果 - if event.get("event_type") == "thinking_token": + if event_type == "thinking_token": await asyncio.sleep(0.01) # 10ms # 检查是否是结束事件 diff --git a/frontend/src/hooks/useAgentStream.ts b/frontend/src/hooks/useAgentStream.ts index b635e18..3818473 100644 --- a/frontend/src/hooks/useAgentStream.ts +++ b/frontend/src/hooks/useAgentStream.ts @@ -93,6 +93,10 @@ export function useAgentStream( const handlerRef = useRef(null); const thinkingBufferRef = useRef([]); + // 🔥 使用 ref 存储 afterSequence,避免 connect 函数依赖变化导致重连 + const afterSequenceRef = useRef(afterSequence); + afterSequenceRef.current = afterSequence; + // 连接 const connect = useCallback(() => { if (!taskId) return; @@ -114,11 +118,15 @@ export function useAgentStream( setError(null); thinkingBufferRef.current = []; + // 🔥 使用 ref 获取最新的 afterSequence 值 + const currentAfterSequence = afterSequenceRef.current; + console.log(`[useAgentStream] Creating handler with afterSequence=${currentAfterSequence}`); + // 创建新的 handler handlerRef.current = new AgentStreamHandler(taskId, { includeThinking, includeToolCalls, - afterSequence, + afterSequence: currentAfterSequence, onEvent: (event) => { // Pass to custom callback first (important for capturing metadata like agent_name) @@ -215,7 +223,7 @@ export function useAgentStream( handlerRef.current.connect(); setIsConnected(true); - }, [taskId, includeThinking, includeToolCalls, afterSequence, maxEvents]); // 🔥 移除 callbackOptions 依赖 + }, [taskId, includeThinking, includeToolCalls, maxEvents]); // 🔥 移除 afterSequence 依赖,使用 ref 代替 // 断开连接 const disconnect = useCallback(() => { diff --git a/frontend/src/pages/AgentAudit/index.tsx b/frontend/src/pages/AgentAudit/index.tsx index c036828..e85b529 100644 --- a/frontend/src/pages/AgentAudit/index.tsx +++ b/frontend/src/pages/AgentAudit/index.tsx @@ -179,8 +179,8 @@ function AgentAuditPageContent() { // 提取 agent_name const agentName = (event.metadata?.agent_name as string) || - (event.metadata?.agent as string) || - undefined; + (event.metadata?.agent as string) || + undefined; // 根据事件类型创建日志项 switch (event.event_type) { @@ -423,14 +423,16 @@ function AgentAuditPageContent() { if (!currentId) { // 预生成 ID,这样我们可以跟踪这个日志 const newLogId = `thinking-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; - dispatch({ type: 'ADD_LOG', payload: { - id: newLogId, - type: 'thinking', - title: 'Thinking...', - content: cleanContent, - isStreaming: true, - agentName: getCurrentAgentName() || undefined, - }}); + dispatch({ + type: 'ADD_LOG', payload: { + id: newLogId, + type: 'thinking', + title: 'Thinking...', + content: cleanContent, + isStreaming: true, + agentName: getCurrentAgentName() || undefined, + } + }); setCurrentThinkingId(newLogId); } else { updateLog(currentId, { content: cleanContent }); @@ -520,8 +522,8 @@ function AgentAuditPageContent() { dispatch({ type: 'ADD_LOG', payload: { type: 'error', title: `Error: ${err}` } }); }, }), [afterSequence, dispatch, loadTask, loadFindings, loadAgentTree, debouncedLoadAgentTree, - updateLog, removeLog, getCurrentAgentName, getCurrentThinkingId, - setCurrentAgentName, setCurrentThinkingId]); + updateLog, removeLog, getCurrentAgentName, getCurrentThinkingId, + setCurrentAgentName, setCurrentThinkingId]); const { connect: connectStream, disconnect: disconnectStream, isConnected } = useAgentStream(taskId || null, streamOptions); @@ -589,14 +591,19 @@ function AgentAuditPageContent() { if (hasConnectedRef.current) return; hasConnectedRef.current = true; - console.log(`[AgentAudit] Connecting to stream with afterSequence=${afterSequence}`); + console.log(`[AgentAudit] Connecting to stream (afterSequence will be passed via streamOptions)`); connectStream(); dispatch({ type: 'ADD_LOG', payload: { type: 'info', title: 'Connected to audit stream' } }); return () => { + console.log('[AgentAudit] Cleanup: disconnecting stream'); disconnectStream(); }; - }, [taskId, task?.status, historicalEventsLoaded, connectStream, disconnectStream, dispatch, afterSequence]); + // 🔥 CRITICAL FIX: 移除 afterSequence 依赖! + // afterSequence 通过 streamOptions 传递,不需要在这里触发重连 + // 如果包含它,当 loadHistoricalEvents 更新 afterSequence 时会触发断开重连 + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [taskId, task?.status, historicalEventsLoaded, connectStream, disconnectStream, dispatch]); // Polling useEffect(() => { diff --git a/frontend/src/shared/api/agentStream.ts b/frontend/src/shared/api/agentStream.ts index 770a31c..2fc6429 100644 --- a/frontend/src/shared/api/agentStream.ts +++ b/frontend/src/shared/api/agentStream.ts @@ -30,6 +30,7 @@ export type StreamEventType = | 'phase_end' | 'phase_complete' // 发现相关 + | 'finding' // Backward compatibility | 'finding_new' | 'finding_verified' // 状态相关 @@ -133,8 +134,11 @@ export class AgentStreamHandler { * 开始监听事件流 */ connect(): void { + // 🔥 重置断开标志,允许新的连接 + this.isDisconnecting = false; + // 🔥 如果已经连接,不重复连接 - if (this.isConnected || this.isDisconnecting) { + if (this.isConnected) { return; } @@ -196,12 +200,14 @@ export class AgentStreamHandler { while (true) { // 🔥 检查是否正在断开 if (this.isDisconnecting) { + console.log('[AgentStream] Disconnecting, breaking loop'); break; } const { done, value } = await this.reader.read(); if (done) { + console.log('[AgentStream] Reader done, stream ended'); break; } @@ -211,6 +217,12 @@ export class AgentStreamHandler { const events = this.parseSSE(buffer); buffer = events.remaining; + // 🔥 DEBUG: 记录接收到的事件 + if (events.parsed.length > 0) { + const eventTypes = events.parsed.map(e => e.type); + console.log(`[AgentStream] Received ${events.parsed.length} events:`, eventTypes); + } + // 🔥 逐个处理事件,添加微延迟确保 React 能逐个渲染 for (const event of events.parsed) { this.handleEvent(event); @@ -448,21 +460,39 @@ export class AgentStreamHandler { this.isDisconnecting = true; this.isConnected = false; - // 🔥 取消 fetch 请求 + // 🔥 取消 fetch 请求 (wrap in try-catch to handle AbortError) if (this.abortController) { - this.abortController.abort(); + try { + this.abortController.abort(); + } catch { + // 忽略 abort 错误 + } this.abortController = null; } - // 🔥 清理 reader + // 🔥 清理 reader (handle promise rejection from cancel()) if (this.reader) { - try { - this.reader.cancel(); - this.reader.releaseLock(); - } catch { - // 忽略清理错误 - } + const reader = this.reader; this.reader = null; + + // reader.cancel() returns a Promise that may reject with AbortError + // We need to catch this to prevent unhandled promise rejection + Promise.resolve().then(() => { + try { + // Cancel and release in a controlled way + reader.cancel().catch(() => { + // Silently ignore cancel errors (expected during abort) + }).finally(() => { + try { + reader.releaseLock(); + } catch { + // Silently ignore releaseLock errors + } + }); + } catch { + // Silently ignore any synchronous errors + } + }); } // 清理 EventSource(如果使用)