BettaFish源码解析(一):整体架构设计
如何设计一个多智能体协作系统?

前言
在AI领域,多智能体协作是一个热门但困难的课题。BettaFish(微舆)作为一个从零构建的舆情分析系统,给出了一套独特的解决方案:让5个AI智能体像人类一样,在”论坛”里辩论协作。
本文将深入解析BettaFish的整体架构设计,回答一个核心问题:
如何让多个AI智能体有效协作,而不是各自为政?
一、架构选型:Flask + Streamlit混合架构
1.1 为什么不是单一框架?
在设计之初,BettaFish面临一个现实问题:如何让专业Agent既有独立UI,又能统一管理?
方案对比:
| 方案 | 优点 | 缺点 | BettaFish选择 |
|---|---|---|---|
| 纯Flask统一UI | 集中管理,一致性好 | 开发复杂度高,每个Agent都要写Flask路由 | ❌ |
| 纯Streamlit多应用 | 开发简单,Pythonic | 端口管理混乱,无统一入口 | ❌ |
| Flask主控 + Streamlit子应用 | 集中管理 + 独立开发 | 进程管理复杂 | ✅ |
1.2 混合架构的核心代码
在app.py中,Flask作为主控制器,负责管理三个Streamlit子应用的进程:
# processes字典存储所有子进程信息processes= {'insight': {'process': None, 'port': 8501, 'status': 'stopped'},'media': {'process': None, 'port': 8502, 'status': 'stopped'},'query': {'process': None, 'port': 8503, 'status': 'stopped'},'forum': {'process': None, 'status': 'stopped'}}STREAMLIT_SCRIPTS= {'insight': 'SingleEngineApp/insight_engine_streamlit_app.py','media': 'SingleEngineApp/media_engine_streamlit_app.py','query': 'SingleEngineApp/query_engine_streamlit_app.py'}
设计要点:
-
端口预分配:每个Agent固定端口(8501-8503),避免冲突
-
进程生命周期管理:启动、停止、健康检查
-
统一日志收集:所有子进程输出通过
read_process_output实时收集
1.3 子进程启动机制
defstart_streamlit_app(app_name, script_path, port):"""启动Streamlit应用"""cmd= [sys.executable, '-m', 'streamlit', 'run',script_path,'--server.port', str(port),'--server.headless', 'true', # 无浏览器模式'--logger.level', 'info' ]# 关键:环境变量确保UTF-8编码env=os.environ.copy()env.update({'PYTHONIOENCODING': 'utf-8','PYTHONUNBUFFERED': '1', # 禁用缓冲,实时输出 })process=subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,bufsize=0, # 无缓冲cwd=os.getcwd(),env=env )
为什么这么做?
-
无缓冲输出:
PYTHONUNBUFFERED='1'确保日志实时显示,不丢失Agent思考过程 -
独立工作目录:
cwd=os.getcwd()让子进程共享项目根目录,避免路径问题 -
无浏览器模式:
--server.headless=true适合服务器部署,不启动多余浏览器
二、Agent协作模式:论坛机制
2.1 传统Agent协作的痛点
问题:多个Agent如何交换信息?
传统方案的问题:
❌ 方案A:直接函数调用 Agent A → 直接调用 Agent B 的函数 问题:强耦合,难扩展,无法追踪对话历史❌ 方案B:消息队列(RabbitMQ/Kafka) Agent A → 发布消息到队列 → Agent B消费 问题:引入额外基础设施,部署复杂度高❌ 方案C:WebSocket实时通信 Agent A ↔ WebSocket ↔ Agent B 问题:状态管理复杂,断线重连逻辑繁琐
2.2 BettaFish的方案:日志作为共享媒介
核心设计:论坛机制(ForumEngine)
# ForumEngine/monitor.py核心逻辑classLogMonitor:def__init__(self, log_dir: str="logs"):# 论坛日志文件self.forum_log_file=self.log_dir/"forum.log"# 要监控的Agent日志self.monitored_logs= {'insight': self.log_dir/'insight.log','media': self.log_dir/'media.log','query': self.log_dir/'query.log' }
设计思路:
-
简单粗暴有效:用文件系统作为通信媒介
-
通用格式:
[HH:MM:SS] [SOURCE] content -
异步监控:
inotify或轮询检测文件变化
2.3 论坛日志格式设计
# 论坛日志格式规范# [时间戳] [来源] 内容# 来源可以是:QUERY, INSIGHT, MEDIA, HOST, SYSTEM[10:23:45] [QUERY] 我找到了关于"武汉大学舆情"的50条新闻[10:24:12] [INSIGHT] 我已经分析了1000条相关评论[10:24:45] [MEDIA] 我爬取了抖音平台上的相关视频[10:25:00] [HOST] 请QueryAgent提供更多时间维度的数据[10:25:30] [QUERY] 好的,我来补充2023年的历史数据
为什么是文件而非数据库?
-
可追溯性:日志文件天然按时间排序,易于回溯对话
-
调试友好:可以直接
tail -f forum.log实时观察Agent辩论 -
无单点故障:数据库挂了,文件系统还能工作
三、Agent节点模式:如何组织Agent内部逻辑?
3.1 为什么是节点(Node)而不是函数?
在BettaFish中,每个Agent的内部处理逻辑采用节点模式:
# QueryEngine/nodes/base_node.pyclassBaseNode:"""基础节点类,所有节点继承此类"""def__init__(self, name: str, agent):self.name=nameself.agent=agentdefexecute(self, state: State, **kwargs) ->Dict:"""执行节点逻辑"""raiseNotImplementedErrordeflog(self, message: str):"""统一的日志记录"""self.agent.log(f"[{self.name}] {message}")
节点设计的好处:
| 特性 | 函数模式 | 节点模式 |
|---|---|---|
| 状态追踪 | ❌ 需要手动传参 | ✅ 节点有自己的生命周期钩子 |
| 日志隔离 | ❌ 难以区分来源 | ✅ 节点自动带上前缀 |
| 重试机制 | ❌ 每个函数都要写 | ✅ 基类可统一实现 |
| 组合能力 | ❌ 需要手动编排 | ✅ 节点可任意组合 |
3.2 Agent的节点流水线
以QueryEngine为例,展示节点如何串联:
# QueryEngine/agent.pyclassDeepSearchAgent:def__init__(self, config):# 初始化节点self._initialize_nodes()def_initialize_nodes(self):"""初始化所有节点"""self.nodes= {'search': FirstSearchNode(self),'format': FormattingNode(self),'summary': SummaryNode(self),'report_structure': ReportStructureNode(self) }defexecute(self, query: str):"""执行Agent主流程"""# 节点流水线result=self.nodes['search'].execute(self.state,query=query )result=self.nodes['format'].execute(self.state,search_result=result )result=self.nodes['summary'].execute(self.state,formatted_result=result )
为什么不是if-else链式调用?
# ❌ 传统写法defexecute_query(query):result1=search(query)result2=format(result1)result3=summarize(result2)# 难以扩展,每个步骤硬编码# ✅ 节点写法pipeline= [SearchNode(), FormatNode(), SummarizeNode()]fornodeinpipeline:result=node.execute(state, input=result)# 易于扩展,动态调整节点顺序
四、LLM主持人:如何引导Agent辩论?
4.1 主持人的职责
在BettaFish中,LLM主持人不是”裁判”,而是”辩论引导者”:
# ForumEngine/llm_host.pyclassForumHost:""" 论坛主持人类 使用硅基流动的Qwen3-235B模型作为智能主持人 """def__init__(self, api_key: str, base_url: str):self.client=OpenAI(api_key=api_key,base_url=base_url )self.model="qwen-plus"# 主持人专用模型defgenerate_host_speech(self, forum_logs: List[str]) ->Optional[str]:"""生成主持人发言"""# 如果累积了5条Agent发言,触发主持人发言iflen(self.agent_speeches_buffer) >=self.host_speech_threshold:# 调用LLM生成引导response=self.client.chat.completions.create(model=self.model,messages=[ {"role": "system", "content": "你是辩论主持人,引导三个Agent讨论..."}, {"role": "user", "content": "\n".join(forum_logs)} ] )returnresponse.choices[0].message.content
4.2 主持人提示词设计(简化版)
你是一个辩论主持人,正在主持三个专业Agent的讨论:[Query Engine]:负责从网络搜索新闻[Insight Engine]:负责分析私有数据库中的舆情数据[Media Engine]:负责分析视频和图片等多模态内容你的任务:1. 确保每个Agent都能发言,不被边缘化2. 当Agent观点冲突时,引导他们寻找共同点3. 当信息不全时,指派特定Agent补充4. 总结阶段性成果,推进讨论深度请基于以上Agent的发言,生成你的引导发言:
关键设计点:
-
不判断对错:主持人只引导方向,不裁决观点
-
平衡参与度:确保所有Agent都有发言机会
-
聚焦目标:始终提醒Agent当前的任务目标
五、数据流设计:从提问到报告的全链路
5.1 完整的数据流图
用户提问 ↓Flask app.py接收查询 ↓并行启动三个Agent ├─→ Query Agent (端口8501) ├─→ Insight Agent (端口8502) └─→ Media Agent (端口8503) ↓各Agent写入日志 ↓ForumEngine监控三个日志 + forum.log ↓LLM主持人生成引导(每5条发言) ↓Agent通过forum_reader读取forum.log ↓循环N轮(N可配置) ↓ReportEngine收集所有日志 ↓生成报告(HTML/PDF/MD)
5.2 日志监控线程
# app.py中的日志监听器defmonitor_forum_log():"""监听forum.log文件变化并推送到前端"""forum_log_file=LOG_DIR/"forum.log"last_position=0whileTrue:try:ifforum_log_file.exists():withopen(forum_log_file, 'r', encoding='utf-8') asf:f.seek(last_position)new_lines=f.readlines()forlineinnew_lines:# 解析日志行并发送forum消息parsed_message=parse_forum_log_line(line)ifparsed_message:socketio.emit('forum_message', parsed_message)# 同时发送控制台输出socketio.emit('console_output', {'app': 'forum','line': line })last_position=f.tell()time.sleep(1) # 每秒检查一次exceptExceptionase:logger.error(f"Forum日志监听错误: {e}")time.sleep(5)
设计要点:
-
实时推送:使用Socket.IO向前端推送实时日志
-
断点续传:记录文件位置,重启后不丢失历史
-
双流分发:同时推送到控制台和论坛两个视图
六、架构设计的取舍与权衡
6.1 简单性 vs 功能性
| 设计选择 | 简单性 | 功能性 | BettaFish选择 |
|---|---|---|---|
| 单一通信方式 | ✅ 高 | ❌ 低(限制扩展) | 文件+Socket.IO双通道 |
| 同步通信 | ✅ 高 | ❌ 低(阻塞Agent) | 异步文件+实时推送 |
| 中央编排 | ✅ 高 | ❌ 低(单点故障) | 去中心化的论坛机制 |
6.2 可扩展性设计
如何添加新的Agent?
# 1. 在processes中添加配置processes= {'insight': {'process': None, 'port': 8501},'media': {'process': None, 'port': 8502},'query': {'process': None, 'port': 8503},'new_agent': {'process': None, 'port': 8504}, # 新增}STREAMLIT_SCRIPTS= {'new_agent': 'SingleEngineApp/new_agent_streamlit_app.py'}# 2. 在ForumEngine中添加日志监控self.monitored_logs= {'insight': ..., 'media': ..., 'query': ...,'new_agent': self.log_dir/'new_agent.log'# 新增}# 3. 论坛日志中新增来源# [10:30:00] [NEW_AGENT] 我是新Agent,负责XXX功能
只需3步,无需修改核心逻辑。这就是去中心化架构的好处。
七、总结:BettaFish架构设计的核心思想
7.1 三大设计原则
-
简单有效:用文件系统作为通信媒介,而非复杂中间件
-
去中心化:Agent通过论坛协作,而非中央调度
-
可观测性:所有对话过程都记录在日志中,实时展示
7.2 适用场景与局限性
适用场景:
-
✅ 需要多Agent协作的复杂任务
-
✅ 需要追溯对话历史的场景
-
✅ 开发者友好的调试需求
局限性:
-
❌ 不适合实时性要求极高(<100ms)的场景
-
❌ 不适合跨服务器部署(同一台机器更合适)
-
❌ 日志文件会随时间增长,需要定期清理
下一章预告
在整体架构设计的基础上,下一章我们将深入解析Agent论坛机制的核心实现:
-
论坛日志解析器的正则表达式设计
-
如何识别Agent发言的不同类型(搜索、总结、反思)
-
LLM主持人如何判断何时发言、说什么
-
Agent如何通过
forum_reader工具读取论坛并响应
关注公众号,不迷路 👉 BettaFish源码解析系列持续更新中…
夜雨聆风
