乐于分享
好东西不私藏

Containerd 源码解析:(四)Containerd 创建容器进程

Containerd 源码解析:(四)Containerd 创建容器进程

大家好,我是费益洲。通过上一篇文章我们知道,Containerd 创建容器其实就是在数据库中注册容器的 Metadata 信息,而容器本质上来说,就是宿主机的一个进程,容器的启动最终就是容器进程的启动。本文将详细解析 Containerd 在创建容器成功之后,创建 Task 的过程以及 Task 根据容器的 Metadata 创建容器进程的过程。

前置条件

Containerd 源码版本:v1.6.33

Containerd 项目地址:https://github.com/containerd/containerd

创建 Task

📦 Task

在 Containerd 中,Task 是容器运行时状态的核心抽象,代表了一个正在运行或可运行的容器进程实例。简单来说,容器(Container)是静态的配置和文件系统,而 Task 是其动态的执行实体。创建一个容器(ctr run 或通过客户端)的本质,就是为其创建一个对应的 Task 来运行指定的进程。

从之前的系列文章也可以看出,无论是 Docker 还是 ctr,在容器创建完成之后,都会调用 Containerd 的 GRPC 接口来创建 Task。我们继续以 ctr run 命令启动容器的逻辑梳理 Task 的创建过程,逻辑入口如下:

cmd/ctr/commands/run/run.go

// line 92 运行容器子命令var Command = cli.Command{    Name:           "run", Usage:          "run a container",    ...// line 133    Action: func(context *cli.Context)error {        ...// step 3:line 195 创建task        task, err := tasks.NewTask(ctx, client, container, context.String("checkpoint"), con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...)        ...    }}

继续查看 tasks.NewTask 函数:

cmd/ctr/commands/tasks/tasks_unix.go

// line 72funcNewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts)(containerd.Task, error) { ...// line 100 t, err := container.NewTask(ctx, ioCreator, opts...) ...}

继续向下调用创建 Task 的函数:

container.go

// line 210func(c *container)NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts)(_ Task, err error) { ...// line 299 response, err := c.client.TaskService().Create(ctx, request) ...}

继续往下调用 Containerd 创建 Task 的 GRPC 接口:

api/services/tasks/v1/tasks.pb.go

// line 1307func(c *tasksClient)Create(ctx context.Context, in *CreateTaskRequest, opts ...grpc.CallOption)(*CreateTaskResponse, error) { out := new(CreateTaskResponse) err := c.cc.Invoke(ctx, "/containerd.services.tasks.v1.Tasks/Create", in, out, opts...)if err != nil {returnnil, err }return out, nil}

Task 的 GRPC 服务插件的注册逻辑与上一篇的容器 GRPC 服务插件注册逻辑一致,本章不再赘述 GRPC 服务插件的注册流程,直接从创建 Task 的 Handler 开始:

api/services/tasks/v1/tasks.pb.go

// line 1545func _Tasks_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{})errorinterceptorgrpc.UnaryServerInterceptor(interface{}, error) { in := new(CreateTaskRequest)if err := dec(in); err != nil {returnnil, err }if interceptor == nil {return srv.(TasksServer).Create(ctx, in) } info := &grpc.UnaryServerInfo{  Server:     srv,  FullMethod: "/containerd.services.tasks.v1.Tasks/Create", } handler := func(ctx context.Context, req interface{})(interface{}, error) {return srv.(TasksServer).Create(ctx, req.(*CreateTaskRequest)) }return interceptor(ctx, in, info, handler)}

从上述代码可以看到,grpc 接收到请求后,会调用 srv.(TasksServer).Create 函数去创建 Task,查看 Containerd 中提供该服务的插件:

services/tasks/service.go

