Fix: Improve agent stream stability by preventing unnecessary reconnections and correctly draining buffered events.

This commit is contained in:
lintsinghua 2025-12-13 20:21:30 +08:00
parent 507fe393d1
commit d449e2ba78
5 changed files with 106 additions and 36 deletions

View File

@@ -942,7 +942,9 @@ class BaseAgent(ABC):
logger.info(f"[{self.name}] Cancelled before LLM call")
return "", 0
logger.info(f"[{self.name}] 🚀 Starting stream_llm_call, emitting thinking_start...")
await self.emit_thinking_start()
logger.info(f"[{self.name}] ✅ thinking_start emitted, starting LLM stream...")
try:
async for chunk in self.llm_service.chat_completion_stream(

View File

@@ -312,6 +312,13 @@ class EventManager:
if task_id in self._event_queues:
try:
self._event_queues[task_id].put_nowait(event_data)
# 🔥 DEBUG: 记录重要事件被添加到队列
if event_type in ["thinking_start", "thinking_end", "dispatch", "task_complete", "task_error"]:
logger.info(f"[EventQueue] Added {event_type} to queue for task {task_id}, queue size: {self._event_queues[task_id].qsize()}")
elif event_type == "thinking_token":
# 每10个token记录一次
if sequence % 10 == 0:
logger.debug(f"[EventQueue] Added thinking_token #{sequence} to queue, size: {self._event_queues[task_id].qsize()}")
except asyncio.QueueFull:
logger.warning(f"Event queue full for task {task_id}, dropping event: {event_type}")
@@ -438,16 +445,22 @@ class EventManager:
# 获取现有队列(由 AgentRunner 在初始化时创建)
queue = self._event_queues.get(task_id)
if not queue:
# 如果队列不存在,创建一个新的(回退逻辑)
queue = self.create_queue(task_id)
logger.warning(f"Queue not found for task {task_id}, created new one")
# 🔥 先排空队列中已缓存的事件(这些是在 SSE 连接前产生的)
# 🔥 CRITICAL FIX: 记录当前队列大小,只消耗这些已存在的事件
# 之前的 bug: while not queue.empty() 会永远循环,因为 LLM 持续添加事件
initial_queue_size = queue.qsize()
logger.info(f"[StreamEvents] Task {task_id}: Draining {initial_queue_size} buffered events...")
# 🔥 先排空队列中已缓存的事件(只消耗连接时已存在的事件数量)
buffered_count = 0
skipped_count = 0
while not queue.empty():
max_drain = initial_queue_size # 只消耗这么多事件,避免无限循环
for _ in range(max_drain):
try:
buffered_event = queue.get_nowait()
@@ -460,38 +473,48 @@ class EventManager:
buffered_count += 1
yield buffered_event
# 🔥 为所有缓存事件添加延迟,确保不会一起输出
# 🔥 为缓存事件添加小延迟,但比之前少很多(避免拖慢)
event_type = buffered_event.get("event_type")
if event_type == "thinking_token":
await asyncio.sleep(0.015) # 15ms for tokens
else:
await asyncio.sleep(0.005) # 5ms for other events
await asyncio.sleep(0.005) # 5ms for tokens (reduced from 15ms)
# 其他事件不加延迟,快速发送
# 检查是否是结束事件
if event_type in ["task_complete", "task_error", "task_cancel"]:
logger.debug(f"Task {task_id} already completed, sent {buffered_count} buffered events (skipped {skipped_count})")
logger.info(f"[StreamEvents] Task {task_id} already completed, sent {buffered_count} buffered events (skipped {skipped_count})")
return
except asyncio.QueueEmpty:
break
if buffered_count > 0 or skipped_count > 0:
logger.debug(f"Drained queue for task {task_id}: sent {buffered_count}, skipped {skipped_count} (after_sequence={after_sequence})")
logger.info(f"[StreamEvents] Task {task_id}: Drained {buffered_count} buffered events, skipped {skipped_count}")
# 🔥 DEBUG: 记录进入实时循环
logger.info(f"[StreamEvents] Task {task_id}: Entering real-time loop, queue size: {queue.qsize()}")
# 然后实时推送新事件
try:
while True:
try:
logger.debug(f"[StreamEvents] Task {task_id}: Waiting for next event from queue...")
event = await asyncio.wait_for(queue.get(), timeout=30)
logger.debug(f"[StreamEvents] Task {task_id}: Got event from queue: {event.get('event_type')}")
# 🔥 过滤掉序列号 <= after_sequence 的事件
event_sequence = event.get("sequence", 0)
if event_sequence <= after_sequence:
logger.debug(f"[StreamEvents] Task {task_id}: Skipping event seq={event_sequence} (after_sequence={after_sequence})")
continue
# 🔥 DEBUG: 记录重要事件被发送
event_type = event.get("event_type")
if event_type in ["thinking_start", "thinking_end", "dispatch", "task_complete", "task_error"]:
logger.info(f"[StreamEvents] Yielding {event_type} (seq={event_sequence}) for task {task_id}")
yield event
# 🔥 为 thinking_token 添加微延迟确保流式效果
if event.get("event_type") == "thinking_token":
if event_type == "thinking_token":
await asyncio.sleep(0.01) # 10ms
# 检查是否是结束事件

View File

@@ -93,6 +93,10 @@ export function useAgentStream(
const handlerRef = useRef<AgentStreamHandler | null>(null);
const thinkingBufferRef = useRef<string[]>([]);
// 🔥 使用 ref 存储 afterSequence,避免 connect 函数依赖变化导致重连
const afterSequenceRef = useRef(afterSequence);
afterSequenceRef.current = afterSequence;
// 连接
const connect = useCallback(() => {
if (!taskId) return;
@@ -114,11 +118,15 @@ export function useAgentStream(
setError(null);
thinkingBufferRef.current = [];
// 🔥 使用 ref 获取最新的 afterSequence 值
const currentAfterSequence = afterSequenceRef.current;
console.log(`[useAgentStream] Creating handler with afterSequence=${currentAfterSequence}`);
// 创建新的 handler
handlerRef.current = new AgentStreamHandler(taskId, {
includeThinking,
includeToolCalls,
afterSequence,
afterSequence: currentAfterSequence,
onEvent: (event) => {
// Pass to custom callback first (important for capturing metadata like agent_name)
@@ -215,7 +223,7 @@ export function useAgentStream(
handlerRef.current.connect();
setIsConnected(true);
}, [taskId, includeThinking, includeToolCalls, afterSequence, maxEvents]); // 🔥 移除 callbackOptions 依赖
}, [taskId, includeThinking, includeToolCalls, maxEvents]); // 🔥 移除 afterSequence 依赖,使用 ref 代替
// 断开连接
const disconnect = useCallback(() => {

View File

@@ -423,14 +423,16 @@ function AgentAuditPageContent() {
if (!currentId) {
// 预生成 ID,这样我们可以跟踪这个日志
const newLogId = `thinking-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
dispatch({ type: 'ADD_LOG', payload: {
dispatch({
type: 'ADD_LOG', payload: {
id: newLogId,
type: 'thinking',
title: 'Thinking...',
content: cleanContent,
isStreaming: true,
agentName: getCurrentAgentName() || undefined,
}});
}
});
setCurrentThinkingId(newLogId);
} else {
updateLog(currentId, { content: cleanContent });
@@ -589,14 +591,19 @@ function AgentAuditPageContent() {
if (hasConnectedRef.current) return;
hasConnectedRef.current = true;
console.log(`[AgentAudit] Connecting to stream with afterSequence=${afterSequence}`);
console.log(`[AgentAudit] Connecting to stream (afterSequence will be passed via streamOptions)`);
connectStream();
dispatch({ type: 'ADD_LOG', payload: { type: 'info', title: 'Connected to audit stream' } });
return () => {
console.log('[AgentAudit] Cleanup: disconnecting stream');
disconnectStream();
};
}, [taskId, task?.status, historicalEventsLoaded, connectStream, disconnectStream, dispatch, afterSequence]);
// 🔥 CRITICAL FIX: 移除 afterSequence 依赖!
// afterSequence 通过 streamOptions 传递,不需要在这里触发重连
// 如果包含它,当 loadHistoricalEvents 更新 afterSequence 时会触发断开重连
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [taskId, task?.status, historicalEventsLoaded, connectStream, disconnectStream, dispatch]);
// Polling
useEffect(() => {

View File

@@ -30,6 +30,7 @@ export type StreamEventType =
| 'phase_end'
| 'phase_complete'
// 发现相关
| 'finding' // Backward compatibility
| 'finding_new'
| 'finding_verified'
// 状态相关
@@ -133,8 +134,11 @@ export class AgentStreamHandler {
*
*/
connect(): void {
// 🔥 重置断开标志,允许新的连接
this.isDisconnecting = false;
// 🔥 如果已经连接,不重复连接
if (this.isConnected || this.isDisconnecting) {
if (this.isConnected) {
return;
}
@@ -196,12 +200,14 @@ export class AgentStreamHandler {
while (true) {
// 🔥 检查是否正在断开
if (this.isDisconnecting) {
console.log('[AgentStream] Disconnecting, breaking loop');
break;
}
const { done, value } = await this.reader.read();
if (done) {
console.log('[AgentStream] Reader done, stream ended');
break;
}
@@ -211,6 +217,12 @@ export class AgentStreamHandler {
const events = this.parseSSE(buffer);
buffer = events.remaining;
// 🔥 DEBUG: 记录接收到的事件
if (events.parsed.length > 0) {
const eventTypes = events.parsed.map(e => e.type);
console.log(`[AgentStream] Received ${events.parsed.length} events:`, eventTypes);
}
// 🔥 逐个处理事件,添加微延迟确保 React 能逐个渲染
for (const event of events.parsed) {
this.handleEvent(event);
@@ -448,21 +460,39 @@ export class AgentStreamHandler {
this.isDisconnecting = true;
this.isConnected = false;
// 🔥 取消 fetch 请求
// 🔥 取消 fetch 请求 (wrap in try-catch to handle AbortError)
if (this.abortController) {
try {
this.abortController.abort();
} catch {
// 忽略 abort 错误
}
this.abortController = null;
}
// 🔥 清理 reader
// 🔥 清理 reader (handle promise rejection from cancel())
if (this.reader) {
try {
this.reader.cancel();
this.reader.releaseLock();
} catch {
// 忽略清理错误
}
const reader = this.reader;
this.reader = null;
// reader.cancel() returns a Promise that may reject with AbortError
// We need to catch this to prevent unhandled promise rejection
Promise.resolve().then(() => {
try {
// Cancel and release in a controlled way
reader.cancel().catch(() => {
// Silently ignore cancel errors (expected during abort)
}).finally(() => {
try {
reader.releaseLock();
} catch {
// Silently ignore releaseLock errors
}
});
} catch {
// Silently ignore any synchronous errors
}
});
}
// 清理 EventSource(如果使用)