Compare commits

..

1 Commits

Author SHA1 Message Date
vinland100 62c42341c4 feat: Implement incremental historical event loading and a centralized state synchronization mechanism, including stream reconnection, for the AgentAudit page.
Build and Push CodeReview / build (push) Waiting to run Details
2026-01-29 14:51:48 +08:00
13 changed files with 187 additions and 151 deletions

View File

@@ -574,6 +574,17 @@ Final Answer: {{"findings": [...], "summary": "..."}}"""
# 重置空响应计数器
self._empty_retry_count = 0
# 🔥 检查是否是 API 错误(从 BaseAgent.stream_llm_call 返回)
if llm_output.startswith("[API_ERROR:"):
# 提取错误类型和消息
match = re.match(r"\[API_ERROR:(\w+)\]\s*(.*)", llm_output)
if match:
error_type = match.group(1)
error_message = match.group(2)
logger.error(f"[{self.name}] Fatal API error: {error_type} - {error_message}")
await self.emit_event("error", f"LLM API 错误 ({error_type}): {error_message}")
break
# 解析 LLM 响应
step = self._parse_llm_response(llm_output)
self._steps.append(step)

View File

@@ -1033,7 +1033,7 @@ class BaseAgent(ABC):
# 使用特殊前缀标记 API 错误,让调用方能够识别
# 格式:[API_ERROR:error_type] user_message
if error_type in ("rate_limit", "quota_exceeded", "authentication", "connection"):
if error_type in ("rate_limit", "quota_exceeded", "authentication", "connection", "server_error"):
accumulated = f"[API_ERROR:{error_type}] {user_message}"
elif not accumulated:
accumulated = f"[系统错误: {error_msg}] 请重新思考并输出你的决策。"

View File

@@ -331,6 +331,19 @@ Action Input: {{"参数": "值"}}
await asyncio.sleep(5) # 等待 5 秒后重试
continue
elif error_type == "server_error":
# 服务器错误 (5xx) - 重试
api_retry_count = getattr(self, '_api_retry_count', 0) + 1
self._api_retry_count = api_retry_count
if api_retry_count >= 3:
logger.error(f"[{self.name}] Too many server errors, stopping")
await self.emit_event("error", f"API 服务端错误重试次数过多: {error_message}")
break
logger.warning(f"[{self.name}] Server error, retrying ({api_retry_count}/3)")
await self.emit_event("warning", f"API 服务端繁忙或异常,重试中 ({api_retry_count}/3)")
await asyncio.sleep(10) # 等待 10 秒后重试(服务端错误通常需要更久恢复)
continue
# 重置 API 重试计数器(成功获取响应后)
self._api_retry_count = 0

View File

@@ -509,6 +509,17 @@ Final Answer: [JSON格式的结果]"""
# 重置空响应计数器
self._empty_retry_count = 0
# 🔥 检查是否是 API 错误(从 BaseAgent.stream_llm_call 返回)
if llm_output.startswith("[API_ERROR:"):
# 提取错误类型和消息
match = re.match(r"\[API_ERROR:(\w+)\]\s*(.*)", llm_output)
if match:
error_type = match.group(1)
error_message = match.group(2)
logger.error(f"[{self.name}] Fatal API error: {error_type} - {error_message}")
await self.emit_event("error", f"LLM API 错误 ({error_type}): {error_message}")
break
# 解析 LLM 响应
step = self._parse_llm_response(llm_output)
self._steps.append(step)

View File

@@ -718,6 +718,17 @@ class VerificationAgent(BaseAgent):
self._total_tokens += tokens_this_round
# 🔥 检查是否是 API 错误(从 BaseAgent.stream_llm_call 返回)
if llm_output.startswith("[API_ERROR:"):
# 提取错误类型和消息
match = re.match(r"\[API_ERROR:(\w+)\]\s*(.*)", llm_output)
if match:
error_type = match.group(1)
error_message = match.group(2)
logger.error(f"[{self.name}] Fatal API error: {error_type} - {error_message}")
await self.emit_event("error", f"LLM API 错误 ({error_type}): {error_message}")
break
# 🔥 Handle empty LLM response to prevent loops
if not llm_output or not llm_output.strip():
logger.warning(f"[{self.name}] Empty LLM response in iteration {self._iteration}")

