YingXingAI/utils/websocket-manager.js

354 lines
7.8 KiB
JavaScript
Raw Normal View History

/**
* WebSocket 管理器
*
* 功能
* 1. 自动重连
* 2. 心跳检测
* 3. 消息队列
* 4. 事件监听
*
* 使用示例
* import wsManager from '@/utils/websocket-manager.js'
*
* // 连接
* wsManager.connect('wss://example.com/chat')
*
* // 监听消息
* wsManager.on('message', (data) => {
* console.log('收到消息:', data)
* })
*
* // 发送消息
* wsManager.send({ type: 'text', content: 'Hello' })
*/
class WebSocketManager {
constructor() {
// WebSocket 实例
this.socketTask = null
// 连接状态
this.isConnected = false
this.isConnecting = false
// 配置
this.url = ''
this.reconnectAttempts = 0
this.maxReconnectAttempts = 5
this.reconnectInterval = 5000
this.heartbeatInterval = 30000
// 定时器
this.reconnectTimer = null
this.heartbeatTimer = null
// 消息队列(连接未建立时暂存)
this.messageQueue = []
// 事件监听器
this.listeners = {
message: [], // 收到消息
open: [], // 连接成功
close: [], // 连接关闭
error: [], // 连接错误
reconnect: [] // 重连中
}
}
/**
* 连接 WebSocket
* @param {String} url WebSocket 服务器地址
* @param {Object} options 配置选项
*/
connect(url, options = {}) {
// 如果已经在连接中,直接返回
if (this.isConnecting || this.isConnected) {
console.log('[WebSocket] 已在连接中或已连接')
return
}
this.url = url
this.isConnecting = true
// 合并配置
Object.assign(this, options)
console.log(`[WebSocket] 开始连接: ${url}`)
// 创建连接
this.socketTask = uni.connectSocket({
url: this.url,
success: () => {
console.log('[WebSocket] 连接请求已发送')
},
fail: (err) => {
console.error('[WebSocket] 连接失败:', err)
this.isConnecting = false
this.handleError(err)
this.handleReconnect()
}
})
// 监听连接打开
this.socketTask.onOpen(() => {
console.log('[WebSocket] 连接成功')
this.isConnected = true
this.isConnecting = false
this.reconnectAttempts = 0
// 触发 open 事件
this.emit('open')
// 启动心跳
this.startHeartbeat()
// 发送队列中的消息
this.flushMessageQueue()
})
// 监听消息
this.socketTask.onMessage((res) => {
console.log('[WebSocket] 收到消息:', res.data)
try {
const data = JSON.parse(res.data)
// 处理 pong心跳响应
if (data.type === 'pong') {
console.log('[WebSocket] 收到心跳响应')
return
}
// 触发 message 事件
this.emit('message', data)
} catch (e) {
console.error('[WebSocket] 消息解析失败:', e)
// 如果不是 JSON直接传递原始数据
this.emit('message', res.data)
}
})
// 监听连接关闭
this.socketTask.onClose((res) => {
console.log('[WebSocket] 连接已关闭:', res)
this.isConnected = false
this.isConnecting = false
// 停止心跳
this.stopHeartbeat()
// 触发 close 事件
this.emit('close', res)
// 尝试重连
this.handleReconnect()
})
// 监听错误
this.socketTask.onError((err) => {
console.error('[WebSocket] 连接错误:', err)
this.isConnected = false
this.isConnecting = false
// 触发 error 事件
this.handleError(err)
})
}
/**
* 发送消息
* @param {Object|String} data 要发送的数据
*/
send(data) {
// 如果未连接,加入队列
if (!this.isConnected) {
console.warn('[WebSocket] 未连接,消息加入队列')
this.messageQueue.push(data)
return
}
const message = typeof data === 'string' ? data : JSON.stringify(data)
this.socketTask.send({
data: message,
success: () => {
console.log('[WebSocket] 消息发送成功:', data)
},
fail: (err) => {
console.error('[WebSocket] 消息发送失败:', err)
}
})
}
/**
* 关闭连接
* @param {Boolean} manual 是否手动关闭手动关闭不重连
*/
close(manual = true) {
console.log(`[WebSocket] 关闭连接 (手动: ${manual})`)
// 如果是手动关闭,清除重连定时器
if (manual) {
this.clearReconnectTimer()
this.reconnectAttempts = this.maxReconnectAttempts // 阻止重连
}
this.stopHeartbeat()
if (this.socketTask) {
this.socketTask.close({
code: 1000,
reason: manual ? '手动关闭' : '自动关闭'
})
this.socketTask = null
}
this.isConnected = false
this.isConnecting = false
}
/**
* 启动心跳
*/
startHeartbeat() {
console.log('[WebSocket] 启动心跳')
this.stopHeartbeat()
this.heartbeatTimer = setInterval(() => {
if (this.isConnected) {
console.log('[WebSocket] 发送心跳')
this.send({ type: 'ping', timestamp: Date.now() })
}
}, this.heartbeatInterval)
}
/**
* 停止心跳
*/
stopHeartbeat() {
if (this.heartbeatTimer) {
console.log('[WebSocket] 停止心跳')
clearInterval(this.heartbeatTimer)
this.heartbeatTimer = null
}
}
/**
* 处理重连
*/
handleReconnect() {
// 如果达到最大重连次数,停止重连
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('[WebSocket] 达到最大重连次数,停止重连')
return
}
// 如果已经在重连中,直接返回
if (this.reconnectTimer) {
return
}
this.reconnectAttempts++
console.log(`[WebSocket] 准备重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`)
// 触发 reconnect 事件
this.emit('reconnect', this.reconnectAttempts)
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null
this.connect(this.url)
}, this.reconnectInterval)
}
/**
* 清除重连定时器
*/
clearReconnectTimer() {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}
}
/**
* 发送队列中的消息
*/
flushMessageQueue() {
if (this.messageQueue.length === 0) {
return
}
console.log(`[WebSocket] 发送队列中的 ${this.messageQueue.length} 条消息`)
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift()
this.send(message)
}
}
/**
* 处理错误
*/
handleError(err) {
this.emit('error', err)
}
/**
* 监听事件
* @param {String} event 事件名称
* @param {Function} callback 回调函数
*/
on(event, callback) {
if (this.listeners[event]) {
this.listeners[event].push(callback)
}
}
/**
* 移除事件监听
* @param {String} event 事件名称
* @param {Function} callback 回调函数
*/
off(event, callback) {
if (this.listeners[event]) {
const index = this.listeners[event].indexOf(callback)
if (index > -1) {
this.listeners[event].splice(index, 1)
}
}
}
/**
* 触发事件
* @param {String} event 事件名称
* @param {*} data 数据
*/
emit(event, data) {
if (this.listeners[event]) {
this.listeners[event].forEach(callback => {
try {
callback(data)
} catch (e) {
console.error(`[WebSocket] 事件 ${event} 回调执行失败:`, e)
}
})
}
}
/**
* 获取连接状态
*/
getState() {
return {
isConnected: this.isConnected,
isConnecting: this.isConnecting,
reconnectAttempts: this.reconnectAttempts,
queueLength: this.messageQueue.length
}
}
}
// 导出单例
export default new WebSocketManager()