乐于分享
好东西不私藏

CSI 插件开发实战(上):规范深读与驱动实现

CSI 插件开发实战(上):规范深读与驱动实现

📚 系列导航

本文是《Kubernetes 深度学习》系列第一/二季第 4/25 篇。 上一篇:存储架构演进:从 PV/PVC 到 CSI 的插件化之路 | 下一篇:CSI 插件开发实战(下):部署、测试与生产上线

理解 CSI 不等于能实现 CSI。本文从规范深读到代码实现,带你手写一个完整的存储驱动——覆盖三层 gRPC 服务设计、Go 语言驱动实现、以及 Unix Socket 通信机制。

核心要点

  • CSI 规范三层拆解
    :Identity / Controller / Node 服务的职责边界与协议设计
  • 完整驱动实现
    :Go 语言实现的本地存储驱动,覆盖卷生命周期全流程
  • gRPC 服务启动
    :Unix Socket 绑定、驱动注册与健康检查机制

CSI 规范深读:三个 gRPC 服务的职责边界

CSI 规范定义的不是一个单一的 gRPC 服务,而是三个独立的服务接口,分别运行在不同的 Pod 容器中,通过 Unix Socket 通信。这种分层设计的妙处在于:允许你选择性地实现某些接口(例如,只实现 Node 服务来支持连接已有的云存储),同时保持架构的内聚性。

Identity 服务:驱动的身份证