View File

@@ -251,6 +251,9 @@ class LiteLLMAdapter(BaseLLMAdapter):
except litellm.exceptions.APIConnectionError as e:
api_response = self._extract_api_response(e)
raise LLMError(f"无法连接到 API 服务", self.config.provider, api_response=api_response)
except (litellm.exceptions.ServiceUnavailableError, litellm.exceptions.InternalServerError) as e:
api_response = self._extract_api_response(e)
raise LLMError(f"API 服务暂时不可用 ({type(e).__name__})", self.config.provider, 503, api_response=api_response)
except litellm.exceptions.APIError as e:
api_response = self._extract_api_response(e)
raise LLMError(f"API 错误", self.config.provider, getattr(e, 'status_code', None), api_response=api_response)
@@ -469,6 +472,17 @@ class LiteLLMAdapter(BaseLLMAdapter):
"accumulated": accumulated_content,
"usage": None,
}
except (litellm.exceptions.ServiceUnavailableError, litellm.exceptions.InternalServerError) as e:
# 服务不可用 - 服务器端 5xx 错误
logger.error(f"Stream server error ({type(e).__name__}): {e}")
yield {
"type": "error",
"error_type": "server_error",
"error": str(e),
"user_message": f"API 服务暂时不可用 ({type(e).__name__})",
"accumulated": accumulated_content,
"usage": None,
}
except Exception as e:
# 其他错误 - 检查是否是包装的速率限制错误

View File

