乐于分享
好东西不私藏

BettaFish源码解析(五):分布式爬虫系统

BettaFish源码解析(五):分布式爬虫系统

如何构建可扩展的平台爬虫?


前言

在前几篇文章中,我们分别讲解了BettaFish的整体架构Agent论坛机制GraphRAG知识图谱报告生成引擎。这些组件构成了完整的”数据获取-处理-输出”闭环。

今天,我们深入解析系统的”数据获取”端——MindSpider分布式爬虫系统。这是BettaFish能够进行舆情分析的基础设施。

本文将深入解析MindSpider的设计,回答一个核心问题:

如何构建一个可扩展、可维护、智能化的分布式爬虫系统,支持7大主流社交平台?


一、整体架构:为什么需要两阶段爬虫?

1.1 传统单阶段爬虫的问题

在深入设计之前,先理解传统方案的痛点:

| 问题 | 传统单阶段方案 | BettaFish两阶段方案 ||——|—————-|————|—————–|目标模糊 | 爬虫不知道要抓什么 | 话题提取提供明确方向 |重复爬取 | 每天都全量抓取 | 只抓新增内容,高效 |平台限制 | 无法深入多平台 | 前后分离,各平台独立扩展 |负载不均 | 高峰期可能封IP | 分布式调度,平滑负载 |

1.2 BettaFish的两阶段设计

# MindSpider/main.pyclassMindSpider:def__init__(self):self.broad_topic_path=self.project_root/"BroadTopicExtraction"# 阶段一self.deep_sentiment_path=self.project_root/"DeepSentimentCrawling"# 阶段二self.schema_path=self.project_root/"schema"

设计亮点

  • 目标分离:阶段一提供热点话题 → 阶段二基于话题精准爬取

  • 可扩展性:每个阶段独立,易于添加新平台

  • 数据复用:两个阶段共享数据库Schema和ORM模型

  • 高效调度:话题提取每日一次,深度爬取可按需执行


二、阶段一:热点话题提取(BroadTopicExtraction)

2.1 模块职责

# MindSpider/BroadTopicExtraction/main.pyclassBroadTopicExtraction:"""BroadTopicExtraction主要工作流程"""def__init__(self):self.news_collector=NewsCollector()  # 新闻采集器self.topic_extractor=TopicExtractor()  # AI话题提取器self.db_manager=DatabaseManager()  # 数据库管理器asyncdefrun_daily_extraction(selfmax_keywordsint=100->Dict:"""执行每日话题提取流程"""

工作流程

1. 新闻采集(NewsCollector)   ├─→ 微博热搜   ├─→ 知乎热榜   ├─→ B站热门   └─→ 今日头条   ↓2. AI分析(TopicExtractor)   ├─→ DeepSeek API智能分析   ├─→ 提取话题关键词   └─→ 生成新闻摘要   ↓3. 数据持久化(DatabaseManager)   └─→ 保存到daily_topics表

2.2 新闻采集器实现

# MindSpider/BroadTopicExtraction/get_today_news.pyclassNewsCollector:"""多平台新闻采集器"""SUPPORTED_SOURCES= {'weibo''微博热搜','zhihu''知乎热榜','bilibili''B站热门','toutiao''今日头条'    }defcollect_weibo_hot(self->List[Dict]:"""采集微博热搜"""# API调用获取热搜列表# 解析数据(标题、链接、排名)# 返回结构化数据defcollect_zhihu_hot(self->List[Dict]:"""采集知乎热榜"""# 爬取知乎热榜页面# 解析问题标题、热度# 返回结构化数据

关键设计点

  • 统一接口:所有采集器实现相同接口,返回List[Dict]

  • 可扩展性:新平台只需添加到SUPPORTED_SOURCES

  • 数据结构化:统一返回格式,便于后续AI分析

2.3 AI话题提取器

