乐于分享
好东西不私藏

Kubernetes 控制器源码深度剖析:从 kube-controller-manager 启动到几类工作负载控制器实现-----第一部分

Kubernetes 控制器源码深度剖析:从 kube-controller-manager 启动到几类工作负载控制器实现-----第一部分
开始学kube-controller-manager和几类工作负载控制器,微信公众号最多一篇文章限制5万字,所以只能拆成两部分了,开始发车

1.1、kube-controller-manager启动主流程

1.1.1、配置初始化

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\controllermanager.go// NewControllerManagerCommand creates a *cobra.Command object with default parametersfunc NewControllerManagerCommand() *cobra.Command {  s, err := options.NewKubeControllerManagerOptions()if err != nil {    klog.Fatalf("unable to initialize command options: %v", err)  }  cmd := &cobra.Command{    Run: func(cmd *cobra.Command, args []string) {      verflag.PrintAndExitIfRequested()      cliflag.PrintFlags(cmd.Flags())      err := checkNonZeroInsecurePort(cmd.Flags())if err != nil {        fmt.Fprintf(os.Stderr, "%v\n", err)        os.Exit(1)      }      c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())if err != nil {        fmt.Fprintf(os.Stderr, "%v\n", err)        os.Exit(1)      }if err := Run(c.Complete(), wait.NeverStop); err != nil {        fmt.Fprintf(os.Stderr, "%v\n", err)        os.Exit(1)      }    }  }  fs := cmd.Flags()  namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault.List())  verflag.AddFlags(namedFlagSets.FlagSet("global"))  globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())  registerLegacyGlobalFlags(namedFlagSets)for _, f := range namedFlagSets.FlagSets {    fs.AddFlagSet(f)  }  cols, _, _ := term.TerminalSize(cmd.OutOrStdout())  cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)return cmd}

追踪cobra.Command.Run中的s.Config(KnownControllers(), ControllersDisabledByDefault.List())

其中的KnownControllers()返回常用的控制器的名字

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\controllermanager.go// KnownControllers returns all known controllers's namefunc KnownControllers() []string {// 常用控制器的名称和他们的初始化方法  ret := sets.StringKeySet(NewControllerInitializers(IncludeCloudLoops))// saTokenControllerName 属于非常规初始化的控制器,单独Insert到set中  ret.Insert(    saTokenControllerName,  )return ret.List()}

追踪NewControllerInitializers返回常用控制器的名称和他们的初始化方法

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\controllermanager.go// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)// paired to their InitFunc.  This allows for structured downstream composition and subdivision.func NewControllerInitializers(loopMode ControllerLoopMode)map[string]InitFunc {  controllers := map[string]InitFunc{}  controllers["endpoint"] = startEndpointController  controllers["endpointslice"] = startEndpointSliceController  controllers["endpointslicemirroring"] = startEndpointSliceMirroringController  controllers["replicationcontroller"] = startReplicationController  controllers["podgc"] = startPodGCController  controllers["resourcequota"] = startResourceQuotaController  controllers["namespace"] = startNamespaceController  controllers["serviceaccount"] = startServiceAccountController  controllers["garbagecollector"] = startGarbageCollectorController  controllers["daemonset"] = startDaemonSetController  controllers["job"] = startJobController  controllers["deployment"] = startDeploymentController  controllers["replicaset"] = startReplicaSetController  controllers["horizontalpodautoscaling"] = startHPAController  controllers["disruption"] = startDisruptionController  controllers["statefulset"] = startStatefulSetController  controllers["cronjob"] = startCronJobController  controllers["csrsigning"] = startCSRSigningController  controllers["csrapproving"] = startCSRApprovingController  controllers["csrcleaner"] = startCSRCleanerController  controllers["ttl"] = startTTLController  controllers["bootstrapsigner"] = startBootstrapSignerController  controllers["tokencleaner"] = startTokenCleanerController  controllers["nodeipam"] = startNodeIpamController  controllers["nodelifecycle"] = startNodeLifecycleControllerif loopMode == IncludeCloudLoops {    controllers["service"] = startServiceController    controllers["route"] = startRouteController    controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController// TODO: volume controller into the IncludeCloudLoops only set.  }  controllers["persistentvolume-binder"] = startPersistentVolumeBinderController  controllers["attachdetach"] = startAttachDetachController  controllers["persistentvolume-expander"] = startVolumeExpandController  controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController  controllers["pvc-protection"] = startPVCProtectionController  controllers["pv-protection"] = startPVProtectionController  controllers["ttl-after-finished"] = startTTLAfterFinishedController  controllers["root-ca-cert-publisher"] = startRootCACertPublisher  controllers["ephemeral-volume"] = startEphemeralVolumeControllerif utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) &&    utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {    controllers["storage-version-gc"] = startStorageVersionGCController  }return controllers}

追踪cobra.Command.Run中的s.Config(KnownControllers(), ControllersDisabledByDefault.List())

其中的ControllersDisabledByDefault.List()ControllersDisabledByDefault代表默认禁止的控制器

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\controllermanager.go// ControllersDisabledByDefault is the set of controllers which is disabled by defaultvar ControllersDisabledByDefault = sets.NewString("bootstrapsigner","tokencleaner",)

继续追踪s.Config(KnownControllers(), ControllersDisabledByDefault.List())中的s.Config

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\options\options.go// Config 方法:根据 KubeControllerManagerOptions 配置项,// 构建并返回一个完整的 kube-controller-manager 配置对象(kubecontrollerconfig.Config)// 参数:allControllers 所有可用控制器列表、disabledByDefaultControllers 默认禁用的控制器列表// 返回:构建好的配置指针 或 错误信息func (s KubeControllerManagerOptions) Config(allControllers []string, disabledByDefaultControllers []string) (*kubecontrollerconfig.Config, error) {// 第一步:校验配置参数合法性, 如果参数校验失败,直接返回错误if err := s.Validate(allControllers, disabledByDefaultControllers); err != nil {return nil, err  }// 第二步:生成自签名证书(安全服务配置)// 为本地 HTTPS 服务生成 localhost、127.0.0.1 的自签名证书// 用于控制器管理器的安全端口(Secure Serving)if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost"nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {return nil, fmt.Errorf("error creating self-signed certificates: %v", err)  }// 第三步:构建 Kubernetes API 客户端配置(kubeconfig)// 根据命令行参数 --master 和 --kubeconfig 生成 rest.Config 配置  kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)if err != nil {return nil, err  }// 配置客户端连接参数:关闭压缩、设置内容类型、QPS、并发上限  kubeconfig.DisableCompression = true// 接受的内容类型(如 protobuf、json)  kubeconfig.ContentConfig.AcceptContentTypes = s.Generic.ClientConnection.AcceptContentTypes// 首选内容类型  kubeconfig.ContentConfig.ContentType = s.Generic.ClientConnection.ContentType// 客户端限流 QPS(每秒请求数)  kubeconfig.QPS = s.Generic.ClientConnection.QPS// 客户端并发峰值(Burst)  kubeconfig.Burst = int(s.Generic.ClientConnection.Burst)// 第四步:创建 Kubernetes 客户端(clientset)// 基于 kubeconfig 创建可访问 Kubernetes API 的客户端,并设置用户代理  client, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, KubeControllerManagerUserAgent))if err != nil {return nil, err  }// 第五步:创建事件记录器(EventRecorder)// 用于控制器向 Kubernetes 事件中心记录操作事件(如正常/警告/错误)  eventRecorder := createRecorder(client, KubeControllerManagerUserAgent)// 第六步:初始化核心配置对象// 把客户端、kubeconfig、事件记录器等核心组件注入配置结构体  c := &kubecontrollerconfig.Config{    Client:        client,        // Kubernetes 客户端    Kubeconfig:    kubeconfig,    // API 连接配置    EventRecorder: eventRecorder, // 事件记录器  }// 第七步:将剩余配置项应用到配置对象// 把 KubeControllerManagerOptions 中的所有配置(如控制器、安全、日志等)// 全部写入最终的 Config 对象if err := s.ApplyTo(c); err != nil {return nil, err  }// 第八步:启用指标采集// 初始化 Prometheus 指标暴露配置  s.Metrics.Apply()// 第九步:应用日志配置// 初始化日志格式、级别、输出等  s.Logs.Apply()// 最终返回构建完成的控制器管理器配置return c, nil}

继续追踪KubeControllerManagerOptions.ApplyTo(c)方法

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\options\options.go// ApplyTo fills up controller manager config with options.// ApplyTo 方法:将 KubeControllerManagerOptions 中的所有配置项// 批量应用到 kubecontrollerconfig.Config 运行时配置对象中// 作用:把命令行/配置文件解析的选项,完整赋值给控制器管理器的运行配置func (s *KubeControllerManagerOptions) ApplyTo(c *kubecontrollerconfig.Config) error {// 1. 应用通用组件配置(控制器通用参数:并发数、队列、超时等)if err := s.Generic.ApplyTo(&c.ComponentConfig.Generic); err != nil {return err  }// 2. 应用云平台共享配置(云厂商相关公共配置)if err := s.KubeCloudShared.ApplyTo(&c.ComponentConfig.KubeCloudShared); err != nil {return err  }// ===================== 各类控制器配置开始 =====================// 3. 卷挂载/卸载控制器配置(负责Pod卷的Attach/Detach操作)if err := s.AttachDetachController.ApplyTo(&c.ComponentConfig.AttachDetachController); err != nil {return err  }// 4. CSR证书签名控制器配置(处理K8s证书签名请求)if err := s.CSRSigningController.ApplyTo(&c.ComponentConfig.CSRSigningController); err != nil {return err  }// 5. DaemonSet控制器配置(管理守护进程集)if err := s.DaemonSetController.ApplyTo(&c.ComponentConfig.DaemonSetController); err != nil {return err  }// 6. Deployment控制器配置(管理无状态应用部署)if err := s.DeploymentController.ApplyTo(&c.ComponentConfig.DeploymentController); err != nil {return err  }// 7. StatefulSet控制器配置(管理有状态应用)if err := s.StatefulSetController.ApplyTo(&c.ComponentConfig.StatefulSetController); err != nil {return err  }// 8. 已废弃标记控制器配置(兼容旧版参数)if err := s.DeprecatedFlags.ApplyTo(&c.ComponentConfig.DeprecatedController); err != nil {return err  }// 9. Endpoint控制器配置(维护服务后端端点)if err := s.EndpointController.ApplyTo(&c.ComponentConfig.EndpointController); err != nil {return err  }// 10. EndpointSlice控制器配置(新一代服务端点管理)if err := s.EndpointSliceController.ApplyTo(&c.ComponentConfig.EndpointSliceController); err != nil {return err  }// 11. EndpointSlice镜像控制器配置(同步Endpoint到EndpointSlice)if err := s.EndpointSliceMirroringController.ApplyTo(&c.ComponentConfig.EndpointSliceMirroringController); err != nil {return err  }// 12. 垃圾回收控制器配置(清理废弃资源)if err := s.GarbageCollectorController.ApplyTo(&c.ComponentConfig.GarbageCollectorController); err != nil {return err  }// 13. 自动扩缩容控制器配置(HPA)if err := s.HPAController.ApplyTo(&c.ComponentConfig.HPAController); err != nil {return err  }// 14. Job控制器配置(管理一次性任务)if err := s.JobController.ApplyTo(&c.ComponentConfig.JobController); err != nil {return err  }// 15. CronJob控制器配置(管理定时任务)if err := s.CronJobController.ApplyTo(&c.ComponentConfig.CronJobController); err != nil {return err  }// 16. 命名空间控制器配置(管理Namespace生命周期)if err := s.NamespaceController.ApplyTo(&c.ComponentConfig.NamespaceController); err != nil {return err  }// 17. 节点IPAM控制器配置(分配节点网段)if err := s.NodeIPAMController.ApplyTo(&c.ComponentConfig.NodeIPAMController); err != nil {return err  }// 18. 节点生命周期控制器配置(节点上下线管理)if err := s.NodeLifecycleController.ApplyTo(&c.ComponentConfig.NodeLifecycleController); err != nil {return err  }// 19. 持久卷绑定控制器配置(PV/PVC绑定、扩容)if err := s.PersistentVolumeBinderController.ApplyTo(&c.ComponentConfig.PersistentVolumeBinderController); err != nil {return err  }// 20. Pod垃圾回收控制器配置(清理失败/完成Pod)if err := s.PodGCController.ApplyTo(&c.ComponentConfig.PodGCController); err != nil {return err  }// 21. ReplicaSet控制器配置(管理副本集)if err := s.ReplicaSetController.ApplyTo(&c.ComponentConfig.ReplicaSetController); err != nil {return err  }// 22. 复制控制器配置(旧版副本控制器,兼容用)if err := s.ReplicationController.ApplyTo(&c.ComponentConfig.ReplicationController); err != nil {return err  }// 23. 资源配额控制器配置(管控Namespace资源配额)if err := s.ResourceQuotaController.ApplyTo(&c.ComponentConfig.ResourceQuotaController); err != nil {return err  }// 24. 服务账号控制器配置(自动创建SA令牌)if err := s.SAController.ApplyTo(&c.ComponentConfig.SAController); err != nil {return err  }// 25. 服务控制器配置(云厂商负载均衡同步)if err := s.ServiceController.ApplyTo(&c.ComponentConfig.ServiceController); err != nil {return err  }// 26. 任务完成TTL控制器配置(自动清理完成的Job)if err := s.TTLAfterFinishedController.ApplyTo(&c.ComponentConfig.TTLAfterFinishedController); err != nil {return err  }// ===================== 各类控制器配置结束 =====================// 27. 应用安全服务配置(HTTPS监听、TLS证书、端口绑定)if err := s.SecureServing.ApplyTo(&c.SecureServing, &c.LoopbackClientConfig); err != nil {return err  }// 28. 如果启用了安全端口(HTTPS),则应用认证+授权配置if s.SecureServing.BindPort != 0 || s.SecureServing.Listener != nil {// 认证配置(ServiceAccount、客户端证书、Webhook等)if err := s.Authentication.ApplyTo(&c.Authentication, c.SecureServing, nil); err != nil {return err    }// 授权配置(RBAC、Webhook等权限控制)if err := s.Authorization.ApplyTo(&c.Authorization); err != nil {return err    }  }// 所有配置应用成功,返回nilreturn nil}

其中KubeControllerManagerOptions的创建,源自于NewControllerManagerCommand

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\controllermanager.go// NewControllerManagerCommand creates a *cobra.Command object with default parametersfunc NewControllerManagerCommand() *cobra.Command {  s, err := options.NewKubeControllerManagerOptions()    cmd := &cobra.Command{}return cmd}

继续追踪options.NewKubeControllerManagerOptions

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\options\options.go// NewKubeControllerManagerOptions:创建一个带默认配置的 KubeControllerManagerOptions 对象// 作用:初始化控制器管理器的所有配置项(默认值),供后续命令行参数覆盖func NewKubeControllerManagerOptions() (*KubeControllerManagerOptions, error) {// 1. 创建默认组件配置(所有控制器的基础默认值)  componentConfig, err := NewDefaultComponentConfig()if err != nil {return nil, err  }// 2. 初始化 KubeControllerManagerOptions 结构体// 把所有控制器、通用、安全、认证、指标、日志配置全部绑定到默认值  s := KubeControllerManagerOptions{// 通用控制器管理器配置(并发、队列、超时、选举等)    Generic:         cmoptions.NewGenericControllerManagerConfigurationOptions(&componentConfig.Generic),// 云平台共享配置    KubeCloudShared: cpoptions.NewKubeCloudSharedOptions(&componentConfig.KubeCloudShared),// ===================== 所有控制器的默认配置 =====================    ServiceController: &cpoptions.ServiceControllerOptions{      ServiceControllerConfiguration: &componentConfig.ServiceController,    },// 卷挂载卸载控制器    AttachDetachController: &AttachDetachControllerOptions{      &componentConfig.AttachDetachController,    },// CSR 证书签名控制器    CSRSigningController: &CSRSigningControllerOptions{      &componentConfig.CSRSigningController,    },// DaemonSet 控制器    DaemonSetController: &DaemonSetControllerOptions{      &componentConfig.DaemonSetController,    },// Deployment 控制器    DeploymentController: &DeploymentControllerOptions{      &componentConfig.DeploymentController,    },// StatefulSet 控制器    StatefulSetController: &StatefulSetControllerOptions{      &componentConfig.StatefulSetController,    },// 已废弃标记控制器    DeprecatedFlags: &DeprecatedControllerOptions{      &componentConfig.DeprecatedController,    },// Endpoint 控制器    EndpointController: &EndpointControllerOptions{      &componentConfig.EndpointController,    },// EndpointSlice 控制器    EndpointSliceController: &EndpointSliceControllerOptions{      &componentConfig.EndpointSliceController,    },// EndpointSlice 镜像控制器    EndpointSliceMirroringController: &EndpointSliceMirroringControllerOptions{      &componentConfig.EndpointSliceMirroringController,    },// 垃圾回收控制器    GarbageCollectorController: &GarbageCollectorControllerOptions{      &componentConfig.GarbageCollectorController,    },// HPA 自动扩缩容控制器    HPAController: &HPAControllerOptions{      &componentConfig.HPAController,    },// Job 一次性任务控制器    JobController: &JobControllerOptions{      &componentConfig.JobController,    },// CronJob 定时任务控制器    CronJobController: &CronJobControllerOptions{      &componentConfig.CronJobController,    },// 命名空间控制器    NamespaceController: &NamespaceControllerOptions{      &componentConfig.NamespaceController,    },// 节点 IP 网段分配控制器    NodeIPAMController: &NodeIPAMControllerOptions{      &componentConfig.NodeIPAMController,    },// 节点生命周期控制器    NodeLifecycleController: &NodeLifecycleControllerOptions{      &componentConfig.NodeLifecycleController,    },// PV/PVC 绑定控制器    PersistentVolumeBinderController: &PersistentVolumeBinderControllerOptions{      &componentConfig.PersistentVolumeBinderController,    },// Pod 垃圾回收控制器    PodGCController: &PodGCControllerOptions{      &componentConfig.PodGCController,    },// ReplicaSet 副本集控制器    ReplicaSetController: &ReplicaSetControllerOptions{      &componentConfig.ReplicaSetController,    },// 旧版 Replication 控制器(兼容用)    ReplicationController: &ReplicationControllerOptions{      &componentConfig.ReplicationController,    },// 资源配额控制器    ResourceQuotaController: &ResourceQuotaControllerOptions{      &componentConfig.ResourceQuotaController,    },// 服务账号(SA)控制器    SAController: &SAControllerOptions{      &componentConfig.SAController,    },// Job 完成后自动清理控制器    TTLAfterFinishedController: &TTLAfterFinishedControllerOptions{      &componentConfig.TTLAfterFinishedController,    },// ===================== 控制器配置结束 =====================// 安全服务配置(HTTPS、TLS、监听端口)    SecureServing:  apiserveroptions.NewSecureServingOptions().WithLoopback(),// 认证配置(委托认证:ServiceAccount、客户端证书等)    Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),// 授权配置(委托授权:RBAC 权限控制)    Authorization:  apiserveroptions.NewDelegatingAuthorizationOptions(),// 指标采集配置(Prometheus 监控)    Metrics:        metrics.NewOptions(),// 日志配置    Logs:           logs.NewOptions(),  }// 3. 认证/授权配置:kubeconfig 文件可选(非必须)  s.Authentication.RemoteKubeConfigFileOptional = true  s.Authorization.RemoteKubeConfigFileOptional = true// 4. 安全证书配置:// 证书目录为空 → 表示内存生成自签名证书(不落地文件)// 证书名称:kube-controller-manager  s.SecureServing.ServerCert.CertDirectory = ""  s.SecureServing.ServerCert.PairName = "kube-controller-manager"// 绑定默认端口:10257(k8s 官方固定端口)  s.SecureServing.BindPort = ports.KubeControllerManagerPort// 5. 垃圾回收控制器:设置默认忽略的资源(不回收这些资源)  gcIgnoredResources := make([]garbagecollectorconfig.GroupResource, 0len(garbagecollector.DefaultIgnoredResources()))for r := range garbagecollector.DefaultIgnoredResources() {    gcIgnoredResources = append(gcIgnoredResources, garbagecollectorconfig.GroupResource{Group: r.Group, Resource: r.Resource})  }  s.GarbageCollectorController.GCIgnoredResources = gcIgnoredResources// 6. 领导者选举(Leader Election)配置:// 保证集群中只有一个控制器管理器主节点工作  s.Generic.LeaderElection.ResourceName = "kube-controller-manager"  s.Generic.LeaderElection.ResourceNamespace = "kube-system"// 7. 返回初始化完成的配置对象return &s, nil}

其中NewDefaultComponentConfig函数返回默认的配置

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\options\options.go// NewDefaultComponentConfig returns kube-controller manager configuration object.func NewDefaultComponentConfig() (kubectrlmgrconfig.KubeControllerManagerConfiguration, error) {  versioned := kubectrlmgrconfigv1alpha1.KubeControllerManagerConfiguration{}  kubectrlmgrconfigscheme.Scheme.Default(&versioned)  internal := kubectrlmgrconfig.KubeControllerManagerConfiguration{}if err := kubectrlmgrconfigscheme.Scheme.Convert(&versioned, &internal, nil); err != nil {return internal, err  }return internal, nil}

在 NewKubeControllerManagerOptions中可以看到,s的各个controller字段使用componentConfig赋值。所以上面的整个启动流程如下:

1. 程序启动入口        ↓2. NewDefaultComponentConfig()        ↓   产出【默认ComponentConfig】:存储所有控制器官方默认参数(默认值唯一源头)        ↓3. NewKubeControllerManagerOptions()// 核心:把默认ComponentConfig的指针传给每一个控制器子Option        ↓   产出【顶层Options对象】   ├─ 内部所有子Option都引用上面的默认Config,自带默认值   └─ 初始化安全端口、证书、leader选举、垃圾回收等固定配置        ↓4. Cobra命令行绑定 AddFlags()        ↓   用户输入启动命令参数 --xxx,覆盖Options内部的参数值        ↓5. 调用 Options.Config() 方法   ├─ 执行参数校验 Validate()   ├─ 创建空的【运行时Config】结构体   └─ 调用 ApplyTo() 函数        ↓6. ApplyTo() 单向数据拷贝// 循环所有控制器子Option,逐个复制参数到运行时Config   GenericOption → 运行时Config.Generic   DeploymentOption → 运行时Config.Deployment   HPAControllerOption → 运行时Config.HPAController   ...所有控制器依次复制   同步安全服务、认证、授权配置        ↓7. 初始化指标、日志配置        ↓   产出最终【运行时Config】        ↓8. 启动所有控制器(Deployment/HPA/PV等)// 运行阶段全程只读取运行时Config,Options对象彻底丢弃不再使用

精简一下就是如下流程:

【默认值源头】DefaultComponentConfig        ↓(提供默认值给Options)【输入层】KubeControllerManagerOptions        ↓(ApplyTo单向拷贝)【运行层】Runtime Config        ↓(控制器读取)所有业务控制器

1.1.2、kube-controller-manager启动流程

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\controllermanager.go// NewControllerManagerCommand creates a *cobra.Command object with default parametersfunc NewControllerManagerCommand() *cobra.Command {  s, err := options.NewKubeControllerManagerOptions()if err != nil {    klog.Fatalf("unable to initialize command options: %v", err)  }  cmd := &cobra.Command{    Run: func(cmd *cobra.Command, args []string) {      verflag.PrintAndExitIfRequested()      cliflag.PrintFlags(cmd.Flags())      err := checkNonZeroInsecurePort(cmd.Flags())if err != nil {        fmt.Fprintf(os.Stderr, "%v\n", err)        os.Exit(1)      }      c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())if err != nil {        fmt.Fprintf(os.Stderr, "%v\n", err)        os.Exit(1)      }if err := Run(c.Complete(), wait.NeverStop); err != nil {        fmt.Fprintf(os.Stderr, "%v\n", err)        os.Exit(1)      }    },  }return cmd}

继续追踪Run方法

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\controllermanager.go// Run:运行 KubeControllerManager(kube-controller-manager)// 功能:启动控制器管理器的所有核心服务,一旦启动就永不退出// 参数:c 已完成的运行时配置(CompletedConfig)、stopCh 退出信号通道func Run(c *config.CompletedConfig, stopCh <-chan struct{})error {// 1. 注册配置到 configz(用于在线查看运行配置,调试接口)if cfgz, err := configz.New(ConfigzName); err == nil {    cfgz.Set(c.ComponentConfig)  } else {    klog.Errorf("unable to register configz: %v", err)  }// 2. 健康检查初始化// 准备健康检查器(healthz),如果开启了领导者选举,加入选主健康检查var checks []healthz.HealthCheckervar electionChecker *leaderelection.HealthzAdaptorif c.ComponentConfig.Generic.LeaderElection.LeaderElect {    electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)    checks = append(checks, electionChecker)  }// 3. 启动 HTTP 服务(健康检查、metrics、debug 接口)var unsecuredMux *mux.PathRecorderMuxif c.SecureServing != nil {// 创建基础路由(healthz/debug/metrics)    unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)// 构建认证、授权拦截器    handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)// 启动 HTTPS 服务if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {return err    }  }// 5. 创建 Kubernetes 客户端构造器// 用于给各个控制器创建独立的 API 客户端  clientBuilder, rootClientBuilder := createClientBuilders(c)// 6. 封装 SA Token 控制器启动方法(特殊控制器,需要优先启动)  saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController// ===================== 核心运行函数 =====================// run:真正启动所有控制器的逻辑  run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) {// 创建控制器上下文(包含 informer、客户端、队列、停止信号等)    controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())if err != nil {      klog.Fatalf("error building controller context: %v", err)    }// 获取所有要启动的控制器初始化列表    controllerInitializers := initializersFunc(controllerContext.LoopMode)// 启动所有控制器(Deployment、HPA、StatefulSet 等)if err := StartControllers(controllerContext, startSATokenController, controllerInitializers, unsecuredMux); err != nil {      klog.Fatalf("error starting controllers: %v", err)    }// 启动 informer 工厂(开始监听 APIServer 资源变化)    controllerContext.InformerFactory.Start(controllerContext.Stop)    controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)close(controllerContext.InformersStarted)// 永久阻塞,保持程序运行select {}  }// ===================== 模式判断:是否开启领导者选举 =====================// 如果 没有开启选主 → 直接启动所有控制器(单机模式)if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {    run(context.TODO(), saTokenControllerInitFunc, NewControllerInitializers)panic("unreachable")  }// ===================== 开启选主模式(多机高可用) =====================// 获取主机名,生成唯一 ID  id, err := os.Hostname()if err != nil {return err  }// 加唯一 UUID,防止同主机冲突  id = id + "_" + string(uuid.NewUUID())// 选主迁移相关(高可用升级用)var leaderMigrator *leadermigration.LeaderMigrator = nil  startSATokenController := saTokenControllerInitFunc// 如果开启了 Leader Migration(选主迁移),初始化迁移器if leadermigration.Enabled(&c.ComponentConfig.Generic) {    klog.Infof("starting leader migration")    leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration, "kube-controller-manager")// 包装 SA token 控制器,启动后通知迁移就绪    startSATokenController = func(ctx ControllerContext) (http.Handler, boolerror) {defer close(leaderMigrator.MigrationReady)return saTokenControllerInitFunc(ctx)    }  }// ===================== 启动主选主协程 =====================go leaderElectAndRun(c, id, electionChecker,    c.ComponentConfig.Generic.LeaderElection.ResourceLock,    c.ComponentConfig.Generic.LeaderElection.ResourceName,    leaderelection.LeaderCallbacks{// 当选为主节点时:启动所有控制器      OnStartedLeading: func(ctx context.Context) {        initializersFunc := NewControllerInitializersif leaderMigrator != nil {          initializersFunc = createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated)          klog.Info("leader migration: starting main controllers.")        }        run(ctx, startSATokenController, initializersFunc)      },// 选主丢失时:直接崩溃退出(保证高可用切换)      OnStoppedLeading: func() {        klog.Fatalf("leaderelection lost")      },    })// 永久阻塞,保持进程运行select {}}

