乐于分享
好东西不私藏

BettaFish源码解析(一):整体架构设计

BettaFish源码解析(一):整体架构设计

如何设计一个多智能体协作系统?


前言

在AI领域,多智能体协作是一个热门但困难的课题。BettaFish(微舆)作为一个从零构建的舆情分析系统,给出了一套独特的解决方案:让5个AI智能体像人类一样,在”论坛”里辩论协作。

本文将深入解析BettaFish的整体架构设计,回答一个核心问题:

如何让多个AI智能体有效协作,而不是各自为政?


一、架构选型:Flask + Streamlit混合架构

1.1 为什么不是单一框架?

在设计之初,BettaFish面临一个现实问题:如何让专业Agent既有独立UI,又能统一管理?

方案对比

方案 优点 缺点 BettaFish选择
纯Flask统一UI 集中管理,一致性好 开发复杂度高,每个Agent都要写Flask路由
纯Streamlit多应用 开发简单,Pythonic 端口管理混乱,无统一入口
Flask主控 + Streamlit子应用 集中管理 + 独立开发 进程管理复杂

1.2 混合架构的核心代码

app.py中,Flask作为主控制器,负责管理三个Streamlit子应用的进程:

# processes字典存储所有子进程信息processes= {'insight': {'process'None'port'8501'status''stopped'},'media': {'process'None'port'8502'status''stopped'},'query': {'process'None'port'8503'status''stopped'},'forum': {'process'None'status''stopped'}}STREAMLIT_SCRIPTS= {'insight''SingleEngineApp/insight_engine_streamlit_app.py','media''SingleEngineApp/media_engine_streamlit_app.py','query''SingleEngineApp/query_engine_streamlit_app.py'}

设计要点

  1. 端口预分配:每个Agent固定端口(8501-8503),避免冲突

  2. 进程生命周期管理:启动、停止、健康检查

  3. 统一日志收集:所有子进程输出通过read_process_output实时收集

1.3 子进程启动机制