// line 34funcinit() { plugin.Register(&plugin.Registration{// line 36  Type: plugin.GRPCPlugin,// line 37  ID:   "tasks",  Requires: []plugin.Type{   plugin.ServicePlugin,  },  InitFn: func(ic *plugin.InitContext)(interface{}, error) {   plugins, err := ic.GetByType(plugin.ServicePlugin)if err != nil {returnnil, err   }   p, ok := plugins[services.TasksService]if !ok {returnnil, errors.New("tasks service not found")   }   i, err := p.Instance()if err != nil {returnnil, err   }return &service{local: i.(api.TasksClient)}, nil  }, })}

从上面代码中的 36、37 行可以看出,Contianerd 中提供该服务的插件式 io.containerd.grpc.v1.tasks ,调用插件对象 service 的 Create 方法创建 task:

services/tasks/service.go

// line 68func(s *service)Create(ctx context.Context, r *api.CreateTaskRequest)(*api.CreateTaskResponse, error) {return s.local.Create(ctx, r)}

服务实例调用了 local 对象的 Create 方法来创建 Task,local 对象是在服务注册的时候就完成了初始化:

services/tasks/local.go

// line 74funcinit() { plugin.Register(&plugin.Registration{// line 76  Type:     plugin.ServicePlugin,// line 77  ID:       services.TasksService,  Requires: tasksServiceRequires,  Config:   &Config{},  InitFn:   initFunc, }) timeout.Set(stateTimeout, 2*time.Second)}

从上面代码中的 76、77 行可以看出,local 是 io.containerd.service.v1.tasks-service 插件的实例化对象。继续查看插件对象调用 Create 方法创建 task 的具体实现:

// line 166func(l *local)Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption)(*api.CreateTaskResponse, error) {// line 167 获取容器信息 container, err := l.getContainer(ctx, r.ContainerID) ...// line 223 获取runtime rtime, err := l.getRuntime(container.Runtime.Name) ...// line 227 校验task是否已经存在 _, err = rtime.Get(ctx, r.ContainerID) ...// line 234 创建task c, err := rtime.Create(ctx, r.ContainerID, opts) ...// line 242 获取pid pid, err := c.PID(ctx) ...}

上述代码的逻辑清晰,Containerd 会通过容器 ID 从数据库中获取容器信息,然后调用 local.v2Runtime.Create 创建 Task。v2Runtime 是 io.containerd.runtime.v2.task 插件的实例:

runtime/v2/manager.go

// line 50funcinit() { plugin.Register(&plugin.Registration{  Type: plugin.RuntimePluginV2,  ID:   "task",  ...  InitFn: func(ic *plugin.InitContext)(interface{}, error) {   ...// 获取metadata的插件实例   m, err := ic.Get(plugin.MetadataPlugin)   ...// 获取容器存储对象实例   cs := metadata.NewContainerStore(m.(*metadata.DB))   ...   shimManager, err := NewShimManager(ic.Context, &ManagerConfig{    Root:         ic.Root,    State:        ic.State,    Address:      ic.Address,    TTRPCAddress: ic.TTRPCAddress,    Events:       events,    Store:        cs,    SchedCore:    config.SchedCore,   })   ...return NewTaskManager(shimManager), nil  }, })}

io.containerd.runtime.v2.task 插件的实现实例是 TaskManager,其中包括了 Shim。调用 local.v2Runtime.Create 创建 Task 实际调用的是 TaskManager 实例的 Create 函数:

runtime/v2/manager.go

// line 367func(m *TaskManager)Create(ctx context.Context, taskID string, opts runtime.CreateOpts)(runtime.Task, error) {// line 368 创建并启动shim进程 process, err := m.manager.Start(ctx, taskID, opts) ...// line 375 创建并启动runc init进程 shim := process.(*shimTask) t, err := shim.Create(ctx, opts) ...}

上述的 Create 函数主要就是加载一个 shim 实例,然后调用 shim 创建 Task。下面我们看一下 shim 创建 Task 的过程。

创建并启动 shim 进程

TaskManager.manager.Start 的具体实现如下:

runtime/v2/manager.go

