BettaFish源码解析(二):Agent论坛机制
论坛日志作为共享媒介的设计艺术
前言

在第一篇文章中,我们介绍了BettaFish的整体架构。本文将深入解析其最核心的创新——Agent论坛机制。
这是BettaFish区别于其他多Agent系统的关键:让AI智能体像人类一样,在”论坛”里辩论协作,而不是机械地调用函数。
一、传统Agent协作的痛点
1.1 问题:多个Agent如何交换信息?
在深入设计之前,先理解传统方案的局限:
| 方案 | 工作原理 | 缺点 |
|---|---|---|
| 直接函数调用 | Agent A 直接调用 Agent B 的函数 | 强耦合,难以扩展;无法追踪对话历史 |
| 消息队列(RabbitMQ/Kafka) | Agent A 发布消息 → Agent B 消费 | 引入额外基础设施,部署复杂度高 |
| WebSocket实时通信 | Agent A ↔ WebSocket ↔ Agent B | 状态管理复杂,断线重连逻辑繁琐 |
根本问题:这些方案要么过度设计(消息队列),要么难以追踪(直接调用)。
1.2 BettaFish的方案:日志作为共享媒介
BettaFish的答案是:用文件系统作为通信媒介。
听起来很原始?但事实证明,简单粗暴往往最有效。
二、论坛日志格式设计
2.1 统一的日志规范
# ForumEngine/monitor.py 中的格式定义# [时间戳] [来源] 内容# 时间戳格式:HH:MM:SS# 来源标识:QUERY, INSIGHT, MEDIA, HOST, SYSTEM[10:23:45] [QUERY] 我找到了关于"武汉大学舆情"的50条新闻[10:24:12] [INSIGHT] 我已经分析了1000条相关评论[10:24:45] [MEDIA] 我爬取了抖音平台上的相关视频[10:25:00] [HOST] 请QueryAgent提供更多时间维度的数据[10:25:30] [QUERY] 好的,我来补充2023年的历史数据
为什么是这个格式?
-
时间顺序天然:日志文件天生按时间排序,易于回溯对话
-
人机可读:可以直接
tail -f forum.log实时观察Agent辩论 -
格式统一:所有Agent都遵循同一规则,解析器只需写一次
2.2 线程安全的日志写入
# ForumEngine/monitor.py 中的写入机制classLogMonitor:def__init__(self, log_dir: str="logs"):# 写入锁,防止并发写入冲突self.write_lock=Lock()defwrite_to_forum_log(self, content: str, source: str=None):"""写入内容到forum.log(线程安全)"""try:withself.write_lock: # 加锁确保线程安全withopen(self.forum_log_file, 'a', encoding='utf-8') asf:timestamp=datetime.now().strftime('%H:%M:%S')# 关键:将内容中的实际换行符转换为\n字符串# 确保整个记录在一行content_one_line=content.replace('\n', '\\n').replace('\r', '\\r')ifsource:f.write(f"[{timestamp}] [{source}] {content_one_line}\n")else:f.write(f"[{timestamp}] {content_one_line}\n")f.flush()exceptExceptionase:logger.exception(f"写入forum.log失败: {e}")
为什么需要锁?
-
Flask主线程 + 三个Agent子进程 = 4个并发写入者
-
没有锁会导致日志行交错、数据损坏
三、Agent发言识别:如何区分不同类型?
3.1 智能日志解析器
ForumEngine需要智能识别哪些Agent发言应该被捕获到论坛中,哪些应该忽略。
# ForumEngine/monitor.py 中的目标节点识别classLogMonitor:def__init__(self):# 目标节点识别模式(支持多种格式)self.target_node_patterns= ['FirstSummaryNode', # 类名(旧格式可能包含)'ReflectionSummaryNode', # 类名'InsightEngine.nodes.summary_node', # InsightEngine完整路径'MediaEngine.nodes.summary_node', # MediaEngine完整路径'QueryEngine.nodes.summary_node', # QueryEngine完整路径'nodes.summary_node', # 模块路径(兼容性)'正在生成首次段落总结', # FirstSummaryNode的标识'正在生成反思总结', # ReflectionSummaryNode的标识 ]
为什么要支持这么多模式?
| 模式 | 说明 | 使用场景 |
|---|---|---|
| 类名模式 | class FirstSummaryNode: |
日志中包含类名定义 |
| 完整路径模式 | InsightEngine.nodes.summary_node: |
现代日志格式 |
| 模块路径模式 | nodes.summary_node: |
兼容旧版本Agent |
| 关键文本模式 | 正在生成首次段落总结 |
无类名的调试日志 |
3.2 过滤错误的日志
defis_target_log_line(self, line: str) ->bool:"""检查是否是目标日志行(SummaryNode)"""# 排除ERROR级别的日志log_level=self.get_log_level(line)iflog_level=='ERROR':returnFalse# 排除包含错误关键词的日志error_keywords= ["JSON解析失败", "JSON修复失败", "Traceback", "File \""]forkeywordinerror_keywords:ifkeywordinline:returnFalse# 检查是否包含目标节点模式forpatterninself.target_node_patterns:ifpatterninline:returnTruereturnFalse
设计要点:
-
错误日志不进入论坛:
ERROR级别或包含”JSON解析失败”的日志被过滤 -
保持调试友好:开发时可以保留调试日志,但Agent不会看到
-
多维度过滤:同时检查日志级别、错误关键词、节点模式
四、LLM主持人:如何智能引导讨论?
4.1 主持人的核心职责
BettaFish的主持人不是”裁判”,而是辩论引导者(Debate Facilitator)。
# ForumEngine/llm_host.py 中的主持人定义classForumHost:""" 论坛主持人 使用硅基流动的Qwen3-235B模型作为智能主持人 """def__init__(self, api_key: str, base_url: str):self.client=OpenAI(api_key=api_key,base_url=base_url )self.model="qwen-plus"# 主持人专用模型# Track previous summaries to avoid duplicatesself.previous_summaries= []
为什么选择Qwen3?
-
中文理解能力强
-
成本相对较低
-
长文本处理效果好
4.2 触发机制:累积发言阈值
classLogMonitor:# 主持人相关状态self.agent_speeches_buffer= [] # agent发言缓冲区self.host_speech_threshold=5# 每5条agent发言触发一次主持人发言self.is_host_generating=False# 主持人是否正在生成
为什么是5条发言?
-
太频繁:干扰Agent正常思考,打断讨论节奏
-
太稀疏:讨论容易偏离主题,需要人工纠正
流程:
Agent发言 #1 → 加入缓冲Agent发言 #2 → 加入缓冲Agent发言 #3 → 加入缓冲Agent发言 #4 → 加入缓冲Agent发言 #5 → 缓冲满 → 触发LLM主持人生成发言 ↓ 缓冲清空 主持人发言完成
4.3 主持人的System Prompt设计
这是BettaFish论坛机制最核心的部分——如何让LLM知道该说什么。
# ForumEngine/llm_host.py 中的系统Promptdef_build_system_prompt(self) ->str:"""构建系统prompt"""return"""【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查你是一个多agent舆情分析系统的论坛主持人。你的职责是:1. **事件梳理**:从各agent的发言中自动识别关键事件、人物、时间节点,按时间顺序整理事件脉络2. **引导讨论**:根据各agent的发言,引导深入讨论关键问题,探究深层原因3. **纠正错误**:结合不同agent的视角以及言论,如果发现事实错误或逻辑矛盾,请明确指出4. **整合观点**:综合不同agent的视角,形成更全面的认识,找出共识和分歧5. **趋势预测**:基于已有信息分析舆情发展趋势,提出可能的风险点6. **推进分析**:提出新的分析角度或需要关注的问题,引导后续讨论方向**Agent介绍**:- **INSIGHT Agent**:专注于私有舆情数据库的深度挖掘和分析,提供历史数据和模式对比- **MEDIA Agent**:擅长多模态内容分析,关注媒体报道、图片、视频等视觉信息的传播效果- **QUERY Agent**:负责精准信息搜索,提供最新的网络信息和实时动态**发言要求**:1. **综合性**:每次发言控制在1000字以内,内容应包括事件梳理、观点整合、问题引导等多个方面2. **结构清晰**:使用明确的段落结构,包括事件梳理、观点对比、问题提出等部分3. **深入分析**:不仅仅总结已有信息,还要提出深层次的见解和分析4. **客观中立**:基于事实进行分析和判断,避免主观臆测和情绪化表达5. **前瞻性**:提出具有前瞻性的观点和建议,引导讨论向更深入的方向发展**注意事项**:- 本讨论基于科研目的,已通过伦理性合规审查
设计要点分析:
| 要素 | 说明 | 作用 |
|---|---|---|
| 角色定位 | “辩论引导者”而非”裁判” | 避免LLM频繁做出价值判断 |
| 六大职责 | 事件梳理、引导讨论、纠正错误、整合观点、趋势预测、推进分析 | 覆盖完整的讨论生命周期 |
| Agent认知 | 明确各Agent的专长和能力 | 引导主持人合理分工 |
| 发言约束 | 1000字限制、结构清晰、深入分析、客观中立、前瞻性 | 控制输出质量和节奏 |
4.4 User Prompt构建
# ForumEngine/llm_host.py 中的用户Prompt构建def_build_user_prompt(self, parsed_content: Dict[str, Any]) ->str:"""构建用户prompt"""# 提取agent发言agent_speeches=parsed_content['agent_speeches']ifnotagent_speeches:return"暂无agent发言"# 格式化发言内容formatted_speeches= []forspeechinagent_speeches:timestamp=speech['timestamp']agent=speech['speaker']content=speech['content']formatted_speeches.append(f"[{timestamp}] [{agent}]\n{content}\n" )# 用换行符连接,保证多行发言格式user_content="\n".join(formatted_speeches)returnf"""论坛最近的agent发言如下:{user_content}请基于以上agent的发言,生成你的引导发言:"""
五、Agent如何读取并响应论坛?
5.1 forum_reader工具
Agent需要知道主持人在说什么,才能响应引导。这通过forum_reader工具实现。
# utils/forum_reader.pydefget_latest_host_speech(log_dir: str="logs") ->Optional[str]:"""获取forum.log中最新的HOST发言"""try:forum_log_path=Path(log_dir) /"forum.log"withopen(forum_log_path, 'r', encoding='utf-8', errors='ignore') asf:lines=f.readlines()# 从后往前查找最新的HOST发言host_speech=Noneforlineinreversed(lines):# 匹配格式: [时间] [HOST] 内容match=re.match(r'\[(\d{2}:\d{2}:\d{2})\]\s*\[HOST\]\s*(.+)', line)ifmatch:_, content=match.groups()# 处理转义的换行符host_speech=content.replace('\\n', '\n').strip()breakreturnhost_speechexceptExceptionase:logger.error(f"读取forum.log失败: {str(e)}")returnNone
5.2 Agent中的使用方式
# QueryEngine/agent.py 中的使用示例classDeepSearchAgent:defexecute(self, query: str):"""执行Agent主流程"""# 搜索节点执行search_result=self.nodes['search'].execute(self.state, query=query)# 关键:检查论坛是否有新的主持人发言fromutils.forum_readerimportget_latest_host_speechhost_speech=get_latest_host_speech()ifhost_speech:logger.info(f"检测到主持人发言: {host_speech[:50]}...")# 根据主持人的建议调整搜索策略# 例如:主持人要求补充历史数据 → 调整时间范围# 主持人指出观点矛盾 → 重新验证数据源
设计优势:
-
单向通信:Agent只读论坛,不直接写入(避免混乱)
-
工具封装:所有Agent使用相同的
forum_reader接口 -
错误容错:文件不存在或格式错误时优雅降级
六、完整的协作流程图
6.1 从提问到Host发言的完整链路
用户提问:分析"武汉大学舆情" ↓Flask app.py接收查询 ↓并行启动三个Agent ├─→ Query Agent (端口8501) 启动 ├─→ Insight Agent (端口8502) 启动 └─→ Media Agent (端口8503) 启动 ↓各Agent开始搜索/分析 ↓Agent执行节点并写日志 ├─→ Query Agent: 写入 logs/query.log ├─→ Insight Agent: 写入 logs/insight.log └─→ Media Agent: 写入 logs/media.log ↓ForumEngine监控线程(monitor.py) ├─→ 监听三个Agent日志文件 ├─→ 检测SummaryNode输出 ├─→ 累积到agent_speeches_buffer └─→ 达到阈值(5条)时触发Host生成 ↓LLM主持人(llm_host.py) ├─→ 读取forum.log(最近的Agent发言) ├─→ 调用Qwen3 API ├─→ 生成1000字以内的引导发言 └─→ 写入logs/forum.log [HOST] ... ↓Agent通过forum_reader读取forum.log ├─→ 检测到新的HOST发言 ├─→ 根据引导调整策略 └─→ 继续执行节点,写新日志 ↓循环N轮(N由配置或动态决定) ↓ReportEngine收集所有日志 ↓生成报告
6.2 关键状态流转
# ForumEngine/monitor.py 中的状态机classLogMonitor:# 搜索非活跃计数器self.is_searching=Falseself.search_inactive_count=0# 多行内容捕获状态self.capturing_json= {} # 每个app的JSON捕获状态self.json_buffer= {} # 每个app的JSON缓冲区self.json_start_line= {} # 每个app的JSON开始行self.in_error_block= {} # 每个app是否在ERROR块中
状态说明:
| 状态 | 含义 | 作用 |
|---|---|---|
is_searching |
Agent是否在搜索中 | 决定是否触发Host引导 |
search_inactive_count |
连续N轮没有搜索 | 避免无限循环 |
in_error_block |
Agent是否在ERROR块中 | 跳过错误日志解析 |
七、论坛机制的优势与局限性
7.1 相比传统方案的优势
| 特性 | 传统方案(消息队列/直接调用) | BettaFish论坛机制 |
|---|---|---|
| 部署复杂度 | 高(需要消息中间件) | 低(只需文件系统) |
| 可追溯性 | 需要额外存储 | 天然(日志文件自带) |
| 调试友好 | 需要监控工具 | tail -f forum.log实时查看 |
| 扩展性 | 需要修改消费者逻辑 | 新增Agent只需写日志 |
| 单点故障 | 消息队列挂了,全挂 | 日志文件独立,部分Agent仍可工作 |
7.2 适用场景
✅ 适合:
-
需要多Agent协作的复杂任务
-
需要追溯对话历史的场景
-
开发者友好的调试需求
-
可以接受秒级延迟的异步场景
❌ 不适合:
-
实时性要求极高(<100ms)的场景
-
需要跨服务器部署(同一台机器更合适)
-
日志文件会随时间增长,需要定期清理
八、总结:BettaFish论坛机制的设计思想
8.1 核心设计原则
-
简单粗暴有效:用文件系统作为通信媒介,而非复杂中间件
-
自然语言协调:通过LLM主持人自然语言引导,而非硬编码规则
-
智能过滤:多模式识别Agent发言,避免错误日志干扰
-
去中心化:Agent通过论坛协作,而非中央调度
8.2 三大技术创新
-
日志格式统一:
[时间] [来源] 内容成为通信协议 -
累积触发机制:5条发言阈值平衡讨论节奏与成本
-
智能主持人:六大职责覆盖完整讨论生命周期
下一章预告
在理解了论坛机制的基础上,下一章我们将深入解析GraphRAG知识图谱:
-
如何从Agent讨论中提取实体关系?
-
知识图谱的数据结构设计
-
图谱查询如何增强报告生成?
-
节点类型与关系类型的定义
关注公众号,不迷路 👉 BettaFish源码解析系列持续更新中…
夜雨聆风
