乐于分享
好东西不私藏

BettaFish源码解析(二):Agent论坛机制

BettaFish源码解析(二):Agent论坛机制

论坛日志作为共享媒介的设计艺术


前言

在第一篇文章中,我们介绍了BettaFish的整体架构。本文将深入解析其最核心的创新——Agent论坛机制。

这是BettaFish区别于其他多Agent系统的关键:让AI智能体像人类一样,在”论坛”里辩论协作,而不是机械地调用函数。


一、传统Agent协作的痛点

1.1 问题:多个Agent如何交换信息?

在深入设计之前,先理解传统方案的局限:

方案 工作原理 缺点
直接函数调用 Agent A 直接调用 Agent B 的函数 强耦合,难以扩展;无法追踪对话历史
消息队列(RabbitMQ/Kafka) Agent A 发布消息 → Agent B 消费 引入额外基础设施,部署复杂度高
WebSocket实时通信 Agent A ↔ WebSocket ↔ Agent B 状态管理复杂,断线重连逻辑繁琐

根本问题:这些方案要么过度设计(消息队列),要么难以追踪(直接调用)。

1.2 BettaFish的方案:日志作为共享媒介

BettaFish的答案是:用文件系统作为通信媒介

听起来很原始?但事实证明,简单粗暴往往最有效。


二、论坛日志格式设计

2.1 统一的日志规范

# ForumEngine/monitor.py 中的格式定义# [时间戳] [来源] 内容# 时间戳格式:HH:MM:SS# 来源标识:QUERY, INSIGHT, MEDIA, HOST, SYSTEM[10:23:45] [QUERY我找到了关于"武汉大学舆情"的50条新闻[10:24:12] [INSIGHT我已经分析了1000条相关评论[10:24:45] [MEDIA我爬取了抖音平台上的相关视频[10:25:00] [HOST请QueryAgent提供更多时间维度的数据[10:25:30] [QUERY好的,我来补充2023年的历史数据

为什么是这个格式?

  1. 时间顺序天然:日志文件天生按时间排序,易于回溯对话

  2. 人机可读:可以直接tail -f forum.log实时观察Agent辩论

  3. 格式统一:所有Agent都遵循同一规则,解析器只需写一次

2.2 线程安全的日志写入

# ForumEngine/monitor.py 中的写入机制classLogMonitor:def__init__(selflog_dirstr="logs"):# 写入锁,防止并发写入冲突self.write_lock=Lock()defwrite_to_forum_log(selfcontentstrsourcestr=None):"""写入内容到forum.log(线程安全)"""try:withself.write_lock:  # 加锁确保线程安全withopen(self.forum_log_file'a'encoding='utf-8'asf:timestamp=datetime.now().strftime('%H:%M:%S')# 关键:将内容中的实际换行符转换为\n字符串# 确保整个记录在一行content_one_line=content.replace('\n''\\n').replace('\r''\\r')ifsource:f.write(f"[{timestamp}] [{source}{content_one_line}\n")else:f.write(f"[{timestamp}{content_one_line}\n")f.flush()exceptExceptionase:logger.exception(f"写入forum.log失败: {e}")

为什么需要锁?

  • Flask主线程 + 三个Agent子进程 = 4个并发写入者

  • 没有锁会导致日志行交错、数据损坏


三、Agent发言识别:如何区分不同类型?

3.1 智能日志解析器

ForumEngine需要智能识别哪些Agent发言应该被捕获到论坛中,哪些应该忽略。

# ForumEngine/monitor.py 中的目标节点识别classLogMonitor:def__init__(self):# 目标节点识别模式(支持多种格式)self.target_node_patterns= ['FirstSummaryNode',      # 类名(旧格式可能包含)'ReflectionSummaryNode',  # 类名'InsightEngine.nodes.summary_node',  # InsightEngine完整路径'MediaEngine.nodes.summary_node',   # MediaEngine完整路径'QueryEngine.nodes.summary_node',   # QueryEngine完整路径'nodes.summary_node',            # 模块路径(兼容性)'正在生成首次段落总结',        # FirstSummaryNode的标识'正在生成反思总结',          # ReflectionSummaryNode的标识        ]