defstart_streamlit_app(app_namescript_pathport):"""启动Streamlit应用"""cmd= [sys.executable'-m''streamlit''run',script_path,'--server.port'str(port),'--server.headless''true',  # 无浏览器模式'--logger.level''info'    ]# 关键:环境变量确保UTF-8编码env=os.environ.copy()env.update({'PYTHONIOENCODING''utf-8','PYTHONUNBUFFERED''1',  # 禁用缓冲,实时输出    })process=subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.STDOUT,bufsize=0,  # 无缓冲cwd=os.getcwd(),env=env    )

为什么这么做?

  • 无缓冲输出PYTHONUNBUFFERED='1'确保日志实时显示,不丢失Agent思考过程

  • 独立工作目录cwd=os.getcwd()让子进程共享项目根目录,避免路径问题

  • 无浏览器模式--server.headless=true适合服务器部署,不启动多余浏览器


二、Agent协作模式:论坛机制

2.1 传统Agent协作的痛点

问题:多个Agent如何交换信息?

传统方案的问题

❌ 方案A:直接函数调用  Agent A → 直接调用 Agent B 的函数  问题:强耦合,难扩展,无法追踪对话历史❌ 方案B:消息队列(RabbitMQ/Kafka)  Agent A → 发布消息到队列 → Agent B消费  问题:引入额外基础设施,部署复杂度高❌ 方案C:WebSocket实时通信  Agent A ↔ WebSocket ↔ Agent B  问题:状态管理复杂,断线重连逻辑繁琐

2.2 BettaFish的方案:日志作为共享媒介

核心设计:论坛机制(ForumEngine)

# ForumEngine/monitor.py核心逻辑classLogMonitor:def__init__(selflog_dirstr="logs"):# 论坛日志文件self.forum_log_file=self.log_dir/"forum.log"# 要监控的Agent日志self.monitored_logs= {'insight'self.log_dir/'insight.log','media'self.log_dir/'media.log','query'self.log_dir/'query.log'        }

设计思路

  1. 简单粗暴有效:用文件系统作为通信媒介

  2. 通用格式[HH:MM:SS] [SOURCE] content

  3. 异步监控inotify或轮询检测文件变化

2.3 论坛日志格式设计

# 论坛日志格式规范# [时间戳] [来源] 内容# 来源可以是:QUERY, INSIGHT, MEDIA, HOST, SYSTEM[10:23:45] [QUERY我找到了关于"武汉大学舆情"的50条新闻[10:24:12] [INSIGHT我已经分析了1000条相关评论[10:24:45] [MEDIA我爬取了抖音平台上的相关视频[10:25:00] [HOST请QueryAgent提供更多时间维度的数据[10:25:30] [QUERY好的,我来补充2023年的历史数据

为什么是文件而非数据库?

  • 可追溯性:日志文件天然按时间排序,易于回溯对话

  • 调试友好:可以直接tail -f forum.log实时观察Agent辩论

  • 无单点故障:数据库挂了,文件系统还能工作


三、Agent节点模式:如何组织Agent内部逻辑?

3.1 为什么是节点(Node)而不是函数?

在BettaFish中,每个Agent的内部处理逻辑采用节点模式

# QueryEngine/nodes/base_node.pyclassBaseNode:"""基础节点类,所有节点继承此类"""def__init__(selfnamestragent):self.name=nameself.agent=agentdefexecute(selfstateState**kwargs->Dict:"""执行节点逻辑"""raiseNotImplementedErrordeflog(selfmessagestr):"""统一的日志记录"""self.agent.log(f"[{self.name}{message}")

节点设计的好处

特性 函数模式 节点模式
状态追踪 ❌ 需要手动传参 ✅ 节点有自己的生命周期钩子
日志隔离 ❌ 难以区分来源 ✅ 节点自动带上前缀
重试机制 ❌ 每个函数都要写 ✅ 基类可统一实现
组合能力 ❌ 需要手动编排 ✅ 节点可任意组合

3.2 Agent的节点流水线

以QueryEngine为例,展示节点如何串联:

# QueryEngine/agent.pyclassDeepSearchAgent:def__init__(selfconfig):# 初始化节点self._initialize_nodes()def_initialize_nodes(self):"""初始化所有节点"""self.nodes= {'search'FirstSearchNode(self),'format'FormattingNode(self),'summary'SummaryNode(self),'report_structure'ReportStructureNode(self)        }defexecute(selfquerystr):"""执行Agent主流程"""# 节点流水线result=self.nodes['search'].execute(self.state,query=query        )result=self.nodes['format'].execute(self.state,search_result=result        )result=self.nodes['summary'].execute(self.state,formatted_result=result        )

为什么不是if-else链式调用?

# ❌ 传统写法defexecute_query(query):result1=search(query)result2=format(result1)result3=summarize(result2)# 难以扩展,每个步骤硬编码# ✅ 节点写法pipeline= [SearchNode(), FormatNode(), SummarizeNode()]fornodeinpipeline:result=node.execute(stateinput=result)# 易于扩展,动态调整节点顺序

四、LLM主持人:如何引导Agent辩论?

4.1 主持人的职责

在BettaFish中,LLM主持人不是”裁判”,而是”辩论引导者”:

# ForumEngine/llm_host.pyclassForumHost:"""    论坛主持人类    使用硅基流动的Qwen3-235B模型作为智能主持人    """def__init__(selfapi_keystrbase_urlstr):self.client=OpenAI(api_key=api_key,base_url=base_url        )self.model="qwen-plus"# 主持人专用模型defgenerate_host_speech(selfforum_logsList[str]) ->Optional[str]:"""生成主持人发言"""# 如果累积了5条Agent发言,触发主持人发言iflen(self.agent_speeches_buffer>=self.host_speech_threshold:# 调用LLM生成引导response=self.client.chat.completions.create(model=self.model,messages=[                    {"role""system""content""你是辩论主持人,引导三个Agent讨论..."},                    {"role""user""content""\n".join(forum_logs)}                ]            )returnresponse.choices[0].message.content

4.2 主持人提示词设计(简化版)

你是一个辩论主持人,正在主持三个专业Agent的讨论:[Query Engine]:负责从网络搜索新闻[Insight Engine]:负责分析私有数据库中的舆情数据[Media Engine]:负责分析视频和图片等多模态内容你的任务:1. 确保每个Agent都能发言,不被边缘化2. 当Agent观点冲突时,引导他们寻找共同点3. 当信息不全时,指派特定Agent补充4. 总结阶段性成果,推进讨论深度请基于以上Agent的发言,生成你的引导发言:

关键设计点

  • 不判断对错:主持人只引导方向,不裁决观点

  • 平衡参与度:确保所有Agent都有发言机会

  • 聚焦目标:始终提醒Agent当前的任务目标


五、数据流设计:从提问到报告的全链路

5.1 完整的数据流图

用户提问  ↓Flask app.py接收查询  ↓并行启动三个Agent  ├─→ Query Agent (端口8501)  ├─→ Insight Agent (端口8502)  └─→ Media Agent (端口8503)  ↓各Agent写入日志  ↓ForumEngine监控三个日志 + forum.log  ↓LLM主持人生成引导(每5条发言)  ↓Agent通过forum_reader读取forum.log  ↓循环N轮(N可配置)  ↓ReportEngine收集所有日志  ↓生成报告(HTML/PDF/MD)

5.2 日志监控线程

# app.py中的日志监听器defmonitor_forum_log():"""监听forum.log文件变化并推送到前端"""forum_log_file=LOG_DIR/"forum.log"last_position=0whileTrue:try:ifforum_log_file.exists():withopen(forum_log_file'r'encoding='utf-8'asf:f.seek(last_position)new_lines=f.readlines()forlineinnew_lines:# 解析日志行并发送forum消息parsed_message=parse_forum_log_line(line)ifparsed_message:socketio.emit('forum_message'parsed_message)# 同时发送控制台输出socketio.emit('console_output', {'app''forum','line'line                        })last_position=f.tell()time.sleep(1)  # 每秒检查一次exceptExceptionase:logger.error(f"Forum日志监听错误: {e}")time.sleep(5)

设计要点

  1. 实时推送:使用Socket.IO向前端推送实时日志

  2. 断点续传:记录文件位置,重启后不丢失历史

  3. 双流分发:同时推送到控制台和论坛两个视图


六、架构设计的取舍与权衡

6.1 简单性 vs 功能性

设计选择 简单性 功能性 BettaFish选择
单一通信方式 ✅ 高 ❌ 低(限制扩展) 文件+Socket.IO双通道
同步通信 ✅ 高 ❌ 低(阻塞Agent) 异步文件+实时推送
中央编排 ✅ 高 ❌ 低(单点故障) 去中心化的论坛机制

6.2 可扩展性设计

如何添加新的Agent?

# 1. 在processes中添加配置processes= {'insight': {'process'None'port'8501},'media': {'process'None'port'8502},'query': {'process'None'port'8503},'new_agent': {'process'None'port'8504},  # 新增}STREAMLIT_SCRIPTS= {'new_agent''SingleEngineApp/new_agent_streamlit_app.py'}# 2. 在ForumEngine中添加日志监控self.monitored_logs= {'insight'...'media'...'query'...,'new_agent'self.log_dir/'new_agent.log'# 新增}# 3. 论坛日志中新增来源# [10:30:00] [NEW_AGENT] 我是新Agent,负责XXX功能

只需3步,无需修改核心逻辑。这就是去中心化架构的好处。


七、总结:BettaFish架构设计的核心思想

7.1 三大设计原则

  1. 简单有效:用文件系统作为通信媒介,而非复杂中间件

  2. 去中心化:Agent通过论坛协作,而非中央调度

  3. 可观测性:所有对话过程都记录在日志中,实时展示

7.2 适用场景与局限性

适用场景

  • ✅ 需要多Agent协作的复杂任务

  • ✅ 需要追溯对话历史的场景

  • ✅ 开发者友好的调试需求

局限性

  • ❌ 不适合实时性要求极高(<100ms)的场景

  • ❌ 不适合跨服务器部署(同一台机器更合适)

  • ❌ 日志文件会随时间增长,需要定期清理


下一章预告

在整体架构设计的基础上,下一章我们将深入解析Agent论坛机制的核心实现:

  • 论坛日志解析器的正则表达式设计

  • 如何识别Agent发言的不同类型(搜索、总结、反思)

  • LLM主持人如何判断何时发言、说什么

  • Agent如何通过forum_reader工具读取论坛并响应

关注公众号,不迷路 👉 BettaFish源码解析系列持续更新中…

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » BettaFish源码解析(一):整体架构设计

评论 抢沙发

9 + 2 =
  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