go点击代码区域可全选复制
serviceIdentity {rpcGetPluginInfo(GetPluginInfoRequestreturns (GetPluginInfoResponse) {}rpcGetPluginCapabilities(GetPluginCapabilitiesRequestreturns (GetPluginCapabilitiesResponse) {}rpcProbe(ProbeRequestreturns (ProbeResponse) {}}

GetPluginInfo 返回驱动的名称、版本等元数据。Kubernetes 通过这个接口验证驱动是否与集群兼容。关键字段是 name(必须是 FQDN 格式,如 localstorage.k8s.io),kubelet 用它来标识你的驱动。

GetPluginCapabilities 声明驱动支持的能力——比如是否支持卷扩展(EXPAND_VOLUME)、卷统计(GET_VOLUME_STATS)。Kubernetes 根据这个列表决定是否向驱动发送相关请求。换句话说,你不在 Capabilities 里声明的能力,K8s 压根不会来问你,驱动可以只做自己擅长的事。

Probe 是健康检查接口。kubelet 周期性调用它,确保驱动仍然活着。返回值很简单——ready: true 表示准备好接收请求,否则 kubelet 会隔离这个驱动。

Controller 服务:卷的生命周期管理

go点击代码区域可全选复制
serviceController {rpcCreateVolume(CreateVolumeRequestreturns (CreateVolumeResponse) {}rpcDeleteVolume(DeleteVolumeRequestreturns (DeleteVolumeResponse) {}rpcControllerPublishVolume(ControllerPublishVolumeRequestreturns (ControllerPublishVolumeResponse) {}rpcControllerUnpublishVolume(ControllerUnpublishVolumeRequestreturns (ControllerUnpublishVolumeResponse) {}rpcListVolumes(ListVolumesRequestreturns (ListVolumesResponse) {}rpcGetCapacity(GetCapacityRequestreturns (GetCapacityResponse) {}rpcControllerGetCapabilities(ControllerGetCapabilitiesRequestreturns (ControllerGetCapabilitiesResponse) {}}

这个服务运行在控制平面(通常是 Controller Manager Pod),负责卷的全生命周期。CreateVolume 是我在自己的存储后端(比如文件系统、块设备、云存储)中创建实际的卷。这个调用可能很耗时(秒级),所以 CSI spec 特别强调:实现必须是幂等的。如果网络中断,Kubernetes 重试请求时,你必须返回相同结果而不是创建两个卷。

ControllerPublishVolume 负责将卷关联到特定的节点。对于云存储(如 AWS EBS),这步通常是将磁盘 attach 到 EC2 实例。对于本地存储,这步可能是空操作。返回的 publish_context 是一个 key-value 对,kubelet 会在 NodePublishVolume 时接收到它,用来区分同一个卷在不同节点的挂载方式。

Node 服务:Pod 挂载的关键操作

go点击代码区域可全选复制
serviceNode {rpcNodeStageVolume(NodeStageVolumeRequestreturns (NodeStageVolumeResponse) {}rpcNodeUnstageVolume(NodeUnstageVolumeRequestreturns (NodeUnstageVolumeResponse) {}rpcNodePublishVolume(NodePublishVolumeRequestreturns (NodePublishVolumeResponse) {}rpcNodeUnpublishVolume(NodeUnpublishVolumeRequestreturns (NodeUnpublishVolumeResponse) {}rpcNodeGetCapabilities(NodeGetCapabilitiesRequestreturns (NodeGetCapabilitiesResponse) {}rpcNodeGetInfo(NodeGetInfoRequestreturns (NodeGetInfoResponse) {}}

这个服务运行在每个节点(kubelet 通过 gRPC 调用),负责将卷实际挂载到 Pod。NodeStageVolume 是预处理阶段——如果是块设备,这里格式化磁盘、创建文件系统。NodePublishVolume 才是真正的挂载,将卷绑定到 Pod 的挂载点(比如 /var/lib/kubelet/pods/{pod-id}/volumes/kubernetes.io~csi/{volume-id}/mount)。

两阶段设计(Stage + Publish)的好处是:一个块设备可能被多个 Pod 以只读方式使用,Stage 阶段只做一次(格式化、创建 FS),多个 Pod 各自执行 Publish 阶段(只读挂载)。


自定义本地存储驱动的完整实现

现在我们用 Go 实现一个简单但完整的本地存储驱动。这个驱动的后端存储是 Linux 本地文件系统上的目录——每个”卷”就是一个目录。

驱动的基础结构

go点击代码区域可全选复制
packagemainimport (“context”“fmt”“net”“os”“path/filepath”“sync”“google.golang.org/grpc”csi“github.com/container-storage-interface/spec/lib/go/csi”)typeLocalStorageDriverstruct {nodeIDstringdriverNamestringvolumeDirstring// 所有卷的存储路径,如 /var/lib/local-volumesvolumesMutexsync.Mutexvolumesmap[string]*Volume// volumeID -> Volume metadata}typeVolumestruct {IDstringPathstringCapacityint64}funcNewLocalStorageDriver(nodeIDdriverNamevolumeDirstring*LocalStorageDriver {return&LocalStorageDriver{nodeID:    nodeID,driverNamedriverName,volumeDirvolumeDir,volumes:   make(map[string]*Volume), }}

驱动持有一个内存中的卷注册表(volumes map),记录已创建的卷。实际生产系统会用数据库替代这个 map,但这里用 map 演示核心逻辑。

Identity 服务实现

go点击代码区域可全选复制
func (d*LocalStorageDriverGetPluginInfo(ctxcontext.Contextreq*csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponseerror) {return&csi.GetPluginInfoResponse{Name:          d.driverName// “local-storage.k8s.io”VendorVersion“0.1.0”, }, nil}func (d*LocalStorageDriverGetPluginCapabilities(ctxcontext.Contextreq*csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponseerror) {return&csi.GetPluginCapabilitiesResponse{Capabilities: []*csi.PluginCapability{ {Type&csi.PluginCapability_Service_{Service&csi.PluginCapability_Service{Typecsi.PluginCapability_Service_CONTROLLER_SERVICE, }, }, }, }, }, nil}func (d*LocalStorageDriverProbe(ctxcontext.Contextreq*csi.ProbeRequest) (*csi.ProbeResponseerror) {return&csi.ProbeResponse{Ready&wrapperspb.BoolValue{Valuetrue}}, nil}

这三个方法很直接。注意 GetPluginCapabilities 声明驱动支持 CONTROLLER_SERVICE,这告诉 Kubernetes:”我实现了完整的生命周期管理(CreateVolume / DeleteVolume),不是只读驱动”。

下面这张表格汇总了三个服务的核心差异,开发时可以快速查阅:

对比维度
Identity 服务
Controller 服务
Node 服务
运行位置
所有 Pod
控制平面(通常 1 副本)
每个节点(DaemonSet)
必须实现
否(可选)
典型 RPC 数量
3
7+
6
幂等性要求

(CreateVolume 必须幂等)
中(mount 天然幂等)
状态管理
无状态
需持久化卷元数据
依赖本地 mount 表

Controller 服务:创建和删除卷

go点击代码区域可全选复制
func (d*LocalStorageDriverCreateVolume(ctxcontext.Contextreq*csi.CreateVolumeRequest) (*csi.CreateVolumeResponseerror) {d.volumesMutex.Lock()deferd.volumesMutex.Unlock()volumeID:=req.Nameif_exists:=d.volumes[volumeID]; exists {// 幂等性:已存在则返回成功vol:=d.volumes[volumeID]return&csi.CreateVolumeResponse{Volume&csi.Volume{VolumeId:      volumeID,CapacityBytesvol.Capacity, }, }, nil }// 创建卷的物理存储(这里就是一个目录)volPath:=filepath.Join(d.volumeDirvolumeID)iferr:=os.MkdirAll(volPath0755); err!=nil {returnnilfmt.Errorf(“failed to create volume directory: %v”err) }capacity:=req.CapacityRange.RequiredBytesifcapacity==0 {capacity = 1024*1024*1024// 默认 1GiB }d.volumes[volumeID] = &Volume{ID:       volumeID,Path:     volPath,Capacitycapacity, }return&csi.CreateVolumeResponse{Volume&csi.Volume{VolumeId:      volumeID,CapacityBytescapacity, }, }, nil}func (d*LocalStorageDriverDeleteVolume(ctxcontext.Contextreq*csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponseerror) {d.volumesMutex.Lock()deferd.volumesMutex.Unlock()volumeID:=req.VolumeIdvolexists:=d.volumes[volumeID]if !exists {// 幂等性:已删除则返回成功return&csi.DeleteVolumeResponse{}, nil }iferr:=os.RemoveAll(vol.Path); err!=nil {returnnilfmt.Errorf(“failed to delete volume: %v”err) } delete(d.volumesvolumeID)return&csi.DeleteVolumeResponse{}, nil}func (d*LocalStorageDriverControllerPublishVolume(ctxcontext.Contextreq*csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponseerror) {// 本地存储驱动不需要 attach 操作,返回 publish_context 为空return&csi.ControllerPublishVolumeResponse{PublishContextmap[string]string{}, }, nil}func (d*LocalStorageDriverControllerUnpublishVolume(ctxcontext.Contextreq*csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponseerror) {return&csi.ControllerUnpublishVolumeResponse{}, nil}

注意 CreateVolume 的幂等性设计:如果卷已存在,不是报错而是返回现有卷的信息。这样即使网络超时导致 Kubernetes 重试,也能安全处理。ControllerPublishVolume 对本地存储来说是空操作,但你必须实现这个方法——我第一次写 CSI 驱动时漏掉了它,结果 external-attacher 反复报 Unimplemented 错误,排查了快两小时才意识到问题出在这里。

Node 服务:挂载到 Pod

go点击代码区域可全选复制
func (d*LocalStorageDriverNodeStageVolume(ctxcontext.Contextreq*csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponseerror) {// 本地存储不需要 stage 操作(格式化),直接返回return&csi.NodeStageVolumeResponse{}, nil}func (d*LocalStorageDriverNodeUnstageVolume(ctxcontext.Contextreq*csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponseerror) {return&csi.NodeUnstageVolumeResponse{}, nil}func (d*LocalStorageDriverNodePublishVolume(ctxcontext.Contextreq*csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponseerror) {targetPath:=req.TargetPath// 例如 /var/lib/kubelet/pods/xxx/volumes/kubernetes.io~csi/vol-1/mountvolumeID:=req.VolumeIdd.volumesMutex.Lock()volexists:=d.volumes[volumeID]d.volumesMutex.Unlock()if !exists {returnnilfmt.Errorf(“volume %s not found”volumeID) }// 创建目标挂载点iferr:=os.MkdirAll(targetPath0755); err!=nil {returnnilfmt.Errorf(“failed to create mount point: %v”err) }// 执行 bind mount:将卷目录挂载到 Pod 的挂载点// 实际实现需要调用 syscall.Mount()iferr:=syscall.Mount(vol.PathtargetPath“”syscall.MS_BIND“”); err!=nil {returnnilfmt.Errorf(“mount failed: %v”err) }return&csi.NodePublishVolumeResponse{}, nil}func (d*LocalStorageDriverNodeUnpublishVolume(ctxcontext.Contextreq*csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponseerror) {targetPath:=req.TargetPathiferr:=syscall.Unmount(targetPath0); err!=nil {returnnilfmt.Errorf(“unmount failed: %v”err) }return&csi.NodeUnpublishVolumeResponse{}, nil}func (d*LocalStorageDriverNodeGetInfo(ctxcontext.Contextreq*csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponseerror) {return&csi.NodeGetInfoResponse{NodeId:            d.nodeID,MaxVolumesPerNode100// 单个节点最多挂载 100 个卷 }, nil}

NodePublishVolume 是 Pod 能否使用卷的关键。通过 syscall.Mount() 执行 bind mount,将卷目录(/var/lib/local-volumes/vol-1)挂载到 Pod 的挂载点(/var/lib/kubelet/pods/xxx/volumes/...)。kubelet 看到挂载点出现后,才会启动 Pod。有一个容易忽略的细节:NodeUnpublishVolume 里的 Unmount 失败时,如果你直接返回错误,kubelet 会不断重试,但 Pod 也无法被删除。生产中更稳妥的做法是先检查挂载点是否还存在(os.Stat(targetPath)),不存在就直接返回成功。


gRPC 服务启动与 Unix Socket 通信

go点击代码区域可全选复制
func main() {   driverName:=“local-storage.k8s.io”   nodeID:=os.Getenv(“NODE_ID”)   ifnodeID==“” {      nodeID = “local-node-1” }volumeDir:=“/var/lib/local-volumes”os.MkdirAll(volumeDir0755)driver:=NewLocalStorageDriver(nodeIDdriverNamevolumeDir)// 创建 gRPC 服务器并注册驱动grpcServer:=grpc.NewServer()csi.RegisterIdentityServer(grpcServerdriver)csi.RegisterControllerServer(grpcServerdriver)csi.RegisterNodeServer(grpcServerdriver)// 绑定到 Unix SocketsocketPath:=“/var/lib/kubelet/plugins/local-storage.k8s.io/csi.sock”os.MkdirAll(filepath.Dir(socketPath), 0755)os.Remove(socketPath// 防止旧的 socket 文件冲突listenererr:=net.Listen(“unix”socketPath)iferr!=nil {log.Fatalf(“Failed to listen on socket: %v”err) }log.Printf(“CSI driver starting on %s”socketPath)iferr:=grpcServer.Serve(listener); err!=nil {log.Fatalf(“Server failed: %v”err) }}

驱动启动时,创建一个 Unix Socket(通常在 /var/lib/kubelet/plugins/{driver-name}/csi.sock)并绑定 gRPC 服务。kubelet 和 external-provisioner 通过这个 Socket 与驱动通信。Socket 文件本身就是驱动存活的证明——kubelet 检测 Socket 文件存在就认为驱动已就绪。我曾在一次故障排查中发现,驱动重启后旧的 Socket 文件没被清理,导致 kubelet 一直连不上新进程,所以上面代码里 os.Remove(socketPath) 这一行不能省。

验证 Socket 是否正常工作,可以用 csi-sanity 或直接用 grpcurl 测试:

bash点击代码区域可全选复制
# 检查 Socket 文件是否存在ls -la /var/lib/kubelet/plugins/local-storage.k8s.io/csi.sock# 用 grpcurl 测试 Identity 服务grpcurl -plaintext -unix /var/lib/kubelet/plugins/local-storage.k8s.io/csi.sock csi.v1.Identity/GetPluginInfo

到这里,我们已经完成了 CSI 驱动的核心实现——从规范理解到三层 gRPC 服务的 Go 代码,再到 Unix Socket 启动机制。但代码写完只是开始,要让驱动真正在 Kubernetes 集群中跑起来,还需要配套的 Sidecar 容器编排、部署 YAML、测试验证和生产环境配置。

现在就动手:把上面的代码跑起来,用 csi-sanity 对你的驱动做一次合规测试。它会自动调用所有 CSI RPC 并验证返回值是否符合规范——我每次改完驱动都先跑一遍,能拦住 80% 的低级错误:

bash点击代码区域可全选复制
# 安装 csi-sanitygo install github.com/kubernetes-csi/csi-test/cmd/csi-sanity@latest# 对你的驱动跑合规测试(驱动需先启动)csi-sanity –csi.endpoint /var/lib/kubelet/plugins/local-storage.k8s.io/csi.sock \           –csi.testvolumesize 1073741824

下一篇预告

《CSI 插件开发实战(下):部署、测试与生产上线》 — 我们将完成 Sidecar 容器编排(provisioner / attacher / registrar)、Controller StatefulSet 和 Node DaemonSet 部署、单元测试与集成测试、本地集群调试技巧,以及生产环境的 RBAC / StorageClass / 监控告警配置。