为什么要支持这么多模式?

模式 说明 使用场景
类名模式 class FirstSummaryNode: 日志中包含类名定义
完整路径模式 InsightEngine.nodes.summary_node: 现代日志格式
模块路径模式 nodes.summary_node: 兼容旧版本Agent
关键文本模式 正在生成首次段落总结 无类名的调试日志

3.2 过滤错误的日志

defis_target_log_line(selflinestr->bool:"""检查是否是目标日志行(SummaryNode)"""# 排除ERROR级别的日志log_level=self.get_log_level(line)iflog_level=='ERROR':returnFalse# 排除包含错误关键词的日志error_keywords= ["JSON解析失败""JSON修复失败""Traceback""File \""]forkeywordinerror_keywords:ifkeywordinline:returnFalse# 检查是否包含目标节点模式forpatterninself.target_node_patterns:ifpatterninline:returnTruereturnFalse

设计要点

  • 错误日志不进入论坛ERROR级别或包含”JSON解析失败”的日志被过滤

  • 保持调试友好:开发时可以保留调试日志,但Agent不会看到

  • 多维度过滤:同时检查日志级别、错误关键词、节点模式


四、LLM主持人:如何智能引导讨论?

4.1 主持人的核心职责

BettaFish的主持人不是”裁判”,而是辩论引导者(Debate Facilitator)

# ForumEngine/llm_host.py 中的主持人定义classForumHost:"""    论坛主持人    使用硅基流动的Qwen3-235B模型作为智能主持人    """def__init__(selfapi_keystrbase_urlstr):self.client=OpenAI(api_key=api_key,base_url=base_url        )self.model="qwen-plus"# 主持人专用模型# Track previous summaries to avoid duplicatesself.previous_summaries= []

为什么选择Qwen3?

  • 中文理解能力强

  • 成本相对较低

  • 长文本处理效果好

4.2 触发机制:累积发言阈值

classLogMonitor:# 主持人相关状态self.agent_speeches_buffer= []  # agent发言缓冲区self.host_speech_threshold=5# 每5条agent发言触发一次主持人发言self.is_host_generating=False# 主持人是否正在生成

为什么是5条发言?

  • 太频繁:干扰Agent正常思考,打断讨论节奏

  • 太稀疏:讨论容易偏离主题,需要人工纠正

流程

Agent发言 #1 → 加入缓冲Agent发言 #2 → 加入缓冲Agent发言 #3 → 加入缓冲Agent发言 #4 → 加入缓冲Agent发言 #5 → 缓冲满 → 触发LLM主持人生成发言                    ↓              缓冲清空              主持人发言完成

4.3 主持人的System Prompt设计

这是BettaFish论坛机制最核心的部分——如何让LLM知道该说什么

# ForumEngine/llm_host.py 中的系统Promptdef_build_system_prompt(self->str:"""构建系统prompt"""return"""【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查你是一个多agent舆情分析系统的论坛主持人。你的职责是:1. **事件梳理**:从各agent的发言中自动识别关键事件、人物、时间节点,按时间顺序整理事件脉络2. **引导讨论**:根据各agent的发言,引导深入讨论关键问题,探究深层原因3. **纠正错误**:结合不同agent的视角以及言论,如果发现事实错误或逻辑矛盾,请明确指出4. **整合观点**:综合不同agent的视角,形成更全面的认识,找出共识和分歧5. **趋势预测**:基于已有信息分析舆情发展趋势,提出可能的风险点6. **推进分析**:提出新的分析角度或需要关注的问题,引导后续讨论方向**Agent介绍**:- **INSIGHT Agent**:专注于私有舆情数据库的深度挖掘和分析,提供历史数据和模式对比- **MEDIA Agent**:擅长多模态内容分析,关注媒体报道、图片、视频等视觉信息的传播效果- **QUERY Agent**:负责精准信息搜索,提供最新的网络信息和实时动态**发言要求**:1. **综合性**:每次发言控制在1000字以内,内容应包括事件梳理、观点整合、问题引导等多个方面2. **结构清晰**:使用明确的段落结构,包括事件梳理、观点对比、问题提出等部分3. **深入分析**:不仅仅总结已有信息,还要提出深层次的见解和分析4. **客观中立**:基于事实进行分析和判断,避免主观臆测和情绪化表达5. **前瞻性**:提出具有前瞻性的观点和建议,引导讨论向更深入的方向发展**注意事项**:- 本讨论基于科研目的,已通过伦理性合规审查

设计要点分析

要素 说明 作用
角色定位 “辩论引导者”而非”裁判” 避免LLM频繁做出价值判断
六大职责 事件梳理、引导讨论、纠正错误、整合观点、趋势预测、推进分析 覆盖完整的讨论生命周期
Agent认知 明确各Agent的专长和能力 引导主持人合理分工
发言约束 1000字限制、结构清晰、深入分析、客观中立、前瞻性 控制输出质量和节奏

4.4 User Prompt构建

# ForumEngine/llm_host.py 中的用户Prompt构建def_build_user_prompt(selfparsed_contentDict[strAny]) ->str:"""构建用户prompt"""# 提取agent发言agent_speeches=parsed_content['agent_speeches']ifnotagent_speeches:return"暂无agent发言"# 格式化发言内容formatted_speeches= []forspeechinagent_speeches:timestamp=speech['timestamp']agent=speech['speaker']content=speech['content']formatted_speeches.append(f"[{timestamp}] [{agent}]\n{content}\n"        )# 用换行符连接,保证多行发言格式user_content="\n".join(formatted_speeches)returnf"""论坛最近的agent发言如下:{user_content}请基于以上agent的发言,生成你的引导发言:"""

五、Agent如何读取并响应论坛?

5.1 forum_reader工具

Agent需要知道主持人在说什么,才能响应引导。这通过forum_reader工具实现。

# utils/forum_reader.pydefget_latest_host_speech(log_dirstr="logs"->Optional[str]:"""获取forum.log中最新的HOST发言"""try:forum_log_path=Path(log_dir/"forum.log"withopen(forum_log_path'r'encoding='utf-8'errors='ignore'asf:lines=f.readlines()# 从后往前查找最新的HOST发言host_speech=Noneforlineinreversed(lines):# 匹配格式: [时间] [HOST] 内容match=re.match(r'\[(\d{2}:\d{2}:\d{2})\]\s*\[HOST\]\s*(.+)'line)ifmatch:_content=match.groups()# 处理转义的换行符host_speech=content.replace('\\n''\n').strip()breakreturnhost_speechexceptExceptionase:logger.error(f"读取forum.log失败: {str(e)}")returnNone

5.2 Agent中的使用方式

# QueryEngine/agent.py 中的使用示例classDeepSearchAgent:defexecute(selfquerystr):"""执行Agent主流程"""# 搜索节点执行search_result=self.nodes['search'].execute(self.statequery=query)# 关键:检查论坛是否有新的主持人发言fromutils.forum_readerimportget_latest_host_speechhost_speech=get_latest_host_speech()ifhost_speech:logger.info(f"检测到主持人发言: {host_speech[:50]}...")# 根据主持人的建议调整搜索策略# 例如:主持人要求补充历史数据 → 调整时间范围# 主持人指出观点矛盾 → 重新验证数据源

设计优势

  • 单向通信:Agent只读论坛,不直接写入(避免混乱)

  • 工具封装:所有Agent使用相同的forum_reader接口

  • 错误容错:文件不存在或格式错误时优雅降级


六、完整的协作流程图

6.1 从提问到Host发言的完整链路

用户提问:分析"武汉大学舆情"  ↓Flask app.py接收查询  ↓并行启动三个Agent  ├─→ Query Agent (端口8501) 启动  ├─→ Insight Agent (端口8502) 启动  └─→ Media Agent (端口8503) 启动  ↓各Agent开始搜索/分析  ↓Agent执行节点并写日志  ├─→ Query Agent: 写入 logs/query.log  ├─→ Insight Agent: 写入 logs/insight.log  └─→ Media Agent: 写入 logs/media.log  ↓ForumEngine监控线程(monitor.py)  ├─→ 监听三个Agent日志文件  ├─→ 检测SummaryNode输出  ├─→ 累积到agent_speeches_buffer  └─→ 达到阈值(5条)时触发Host生成  ↓LLM主持人(llm_host.py)  ├─→ 读取forum.log(最近的Agent发言)  ├─→ 调用Qwen3 API  ├─→ 生成1000字以内的引导发言  └─→ 写入logs/forum.log [HOST] ...  ↓Agent通过forum_reader读取forum.log  ├─→ 检测到新的HOST发言  ├─→ 根据引导调整策略  └─→ 继续执行节点,写新日志  ↓循环N轮(N由配置或动态决定)  ↓ReportEngine收集所有日志  ↓生成报告

6.2 关键状态流转

# ForumEngine/monitor.py 中的状态机classLogMonitor:# 搜索非活跃计数器self.is_searching=Falseself.search_inactive_count=0# 多行内容捕获状态self.capturing_json= {}      # 每个app的JSON捕获状态self.json_buffer= {}         # 每个app的JSON缓冲区self.json_start_line= {}    # 每个app的JSON开始行self.in_error_block= {}      # 每个app是否在ERROR块中

状态说明

状态 含义 作用
is_searching Agent是否在搜索中 决定是否触发Host引导
search_inactive_count 连续N轮没有搜索 避免无限循环
in_error_block Agent是否在ERROR块中 跳过错误日志解析

七、论坛机制的优势与局限性

7.1 相比传统方案的优势

特性 传统方案(消息队列/直接调用) BettaFish论坛机制
部署复杂度 高(需要消息中间件) 低(只需文件系统)
可追溯性 需要额外存储 天然(日志文件自带)
调试友好 需要监控工具 tail -f forum.log实时查看
扩展性 需要修改消费者逻辑 新增Agent只需写日志
单点故障 消息队列挂了,全挂 日志文件独立,部分Agent仍可工作

7.2 适用场景

✅ 适合

  • 需要多Agent协作的复杂任务

  • 需要追溯对话历史的场景

  • 开发者友好的调试需求

  • 可以接受秒级延迟的异步场景

❌ 不适合

  • 实时性要求极高(<100ms)的场景

  • 需要跨服务器部署(同一台机器更合适)

  • 日志文件会随时间增长,需要定期清理


八、总结:BettaFish论坛机制的设计思想

8.1 核心设计原则

  1. 简单粗暴有效:用文件系统作为通信媒介,而非复杂中间件

  2. 自然语言协调:通过LLM主持人自然语言引导,而非硬编码规则

  3. 智能过滤:多模式识别Agent发言,避免错误日志干扰

  4. 去中心化:Agent通过论坛协作,而非中央调度

8.2 三大技术创新

  1. 日志格式统一[时间] [来源] 内容成为通信协议

  2. 累积触发机制:5条发言阈值平衡讨论节奏与成本

  3. 智能主持人:六大职责覆盖完整讨论生命周期


下一章预告

在理解了论坛机制的基础上,下一章我们将深入解析GraphRAG知识图谱

  • 如何从Agent讨论中提取实体关系?

  • 知识图谱的数据结构设计

  • 图谱查询如何增强报告生成?

  • 节点类型与关系类型的定义

关注公众号,不迷路 👉 BettaFish源码解析系列持续更新中…

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » BettaFish源码解析(二):Agent论坛机制

评论 抢沙发

4 + 8 =
  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