乐于分享
好东西不私藏

LangChain 插件架构实现机制分析

LangChain 插件架构实现机制分析

1. 概述

LangChain 是一个用于构建基于大语言模型(LLM)应用的开源框架,其核心设计理念是「模块化」「可组合性」,通过统一的接口抽象和灵活的插件机制,使开发者能够轻松构建复杂的 AI 应用。

2. 整体架构

2.1 Monorepo 项目结构

LangChain 采用 「monorepo(单仓库多包)」 架构组织代码:

langchain/
├── libs/                    # 核心代码库
│   ├── core/               # 基础抽象和 LCEL 表达式语言
│   ├── langchain/          # 经典版 LangChain 包
│   ├── langchain_v1/       # 当前主版本包
│   ├── community/          # 第三方社区集成
│   ├── partners/           # 官方合作伙伴集成包
│   ├── text-splitters/     # 文本分割工具
│   ├── standard-tests/     # 标准化测试套件
│   └── cli/                # 命令行工具
├── docs/                   # 文档
├── cookbook/               # 教程和示例
└── templates/              # 可部署的参考架构

2.2 包层次结构

┌─────────────────────────────────────────────────────────────┐
│                    Application Layer                         │
│              (langchain / langchain_v1)                      │
│         Chains, Agents, Retrieval Strategies                 │
├─────────────────────────────────────────────────────────────┤
│                   Integration Layer                          │
│    ┌──────────────────┐  ┌──────────────────────────────┐   │
│    │ langchain-openai │  │    langchain-community       │   │
│    │ langchain-anthr. │  │    (第三方社区集成)           │   │
│    │ langchain-google │  │                              │   │
│    └──────────────────┘  └──────────────────────────────┘   │
├─────────────────────────────────────────────────────────────┤
│                     Core Layer                               │
│                   (langchain-core)                           │
│    Runnable Interface, LCEL, Base Classes, Callbacks         │
└─────────────────────────────────────────────────────────────┘

3. 核心插件机制

3.1 Runnable 接口 – 统一抽象层

「Runnable」 是 LangChain 最核心的抽象接口,所有可执行组件都实现此接口,这是实现插件化的基础。

3.1.1 Runnable 接口定义

from abc import ABC, abstractmethod
from typing import Any, Optional, List, AsyncIterator, Iterator

classRunnable(ABC):
"""所有可运行组件的基类"""

    @abstractmethod
definvoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
"""同步执行单个输入"""
pass

asyncdefainvoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
"""异步执行单个输入"""
pass

defbatch(self, inputs: List[Any], config: Optional[RunnableConfig] = None) -> List[Any]:
"""批量执行多个输入"""
pass

asyncdefabatch(self, inputs: List[Any], config: Optional[RunnableConfig] = None) -> List[Any]:
"""异步批量执行"""
pass

defstream(self, input: Any, config: Optional[RunnableConfig] = None) -> Iterator[Any]:
"""流式返回响应"""
pass

asyncdefastream(self, input: Any, config: Optional[RunnableConfig] = None) -> AsyncIterator[Any]:
"""异步流式返回"""
pass

3.1.2 Runnable 组合方式

LangChain 提供两种主要的 Runnable 组合方式:

「RunnableSequence(顺序执行)」

from langchain_core.runnables import RunnableSequence

# 方式1:使用管道操作符
chain = prompt | llm | output_parser

# 方式2:显式构造
chain = RunnableSequence([prompt, llm, output_parser])

「RunnableParallel(并行执行)」

from langchain_core.runnables import RunnableParallel

# 并行执行多个 Runnable
parallel = RunnableParallel({
"summary": summary_chain,
"translation": translation_chain
})

3.2 LCEL (LangChain Expression Language)

LCEL 是 LangChain 的声明式表达语言,通过管道操作符 | 串联实现了 Runnable 接口的组件。

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# 使用 LCEL 构建链
prompt = ChatPromptTemplate.from_messages([
    ("system""你是一个有帮助的助手"),
    ("human""{input}")
])

model = ChatOpenAI(model="gpt-4")
output_parser = StrOutputParser()

# 管道操作符组合
chain = prompt | model | output_parser

# 执行
result = chain.invoke({"input""你好"})

3.3 Tool 工具系统