# MindSpider/BroadTopicExtraction/topic_extractor.pyclassTopicExtractor:"""基于DeepSeek AI的话题提取器"""def__init__(selfapi_keystr):self.client=DeepSeekClient(api_key)asyncdefextract_topics(selfnews_listList[Dict]) ->List[str]:"""使用LLM分析新闻,提取话题关键词"""# 1. 构建Promptsystem_prompt="""你是一个专业的舆情分析师,擅长从多源新闻中提取核心话题。        分析以下新闻,提取3-5个最重要的关键词作为话题标签。        要求:关键词应该简明、准确、易于搜索。"""user_prompt=f"""        请分析以下新闻:{format_news(news_list)}        请提取3-5个最重要的关键词,每行一个关键词。        """# 2. 调用DeepSeek APIresponse=awaitself.client.chat.completions.create(model="deepseek-chat",messages=[{"role""system""content"system_prompt},                   {"role""user""content"user_prompt}]        )# 3. 解析结果keywords= []forlineinresponse.choices[0].message.content.split('\n'):keywords.append(line.strip())returnkeywords

设计亮点

  • 智能化提取:使用LLM而非规则匹配,适应性强

  • 多源融合:综合多平台信息,提高准确性

  • Prompt工程:精心设计的System Prompt引导高质量输出


三、阶段二:深度舆情爬取(DeepSentimentCrawling)

3.1 模块职责

# MindSpider/DeepSentimentCrawling/main.pyclassDeepSentimentCrawling:"""深度舆情爬取主工作流程"""def__init__(self):self.keyword_manager=KeywordManager()  # 关键词管理器self.platform_crawler=PlatformCrawler()  # 平台爬虫管理器asyncdefrun_daily_crawling(selftarget_datedate=None,platformsList[str=None,max_keywords_per_platformint=50,max_notes_per_platformint=50,login_typestr="qrcode"->Dict:"""执行每日深度舆情爬取任务"""

工作流程

1. 关键词获取(KeywordManager)   ├─→ 从数据库读取当日话题关键词   ├─→ 按热度排序   └─→ 返回Top N个关键词   ↓2. 平台选择(PlatformCrawler)   ├─→ 根据关键词选择适合的平台   ├─→ 考虑平台特性(视频优先、图文优先)   └─→ 返回目标平台列表   ↓3. 登录爬取(各平台爬虫)   ├─→ 小红书登录(二维码)   ├─→ 抖音登录(二维码)   ├─→ B站、微博、知乎登录   └─→ 并行执行爬取任务   ↓4. 数据存储(DatabaseManager)   └─→ 保存到各平台内容表

3.2 关键词管理器

# MindSpider/DeepSentimentCrawling/keyword_manager.pyclassKeywordManager:"""关键词管理器"""defget_latest_keywords(selftarget_datedatemax_keywordsint=100->List[str]:"""获取最新的关键词列表"""# 1. 从数据库查询daily_topics表# 2. 按热度降序# 3. 返回前N个关键词defsave_keywords(selfkeywordsList[str], sourcestr):"""保存关键词到数据库"""# 检查重复,插入或更新热度

3.3 平台爬虫管理器

# MindSpider/DeepSentimentCrawling/platform_crawler.pyclassPlatformCrawler:"""平台爬虫管理器"""def__init__(self):# 动态导入各平台爬虫模块from .media_platformimportWeiboCrawlerfrom .media_platformimportDouyinCrawlerfrom .media_platformimportXiaohongshuCrawler# ... 更多平台self.platforms= {'weibo'WeiboCrawler(),'douyin'DouyinCrawler(),'xiaohongshu'XiaohongshuCrawler()# ...        }asyncdefrun_multi_platform_crawl_by_keywords(selfkeywordsList[str],platformsList[str],max_notesint=50,login_typestr="qrcode"->Dict:"""多平台关键词爬取"""

关键设计点

  • 策略选择:根据关键词特性选择平台(视频关键词→抖音,话题关键词→微博)

  • 并行执行:多个平台同时爬取,提高效率

  • 错误隔离:单个平台失败不影响其他平台

  • 登录管理:二维码登录状态持久化,避免重复登录

3.4 Playwright平台爬虫架构

这是MindSpider的核心技术创新——使用浏览器自动化框架,而非传统的HTTP请求。