// line 158func(m *ShimManager)Start(ctx context.Context, id string, opts runtime.CreateOpts)(_ ShimProcess, retErr error) {// 创建bundle bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec.Value) ...// 创建并启动shim shim, err := m.startShim(ctx, bundle, id, opts) ...// 将启动的shim添加到TaskManager中if err := m.shims.Add(ctx, shimTask); err != nil {returnnil, fmt.Errorf("failed to add task: %w", err) }return shimTask, nil}

创建 Bundle

在 Containerd 中,Bundle 是一个重要的概念,它代表了一个容器的文件系统结构和配置信息的集合。具体来说,Bundle 是一个目录,其中包含了运行一个容器所需的所有文件:包括容器的配置(config.json)和根文件系统(rootfs)。

📑 Bundle 的作用

  1. 容器运行时标准

    Bundle 遵循 OCI 运行时规范,使得不同的容器运行时(如 runc、crun)能够使用相同的 Bundle 格式来运行容器。这提供了互操作性。

  2. 容器生命周期管理

    在 Containerd 中,当创建一个容器时,会为这个容器准备一个 Bundle。然后,Containerd 可以使用这个 Bundle 来启动、停止、删除容器

  3. 与运行时的交互

    Containerd 通过 Bundle 与底层的 OCI 运行时(如 runc)进行交互。当 Containerd 需要启动一个容器时,它会将 Bundle 的路径传递给 OCI 运行时,OCI 运行时根据 Bundle 中的配置启动容器

  4. 容器状态持久化

    Bundle 中的配置和文件系统代表了容器的静态定义。当容器运行时,其状态(如进程 ID、状态等)由运行时管理,但 Bundle 提供了容器的基础定义

🔭 查看容器的 Bundle 内容

Bundle 默认路径是/var/run/containerd/io.containerd.runtime.v2.task/<namespace>/<container_id>,一个完整的 Bundle 目录结构如下:

[root@master01 ~]# ls -al /var/run/containerd/io.containerd.runtime.v2.task/moby/009e47ae071d970b0f61775c2fc50608956d21b179738ef8715909271eda3f42/total 32drwx------  3 root root  240 Feb  3 16:31 .drwx--x--x 34 root root  680 Feb  5 10:57 ..-rw-rw-rw-  1 root root   89 Feb  3 16:31 address-rw-r--r--  1 root root 9958 Feb  3 16:31 config.json-rw-r--r--  1 root root    6 Feb  3 16:31 init.pidprwx------  1 root root    0 Feb  3 16:31 log-rw-r--r--  1 root root    0 Feb  3 16:31 log.json-rw-------  1 root root   82 Feb  3 16:31 options.jsondrwx--x--x  2 root root   40 Feb  3 16:31 rootfs-rw-------  1 root root    4 Feb  3 16:31 runtime-rw-------  1 root root   32 Feb  3 16:31 shim-binary-pathlrwxrwxrwx  1 root root  119 Feb  3 16:31 work -> /var/lib/containerd/io.containerd.runtime.v2.task/moby/009e47ae071d970b0f61775c2fc50608956d21b179738ef8715909271eda3f42

我们接着查看一下 NewBundle 的具体实现:

runtime/v2/bundle.go