追踪StartControllers,首先启动特殊的sa控制器,因为其他控制器需要sa控制器构造token,其次遍历注册控制器,执行他们的initFn即可。

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\controllermanager.gofunc StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux)error {// 第一步:【必须第一个启动】SA Token 控制器// 作用:为所有其他控制器创建、颁发 Service Account 令牌// 如果这个控制器启动失败,其他控制器都没有权限访问 APIServerif startSATokenController != nil {if _, _, err := startSATokenController(ctx); err != nil {return err    }  }for controllerName, initFn := range controllers {    debugHandler, started, err := initFn(ctx)  }return nil}

上面完成了对kube-controller-manager启动的源码的分析,接下来就开始研究各类控制器。

各类工作负载控制器会比对资源期望副本数与当前实际运行 Pod 数量,依据差值执行扩容或缩容:

  • 获取现有 Pod 数量:通过 syncHandler 调用 Informer 拉取对应命名空间资源,结合标签筛选出归属 Pod 统计数量;
  • 扩容策略:采用指数渐进扩容(从 1 开始两倍递增),避免一次性批量创建 Pod 引发批量同类故障、压垮后端服务;
  • 缩容筛选逻辑:缩容前对存量 Pod 排序,优先清理风险更高、稳定性差的 Pod,优先级依次为 Pending 状态 Pod、同一节点冗余 Pod、创建时间早 / 就绪时长短 Pod、重启次数多的 Pod

我们从第一个控制器ReplicaSet开始

1.2、ReplicaSet控制器

ReplicaSet(副本集)是 Kubernetes 中保证 Pod 副本数量始终符合期望值的核心控制器。

它的核心作用:

  • 持续比对期望副本数 vs 当前运行 Pod 数
  • 不足则自动创建 Pod,多余则自动删除 Pod
  • 通过标签选择器精准管理归属的 Pod
  • 是 Deployment 的底层实现,Deployment 真正管理的就是 ReplicaSet

追踪ReplicaSet 的源代码,开始发车

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\controllermanager.gofunc NewControllerInitializers(loopMode ControllerLoopMode)map[string]InitFunc {  controllers := map[string]InitFunc{}  controllers["replicaset"] = startReplicaSetControllerreturn controllers}

追踪startReplicaSetController

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\apps.gofunc startReplicaSetController(ctx ControllerContext) (http.Handler, boolerror) {//  核心:创建并运行 ReplicaSet 控制器(goroutine 后台运行)// 这行代码 = 创建 ReplicaSet 控制器 + 启动它的无限同步循环go replicaset.NewReplicaSetController(// 1. 传入 ReplicaSet 的 informer:监听 ReplicaSet 资源变化    ctx.InformerFactory.Apps().V1().ReplicaSets(),// 2. 传入 Pod 的 informer:监听 Pod 资源变化(用来统计当前副本数)    ctx.InformerFactory.Core().V1().Pods(),// 3. 传入 K8s 客户端:用于创建/删除 Pod    ctx.ClientBuilder.ClientOrDie("replicaset-controller"),// 4. 并发扩容的“突发副本数”(控制每次最多扩容多少 Pod)BurstReplicas = 500    replicaset.BurstReplicas,// 5. 启动控制器:// ConcurrentRSSyncs = 并发工作协程数(多 worker 同时同步)// ctx.Stop = 退出信号  ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)// 启动成功,返回:无调试handler、已启动=true、无错误return niltruenil}

追踪NewReplicaSetController

// C:\git\kubernetes-1.22.3\pkg\controller\replicaset\replica_set.go// NewReplicaSetController configures a replica set controller with the specified event recorderfunc NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {// 新事件广播器  eventBroadcaster := record.NewBroadcaster()  eventBroadcaster.StartStructuredLogging(0)  eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})if err := metrics.Register(legacyregistry.Register); err != nil {    klog.ErrorS(err, "unable to register metrics")  }return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,    apps.SchemeGroupVersion.WithKind("ReplicaSet"),"replicaset_controller","replicaset",    controller.RealPodControl{      KubeClient: kubeClient,      Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),    },  )}

追踪NewBaseController

// C:\git\kubernetes-1.22.3\pkg\controller\replicaset\replica_set.go// ReplicaSetController:副本集控制器的核心结构体// 作用:管理 ReplicaSet,确保 Pod 数量永远等于期望数量type ReplicaSetController struct {// 1. 标识资源类型:用来区分是 ReplicaSet 还是 ReplicationController  schema.GroupVersionKind// 2. K8s 客户端:用来操作 API(创建/删除 Pod)  kubeClient clientset.Interface// 3. Pod 控制接口:封装了创建/删除 Pod 的方法,简化操作  podControl controller.PodControlInterface// 4. 并发扩容上限:一次最多扩容多少个 Pod,防止瞬间压力过大  burstReplicas int// 5. 同步处理函数:真正执行“比对期望数和当前数 → 扩缩容”的核心逻辑  syncHandler func(rsKey string)error// 6. 预期管理器:记录“我已经创建/删除了 Pod”,防止重复操作  expectations *controller.UIDTrackingControllerExpectations// 7. ReplicaSet 本地缓存(从 Informer 来),不用每次查 API  rsLister appslisters.ReplicaSetLister// 8. 判断 ReplicaSet 缓存是否已经和 APIServer 同步完成  rsListerSynced cache.InformerSynced// 9. Pod 本地缓存(从 Informer 来)  podLister corelisters.PodLister// 10. 判断 Pod 缓存是否同步完成,判断informer数据已经同步过了  podListerSynced cache.InformerSynced// 11. 工作队列:所有需要同步的 ReplicaSet 都排队在这里处理  queue workqueue.RateLimitingInterface}
// C:\git\kubernetes-1.22.3\pkg\controller\replicaset\replica_set.go// NewBaseController:创建 ReplicaSet 控制器的核心方法// 作用:初始化控制器、注册监听器、绑定事件处理函数func NewBaseController(  rsInformer appsinformers.ReplicaSetInformer,  // RS 监听器  podInformer coreinformers.PodInformer,        // Pod 监听器  kubeClient clientset.Interface,               // K8s 客户端  burstReplicas int,                            // 并发扩容上限  gvk schema.GroupVersionKind,                    metricOwnerName string,                         queueName string,                               podControl controller.PodControlInterface,    // Pod 操作工具) *ReplicaSetController {// 1. 注册客户端限流指标(监控用)if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {    ratelimiter.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())  }// 2. 创建 ReplicaSetController 结构体(控制器本体)  rsc := &ReplicaSetController{    GroupVersionKind: gvk,    kubeClient:    kubeClient,       // 操作 K8s API    podControl:    podControl,       // 创建/删除 Pod 的工具    burstReplicas: burstReplicas,    // 每次最多扩容多少 Pod    expectations:  controller.NewUIDTrackingControllerExpectations(), // 去重、防重复执行    queue:         workqueue.NewNamedRateLimitingQueue(...), // 任务队列(核心)    syncHandler:   rsc.syncReplicaSet, // 真正执行同步的函数  }// ------------------------------// 3. 注册 ReplicaSet 事件监听// 当 RS 新增、更新、删除时,触发对应函数// ------------------------------  rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{    AddFunc:    rsc.addRS,    // RS 创建 → 加入队列    UpdateFunc: rsc.updateRS, // RS 更新 → 加入队列    DeleteFunc: rsc.deleteRS, // RS 删除 → 加入队列  })  rsc.rsLister = rsInformer.Lister()       // 本地缓存读取 RS  rsc.rsListerSynced = rsInformer.Informer().HasSynced // 缓存是否同步完成// ------------------------------// 4. 注册 Pod 事件监听// Pod 创建、状态变化、删除 → 都触发同步// ------------------------------  podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{    AddFunc:    rsc.addPod,    // Pod 创建 → 触发 RS 同步    UpdateFunc: rsc.updatePod, // Pod 状态变化 → 触发 RS 同步    DeleteFunc: rsc.deletePod, // Pod 删除 → 触发 RS 同步  })  rsc.podLister = podInformer.Lister()       // 本地缓存读取 Pod  rsc.podListerSynced = podInformer.Informer().HasSynced// 5. 返回创建好的控制器return rsc}

其中metricOwnerName的值是replicaset_controller,用来注册一个指标,这个指标是replicaset_controller_rate_limiter_use,我们在prometheus中查看相关指标就可以发现有一堆控制器的rate_limiter_use的指标 ,它们的值为0,是用来注册到prometheus做统计有哪些controller开始了rateLimiter

  // 1. 注册客户端限流指标(监控用)if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {    ratelimiter.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())  }

其中rsInformer.Informer().AddEventHandlerpodInformer.Informer().AddEventHandler中添加了对应的回调函数,也就是当RS或者Pod创建,更新,删除的时候触发的动作

比如rsc.addPod,就是向queue中添加 <namespace>/<name> 或者<namespace>

// C:\git\kubernetes-1.22.3\pkg\controller\replicaset\replica_set.gofunc (rsc *ReplicaSetController) addRS(obj interface{}) {  rs := obj.(*apps.ReplicaSet)  klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name)  rsc.enqueueRS(rs)}

追踪enqueueRS

// C:\git\kubernetes-1.22.3\pkg\controller\replicaset\replica_set.gofunc (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) {  key, err := controller.KeyFunc(rs)if err != nil {    utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))return  }  rsc.queue.Add(key)}

追踪NewReplicaSetController().Run,查看控制器的启动。

追踪Run方法的第一个参数ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs,它对应的是kube-controller-manager --concurrent-replicaset-syncs=2这个启动参数

// C:\git\kubernetes-1.22.3\pkg\controller\replicaset\config\types.gopackage config// ReplicaSetControllerConfiguration contains elements describing ReplicaSetController.type ReplicaSetControllerConfiguration struct {// concurrentRSSyncs is the number of replica sets that are  allowed to sync// concurrently. Larger number = more responsive replica  management, but more// CPU (and network) load.  ConcurrentRSSyncs int32}

ConcurrentRSSyncs 表示,同时有多少个 ReplicaSet 可以在后台并行执行同步(扩缩容)。

继续追踪Run

// C:\git\kubernetes-1.22.3\pkg\controller\replicaset\replica_set.go// Run begins watching and syncing.func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {defer utilruntime.HandleCrash()defer rsc.queue.ShutDown()  controllerName := strings.ToLower(rsc.Kind)  klog.Infof("Starting %v controller", controllerName)defer klog.Infof("Shutting down %v controller", controllerName)if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {return  }// 启动ConcurrentRSSyncs个协程 for i := 0; i < workers; i++ {go wait.Until(rsc.worker, time.Second, stopCh)  }  <-stopCh}

追踪wait.Until,就是死循环周期性执行

// C:\git\kubernetes-1.22.3\vendor\k8s.io\apimachinery\pkg\util\wait\wait.go// Until 会一直循环执行函数 f,直到 stopCh 关闭信号到来。// 每隔 period 时间执行一次。//// Until 本质就是 JitterUntil 的简化版:不带抖动、滑动执行。func Until(f func(), period time.Duration, stopCh <-chan struct{}) {  JitterUntil(f, period, 0.0true, stopCh)}

追踪rsc.worker,死循环处理rsc.processNextWorkItem

// C:\git\kubernetes-1.22.3\pkg\controller\replicaset\replica_set.go// worker runs a worker thread that just dequeues items, processes them, and marks them done.// It enforces that the syncHandler is never invoked concurrently with the same key.func (rsc *ReplicaSetController) worker() {for rsc.processNextWorkItem() {  }}

继续追踪rsc.processNextWorkItem

// C:\git\kubernetes-1.22.3\pkg\controller\replicaset\replica_set.go// processNextWorkItem:从队列里取出一个 ReplicaSet 任务并处理// 不断被循环调用,一个一个处理排队的 RSfunc (rsc *ReplicaSetController) processNextWorkItem() bool {// 1. 从工作队列里拿一个任务 key(比如:default/my-rs)// quit == true 表示队列关闭,要退出了  key, quit := rsc.queue.Get()if quit {return false  }// 2. 标记这个任务“处理完成”(无论成功失败,最后都会执行)defer rsc.queue.Done(key)// 3.  真正执行同步:调用 syncReplicaSet 方法// 也就是:比对Pod数量 → 扩容/缩容  err := rsc.syncHandler(key.(string))// 4. 如果处理成功if err == nil {// 把任务从队列里删掉(成功完成,不再重试)    rsc.queue.Forget(key)return true  }// 5. 如果处理失败(报错了)// 打印错误日志  utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))// 把任务重新放回队列,等下重试(带限流、退避,不会疯狂重试)  rsc.queue.AddRateLimited(key)return true}

追踪ReplicaSetController.queue

// C:\git\kubernetes-1.22.3\pkg\controller\replicaset\replica_set.gotype ReplicaSetController struct {// Controllers that need to be synced  queue workqueue.RateLimitingInterface}func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,  gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {  rsc := &ReplicaSetController{    queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),  }return rsc}
// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\util\workqueue\default_rate_limiters.go// DefaultControllerRateLimiter:控制器队列的【默认限流/重试器】// 作用:防止失败任务疯狂重试压垮集群// 特点:既有全局限流,又有单个任务的指数退避func DefaultControllerRateLimiter() RateLimiter {// 组合 2 种限流:谁限制得更严格,就听谁的return NewMaxOfRateLimiter(// 限流1:【单个任务指数退避限流】// 同一个任务失败重试:// 第一次等 5ms,第二次 10ms,第三次 20ms... 指数翻倍// 最大等待 1000秒(16分钟)    NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),// 限流2:【全局令牌桶限流】// 所有任务加起来:每秒最多放行 10 个重试// 最大积压 100 个(防止突发)    &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},  )}

追踪其中的GetDoneForget三个方法

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\util\workqueue\queue.go// Type is a work queue (see the package comment).// Type:工作队列的【底层数据结构】// 用来安全、有序、不重复地处理任务type Type struct {// 1. 排队队列:按顺序存放【等待被处理】的任务// 任务在这里排队,工人来 Get() 时从这里取  queue []t// 2. 脏集合:存放【需要处理】的任务(去重用)// 保证同一个任务不会被重复加入队列  dirty set// 3. 处理中集合:存放【正在被处理】的任务// 标记任务正在运行,防止同一个任务被并发处理  processing set// 4. 锁 + 条件变量:保证多协程并发安全(生产者/消费者模型)  cond *sync.Cond// 5. 是否正在关闭  shuttingDown bool// 6. 监控指标  metrics queueMetrics// 7. 内部更新周期 + 时钟  unfinishedWorkUpdatePeriod time.Duration  clock                      clock.Clock}

追踪Get方法

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\util\workqueue\queue.go// Get:阻塞式获取一个任务。// 队列空了就等着,直到有任务或队列关闭。// 拿到任务后,必须调用 Done() 标记完成。func (q *Type) Get() (item interface{}, shutdown bool) {  q.cond.L.Lock()defer q.cond.L.Unlock()// 如果队列空了,并且没关闭 → 阻塞等待for len(q.queue) == 0 && !q.shuttingDown {    q.cond.Wait()  }// 如果队列空了 → 说明关闭了if len(q.queue) == 0 {return niltrue  }// 从队列头部取出一个任务  item, q.queue = q.queue[0], q.queue[1:]// 统计指标  q.metrics.get(item)// 两个最关键操作  q.processing.insert(item)  // 标记:这个任务【正在处理】  q.dirty.delete(item)       // 从【待处理】里删掉// 返回任务return item, false}

追踪Done

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\util\workqueue\queue.go// Done:标记一个任务已经处理完成// 如果在处理期间,这个任务又被标记为「需要重新处理」(dirty)// 那么它会被再次加入队列,重新处理func (q *Type) Done(item interface{}) {  q.cond.L.Lock()defer q.cond.L.Unlock()// 统计监控指标  q.metrics.done(item)// 1. 把任务从「正在处理」集合中删掉// 告诉队列:我处理完了,不再占用这个任务  q.processing.delete(item)// 2. 检查:处理期间,这个任务是否又被标记为「需要处理」(dirty)if q.dirty.has(item) {// 如果是 → 把任务重新放回队列尾部    q.queue = append(q.queue, item)// 唤醒等待的 worker,让它来处理这个重新入队的任务    q.cond.Signal()  }}

追踪Forget,任务【成功了】,要把【失败次数清零】!

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\util\workqueue\default_rate_limiters.go// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit// dealing with max failures and expiration are up to the callertype ItemExponentialFailureRateLimiter struct {  failuresLock sync.Mutex  failures     map[interface{}]int  baseDelay time.Duration  maxDelay  time.Duration}var _ RateLimiter = &ItemExponentialFailureRateLimiter{}func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {return &ItemExponentialFailureRateLimiter{    failures:  map[interface{}]int{},    baseDelay: baseDelay,    maxDelay:  maxDelay,  }}func DefaultItemBasedRateLimiter() RateLimiter {return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)}func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {  r.failuresLock.Lock()defer r.failuresLock.Unlock()  exp := r.failures[item]  r.failures[item] = r.failures[item] + 1// The backoff is capped such that 'calculated' value never overflows.  backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2float64(exp))if backoff > math.MaxInt64 {return r.maxDelay  }  calculated := time.Duration(backoff)if calculated > r.maxDelay {return r.maxDelay  }return calculated}func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {  r.failuresLock.Lock()defer r.failuresLock.Unlock()return r.failures[item]}func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {  r.failuresLock.Lock()defer r.failuresLock.Unlock()delete(r.failures, item)}

看了一下限流队列的源代码,大体是下面这样的:

① RateLimiter(限流器)

只干一件事:算下次要等多久

  • 失败次数越多,等待时间越长(指数增长:1ms → 2ms → 4ms → 8ms…)
  • 用 When() 计算时间
  • 用 Forget() 成功清零
  • 不存储任务、不控制队列,只算时间

② DelayingQueue(延时队列)

只干一件事:到时间再把任务给你

  • 接收 AddAfter(item, 时间)
  • 内部用 最小堆(优先队列) 按时间排序
  • 后台 waitingLoop 检查:时间到了 → 放行
  • 不到时间就睡觉,不占 CPU

③ RateLimitingQueue(限流队列)

只干一件事:把限流 + 延时结合起来

  • 对外提供 AddRateLimited(item)
  • 内部调用 limiter.When() 得到等待时间
  • 再调用 delayQueue.AddAfter()
  • 它是个 “中介”

  1. 完整工作流程(一句话版)

失败 → 限流算时间 → 延时队列等时间 → 时间到重试 → 成功清零

继续追踪ReplicaSetController.syncReplicaSet

// C:\git\kubernetes-1.22.3\pkg\controller\replicaset\replica_set.go// syncReplicaSet 是 ReplicaSet 控制器的核心同步方法// 作用:检查当前 Pod 数量是否符合期望,执行创建/删除操作,最终更新状态// 注意:同一个 ReplicaSet 同一时间只能被一个协程处理func (rsc *ReplicaSetController) syncReplicaSet(key stringerror {// 把队列中的 key(格式:namespace/name)拆分成命名空间和名称  namespace, name, err := cache.SplitMetaNamespaceKey(key)// 从本地缓存(Informer)中获取当前的 ReplicaSet 对象  rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)// 检查是否满足同步条件:之前创建/删除 Pod 的操作已经完成(收到了事件)  rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)// 将 RS 的标签选择器转换为标准选择器,用于筛选归属的 Pod  selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)// 从本地缓存列出当前命名空间下的所有 Pod// 目的:防止有些 Pod 标签变了,但引用还是旧的,导致漏删  allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())// 过滤掉非活跃 Pod(已失败、已删除等),只保留运行中的 Pod  filteredPods := controller.FilterActivePods(allPods)// 认领 Pod:筛选出属于当前 RS 的活跃 Pod// 同时处理旧引用的 Pod,确保归属关系正确  filteredPods, err = rsc.claimPods(rs, selector, filteredPods)var manageReplicasErr error// 如果满足同步条件,并且 RS 没有被删除,则执行扩缩容逻辑if rsNeedsSync && rs.DeletionTimestamp == nil {//  核心:管理副本(少了创建,多了删除)    manageReplicasErr = rsc.manageReplicas(filteredPods, rs)  }// 深拷贝 RS 对象,避免修改本地缓存中的数据  rs = rs.DeepCopy()// 计算最新状态:当前副本数、就绪副本数、可用副本数  newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)// 将最新状态更新到 APIServer  updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)// 如果配置了最小就绪时间,且副本已就绪但未可用,延迟再次同步if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&    updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&    updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {// 等待 MinReadySeconds 后重新入队,确保 Pod 稳定可用    rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)  }// 返回扩缩容的执行结果return manageReplicasErr}

追踪manageReplicasmanageReplicas 只做两件事:

  1. 副本少了那么慢启动批量创建 Pod
  2. 副本多了 那么选择最差的Pod并发删除
// manageReplicas 检查并调整 Pod 副本数量// 不会修改传入的 filteredPods// 如果创建/删除 Pod 失败,会让 RS 重新入队重试func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {// 计算当前副本数与期望副本数的差值// diff < 0 → 副本不足,需要创建// diff > 0 → 副本过多,需要删除  diff := len(filteredPods) - int(*(rs.Spec.Replicas))// 获取 RS 的唯一标识 key  rsKey, err := controller.KeyFunc(rs)// ==============================================// 情况 1:当前 Pod 数量 不足,需要创建 Pod// ==============================================if diff < 0 {// 把 diff 转为正数,表示需要创建多少个    diff *= -1// 限制单次最大创建数量,防止瞬间压力过大if diff > rsc.burstReplicas {      diff = rsc.burstReplicas    }// 告诉期望控制器:我即将创建 diff 个 Pod// 防止重复创建    rsc.expectations.ExpectCreations(rsKey, diff)// 慢启动批量创建:从 1 个开始,指数级增加批量大小// 防止一次性创建太多失败导致风暴    successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func()error {// 创建 Pod      err := rsc.podControl.CreatePods(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))if err != nil {// 如果命名空间正在删除,直接忽略,不用重试if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {return nil        }      }return err    })  }// ==============================================// 情况 2:当前 Pod 数量 过多,需要删除 Pod// ==============================================else if diff > 0 {// 限制单次最大删除数量if diff > rsc.burstReplicas {      diff = rsc.burstReplicas    }// 获取关联的 Pod(包括间接关联)    relatedPods, err := rsc.getIndirectlyRelatedPods(rs)// 选择要删除的 Pod(优先删除:Pending、未就绪、旧的、异常的)    podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)// 告诉期望控制器:我要删除这些 Pod    rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))// 并发删除 Pod,使用 WaitGroup 等待所有删除协程完成    errCh := make(chan error, diff)var wg sync.WaitGroup    wg.Add(diff)for _, pod := range podsToDelete {go func(targetPod *v1.Pod) {defer wg.Done()// 执行删除if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {// 删除失败,修正预期          podKey := controller.PodKey(targetPod)          rsc.expectations.DeletionObserved(rsKey, podKey)if !apierrors.IsNotFound(err) {            klog.V(2).Infof("Failed to delete %v, decremented expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)            errCh <- err          }        }      }(pod)    }// 等待所有删除完成    wg.Wait()// 返回第一个错误select {case err := <-errCh:if err != nil {return err      }default:    }  }// 既不扩容也不缩容,直接返回return nil}

其中需要关注的是slowStartBatch,在扩容过程,慢启动扩容函数 slowStartBatch,目的是为了防止大量创建pod出现相同的错误。

// C:\git\kubernetes-1.22.3\pkg\controller\replicaset\replica_set.gofunc slowStartBatch(count int, initialBatchSize int, fn func()error) (interror)

slowStartBatch的逻辑举例,总共创建 10 个Pod,初始批次大小 = 1

  1. 批次 1:创建 1 个,全部成功 → 下一批扩容至 2;

  2. 批次 2:创建 2 个,全部成功 → 下一批扩容至 4;

  3. 批次 3:创建 4 个,全部成功 → 剩余 3 个;

  4. 批次 4:创建剩余 3 个,全部成功;

    最终成功总数 = 10,无错误返回。

失败场景,总共创建 10 个Pod,初始批次大小 = 1

  1. 批次 1:创建 1 个,成功;

  2. 批次 2:创建 2 个,其中 1 个失败;

    函数直接终止,剩余 7 个Pod,不再创建,返回成功总数 1 + 捕获到的错误。

func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error)

slowStartBatch这个函数的第三个,也就是CreatePods用来创建Pod,底层调用clientcreate pod,通过apiserver写入etcd等待scheduler的调度

接下来看一下缩容过程,关注getPodsToDelete

// C:\git\kubernetes-1.22.3\pkg\controller\replicaset\replica_set.go// getPodsToDelete:从当前运行的 Pod 里挑选出 **要删除的 Pod**// 目的:ReplicaSet 缩容时,选出最合适删掉的 Pod(保证集群稳定)func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {// 如果不是删除所有 Pod,只是删除一部分,就需要给 Pod 排序// 选出“最该删除”的 Podif diff < len(filteredPods) {// 给 Pod 打分:同一节点上相关 Pod 越少,优先级越高(优先删除)    podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)// 按照打分排序,把“最该删除”的 Pod 排在最前面    sort.Sort(podsWithRanks)// 上报监控指标(用于观察删除逻辑的行为)    reportSortingDeletionAgeRatioMetric(filteredPods, diff)  }// 取前 diff 个 Pod 返回,这些就是要删除的 Podreturn filteredPods[:diff]}

追踪getPodsRankedByRelatedPodsOnSameNode

// C:\git\kubernetes-1.22.3\pkg\controller\replicaset\replica_set.go// getPodsRankedByRelatedPodsOnSameNode// 功能:给每个 Pod 计算一个“排名分数”,分数 = 这个节点上运行的【同类/相关Pod数量】// 排名分数用来决定:缩容时,哪个 Pod 优先被删除func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {// 第一步:统计每个节点上,有多少个【活跃的相关Pod】// 用 map 保存:key=节点名称,value=该节点上Pod数量  podsOnNode := make(map[string]int)for _, pod := range relatedPods {// 只统计活跃的Pod(运行中、未删除、未失败)if controller.IsPodActive(pod) {      podsOnNode[pod.Spec.NodeName]++    }  }// 第二步:给每个需要排序的Pod,设置排名分数// 分数 = 它所在节点上的【相关Pod总数】  ranks := make([]intlen(podsToRank))for i, pod := range podsToRank {    ranks[i] = podsOnNode[pod.Spec.NodeName]  }// 返回带排名的Pod列表,用于后面排序return controller.ActivePodsWithRanks{    Pods:  podsToRank,    Rank:  ranks,    Now:   metav1.Now(),  }}

追踪ActivePodsWithRanks

// C:\git\kubernetes-1.22.3\pkg\controller\controller_utils.go// ActivePodsWithRanks 带有排名的活跃Pod列表// 用于 ReplicaSet 缩容时,给 Pod 排序,决定谁先被删除type ActivePodsWithRanks struct {// Pods:需要排序的 Pod 列表  Pods []*v1.Pod// Rank:分数(节点上同类Pod数量),分数越高越先删  Rank []int// Now:排序用的时间基准  Now metav1.Time}// Len:返回 Pod 数量(排序接口必须实现)func (s ActivePodsWithRanks) Len() int {return len(s.Pods)}// Swap:交换两个 Pod 的位置(排序接口必须实现)func (s ActivePodsWithRanks) Swap(i, j int) {  s.Pods[i], s.Pods[j] = s.Pods[j], s.Pods[i]  s.Rank[i], s.Rank[j] = s.Rank[j], s.Rank[i]}// Less 核心:比较两个 Pod,返回 true 表示 i 比 j 更应该被删除func (s ActivePodsWithRanks) Less(i, j intbool {// 1. 优先删除【未分配节点】的 Pod// 没节点的 Pod 优先删if s.Pods[i].Spec.NodeName != s.Pods[j].Spec.NodeName && (len(s.Pods[i].Spec.NodeName) == 0 || len(s.Pods[j].Spec.NodeName) == 0) {return len(s.Pods[i].Spec.NodeName) == 0  }// 2. 按阶段排序:Pending < Unknown < Running// 越不健康的越先删if podPhaseToOrdinal[s.Pods[i].Status.Phase] != podPhaseToOrdinal[s.Pods[j].Status.Phase] {return podPhaseToOrdinal[s.Pods[i].Status.Phase] < podPhaseToOrdinal[s.Pods[j].Status.Phase]  }// 3. 未就绪 < 就绪// 不健康的优先删if podutil.IsPodReady(s.Pods[i]) != podutil.IsPodReady(s.Pods[j]) {return !podutil.IsPodReady(s.Pods[i])  }// 4. 删除成本低的优先删(PodDeletionCost)if utilfeature.DefaultFeatureGate.Enabled(features.PodDeletionCost) {    pi, _ := helper.GetDeletionCostFromPodAnnotations(s.Pods[i].Annotations)    pj, _ := helper.GetDeletionCostFromPodAnnotations(s.Pods[j].Annotations)if pi != pj {return pi < pj    }  }// 5. 节点上同类Pod越多,越优先删(打散分布)// rank 高 = 节点拥挤 → 优先删if s.Rank[i] != s.Rank[j] {return s.Rank[i] > s.Rank[j]  }// 6. 刚就绪的优先删(保留运行久的稳定Pod)if podutil.IsPodReady(s.Pods[i]) && podutil.IsPodReady(s.Pods[j]) {    readyTime1 := podReadyTime(s.Pods[i])    readyTime2 := podReadyTime(s.Pods[j])if !readyTime1.Equal(readyTime2) {return afterOrZero(readyTime1, readyTime2)    }  }// 7. 容器重启次数多的优先删(不稳定Pod)if maxContainerRestarts(s.Pods[i]) != maxContainerRestarts(s.Pods[j]) {return maxContainerRestarts(s.Pods[i]) > maxContainerRestarts(s.Pods[j])  }// 8. 越新的Pod越先删(保留老Pod)if !s.Pods[i].CreationTimestamp.Equal(&s.Pods[j].CreationTimestamp) {return afterOrZero(&s.Pods[i].CreationTimestamp, &s.Pods[j].CreationTimestamp)  }return false}

其中删除成本低的优先删,删除成本指的是controller.kubernetes.io/pod-deletion-cost设置的值

controller.kubernetes.io/pod-deletion-cost类别:注解示例:controller.kubernetes.io/pod-deletion-cost: "10"用于:Pod该注解用于设置 Pod 删除成本允许用户影响 ReplicaSet 缩减顺序。注解解析为 int32 类型。

pod-deletion-cost参考:https://kubernetes.io/zh-cn/docs/reference/labels-annotations-taints/

最终缩容动作调用DeletePod执行,也就是

rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs)

至此,ReplicaSet控制器分析完成。

1.3、Job控制器

1.3.1、Job 和 Pod 的核心关系

  1. Job 控制器负责创建、调度、管理一批 Pod,专门执行一次性批处理任务;

  2. Job 会持续重试失败 Pod,直至满足设定的成功完成 Pod 数量;

  3. Job 持续统计成功正常退出的 Pod 个数,成功 Pod 数量达到 completions 阈值时,Job 任务标记完成;

  4. 资源联动规则:

    • 删除 Job:会级联清除该 Job 创建的所有 Pod
    • 挂起 Job:立刻终止所有运行中的活跃 Pod,恢复 Job 后才会重新创建 Pod 执行任务。

1.3.2、Job 完整 YAML 示例说明

apiVersion: batch/v1kind: Jobmetadata:name: pispec:backoffLimit: 6# Pod失败标记Job失败前,最大重试次数,默认6次completions: 10# Job需要成功执行完成的Pod总数量,不指定则默认等于parallelismparallelism: 2# 同一时间最多并发运行的Pod数量,默认1ttlSecondsAfterFinished: 60# Job完成后等待60秒自动删除Job及关联Podtemplate:spec:containers:name: piimage: perl# 计算2位圆周率command: ["perl",  "-Mbignum=bpi""-wle""print bpi(2)"]restartPolicy: Never # Pod内容器失败后不重启容器

执行效果演示

  1. 创建 Job 后,受parallelism:2限制,同一时刻只会启动 2 个 Pod 拉取镜像、运行任务:
[root]# kubectl get podNAME           READY   STATUS              RESTARTS   AGEpi-4xygm       0/1     ContainerCreating   0          22spi-889t5       0/1     ContainerCreating   0          22s
  1. 实时监控 Job 完成进度,控制器会不断新建 Pod 补齐并发数,直到累计 10 个 Pod 执行成功:
[root]# kubectl get job -wNAME   COMPLETIONS   DURATION   AGEpi     0/10          2s         2spi     1/10          4s         4spi     2/10          6s         6spi     ...pi     10/10         29s        29s

1.3.3、Job 扩缩容限制

Job 创建完成后不支持运行时动态扩缩容:核心字段 spec.completions(总完成 Pod 数)不允许修改,无法在线调整任务总量。

1.3.4、Job 资源回收机制

1. 手动删除规则

任务执行完毕的 Pod 会保持Completed状态驻留集群,持续占用集群资源;

执行 kubectl delete job pi 删除Job 时,会级联删除所有归属该 Job 的Pod

  1. 自动 TTL 清理(推荐)

K8sv1.12 版本新增 TTLAfterFinished 特性,启用 TTL 控制器实现 Job 自动回收:

  1. 在 Job Spec 配置 ttlSecondsAfterFinished,控制器会在Job 完成后等待指定时长,自动删除 Job 和所有关联 Pod
  2. 配置值说明:
    • ttlSecondsAfterFinished: 60Job 完成 60 秒后自动清理;
    • ttlSecondsAfterFinished: 0Job 执行成功后立刻清理;
  3. 清理逻辑:TTL 控制器删除Job 时采用级联删除策略,Job 与全部 Pod 一并清除。

示例:等待 60 秒后查询资源,Job 与对应 Pod 均已消失:

查询Job,无资源[root]#  kubectl get jobNo resources found in default namespace.查询Pod,仅其他业务Pod保留[root]#  kubectl get podNAME                       READY   STATUS    RESTARTS   AGEgrafana-123fb84d84-po2lz   1/1     Running   0          9m21s

1.3.5、Job 内 Pod 两大核心控制参数

1. restartPolicyPod 容器重启策略

Job 的 Pod 仅支持两种重启策略,作用于容器执行失败场景:

  • restartPolicy: OnFailure:容器运行失败时,原地重启容器,不会新建整个 Pod
  • restartPolicy: Never:容器运行失败后直接退出,控制器会新建一个全新 Pod 重试任务。

2. activeDeadlineSecondsJob 全局最长运行时限

用于限制整个 Job 的总运行时长(单位:秒),优先级高于重试次数:

spec:backoffLimit: 5activeDeadlineSeconds: 100

执行逻辑:

  1. Job 控制器持续校验activeDeadlineSeconds配置;
  2. 若该字段不为空,实时统计 Job 整体运行时长;
  3. 一旦运行时长超过设定阈值,直接终止所有活跃 Pod,将 Job 标记为失败,终止原因为DeadlineExceeded,不再进行重试。

1.3.6、Job源码

接着开始分析Job源码,Job控制器的初始化也是在NewControllerInitializers

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\controllermanager.go// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)// paired to their InitFunc.  This allows for structured downstream composition and subdivision.func NewControllerInitializers(loopMode ControllerLoopMode)map[string]InitFunc {  controllers := map[string]InitFunc{}  controllers["job"] = startJobControllerreturn controllers}

