""" Agent 间通信机制 提供: - 消息类型定义 - 消息队列管理 - Agent间消息传递 """ import logging import uuid from datetime import datetime, timezone from enum import Enum from typing import Any, Dict, List, Optional from dataclasses import dataclass, field logger = logging.getLogger(__name__) class MessageType(str, Enum): """消息类型""" QUERY = "query" # 查询消息(请求信息) INSTRUCTION = "instruction" # 指令消息(要求执行操作) INFORMATION = "information" # 信息消息(分享发现或状态) RESULT = "result" # 结果消息(任务完成报告) ERROR = "error" # 错误消息 class MessagePriority(str, Enum): """消息优先级""" LOW = "low" NORMAL = "normal" HIGH = "high" URGENT = "urgent" @dataclass class AgentMessage: """ Agent 消息 用于Agent间通信的消息结构 """ id: str = field(default_factory=lambda: f"msg_{uuid.uuid4().hex[:8]}") from_agent: str = "" to_agent: str = "" content: str = "" message_type: MessageType = MessageType.INFORMATION priority: MessagePriority = MessagePriority.NORMAL timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) # 状态 delivered: bool = False read: bool = False # 附加数据 metadata: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "id": self.id, "from": self.from_agent, "to": self.to_agent, "content": self.content, "message_type": self.message_type.value if isinstance(self.message_type, MessageType) else self.message_type, "priority": self.priority.value if isinstance(self.priority, MessagePriority) else self.priority, "timestamp": self.timestamp, "delivered": self.delivered, "read": self.read, "metadata": self.metadata, } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "AgentMessage": """从字典创建""" return cls( id=data.get("id", f"msg_{uuid.uuid4().hex[:8]}"), from_agent=data.get("from", ""), to_agent=data.get("to", ""), content=data.get("content", ""), message_type=MessageType(data.get("message_type", "information")), priority=MessagePriority(data.get("priority", "normal")), timestamp=data.get("timestamp", datetime.now(timezone.utc).isoformat()), delivered=data.get("delivered", False), read=data.get("read", False), metadata=data.get("metadata", {}), ) def to_xml(self) -> str: """转换为XML格式(用于LLM理解)""" return f""" {self.from_agent} {self.message_type.value if isinstance(self.message_type, MessageType) else self.message_type} {self.priority.value if isinstance(self.priority, MessagePriority) else self.priority} {self.timestamp} {self.content} """ class MessageBus: """ 消息总线 管理Agent间的消息传递 """ def __init__(self): self._queues: Dict[str, List[AgentMessage]] = {} self._message_history: List[AgentMessage] = [] def create_queue(self, agent_id: str) -> None: """为Agent创建消息队列""" if agent_id not in self._queues: self._queues[agent_id] = [] logger.debug(f"Created message queue for agent: {agent_id}") def delete_queue(self, agent_id: str) -> None: """删除Agent的消息队列""" if agent_id in self._queues: del self._queues[agent_id] logger.debug(f"Deleted message queue for agent: {agent_id}") def send_message( self, from_agent: str, to_agent: str, content: str, message_type: MessageType = MessageType.INFORMATION, priority: MessagePriority = MessagePriority.NORMAL, metadata: Optional[Dict[str, Any]] = None, ) -> AgentMessage: """ 发送消息 Args: from_agent: 发送者Agent ID to_agent: 接收者Agent ID content: 消息内容 message_type: 消息类型 priority: 优先级 metadata: 附加数据 Returns: 发送的消息 """ message = AgentMessage( from_agent=from_agent, to_agent=to_agent, content=content, message_type=message_type, priority=priority, metadata=metadata or {}, ) # 确保目标队列存在 if to_agent not in self._queues: self.create_queue(to_agent) # 添加到队列 self._queues[to_agent].append(message) message.delivered = True # 记录历史 self._message_history.append(message) logger.debug(f"Message sent from {from_agent} to {to_agent}: {content[:50]}...") return message def get_messages( self, agent_id: str, unread_only: bool = True, mark_as_read: bool = True, ) -> List[AgentMessage]: """ 获取Agent的消息 Args: agent_id: Agent ID unread_only: 是否只获取未读消息 mark_as_read: 是否标记为已读 Returns: 消息列表 """ if agent_id not in self._queues: return [] messages = self._queues[agent_id] if unread_only: messages = [m for m in messages if not m.read] if mark_as_read: for m in messages: m.read = True return messages def has_unread_messages(self, agent_id: str) -> bool: """检查是否有未读消息""" if agent_id not in self._queues: return False return any(not m.read for m in self._queues[agent_id]) def get_unread_count(self, agent_id: str) -> int: """获取未读消息数量""" if agent_id not in self._queues: return 0 return sum(1 for m in self._queues[agent_id] if not m.read) def send_user_message( self, to_agent: str, content: str, priority: MessagePriority = MessagePriority.HIGH, ) -> AgentMessage: """发送用户消息到Agent""" return self.send_message( from_agent="user", to_agent=to_agent, content=content, message_type=MessageType.INSTRUCTION, priority=priority, ) def send_completion_report( self, from_agent: str, to_agent: str, summary: str, findings: List[Dict[str, Any]], success: bool = True, ) -> AgentMessage: """发送任务完成报告""" content = f""" {"SUCCESS" if success else "FAILED"} {summary} {len(findings)} """ return self.send_message( from_agent=from_agent, to_agent=to_agent, content=content, message_type=MessageType.RESULT, priority=MessagePriority.HIGH, metadata={ "summary": summary, "findings": findings, "success": success, }, ) def clear_queue(self, agent_id: str) -> None: """清空Agent的消息队列""" if agent_id in self._queues: self._queues[agent_id] = [] def clear_all(self) -> None: """清空所有消息""" self._queues.clear() self._message_history.clear() def get_message_history( self, agent_id: Optional[str] = None, limit: int = 100, ) -> List[AgentMessage]: """获取消息历史""" history = self._message_history if agent_id: history = [ m for m in history if m.from_agent == agent_id or m.to_agent == agent_id ] return history[-limit:] # 全局消息总线实例 message_bus = MessageBus()