# MindSpider/DeepSentimentCrawling/MediaCrawler/base.pyclassBaseCrawler:"""所有平台爬虫的基类"""def__init__(self):self.browser=None# Playwright浏览器实例self.context=None# 浏览器上下文self.logged_in=False# 登录状态asyncdeflogin(self->bool:"""登录平台"""# 1. 启动浏览器self.browser=awaitplaywright.async_api().chromium.launch(headless=False)# 2. 导航到登录页awaitself.browser.goto('https://www.xiaohongshu.com')# 3. 执行登录操作awaitself._perform_login()self.logged_in=TruereturnTrueasyncdefcrawl_content(selfkeywordsList[str], max_notesint=50->List[Dict]:"""根据关键词爬取内容"""# 1. 搜索关键词# 2. 遍历搜索结果# 3. 解析帖子/笔记/视频# 4. 提取互动数据(点赞、评论、转发)# 5. 保存到数据库

为什么选择Playwright?

| 框架 | 优点 | 缺点 ||——|——|——|———|HTTP请求 | 简单、轻量 | ❌ 无法处理JS渲染 |Selenium | 成熟、稳定 | ❌ 性能较慢 |Playwright | 现代、高性能 | ✅ 支持多浏览器、异步 |

3.5 Playwright的核心优势

  1. 异步支持asyncio + await语法,避免阻塞

  2. 多浏览器:Chromium/Firefox/WebKit多浏览器支持

  3. 无头模式headless=False适合服务器部署

  4. 智能等待wait_for_selector自动等待元素加载

  5. 反检测:执行JavaScript操作,隐藏自动化特征


四、配置化设计:如何降低维护成本?

4.1 Pydantic Settings统一配置

# MindSpider/config.pyfrompydantic_settingsimportBaseSettingsfromtypingimportOptionalclassSettings(BaseSettings):"""全局配置管理"""# 数据库配置DB_DIALECTstr="mysql"# 支持mysql和postgresqlDB_HOSTstr="your_database_host"DB_PORTint=3306DB_USERstr="your_username"DB_PASSWORDstr=" "your_password"DB_NAMEstr="mindspider"DB_CHARSETstr="utf8mb4"# DeepSeek API配置MINDSPIDER_API_KEYstr="your_deepseek_api_key"MINDSPIDER_BASE_URLstr="https://api.deepseek.com"MINDSPIDER_MODEL_NAMEstr="deepseek-chat"

设计亮点

  • 类型安全:Pydantic自动验证配置类型

  • 默认值:提供合理的默认配置

  • 环境变量支持:通过.env文件管理敏感信息

  • 跨平台兼容:同一套配置支持Windows/Linux/macOS


五、数据库架构与ORM映射

5.1 数据库Schema设计

-- MindSpider/schema/mindspider_tables.sqlCREATETABLEIFNOTEXISTS daily_topics (    id INTPRIMARYKEYAUTO_INCREMENT,    topic_date DATENOTNULL,    topic_name VARCHAR(255)NOTNULL,    keywords TEXT,    heat_score INTDEFAULT0,    created_at TIMESTAMPDEFAULTCURRENT_TIMESTAMP,INDEX idx_date (topic_date),INDEX idx_topic (topic_name, topic_date));CREATETABLEIFNOTEXISTS daily_news (    id INTPRIMARYKEYAUTO_INCREMENT,    topic_id INT,sourceVARCHAR(50)NOTNULL,-- weibo/zhihu/bilibili/toutiao/other    title TEXT,    url VARCHAR(500),    ranking INT,    publish_time DATETIME,INDEX idx_source_topic (source, topic_id),INDEX idx_publish_time (publish_time));

设计要点

  • 每日话题表:存储AI提取的话题,支持热度排序

  • 每日新闻表:存储各平台采集的新闻,支持多源对比

  • 索引优化:为常用查询字段建立索引,提高查询效率

  • 外键约束:确保数据完整性

5.2 SQLAlchemy ORM映射