Tool 是 LangChain 插件架构的核心组件之一,用于扩展 LLM 的能力。

3.3.1 BaseTool 基类

from abc import ABC, abstractmethod
from pydantic import BaseModel
from typing import Any, Optional, Type

classBaseTool(BaseModel, ABC):
"""工具基类"""

    name: str                           # 工具名称(必须唯一)
    description: str                    # 工具描述(供 LLM 理解)
    args_schema: Optional[Type[BaseModel]] = None# 参数模式
    return_direct: bool = False# 是否直接返回结果

    @abstractmethod
def_run(self, *args, **kwargs) -> Any:
"""同步执行工具"""
pass

asyncdef_arun(self, *args, **kwargs) -> Any:
"""异步执行工具"""
raise NotImplementedError("异步执行未实现")

3.3.2 创建工具的三种方式

「方式1:@tool 装饰器(最简单)」

from langchain_core.tools import tool

@tool
defmultiply(a: int, b: int) -> int:
"""将两个数字相乘"""
return a * b

print(multiply.name)        # "multiply"
print(multiply.description) # "将两个数字相乘"
print(multiply.args)        # {'a': {'type': 'integer'}, 'b': {'type': 'integer'}}

「方式2:StructuredTool.from_function(更多配置)」

from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field

classSearchInput(BaseModel):
    query: str = Field(description="搜索查询")
    max_results: int = Field(default=10, description="最大结果数")

defsearch_function(query: str, max_results: int = 10) -> str:
returnf"搜索 '{query}' 的结果"

search_tool = StructuredTool.from_function(
    func=search_function,
    name="search",
    description="在网络上搜索信息",
    args_schema=SearchInput,
    return_direct=False
)

「方式3:继承 BaseTool(最灵活)」

from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type

classCalculatorInput(BaseModel):
    expression: str = Field(description="数学表达式")

classCalculatorTool(BaseTool):
    name: str = "calculator"
    description: str = "计算数学表达式"
    args_schema: Type[BaseModel] = CalculatorInput

def_run(self, expression: str) -> str:
try:
            result = eval(expression)
return str(result)
except Exception as e:
returnf"计算错误: {e}"

asyncdef_arun(self, expression: str) -> str:
return self._run(expression)

3.4 Callback 回调系统

回调系统是 LangChain 的事件驱动机制,用于监控、日志记录和调试。

3.4.1 BaseCallbackHandler 接口

from typing import Any, Dict, List
from langchain_core.callbacks import BaseCallbackHandler

classBaseCallbackHandler:
"""回调处理器基类"""

defon_llm_start(
        self, 
        serialized: Dict[str, Any], 
        prompts: List[str], 
        **kwargs
    )
 -> Any:

"""LLM 开始运行时触发"""
pass

defon_llm_new_token(self, token: str, **kwargs) -> Any:
"""生成新 token 时触发(流式)"""
pass

defon_llm_end(self, response: Any, **kwargs) -> Any:
"""LLM 结束时触发"""
pass

defon_llm_error(self, error: Exception, **kwargs) -> Any:
"""LLM 出错时触发"""
pass

defon_chain_start(
        self, 
        serialized: Dict[str, Any], 
        inputs: Dict[str, Any], 
        **kwargs
    )
 -> Any:

"""Chain 开始时触发"""
pass

defon_chain_end(self, outputs: Dict[str, Any], **kwargs) -> Any:
"""Chain 结束时触发"""
pass

defon_tool_start(
        self, 
        serialized: Dict[str, Any], 
        input_str: str, 
        **kwargs
    )
 -> Any:

"""Tool 开始时触发"""
pass

defon_tool_end(self, output: str, **kwargs) -> Any:
"""Tool 结束时触发"""
pass

defon_agent_action(self, action: Any, **kwargs) -> Any:
"""Agent 执行动作时触发"""
pass

defon_agent_finish(self, finish: Any, **kwargs) -> Any:
"""Agent 完成时触发"""
pass

3.4.2 自定义回调处理器示例

from langchain_core.callbacks import BaseCallbackHandler

classCustomHandler(BaseCallbackHandler):
"""自定义回调处理器"""

defon_llm_start(self, serialized, prompts, **kwargs):
        print(f"[LLM开始] 提示词: {prompts}")

