Containerd 源码解析:(四)Containerd 创建容器进程

大家好,我是费益洲。通过上一篇文章我们知道,Containerd 创建容器其实就是在数据库中注册容器的 Metadata 信息,而容器本质上来说,就是宿主机的一个进程,容器的启动最终就是容器进程的启动。本文将详细解析 Containerd 在创建容器成功之后,创建 Task 的过程以及 Task 根据容器的 Metadata 创建容器进程的过程。
前置条件
Containerd 源码版本:v1.6.33
Containerd 项目地址:https://github.com/containerd/containerd
创建 Task
📦 Task
在 Containerd 中,Task 是容器运行时状态的核心抽象,代表了一个正在运行或可运行的容器进程实例。简单来说,容器(Container)是静态的配置和文件系统,而 Task 是其动态的执行实体。创建一个容器(ctr run 或通过客户端)的本质,就是为其创建一个对应的 Task 来运行指定的进程。
从之前的系列文章也可以看出,无论是 Docker 还是 ctr,在容器创建完成之后,都会调用 Containerd 的 GRPC 接口来创建 Task。我们继续以 ctr run 命令启动容器的逻辑梳理 Task 的创建过程,逻辑入口如下:
cmd/ctr/commands/run/run.go
// line 92 运行容器子命令var Command = cli.Command{ Name: "run", Usage: "run a container", ...// line 133 Action: func(context *cli.Context)error { ...// step 3:line 195 创建task task, err := tasks.NewTask(ctx, client, container, context.String("checkpoint"), con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...) ... }}
继续查看 tasks.NewTask 函数:
cmd/ctr/commands/tasks/tasks_unix.go
// line 72funcNewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts)(containerd.Task, error) { ...// line 100 t, err := container.NewTask(ctx, ioCreator, opts...) ...}
继续向下调用创建 Task 的函数:
container.go
// line 210func(c *container)NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts)(_ Task, err error) { ...// line 299 response, err := c.client.TaskService().Create(ctx, request) ...}
继续往下调用 Containerd 创建 Task 的 GRPC 接口:
api/services/tasks/v1/tasks.pb.go
// line 1307func(c *tasksClient)Create(ctx context.Context, in *CreateTaskRequest, opts ...grpc.CallOption)(*CreateTaskResponse, error) { out := new(CreateTaskResponse) err := c.cc.Invoke(ctx, "/containerd.services.tasks.v1.Tasks/Create", in, out, opts...)if err != nil {returnnil, err }return out, nil}
Task 的 GRPC 服务插件的注册逻辑与上一篇的容器 GRPC 服务插件注册逻辑一致,本章不再赘述 GRPC 服务插件的注册流程,直接从创建 Task 的 Handler 开始:
api/services/tasks/v1/tasks.pb.go
// line 1545func _Tasks_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{})error, interceptorgrpc.UnaryServerInterceptor) (interface{}, error) { in := new(CreateTaskRequest)if err := dec(in); err != nil {returnnil, err }if interceptor == nil {return srv.(TasksServer).Create(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/containerd.services.tasks.v1.Tasks/Create", } handler := func(ctx context.Context, req interface{})(interface{}, error) {return srv.(TasksServer).Create(ctx, req.(*CreateTaskRequest)) }return interceptor(ctx, in, info, handler)}
从上述代码可以看到,grpc 接收到请求后,会调用 srv.(TasksServer).Create 函数去创建 Task,查看 Containerd 中提供该服务的插件:
services/tasks/service.go
// line 34funcinit() { plugin.Register(&plugin.Registration{// line 36 Type: plugin.GRPCPlugin,// line 37 ID: "tasks", Requires: []plugin.Type{ plugin.ServicePlugin, }, InitFn: func(ic *plugin.InitContext)(interface{}, error) { plugins, err := ic.GetByType(plugin.ServicePlugin)if err != nil {returnnil, err } p, ok := plugins[services.TasksService]if !ok {returnnil, errors.New("tasks service not found") } i, err := p.Instance()if err != nil {returnnil, err }return &service{local: i.(api.TasksClient)}, nil }, })}
从上面代码中的 36、37 行可以看出,Contianerd 中提供该服务的插件式 io.containerd.grpc.v1.tasks ,调用插件对象 service 的 Create 方法创建 task:
services/tasks/service.go
// line 68func(s *service)Create(ctx context.Context, r *api.CreateTaskRequest)(*api.CreateTaskResponse, error) {return s.local.Create(ctx, r)}
服务实例调用了 local 对象的 Create 方法来创建 Task,local 对象是在服务注册的时候就完成了初始化:
services/tasks/local.go
// line 74funcinit() { plugin.Register(&plugin.Registration{// line 76 Type: plugin.ServicePlugin,// line 77 ID: services.TasksService, Requires: tasksServiceRequires, Config: &Config{}, InitFn: initFunc, }) timeout.Set(stateTimeout, 2*time.Second)}
从上面代码中的 76、77 行可以看出,local 是 io.containerd.service.v1.tasks-service 插件的实例化对象。继续查看插件对象调用 Create 方法创建 task 的具体实现:
// line 166func(l *local)Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption)(*api.CreateTaskResponse, error) {// line 167 获取容器信息 container, err := l.getContainer(ctx, r.ContainerID) ...// line 223 获取runtime rtime, err := l.getRuntime(container.Runtime.Name) ...// line 227 校验task是否已经存在 _, err = rtime.Get(ctx, r.ContainerID) ...// line 234 创建task c, err := rtime.Create(ctx, r.ContainerID, opts) ...// line 242 获取pid pid, err := c.PID(ctx) ...}
上述代码的逻辑清晰,Containerd 会通过容器 ID 从数据库中获取容器信息,然后调用 local.v2Runtime.Create 创建 Task。v2Runtime 是 io.containerd.runtime.v2.task 插件的实例:
runtime/v2/manager.go
// line 50funcinit() { plugin.Register(&plugin.Registration{ Type: plugin.RuntimePluginV2, ID: "task", ... InitFn: func(ic *plugin.InitContext)(interface{}, error) { ...// 获取metadata的插件实例 m, err := ic.Get(plugin.MetadataPlugin) ...// 获取容器存储对象实例 cs := metadata.NewContainerStore(m.(*metadata.DB)) ... shimManager, err := NewShimManager(ic.Context, &ManagerConfig{ Root: ic.Root, State: ic.State, Address: ic.Address, TTRPCAddress: ic.TTRPCAddress, Events: events, Store: cs, SchedCore: config.SchedCore, }) ...return NewTaskManager(shimManager), nil }, })}
io.containerd.runtime.v2.task 插件的实现实例是 TaskManager,其中包括了 Shim。调用 local.v2Runtime.Create 创建 Task 实际调用的是 TaskManager 实例的 Create 函数:
runtime/v2/manager.go
// line 367func(m *TaskManager)Create(ctx context.Context, taskID string, opts runtime.CreateOpts)(runtime.Task, error) {// line 368 创建并启动shim进程 process, err := m.manager.Start(ctx, taskID, opts) ...// line 375 创建并启动runc init进程 shim := process.(*shimTask) t, err := shim.Create(ctx, opts) ...}
上述的 Create 函数主要就是加载一个 shim 实例,然后调用 shim 创建 Task。下面我们看一下 shim 创建 Task 的过程。
创建并启动 shim 进程
TaskManager.manager.Start 的具体实现如下:
runtime/v2/manager.go
// line 158func(m *ShimManager)Start(ctx context.Context, id string, opts runtime.CreateOpts)(_ ShimProcess, retErr error) {// 创建bundle bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value) ...// 创建并启动shim shim, err := m.startShim(ctx, bundle, id, opts) ...// 将启动的shim添加到TaskManager中if err := m.shims.Add(ctx, shimTask); err != nil {returnnil, fmt.Errorf("failed to add task: %w", err) }return shimTask, nil}
创建 Bundle
在 Containerd 中,Bundle 是一个重要的概念,它代表了一个容器的文件系统结构和配置信息的集合。具体来说,Bundle 是一个目录,其中包含了运行一个容器所需的所有文件:包括容器的配置(config.json)和根文件系统(rootfs)。
📑 Bundle 的作用
-
容器运行时标准
Bundle 遵循 OCI 运行时规范,使得不同的容器运行时(如 runc、crun)能够使用相同的 Bundle 格式来运行容器。这提供了互操作性。
-
容器生命周期管理
在 Containerd 中,当创建一个容器时,会为这个容器准备一个 Bundle。然后,Containerd 可以使用这个 Bundle 来启动、停止、删除容器
-
与运行时的交互
Containerd 通过 Bundle 与底层的 OCI 运行时(如 runc)进行交互。当 Containerd 需要启动一个容器时,它会将 Bundle 的路径传递给 OCI 运行时,OCI 运行时根据 Bundle 中的配置启动容器
-
容器状态持久化
Bundle 中的配置和文件系统代表了容器的静态定义。当容器运行时,其状态(如进程 ID、状态等)由运行时管理,但 Bundle 提供了容器的基础定义
🔭 查看容器的 Bundle 内容
Bundle 默认路径是/var/run/containerd/io.containerd.runtime.v2.task/<namespace>/<container_id>,一个完整的 Bundle 目录结构如下:
[root@master01 ~]# ls -al /var/run/containerd/io.containerd.runtime.v2.task/moby/009e47ae071d970b0f61775c2fc50608956d21b179738ef8715909271eda3f42/total 32drwx------ 3 root root 240 Feb 3 16:31 .drwx--x--x 34 root root 680 Feb 5 10:57 ..-rw-rw-rw- 1 root root 89 Feb 3 16:31 address-rw-r--r-- 1 root root 9958 Feb 3 16:31 config.json-rw-r--r-- 1 root root 6 Feb 3 16:31 init.pidprwx------ 1 root root 0 Feb 3 16:31 log-rw-r--r-- 1 root root 0 Feb 3 16:31 log.json-rw------- 1 root root 82 Feb 3 16:31 options.jsondrwx--x--x 2 root root 40 Feb 3 16:31 rootfs-rw------- 1 root root 4 Feb 3 16:31 runtime-rw------- 1 root root 32 Feb 3 16:31 shim-binary-pathlrwxrwxrwx 1 root root 119 Feb 3 16:31 work -> /var/lib/containerd/io.containerd.runtime.v2.task/moby/009e47ae071d970b0f61775c2fc50608956d21b179738ef8715909271eda3f42
我们接着查看一下 NewBundle 的具体实现:
runtime/v2/bundle.go
funcNewBundle(ctx context.Context, root, state, id string, spec []byte)(b *Bundle, err error) { ... ns, err := namespaces.NamespaceRequired(ctx) ... work := filepath.Join(root, ns, id) b = &Bundle{ ID: id, Path: filepath.Join(state, ns, id), Namespace: ns, } ...if err := os.MkdirAll(filepath.Dir(b.Path), 0711); err != nil {returnnil, err } ...if err := os.MkdirAll(filepath.Dir(work), 0711); err != nil {returnnil, err } ...// symlink workdirif err := os.Symlink(work, filepath.Join(b.Path, "work")); err != nil {returnnil, err }// write the spec to the bundle err = os.WriteFile(filepath.Join(b.Path, oci.ConfigFilename), spec, 0666)return b, err}
上述代码的主要逻辑就是为容器创建对应的 Bundle,包括 config.json 和 work 等。接下来我们继续看启动 Shim 的实现过程。
启动 Shim 进程
🚌 在 containerd 中,Shim 进程是一个位于核心守护进程(containerd)和实际容器进程之间的轻量级代理进程。它是实现 Containerd 稳定性和可扩展性架构的基石。简单来说,Shim 是 containerd 为了管理容器生命周期而又不亲自成为容器的父进程而引入的“中间人”或“垫片,每个容器都会有一个单独的 Shim 进程。
这种设计的好处就是容器进程和核心守护进程(containerd)之间不会有直接的关联,保证了核心守护进程的绝对稳定,也使得容器进程不受核心守护进程(containerd)重启和停止影响,保证了容器进程的稳定。
启动 Shim 的具体实现如下:
runtime/v2/manager.go
// line 193func(m *ShimManager)startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts)(*shim, error) { ...// line 209 初始化Shim的Binary对象 b := shimBinary(bundle, shimBinaryConfig{ runtime: runtimePath, address: m.containerdAddress, ttrpcAddress: m.containerdTTRPCAddress, schedCore: m.schedCore, })// line 215 Binary对象启动Shim shim, err := b.Start(ctx, topts, func() { ... }) ...}
执行二进制文件的具体实现如下:
runtime/v2/binary.go
// line 63func(b *binary)Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {// line 64 初始化shim执行参数 args := []string{"-id", b.bundle.ID}switch log.GetLevel() {case log.DebugLevel, log.TraceLevel: args = append(args, "-debug") } args = append(args, "start")// line 71 初始化cmd cmd, err := client.Command( ctx, &client.CommandConfig{ Runtime: b.runtime, Address: b.containerdAddress, TTRPCAddress: b.containerdTTRPCAddress, Path: b.bundle.Path, Opts: opts, Args: args, SchedCore: b.schedCore, }) ...// line 93 读取shim日志 f, err := openShimLog(shimCtx, b.bundle, client.AnonDialer) ...// line 116 执行shim二进制文件 out, err := cmd.CombinedOutput() ...// line 134 创建ttrpc client client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))return &shim{ bundle: b.bundle, client: client, }, nil}
Containerd 通过执行二进制文件的方式启动 Shim 进程,这里的二进制文件对应的是/usr/bin/containerd-shim-runc-v2。我们可以通过查看进程列表的方式,查看到 Containerd Shim 进程:
[root@feiyizhou ~]# ctr run -d docker.io/library/nginx:latest nginx[root@feiyizhou ~]# ctr container lsCONTAINER IMAGE RUNTIMEnginx docker.io/library/nginx:latest io.containerd.runc.v2[root@feiyizhou ~]# ps -ef | grep nginx | grep -v greproot 3508675 1 0 17:12 ? 00:00:00 /usr/bin/containerd-shim-runc-v2 -namespace default -id nginx -address /run/containerd/containerd.sockroot 3508696 3508675 0 17:12 ? 00:00:00 nginx: master process nginx -g daemon off;101 3508731 3508696 0 17:12 ? 00:00:00 nginx: worker process101 3508732 3508696 0 17:12 ? 00:00:00 nginx: worker process
我们使用 ctr 运行一个 nginx 容器,可以看到,在进程列表中,Containerd Shim 进程 ID 是 3508675,其父进程 ID 是 root(1)进程,而 nginx 容器进程 ID 是 3508696,其父进程正是 Containerd Shim 进程。
创建容器 Task
我们继续查看容器 Task 的创建过程:
runtime/v2/shim.go
// line 325func(s *shimTask)Create(ctx context.Context, opts runtime.CreateOpts)(runtime.Task, error) { ...// line 330 request := &task.CreateTaskRequest{ ID: s.ID(), Bundle: s.bundle.Path, Stdin: opts.IO.Stdin, Stdout: opts.IO.Stdout, Stderr: opts.IO.Stderr, Terminal: opts.IO.Terminal, Checkpoint: opts.Checkpoint, Options: topts, } ...// line 348 _, err := s.task.Create(ctx, request) ...}
会继续向下调用 Shim 层的 TTRPC 接口containerd.task.v2.Task/Create去创建 Task:
runtime/v2/task/shim.pb.go
// line 3591func(c *taskClient)Create(ctx context.Context, req *CreateTaskRequest)(*CreateTaskResponse, error) {var resp CreateTaskResponseif err := c.client.Call(ctx, "containerd.task.v2.Task", "Create", req, &resp); err != nil {returnnil, err }return &resp, nil}
这里的 c.client 指的是在前一节启动 Shim 进程时创建的 TTRPC Client。TTRPC 服务同 GRPC 服务一样,在 Containerd 主进程启动时,就已经完成了注册。而这里containerd.task.v2.Task/Create是由 Task Service 提供的。关于 TTRPC 插件服务的注册在之前的文章中详细阐述过,此处不再赘述,直接从提供服务的插件开始梳理逻辑:
runtime/v2/runc/task/service.go
// line 248func(s *service)Create(ctx context.Context, r *taskAPI.CreateTaskRequest)(_ *taskAPI.CreateTaskResponse, err error) { ...// line 257 container, err := runc.NewContainer(ctx, s.platform, r) ...}
Task Service 调用了 runc.NewContainer 来创建容器。runc 是实际启动容器的低级运行时,在调用时会通过创建一个 runc init 进程来初始化容器相关的基础信息,也会根据之前的 bundle 信息初始化一个 runc 使用的容器信息,这个信息使用容器 ID 来唯一标识。
启动 runc init 进程
❝
💡 runc init 进程是容器启动的关键环节,它的核心作用是初始化容器环境并启动容器内的用户进程。其生命周期极其短暂,在完成初始化工作后便会立即退出。后续会单独出 runc 启动容器进程的文章,感兴趣的同志可以持续关注~
继续往下梳理创建 runc init 进程并创建容器信息的逻辑:
runtime/v2/runc/container.go
funcNewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest)(_ *Container, retErr error) { ...// line 117 初始化runc init进程 p, err := newInit( ctx, r.Bundle, filepath.Join(r.Bundle, "work"), ns, platform, config, &opts, rootfs, ) ...// line 130 根据配置创建runc init进程if err := p.Create(ctx, config); err != nil {returnnil, errdefs.ToGRPC(err) } ...}
这里注意一下,newInit 返回的是一个 *process.Init 类型的对象指针,这里会影响到后续的容器启动流程,具体的初始化函数此处不过多阐述,感兴趣的同志可以自行探索。除此之外,还需要关注创建 runc init 进程的 p.Create 函数,具体实现如下:
pkg/process/init.go
// line 112func(p *Init)Create(ctx context.Context, r *CreateConfig)error { ...// line 145if err := p.runtime.Create(ctx, r.ID, r.Bundle, opts); err != nil {return p.runtimeError(err, "OCI runtime create failed") } ...}
需要关注的函数是 p.runtime.Create,这个函数会继续向下调用 runc 的 create 命令,根据 bundle 创建容器:
vendor/github.com/containerd/go-runc/runc.go
// line 128func(r *Runc)Create(context context.Context, id, bundle string, opts *CreateOpts)error {// line 129 初始化args args := []string{"create", "--bundle", bundle} ...// line 137 初始化cmd cmd := r.command(context, append(args, id)...) ...// line 151 执行cmd ec, err := Monitor.Start(cmd) ...}
runc 根据 bundle 创建容器的这个过程就是通过 runc create 命令创建容器信息并启动 runc init 进程的过程。这部分逻辑会在 runc 部分详细介绍,本文不扩展介绍。至此 Containerd 完成了容器 Task 的创建以及 Shim 进程和 runc init 进程的启动,接下来我们继续梳理 Task 的启动逻辑。感兴趣的同志可以持续关注。
夜雨聆风