LangChain 插件架构实现机制分析
1. 概述
LangChain 是一个用于构建基于大语言模型(LLM)应用的开源框架,其核心设计理念是「模块化」和「可组合性」,通过统一的接口抽象和灵活的插件机制,使开发者能够轻松构建复杂的 AI 应用。
2. 整体架构
2.1 Monorepo 项目结构
LangChain 采用 「monorepo(单仓库多包)」 架构组织代码:
langchain/
├── libs/ # 核心代码库
│ ├── core/ # 基础抽象和 LCEL 表达式语言
│ ├── langchain/ # 经典版 LangChain 包
│ ├── langchain_v1/ # 当前主版本包
│ ├── community/ # 第三方社区集成
│ ├── partners/ # 官方合作伙伴集成包
│ ├── text-splitters/ # 文本分割工具
│ ├── standard-tests/ # 标准化测试套件
│ └── cli/ # 命令行工具
├── docs/ # 文档
├── cookbook/ # 教程和示例
└── templates/ # 可部署的参考架构
2.2 包层次结构
┌─────────────────────────────────────────────────────────────┐
│ Application Layer │
│ (langchain / langchain_v1) │
│ Chains, Agents, Retrieval Strategies │
├─────────────────────────────────────────────────────────────┤
│ Integration Layer │
│ ┌──────────────────┐ ┌──────────────────────────────┐ │
│ │ langchain-openai │ │ langchain-community │ │
│ │ langchain-anthr. │ │ (第三方社区集成) │ │
│ │ langchain-google │ │ │ │
│ └──────────────────┘ └──────────────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Core Layer │
│ (langchain-core) │
│ Runnable Interface, LCEL, Base Classes, Callbacks │
└─────────────────────────────────────────────────────────────┘
3. 核心插件机制
3.1 Runnable 接口 – 统一抽象层
「Runnable」 是 LangChain 最核心的抽象接口,所有可执行组件都实现此接口,这是实现插件化的基础。
3.1.1 Runnable 接口定义
from abc import ABC, abstractmethod
from typing import Any, Optional, List, AsyncIterator, Iterator
classRunnable(ABC):
"""所有可运行组件的基类"""
@abstractmethod
definvoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
"""同步执行单个输入"""
pass
asyncdefainvoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
"""异步执行单个输入"""
pass
defbatch(self, inputs: List[Any], config: Optional[RunnableConfig] = None) -> List[Any]:
"""批量执行多个输入"""
pass
asyncdefabatch(self, inputs: List[Any], config: Optional[RunnableConfig] = None) -> List[Any]:
"""异步批量执行"""
pass
defstream(self, input: Any, config: Optional[RunnableConfig] = None) -> Iterator[Any]:
"""流式返回响应"""
pass
asyncdefastream(self, input: Any, config: Optional[RunnableConfig] = None) -> AsyncIterator[Any]:
"""异步流式返回"""
pass
3.1.2 Runnable 组合方式
LangChain 提供两种主要的 Runnable 组合方式:
「RunnableSequence(顺序执行)」:
from langchain_core.runnables import RunnableSequence
# 方式1:使用管道操作符
chain = prompt | llm | output_parser
# 方式2:显式构造
chain = RunnableSequence([prompt, llm, output_parser])
「RunnableParallel(并行执行)」:
from langchain_core.runnables import RunnableParallel
# 并行执行多个 Runnable
parallel = RunnableParallel({
"summary": summary_chain,
"translation": translation_chain
})
3.2 LCEL (LangChain Expression Language)
LCEL 是 LangChain 的声明式表达语言,通过管道操作符 | 串联实现了 Runnable 接口的组件。
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# 使用 LCEL 构建链
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个有帮助的助手"),
("human", "{input}")
])
model = ChatOpenAI(model="gpt-4")
output_parser = StrOutputParser()
# 管道操作符组合
chain = prompt | model | output_parser
# 执行
result = chain.invoke({"input": "你好"})
3.3 Tool 工具系统
Tool 是 LangChain 插件架构的核心组件之一,用于扩展 LLM 的能力。
3.3.1 BaseTool 基类
from abc import ABC, abstractmethod
from pydantic import BaseModel
from typing import Any, Optional, Type
classBaseTool(BaseModel, ABC):
"""工具基类"""
name: str # 工具名称(必须唯一)
description: str # 工具描述(供 LLM 理解)
args_schema: Optional[Type[BaseModel]] = None# 参数模式
return_direct: bool = False# 是否直接返回结果
@abstractmethod
def_run(self, *args, **kwargs) -> Any:
"""同步执行工具"""
pass
asyncdef_arun(self, *args, **kwargs) -> Any:
"""异步执行工具"""
raise NotImplementedError("异步执行未实现")
3.3.2 创建工具的三种方式
「方式1:@tool 装饰器(最简单)」
from langchain_core.tools import tool
@tool
defmultiply(a: int, b: int) -> int:
"""将两个数字相乘"""
return a * b
print(multiply.name) # "multiply"
print(multiply.description) # "将两个数字相乘"
print(multiply.args) # {'a': {'type': 'integer'}, 'b': {'type': 'integer'}}
「方式2:StructuredTool.from_function(更多配置)」
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
classSearchInput(BaseModel):
query: str = Field(description="搜索查询")
max_results: int = Field(default=10, description="最大结果数")
defsearch_function(query: str, max_results: int = 10) -> str:
returnf"搜索 '{query}' 的结果"
search_tool = StructuredTool.from_function(
func=search_function,
name="search",
description="在网络上搜索信息",
args_schema=SearchInput,
return_direct=False
)
「方式3:继承 BaseTool(最灵活)」
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type
classCalculatorInput(BaseModel):
expression: str = Field(description="数学表达式")
classCalculatorTool(BaseTool):
name: str = "calculator"
description: str = "计算数学表达式"
args_schema: Type[BaseModel] = CalculatorInput
def_run(self, expression: str) -> str:
try:
result = eval(expression)
return str(result)
except Exception as e:
returnf"计算错误: {e}"
asyncdef_arun(self, expression: str) -> str:
return self._run(expression)
3.4 Callback 回调系统
回调系统是 LangChain 的事件驱动机制,用于监控、日志记录和调试。
3.4.1 BaseCallbackHandler 接口
from typing import Any, Dict, List
from langchain_core.callbacks import BaseCallbackHandler
classBaseCallbackHandler:
"""回调处理器基类"""
defon_llm_start(
self,
serialized: Dict[str, Any],
prompts: List[str],
**kwargs
) -> Any:
"""LLM 开始运行时触发"""
pass
defon_llm_new_token(self, token: str, **kwargs) -> Any:
"""生成新 token 时触发(流式)"""
pass
defon_llm_end(self, response: Any, **kwargs) -> Any:
"""LLM 结束时触发"""
pass
defon_llm_error(self, error: Exception, **kwargs) -> Any:
"""LLM 出错时触发"""
pass
defon_chain_start(
self,
serialized: Dict[str, Any],
inputs: Dict[str, Any],
**kwargs
) -> Any:
"""Chain 开始时触发"""
pass
defon_chain_end(self, outputs: Dict[str, Any], **kwargs) -> Any:
"""Chain 结束时触发"""
pass
defon_tool_start(
self,
serialized: Dict[str, Any],
input_str: str,
**kwargs
) -> Any:
"""Tool 开始时触发"""
pass
defon_tool_end(self, output: str, **kwargs) -> Any:
"""Tool 结束时触发"""
pass
defon_agent_action(self, action: Any, **kwargs) -> Any:
"""Agent 执行动作时触发"""
pass
defon_agent_finish(self, finish: Any, **kwargs) -> Any:
"""Agent 完成时触发"""
pass
3.4.2 自定义回调处理器示例
from langchain_core.callbacks import BaseCallbackHandler
classCustomHandler(BaseCallbackHandler):
"""自定义回调处理器"""
defon_llm_start(self, serialized, prompts, **kwargs):
print(f"[LLM开始] 提示词: {prompts}")
defon_llm_new_token(self, token, **kwargs):
print(f"[新Token] {token}", end="", flush=True)
defon_llm_end(self, response, **kwargs):
print(f"\n[LLM结束] 响应: {response}")
defon_tool_start(self, serialized, input_str, **kwargs):
print(f"[工具开始] 输入: {input_str}")
# 使用回调
from langchain_openai import ChatOpenAI
model = ChatOpenAI(
model="gpt-4",
streaming=True,
callbacks=[CustomHandler()]
)
3.5 OutputParser 输出解析器
OutputParser 用于将 LLM 的原始文本输出转换为结构化数据。
3.5.1 BaseOutputParser 接口
from abc import ABC, abstractmethod
from typing import TypeVar, Generic
T = TypeVar('T')
classBaseOutputParser(ABC, Generic[T]):
"""输出解析器基类"""
@abstractmethod
defparse(self, text: str) -> T:
"""解析 LLM 输出文本"""
pass
defget_format_instructions(self) -> str:
"""返回格式化指令,注入到提示词中"""
pass
defparse_with_prompt(self, completion: str, prompt: Any) -> T:
"""带提示词的解析"""
return self.parse(completion)
3.5.2 常用解析器
from langchain_core.output_parsers import (
StrOutputParser, # 字符串解析
JsonOutputParser, # JSON 解析
CommaSeparatedListOutputParser, # 逗号分隔列表
PydanticOutputParser, # Pydantic 模型解析
)
# JSON 解析器示例
from pydantic import BaseModel, Field
classMovieReview(BaseModel):
title: str = Field(description="电影标题")
rating: int = Field(description="评分 1-10")
summary: str = Field(description="简短评价")
parser = JsonOutputParser(pydantic_object=MovieReview)
# 获取格式化指令
instructions = parser.get_format_instructions()
# 输出: "请以 JSON 格式返回,包含以下字段: title, rating, summary..."
3.6 Memory 记忆系统
Memory 组件用于管理对话历史和状态。
3.6.1 BaseChatMessageHistory 接口
from abc import ABC, abstractmethod
from typing import List
from langchain_core.messages import BaseMessage
classBaseChatMessageHistory(ABC):
"""聊天消息历史基类"""
@property
@abstractmethod
defmessages(self) -> List[BaseMessage]:
"""获取所有消息"""
pass
@abstractmethod
defadd_message(self, message: BaseMessage) -> None:
"""添加消息"""
pass
@abstractmethod
defclear(self) -> None:
"""清空历史"""
pass
3.6.2 内置实现
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_community.chat_message_histories import (
FileChatMessageHistory, # 文件存储
RedisChatMessageHistory, # Redis 存储
SQLChatMessageHistory, # SQL 数据库存储
)
# 内存存储
memory = InMemoryChatMessageHistory()
memory.add_user_message("你好")
memory.add_ai_message("你好!有什么可以帮助你的?")
# 文件存储
file_memory = FileChatMessageHistory(file_path="chat_history.json")
4. 集成包架构
4.1 包分层策略
LangChain 采用分层的集成包策略:
|
|
|
|
|---|---|---|
| 「langchain-core」 |
|
|
| 「Partner Packages」 |
|
|
| 「langchain-community」 |
|
|
4.2 创建集成包
创建新的集成包需要遵循以下结构:
langchain-myintegration/
├── pyproject.toml
├── README.md
├── langchain_myintegration/
│ ├── __init__.py
│ ├── chat_models.py # 聊天模型集成
│ ├── llms.py # LLM 集成
│ ├── embeddings.py # 嵌入模型集成
│ ├── vectorstores.py # 向量存储集成
│ └── tools.py # 工具集成
└── tests/
└── unit_tests/
4.3 集成示例:自定义 LLM
from langchain_core.language_models.llms import LLM
from typing import Any, List, Optional
classMyCustomLLM(LLM):
"""自定义 LLM 集成"""
model_name: str = "my-model"
api_key: str = ""
@property
def_llm_type(self) -> str:
return"my_custom_llm"
def_call(
self,
prompt: str,
stop: Optional[List[str]] = None,
**kwargs: Any,
) -> str:
"""执行 LLM 调用"""
# 实现具体的 API 调用逻辑
response = self._make_api_call(prompt)
return response
asyncdef_acall(
self,
prompt: str,
stop: Optional[List[str]] = None,
**kwargs: Any,
) -> str:
"""异步执行 LLM 调用"""
response = await self._make_async_api_call(prompt)
return response
@property
def_identifying_params(self) -> dict:
"""返回标识参数"""
return {"model_name": self.model_name}
5. 设计模式分析
5.1 使用的设计模式
|
|
|
|
|---|---|---|
| 「策略模式」 |
|
|
| 「装饰器模式」 |
|
|
| 「工厂模式」 |
|
|
| 「组合模式」 |
|
|
| 「观察者模式」 |
|
|
| 「模板方法模式」 |
|
|
| 「责任链模式」 |
|
|
5.2 核心设计原则
-
「接口隔离原则」:通过 Runnable 接口统一所有可执行组件 -
「依赖倒置原则」:高层模块依赖抽象(BaseTool, BaseCallbackHandler) -
「开闭原则」:通过继承和组合扩展功能,无需修改核心代码 -
「单一职责原则」:每个组件专注于单一功能
6. 扩展机制总结
6.1 扩展点
|
|
|
|
|---|---|---|
|
|
BaseLLM
BaseChatModel |
|
|
|
BaseTool |
|
|
|
VectorStore |
|
|
|
Embeddings |
|
|
|
BaseLoader |
|
|
|
TextSplitter |
|
|
|
BaseOutputParser |
|
|
|
BaseChatMessageHistory |
|
|
|
BaseCallbackHandler |
|
6.2 扩展流程
1. 确定扩展类型 → 2. 继承对应基类 → 3. 实现抽象方法 → 4. 注册/使用组件
7. 最佳实践
7.1 创建自定义工具
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type, Optional
classWeatherInput(BaseModel):
"""天气查询输入"""
city: str = Field(description="城市名称")
date: Optional[str] = Field(default=None, description="日期,格式 YYYY-MM-DD")
classWeatherTool(BaseTool):
"""天气查询工具"""
name: str = "weather"
description: str = "查询指定城市的天气信息"
args_schema: Type[BaseModel] = WeatherInput
api_key: str = ""# 配置项
def_run(self, city: str, date: Optional[str] = None) -> str:
# 实现天气查询逻辑
returnf"{city} 的天气是晴天,温度 25°C"
asyncdef_arun(self, city: str, date: Optional[str] = None) -> str:
# 异步实现
return self._run(city, date)
7.2 创建自定义回调
from langchain_core.callbacks import BaseCallbackHandler
from datetime import datetime
import json
classMetricsCallbackHandler(BaseCallbackHandler):
"""性能指标收集回调"""
def__init__(self):
self.metrics = []
self.start_time = None
defon_llm_start(self, serialized, prompts, **kwargs):
self.start_time = datetime.now()
defon_llm_end(self, response, **kwargs):
if self.start_time:
duration = (datetime.now() - self.start_time).total_seconds()
self.metrics.append({
"type": "llm_call",
"duration": duration,
"timestamp": datetime.now().isoformat()
})
defget_metrics(self) -> str:
return json.dumps(self.metrics, indent=2)
8. 总结
LangChain 的插件架构具有以下特点:
-
「统一的 Runnable 接口」:所有组件实现相同接口,支持无缝组合 -
「声明式 LCEL 语言」:通过管道操作符简化链的构建 -
「丰富的扩展点」:工具、模型、存储、解析器等都可扩展 -
「事件驱动的回调系统」:支持监控、日志和调试 -
「分层的包架构」:核心层、集成层、应用层分离 -
「社区生态」:通过 langchain-community 支持大量第三方集成
这种架构设计使 LangChain 成为一个高度可扩展、易于使用的 LLM 应用开发框架。
夜雨聆风