defon_llm_new_token(self, token, **kwargs):
        print(f"[新Token] {token}", end="", flush=True)

defon_llm_end(self, response, **kwargs):
        print(f"\n[LLM结束] 响应: {response}")

defon_tool_start(self, serialized, input_str, **kwargs):
        print(f"[工具开始] 输入: {input_str}")

# 使用回调
from langchain_openai import ChatOpenAI

model = ChatOpenAI(
    model="gpt-4",
    streaming=True,
    callbacks=[CustomHandler()]
)

3.5 OutputParser 输出解析器

OutputParser 用于将 LLM 的原始文本输出转换为结构化数据。

3.5.1 BaseOutputParser 接口

from abc import ABC, abstractmethod
from typing import TypeVar, Generic

T = TypeVar('T')

classBaseOutputParser(ABC, Generic[T]):
"""输出解析器基类"""

    @abstractmethod
defparse(self, text: str) -> T:
"""解析 LLM 输出文本"""
pass

defget_format_instructions(self) -> str:
"""返回格式化指令,注入到提示词中"""
pass

defparse_with_prompt(self, completion: str, prompt: Any) -> T:
"""带提示词的解析"""
return self.parse(completion)

3.5.2 常用解析器

from langchain_core.output_parsers import (
    StrOutputParser,           # 字符串解析
    JsonOutputParser,          # JSON 解析
    CommaSeparatedListOutputParser,  # 逗号分隔列表
    PydanticOutputParser,      # Pydantic 模型解析
)

# JSON 解析器示例
from pydantic import BaseModel, Field

classMovieReview(BaseModel):
    title: str = Field(description="电影标题")
    rating: int = Field(description="评分 1-10")
    summary: str = Field(description="简短评价")

parser = JsonOutputParser(pydantic_object=MovieReview)

# 获取格式化指令
instructions = parser.get_format_instructions()
# 输出: "请以 JSON 格式返回,包含以下字段: title, rating, summary..."

3.6 Memory 记忆系统

Memory 组件用于管理对话历史和状态。

3.6.1 BaseChatMessageHistory 接口

from abc import ABC, abstractmethod
from typing import List
from langchain_core.messages import BaseMessage

classBaseChatMessageHistory(ABC):
"""聊天消息历史基类"""

    @property
    @abstractmethod
defmessages(self) -> List[BaseMessage]:
"""获取所有消息"""
pass

    @abstractmethod
defadd_message(self, message: BaseMessage) -> None:
"""添加消息"""
pass

    @abstractmethod
defclear(self) -> None:
"""清空历史"""
pass

3.6.2 内置实现

from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_community.chat_message_histories import (
    FileChatMessageHistory,    # 文件存储
    RedisChatMessageHistory,   # Redis 存储
    SQLChatMessageHistory,     # SQL 数据库存储
)

# 内存存储
memory = InMemoryChatMessageHistory()
memory.add_user_message("你好")
memory.add_ai_message("你好!有什么可以帮助你的?")

# 文件存储
file_memory = FileChatMessageHistory(file_path="chat_history.json")

4. 集成包架构

4.1 包分层策略

LangChain 采用分层的集成包策略:

包类型
说明
示例
「langchain-core」
核心抽象,无第三方依赖
Runnable, BaseTool, BaseCallbackHandler
「Partner Packages」
官方维护的轻量级集成
langchain-openai, langchain-anthropic
「langchain-community」
社区维护的第三方集成
各种向量数据库、工具集成

4.2 创建集成包

创建新的集成包需要遵循以下结构:

langchain-myintegration/
├── pyproject.toml
├── README.md
├── langchain_myintegration/
│   ├── __init__.py
│   ├── chat_models.py      # 聊天模型集成
│   ├── llms.py             # LLM 集成
│   ├── embeddings.py       # 嵌入模型集成
│   ├── vectorstores.py     # 向量存储集成
│   └── tools.py            # 工具集成
└── tests/
    └── unit_tests/

4.3 集成示例:自定义 LLM

from langchain_core.language_models.llms import LLM
from typing import Any, List, Optional

classMyCustomLLM(LLM):
"""自定义 LLM 集成"""

    model_name: str = "my-model"
    api_key: str = ""

    @property
def_llm_type(self) -> str:
return"my_custom_llm"

def_call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    )
 -> str:

"""执行 LLM 调用"""
# 实现具体的 API 调用逻辑
        response = self._make_api_call(prompt)
return response

asyncdef_acall(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    )
 -> str:

"""异步执行 LLM 调用"""
        response = await self._make_async_api_call(prompt)
return response

    @property
def_identifying_params(self) -> dict:
"""返回标识参数"""
return {"model_name": self.model_name}

5. 设计模式分析

5.1 使用的设计模式

设计模式
应用场景
说明
「策略模式」
Tool, LLM, Memory
允许运行时切换不同实现
「装饰器模式」
@tool, Callbacks
动态添加功能
「工厂模式」
from_function, from_template
创建组件实例
「组合模式」
RunnableSequence, RunnableParallel
组合多个组件
「观察者模式」
Callback 系统
监听和响应事件
「模板方法模式」
BaseTool._run
定义算法骨架
「责任链模式」
LCEL 管道
请求沿链传递

5.2 核心设计原则

  1. 「接口隔离原则」:通过 Runnable 接口统一所有可执行组件
  2. 「依赖倒置原则」:高层模块依赖抽象(BaseTool, BaseCallbackHandler)
  3. 「开闭原则」:通过继承和组合扩展功能,无需修改核心代码
  4. 「单一职责原则」:每个组件专注于单一功能

6. 扩展机制总结

6.1 扩展点

扩展点
基类/接口
用途
语言模型
BaseLLM

BaseChatModel
集成新的 LLM
工具
BaseTool
添加新的工具能力
向量存储
VectorStore
集成向量数据库
嵌入模型
Embeddings
集成嵌入模型
文档加载器
BaseLoader
加载不同格式文档
文本分割器
TextSplitter
自定义分割策略
输出解析器
BaseOutputParser
自定义输出格式
记忆存储
BaseChatMessageHistory
自定义存储后端
回调处理
BaseCallbackHandler
自定义事件处理

6.2 扩展流程

1. 确定扩展类型 → 2. 继承对应基类 → 3. 实现抽象方法 → 4. 注册/使用组件

7. 最佳实践

7.1 创建自定义工具

from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type, Optional

classWeatherInput(BaseModel):
"""天气查询输入"""
    city: str = Field(description="城市名称")
    date: Optional[str] = Field(default=None, description="日期,格式 YYYY-MM-DD")

classWeatherTool(BaseTool):
"""天气查询工具"""

    name: str = "weather"
    description: str = "查询指定城市的天气信息"
    args_schema: Type[BaseModel] = WeatherInput

    api_key: str = ""# 配置项

def_run(self, city: str, date: Optional[str] = None) -> str:
# 实现天气查询逻辑
returnf"{city} 的天气是晴天,温度 25°C"

asyncdef_arun(self, city: str, date: Optional[str] = None) -> str:
# 异步实现
return self._run(city, date)

7.2 创建自定义回调

from langchain_core.callbacks import BaseCallbackHandler
from datetime import datetime
import json

classMetricsCallbackHandler(BaseCallbackHandler):
"""性能指标收集回调"""

def__init__(self):
        self.metrics = []
        self.start_time = None

defon_llm_start(self, serialized, prompts, **kwargs):
        self.start_time = datetime.now()

defon_llm_end(self, response, **kwargs):
if self.start_time:
            duration = (datetime.now() - self.start_time).total_seconds()
            self.metrics.append({
"type""llm_call",
"duration": duration,
"timestamp": datetime.now().isoformat()
            })

defget_metrics(self) -> str:
return json.dumps(self.metrics, indent=2)

8. 总结

LangChain 的插件架构具有以下特点:

  1. 「统一的 Runnable 接口」:所有组件实现相同接口,支持无缝组合
  2. 「声明式 LCEL 语言」:通过管道操作符简化链的构建
  3. 「丰富的扩展点」:工具、模型、存储、解析器等都可扩展
  4. 「事件驱动的回调系统」:支持监控、日志和调试
  5. 「分层的包架构」:核心层、集成层、应用层分离
  6. 「社区生态」:通过 langchain-community 支持大量第三方集成

这种架构设计使 LangChain 成为一个高度可扩展、易于使用的 LLM 应用开发框架。

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » LangChain 插件架构实现机制分析

评论 抢沙发

9 + 2 =
  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