funcNewBundle(ctx context.Context, root, state, id string, spec []byte)(b *Bundle, err error) { ... ns, err := namespaces.NamespaceRequired(ctx) ... work := filepath.Join(root, ns, id) b = &Bundle{  ID:        id,  Path:      filepath.Join(state, ns, id),  Namespace: ns, } ...if err := os.MkdirAll(filepath.Dir(b.Path), 0711); err != nil {returnnil, err } ...if err := os.MkdirAll(filepath.Dir(work), 0711); err != nil {returnnil, err } ...// symlink workdirif err := os.Symlink(work, filepath.Join(b.Path, "work")); err != nil {returnnil, err }// write the spec to the bundle err = os.WriteFile(filepath.Join(b.Path, oci.ConfigFilename), spec, 0666)return b, err}

上述代码的主要逻辑就是为容器创建对应的 Bundle,包括 config.json 和 work 等。接下来我们继续看启动 Shim 的实现过程。

启动 Shim 进程

🚌 在 containerd 中,Shim 进程是一个位于核心守护进程(containerd)和实际容器进程之间的轻量级代理进程。它是实现 Containerd 稳定性和可扩展性架构的基石。简单来说,Shim 是 containerd 为了管理容器生命周期而又不亲自成为容器的父进程而引入的“中间人”或“垫片,每个容器都会有一个单独的 Shim 进程。

这种设计的好处就是容器进程和核心守护进程(containerd)之间不会有直接的关联,保证了核心守护进程的绝对稳定,也使得容器进程不受核心守护进程(containerd)重启和停止影响,保证了容器进程的稳定。

启动 Shim 的具体实现如下:

runtime/v2/manager.go

// line 193func(m *ShimManager)startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts)(*shim, error) { ...// line 209 初始化Shim的Binary对象 b := shimBinary(bundle, shimBinaryConfig{  runtime:      runtimePath,  address:      m.containerdAddress,  ttrpcAddress: m.containerdTTRPCAddress,  schedCore:    m.schedCore, })// line 215 Binary对象启动Shim shim, err := b.Start(ctx, topts, func() {  ... }) ...}

执行二进制文件的具体实现如下:

runtime/v2/binary.go

// line 63func(b *binary)Start(ctx context.Context, opts *types.Any, onClose func()(_ *shim, err error) {// line 64 初始化shim执行参数 args := []string{"-id", b.bundle.ID}switch log.GetLevel() {case log.DebugLevel, log.TraceLevel:  args = append(args, "-debug") } args = append(args, "start")// line 71 初始化cmd cmd, err := client.Command(  ctx,  &client.CommandConfig{   Runtime:      b.runtime,   Address:      b.containerdAddress,   TTRPCAddress: b.containerdTTRPCAddress,   Path:         b.bundle.Path,   Opts:         opts,   Args:         args,   SchedCore:    b.schedCore,  }) ...// line 93 读取shim日志 f, err := openShimLog(shimCtx, b.bundle, client.AnonDialer) ...// line 116 执行shim二进制文件 out, err := cmd.CombinedOutput() ...// line 134 创建ttrpc client client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onCloseWithShimLog))return &shim{  bundle: b.bundle,  client: client, }, nil}

Containerd 通过执行二进制文件的方式启动 Shim 进程,这里的二进制文件对应的是/usr/bin/containerd-shim-runc-v2。我们可以通过查看进程列表的方式,查看到 Containerd Shim 进程:

[root@feiyizhou ~]# ctr run -d docker.io/library/nginx:latest nginx[root@feiyizhou ~]# ctr container lsCONTAINER    IMAGE                             RUNTIMEnginx        docker.io/library/nginx:latest    io.containerd.runc.v2[root@feiyizhou ~]# ps -ef | grep nginx | grep -v greproot     3508675       1  0 17:12 ?        00:00:00 /usr/bin/containerd-shim-runc-v2 -namespace default -id nginx -address /run/containerd/containerd.sockroot     3508696 3508675  0 17:12 ?        00:00:00 nginx: master process nginx -g daemon off;101      3508731 3508696  0 17:12 ?        00:00:00 nginx: worker process101      3508732 3508696  0 17:12 ?        00:00:00 nginx: worker process

我们使用 ctr 运行一个 nginx 容器,可以看到,在进程列表中,Containerd Shim 进程 ID 是 3508675,其父进程 ID 是 root(1)进程,而 nginx 容器进程 ID 是 3508696,其父进程正是 Containerd Shim 进程。

创建容器 Task

我们继续查看容器 Task 的创建过程:

runtime/v2/shim.go

// line 325func(s *shimTask)Create(ctx context.Context, opts runtime.CreateOpts)(runtime.Task, error) { ...// line 330 request := &task.CreateTaskRequest{  ID:         s.ID(),  Bundle:     s.bundle.Path,  Stdin:      opts.IO.Stdin,  Stdout:     opts.IO.Stdout,  Stderr:     opts.IO.Stderr,  Terminal:   opts.IO.Terminal,  Checkpoint: opts.Checkpoint,  Options:    topts, } ...// line 348 _, err := s.task.Create(ctx, request) ...}

会继续向下调用 Shim 层的 TTRPC 接口containerd.task.v2.Task/Create去创建 Task:

runtime/v2/task/shim.pb.go

// line 3591func(c *taskClient)Create(ctx context.Context, req *CreateTaskRequest)(*CreateTaskResponse, error) {var resp CreateTaskResponseif err := c.client.Call(ctx, "containerd.task.v2.Task""Create", req, &resp); err != nil {returnnil, err }return &resp, nil}

这里的 c.client 指的是在前一节启动 Shim 进程时创建的 TTRPC Client。TTRPC 服务同 GRPC 服务一样,在 Containerd 主进程启动时,就已经完成了注册。而这里containerd.task.v2.Task/Create是由 Task Service 提供的。关于 TTRPC 插件服务的注册在之前的文章中详细阐述过,此处不再赘述,直接从提供服务的插件开始梳理逻辑:

runtime/v2/runc/task/service.go

// line 248func(s *service)Create(ctx context.Context, r *taskAPI.CreateTaskRequest)(_ *taskAPI.CreateTaskResponse, err error) { ...// line 257 container, err := runc.NewContainer(ctx, s.platform, r) ...}

Task Service 调用了 runc.NewContainer 来创建容器。runc 是实际启动容器的低级运行时,在调用时会通过创建一个 runc init 进程来初始化容器相关的基础信息,也会根据之前的 bundle 信息初始化一个 runc 使用的容器信息,这个信息使用容器 ID 来唯一标识。

启动 runc init 进程

💡 runc init 进程是容器启动的关键环节,它的核心作用是初始化容器环境并启动容器内的用户进程。其生命周期极其短暂,在完成初始化工作后便会立即退出。后续会单独出 runc 启动容器进程的文章,感兴趣的同志可以持续关注~

继续往下梳理创建 runc init 进程并创建容器信息的逻辑:

runtime/v2/runc/container.go

funcNewContainer(ctx context.Context, platform stdio.Platform, r *task.CreateTaskRequest)(_ *Container, retErr error) { ...// line 117 初始化runc init进程 p, err := newInit(  ctx,  r.Bundle,  filepath.Join(r.Bundle, "work"),  ns,  platform,  config,  &opts,  rootfs, ) ...// line 130 根据配置创建runc init进程if err := p.Create(ctx, config); err != nil {returnnil, errdefs.ToGRPC(err) } ...}

这里注意一下,newInit 返回的是一个 *process.Init 类型的对象指针,这里会影响到后续的容器启动流程,具体的初始化函数此处不过多阐述,感兴趣的同志可以自行探索。除此之外,还需要关注创建 runc init 进程的 p.Create 函数,具体实现如下:

pkg/process/init.go

// line 112func(p *Init)Create(ctx context.Context, r *CreateConfig)error { ...// line 145if err := p.runtime.Create(ctx, r.ID, r.Bundle, opts); err != nil {return p.runtimeError(err, "OCI runtime create failed") } ...}

需要关注的函数是 p.runtime.Create,这个函数会继续向下调用 runc 的 create 命令,根据 bundle 创建容器:

vendor/github.com/containerd/go-runc/runc.go

// line 128func(r *Runc)Create(context context.Context, id, bundle string, opts *CreateOpts)error {// line 129 初始化args args := []string{"create""--bundle", bundle} ...// line 137 初始化cmd cmd := r.command(context, append(args, id)...) ...// line 151 执行cmd ec, err := Monitor.Start(cmd) ...}

runc 根据 bundle 创建容器的这个过程就是通过 runc create 命令创建容器信息并启动 runc init 进程的过程。这部分逻辑会在 runc 部分详细介绍,本文不扩展介绍。至此 Containerd 完成了容器 Task 的创建以及 Shim 进程和 runc init 进程的启动,接下来我们继续梳理 Task 的启动逻辑。感兴趣的同志可以持续关注。