追踪startJobController

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\batch.gofunc startJobController(ctx ControllerContext) (http.Handler, boolerror) {go job.NewController(// 代表PodInformer,即要监听的pod对象    ctx.InformerFactory.Core().V1().Pods(),// 代表JobInformer,即要监听的job对象    ctx.InformerFactory.Batch().V1().Jobs(),// 代表和apiserver通信的kubeClient对象    ctx.ClientBuilder.ClientOrDie("job-controller"),  ).Run(int(ctx.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Stop)return niltruenil}

这个Job控制器和ReplicaSet控制器一样,也是获取一个控制器,启动这个控制器,这两个步骤。

首先追踪NewController获得JobController对象

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// 功能:创建一个全新的 Job 控制器// 作用:让 Job 和它管理的 Pod 始终保持状态同步,实现 Job 自动管理 Pod 的核心逻辑func NewController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {//  1. 事件广播器初始化 // 创建事件广播器:用于打印控制器日志、向 Kubernetes 发送事件(kubectl describe 可见)  eventBroadcaster := record.NewBroadcaster()// 开启结构化日志输出  eventBroadcaster.StartStructuredLogging(0)// 将事件写入 APIServer 的事件存储中,供 kubectl describe 查询  eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})//  2. 限流与监控注册 // 如果客户端启用了限流,注册限流指标if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {// 注册job_controller_rate_limiter_use这个指标,用来标识JobController开启了限流    ratelimiter.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())  }//  3. 初始化 Job 控制器结构体 // 创建控制器实例,这是 Job 控制器的“大脑”  jm := &Controller{// K8s 客户端:用于操作 Job、Pod 等资源    kubeClient: kubeClient,// Pod 控制器:真正执行创建/删除 Pod 操作的工具    podControl: controller.RealPodControl{      KubeClient: kubeClient,      Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),    },// 期望管理器:记录预期创建的 Pod 数量,防止重复创建    expectations: controller.NewControllerExpectations(),// 主工作队列:存放需要处理的 Job,带指数退避重试    queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),// 孤儿 Pod 队列:处理不属于任何 Job 的 Pod    orphanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"),// 事件记录器:记录控制器产生的事件    recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),  }//  4. 注册 Job 资源监听器 // 监听 Job 的创建、更新、删除事件  jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{// Job 创建:加入队列,触发同步    AddFunc: func(obj interface{}) {      jm.enqueueController(obj, true)    },// Job 更新:执行更新逻辑    UpdateFunc: jm.updateJob,// Job 删除:加入队列,清理资源    DeleteFunc: func(obj interface{}) {      jm.enqueueController(obj, true)    },  })// Job 缓存:用于快速获取 Job 信息  jm.jobLister = jobInformer.Lister()// Job 缓存同步状态:判断缓存是否已初始化完成  jm.jobStoreSynced = jobInformer.Informer().HasSynced//  5. 注册 Pod 资源监听器 // 监听 Pod 的创建、更新、删除事件// Pod 状态变化会直接驱动 Job 状态更新(完成/失败/重试)  podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{    AddFunc:    jm.addPod,    // Pod 创建:更新 Job 计数    UpdateFunc: jm.updatePod, // Pod 状态变化:成功/失败/运行中    DeleteFunc: jm.deletePod, // Pod 删除:Job 补齐缺失的 Pod  })// Pod 缓存:用于快速获取 Pod 信息  jm.podStore = podInformer.Lister()// Pod 缓存同步状态  jm.podStoreSynced = podInformer.Informer().HasSynced//  6. 绑定核心业务处理函数 // 更新 Job 状态(完成数、失败数、运行状态)  jm.updateStatusHandler = jm.updateJobStatus// 补丁更新 Job 配置  jm.patchJobHandler = jm.patchJob// 核心同步函数:Job 管理 Pod 的总逻辑(创建、计数、重试、完成判断)  jm.syncHandler = jm.syncJob//  7. 注册监控指标 // 注册 Prometheus 监控指标,用于观测控制器运行状态  metrics.Register()// 返回创建好的 Job 控制器return jm}

仔细追踪Job Controller的定义

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// Controller ensures that all Job objects have corresponding pods to// run their configured workload.type Controller struct {  kubeClient clientset.Interface  podControl controller.PodControlInterface// To allow injection of the following for testing.  updateStatusHandler func(job *batch.Job)error  patchJobHandler     func(job *batch.Job, patch []byte)error  syncHandler         func(jobKey string) (boolerror)// podStoreSynced returns true if the pod store has been synced at least once.// Added as a member to the struct to allow injection for testing.  podStoreSynced cache.InformerSynced// jobStoreSynced returns true if the job store has been synced at least once.// Added as a member to the struct to allow injection for testing.  jobStoreSynced cache.InformerSynced// A TTLCache of pod creates/deletes each rc expects to see  expectations controller.ControllerExpectationsInterface// A store of jobs  jobLister batchv1listers.JobLister// A store of pods, populated by the podController  podStore corelisters.PodLister// Jobs that need to be updated  queue workqueue.RateLimitingInterface// Orphan deleted pods that still have a Job tracking finalizer to be removed  orphanQueue workqueue.RateLimitingInterface  recorder record.EventRecorder}
功能模块
字段名
核心作用
设计目的 / 补充说明
核心客户端与操作接口
kubeClient
作为控制器与 APIServer 通信的入口,调用 Kubernetes API 实现 JobPod 等资源的 CRUD 操作
标准化通信接口,支持注入 Fake Client 方便单元测试
podControl
封装 Pod 资源的创建、删除、补丁等操作,控制器通过该接口操作 Pod
简化 Pod 操作逻辑,解耦具体实现,测试时可注入 FakePodControl
可注入处理函数(测试 / 扩展)
updateStatusHandler
封装 Job 状态(如完成数、成功数)的更新逻辑
测试时注入模拟函数,验证状态更新逻辑,无需调用真实 APIServer
patchJobHandler
通过 Patch 方式局部更新 Job 资源(如标签、注解、状态)
减少全量更新的网络传输和资源冲突,解耦 Patch 操作实现
syncHandlerJob
 控制器的核心同步逻辑,处理单个 Job 的调谐(创建 / 删除 Pod、更新状态)
测试时注入模拟函数,单独验证核心业务逻辑
Informer
缓存同步状态
podStoreSynced
检查 Pod 资源的 Informer 本地缓存是否完成初始同步
确保控制器仅在 Pod 缓存就绪后执行业务逻辑,避免数据不全
jobStoreSynced
检查 Job 资源的 Informer 本地缓存是否完成初始同步
确保控制器仅在 Job 缓存就绪后执行业务逻辑,避免数据不全
期望机制核心
expectations
跟踪控制器预期的 Pod 创建 / 删除事件数,判断是否执行同步操作一个 TTL 缓存,用于存储每个 rc 预期要观测到的 Pod 创建 / 删除事件
解决异步事件延迟 / 丢失导致的重复操作、状态不一致问题
本地缓存与查询
jobLister
提供对本地 Job 缓存的只读查询能力(按命名空间、标签筛选)
本地缓存查询性能远高于 APIServer 调用,提升控制器响应速度
podStore
提供对本地 Pod 缓存的只读查询能力,获取 Job 关联的 Pod 状态
减少 APIServer 压力,快速查询 Pod 运行状态(如运行中、已完成)
工作队列(任务调度)
queue存放需处理的 Job 资源 Key,实现事件驱动的任务调度
解耦事件接收与处理,限流功能避免资源频繁变化导致的风暴问题
orphanQueue
存放需清理的 “孤儿 Pod”(带 Job Finalizer 但所属 Job 已删除的 Pod
单独处理孤儿 Pod 清理,避免与正常 Job 同步逻辑冲突,提升稳定性
可观测性
recorder
向集群记录 Job/Pod 操作的事件(如创建失败、Job 完成)
提供控制器可观测性,方便运维排查问题,记录审计日志满足可追溯性需求

其中的新建queue的操作

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.govar (// DefaultJobBackOff is the default backoff period, exported for the e2e test  DefaultJobBackOff = 10 * time.Second// MaxJobBackOff is the max backoff period, exported for the e2e test  MaxJobBackOff = 360 * time.Second)// 首先是创建workqueue所需的RateLimiter对象,基础重试间隔为10秒,最长为360秒rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff)// 然后带上RateLimiter对象 新建名字为job的workqueuequeue:workqueue.NewNamedRateLimitingQueue(rateLimiter, "job"),
1.3.6.1、jobInformer.AddEventHandler

追踪jobInformer.Informer().AddEventHandler

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.gojobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{    AddFunc: func(obj interface{}) {      jm.enqueueController(obj, true)    },    UpdateFunc: jm.updateJob,    DeleteFunc: func(obj interface{}) {      jm.enqueueController(obj, true)    },  })

比如其中的AddFunc或者DeleteFunc,都是jm.enqueueController(obj, true)

追踪jm.enqueueController,可以看到

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// 函数作用:将 Job 对象加入控制器的工作队列,等待后续同步处理// obj:可以是 *batch.Job 对象,也可能是删除状态未知的标记对象// immediate:是否立即入队(不等待退避延迟),仅当 Pod 成功运行完成时才传 truefunc (jm *Controller) enqueueController(obj interface{}, immediate bool) {// 1. 从 Job 对象中生成唯一标识 Key(格式:namespace/name)  key, err := controller.KeyFunc(obj)// 2. 设置入队延迟时间(指数退避重试)  backoff := time.Duration(0)// 如果不是立即执行,就计算退避延迟(失败重试时使用)if !immediate {    backoff = getBackoff(jm.queue, key)  }// 3. 将 Job Key 加入工作队列,延迟 backoff 时间后处理// 延迟=0 立即处理;延迟>0 按重试策略等待后处理  jm.queue.AddAfter(key, backoff)}

其中AddAfter的意思是在backoff时间后再把key添加进去,这里backoff在 immediate=true时是0,如果immediate=false需要根据当前队列的排队情况获取一个backoff等待时长

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\util\workqueue\delaying_queue.go// AddAfter 在指定延迟时间后,将元素加入工作队列// 作用:实现 Job 控制器的**指数退避重试、延迟处理**func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {// 1. 如果队列正在关闭,直接放弃入队,避免资源泄露if q.ShuttingDown() {return  }// 2. 监控指标:记录一次重试行为(用于 Prometheus 监控)  q.metrics.retry()// 3. 如果延迟时间 <=0,代表不需要等待,直接立即加入队列if duration <= 0 {    q.Add(item)return  }// 4. 延迟入队核心逻辑:// 将任务包装成 waitFor 对象,记录【任务数据】和【准备就绪时间】// 同时监听停止信号,避免阻塞select {// 队列关闭时,退出不处理case <-q.stopCh:// 将延迟任务发送到等待通道,由后台协程定时处理case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:  }}

继续追踪jm.updateJob

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// updateJob 处理 Job 的更新事件// 当 Job 的配置、状态发生变化时,K8s 会自动调用这个方法func (jm *Controller) updateJob(old, cur interface{}) {// 1. 类型转换:把旧对象、新对象都转成 Job 类型  oldJob := old.(*batch.Job)  curJob := cur.(*batch.Job)// 2. 生成 Job 的唯一标识 key (namespace/name)  key, err := controller.KeyFunc(curJob)if err != nil {return  }// 3. 立即将 Job 加入队列,触发同步// immediate=true:表示立即同步,用于快速更新 Job 状态  jm.enqueueController(curJob, true)//  核心逻辑:处理 ActiveDeadlineSeconds 更新 // 检查:如果 Job 已经开始运行(StartTime 不为空)if curJob.Status.StartTime != nil {// 获取当前 Job 最新的 最长运行时限配置    curADS := curJob.Spec.ActiveDeadlineSeconds// 如果没设置时限,直接退出,无需处理if curADS == nil {return    }// 获取旧 Job 的最长运行时限配置    oldADS := oldJob.Spec.ActiveDeadlineSeconds// 判断:最长运行时限 被修改过(新增 / 改值)if oldADS == nil || *oldADS != *curADS {// 获取当前时间      now := metav1.Now()// 获取 Job 开始运行的时间      start := curJob.Status.StartTime.Time// 计算 Job 已经运行了多久      passed := now.Time.Sub(start)// 计算配置的总最长运行时长      total := time.Duration(*curADS) * time.Second// 关键:重新设置一个【超时闹钟】// 剩余时间 = 总允许时长 - 已运行时长// 到点后会自动触发 Job 终止(DeadlineExceeded)      jm.queue.AddAfter(key, total-passed)    }  }}

上面的代码中有关于ActiveDeadlineSeconds,也就是Job 最长运行时间的判断。上面的代码中会判断ActiveDeadlineSeconds是否发生了变化:

  • 场景1 :原来job中没有配置 ActiveDeadlineSeconds

  • 场景2 :jobActiveDeadlineSeconds发生了改变动作就是以新的ActiveDeadlineSeconds减去job已经存活的时间作为backoff等待时间,把新的job再延迟入队,ActiveDeadlineSeconds代表job的存活时间

当调用 jm.queue.AddAfter(key, 负数)时候,AddAfter识别负数,立即把Job key 放入工作队列,controller worker立刻取出keyJob Reconcile 逻辑检测Job 运行时长超过限制,杀掉所有 Pod,更新 Job 状态为超时失败,可以在JobStatus字段中看到。

status:conditions:type: Failedstatus: "True"reason: DeadlineExceededmessage: Job was active longer than specified deadline
1.3.6.2、podInformer.AddEventHandler
// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go  podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{    AddFunc:    jm.addPod,    UpdateFunc: jm.updatePod,    DeleteFunc: jm.deletePod,  })

追踪jm.addPod

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// addPod 当有 Pod 被创建时触发// 作用:找到管理这个 Pod 的 Job,把 Job 加入队列同步,让 Job 知道自己的 Pod 启动了func (jm *Controller) addPod(obj interface{}) {// 1. 把传入的对象转成 Pod 类型  pod := obj.(*v1.Pod)// 2. 如果这个 Pod 已经被标记为删除(DeletionTimestamp 不为空)// 说明不是新建,而是正在删除,直接走删除逻辑if pod.DeletionTimestamp != nil {// 控制器重启时可能出现这种情况:刚发现 Pod 就已经要删了    jm.deletePod(pod)return  }// 3. 检查 Pod 是否有属主(ControllerRef),即:是否归某个 Job 管理if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {// 找到这个 Pod 对应的 Job    job := jm.resolveControllerRef(pod.Namespace, controllerRef)// 找不到 Job,直接退出if job == nil {return    }// 生成 Job 的唯一 key    jobKey, err := controller.KeyFunc(job)if err != nil {return    }// 核心:通知控制器“我看到 Pod 创建成功了”// 作用:抵消 expectations 里的“预期创建计数”,防止重复创建 Pod    jm.expectations.CreationObserved(jobKey)// 把 Job 加入队列,立即同步状态(更新 COMPLETIONS、运行中 Pod 数等)    jm.enqueueController(job, true)return  }// 4. 如果 Pod 没有属主 → 孤儿 Pod// 寻找所有匹配这个 Pod 的 Job,尝试让 Job 收养它// 不标记 CreationObserved,因为孤儿 Pod 不是任何 Job 预期创建的for _, job := range jm.getPodJobs(pod) {    jm.enqueueController(job, true)  }}

追踪jm.updatePod

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// updatePod 当 Pod 发生更新时触发(状态变化、标签变化、归属变化等)// 作用:找到管理该 Pod 的 Job,唤醒 Job 进行状态同步// 如果 Pod 标签发生变化,需要同时唤醒【旧 Job】和【新 Job】// old 和 cur 必须都是 *v1.Pod 类型func (jm *Controller) updatePod(old, cur interface{}) {// 1. 类型转换:旧Pod、新Pod  curPod := cur.(*v1.Pod)  oldPod := old.(*v1.Pod)// 2. 如果资源版本一样 → 没有实际变化,直接返回// 周期性同步会重复发送事件,用 ResourceVersion 去重if curPod.ResourceVersion == oldPod.ResourceVersion {return  }// 3. 如果 Pod 被标记为删除(开始优雅删除)// 立即触发删除逻辑,让 Job 立刻补建新 Pod,不要等彻底删完if curPod.DeletionTimestamp != nil {    jm.deletePod(curPod)return  }// 4. 只有 Pod 状态为 Failed 时,才启用退避延迟重试// 其他状态(Running/Succeed)都立即同步,不延迟  immediate := curPod.Status.Phase != v1.PodFailed// 5. 获取 Pod 新旧两个版本的 属主引用(ControllerRef)  curControllerRef := metav1.GetControllerOf(curPod)  oldControllerRef := metav1.GetControllerOf(oldPod)// 判断:Pod 的归属 Job 是否发生了变化  controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)// 6. 如果归属发生变化,且原来有归属 → 通知旧 Job 同步if controllerRefChanged && oldControllerRef != nil {if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {      jm.enqueueController(job, immediate)    }  }// 7. 如果 Pod 当前有归属(正常被 Job 管理)if curControllerRef != nil {// 找到对应的 Job    job := jm.resolveControllerRef(curPod.Namespace, curControllerRef)if job == nil {return    }// 唤醒 Job 同步状态(计数、重试、完成判断)    jm.enqueueController(job, immediate)return  }// 8. 否则是孤儿 Pod:如果标签变了 / 归属变了// 寻找所有匹配的 Job,尝试让 Job 收养它  labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)if labelChanged || controllerRefChanged {for _, job := range jm.getPodJobs(curPod) {      jm.enqueueController(job, immediate)    }  }}

追踪jm.deletePod

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// deletePod 当 Pod 被删除时触发// 作用:找到管理这个 Pod 的 Job,通知 Job “Pod 没了”,让 Job 赶紧补建新 Pod// obj 可能是 *v1.Pod,也可能是删除状态未知的墓碑(tombstone)标记对象func (jm *Controller) deletePod(obj interface{}) {// 1. 尝试把对象转换成正常的 Pod 类型  pod, ok := obj.(*v1.Pod)// 2. 如果转不成正常 Pod,说明是【墓碑对象】(删除事件丢失时的兜底)// 从墓碑里把真正的 Pod 信息取出来if !ok {    tombstone, ok := obj.(cache.DeletedFinalStateUnknown)if !ok {      utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))return    }// 从墓碑中获取真实的 Pod 对象    pod, ok = tombstone.Obj.(*v1.Pod)if !ok {      utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %+v", obj))return    }  }// 3. 获取 Pod 的属主(管理它的 Job)  controllerRef := metav1.GetControllerOf(pod)// 如果没有属主(孤儿 Pod),直接忽略,没人关心它的删除if controllerRef == nil {return  }// 4. 根据属主信息,找到对应的 Job  job := jm.resolveControllerRef(pod.Namespace, controllerRef)// 如果找不到对应的 Jobif job == nil {// 如果 Pod 带有 Job 跟踪标记,加入孤儿 Pod 队列处理if hasJobTrackingFinalizer(pod) {      jm.enqueueOrphanPod(pod)    }return  }// 5. 生成 Job 的唯一标识 Key  jobKey, err := controller.KeyFunc(job)if err != nil {return  }// 6. 核心:通知控制器 “我观测到 Pod 被删除了”// 作用:更新预期,让 Job 知道预期运行的 Pod 少了一个  jm.expectations.DeletionObserved(jobKey)// 7. 把 Job 加入队列,立即同步// Job 收到后会发现 Pod 数量不够,立刻创建新的 Pod 补齐  jm.enqueueController(job, true)}

需要关注的是"墓碑对象","墓碑对象"的生成和消费过程如下:

  1. apiserver 中 Pod 被删除
  2. 因网络断开,Reflector 没收到删除事件
  3. Reflector 全量同步时发现 Pod 消失
  4. Reflector 创建墓碑对象 放入队列
  5. Job 控制器(消费者) 取出墓碑
  6. 得知 Pod 已删除,触发重建Pod

至此,Job控制器的创建过程完成。接下来分析Run

1.3.6.3、Job控制的Run方法

可以看到Job控制器的Run方法和Replicaset的类似

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// Run the main goroutine responsible for watching and syncing jobs.func (jm *Controller) Run(workers int, stopCh <-chan struct{}) {defer utilruntime.HandleCrash()defer jm.queue.ShutDown()defer jm.orphanQueue.ShutDown()  klog.Infof("Starting job controller")defer klog.Infof("Shutting down job controller")// 等待job和pod 至少 list同步一次if !cache.WaitForNamedCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {return  }// 启动workersNum个 worker任务for i := 0; i < workers; i++ {go wait.Until(jm.worker, time.Second, stopCh)  }// 启动一个orphanWorker处理孤儿podgo wait.Until(jm.orphanWorker, time.Second, stopCh)  <-stopCh}

追踪jm.worker

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// worker runs a worker thread that just dequeues items, processes them, and marks them done.// It enforces that the syncHandler is never invoked concurrently with the same key.func (jm *Controller) worker() {for jm.processNextWorkItem() {  }}

从函数注释可以看到worker启动一个线程,从队列中消费任务,处理任务,标记任务为已完成内容就是不断的调用jm.processNextWorkItem,直到jm.processNextWorkItem返回false退出,继续追踪jm.processNextWorkItem()

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.gofunc (jm *Controller) processNextWorkItem() bool {// 从队列中获取一个key,  key, quit := jm.queue.Get()if quit {return false  }// defer 标记这个任务完成defer jm.queue.Done(key)// 调用syncHandler处理,返回的forget代表这个key不需要重试了  forget, err := jm.syncHandler(key.(string))if err == nil {if forget {      jm.queue.Forget(key)    }return true  }  utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))// 发生错误,将key再推入队列中等待下一次调度  jm.queue.AddRateLimited(key)return true}
1.3.6.4、Job控制的syncJob方法

继续追踪syncHandler,也就是syncJob

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// syncJob 如果Job的期望(预期Pod创建/删除)都已满足,则同步该Job// 同一个key(同一个Job)不会被并发调用func (jm *Controller) syncJob(key string) (forget bool, rErr error) {//  1. 解析Job的唯一标识:namespace/name   ns, name, err := cache.SplitMetaNamespaceKey(key)//  2. 从本地缓存获取Job对象   sharedJob, err := jm.jobLister.Jobs(ns).Get(name)if err != nil {// Job不存在(已被删除)if apierrors.IsNotFound(err) {      jm.expectations.DeleteExpectations(key) // 清理预期return truenil    }return false, err  }// 深拷贝,避免修改共享缓存数据  job := *sharedJob.DeepCopy()//  3. 如果Job已经结束(成功/失败),直接返回 if IsJobFinished(&job) {return truenil  }//  4. 功能特性校验 // 不支持IndexedJob但配置了,直接跳过if !feature.DefaultFeatureGate.Enabled(features.IndexedJob) && isIndexedJob(&job) {    jm.recorder.Event(&job, v1.EventTypeWarning, "IndexedJobDisabled""Skipped Indexed Job sync because feature is disabled.")return falsenil  }/*   完成模式不合法,跳过   Job 的 completionMode 只能是以下两种,不是这两种就非法!   NonIndexed(普通模式,默认)   Indexed(索引模式)   */if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode != batch.NonIndexedCompletion &&   *job.Spec.CompletionMode != batch.IndexedCompletion {    jm.recorder.Event(&job, v1.EventTypeWarning, "UnknownCompletionMode""Skipped Job sync because completion mode is unknown")return falsenil  }// 记录监控指标相关  completionMode := string(batch.NonIndexedCompletion)if isIndexedJob(&job) {    completionMode = string(batch.IndexedCompletion)  }  action := metrics.JobSyncActionReconciling// 函数结束记录监控指标defer func() {    result := "success"if rErr != nil {      result = "error"    }// 比如:metrics.JobSyncDurationSeconds.WithLabelValues("NonIndexed", "success",  "reconciling").Observe(time.Since(startTime).Seconds())    metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result, action).Observe(time.Since(startTime).Seconds())    metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc()  }()//  5. 处理Pod终止状态跟踪(finalizer机制) var uncounted *uncountedTerminatedPodsif trackingUncountedPods(&job) {if job.Status.UncountedTerminatedPods == nil {      job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}    }    uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)  } else if patch := removeTrackingAnnotationPatch(&job); patch != nil {if err := jm.patchJobHandler(&job, patch); err != nil {return false, fmt.Errorf("removing tracking finalizer from job %s: %w", key, err)    }  }}

其中处理Pod终止状态跟踪(finalizer机制),Finalizer就是一个 “不让你删,先等我处理完” 的钩子。

旧版Job(有 bug),Pod 跑完马上算成功 / 失败,但是Pod还没删干净!结果导致次数算错、重复统计、状态乱套。新版Job(加了这段代码就解决了),Pod 跑完先别急着算!放进一个临时等待列表,等 Pod 彻底删干净再正式算进成功 / 失败。这个临时等待列表就是UncountedTerminatedPods,就是你看到的这个结构体:

// C:\git\kubernetes-1.22.3\vendor\k8s.io\api\batch\v1\types.gotype UncountedTerminatedPods struct {    Succeeded []UID  // 成功了,但还没正式统计    Failed    []UID  // 失败了,但还没正式统计}

继续追踪syncHandler,也就是syncJob

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.gofunc (jm *Controller) syncJob(key string) (forget bool, rErr error) {//  6. 检查Job预期是否满足 // 预期满足:没有等待创建/删除的Pod,可以执行同步  jobNeedsSync := jm.expectations.SatisfiedExpectations(key)//  7. 获取当前Job管理的所有Pod   pods, err := jm.getPodsForJob(&job, uncounted != nil)if err != nil {return false, err  }}

追踪jm.getPodsForJob

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// getPodsForJob 返回当前 Job 应该管理的所有 Pod 集合// 同时完成:认领 Pod 归属权、设置 OwnerReference、添加跟踪用的 Finalizer(如果开启)// 返回的 Pod 指针来自本地缓存,不是深拷贝对象func (jm *Controller) getPodsForJob(j *batch.Job, withFinalizers bool) ([]*v1.Pod, error) {// 把 Job 的标签选择器转换成标准选择器,用于匹配 Pod  selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)if err != nil {return nil, fmt.Errorf("couldn't convert Job selector: %v", err)  }// 列出命名空间下所有 Pod,包含:// 1. 匹配标签的 Pod// 2. 不匹配标签但归属权属于当前 Job 的 Pod  pods, err := jm.podStore.Pods(j.Namespace).List(labels.Everything())if err != nil {return nil, err  }// 定义认领 Pod 前的安全检查函数:// 认领前重新确认 Job 真实存在,Job没有被删除,防止无效认领  canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {    fresh, err := jm.kubeClient.BatchV1().Jobs(j.Namespace).Get(context.TODO(), j.Name, metav1.GetOptions{})if err != nil {return nil, err    }if fresh.UID != j.UID {return nil, fmt.Errorf("original Job %v/%v is gone: got uid %v, wanted %v", j.Namespace, j.Name, fresh.UID, j.UID)    }return fresh, nil  })// 如果开启了精准计数(withFinalizers=true),添加 Job 跟踪专用 Finalizer// 作用:Pod 删除前必须先让 Job 完成统计,保证计数准确var finalizers []stringif withFinalizers {    finalizers = append(finalizers, batch.JobTrackingFinalizer)  }// 创建 Pod 控制器引用管理器,用于统一处理 Pod 归属权  cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc, finalizers...)// 核心动作:ClaimPods 认领 Pod// 1. 把匹配标签的 Pod 收归当前 Job 管理// 2. 添加 OwnerReference// 3. 自动添加 Finalizer(如果配置了)  pods, err = cm.ClaimPods(pods)if err != nil || !withFinalizers {return pods, err  }// 给刚被认领、但还没有 Finalizer 的 Pod 补上跟踪 Finalizer// 确保所有被管理的 Pod 都走精准统计流程for i, p := range pods {    adopted := truefor _, r := range p.OwnerReferences {if r.UID == j.UID {        adopted = falsebreak      }    }if adopted && !hasJobTrackingFinalizer(p) {      pods[i] = p.DeepCopy()      pods[i].Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)    }  }// 返回所有归 Job 管理的 Podreturn pods, err}

追踪controller.RecheckDeletionTimestamp这段代码, 在认领 Pod 之前,再确认一遍 Job 还活着,没有被删除。如果 Job 正在删除则绝对不能认领新 Pod

// C:\git\kubernetes-1.22.3\pkg\controller\controller_ref_manager.go// RecheckDeletionTimestamp 返回一个【检查是否正在删除】的函数// 作用:在认领 Pod 之前,再查一遍 Job 是不是还活着// 如果 Job 正在删除 → 不准认领 Pod,避免出错func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func()error {// 返回一个“检查函数”,这个函数会在 ClaimPods 认领 Pod 时被调用return func()error {// 1. 重新去 apiserver 查最新的 Job 对象(不走缓存)    obj, err := getObject()if err != nil {return fmt.Errorf("can't recheck DeletionTimestamp: %v", err)    }// 2. 检查 Job 是否【正在被删除】(DeletionTimestamp 不为空)if obj.GetDeletionTimestamp() != nil {// 如果 Job 正在删除 → 返回错误,拒绝认领 Podreturn fmt.Errorf("%v/%v has just been deleted at %v"                 obj.GetNamespace(), obj.GetName(), obj.GetDeletionTimestamp())    }// 3. Job 正常存活 → 可以认领 Podreturn nil  }}

追踪NewPodControllerRefManager

// C:\git\kubernetes-1.22.3\pkg\controller\controller_ref_manager.go// NewPodControllerRefManager 创建并返回一个 PodControllerRefManager 实例// 作用:专门帮 Job 管理 Pod 的“归属权”(ControllerRef)//// canAdopt 函数:在第一次认领 Pod 之前,做一次安全检查(比如查Job是否还活着)// 这个函数最多只会调用一次//// finalizers:需要给 Pod 自动加上的 Finalizer 列表(用于精准计数)func NewPodControllerRefManager(  podControl PodControlInterface,       // 操作Pod的工具(创建/更新/删除)  controller metav1.Object,             // 当前的父对象(就是 Job)  selector labels.Selector,             // Job用来匹配Pod的标签选择器  controllerKind schema.GroupVersionKind, // 父对象类型:Job  canAdopt func()error,                // 认领前的安全检查函数(你刚才学的双重检查)  finalizers ...string,                 // 要给Pod加的Finalizer(比如JobTrackingFinalizer)) *PodControllerRefManager {// 创建一个 Pod 归属权管理器,并返回return &PodControllerRefManager{    BaseControllerRefManager: BaseControllerRefManager{      Controller:   controller,  // 谁当爹(Job)      Selector:     selector,   // 怎么找儿子(匹配标签)      CanAdoptFunc: canAdopt,    // Job认儿子Pod前的安检    },    controllerKind: controllerKind, // 爹的类型是Job    podControl:     podControl,     // 用来管理Pod的工具    finalizers:     finalizers,     // 给Pod加的Finalizer  }}

并追踪PodControllerRefManager.AdoptPodPodControllerRefManager.ReleasePod

// C:\git\kubernetes-1.22.3\pkg\controller\controller_ref_manager.go// AdoptPod 认领Pod:给Pod添加OwnerReference和Finalizer,让Pod归当前Job管理// 步骤:// 1. 先做CanAdopt安检(确认Job没被删)// 2. 生成补丁:添加OwnerRef + Finalizer// 3. 调用Patch更新Podfunc (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error {// 认领前安检:确认Job有效、未被删除if err := m.CanAdopt(); err != nil {return fmt.Errorf("can't adopt Pod %v/%v (%v): %v", pod.Namespace, pod.Name, pod.UID, err)  }// 生成补丁:设置当前Job为Pod的控制器,并添加指定的Finalizer  patchBytes, err := ownerRefControllerPatch(m.Controller, m.controllerKind, pod.UID, m.finalizers...)if err != nil {return err  }// 执行Patch:把Pod正式收归Job管理return m.podControl.PatchPod(pod.Namespace, pod.Name, patchBytes)}// ReleasePod 释放Pod:移除Pod的OwnerReference和Finalizer// 让Pod不再归当前Job管理func (m *PodControllerRefManager) ReleasePod(pod *v1.Pod) error {// 生成补丁:删除归属当前Job的OwnerRef,并移除指定的Finalizer  patchBytes, err := deleteOwnerRefStrategicMergePatch(pod.UID, m.Controller.GetUID(), m.finalizers...)if err != nil {return err  }// 执行Patch更新Pod  err = m.podControl.PatchPod(pod.Namespace, pod.Name, patchBytes)if err != nil {// 如果Pod不存在了,忽略错误if errors.IsNotFound(err) {return nil    }// 如果是无效错误(无OwnerRef或UID不匹配),也忽略if errors.IsInvalid(err) {return nil    }  }return err}

继续追踪cm.ClaimPods(pods)

// C:\git\kubernetes-1.22.3\pkg\controller\controller_ref_manager.go// ClaimPods 尝试为当前 Job 【认领/管理】一组 Pod// 核心工作:// 1. 如果 Pod 标签匹配 → 认领孤儿 Pod(没人管的收归 Job 管理)// 2. 如果 Pod 曾经归 Job 管,但现在标签不匹配 → 释放(不再管理)// 3. 可以传入过滤条件,所有条件满足才会认领 Pod// 返回:当前 Job 真正管理的所有 Podfunc (m *PodControllerRefManager) ClaimPods(pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {var claimed []*v1.Pod   // 最终归 Job 管理的 Pod 列表var errlist []error// 收集过程中的错误// 1. 定义匹配规则:这个 Pod 属于当前 Job 吗?    match := func(obj metav1.Object)bool {    pod := obj.(*v1.Pod)// 第一步:检查 Pod 标签是否匹配 Job 的选择器// 不匹配 → 直接不属于这个 Jobif !m.Selector.Matches(labels.Set(pod.Labels)) {return false    }// 第二步:如果有额外过滤条件,全部满足才算匹配for _, filter := range filters {if !filter(pod) {return false      }    }// 标签匹配 + 过滤条件满足 → 属于当前 Jobreturn true  }// 2. 定义认领动作:把 Pod 收归 Job 管理    adopt := func(obj metav1.Object)error {// 调用 AdoptPod 给 Pod 加上 OwnerRef + Finalizerreturn m.AdoptPod(obj.(*v1.Pod))  }// 3. 定义释放动作:不再管理这个 Pod    release := func(obj metav1.Object)error {// 调用 ReleasePod 移除 Pod 的 OwnerRefreturn m.ReleasePod(obj.(*v1.Pod))  }// 4. 遍历所有 Pod,逐个判断:要不要管?  for _, pod := range pods {// 核心逻辑:// 交给 ClaimObject 判断:// 匹配 → 认领// 不匹配但归我管 → 释放// 最终返回这个 Pod 是否归我管理    ok, err := m.ClaimObject(pod, match, adopt, release)if err != nil {      errlist = append(errlist, err) // 收集错误continue    }// 如果 Pod 归当前 Job 管理 → 加入结果列表if ok {      claimed = append(claimed, pod)    }  }// 返回所有被当前 Job 管理的 Podreturn claimed, utilerrors.NewAggregate(errlist)}

继续追踪m.ClaimObject

// C:\git\kubernetes-1.22.3\pkg\controller\controller_ref_manager.go// ClaimObject 核心作用:裁判一个对象(Pod)归谁管// 逻辑规则:// 1. 如果是别人的 → 不管// 2. 如果是我的,但不匹配 → 释放// 3. 如果是孤儿,且匹配我 → 认领// 返回:是否归我管 + 是否出错func (m *BaseControllerRefManager) ClaimObject(  obj metav1.Object,        // 要判断的Pod  match func(metav1.Object)bool// 判断是否匹配我的标签  adopt func(metav1.Object)error// 认领函数(收为己有)  release func(metav1.Object)error// 释放函数(赶走)) (boolerror) {// 第一步:获取当前Pod的“主人”(ControllerRef=真正的管理者)  controllerRef := metav1.GetControllerOfNoCopy(obj)// 情况1:Pod 已经有主人了  if controllerRef != nil {// A. 主人是别人 → 直接不管,返回falseif controllerRef.UID != m.Controller.GetUID() {return falsenil    }// B. 主人是我,且标签匹配 → 本来就是我的,直接返回trueif match(obj) {return truenil    }// C. 主人是我,但标签不匹配了 → 需要释放它// 如果我正在被删除,不操作if m.Controller.GetDeletionTimestamp() != nil {return falsenil    }// 执行释放:解除Pod和我的关系if err := release(obj); err != nil {if errors.IsNotFound(err) { // Pod不存在了,忽略return falsenil      }return false, err // 释放失败,返回错误    }// 释放成功,现在不归我管return falsenil  }// 情况2:Pod 没有主人(孤儿)  // 如果我正在被删除 或 Pod标签不匹配我 → 不管if m.Controller.GetDeletionTimestamp() != nil || !match(obj) {return falsenil  }// 如果Pod正在被删除 → 不管if obj.GetDeletionTimestamp() != nil {return falsenil  }// 标签匹配 + 无主人 + Job正常运行 → 认领它!if err := adopt(obj); err != nil {if errors.IsNotFound(err) { // Pod没了,忽略return falsenil    }return false, err // 认领失败  }// 认领成功!Pod现在归我管了return truenil}

继续追踪syncHandler,也就是syncJob

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// syncJob 如果Job的期望(预期Pod创建/删除)都已满足,则同步该Job// 同一个key(同一个Job)不会被并发调用func (jm *Controller) syncJob(key string) (forget bool, rErr error) {//  8. 统计Pod状态:运行中、成功、失败 // 从得到的pods中统计activePods,判断标准就是pod不是PodSucceeded也不是PodFailed  activePods := controller.FilterActivePods(pods)  // 过滤出运行中的Pod  active := int32(len(activePods))                 // 运行中Pod数量/*    使用getStatus统计succeeded, failed的pod个数        - succeeded代表pod状态为PodSucceeded        - failed代表pod状态为 PodFailed或者 finalizers不为空时并且pod处于deleted的状态  */  succeeded, failed := getStatus(&job, pods, uncounted) // 成功、失败Pod数量//  9. Job首次启动:设置开始时间 + 设置超时闹钟 // 如果job第一次启动,并且不处于挂起状态,那么设置StartTime,并根据ActiveDeadlineSeconds启动timer。if job.Status.StartTime == nil && !jobSuspended(&job) {    now := metav1.Now()    job.Status.StartTime = &now // 记录启动时间// 如果配置了最大运行时间,加入延时队列,时间到自动检查超时if job.Spec.ActiveDeadlineSeconds != nil {      jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)    }  }}

继续追踪syncHandler,也就是syncJob

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// syncJob 如果Job的期望(预期Pod创建/删除)都已满足,则同步该Job// 同一个key(同一个Job)不会被并发调用func (jm *Controller) syncJob(key string) (forget bool, rErr error) {//  8. 统计Pod状态:运行中、成功、失败   activePods := controller.FilterActivePods(pods)  // 过滤出运行中的Pod  active := int32(len(activePods))                 // 运行中Pod数量  succeeded, failed := getStatus(&job, pods, uncounted) // 成功、失败Pod数量//  9. Job首次启动:设置开始时间 + 设置超时闹钟 if job.Status.StartTime == nil && !jobSuspended(&job) {    now := metav1.Now()    job.Status.StartTime = &now // 记录启动时间// 如果配置了最大运行时间,加入延时队列,时间到自动检查超时if job.Spec.ActiveDeadlineSeconds != nil {      klog.V(4).Infof("Job %s has ActiveDeadlineSeconds will sync after %d seconds",        key, *job.Spec.ActiveDeadlineSeconds)      jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)    }  }var manageJobErr errorvar finishedCondition *batch.JobCondition // Job完成/失败的状态条件//  10. 判断是否有新的失败Pod   jobHasNewFailure := failed > job.Status.Failed// 判断失败次数是否超过BackoffLimit  exceedsBackoffLimit := jobHasNewFailure && (active != *job.Spec.Parallelism) &&  (failed > *job.Spec.BackoffLimit)//  11. 判断Job是否失败 if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {// 失败次数超过限制 → 标记Job失败    finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded""Job has reached the specified backoff limit")  } else if pastActiveDeadline(&job) {// 运行时间超过限制 → 标记Job失败    finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded""Job was active longer than specified deadline")  }// 索引式Job专用:计算已完成的索引var prevSucceededIndexes, succeededIndexes orderedIntervalsif isIndexedJob(&job) {    prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(&job, pods)    succeeded = int32(succeededIndexes.total())  }  suspendCondChanged := false // 暂停状态是否变化//====================================================//  12. 如果Job失败 → 删除所有运行中的Pod //====================================================if finishedCondition != nil {//====================================================//  【重要】 删除所有运行中的Pod //====================================================    deleted, err := jm.deleteActivePods(&job, activePods)if uncounted == nil {// 旧版行为:直接认为全部删除      deleted = active    } else if deleted != active {// 还有Pod在删除中(finalizer),不能标记完成      finishedCondition = nil    }    active -= deleted    // 运行中数量减少    failed += deleted    // 失败数量增加    manageJobErr = err  } else {//  13. Job正常运行 → 调整Pod数量(创建/删除)     manageJobCalled := false// 预期满足 + Job没被删除 → 执行Pod管理if jobNeedsSync && job.DeletionTimestamp == nil {      active, action, manageJobErr = jm.manageJob(&job, activePods, succeeded, succeededIndexes)      manageJobCalled = true    }// 判断Job是否完成    complete := falseif job.Spec.Completions == nil {// 未指定完成数:任意一个Pod成功且无运行中Pod → 完成      complete = succeeded > 0 && active == 0    } else {// 指定完成数:成功数 >= 期望完成数 → 完成      complete = succeeded >= *job.Spec.Completions && active == 0    }// 标记Job完成if complete {      finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, """")    } else if feature.DefaultFeatureGate.Enabled(features.SuspendJob) && manageJobCalled {// 处理Job暂停/恢复if job.Spec.Suspend != nil && *job.Spec.Suspend {// 标记为已暂停var isUpdated bool        job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionTrue, "JobSuspended""Job suspended")if isUpdated {          suspendCondChanged = true          jm.recorder.Event(&job, v1.EventTypeNormal, "Suspended""Job suspended")        }      } else {// 标记为已恢复,重置开始时间var isUpdated bool        job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionFalse, "JobResumed""Job resumed")if isUpdated {          suspendCondChanged = true          jm.recorder.Event(&job, v1.EventTypeNormal, "Resumed""Job resumed")          now := metav1.Now()          job.Status.StartTime = &now        }      }    }  }//  14. 如果有新的成功Pod → 清空失败重试次数   forget = falseif job.Status.Succeeded < succeeded {    forget = true  }//  15. 使用finalizer跟踪的状态更新逻辑 if uncounted != nil {    needsStatusUpdate := suspendCondChanged || active != job.Status.Active    job.Status.Active = active    err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, finishedCondition, needsStatusUpdate)if err != nil {return false, err    }    jobFinished := IsJobFinished(&job)// 有新失败且未结束 → 返回错误,触发退避重试if jobHasNewFailure && !jobFinished {return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)    }    forget = truereturn forget, manageJobErr  }//  16. 旧版逻辑:清理无用finalizer if err := jm.removeTrackingFinalizersFromAllPods(pods); err != nil {return false, fmt.Errorf("removing disabled finalizers from job pods %s: %w", key, err)  }//  17. 如果状态有变化 → 更新Job状态到apiserver if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || suspendCondChanged || finishedCondition != nil {    job.Status.Active = active    job.Status.Succeeded = succeeded    job.Status.Failed = failedif isIndexedJob(&job) {      job.Status.CompletedIndexes = succeededIndexes.String()    }    job.Status.UncountedTerminatedPods = nil    jm.enactJobFinished(&job, finishedCondition) // 设置完成/失败条件//====================================================// 【重要】 同步状态到apiserver//====================================================if err := jm.updateStatusHandler(&job); err != nil {return forget, err    }// 有新失败 → 返回错误,触发重试if jobHasNewFailure && !IsJobFinished(&job) {return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)    }    forget = true  }//  结束 return forget, manageJobErr}

至此,总结一下,Job 控制器 Worker 执行流程梳理。

整体入口也就是控制器Run逻辑,控制器启动时,会创建workersNum个并发Worker协程,所有Worker共享任务队列,循环取出队列中的Job任务执行核心同步函数syncJob

其中单个Worker执行syncJob完整步骤

  1. 拉取Job资源

通过本地缓存lister根据命名空间 + 名称读取Job对象;若Job不存在(已删除),清理该Job对应的期望记录,告知队列遗忘该任务。

  1. 拉取Job关联的全部Pod

调用jm.getPodsForJob,基于Job的标签选择器筛选归属Pod;过程中完成Pod认领、释放逻辑,按需为Pod添加Job跟踪专用Finalizer,返回当前Job管理的所有Pod列表。

  1. Job前置合法性校验
  • 校验IndexedJob特性门控:未开启特性却创建索引Job,直接终止本次同步;
  • 校验CompletionMode:模式值非法则终止本次同步;
  • Job已经标记完成(Complete/Failed),直接结束本次同步。
  1. 统计Pod运行状态数据

过滤出活跃运行的Pod,统计成功、失败Pod数量;若是IndexedJob,额外统计成功索引区间。

  1. Job终止条件判断

依次校验两类Job失败终止条件,满足任一条件则标记Job为失败:

  • 失败次数超限:失败Pod总数达到job.Spec.BackoffLimit(默认 6 次);
  • 运行超时:Job持续活跃时长超过job.Spec.ActiveDeadlineSeconds设定值。
  1. 失败Job处理逻辑

若判定Job进入Failed失败状态:调用jm.deleteActivePods并发删除所有正在运行的活跃Pod;删除完成后更新Job状态为失败。

  1. 正常Job同步调谐

通过jm.expectations.SatisfiedExpectations(key)得到jobNeedsSync标识,标识为 true 代表需要执行Pod扩缩容调谐。调用jm.manageJob,根据并行度、完成数、成功Pod数量创建新Pod或维持当前Pod数量。

  1. Job完成状态判断
  • 无固定完成数Job:任意Pod成功且无活跃Pod,标记Job完成;
  • 有固定完成数Job:成功Pod数量达到spec.completions且无活跃Pod,标记Job完成。
  1. 更新Job状态至API Server

对比内存中Job状态与上一次同步后的状态:若活跃Pod数、成功 / 失败数、完成索引、暂停状态、完成 / 失败条件发生变更,则发起API请求更新Job资源状态。

  1. 同步结果返回队列

返回forget标识:本次同步出现新成功Pod则置为 true,清空该Job的失败退避等待时间;队列下次处理该Job时无需等待退避时长即可重试。

1.3.6.5、expectations期望机制

继续分析jm.manageJob,会看到用到了期望机制,所以先研究一下期望机制

// // C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// syncJob 如果Job的期望(预期Pod创建/删除)都已满足,则同步该Job// 同一个key(同一个Job)不会被并发调用func (jm *Controller) syncJob(key string) (forget bool, rErr error) {// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in// and update the expectations after we've retrieved active pods from the store. If a new pod enters// the store after we've checked the expectation, the job sync is just deferred till the next relist.  jobNeedsSync := jm.expectations.SatisfiedExpectations(key)//  12. 如果Job失败 → 删除所有运行中的Pod if finishedCondition != nil {  } else {//  13. Job正常运行 → 调整Pod数量(创建/删除)     manageJobCalled := false// 预期满足 + Job没被删除 → 执行Pod管理if jobNeedsSync && job.DeletionTimestamp == nil {      active, action, manageJobErr = jm.manageJob(&job, activePods, succeeded, succeededIndexes)      manageJobCalled = true    }  }//  结束 return forget, manageJobErr}

首先看看jobNeedsSync := jm.expectations.SatisfiedExpectations(key),前面一直没有研究expectations

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// Controller ensures that all Job objects have corresponding pods to// run their configured workload.type Controller struct {// A TTLCache of pod creates/deletes each rc expects to see  expectations controller.ControllerExpectationsInterface}

接下来研究一下 expectations(期望)

//文件路径: \src\kubernetes-1.22.3\pkg\controller\controller_utils.gotype ControllerExpectationsInterface interface {  GetExpectations(controllerKey string) (*ControlleeExpectations, boolerror)  SatisfiedExpectations(controllerKey stringbool  DeleteExpectations(controllerKey string)  SetExpectations(controllerKey string, add, del interror  ExpectCreations(controllerKey string, adds interror  ExpectDeletions(controllerKey string, dels interror  CreationObserved(controllerKey string)  DeletionObserved(controllerKey string)  RaiseExpectations(controllerKey string, add, del int)  LowerExpectations(controllerKey string, add, del int)}type ControllerExpectations struct {  cache.Store}type ControlleeExpectations struct {  add       int64  del       int64  key       string  timestamp time.Time}

各个函数的功能如下:

函数签名
功能详细说明
GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
1. 根据控制器唯一标识 controllerKey(通常为「命名空间 / 资源名」),从缓存中查询对应的期望对象 ControlleeExpectations2. 返回值说明:*ControlleeExpectations:查询到的期望对象(未找到则为 nil);bool:是否存在该期望记录;error:查询过程中的错误(如缓存访问异常)。
SatisfiedExpectations(controllerKey string) bool
期望机制的核心过滤函数,判断是否满足触发控制器同步(sync)的条件,返回 true 表示允许syncfalse 表示跳过:1. 若存在期望对象且计数已完成add≤0 && del≤0)→ 返回 true2. 若存在期望对象但已超时(超过 ExpectationsTimeout)→ 返回 true(强制sync,避免永久等待);3. 若存在期望对象且未完成 / 未超时 → 返回 false(跳过无效sync);4. 若不存在期望对象 / 查询出错 → 返回 true(兜底触发sync)。
DeleteExpectations(controllerKey string)
从缓存中删除指定控制器的期望记录,适用于控制器资源(如 Job/Pod)完成生命周期后清理缓存,避免内存泄漏。
SetExpectations(controllerKey string, add, del int) error
为指定控制器设置初始的期望计数1. add:预期要观测的资源创建事件数量;2. del:预期要观测的资源删除事件数量;3. 会创建新的 ControlleeExpectations 对象并写入缓存,若已有记录则直接覆盖;4. 返回值为缓存写入过程中的错误(如对象类型不匹配)。
ExpectCreations(controllerKey string, adds int) error专门用于设置创建事件的期望
,是 SetExpectations 的封装方法,内部调用 SetExpectations(controllerKey, adds, 0),仅指定创建事件数量,删除事件数量固定为 0。
ExpectDeletions(controllerKey string, dels int) error专门用于设置删除事件的期望
,是 SetExpectations 的封装方法,内部调用 SetExpectations(controllerKey, 0, dels),仅指定删除事件数量,创建事件数量固定为 0。
CreationObserved(controllerKey string)
当控制器观测到一个资源创建事件时调用,是 LowerExpectations 的封装方法,内部调用 LowerExpectations(controllerKey, 1, 0)将创建期望计数(add)原子性减 1
DeletionObserved(controllerKey string)
当控制器观测到一个资源删除事件时调用,是 LowerExpectations 的封装方法,内部调用 LowerExpectations(controllerKey, 0, 1)将删除期望计数(del)原子性减 1
RaiseExpectations(controllerKey string, add, del int)
这个函数在我看这个版本的k8s代码中是没有调用
提升期望计数
(增加待观测的事件数量):1. 从缓存中查询指定控制器的期望对象;2. 对 add(创建计数)和 del(删除计数)执行原子性加操作3. 适用于控制器需要追加创建 / 删除资源的场景(如Job扩容Pod数量)。
LowerExpectations(controllerKey string, add, del int)降低期望计数
(减少待观测的事件数量),是 CreationObserved/DeletionObserved 的底层实现:1. 从缓存中查询指定控制器的期望对象;2. 对 add 和 del 执行原子性减操作3. 可手动指定减的数量(如一次性观测到多个事件时批量递减)。

expectations(期望)的作用就是用来减少不必要的sync,尤其是其中的SatisfiedExpectations方法,接下来举个例子直观如何感受减少的无效sync,以 Job 控制器创建 5 个 Pod 为例,对比有无期望机制的 sync 次数:

场景 1:无期望机制(每次事件都触发 sync

控制器决定创建 5 个 Pod,发送 5 个创建请求到APIServer

APIServer 异步创建 Pod,依次触发 5 次「Pod 创建事件」;

控制器收到每个事件都触发一次 sync,总共执行 5 次 sync

但这 5 次 sync 的逻辑几乎完全重复(都是检查 “5 个 Pod 是否创建完成”),只有最后一次 sync 是有效的,前 4 次都是无效的。

场景 2:有期望机制(SatisfiedExpectations 过滤无效 sync

控制器决定创建 5 个 Pod,调用 ExpectCreations 设置 add=5(期望计数);

APIServer 异步创建 Pod,依次触发 5 次「Pod 创建事件」,每次事件调用 CreationObserved 把 add 从 5→4→3→2→1→0;

每次事件触发后,控制器调用 SatisfiedExpectations 校验:

  • 前 4 次事件后,add=4/3/2/1(未完成)→ 返回 false,跳过 sync

  • 第 5 次事件后,add=0(已完成)→ 返回true,执行 1 次sync

    最终只执行 1 次有效sync,前 4 次无效sync被完全过滤。

额外的超时兜底

如果其中 1 个 Pod 创建卡住了(add=1 一直没减到 0),超过 ExpectationsTimeout(5 分钟)后,SatisfiedExpectations 会因为 exp.isExpired() 返回 true,强制触发一次 sync,避免控制器永久等待。

所以,SatisfiedExpectations 减少无效 sync 的核心逻辑

  1. 设置期望:控制器先声明 “要等N个事件发生”;
  2. 事件递减:每发生一个事件,期望计数减 1;
  3. 校验过滤SatisfiedExpectations 只在计数为 0 / 超时后才允许 sync,否则直接返回 false 跳过;
  4. 最终效果:把 “每个事件触发一次 sync” 变成 “所有事件完成后触发一次 sync”,大幅减少重复、无效的 sync 操作,降低控制器和APIServer 的负载。

简单说,SatisfiedExpectations 就是控制器的 “耐心等待开关”:没等齐事件就不干活,等齐了再一次性干,避免瞎忙活。

最关键的设计思想(Kubernetes 通用模式),Kubernetes 所有控制器(Deployment/Job/StatefulSet)都用 Expectations 模式Expectations 机制完整梳理如下:

一、机制定位

K8s 控制器除了 Informer 维护的资源本地缓存外,还维护一套独立本地缓存:expectations(期望管理器),是控制器用来管控 Pod 增删动作的自校验计数器。

二、核心存储内容

针对每一个 Job(通过 key 区分),expectations 会记录两组数值:

  1. 待创建 Pod 计数:控制器发起创建请求后,记录预期收到多少条 Pod 创建事件;
  2. 待删除 Pod 计数:控制器发起删除请求后,记录预期收到多少条 Pod 删除事件。

三、校验规则

调用 SatisfiedExpectations 判断同步权限,逻辑如下:

  1. 待创建、待删除计数全部等于 0:代表上一轮同步发起的所有 Pod 增删操作,对应的 Pod 事件已经全部被 Informer 缓存接收,期望条件达成;
  2. 任意一组计数大于 0:代表还有预期的 Pod 新增 / 删除事件未同步到本地缓存,期望未满足。

四、未满足期望的处理逻辑

如果期望未达成,控制器会直接跳过本轮的 Pod 扩缩容逻辑,不执行 manageJob 创建或删除 Pod

  1. 原因:上一轮同步发起的 Pod API 请求还未全部落地、对应的 Pod 事件还没同步进本地缓存;此时再次计算 Pod 数量并发起操作,会出现数据错乱、重复创建 Pod 等问题;
  2. 兜底策略:若长时间未收到对应 Pod 事件(API 请求异常、事件丢失等),expectations 会自动过期,下一轮同步允许重新执行 Pod 管控逻辑。

五、机制核心目的

通过 expectations 拦截无效同步循环,减少不必要的 Job 同步、重复调用 APIServer 创建 / 删除 Pod,缓解 API 服务压力,同时规避缓存异步带来的竞态问题,保证 Pod 数量计算准确。

1.3.6.5、jm.manageJob

追踪jm.manageJob

// C:\git\kubernetes-1.22.3\pkg\controller\job\job_controller.go// manageJob 是Job控制器的核心方法,负责根据job.Spec中的定义管理运行中的Pod数量// 核心职责:计算期望Pod数 vs 当前活跃Pod数,执行创建/删除操作,让实际状态逼近期望状态// 入参://   job: 目标Job对象//   activePods: 当前属于该Job的所有活跃Pod列表//   succeeded: 已成功完成的Pod数量//   succeededIndexes: Indexed Job模式下已完成的索引区间// 返回值://   int32: 更新后的活跃Pod数量//   string: 当前同步执行的操作类型(创建Pod/删除Pod/仅追踪)//   error: 执行过程中的错误// 注意:该函数不会修改入参 activePodsfunc (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval) (int32stringerror) {// 当前活跃Pod数量  active := int32(len(activePods))// Job的并行度限制  parallelism := *job.Spec.Parallelism// 生成Job唯一标识key  jobKey, err := controller.KeyFunc(job)if err != nil {return 0, metrics.JobSyncActionTracking, nil  }// 如果Job处于暂停状态:删除所有活跃Podif jobSuspended(job) {// 获取需要删除的Pod列表(全部)    podsToDelete := activePodsForRemoval(job, activePods, int(active))// 向Expectations登记:预期删除N个Pod    jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))// 执行删除Pod操作    removed, err := jm.deleteJobPods(job, jobKey, podsToDelete)    active -= removedreturn active, metrics.JobSyncActionPodsDeleted, err  }// ====================== 核心逻辑:计算需要多少个活跃Pod ======================var wantActive int32 = 0if job.Spec.Completions == nil {// 场景1:未指定completions(只要有1个Pod成功,Job就完成)// 规则:只要还没成功,就保持并行度数量的Pod运行;一旦成功,保留现有Pod不再新建if succeeded > 0 {      wantActive = active    } else {      wantActive = parallelism    }  } else {// 场景2:指定了completions(需要固定数量的成功Pod)// 期望活跃数 = 需要完成的总数 - 已成功数    wantActive = *job.Spec.Completions - succeeded// 期望活跃数不能超过并行度上限if wantActive > parallelism {      wantActive = parallelism    }// 期望活跃数不能小于0if wantActive < 0 {      wantActive = 0    }  }// ====================== 步骤1:Pod过多 -> 删除多余Pod ======================// 需要删除的Pod数量 = 当前活跃数 - 期望活跃数  rmAtLeast := active - wantActiveif rmAtLeast < 0 {    rmAtLeast = 0  }// 筛选出需要删除的Pod  podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast))// 限制单次同步最大删除量,防止API压力过大if len(podsToDelete) > maxPodCreateDeletePerSync {    podsToDelete = podsToDelete[:maxPodCreateDeletePerSync]  }// 执行删除逻辑if len(podsToDelete) > 0 {// 登记期望删除的Pod数量    jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))// 批量删除Pod    removed, err := jm.deleteJobPods(job, jobKey, podsToDelete)    active -= removed// 单次同步优先执行删除,删除完成后直接返回,不执行创建return active, metrics.JobSyncActionPodsDeleted, err  }// ====================== 步骤2:Pod不足 -> 创建缺失Pod ======================if active < wantActive {// 需要创建的Pod数量    diff := wantActive - active// 限制单次同步最大创建量if diff > int32(maxPodCreateDeletePerSync) {      diff = int32(maxPodCreateDeletePerSync)    }// 向Expectations登记:预期创建diff个Pod    jm.expectations.ExpectCreations(jobKey, int(diff))    errCh := make(chan error, diff)    wait := sync.WaitGroup{}// Indexed Job 模式:计算需要创建哪些索引的Podvar indexesToAdd []intif isIndexedJob(job) {      indexesToAdd = firstPendingIndexes(activePods, succeededIndexes, int(diff), int(*job.Spec.Completions))      diff = int32(len(indexesToAdd))    }// 预增加活跃数(假定创建成功)    active += diff// 克隆Pod模板,避免修改缓存对象    podTemplate := job.Spec.Template.DeepCopy()// Indexed Job:注入索引相关环境变量if isIndexedJob(job) {      addCompletionIndexEnvVariables(podTemplate)    }// 如果开启Finalizer追踪:给Pod添加跟踪终结器if trackingUncountedPods(job) {      podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)    }// ====================== 慢启动批量创建Pod ======================// 慢启动策略:初始批次小,逐步翻倍,防止大规模创建失败导致API风暴// 例如:1 -> 2 -> 4 -> 8... 直到达到需要创建的数量for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {      errorCount := len(errCh)      wait.Add(int(batchSize))// 并发创建当前批次的Podfor i := int32(0); i < batchSize; i++ {        completionIndex := unknownCompletionIndex// 分配索引(仅Indexed Job)if len(indexesToAdd) > 0 {          completionIndex = indexesToAdd[0]          indexesToAdd = indexesToAdd[1:]        }go func() {          template := podTemplate// Indexed Job:为每个Pod设置索引注解和主机名if completionIndex != unknownCompletionIndex {            template = podTemplate.DeepCopy()            addCompletionIndexAnnotation(template, completionIndex)            template.Spec.Hostname = fmt.Sprintf("%s-%d", job.Name, completionIndex)          }defer wait.Done()// 生成Pod名称          generateName := podGenerateNameWithIndex(job.Name, completionIndex)// 调用API创建Pod          err := jm.podControl.CreatePodsWithGenerateName(job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName)if err != nil {// 命名空间正在删除,忽略错误if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {return            }          }// 创建失败处理if err != nil {defer utilruntime.HandleError(err)// 创建失败:立即抵消Expectations中的期望(因为不会收到Pod创建事件)            jm.expectations.CreationObserved(jobKey)            atomic.AddInt32(&active, -1)            errCh <- err          }        }()      }      wait.Wait()// 慢启动失败处理:如果本批次有失败,跳过剩余Pod创建,下次同步重试      skippedPods := diff - batchSizeif errorCount < len(errCh) && skippedPods > 0 {        active -= skippedPodsfor i := int32(0); i < skippedPods; i++ {// 跳过的Pod:抵消对应的期望          jm.expectations.CreationObserved(jobKey)        }break      }      diff -= batchSize    }return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)  }// 无需创建/删除,仅追踪状态return active, metrics.JobSyncActionTracking, nil}

上面的代码很复杂,简单的说就是如下逻辑:

这个函数到底在干嘛?一句话,算清楚 Job 现在应该有几个 Pod,多了删、少了建、正好就不动。

核心流程如下:

  1. 先处理暂停 Job → 全删 Pod
  2. 计算期望 Pod 数(根据 parallelism / completions
  3. Pod 过多 ,那么就删多余的Pod
  4. Pod 过少 ,那么就创建Pod
  5. 创建用慢启动 → 防止批量创建对系统造成压力,防止因为配置错误 / 配额不足,一下子创建成百上千个 Pod全部失败,打爆 APIServer

上面代码中对Expectations 的使用

  • 删 Pod 前:ExpectDeletions 登记预期删除数

  • 建 Pod 前:ExpectCreations 登记预期创建数

  • 创建失败:立刻用 CreationObserved 抵消期望。CreationObserved取消一个 “预期会创建但永远不会来的Pod”,目的是不让Expectations 卡住,不让 Job 卡死。比如如下场景:

    现在:

    导致 Pod 永远不会诞生了,但 Expectations 里还记着,待创建 Pod = 1

    如果不修改它,控制器会永远认为:还有 1 个 Pod 即将创建,一直等待,再也不允许新建 Pod

    Job 直接卡死!

    • 控制器登记了 期望创建 1 个 Pod
    • 结果调用 API 报错了(权限不足、资源不够…)

至此,Job控制器的原理和源码分析完成。

1.4、CronJob控制器

1.4.1、CronJob 核心概念

K8s 中JobCronJobPod 三者存在层级嵌套关系,核心逻辑如下:

  • Pod:最小执行单元,负责真正运行容器任务
  • Job:任务执行单元,一个 Job 会管理一个或多个 Pod,保证任务单次执行完成、重试、结束等逻辑
  • CronJob:定时任务调度单元,基于 Linux Crontab 时间规则,周期性自动创建Job 对象,实现任务定时重复执行

简单总结:CronJob 定时生成 JobJob 调度运行 PodCronJob 核心能力是时间调度,语法与 Linux crontab 完全兼容,可通过标准 Cron 表达式自定义任务执行周期。

1.4.2、CronJob 举例

apiVersion: batch/v1beta1kind: CronJobmetadata:name: pispec:# 调度规则:每分钟执行一次schedule: "*/1 * * * *"# 任务模板:所有周期性生成的 Job 均复用该配置jobTemplate:spec:# 任务需要完成2次成功执行completions: 2# 并行执行数:同一时间仅1个Pod运行parallelism: 1template:spec:containers:name: piimage: perl# 执行命令:计算20位圆周率command: ["perl",  "-Mbignum=bpi""-wle""print bpi(20)"]# 仅失败时重启PodrestartPolicy: OnFailure

配置核心参数说明

  • schedule:核心定时规则,遵循 Linux Crontab 五段式语法(分 时 日 月 周),本配置 */1 * * * * 代表每分钟触发一次任务
  • completions: 2:单个 Job 需要成功完成2次任务执行才算任务结束
  • parallelism: 1:单个 Job 同一时刻只能有1个 Pod 运行,串行执行任务
  • restartPolicy: OnFailurePod 异常失败时重启,正常执行完成则不重启

1.4.3、CronJob 源码分析

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\controllermanager.gofunc NewControllerInitializers(loopMode ControllerLoopMode)map[string]InitFunc {  controllers["cronjob"] = startCronJobControllerreturn controllers}

追踪startCronJobController

// C:\git\kubernetes-1.22.3\cmd\kube-controller-manager\app\batch.go/*如果不想使用 CronJobV2 的控制器,而想要换回原始的控制器时,那你需要显式的将它禁用掉--feature-gates="CronJobControllerV2=false"*/func startCronJobController(ctx ControllerContext) (http.Handler, boolerror) {if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CronJobControllerV2) {    cj2c, err := cronjob.NewControllerV2(ctx.InformerFactory.Batch().V1().Jobs(),      ctx.InformerFactory.Batch().V1().CronJobs(),      ctx.ClientBuilder.ClientOrDie("cronjob-controller"),    )if err != nil {return niltrue, fmt.Errorf("error creating CronJob controller V2: %v", err)    }go cj2c.Run(int(ctx.ComponentConfig.CronJobController.ConcurrentCronJobSyncs), ctx.Stop)return niltruenil  }  cjc, err := cronjob.NewController(    ctx.ClientBuilder.ClientOrDie("cronjob-controller"),  )if err != nil {return niltrue, fmt.Errorf("error creating CronJob controller: %v", err)  }go cjc.Run(ctx.Stop)return niltruenil}
1.4.3.1、 CronJob 控制器 V1 与 V2 核心区别

K8s 提供两种版本的CronJob控制器,均基于控制器模式实现,但底层调度、数据获取、运行机制差异极大,具体区别如下:

CronJobControllerV1 实现特点

  • 数据获取方式特殊:不同于 K8s 常规控制器,不依赖Informer监听资源变动通知,而是直接主动访问APIServer获取数据。
  • 轮询调度机制:固定每 10 秒向APIServer拉取JobCronJob资源信息,校验任务调度规则,判断是否需要触发新任务、创建新资源。
  • 无缓存与队列机制:架构简单,没有缓存、工作队列相关能力,纯定时轮询执行调度逻辑。

CronJobControllerV2 实现特点

V2 控制器优化了 V1 的低效轮询模式,对齐Job控制器主流设计,采用初始化控制器对象 + 执行Run方法的两步运行逻辑,整体性能和时效性大幅提升。

1、控制器初始化阶段

初始化JobCronJob成对的关联资源对象,补齐 V1 缺失的核心能力:

  • 新增延迟队列、工作队列(workqueue),替代原生轮询机制;
  • 通过JobListerCronJobLister实现资源本地缓存,无需频繁请求APIServer
  • 通过JobControlCronJobControl提供资源更新操作接口;
  • 注册JobCronJob资源事件回调,基于Informer监听资源变动。

2、 Run运行阶段

启动指定数量的worker工作协程,持续执行资源同步逻辑:获取CronJob及其关联的Job资源,完成任务状态、调度规则的协调与同步,实现精准定时调度。

接下来只追踪CronJob V2的实现

// C:\git\kubernetes-1.22.3\pkg\controller\cronjob\cronjob_controllerv2.go// ControllerV2 is a controller for CronJobs.// Refactored Cronjob controller that uses DelayingQueue and informerstype ControllerV2 struct {  queue    workqueue.RateLimitingInterface  recorder record.EventRecorder  jobControl     jobControlInterface  cronJobControl cjControlInterface  jobLister     batchv1listers.JobLister  cronJobLister batchv1listers.CronJobLister  jobListerSynced     cache.InformerSynced  cronJobListerSynced cache.InformerSynced// now is a function that returns current time, done to facilitate unit tests  now func() time.Time}

追踪NewControllerV2

// C:\git\kubernetes-1.22.3\pkg\controller\cronjob\cronjob_controllerv2.go// NewControllerV2 创建并初始化一个 CronJob V2 控制器实例// 入参:Job监听器、CronJob监听器、K8s客户端// 返回值:初始化完成的V2控制器 / 错误信息func NewControllerV2(jobInformer batchv1informers.JobInformer, cronJobsInformer batchv1informers.CronJobInformer, kubeClient clientset.Interface) (*ControllerV2, error) {//  1. 初始化事件广播器 // 创建事件广播器,用于给CronJob/Job资源发送K8s事件(可通过kubectl get events查看)  eventBroadcaster := record.NewBroadcaster()// 开启结构化日志输出  eventBroadcaster.StartStructuredLogging(0)// 将事件写入APIServer的Event资源中  eventBroadcaster.StartRecordingToSink(&covev1client.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})//  2. 注册限流指标 // 如果K8s客户端存在且启用了限流,则注册限流监控指标if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {return nil, err    }  }//  3. 初始化ControllerV2核心结构体   jm := &ControllerV2{// 创建带限流的工作队列,是V2替代轮询的核心组件    queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob"),// 创建事件记录器,用于记录控制器产生的事件    recorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cronjob-controller"}),// Job资源操作控制器,用于创建/删除Job    jobControl: realJobControl{KubeClient: kubeClient},// CronJob资源操作控制器    cronJobControl: &realCJControl{KubeClient: kubeClient},// Job本地缓存读取器,无需直接请求APIServer    jobLister: jobInformer.Lister(),// CronJob本地缓存读取器    cronJobLister: cronJobsInformer.Lister(),// 判断Job缓存是否已完成同步    jobListerSynced: jobInformer.Informer().HasSynced,// 判断CronJob缓存是否已完成同步    cronJobListerSynced: cronJobsInformer.Informer().HasSynced,// 时间函数,方便单元测试替换    now: time.Now,  }//  4. 注册Job资源的事件监听回调 // 监听Job的增删改事件,触发对应处理函数  jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{    AddFunc:    jm.addJob,    // Job创建时触发    UpdateFunc: jm.updateJob, // Job更新时触发    DeleteFunc: jm.deleteJob, // Job删除时触发  })//  5. 注册CronJob资源的事件监听回调 // 监听CronJob的增删改事件,事件驱动核心逻辑  cronJobsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{// CronJob创建:加入工作队列    AddFunc: func(obj interface{}) {      jm.enqueueController(obj)    },// CronJob更新:执行更新逻辑    UpdateFunc: jm.updateCronJob,// CronJob删除:加入工作队列    DeleteFunc: func(obj interface{}) {      jm.enqueueController(obj)    },  })//  6. 注册监控指标 // 注册控制器监控指标,供Prometheus采集  metrics.Register()// 返回初始化完成的控制器return jm, nil}

有了之前的ReplicaSetJob两个控制的学习,看上面的代码就容易理解了,接下来继续追踪Job的回调和

1.4.3.2、CronJob的回调

追踪AddFunc

// C:\git\kubernetes-1.22.3\pkg\controller\cronjob\cronjob_controllerv2.go// addJob 当Job资源被创建时触发的事件处理函数// 核心作用:找到创建该Job的所属CronJob,将其加入工作队列进行调度同步func (jm *ControllerV2) addJob(obj interface{}) {// 将空接口类型断言为K8s Job对象  job := obj.(*batchv1.Job)// 判断Job是否处于删除中(DeletionTimestamp不为空表示正在删除)if job.DeletionTimestamp != nil {// 控制器重启场景下,可能出现刚监听到的Job就处于删除状态// 这种情况不按新增处理,直接按删除逻辑处理    jm.deleteJob(job)return  }// 获取当前Job的 Owner 控制器引用(即:哪个CronJob创建了这个Job)if controllerRef := metav1.GetControllerOf(job); controllerRef != nil {// 根据命名空间+控制器引用,查找到对应的父CronJob资源    cronJob := jm.resolveControllerRef(job.Namespace, controllerRef)// 找不到所属CronJob,直接返回if cronJob == nil {return    }// 找到父CronJob,将其加入工作队列,触发调度逻辑    jm.enqueueController(cronJob)return  }}

追踪UpdateFunc

// C:\git\kubernetes-1.22.3\pkg\controller\cronjob\cronjob_controllerv2.go// updateJob 当Job资源发生更新时触发的事件处理函数// 作用:识别Job所属的CronJob,将关联的CronJob加入队列进行状态同步// 特别处理:如果Job的归属CronJob发生变化,需要同步旧的和新的父CronJob// old: 更新前的Job对象  cur: 更新后的Job对象func (jm *ControllerV2) updateJob(old, cur interface{}) {// 将参数类型断言为标准的Job对象  curJob := cur.(*batchv1.Job)  oldJob := old.(*batchv1.Job)// 如果新旧Job的资源版本一致,说明是无效/重复的更新事件(如周期性resync)// 资源版本(ResourceVersion)是K8s用来标识资源版本的唯一标识,变化才代表真更新if curJob.ResourceVersion == oldJob.ResourceVersion {// 周期性同步会发送所有已知Job的更新事件,版本相同说明无实际变化,直接返回// 同一个Job的两个不同版本,资源版本一定不同return  }// 获取更新前后Job的控制器引用(即所属的CronJob)  curControllerRef := metav1.GetControllerOf(curJob)  oldControllerRef := metav1.GetControllerOf(oldJob)// 判断控制器引用是否发生变化  controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)// 如果控制器引用发生了变化,且旧的控制器引用不为空if controllerRefChanged && oldControllerRef != nil {// 说明Job的归属CronJob变了,需要把旧的CronJob也加入队列同步状态if cronJob := jm.resolveControllerRef(oldJob.Namespace, oldControllerRef); cronJob != nil {      jm.enqueueController(cronJob)    }  }// 处理当前最新的控制器引用(核心逻辑)// 如果当前Job有归属的CronJob,就只需要处理这个父CronJobif curControllerRef != nil {// 根据控制器引用找到对应的CronJob    cronJob := jm.resolveControllerRef(curJob.Namespace, curControllerRef)if cronJob == nil {return    }// 将归属的CronJob加入工作队列,触发状态同步    jm.enqueueController(cronJob)return  }}

继续追踪DeleteFunc

// C:\git\kubernetes-1.22.3\pkg\controller\cronjob\cronjob_controllerv2.go// deleteJob 当Job资源被删除时触发的事件处理函数// 核心作用:处理Job删除事件,找到它所属的父CronJob并加入队列同步状态func (jm *ControllerV2) deleteJob(obj interface{}) {// 1. 尝试将传入的对象转换为 Job 类型  job, ok := obj.(*batchv1.Job)// 2. 如果不能直接转成 Job,处理“墓碑”对象(删除缓存机制)// 当删除事件丢失时,重新列举会发现存储中的Job不在列表里,// 从而生成一个包含已删除键值的墓碑(tombstone)对象,该对象数据可能是旧的if !ok {// 尝试转换为 DeletedFinalStateUnknown(K8s 墓碑类型)    tombstone, ok := obj.(cache.DeletedFinalStateUnknown)if !ok {// 既不是Job也不是墓碑,记录错误并返回      utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))return    }// 从墓碑中取出原始对象,尝试转为Job    job, ok = tombstone.Obj.(*batchv1.Job)if !ok {      utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))return    }  }// 3. 获取该Job的控制器引用(即:属于哪个CronJob)  controllerRef := metav1.GetControllerOf(job)if controllerRef == nil {// 如果没有归属控制器(孤儿Job),无需处理return  }// 4. 根据控制器引用找到对应的父CronJob  cronJob := jm.resolveControllerRef(job.Namespace, controllerRef)if cronJob == nil {return  }// 5. 将对应的CronJob加入工作队列,触发状态同步  jm.enqueueController(cronJob)}

继续追踪enqueueController,其实就是加入到ControllerV2.queue

// C:\git\kubernetes-1.22.3\pkg\controller\cronjob\cronjob_controllerv2.gofunc (jm *ControllerV2) enqueueController(obj interface{}) {  key, err := controller.KeyFunc(obj)if err != nil {    utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))return  }  jm.queue.Add(key)}
1.4.3.3、Job的回调

追踪AddFuncDeleteFunc,它们的实现也是enqueueController

追踪UpdateFunc

// C:\git\kubernetes-1.22.3\pkg\controller\cronjob\cronjob_controllerv2.go// updateCronJob 处理 CronJob 资源的更新事件// 核心逻辑:// 1. 如果 CronJob 的 调度时间表(spec.schedule) 发生变化 → 按新时间表重新计算下次执行时间// 2. 如果是其他字段变化 → 立即重新入队处理func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) {// 将新旧对象转换为 CronJob 类型  oldCJ, okOld := old.(*batchv1.CronJob)  newCJ, okNew := curr.(*batchv1.CronJob)// 类型转换失败,直接返回(非法对象不处理)if !okOld || !okNew {return  }//  核心判断:调度规则是否改变 // 如果 CronJob 的 schedule(定时规则)发生了变更if oldCJ.Spec.Schedule != newCJ.Spec.Schedule {// 尝试解析新的 cron 表达式    sched, err := cron.ParseStandard(newCJ.Spec.Schedule)if err != nil {// 解析失败:说明用户写的 cron 表达式不合法// 打印日志 + 发送 Warning 事件,不再处理该 CronJob 直到下次更新      jm.recorder.Eventf(newCJ, corev1.EventTypeWarning, "UnParseableCronJobSchedule""unparseable schedule for cronjob: %s", newCJ.Spec.Schedule)return    }// 解析成功:计算【下次调度时间】距离现在还有多久    now := jm.now()    t := nextScheduledTimeDuration(sched, now)// 将 CronJob **延迟入队**(等到下次执行时间再处理)    jm.enqueueControllerAfter(curr, *t)return  }//  非 schedule 字段更新 // 如果只是其他字段(如并发策略、保留数、Job 模板等)发生变化// 立即将 CronJob 加入工作队列,马上同步状态  jm.enqueueController(curr)}
1.4.3.4、Run控制器的运行
// C:\git\kubernetes-1.22.3\pkg\controller\cronjob\cronjob_controllerv2.gofunc (jm *ControllerV2) Run(workers int, stopCh <-chan struct{}) {// 捕获并处理协程崩溃,防止整个控制器挂掉defer utilruntime.HandleCrash()// 退出前关闭工作队列,保证资源安全释放defer jm.queue.ShutDown()  klog.InfoS("Starting cronjob controller v2")defer klog.InfoS("Shutting down cronjob controller v2")//  核心步骤1:等待本地缓存同步完成 // 等待 Job 和 CronJob 的 Informer 本地缓存与 APIServer 完全同步// 不同步完成,绝不开始工作,避免读取脏数据if !cache.WaitForNamedCacheSync("cronjob", stopCh, jm.jobListerSynced, jm.cronJobListerSynced) {return  }//  核心步骤2:启动指定数量的 worker 工作协程 // 循环启动 N 个 worker 协程,持续从队列取任务执行 sync 同步逻辑// wait.Until:每隔 1s 尝试执行 worker,直到收到停止信号for i := 0; i < workers; i++ {go wait.Until(jm.worker, time.Second, stopCh)  }//  核心步骤3:阻塞等待停止信号 // 一直阻塞在这里,直到收到外部的停止指令(如程序退出、Pod 删除)  <-stopCh}

继续追踪jm.worker

// C:\git\kubernetes-1.22.3\pkg\controller\cronjob\cronjob_controllerv2.gofunc (jm *ControllerV2) worker() {for jm.processNextWorkItem() {  }}

继续追踪jm.processNextWorkItem()

// C:\git\kubernetes-1.22.3\pkg\controller\cronjob\cronjob_controllerv2.go// processNextWorkItem 处理工作队列中的下一个任务// 作用:从队列取出任务,执行同步逻辑,并根据结果决定:重试 / 延迟入队 / 结束// 返回值:true-继续处理下一个任务,false-队列已关闭,退出循环func (jm *ControllerV2) processNextWorkItem() bool {// 1. 从工作队列中获取一个任务 key// quit 为 true 表示队列已经被关闭,不再处理  key, quit := jm.queue.Get()if quit {return false  }// 2. 确保任务处理完成后,通知队列该任务已处理完毕// Done 必须调用,否则队列会认为这个任务一直未处理完defer jm.queue.Done(key)// 3. 执行核心同步逻辑(真正的 CronJob 调度、创建 Job、更新状态)// 返回值:requeueAfter - 延迟多久重新入队;err - 执行是否出错  requeueAfter, err := jm.sync(key.(string))// 4. 根据同步结果,处理队列任务switch {// 情况1:同步出错 → 进行限流重试(防止频繁重试压垮 APIServer)case err != nil:    utilruntime.HandleError(fmt.Errorf("error syncing CronJobController %v, requeuing: %v", key.(string), err))    jm.queue.AddRateLimited(key)// 情况2:需要延迟调度 → 忘记旧任务,按指定时间重新入队// 用于 CronJob 下次定时执行case requeueAfter != nil:    jm.queue.Forget(key)       // 标记该任务处理成功,清除重试计数    jm.queue.AddAfter(key, *requeueAfter) // 延迟指定时间后再次入队  }// 返回 true,继续处理队列中的下一个任务return true}

追踪核心jm.sync

// C:\git\kubernetes-1.22.3\pkg\controller\cronjob\cronjob_controllerv2.go// sync 是 CronJob V2 控制器的核心同步函数// 作用:根据队列中的 CronJob 键,执行一次完整的调度过期逻辑:// 1. 获取 CronJob 对象// 2. 查找关联的 Job// 3. 检查是否需要创建新 Job// 4. 清理旧 Job// 5. 返回下次调度时间(用于延迟入队)// 返回值:requeueAfter - 下次调度间隔;err - 执行错误func (jm *ControllerV2) sync(cronJobKey string) (*time.Duration, error) {// 1. 解析 CronJob 的唯一 key,拆分为 命名空间(ns) + 名称(name)  ns, name, err := cache.SplitMetaNamespaceKey(cronJobKey)if err != nil {return nil, err  }// 2. 从【本地缓存】中获取 CronJob 对象(不访问 APIServer)  cronJob, err := jm.cronJobLister.CronJobs(ns).Get(name)switch {// 情况A:CronJob 不存在(已被删除)case errors.IsNotFound(err):// 资源已删除,无需重新入队,直接结束    klog.V(4).InfoS("CronJob not found, may be it is deleted""cronjob", klog.KRef(ns, name), "err", err)return nilnil// 情况B:其他错误(如 APIServer 异常),返回错误并重试case err != nil:return nil, err  }// 3. 获取该 CronJob 【所有关联的 Job】(子 Job)  jobsToBeReconciled, err := jm.getJobsToBeReconciled(cronJob)if err != nil {return nil, err  }// ==================== 核心调度逻辑 ====================// 4. 执行 CronJob 同步://    - 检查是否到达调度时间//    - 创建新 Job//    - 处理并发策略、历史记录限制//    - 计算【下次执行时间】requeueAfter  cronJobCopy, requeueAfter, err := jm.syncCronJob(cronJob, jobsToBeReconciled)if err != nil {    klog.V(2).InfoS("Error reconciling cronjob""cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err)return nil, err  }// 5. 清理已完成/失败的旧 Job(根据 historyLimit 配置)  err = jm.cleanupFinishedJobs(cronJobCopy, jobsToBeReconciled)if err != nil {    klog.V(2).InfoS("Error cleaning up jobs""cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.GetResourceVersion(), "err", err)return nil, err  }// 6. 如果有下次调度时间,返回给队列做延迟入队if requeueAfter != nil {    klog.V(4).InfoS("Re-queuing cronjob""cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "requeueAfter", requeueAfter)return requeueAfter, nil  }// 7. 无下次调度时间(任务被暂停、schedule 非法)//    不再自动入队,直到手动更新 CronJobreturn nilnil}

追踪其中的getJobsToBeReconciled

// C:\git\kubernetes-1.22.3\pkg\controller\cronjob\cronjob_controllerv2.go// getJobsToBeReconciled 获取需要与当前 CronJob 协同处理的所有 Job 列表func (jm *ControllerV2) getJobsToBeReconciled(cronJob *batchv1.CronJob) ([]*batchv1.Job, error) {// 1. 定义 Job 标签选择器(用于筛选 Job)var jobSelector labels.Selector// 如果 CronJob 的 Job 模板没有定义标签 → 匹配所有 Jobif len(cronJob.Spec.JobTemplate.Labels) == 0 {    jobSelector = labels.Everything()  } else {// 否则 → 根据 CronJob 定义的标签来筛选匹配的 Job    jobSelector = labels.Set(cronJob.Spec.JobTemplate.Labels).AsSelector()  }// 2. 从本地缓存(jobLister)中,列出当前命名空间下【匹配标签】的所有 Job  jobList, err := jm.jobLister.Jobs(cronJob.Namespace).List(jobSelector)if err != nil {return nil, err  }// 3. 定义最终要返回的、属于当前 CronJob 的 Job 列表  jobsToBeReconciled := []*batchv1.Job{}// 4. 遍历所有标签匹配的 Job,进一步通过【控制器引用】确认归属关系for _, job := range jobList {// 获取 Job 的 Owner 控制器引用(关键:判断是谁创建的 Job)if controllerRef := metav1.GetControllerOf(job); controllerRef != nil && controllerRef.Name == cronJob.Name {// 确认该 Job 由当前 CronJob 创建 → 加入列表      jobsToBeReconciled = append(jobsToBeReconciled, job)    }  }// 5. 返回所有属于当前 CronJob 的 Jobreturn jobsToBeReconciled, nil}

追踪syncCronJob

// C:\git\kubernetes-1.22.3\pkg\controller\cronjob\cronjob_controllerv2.go// syncCronJob 是 CronJob 控制器的【核心业务逻辑】// 功能:// 1. 维护 CronJob 状态(Active 列表、最后运行时间、完成状态)// 2. 检查是否到达调度时间// 3. 处理并发策略(Forbid/Replace)// 4. 创建新 Job// 5. 返回下次调度时间,实现定时执行func (jm *ControllerV2) syncCronJob(cj *batchv1.CronJob,js []*batchv1.Job) (*batchv1.CronJob, *time.Duration, error) {// 1. 深拷贝 CronJob,避免直接修改缓存中的对象  cj = cj.DeepCopy()  now := jm.now()// --------------------------------------------------------------------------// 第一步:遍历所有子 Job,维护 Active 列表、完成状态// --------------------------------------------------------------------------  childrenJobs := make(map[types.UID]bool)for _, j := range js {    childrenJobs[j.ObjectMeta.UID] = true// 检查 Job 是否在 CronJob Status.Active 列表中    found := inActiveList(*cj, j.ObjectMeta.UID)/*  【重要】 【重要】 【重要】      3 种情况:        Job 还在跑,但我忘了记录 → 报警        Job 跑完了 → 从活跃列表删掉        Job 成功了 → 更新最后成功时间    */// 情况1:Job 未在 Active 列表中,但还没完成 → 记录异常if !found && !IsJobFinished(j) {// 第一步:去 APIServer 重新获取最新版的 CronJob// 为什么?因为本地缓存的 CronJob 可能不是最新的        cjCopy, err := jm.cronJobControl.GetCronJob(cj.Namespace, cj.Name)if err != nil {return nilnil, err        }// 第二步:检查【最新的 CronJob】里有没有这个 Jobif inActiveList(*cjCopy, j.ObjectMeta.UID) {// 如果最新的里面有 → 说明只是本地缓存旧了// 用最新的 CronJob 覆盖本地的,然后跳过,不报错            cj = cjCopycontinue        }// 第三步:如果最新的也没有 → 真正异常// 出现了一个“控制器不认识、但还在运行”的 Job        jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnexpectedJob""Saw a job that the controller did not create or forgot: %s", j.Name)    } else if found && IsJobFinished(j) {// 情况2:Job 已完成,从 Active 列表移除      _, status := getFinishedStatus(j)      deleteFromActiveList(cj, j.ObjectMeta.UID)      jm.recorder.Eventf(cj, corev1.EventTypeNormal, "SawCompletedJob""Saw completed job: %s, status: %v", j.Name, status)    } else if IsJobFinished(j) {// 更新最后成功时间if cj.Status.LastSuccessfulTime == nil {        cj.Status.LastSuccessfulTime = j.Status.CompletionTime      }if j.Status.CompletionTime != nil && j.Status.CompletionTime.After(cj.Status.LastSuccessfulTime.Time) {        cj.Status.LastSuccessfulTime = j.Status.CompletionTime      }    }  }// 清理:如果 Job 实际不存在了,从 Active 列表删除for _, j := range cj.Status.Active {    _, found := childrenJobs[j.UID]if found {continue    }// 去 apiserver 确认 Job 是否真的不存在    _, err := jm.jobControl.GetJob(j.Namespace, j.Name)if errors.IsNotFound(err) {      // 如果我记录的 Job 实际已经删掉了,我就把它从我的 “正在运行列表” 里删掉。      deleteFromActiveList(cj, j.UID)    } else if err != nil {return cj, nil, err    }  }// 更新 CronJob 状态到 APIServer  updatedCJ, err := jm.cronJobControl.UpdateStatus(cj)if err != nil {return cj, nil, err  }  *cj = *updatedCJ// 如果 CronJob 正在删除 → 不再创建 Job,直接返回if cj.DeletionTimestamp != nil {return cj, nilnil  }// 如果 CronJob 被暂停 → 不执行if cj.Spec.Suspend != nil && *cj.Spec.Suspend {return cj, nilnil  }// --------------------------------------------------------------------------// 第二步:解析 Cron 表达式,计算下次调度时间// --------------------------------------------------------------------------  sched, err := cron.ParseStandard(cj.Spec.Schedule)if err != nil {// 表达式非法 → 不再调度    jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnparseableSchedule""unparseable schedule: %s : %s", cj.Spec.Schedule, err)return cj, nilnil  }// 获取【应该执行但还没执行】的调度时间  scheduledTime, err := getNextScheduleTime(*cj, now, sched, jm.recorder)if err != nil || scheduledTime == nil {// 计算下一次执行时间    t := nextScheduledTimeDuration(sched, now)return cj, t, nil  }/*     【重要】  【重要】 【重要】      检查能不能创建新 Job,检查 4 件事:      1、是不是错过执行时间了? → 错过就不执行      2、这个时间点已经执行过了? → 不重复创建      3、并发策略是禁止并发? → 有任务在跑就不创建      4、并发策略是替换? → 删掉旧的,创建新的  */// --------------------------------------------------------------------------// 第三步:检查是否错过执行窗口(startingDeadlineSeconds)// --------------------------------------------------------------------------  tooLate := falseif cj.Spec.StartingDeadlineSeconds != nil {    tooLate = scheduledTime.Add(time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds)).Before(now)  }// 1、是不是错过执行时间了? → 错过就不执行if tooLate {    jm.recorder.Eventf(cj, corev1.EventTypeWarning, "MissSchedule""Missed scheduled time to start a job")    t := nextScheduledTimeDuration(sched, now)return cj, t, nil  }// 2、这个时间点已经执行过了? → 不重复创建 ,也就是如果这个调度时间已经处理过 → 跳过if cj.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) {    t := nextScheduledTimeDuration(sched, now)return cj, t, nil  }// --------------------------------------------------------------------------// 第四步:处理并发策略// --------------------------------------------------------------------------// 3、并发策略是禁止并发? → 有任务在跑就不创建;也就是 并发策略 = Forbid(禁止并发)→ 有正在运行的 Job 就不创建if cj.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cj.Status.Active) > 0 {    jm.recorder.Eventf(cj, corev1.EventTypeNormal, "JobAlreadyActive""Not starting job because prior execution is running")    t := nextScheduledTimeDuration(sched, now)return cj, t, nil  }// 4、并发策略是替换? → 删掉旧的,创建新的;也就是    并发策略 = Replace(替换)→ 删除旧 Job,创建新 Jobif cj.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {for _, j := range cj.Status.Active {      job, _ := jm.jobControl.GetJob(j.Namespace, j.Name)      deleteJob(cj, job, jm.jobControl, jm.recorder)    }  }// --------------------------------------------------------------------------// 第五步:根据模板创建 Job(真正执行任务)// --------------------------------------------------------------------------  jobReq, err := getJobFromTemplate2(cj, *scheduledTime)if err != nil {return cj, nil, err  }// 创建 Job  jobResp, err := jm.jobControl.CreateJob(cj.Namespace, jobReq)switch {case errors.IsAlreadyExists(err):return cj, nil, errcase err != nil:    jm.recorder.Eventf(cj, corev1.EventTypeWarning, "FailedCreate""Error creating job: %v", err)return cj, nil, err  }// 日志 + 事件  klog.V(4).InfoS("Created Job""job", jobResp.Name, "cronjob", cj.Name)  jm.recorder.Eventf(cj, corev1.EventTypeNormal, "SuccessfulCreate""Created job %v", jobResp.Name)// --------------------------------------------------------------------------// 第六步:更新 CronJob 状态// --------------------------------------------------------------------------  jobRef, _ := getRef(jobResp)  cj.Status.Active = append(cj.Status.Active, *jobRef)  cj.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}// 把状态同步到 APIServer  _, err = jm.cronJobControl.UpdateStatus(cj)if err != nil {return cj, nil, err  }// --------------------------------------------------------------------------// 第七步:计算【下下次】调度时间并返回// --------------------------------------------------------------------------  t := nextScheduledTimeDuration(sched, now)return cj, t, nil}

追踪cleanupFinishedJobs

// C:\git\kubernetes-1.22.3\pkg\controller\cronjob\cronjob_controllerv2.go// cleanupFinishedJobs 清理 CronJob 已经运行完成的旧 Job// 作用:根据配置保留最近几次的成功/失败任务,删除更早的,避免资源堆积func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1.CronJob, js []*batchv1.Job) error {// 如果成功任务保留数 和 失败任务保留数 都没设置 → 直接返回,不清理if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil {return nil  }// 定义两个列表:// 1. 失败的任务// 2. 成功的任务  failedJobs := []*batchv1.Job{}  successfulJobs := []*batchv1.Job{}// 遍历所有子 Job,把它们分成【成功】和【失败】两组for _, job := range js {// 判断 Job 是否完成,以及是成功还是失败    isFinished, finishedStatus := jm.getFinishedStatus(job)// 完成了,并且是成功 → 加入成功列表if isFinished && finishedStatus == batchv1.JobComplete {      successfulJobs = append(successfulJobs, job)    } else if isFinished && finishedStatus == batchv1.JobFailed {// 完成了,并且是失败 → 加入失败列表      failedJobs = append(failedJobs, job)    }  }// 如果配置了【成功任务保留数量】// 只保留最新的 N 个,删除最老的if cj.Spec.SuccessfulJobsHistoryLimit != nil {    jm.removeOldestJobs(cj, successfulJobs, *cj.Spec.SuccessfulJobsHistoryLimit)  }// 如果配置了【失败任务保留数量】// 只保留最新的 N 个,删除最老的if cj.Spec.FailedJobsHistoryLimit != nil {    jm.removeOldestJobs(cj, failedJobs, *cj.Spec.FailedJobsHistoryLimit)  }// 更新 CronJob 状态(因为可能删了一些任务,要同步状态到apiserver)  _, err := jm.cronJobControl.UpdateStatus(cj)return err}

至此CronJob控制器代码分析完成,感觉代码写的太多了,太乱了,不知道这些源码在编写的时候,是否也有共同遵守的编码规范。

基本 文件 流程 错误 SQL 调试
  1. 请求信息 : 2026-06-05 22:32:23 HTTP/1.1 GET : https://www.yeyulingfeng.com/a/717352.html
  2. 运行时间 : 0.087035s [ 吞吐率:11.49req/s ] 内存消耗:5,307.03kb 文件加载:145
  3. 缓存信息 : 0 reads,0 writes
  4. 会话信息 : SESSION_ID=049e70fa1360a4a983a9777cac9c97fa
  1. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/public/index.php ( 0.79 KB )
  2. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/autoload.php ( 0.17 KB )
  3. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/autoload_real.php ( 2.49 KB )
  4. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/platform_check.php ( 0.90 KB )
  5. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/ClassLoader.php ( 14.03 KB )
  6. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/autoload_static.php ( 6.05 KB )
  7. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/helper.php ( 8.34 KB )
  8. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-validate/src/helper.php ( 2.19 KB )
  9. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/ralouphie/getallheaders/src/getallheaders.php ( 1.60 KB )
  10. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/helper.php ( 1.47 KB )
  11. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/stubs/load_stubs.php ( 0.16 KB )
  12. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Exception.php ( 1.69 KB )
  13. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-container/src/Facade.php ( 2.71 KB )
  14. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/deprecation-contracts/function.php ( 0.99 KB )
  15. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/polyfill-mbstring/bootstrap.php ( 8.26 KB )
  16. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/polyfill-mbstring/bootstrap80.php ( 9.78 KB )
  17. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/var-dumper/Resources/functions/dump.php ( 1.49 KB )
  18. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-dumper/src/helper.php ( 0.18 KB )
  19. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/var-dumper/VarDumper.php ( 4.30 KB )
  20. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/guzzlehttp/guzzle/src/functions_include.php ( 0.16 KB )
  21. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/guzzlehttp/guzzle/src/functions.php ( 5.54 KB )
  22. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/App.php ( 15.30 KB )
  23. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-container/src/Container.php ( 15.76 KB )
  24. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/container/src/ContainerInterface.php ( 1.02 KB )
  25. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/provider.php ( 0.19 KB )
  26. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Http.php ( 6.04 KB )
  27. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/helper/Str.php ( 7.29 KB )
  28. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Env.php ( 4.68 KB )
  29. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/common.php ( 0.03 KB )
  30. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/helper.php ( 18.78 KB )
  31. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Config.php ( 5.54 KB )
  32. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/alipay.php ( 3.59 KB )
  33. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/facade/Env.php ( 1.67 KB )
  34. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/app.php ( 0.95 KB )
  35. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/cache.php ( 0.78 KB )
  36. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/console.php ( 0.23 KB )
  37. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/cookie.php ( 0.56 KB )
  38. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/database.php ( 2.48 KB )
  39. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/filesystem.php ( 0.61 KB )
  40. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/lang.php ( 0.91 KB )
  41. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/log.php ( 1.35 KB )
  42. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/middleware.php ( 0.19 KB )
  43. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/route.php ( 1.89 KB )
  44. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/session.php ( 0.57 KB )
  45. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/trace.php ( 0.34 KB )
  46. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/view.php ( 0.82 KB )
  47. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/event.php ( 0.25 KB )
  48. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Event.php ( 7.67 KB )
  49. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/service.php ( 0.13 KB )
  50. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/AppService.php ( 0.26 KB )
  51. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Service.php ( 1.64 KB )
  52. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Lang.php ( 7.35 KB )
  53. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/lang/zh-cn.php ( 13.70 KB )
  54. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/initializer/Error.php ( 3.31 KB )
  55. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/initializer/RegisterService.php ( 1.33 KB )
  56. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/services.php ( 0.14 KB )
  57. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/service/PaginatorService.php ( 1.52 KB )
  58. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/service/ValidateService.php ( 0.99 KB )
  59. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/service/ModelService.php ( 2.04 KB )
  60. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-trace/src/Service.php ( 0.77 KB )
  61. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Middleware.php ( 6.72 KB )
  62. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/initializer/BootService.php ( 0.77 KB )
  63. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/Paginator.php ( 11.86 KB )
  64. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-validate/src/Validate.php ( 63.20 KB )
  65. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/Model.php ( 23.55 KB )
  66. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/Attribute.php ( 21.05 KB )
  67. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/AutoWriteData.php ( 4.21 KB )
  68. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/Conversion.php ( 6.44 KB )
  69. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/DbConnect.php ( 5.16 KB )
  70. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/ModelEvent.php ( 2.33 KB )
  71. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/RelationShip.php ( 28.29 KB )
  72. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/contract/Arrayable.php ( 0.09 KB )
  73. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/contract/Jsonable.php ( 0.13 KB )
  74. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/contract/Modelable.php ( 0.09 KB )
  75. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Db.php ( 2.88 KB )
  76. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/DbManager.php ( 8.52 KB )
  77. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Log.php ( 6.28 KB )
  78. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Manager.php ( 3.92 KB )
  79. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/log/src/LoggerTrait.php ( 2.69 KB )
  80. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/log/src/LoggerInterface.php ( 2.71 KB )
  81. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Cache.php ( 4.92 KB )
  82. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/simple-cache/src/CacheInterface.php ( 4.71 KB )
  83. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/helper/Arr.php ( 16.63 KB )
  84. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/cache/driver/File.php ( 7.84 KB )
  85. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/cache/Driver.php ( 9.03 KB )
  86. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/CacheHandlerInterface.php ( 1.99 KB )
  87. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/Request.php ( 0.09 KB )
  88. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Request.php ( 55.78 KB )
  89. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/middleware.php ( 0.25 KB )
  90. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Pipeline.php ( 2.61 KB )
  91. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-trace/src/TraceDebug.php ( 3.40 KB )
  92. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/middleware/SessionInit.php ( 1.94 KB )
  93. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Session.php ( 1.80 KB )
  94. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/session/driver/File.php ( 6.27 KB )
  95. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/SessionHandlerInterface.php ( 0.87 KB )
  96. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/session/Store.php ( 7.12 KB )
  97. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Route.php ( 23.73 KB )
  98. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/RuleName.php ( 5.75 KB )
  99. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/Domain.php ( 2.53 KB )
  100. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/RuleGroup.php ( 22.43 KB )
  101. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/Rule.php ( 26.95 KB )
  102. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/RuleItem.php ( 9.78 KB )
  103. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/route/app.php ( 3.94 KB )
  104. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/facade/Route.php ( 4.70 KB )
  105. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/dispatch/Controller.php ( 4.74 KB )
  106. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/Dispatch.php ( 10.44 KB )
  107. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/controller/Index.php ( 9.87 KB )
  108. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/BaseController.php ( 2.05 KB )
  109. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/facade/Db.php ( 0.93 KB )
  110. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/connector/Mysql.php ( 5.44 KB )
  111. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/PDOConnection.php ( 52.47 KB )
  112. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/Connection.php ( 8.39 KB )
  113. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/ConnectionInterface.php ( 4.57 KB )
  114. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/builder/Mysql.php ( 16.58 KB )
  115. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/Builder.php ( 24.06 KB )
  116. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/BaseBuilder.php ( 27.50 KB )
  117. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/Query.php ( 15.71 KB )
  118. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/BaseQuery.php ( 45.13 KB )
  119. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/TimeFieldQuery.php ( 7.43 KB )
  120. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/AggregateQuery.php ( 3.26 KB )
  121. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/ModelRelationQuery.php ( 20.07 KB )
  122. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/ParamsBind.php ( 3.66 KB )
  123. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/ResultOperation.php ( 7.01 KB )
  124. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/WhereQuery.php ( 19.37 KB )
  125. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/JoinAndViewQuery.php ( 7.11 KB )
  126. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php ( 2.63 KB )
  127. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/Transaction.php ( 2.77 KB )
  128. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/log/driver/File.php ( 5.96 KB )
  129. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/LogHandlerInterface.php ( 0.86 KB )
  130. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/log/Channel.php ( 3.89 KB )
  131. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/event/LogRecord.php ( 1.02 KB )
  132. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/Collection.php ( 16.47 KB )
  133. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/facade/View.php ( 1.70 KB )
  134. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/View.php ( 4.39 KB )
  135. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/controller/Es.php ( 3.30 KB )
  136. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Response.php ( 8.81 KB )
  137. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/response/View.php ( 3.29 KB )
  138. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Cookie.php ( 6.06 KB )
  139. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-view/src/Think.php ( 8.38 KB )
  140. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/TemplateHandlerInterface.php ( 1.60 KB )
  141. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-template/src/Template.php ( 46.61 KB )
  142. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-template/src/template/driver/File.php ( 2.41 KB )
  143. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-template/src/template/contract/DriverInterface.php ( 0.86 KB )
  144. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/runtime/temp/c935550e3e8a3a4c27dd94e439343fdf.php ( 31.50 KB )
  145. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-trace/src/Html.php ( 4.42 KB )
  1. CONNECT:[ UseTime:0.000702s ] mysql:host=127.0.0.1;port=3306;dbname=wenku;charset=utf8mb4
  2. SHOW FULL COLUMNS FROM `fenlei` [ RunTime:0.000771s ]
  3. SELECT * FROM `fenlei` WHERE `fid` = 0 [ RunTime:0.000314s ]
  4. SELECT * FROM `fenlei` WHERE `fid` = 63 [ RunTime:0.000287s ]
  5. SHOW FULL COLUMNS FROM `set` [ RunTime:0.000529s ]
  6. SELECT * FROM `set` [ RunTime:0.000203s ]
  7. SHOW FULL COLUMNS FROM `article` [ RunTime:0.000562s ]
  8. SELECT * FROM `article` WHERE `id` = 717352 LIMIT 1 [ RunTime:0.001104s ]
  9. UPDATE `article` SET `lasttime` = 1780669943 WHERE `id` = 717352 [ RunTime:0.001191s ]
  10. SELECT * FROM `fenlei` WHERE `id` = 64 LIMIT 1 [ RunTime:0.000255s ]
  11. SELECT * FROM `article` WHERE `id` < 717352 ORDER BY `id` DESC LIMIT 1 [ RunTime:0.000463s ]
  12. SELECT * FROM `article` WHERE `id` > 717352 ORDER BY `id` ASC LIMIT 1 [ RunTime:0.000372s ]
  13. SELECT * FROM `article` WHERE `id` < 717352 ORDER BY `id` DESC LIMIT 10 [ RunTime:0.000725s ]
  14. SELECT * FROM `article` WHERE `id` < 717352 ORDER BY `id` DESC LIMIT 10,10 [ RunTime:0.000720s ]
  15. SELECT * FROM `article` WHERE `id` < 717352 ORDER BY `id` DESC LIMIT 20,10 [ RunTime:0.000682s ]
0.088892s