@@ -202,13 +202,11 @@ class LLMService:
2. column 是问题代码在该行中的起始列位置
3. code_snippet 应该包含问题代码及其上下文去掉"行号|"前缀
4. 如果代码片段包含多行必须使用 \\n 表示换行符
5. 禁止在 code_snippet 中输出大量的重复空白多余空行或过长的无关代码如果涉及此类问题请使用 "[...]" 进行省略展示单次 code_snippet 的行数建议不超过 10
严格禁止
- 禁止在任何字段中使用英文所有内容必须是简体中文
- 禁止在JSON字符串值中使用真实换行符必须用\\n转义
- 禁止输出markdown代码块标记```json
- 禁止输出重复的冗余的长代码块
重要提醒line字段必须从代码左侧的行号标注中读取不要猜测或填0"""
else:
@@ -255,13 +253,11 @@ Note:
2. 'column' is the starting column position
3. 'code_snippet' should include the problematic code with context, remove "lineNumber|" prefix
4. Use \\n for newlines in code snippets
5. DO NOT output massive amounts of empty lines, repeated spaces, or excessively long code in 'code_snippet'. Use "[...]" to indicate truncation if necessary. Keep the snippet under 10 lines.
STRICTLY PROHIBITED:
- NO Chinese characters in any field - English ONLY
- NO real newline characters in JSON string values (must use \\n)
- NO real newline characters in JSON string values
- NO markdown code block markers
- NO redundant long code blocks
CRITICAL: Read line numbers from the "lineNumber|" prefix. Do NOT guess or use 0!"""
@@ -855,9 +851,7 @@ Please analyze the following code:
1. 必须只输出纯JSON对象
2. 禁止在JSON前后添加任何文字说明markdown标记
3. 所有文本字段title, description, suggestion等必须使用中文输出
4. code_snippet 字段禁止输出大量的重复空白多余空行或过长的无关代码如果涉及此类问题请使用 "[...]" 进行省略展示单次 code_snippet 的行数建议不超过 10
5. 禁止在JSON字符串值中使用真实换行符必须用\\n转义
6. 输出格式必须严格符合以下 JSON Schema
4. 输出格式必须符合以下 JSON Schema
{schema}
{rules_prompt}"""
@@ -868,9 +862,7 @@ Please analyze the following code:
1. Must output pure JSON object only
2. Do not add any text, explanation, or markdown markers before or after JSON
3. All text fields (title, description, suggestion, etc.) must be in English
4. DO NOT output massive amounts of empty lines, repeated spaces, or excessively long code in 'code_snippet'. Use "[...]" for truncation. Keep it under 10 lines.
5. NO real newline characters in JSON string values (must use \\n)
6. Output format must strictly conform to the following JSON Schema:
4. Output format must conform to the following JSON Schema:
{schema}
{rules_prompt}"""

View File

@@ -599,9 +599,6 @@ async def scan_repo_task(task_id: str, db_session_factory, user_config: dict = N
if len(content) > settings.MAX_FILE_SIZE_BYTES:
return {"type": "skip", "reason": "too_large", "path": f_path}
if task_control.is_cancelled(task_id):
return None
# 4.2 LLM 分析
language = get_language_from_path(f_path)
scan_config = (user_config or {}).get('scan_config', {})
@@ -625,9 +622,6 @@ async def scan_repo_task(task_id: str, db_session_factory, user_config: dict = N
"language": language,
"analysis": analysis_result
}
except asyncio.CancelledError:
# 捕获取消异常,不再重试
return None
except Exception as e:
if attempt < MAX_RETRIES - 1:
wait_time = (attempt + 1) * 2
@@ -644,22 +638,16 @@ async def scan_repo_task(task_id: str, db_session_factory, user_config: dict = N
last_error = str(e)
return {"type": "error", "path": f_path, "error": str(e)}
# 创建所有分析任务对象以便跟踪
task_objects = [asyncio.create_task(analyze_single_file(f)) for f in files]
# 创建所有分析任务
analysis_tasks = [analyze_single_file(f) for f in files]
try:
# 使用 as_completed 处理结果,这样可以实时更新进度且安全使用当前 db session
for future in asyncio.as_completed(task_objects):
for future in asyncio.as_completed(analysis_tasks):
if task_control.is_cancelled(task_id):
# 停止处理后续完成的任务
print(f"🛑 任务 {task_id} 检测到取消信号,停止主循环")
break
try:
res = await future
except asyncio.CancelledError:
continue
res = await future
if not res: continue
if res["type"] == "skip":
@@ -724,18 +712,6 @@ async def scan_repo_task(task_id: str, db_session_factory, user_config: dict = N
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
print(f"❌ 任务 {task_id}: 连续失败 {consecutive_failures} 次,停止分析")
break
finally:
# 无论正常结束、中途 break 还是发生异常,都确保取消所有未完成的任务
pending_count = 0
for t in task_objects:
if not t.done():
t.cancel()
pending_count += 1
if pending_count > 0:
print(f"🧹 任务 {task_id}: 已清理 {pending_count} 个后台待处理或执行中的任务")
# 等待一下让取消逻辑执行完毕,但不阻塞太久
await asyncio.gather(*task_objects, return_exceptions=True)
# 5. 完成任务
avg_quality_score = sum(quality_scores) / len(quality_scores) if quality_scores else 100.0

View File

@@ -1,6 +1,5 @@
server {
listen 80;
deny 111.194.138.35;# 封禁攻击的ip
server_name localhost;
root /usr/share/nginx/html;
index index.html;

View File

@@ -18,7 +18,7 @@ export interface UseAgentStreamOptions extends StreamOptions {
}
export interface UseAgentStreamReturn extends AgentStreamState {
connect: (overrideAfterSequence?: number) => void;
connect: () => void;
disconnect: () => void;
isConnected: boolean;
clearEvents: () => void;
@@ -98,7 +98,7 @@ export function useAgentStream(
afterSequenceRef.current = afterSequence;
// 连接
const connect = useCallback((overrideAfterSequence?: number) => {
const connect = useCallback(() => {
if (!taskId) return;
// 断开现有连接
@@ -118,8 +118,8 @@
setError(null);
thinkingBufferRef.current = [];
// 🔥 使用 ref 获取最新的 afterSequence 值,或者使用覆盖值
const currentAfterSequence = overrideAfterSequence !== undefined ? overrideAfterSequence : afterSequenceRef.current;
// 🔥 使用 ref 获取最新的 afterSequence 值
const currentAfterSequence = afterSequenceRef.current;
console.log(`[useAgentStream] Creating handler with afterSequence=${currentAfterSequence}`);
// 创建新的 handler

View File

@@ -49,7 +49,7 @@ function agentAuditReducer(state: AgentAuditState, action: AgentAuditAction): Ag
if (newFinding.id && existingIds.has(newFinding.id)) {
return state; // 已存在,不添加
}
return { ...state, findings: [...state.findings, newFinding as AgentFinding] };
return { ...state, findings: [...state.findings, newFinding] };
}
case 'SET_AGENT_TREE':
@@ -99,7 +99,7 @@ function agentAuditReducer(state: AgentAuditState, action: AgentAuditAction): Ag
}
case 'UPDATE_OR_ADD_PROGRESS_LOG': {
const { progressKey, title, agentName, time } = action.payload;
const { progressKey, title, agentName } = action.payload;
// 查找是否已存在相同 progressKey 的进度日志
const existingIndex = state.logs.findIndex(
log => log.type === 'progress' && log.progressKey === progressKey
@@ -111,7 +111,7 @@ function agentAuditReducer(state: AgentAuditState, action: AgentAuditAction): Ag
updatedLogs[existingIndex] = {
...updatedLogs[existingIndex],
title,
time: time || new Date().toLocaleTimeString('en-US', { hour12: false }),
time: new Date().toLocaleTimeString('en-US', { hour12: false }),
};
return { ...state, logs: updatedLogs };
} else {
@@ -121,7 +121,6 @@ function agentAuditReducer(state: AgentAuditState, action: AgentAuditAction): Ag
title,
progressKey,
agentName,
time,
});
return { ...state, logs: [...state.logs, newLog] };
}

View File

@@ -146,30 +146,33 @@ function AgentAuditPageContent() {
}, [loadAgentTree]);
// 🔥 NEW: 加载历史事件并转换为日志项
const loadHistoricalEvents = useCallback(async (isIncremental = false) => {
const loadHistoricalEvents = useCallback(async (isSync = false) => {
if (!taskId) return 0;
// 🔥 如果不是增量加载,且已经加载过历史事件,则跳过
if (!isIncremental && hasLoadedHistoricalEventsRef.current) {
// 🔥 只有在非同步模式下才检查是否已加载过(首屏加载)
if (!isSync && hasLoadedHistoricalEventsRef.current) {
console.log('[AgentAudit] Historical events already loaded, skipping');
return 0;
}
// 标记已尝试加载(初次加载时)
if (!isIncremental) {
// 如果是首屏加载,标记已尝试加载
if (!isSync) {
hasLoadedHistoricalEventsRef.current = true;
}
try {
console.log(`[AgentAudit] Fetching ${isIncremental ? 'incremental' : 'initial'} events for task ${taskId}, after_sequence: ${isIncremental ? lastEventSequenceRef.current : 0}...`);
const currentSeq = lastEventSequenceRef.current;
console.log(`[AgentAudit] Fetching historical events for task ${taskId} (after_sequence=${currentSeq})...`);
// 🔥 传递当前已知的最后序列号,实现增量加载
const events = await getAgentEvents(taskId, {
limit: 500,
after_sequence: isIncremental ? lastEventSequenceRef.current : 0
after_sequence: currentSeq,
limit: 500
});
console.log(`[AgentAudit] Received ${events.length} events from API`);
if (events.length === 0) {
console.log('[AgentAudit] No historical events found');
console.log('[AgentAudit] No new historical events found');
return 0;
}
@@ -613,7 +616,6 @@ function AgentAuditPageContent() {
line_start: finding.line_start as number,
description: finding.description as string,
is_verified: (finding.is_verified as boolean) || false,
task_id: taskId || '',
}
});
},
@@ -637,8 +639,43 @@ function AgentAuditPageContent() {
disconnectStreamRef.current = disconnectStream;
}, [disconnectStream]);
// 🔥 Centralized Sync State Function
const syncState = useCallback(async () => {
if (!taskId) return;
console.log('[AgentAudit] Synchronizing state...');
try {
// 1. 同步任务基本信息、发现项和树结构
await Promise.all([loadTask(), loadFindings(), loadAgentTree()]);
// 2. 增量同步日志完成状态或缺失日志
await loadHistoricalEvents(true);
// 3. 如果连接断开且任务仍在运行,尝试重新连接流
if (!isConnected && isRunning) {
console.log('[AgentAudit] Stream disconnected while task running, attempting reconnect...');
hasConnectedRef.current = false; // 允许重新连接
connectStream();
}
} catch (err) {
console.error('[AgentAudit] Failed to sync state:', err);
}
}, [taskId, loadTask, loadFindings, loadAgentTree, loadHistoricalEvents, isConnected, isRunning, connectStream]);
// ============ Effects ============
// 🔥 Visibility Change Listener - 同步状态的主要触发器
useEffect(() => {
const handleVisibilityChange = () => {
if (document.visibilityState === 'visible') {
syncState();
}
};
document.addEventListener('visibilitychange', handleVisibilityChange);
return () => document.removeEventListener('visibilitychange', handleVisibilityChange);
}, [syncState]);
// Status animation
useEffect(() => {
if (!isRunning) return;
@@ -730,33 +767,6 @@ function AgentAuditPageContent() {
}
}, [logs, isAutoScroll]);
// 🔥 Visibility Change Handler - 处理离开页面后返回时的同步
useEffect(() => {
const handleVisibilityChange = async () => {
if (document.visibilityState === 'visible' && taskId) {
console.log('[AgentAudit] Tab became visible, checking for updates...');
// 1. 刷新任务状态
const updatedTask = await getAgentTask(taskId);
setTask(updatedTask);
// 2. 无论什么状态,都增量加载错过的事件
await loadHistoricalEvents(true);
if (updatedTask.status === 'running') {
// 3. 强制重新连接流,确保使用最新的 sequence 且不是僵尸连接
console.log('[AgentAudit] Reconnecting stream on visibility change, last sequence:', lastEventSequenceRef.current);
connectStream(lastEventSequenceRef.current);
}
}
};
document.addEventListener('visibilitychange', handleVisibilityChange);
return () => {
document.removeEventListener('visibilitychange', handleVisibilityChange);
};
}, [taskId, loadHistoricalEvents, isConnected, connectStream, setTask]);
// ============ Handlers ============
const handleAgentSelect = useCallback((agentId: string) => {

View File

@@ -78,7 +78,7 @@ export type AgentAuditAction =
| { type: 'SET_LOGS'; payload: LogItem[] }
| { type: 'ADD_LOG'; payload: Omit<LogItem, 'id' | 'time'> & { id?: string; time?: string } }
| { type: 'UPDATE_LOG'; payload: { id: string; updates: Partial<LogItem> } }
| { type: 'UPDATE_OR_ADD_PROGRESS_LOG'; payload: { progressKey: string; title: string; agentName?: string; time?: string } }
| { type: 'UPDATE_OR_ADD_PROGRESS_LOG'; payload: { progressKey: string; title: string; agentName?: string } }
| { type: 'COMPLETE_TOOL_LOG'; payload: { toolName: string; output: string; duration: number } }
| { type: 'REMOVE_LOG'; payload: string }
| { type: 'SELECT_AGENT'; payload: string | null }