# MindSpider/schema/models_sa.pyfromsqlalchemyimportColumnIntegerStringDateTimeTextForeignKeyfromsqlalchemy.ext.declarativeimportdeclarative_basefromsqlalchemy.ormimportrelationshipBase=declarative_base()classDailyTopic(Base):"""每日话题表ORM映射"""__tablename__='daily_topics'id=Column(Integerprimary_key=True)topic_date=Column(Date)topic_name=Column(String(255))keywords=Column(Text)heat_score=Column(Integerdefault=0)created_at=Column(DateTimedefault=datetime.utcnow)classDailyNews(Base):"""每日新闻表ORM映射"""__tablename__='daily_news'id=Column(Integerprimary_key=True)topic_id=Column(IntegerForeignKey('daily_topics.id'))source=Column(String(50))  # weibo/zhihu/bilibili/toutiao/othertitle=Column(Text)url=Column(String(500))ranking=Column(Integer)publish_time=Column(DateTime)# 关系定义topic=relationship("DailyTopic"back_populates="daily_news")

设计亮点

  • 类型安全:SQLAlchemy ORM提供编译时类型检查

  • 关系映射relationship自动处理外键关系

  • 查询优化:ORM转换为高效SQL,支持索引

  • 异步支持asyncio集成异步数据库操作

5.3 数据库连接管理

# MindSpider/schema/db_manager.pyclassDatabaseManager:"""数据库连接管理器"""def__init__(self):# 创建异步引擎self.engine=create_async_engine(build_async_url())asyncdefsave_topic(selftopicDailyTopic):"""保存话题到数据库"""asyncwithself.engine.connect() asconn:awaitconn.run(DailyTopic.__table__.insert(), values={'topic_name'topic.name,'keywords'topic.keywords            })asyncdefsave_news(selfnewsDailyNews):"""保存新闻到数据库"""asyncwithself.engine.connect() asconn:awaitconn.run(DailyNews.__table__.insert(), values={'topic_id'news.topic_id,'source'news.source,'title'news.title,'url'news.url            })

设计亮点

  • 连接池pool_pre_ping=True自动管理连接池

  • 异步操作:所有数据库操作都是异步的,避免阻塞

  • 自动清理:使用finally块确保资源释放

  • 错误处理:完善的异常捕获和日志记录


六、命令行工具设计:如何让用户易用?

6.1 argparse参数设计

# MindSpider/main.py 的参数解析importargparseparser=argparse.ArgumentParser(description='MindSpider AI爬虫系统')# 主要命令parser.add_argument('--status'action='store_true'help='检查系统状态')parser.add_argument('--setup'action='store_true'help='初始化项目')parser.add_argument('--road-topic'action='store_true'help='执行话题提取')parser.add_argument('--deep-sentiment'action='store_true'help='执行深度舆情爬取')parser.add_argument('--complete'action='store_true'help='执行完整流程')parser.add_argument('--test'action='store_true'help='测试模式')# 爬虫参数parser.add_argument('--platforms'nargs='+'help='指定平台 (xhs dy ks bili wb tieba)')parser.add_argument('--date'type=strhelp='指定日期 (YYYY-MM-DD)')parser.add_argument('--max-keywords'type=intdefault=100help='关键词数量')parser.add_argument('--max-notes'type=intdefault=50help='内容数量')

设计亮点

  • 子命令分离:每个子命令有独立的参数和逻辑

  • 参数校验:类型检查、范围验证

  • 帮助系统--help自动生成使用文档

  • 灵活组合:支持组合多个参数

6.2 完整的命令行流程图

用户执行命令  ↓argparse解析参数  ↓检查配置有效性(check_config)  ↓初始化数据库(init_database)  ↓执行对应子命令  ↓收集结果  ↓生成统计报告  ↓输出到控制台

七、爬虫配置(重要)

7.1 平台登录配置

首次使用必须登录各平台:

# 1. 测试小红书登录python main.py --deep-sentiment--platforms xhs --test# 2. 测试抖音登录python main.py --deep-sentiment--platforms dy --test# 3. 使用手机APP扫码登录# - 终端会显示二维码# - 使用APP扫码后,登录状态会保存# - 后续爬取自动使用保存的登录态

7.2 代理配置(如需要)

# 在DeepSentimentCrawling/platform_crawler/base.py中配置PROXY_SETTINGS= {'http''http://proxy-server:port','socks5''socks5://proxy-server:port','verify_ssl'False# 避免SSL验证问题}

设计要点

  • 轮换代理:避免单个IP被封

  • 类型选择:根据平台特性选择HTTP/SOCKS5

  • 验证配置:可配置是否验证SSL证书

  • 超时设置:代理响应超时自动切换


