BettaFish源码解析(五):分布式爬虫系统
如何构建可扩展的平台爬虫?
前言

在前几篇文章中,我们分别讲解了BettaFish的整体架构、Agent论坛机制、GraphRAG知识图谱和报告生成引擎。这些组件构成了完整的”数据获取-处理-输出”闭环。
今天,我们深入解析系统的”数据获取”端——MindSpider分布式爬虫系统。这是BettaFish能够进行舆情分析的基础设施。
本文将深入解析MindSpider的设计,回答一个核心问题:
如何构建一个可扩展、可维护、智能化的分布式爬虫系统,支持7大主流社交平台?
一、整体架构:为什么需要两阶段爬虫?
1.1 传统单阶段爬虫的问题
在深入设计之前,先理解传统方案的痛点:
| 问题 | 传统单阶段方案 | BettaFish两阶段方案 ||——|—————-|————|—————–|| 目标模糊 | 爬虫不知道要抓什么 | 话题提取提供明确方向 || 重复爬取 | 每天都全量抓取 | 只抓新增内容,高效 || 平台限制 | 无法深入多平台 | 前后分离,各平台独立扩展 || 负载不均 | 高峰期可能封IP | 分布式调度,平滑负载 |
1.2 BettaFish的两阶段设计
# MindSpider/main.pyclassMindSpider:def__init__(self):self.broad_topic_path=self.project_root/"BroadTopicExtraction"# 阶段一self.deep_sentiment_path=self.project_root/"DeepSentimentCrawling"# 阶段二self.schema_path=self.project_root/"schema"
设计亮点:
-
目标分离:阶段一提供热点话题 → 阶段二基于话题精准爬取
-
可扩展性:每个阶段独立,易于添加新平台
-
数据复用:两个阶段共享数据库Schema和ORM模型
-
高效调度:话题提取每日一次,深度爬取可按需执行
二、阶段一:热点话题提取(BroadTopicExtraction)
2.1 模块职责
# MindSpider/BroadTopicExtraction/main.pyclassBroadTopicExtraction:"""BroadTopicExtraction主要工作流程"""def__init__(self):self.news_collector=NewsCollector() # 新闻采集器self.topic_extractor=TopicExtractor() # AI话题提取器self.db_manager=DatabaseManager() # 数据库管理器asyncdefrun_daily_extraction(self, max_keywords: int=100) ->Dict:"""执行每日话题提取流程"""
工作流程:
1. 新闻采集(NewsCollector) ├─→ 微博热搜 ├─→ 知乎热榜 ├─→ B站热门 └─→ 今日头条 ↓2. AI分析(TopicExtractor) ├─→ DeepSeek API智能分析 ├─→ 提取话题关键词 └─→ 生成新闻摘要 ↓3. 数据持久化(DatabaseManager) └─→ 保存到daily_topics表
2.2 新闻采集器实现
# MindSpider/BroadTopicExtraction/get_today_news.pyclassNewsCollector:"""多平台新闻采集器"""SUPPORTED_SOURCES= {'weibo': '微博热搜','zhihu': '知乎热榜','bilibili': 'B站热门','toutiao': '今日头条' }defcollect_weibo_hot(self) ->List[Dict]:"""采集微博热搜"""# API调用获取热搜列表# 解析数据(标题、链接、排名)# 返回结构化数据defcollect_zhihu_hot(self) ->List[Dict]:"""采集知乎热榜"""# 爬取知乎热榜页面# 解析问题标题、热度# 返回结构化数据
关键设计点:
-
统一接口:所有采集器实现相同接口,返回List[Dict]
-
可扩展性:新平台只需添加到SUPPORTED_SOURCES
-
数据结构化:统一返回格式,便于后续AI分析
2.3 AI话题提取器
# MindSpider/BroadTopicExtraction/topic_extractor.pyclassTopicExtractor:"""基于DeepSeek AI的话题提取器"""def__init__(self, api_key: str):self.client=DeepSeekClient(api_key)asyncdefextract_topics(self, news_list: List[Dict]) ->List[str]:"""使用LLM分析新闻,提取话题关键词"""# 1. 构建Promptsystem_prompt="""你是一个专业的舆情分析师,擅长从多源新闻中提取核心话题。 分析以下新闻,提取3-5个最重要的关键词作为话题标签。 要求:关键词应该简明、准确、易于搜索。"""user_prompt=f""" 请分析以下新闻:{format_news(news_list)} 请提取3-5个最重要的关键词,每行一个关键词。 """# 2. 调用DeepSeek APIresponse=awaitself.client.chat.completions.create(model="deepseek-chat",messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}] )# 3. 解析结果keywords= []forlineinresponse.choices[0].message.content.split('\n'):keywords.append(line.strip())returnkeywords
设计亮点:
-
智能化提取:使用LLM而非规则匹配,适应性强
-
多源融合:综合多平台信息,提高准确性
-
Prompt工程:精心设计的System Prompt引导高质量输出
三、阶段二:深度舆情爬取(DeepSentimentCrawling)
3.1 模块职责
# MindSpider/DeepSentimentCrawling/main.pyclassDeepSentimentCrawling:"""深度舆情爬取主工作流程"""def__init__(self):self.keyword_manager=KeywordManager() # 关键词管理器self.platform_crawler=PlatformCrawler() # 平台爬虫管理器asyncdefrun_daily_crawling(self, target_date: date=None,platforms: List[str] =None,max_keywords_per_platform: int=50,max_notes_per_platform: int=50,login_type: str="qrcode") ->Dict:"""执行每日深度舆情爬取任务"""
工作流程:
1. 关键词获取(KeywordManager) ├─→ 从数据库读取当日话题关键词 ├─→ 按热度排序 └─→ 返回Top N个关键词 ↓2. 平台选择(PlatformCrawler) ├─→ 根据关键词选择适合的平台 ├─→ 考虑平台特性(视频优先、图文优先) └─→ 返回目标平台列表 ↓3. 登录爬取(各平台爬虫) ├─→ 小红书登录(二维码) ├─→ 抖音登录(二维码) ├─→ B站、微博、知乎登录 └─→ 并行执行爬取任务 ↓4. 数据存储(DatabaseManager) └─→ 保存到各平台内容表
3.2 关键词管理器
# MindSpider/DeepSentimentCrawling/keyword_manager.pyclassKeywordManager:"""关键词管理器"""defget_latest_keywords(self, target_date: date, max_keywords: int=100) ->List[str]:"""获取最新的关键词列表"""# 1. 从数据库查询daily_topics表# 2. 按热度降序# 3. 返回前N个关键词defsave_keywords(self, keywords: List[str], source: str):"""保存关键词到数据库"""# 检查重复,插入或更新热度
3.3 平台爬虫管理器
# MindSpider/DeepSentimentCrawling/platform_crawler.pyclassPlatformCrawler:"""平台爬虫管理器"""def__init__(self):# 动态导入各平台爬虫模块from .media_platformimportWeiboCrawlerfrom .media_platformimportDouyinCrawlerfrom .media_platformimportXiaohongshuCrawler# ... 更多平台self.platforms= {'weibo': WeiboCrawler(),'douyin': DouyinCrawler(),'xiaohongshu': XiaohongshuCrawler()# ... }asyncdefrun_multi_platform_crawl_by_keywords(self, keywords: List[str],platforms: List[str],max_notes: int=50,login_type: str="qrcode") ->Dict:"""多平台关键词爬取"""
关键设计点:
-
策略选择:根据关键词特性选择平台(视频关键词→抖音,话题关键词→微博)
-
并行执行:多个平台同时爬取,提高效率
-
错误隔离:单个平台失败不影响其他平台
-
登录管理:二维码登录状态持久化,避免重复登录
3.4 Playwright平台爬虫架构
这是MindSpider的核心技术创新——使用浏览器自动化框架,而非传统的HTTP请求。
# MindSpider/DeepSentimentCrawling/MediaCrawler/base.pyclassBaseCrawler:"""所有平台爬虫的基类"""def__init__(self):self.browser=None# Playwright浏览器实例self.context=None# 浏览器上下文self.logged_in=False# 登录状态asyncdeflogin(self) ->bool:"""登录平台"""# 1. 启动浏览器self.browser=awaitplaywright.async_api().chromium.launch(headless=False)# 2. 导航到登录页awaitself.browser.goto('https://www.xiaohongshu.com')# 3. 执行登录操作awaitself._perform_login()self.logged_in=TruereturnTrueasyncdefcrawl_content(self, keywords: List[str], max_notes: int=50) ->List[Dict]:"""根据关键词爬取内容"""# 1. 搜索关键词# 2. 遍历搜索结果# 3. 解析帖子/笔记/视频# 4. 提取互动数据(点赞、评论、转发)# 5. 保存到数据库
为什么选择Playwright?
| 框架 | 优点 | 缺点 ||——|——|——|———|| HTTP请求 | 简单、轻量 | ❌ 无法处理JS渲染 || Selenium | 成熟、稳定 | ❌ 性能较慢 || Playwright | 现代、高性能 | ✅ 支持多浏览器、异步 |
3.5 Playwright的核心优势
-
异步支持:
asyncio+await语法,避免阻塞 -
多浏览器:Chromium/Firefox/WebKit多浏览器支持
-
无头模式:
headless=False适合服务器部署 -
智能等待:
wait_for_selector自动等待元素加载 -
反检测:执行JavaScript操作,隐藏自动化特征
四、配置化设计:如何降低维护成本?
4.1 Pydantic Settings统一配置
# MindSpider/config.pyfrompydantic_settingsimportBaseSettingsfromtypingimportOptionalclassSettings(BaseSettings):"""全局配置管理"""# 数据库配置DB_DIALECT: str="mysql"# 支持mysql和postgresqlDB_HOST: str="your_database_host"DB_PORT: int=3306DB_USER: str="your_username"DB_PASSWORD: str=" "your_password"DB_NAME: str="mindspider"DB_CHARSET: str="utf8mb4"# DeepSeek API配置MINDSPIDER_API_KEY: str="your_deepseek_api_key"MINDSPIDER_BASE_URL: str="https://api.deepseek.com"MINDSPIDER_MODEL_NAME: str="deepseek-chat"
设计亮点:
-
类型安全:Pydantic自动验证配置类型
-
默认值:提供合理的默认配置
-
环境变量支持:通过
.env文件管理敏感信息 -
跨平台兼容:同一套配置支持Windows/Linux/macOS
五、数据库架构与ORM映射
5.1 数据库Schema设计
-- MindSpider/schema/mindspider_tables.sqlCREATETABLEIFNOTEXISTS daily_topics ( id INTPRIMARYKEYAUTO_INCREMENT, topic_date DATENOTNULL, topic_name VARCHAR(255)NOTNULL, keywords TEXT, heat_score INTDEFAULT0, created_at TIMESTAMPDEFAULTCURRENT_TIMESTAMP,INDEX idx_date (topic_date),INDEX idx_topic (topic_name, topic_date));CREATETABLEIFNOTEXISTS daily_news ( id INTPRIMARYKEYAUTO_INCREMENT, topic_id INT,sourceVARCHAR(50)NOTNULL,-- weibo/zhihu/bilibili/toutiao/other title TEXT, url VARCHAR(500), ranking INT, publish_time DATETIME,INDEX idx_source_topic (source, topic_id),INDEX idx_publish_time (publish_time));
设计要点:
-
每日话题表:存储AI提取的话题,支持热度排序
-
每日新闻表:存储各平台采集的新闻,支持多源对比
-
索引优化:为常用查询字段建立索引,提高查询效率
-
外键约束:确保数据完整性
5.2 SQLAlchemy ORM映射
# MindSpider/schema/models_sa.pyfromsqlalchemyimportColumn, Integer, String, DateTime, Text, ForeignKeyfromsqlalchemy.ext.declarativeimportdeclarative_basefromsqlalchemy.ormimportrelationshipBase=declarative_base()classDailyTopic(Base):"""每日话题表ORM映射"""__tablename__='daily_topics'id=Column(Integer, primary_key=True)topic_date=Column(Date)topic_name=Column(String(255))keywords=Column(Text)heat_score=Column(Integer, default=0)created_at=Column(DateTime, default=datetime.utcnow)classDailyNews(Base):"""每日新闻表ORM映射"""__tablename__='daily_news'id=Column(Integer, primary_key=True)topic_id=Column(Integer, ForeignKey('daily_topics.id'))source=Column(String(50)) # weibo/zhihu/bilibili/toutiao/othertitle=Column(Text)url=Column(String(500))ranking=Column(Integer)publish_time=Column(DateTime)# 关系定义topic=relationship("DailyTopic", back_populates="daily_news")
设计亮点:
-
类型安全:SQLAlchemy ORM提供编译时类型检查
-
关系映射:
relationship自动处理外键关系 -
查询优化:ORM转换为高效SQL,支持索引
-
异步支持:
asyncio集成异步数据库操作
5.3 数据库连接管理
# MindSpider/schema/db_manager.pyclassDatabaseManager:"""数据库连接管理器"""def__init__(self):# 创建异步引擎self.engine=create_async_engine(build_async_url())asyncdefsave_topic(self, topic: DailyTopic):"""保存话题到数据库"""asyncwithself.engine.connect() asconn:awaitconn.run(DailyTopic.__table__.insert(), values={'topic_name': topic.name,'keywords': topic.keywords })asyncdefsave_news(self, news: DailyNews):"""保存新闻到数据库"""asyncwithself.engine.connect() asconn:awaitconn.run(DailyNews.__table__.insert(), values={'topic_id': news.topic_id,'source': news.source,'title': news.title,'url': news.url })
设计亮点:
-
连接池:
pool_pre_ping=True自动管理连接池 -
异步操作:所有数据库操作都是异步的,避免阻塞
-
自动清理:使用
finally块确保资源释放 -
错误处理:完善的异常捕获和日志记录
六、命令行工具设计:如何让用户易用?
6.1 argparse参数设计
# MindSpider/main.py 的参数解析importargparseparser=argparse.ArgumentParser(description='MindSpider AI爬虫系统')# 主要命令parser.add_argument('--status', action='store_true', help='检查系统状态')parser.add_argument('--setup', action='store_true', help='初始化项目')parser.add_argument('--road-topic', action='store_true', help='执行话题提取')parser.add_argument('--deep-sentiment', action='store_true', help='执行深度舆情爬取')parser.add_argument('--complete', action='store_true', help='执行完整流程')parser.add_argument('--test', action='store_true', help='测试模式')# 爬虫参数parser.add_argument('--platforms', nargs='+', help='指定平台 (xhs dy ks bili wb tieba)')parser.add_argument('--date', type=str, help='指定日期 (YYYY-MM-DD)')parser.add_argument('--max-keywords', type=int, default=100, help='关键词数量')parser.add_argument('--max-notes', type=int, default=50, help='内容数量')
设计亮点:
-
子命令分离:每个子命令有独立的参数和逻辑
-
参数校验:类型检查、范围验证
-
帮助系统:
--help自动生成使用文档 -
灵活组合:支持组合多个参数
6.2 完整的命令行流程图
用户执行命令 ↓argparse解析参数 ↓检查配置有效性(check_config) ↓初始化数据库(init_database) ↓执行对应子命令 ↓收集结果 ↓生成统计报告 ↓输出到控制台
七、爬虫配置(重要)
7.1 平台登录配置
首次使用必须登录各平台:
# 1. 测试小红书登录python main.py --deep-sentiment--platforms xhs --test# 2. 测试抖音登录python main.py --deep-sentiment--platforms dy --test# 3. 使用手机APP扫码登录# - 终端会显示二维码# - 使用APP扫码后,登录状态会保存# - 后续爬取自动使用保存的登录态
7.2 代理配置(如需要)
# 在DeepSentimentCrawling/platform_crawler/base.py中配置PROXY_SETTINGS= {'http': 'http://proxy-server:port','socks5': 'socks5://proxy-server:port','verify_ssl': False# 避免SSL验证问题}
设计要点:
-
轮换代理:避免单个IP被封
-
类型选择:根据平台特性选择HTTP/SOCKS5
-
验证配置:可配置是否验证SSL证书
-
超时设置:代理响应超时自动切换
八、错误处理与日志记录
8.1 统一的错误处理机制
# MindSpider/core/error_handler.pyclassErrorHandler:"""统一错误处理器"""@staticmethoddefhandle_login_failure(platform: str, error: Exception):"""处理登录失败"""logger.error(f"{platform} 登录失败: {str(error)}")# 保存失败状态,避免重复尝试# 提供重试建议@staticmethoddefhandle_crawl_failure(platform: str, error: Exception):"""处理爬取失败"""logger.error(f"{platform} 爬取失败: {str(error)}")# 记录失败原因,便于分析# 继续其他平台爬取
8.2 结构化日志记录
# 使用loguru进行日志记录fromloguruimportlogger# 配置日志格式logger.add("logs/mindspider_{time}.log",rotation="500 MB",level="INFO",format="<green>{time}</green> | <level>{message}</level> | {extra}")# 使用示例logger.info("开始执行话题提取")logger.warning("检测到网络延迟,重试中...")logger.error("数据库连接失败,请检查配置")
设计要点:
-
日志轮转:自动按大小分割,避免单个文件过大
-
级别区分:INFO/WARNING/ERROR,便于过滤分析
-
结构化输出:使用JSON格式,便于后续处理
-
性能监控:记录关键操作的耗时,识别性能瓶颈
九、扩展性设计:如何添加新平台?
9.1 添加新平台的标准流程
# 1. 创建平台配置类# MindSpider/DeepSentimentCrawling/media_platform/xiaohongshu/config.pyclassXiaohongshuCrawler(BaseCrawler):"""小红书爬虫"""def__init__(self):# 继承BaseCrawler的通用能力super().__init__()# 2. 实现小红书特有逻辑asyncdeflogin(self) ->bool:"""小红书特有登录"""# 实现二维码登录逻辑asyncdefsearch(self, keywords: List[str]) ->List[Dict]:"""小红书搜索"""# 实现小红书搜索逻辑# 3. 更新PlatformCrawler# MindSpider/DeepSentimentCrawling/platform_crawler.pyclassPlatformCrawler:self.platforms= {'xiaohongshu': XiaohongshuCrawler(), # 新增'weibo': WeiboCrawler(),'douyin': DouyinCrawler(),# ... }
关键设计点:
-
基类继承:
BaseCrawler提供通用能力(登录、搜索、存储) -
特有扩展:子类实现平台特有逻辑(如二维码登录)
-
配置统一:所有平台配置在
config/目录下 -
注册机制:在
PlatformCrawler中注册新平台
十、总结:BettaFish分布式爬虫的设计思想
10.1 核心设计原则
-
两阶段分离:话题提取与深度爬取职责清晰,易于维护
-
平台抽象:BaseCrawler提供统一接口,新平台只需实现特有逻辑
-
配置驱动:Pydantic Settings统一配置,环境变量管理
-
ORM映射:SQLAlchemy提供类型安全的数据库操作
-
异步优先:所有I/O操作都是异步的,避免阻塞
-
Playwright选型:浏览器自动化框架,支持现代JS渲染
10.2 适用场景
✅ 适合:
-
需要多平台舆情采集的场景
-
需要智能话题发现的场景
-
需要深度内容分析的场景
❌ 不适合:
-
实时性要求极高(<1秒)的场景
-
需要处理海量数据的场景(建议用专门的爬虫框架)
10.3 三大技术创新
-
两阶段设计:话题提取提供方向,深度爬取聚焦内容
-
Playwright架构:浏览器自动化,支持多平台
-
配置化设计:Pydantic Settings + 环境变量,降低维护成本
下一章预告
在理解了分布式爬虫系统的基础上,下一章我们将深入解析情感分析模型层:
-
多模型融合策略:BERT/GPT-2/Qwen3/传统ML
-
模型训练与推理流程
-
模型集成到InsightEngine的方式
-
情感分析API的设计
关注公众号,不迷路 👉 BettaFish源码解析系列持续更新中…
夜雨聆风