八、错误处理与日志记录

8.1 统一的错误处理机制

# MindSpider/core/error_handler.pyclassErrorHandler:"""统一错误处理器"""@staticmethoddefhandle_login_failure(platformstrerrorException):"""处理登录失败"""logger.error(f"{platform} 登录失败: {str(error)}")# 保存失败状态,避免重复尝试# 提供重试建议@staticmethoddefhandle_crawl_failure(platformstrerrorException):"""处理爬取失败"""logger.error(f"{platform} 爬取失败: {str(error)}")# 记录失败原因,便于分析# 继续其他平台爬取

8.2 结构化日志记录

# 使用loguru进行日志记录fromloguruimportlogger# 配置日志格式logger.add("logs/mindspider_{time}.log",rotation="500 MB",level="INFO",format="<green>{time}</green> | <level>{message}</level> | {extra}")# 使用示例logger.info("开始执行话题提取")logger.warning("检测到网络延迟,重试中...")logger.error("数据库连接失败,请检查配置")

设计要点

  • 日志轮转:自动按大小分割,避免单个文件过大

  • 级别区分:INFO/WARNING/ERROR,便于过滤分析

  • 结构化输出:使用JSON格式,便于后续处理

  • 性能监控:记录关键操作的耗时,识别性能瓶颈


九、扩展性设计:如何添加新平台?

9.1 添加新平台的标准流程

# 1. 创建平台配置类# MindSpider/DeepSentimentCrawling/media_platform/xiaohongshu/config.pyclassXiaohongshuCrawler(BaseCrawler):"""小红书爬虫"""def__init__(self):# 继承BaseCrawler的通用能力super().__init__()# 2. 实现小红书特有逻辑asyncdeflogin(self->bool:"""小红书特有登录"""# 实现二维码登录逻辑asyncdefsearch(selfkeywordsList[str]) ->List[Dict]:"""小红书搜索"""# 实现小红书搜索逻辑# 3. 更新PlatformCrawler# MindSpider/DeepSentimentCrawling/platform_crawler.pyclassPlatformCrawler:self.platforms= {'xiaohongshu'XiaohongshuCrawler(),  # 新增'weibo'WeiboCrawler(),'douyin'DouyinCrawler(),# ...    }

关键设计点

  • 基类继承BaseCrawler提供通用能力(登录、搜索、存储)

  • 特有扩展:子类实现平台特有逻辑(如二维码登录)

  • 配置统一:所有平台配置在config/目录下

  • 注册机制:在PlatformCrawler中注册新平台


十、总结:BettaFish分布式爬虫的设计思想

10.1 核心设计原则

  1. 两阶段分离:话题提取与深度爬取职责清晰,易于维护

  2. 平台抽象:BaseCrawler提供统一接口,新平台只需实现特有逻辑

  3. 配置驱动:Pydantic Settings统一配置,环境变量管理

  4. ORM映射:SQLAlchemy提供类型安全的数据库操作

  5. 异步优先:所有I/O操作都是异步的,避免阻塞

  6. Playwright选型:浏览器自动化框架,支持现代JS渲染

10.2 适用场景

✅ 适合

  • 需要多平台舆情采集的场景

  • 需要智能话题发现的场景

  • 需要深度内容分析的场景

❌ 不适合

  • 实时性要求极高(<1秒)的场景

  • 需要处理海量数据的场景(建议用专门的爬虫框架)

10.3 三大技术创新

  1. 两阶段设计:话题提取提供方向,深度爬取聚焦内容

  2. Playwright架构:浏览器自动化,支持多平台

  3. 配置化设计:Pydantic Settings + 环境变量,降低维护成本


下一章预告

在理解了分布式爬虫系统的基础上,下一章我们将深入解析情感分析模型层

  • 多模型融合策略:BERT/GPT-2/Qwen3/传统ML

  • 模型训练与推理流程

  • 模型集成到InsightEngine的方式

  • 情感分析API的设计

关注公众号,不迷路 👉 BettaFish源码解析系列持续更新中…

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » BettaFish源码解析(五):分布式爬虫系统

评论 抢沙发

4 + 1 =
  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