乐于分享
好东西不私藏

Kube-Scheduler 源码深度解析:启动、选主、事件与调度全流程

Kube-Scheduler 源码深度解析:启动、选主、事件与调度全流程

这篇文章学kube-scheduler,不废话直接发车。

1.1、 kube-scheduler的启动流程

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\scheduler.gofunc main() {  rand.Seed(time.Now().UnixNano())  pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)  command := app.NewSchedulerCommand()  logs.InitLogs()defer logs.FlushLogs()if err := command.Execute(); err != nil {    os.Exit(1)  }}

追踪app.NewSchedulerCommandkube-scheduler启动还是通过cobra.Command实现的。追踪cobra.Command.Run中的runCommand

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\server.go// runCommand runs the scheduler.func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option)error {  verflag.PrintAndExitIfRequested()  cliflag.PrintFlags(cmd.Flags())  ctx, cancel := context.WithCancel(context.Background())defer cancel()go func() {    stopCh := server.SetupSignalHandler()    <-stopCh    cancel()  }()/*  func Setup(ctx, opts, outOfTreeRegistryOptions) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error)     setup返回一个完整的配置和scheduler对象    */  cc, sched, err := Setup(ctx, opts, registryOptions...)if err != nil {return err  }return Run(ctx, cc, sched)}

追踪Setup

func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {  c, err := opts.Config()}

追踪opts.Config

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\options\options.go// Config return a scheduler config objectfunc (o *Options) Config() (*schedulerappconfig.Config, error) {// 新建一个空的调度器配置对象  c := &schedulerappconfig.Config{}// ApplyTo 把 Options 里的所有参数(端口、日志、调度策略、客户端配置等)填充到配置对象中if err := o.ApplyTo(c); err != nil {return nil, err  }return c, nil// 使用 --kubeconfig传入的配置初始化kube config  kubeConfig, err := createKubeConfig(c.ComponentConfig.ClientConnection, o.Master)if err != nil {return nil, err  }// 使用kube-config 创建client-set对象  client, eventClient, err := createClients(kubeConfig)if err != nil {return nil, err  }  c.EventBroadcaster = events.NewEventBroadcasterAdapter(eventClient)}

继续追踪Options.Config,接下来是初始化事件广播器,这里后面会详细讨论。

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\options\options.go// Config return a scheduler config objectfunc (o *Options) Config() (*schedulerappconfig.Config, error) {/*      EventBroadcaster:事件广播器    统一管理调度器产生的所有事件(如调度成功、调度失败、节点不可用)    依赖 eventClient 将事件发送到 APIServer    */  c.EventBroadcaster = events.NewEventBroadcasterAdapter(eventClient)}

继续追踪Options.Config,接下来是开启选举主的锁配置,默认scheduler 启动的时候 带上参数 --leader-elect=true,代表先选主再进行主流程,为了高可用部署,这里后面会详细讨论。

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\options\options.go// Config return a scheduler config objectfunc (o *Options) Config() (*schedulerappconfig.Config, error) {// Set up leader election if enabled.var leaderElectionConfig *leaderelection.LeaderElectionConfigif c.ComponentConfig.LeaderElection.LeaderElect {// Use the scheduler name in the first profile to record leader election.    schedulerName := corev1.DefaultSchedulerNameif len(c.ComponentConfig.Profiles) != 0 {      schedulerName = c.ComponentConfig.Profiles[0].SchedulerName    }    coreRecorder := c.EventBroadcaster.DeprecatedNewLegacyRecorder(schedulerName)    leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, kubeConfig, coreRecorder)if err != nil {return nil, err    }  }return c, nil}

这样schedulerappconfig.Config对象完成了初始化,回到Setup函数 scheduler.New,创建一个scheduler对象。

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\server.go// Setup creates a completed config and a scheduler based on the command args and optionsfunc Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {  c, err := opts.Config()if err != nil {return nilnil, err  }  sched, err := scheduler.New(cc.Client,    cc.InformerFactory,    recorderFactory,    ctx.Done(),    scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),    scheduler.WithKubeConfig(cc.KubeConfig),    scheduler.WithProfiles(cc.ComponentConfig.Profiles...),    scheduler.WithLegacyPolicySource(cc.LegacyPolicySource),    scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),    scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),    scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),    scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),    scheduler.WithExtenders(cc.ComponentConfig.Extenders...),    scheduler.WithParallelism(cc.ComponentConfig.Parallelism),    scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {// Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging      completedProfiles = append(completedProfiles, profile)    }),  )return &cc, sched, nil}

到这里完成了Schedulerserverconfig.CompletedConfigscheduler.Scheduler

的生成。回到runCommand.Run开始kube-scheduler的运行。

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\server.go// runCommand runs the scheduler.func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option)error {  cc, sched, err := Setup(ctx, opts, registryOptions...)return Run(ctx, cc, sched)}

追踪Run方法

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\server.gofunc Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler)error {/*     注册/configz的路由,可以通过这个路由获取到kube-scheduler配置信息    curl -s  -k  https://localhost:10259/configz --header "Authorization: Bearer $TOKEN"    |python -m json.tool  */if cz, err := configz.New("componentconfig"); err == nil {    cz.Set(cc.ComponentConfig)  } else {return fmt.Errorf("unable to register configz: %s", err)  }// 启动事件广播管理器  cc.EventBroadcaster.StartRecordingToSink(ctx.Done())// 初始化健康检查var checks []healthz.HealthCheckerif cc.ComponentConfig.LeaderElection.LeaderElect {    checks = append(checks, cc.LeaderElection.WatchDog)  }}

继续追踪Run

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\server.gofunc Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler)error {/*    channel 打开 → 还没选主 / 我不是主    channel 关闭 → 我已经成为主!  */  waitingForLeader := make(chan struct{})/*    isLeader() 函数给健康检查使用:        没成为主isLeader返回false,成为主isLeader返回true。  */  isLeader := func()bool {select {case _, ok := <-waitingForLeader:// if channel is closed, we are leadingreturn !okdefault:// channel is open, we are waiting for a leaderreturn false    }           }if cc.InsecureMetricsServing != nil {/*        handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader), nil, nil)        */var metricsHandler http.Handler = newMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader)        handler := buildHandlerChain(metricsHandler, nilnil)         if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {return fmt.Errorf("failed to start metrics server: %v", err)    }  }}

追踪isLeader,可以看到isLeader会决定某些功能是否开启。例如深入newMetricsHandler,其中的installMetricHandler,注册 /metrics 指标接口,并且只有主节点(Leader)才暴露资源使用指标,从节点不暴露。

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\server.gofunc installMetricHandler(pathRecorderMux *mux.PathRecorderMux, informers informers.SharedInformerFactory, isLeader func()bool) {  configz.InstallHandler(pathRecorderMux)  pathRecorderMux.Handle("/metrics", legacyregistry.HandlerWithReset())  resourceMetricsHandler := resources.Handler(informers.Core().V1().Pods().Lister())  pathRecorderMux.HandleFunc("/metrics/resources"func(w http.ResponseWriter, req *http.Request) {if !isLeader() {return    }    resourceMetricsHandler.ServeHTTP(w, req)  })}

继续追踪buildHandlerChain

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\server.go// buildHandlerChain 用标准过滤器包装传入的 handler,形成一条完整的 HTTP 请求处理链func buildHandlerChain(handler http.Handler, authn authenticator.Request, authz authorizer.Authorizer) http.Handler {// 创建请求信息解析器,用来解析 HTTP 请求里的 API 信息(如:访问哪个资源、什么操作)  requestInfoResolver := &apirequest.RequestInfoFactory{}// 创建认证失败时的默认处理器:当用户认证不通过时,返回 401 Unauthorized  failedHandler := genericapifilters.Unauthorized(scheme.Codecs)// 第 1 层:授权检查 → 验证当前用户是否有权限访问这个接口(RBAC 权限判断)  handler = genericapifilters.WithAuthorization(handler, authz, scheme.Codecs)// 第 2 层:认证检查 → 验证用户是谁(Token、证书等),失败则走 failedHandler  handler = genericapifilters.WithAuthentication(handler, authn, failedHandler, nil)// 第 3 层:解析请求信息 → 从 HTTP 请求中提取 API 资源、动词、命名空间等信息  handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)// 第 4 层:设置缓存控制 → 告诉客户端不要缓存健康检查/metrics 接口  handler = genericapifilters.WithCacheControl(handler)// 第 5 层:HTTP 请求日志 → 记录请求的路径、响应码、耗时等  handler = genericfilters.WithHTTPLogging(handler)// 第 6 层:Panic 恢复 → 处理接口内部崩溃,避免进程直接挂掉,返回 500 错误  handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver)// 返回包装完成的完整处理链return handler}

继续回到Run

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\server.go// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler)error {// 启动所有informer  cc.InformerFactory.Start(ctx.Done())// 执行调度前,先把通过informer把资源缓存到本地    cc.InformerFactory.WaitForCacheSync(ctx.Done())}

开启LeaderElection选主的流程,如果被选为主的话则执行sched.Run

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\server.go// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler)error {// If leader election is enabled, runCommand via LeaderElector until done and exit.if cc.LeaderElection != nil {    cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{      OnStartedLeading: func(ctx context.Context) {close(waitingForLeader)        sched.Run(ctx)      },      OnStoppedLeading: func() {select {case <-ctx.Done():// We were asked to terminate. Exit 0.          klog.Info("Requested to terminate. Exiting.")          os.Exit(0)default:// We lost the lock.          klog.Exitf("leaderelection lost")        }      },    }    leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)if err != nil {return fmt.Errorf("couldn't create leader elector: %v", err)    }    leaderElector.Run(ctx)return fmt.Errorf("lost lease")  }// Leader election is disabled, so runCommand inline until done.close(waitingForLeader)  sched.Run(ctx)return fmt.Errorf("finished without leader elect")}

至此,kube-scheduler启动完成

1.2 、kube-scheduler中的leader election选主机制

kube-schedulerkube-controller-manager常以多副本部署实现高可用,依靠leaderelection机制完成选主。该机制借助 K8s API 操作原子性构建分布式锁,实例间相互竞争,同一时段仅有leader实例执行业务。原有leader故障后,集群会重新推选新leader,保障组件持续稳定运行,这类设计在 K8s 中普遍应用。

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\options\options.go// Complete completes the remaining instantiation of the options obj.// In particular, it injects the latest internal versioned ComponentConfig.func (o *Options) Complete(nfs *cliflag.NamedFlagSets) error {// 根据--leader-elect=true 配置开启选主抢锁  leaderelection := nfs.FlagSet("leader election")if leaderelection.Changed("leader-elect") {    cfg.LeaderElection.LeaderElect = o.ComponentConfig.LeaderElection.LeaderElect    }}// Config return a scheduler config objectfunc (o *Options) Config() (*schedulerappconfig.Config, error) {// Set up leader election if enabled.var leaderElectionConfig *leaderelection.LeaderElectionConfigif c.ComponentConfig.LeaderElection.LeaderElect {// Use the scheduler name in the first profile to record leader election.    schedulerName := corev1.DefaultSchedulerNameif len(c.ComponentConfig.Profiles) != 0 {      schedulerName = c.ComponentConfig.Profiles[0].SchedulerName    }    coreRecorder := c.EventBroadcaster.DeprecatedNewLegacyRecorder(schedulerName)    leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, kubeConfig, coreRecorder)if err != nil {return nil, err    }  }}

追踪makeLeaderElectionConfigmakeLeaderElectionConfig 用于创建选主抢锁配置:

  1. 以主机名拼接uuid作为实例唯一标识

  2. 依托resourcelock资源锁实现竞争选主

  3. 锁相关默认配置可通过/configz接口查询

    curl -s  -k  https://localhost:10259/configz --header "Authorization: Bearer $TOKEN"    |python -m json.tool{   "LeaderElection": {            "LeaderElect": true,            "LeaseDuration": "15s",            "RenewDeadline": "10s",            "ResourceLock": "leases",            "ResourceName": "kube-scheduler",            "ResourceNamespace": "kube-system",            "RetryPeriod": "2s"        }}
// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\options\options.go// makeLeaderElectionConfig 根据参数创建一个选主配置,用于抢分布式锁func makeLeaderElectionConfig(  config componentbaseconfig.LeaderElectionConfiguration,   kubeConfig *restclient.Config,   recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) {// 获取当前机器的主机名,用于标识当前实例  hostname, err := os.Hostname()if err != nil {return nil, fmt.Errorf("unable to get hostname: %v", err)  }// 给当前实例生成一个全局唯一ID:主机名 + UUID// 目的:防止同一台机器上的两个进程冲突  id := hostname + "_" + string(uuid.NewUUID())// ====================== 核心:创建分布式锁 ======================// 创建资源锁(基于 Lease/ConfigMap/Endpoint 实现)// 所有 scheduler 抢的就是这把锁  rl, err := resourcelock.NewFromKubeconfig(    config.ResourceLock,       // 锁类型:Lease/ConfigMap    config.ResourceNamespace,  // 锁所在命名空间:kube-system    config.ResourceName,       // 锁名称:kube-scheduler    resourcelock.ResourceLockConfig{      Identity:      id,       // 当前实例唯一ID      EventRecorder: recorder, // 事件记录器    },    kubeConfig,                // 连接 APIServer 的配置    config.RenewDeadline.Duration,  )if err != nil {return nil, fmt.Errorf("couldn't create resource lock: %v", err)  }// 返回最终的选主配置return &leaderelection.LeaderElectionConfig{    Lock:            rl,         // 分布式锁(所有实例抢这把锁)    LeaseDuration:   config.LeaseDuration.Duration,  // 锁租期:15s    RenewDeadline:   config.RenewDeadline.Duration,  // 续租超时:10s    RetryPeriod:     config.RetryPeriod.Duration,    // 抢锁重试间隔:2s    WatchDog:        leaderelection.NewLeaderHealthzAdaptor(time.Second * 20), // 健康检查    Name:            "kube-scheduler",               // 选主组件名称    ReleaseOnCancel: true,                           // 停止时主动释放锁  }, nil}

追踪resourcelock.NewFromKubeconfig,观察resourcelock资源锁的初始化

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\leaderelection\resourcelock\interface.go// NewFromKubeconfig will create a lock of a given type according to the input parameters.// Timeout set for a client used to contact to Kubernetes should be lower than// RenewDeadline to keep a single hung request from forcing a leader loss.// Setting it to max(time.Second, RenewDeadline/2) as a reasonable heuristic.func NewFromKubeconfig(lockType string, ns string, name string, rlc ResourceLockConfig, kubeconfig *restclient.Config, renewDeadline time.Duration) (Interface, error) {// shallow copy, do not modify the kubeconfig  config := *kubeconfig  timeout := renewDeadline / 2if timeout < time.Second {    timeout = time.Second  }  config.Timeout = timeout  leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "leader-election"))return New(lockType, ns, name, leaderElectionClient.CoreV1(), leaderElectionClient.CoordinationV1(), rlc)}

其中的New方法用来创建资源锁,可以看到支持多种锁类型:LeaseConfigMapEndpoints 等

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\leaderelection\resourcelock\interface.go// Manufacture will create a lock of a given type according to the input parameters// New 根据传入的参数,创建指定类型的资源锁(真正干活的锁)func New(  lockType string,                // 锁类型:Lease、ConfigMap、Endpoints 等  ns string,                      // 命名空间(一般是 kube-system)  name string,                    // 锁名字(如 kube-scheduler)  coreClient corev1.CoreV1Interface,      // 核心API客户端  coordinationClient coordinationv1.CoordinationV1Interface, // Lease 专用客户端  rlc ResourceLockConfig)// 锁配置(唯一标识、事件记录器)(Interface, error) {// 1. 创建 Endpoints 锁(老式锁)  endpointsLock := &EndpointsLock{    EndpointsMeta: metav1.ObjectMeta{      Namespace: ns,      Name:      name,    },    Client:     coreClient,    LockConfig: rlc,  }// 2. 创建 ConfigMap 锁(老式锁)  configmapLock := &ConfigMapLock{    ConfigMapMeta: metav1.ObjectMeta{      Namespace: ns,      Name:      name,    },    Client:     coreClient,    LockConfig: rlc,  }// 3. 创建 Lease 锁(新式、推荐、高性能锁)  leaseLock := &LeaseLock{    LeaseMeta: metav1.ObjectMeta{      Namespace: ns,      Name:      name,    },    Client:     coordinationClient,    LockConfig: rlc,  }// ====================== 核心:根据类型返回对应锁 ======================switch lockType {case EndpointsResourceLock:return endpointsLock, nil// 只用 Endpoints 锁case ConfigMapsResourceLock:return configmapLock, nil// 只用 ConfigMap 锁case LeasesResourceLock:return leaseLock, nil// 只用 Lease 锁(K8s 现在默认用这个)case EndpointsLeasesResourceLock:return &MultiLock{             // 双锁:主锁 Endpoints + 备锁 Lease      Primary:   endpointsLock,      Secondary: leaseLock,    }, nilcase ConfigMapsLeasesResourceLock:return &MultiLock{             // 双锁:主锁 ConfigMap + 备锁 Lease      Primary:   configmapLock,      Secondary: leaseLock,    }, nildefault:return nil, fmt.Errorf("Invalid lock-type %s", lockType)  }}

回到kube-scheduleserverRun方法,查看scheduler中抢锁的运行

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\server.go// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler)error {// If leader election is enabled, runCommand via LeaderElector until done and exit.// 如果开启了 --leader-elect=trueif cc.LeaderElection != nil {// 设置选主回调    cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{// 成功抢到锁,成为 Leader      OnStartedLeading: func(ctx context.Context) {// 标记自己是主close(waitingForLeader)// 真正开始调度Pod        sched.Run(ctx)      },// 锁丢了(主节点挂了/网络断了)      OnStoppedLeading: func() {select {case <-ctx.Done():// We were asked to terminate. Exit 0.          klog.Info("Requested to terminate. Exiting.")          os.Exit(0)default:// We lost the lock.// 直接退出          klog.Exitf("leaderelection lost")        }      },    }// 创建选主器          leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)if err != nil {return fmt.Errorf("couldn't create leader elector: %v", err)    }// 开始抢锁、续租、监听锁变化    leaderElector.Run(ctx)return fmt.Errorf("lost lease")  }// 没有开启选主(--leader-elect=false)// Leader election is disabled, so runCommand inline until done.// 不抢主close(waitingForLeader)// 直接开始调度  sched.Run(ctx)return fmt.Errorf("finished without leader elect")}

追踪过leaderElector.Run开始执行抢锁选主

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\leaderelection\leaderelection.go// Run starts the leader election loop. Run will not return// before leader election loop is stopped by ctx or it has// stopped holding the leader lease// Run 启动 leader 选举循环。// Run 函数会一直阻塞,直到:// 上下文 ctx 被取消,或者失去了 leader 锁(lease) 除此之外它永远不会返回func (le *LeaderElector) Run(ctx context.Context) {defer runtime.HandleCrash()/*  无论什么原因退出这个函数,一定会执行:宣布停止成为 leader  cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{      OnStartedLeading: func(ctx context.Context) {      },      OnStoppedLeading: func() {      },    }  */defer func() {// 这里的OnStoppedLeading,就是前面Run函数中注册的    le.config.Callbacks.OnStoppedLeading()  }()// 第一步:抢锁(阻塞直到抢到锁 或者 ctx 取消)// 如果抢锁失败(比如被人取消了),直接 returnif !le.acquire(ctx) {return  }// 抢到锁了!创建一个子 context,用于停止 leader 任务  ctx, cancel := context.WithCancel(ctx)defer cancel()/*  开启协程,执行用户的业务逻辑(比如 kube-scheduler 开始调度)  cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{      OnStartedLeading: func(ctx context.Context) {      },      OnStoppedLeading: func() {      },    }  */go le.config.Callbacks.OnStartedLeading(ctx)// 【关键】死循环:不断续租锁,保证一直是 leader// 一旦续租失败(丢锁),这个函数就会返回  le.renew(ctx)}

追踪le.acquire分析抢锁

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\leaderelection\leaderelection.go// acquire 循环调用 tryAcquireOrRenew 尝试抢锁// 一旦抢锁成功,立刻返回 true// 如果 ctx 被取消(进程停止),返回 falsefunc (le *LeaderElector) acquire(ctx context.Context) bool {// 创建一个可取消的 context,用来在抢到锁后停止循环  ctx, cancel := context.WithCancel(ctx)defer cancel()// 标记:是否成功抢到锁  succeeded := false// 锁的描述信息(如:kube-system/kube-scheduler)  desc := le.config.Lock.Describe()// ====================== 核心循环 ======================// wait.JitterUntil = 定时、带随机抖动的循环执行函数// 每隔 RetryPeriod(默认2秒)执行一次// =====================================================  wait.JitterUntil(func() {// 尝试抢锁 或 续租锁(真正执行抢锁逻辑)    succeeded = le.tryAcquireOrRenew(ctx)// 更新状态:是否从"从节点"变成"主节点"    le.maybeReportTransition()// 如果没抢到锁 → 打印日志,继续下一轮循环if !succeeded {      klog.V(4).Infof("failed to acquire lease %v", desc)return    }// ============= 抢锁成功!=============// 记录事件:变成 leader    le.config.Lock.RecordEvent("became leader")// 监控指标:标记成为主节点    le.metrics.leaderOn(le.config.Name)// 取消循环,停止继续抢锁    cancel()  }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())// 返回最终是否抢到锁return succeeded}

追踪tryAcquireOrRenew,流程如下:

// tryAcquireOrRenew 尝试获取 Leader 租约(如果还没获得),// 或者尝试续约租约(如果已经获得)。成功返回 true,失败返回 false。func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {// 获取当前时间  now := metav1.Now()// 构造一个新的 Leader 选举记录(默认值,后续会根据场景修改)  leaderElectionRecord := rl.LeaderElectionRecord{    HolderIdentity:       le.config.Lock.Identity(), // 当前节点的唯一标识    LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), // 租约有效期(秒)    RenewTime:            now,                       // 本次续约/获取时间    AcquireTime:          now,                       // 首次成为 Leader 的时间  }// 步骤 1:尝试从共享存储中获取现有的锁记录// Get 从 K8s 资源(Lease/ConfigMap/Endpoint)中读取旧的选举记录  oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)if err != nil {// 如果获取失败,且错误不是「记录不存在」,则直接返回失败if !errors.IsNotFound(err) {      klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)return false    }// 场景 A:锁记录不存在,第一次抢主,直接创建锁// 创建锁记录,写入当前节点信息if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {      klog.Errorf("error initially creating leader election record: %v", err)return false    }// 缓存最新记录,标记自己成为 Leader    le.setObservedRecord(&leaderElectionRecord)return true  }// 步骤 2:已获取到旧记录,更新本地缓存并校验合法性// 如果远程记录和本地缓存不一致,更新本地缓存(避免重复解析)if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {    le.setObservedRecord(oldLeaderElectionRecord)    le.observedRawRecord = oldLeaderElectionRawRecord  }// 核心判断:// 1. 锁有持有者// 2. 租约未过期(上次观察时间 + 租约时长 > 当前时间)// 3. 本节点不是 Leader// 满足全部条件 → 锁被别人持有且有效,本节点不能抢主,直接返回 falseif len(oldLeaderElectionRecord.HolderIdentity) > 0 &&    le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&    !le.IsLeader() {    klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)return false  }// 步骤 3:准备更新锁记录(抢主 或 续约)// 如果本节点已经是 Leader → 执行【续约】操作if le.IsLeader() {// 续约时保持:首次获取时间不变、主节点切换次数不变    leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime    leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions  } else {// 如果本节点不是 Leader → 执行【抢主】操作// 抢主成功,主节点切换次数 +1    leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1  }// 步骤 4:原子更新共享锁,完成抢主/续约// 将新记录更新到 K8s 共享存储if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {    klog.Errorf("Failed to update lock: %v", err)return false  }// 更新本地缓存的最新记录  le.setObservedRecord(&leaderElectionRecord)// 抢主/续约成功return true}

当多个le.config.Lock.Update调用的时候,会有并发问题,通过resourceVersion字段可以解决这个问题。每个锁(Lease)都有一个版本号resourceVersion,只要有人更新锁,版本号就会自动加1,版本号是唯一、连续、不可重复的。每个Update只认自己拿到的那个版本号,版本号不对,就不允许更新!

[root]# kubectl get lease -n kube-systemNAME                      HOLDER                                              AGEkube-controller-manager   k8s-master01_def02570-9146-54c1-b700-66dd407ff612   221dkube-scheduler            k8s-master01_29da2906-4a43-4db1-36f9-4bf8919b4cda   221d

如上,kube‑scheduler 和 kube‑controller‑manager 均基于Lease 资源实现选主抢锁,锁资源统一创建在 kube‑system 命名空间下。其中 holder 字段标识当前锁的持有者,取值格式为节点机器名 + UUID,用于区分不同实例。

1.3、k8s Event

之前在看到kube‑scheduler的启动流程中, 提到k8sEvent那么。接下来就来研究一下Event。接下来就沿着kube‑scheduler代码,分析一下K8s事件源码。

提到k8sEvent就要祭出这张图

  • EventRecorder:是事件生成者,k8s组件通过调用它的方法来生成事件;
  • EventBroadcaster:事件广播器,负责消费EventRecorder产生的事件,然后分发给broadcasterWatcher
  • broadcasterWatcher:用于定义事件的处理方式,如上报apiserver

这张图完整展示了 Kubernetes 事件从产生、广播、聚合过滤到最终上报 APIServer 的全链路,逐环节拆解如下:

一、整体分层

  • 上层:apiserver:事件最终存储入口
  • 中层:EventBroadcaster:事件广播核心组件
  • 左侧:EventRecorder:业务组件(调度器、控制器等)的事件生成器
  • 下方:事件消费、聚合、上报、日志落盘模块

二、逐流程环节解析

1. 事件产生(业务侧)

  1. 用户 / 组件触发业务逻辑(如 Pod 调度失败)
  2. EventRecorder.generateEvent:构造 events.k8s.io/v1.Event 事件对象
  3. recorder.Action:将事件发送到 EventBroadcaster 广播器

2. 事件广播分发

  1. EventBroadcaster.m.loop:内部循环接收流入事件(c.incoming
  2. distribute:将事件分发给所有订阅者(事件上报、日志组件)
  3. c.result + ResultChan:通过通道将事件下发给下游消费模块

3. 下游双消费分支

3.1、分支 A:结构化日志落盘

StartStructuredLogging:将事件输出到 klog,用于本地日志记录,不上报 apiserver

3.2、分支 B:事件聚合 + 上报 APIServer(核心链路)

  1. StartEventWatcher:监听事件通道,接收分发后的事件

  2. StartRecordingToSink:启动两个后台协程:

    • 定时保活重复事件(refreshExistingEventSeries
    • 定时清理过期事件(finishSeries
    • 内部完成 Filter(过滤)、aggregates(聚合)、counts(计数),即你之前学的 recordToSink 聚合逻辑
  3. Create/Patch:调用 recordEvent,新事件走 Create、重复聚合事件走 Patch,最终发送到 apiserver 存储。


三、核心对应源码映射

图中模块
对应分析的源码
generateEvent
/Action
recordSchedulingFailure
 中 Eventf 事件生成逻辑
distribute
广播器事件分发逻辑
StartRecordingToSinkStartRecordingToSink
 启动保活、清理、上报协程
Filter,aggregates,countsrecordToSink
 事件聚合(计数、合并重复事件)
Create/PatchrecordEvent
 上报 APIServer 的核心逻辑
c.incoming
接收所有新来的事件,让发送方永远不阻塞
c.result
把事件分发给所有 “消费者”

四、关键设计亮点

  1. 异步解耦:事件生成与上报分离,通过广播 + 通道异步处理,不阻塞业务主流程;
  2. 事件聚合:相同事件只计数、不重复上报,大幅减轻 apiserver/etcd 压力;
  3. 双输出:同时支持 apiserver 存储 + 本地 klog 日志,兼顾集群查询与问题排查;
  4. 生命周期管理:通过定时保活、过期清理,保证事件有效性与内存稳定。

接下来开始追踪代码

1.3.1、EventBroadcaster的初始化

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\options\options.gofunc (o *Options) Config() (*schedulerappconfig.Config, error) {  c := &schedulerappconfig.Config{}if err := o.ApplyTo(c); err != nil {return nil, err  }// Prepare kube config.  kubeConfig, err := createKubeConfig(c.ComponentConfig.ClientConnection, o.Master)if err != nil {return nil, err  }// Prepare kube clients.  client, eventClient, err := createClients(kubeConfig)if err != nil {return nil, err  }  c.EventBroadcaster = events.NewEventBroadcasterAdapter(eventClient)}

追踪events.NewEventBroadcasterAdapter

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\events\event_broadcaster.go// NewEventBroadcasterAdapter 创建一个事件广播器适配器// 用于简化组件从旧事件API迁移到新事件API的过程func NewEventBroadcasterAdapter(client clientset.Interface) EventBroadcasterAdapter {// 创建适配器实例  eventClient := &eventBroadcasterAdapterImpl{}/*     第一步:检测集群是否支持 【新事件API】 events.k8s.io/v1    GroupName is the group name use in this package    const GroupName = "events.k8s.io"    // SchemeGroupVersion is group version used to register these objects    var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}  */if _, err := client.Discovery().ServerResourcesForGroupVersion(eventsv1.SchemeGroupVersion.String()); err == nil {// 如果支持新API → 初始化新API的客户端和广播器// 新事件API客户端    eventClient.eventsv1Client = client.EventsV1()// 新事件广播器    eventClient.eventsv1Broadcaster = NewBroadcaster(&EventSinkImpl{Interface: eventClient.eventsv1Client})   }// 第二步:无条件创建 【旧事件API】 core/v1.Event// 虽然新API慢慢普及,但旧的开销很小,所以统一都创建  eventClient.coreClient = client.CoreV1()            // 旧API客户端  eventClient.coreBroadcaster = record.NewBroadcaster() // 旧事件广播器// 返回适配器(上层组件用它发事件,不用管新旧版本)return eventClient}

两个NewBroadcaster的底层实现都是eventBroadcasterImpl

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\record\event.go// Creates a new event broadcaster.func NewBroadcaster() EventBroadcaster {return &eventBroadcasterImpl{    Broadcaster:   watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),    sleepDuration: defaultSleepDuration,  }}type eventBroadcasterImpl struct {  *watch.Broadcaster  sleepDuration time.Duration  options       CorrelatorOptions}// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\events\event_broadcaster.go// NewBroadcaster Creates a new event broadcaster.func NewBroadcaster(sink EventSink) EventBroadcaster {return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*eventsv1.Event{})}// NewBroadcasterForTest Creates a new event broadcaster for test purposes.func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*eventsv1.Event) EventBroadcaster {return &eventBroadcasterImpl{    Broadcaster:   watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),    eventCache:    eventCache,    sleepDuration: sleepDuration,    sink:          sink,  }}type eventBroadcasterImpl struct {  *watch.Broadcaster  mu            sync.Mutex  eventCache    map[eventKey]*eventsv1.Event  sleepDuration time.Duration  sink          EventSink}

1.3.2、eventRecorder初始化

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\server.go// Setup creates a completed config and a scheduler based on the command args and optionsfunc Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {if errs := opts.Validate(); len(errs) > 0 {return nilnil, utilerrors.NewAggregate(errs)  }  c, err := opts.Config()if err != nil {return nilnil, err  }// Get the completed config  cc := c.Complete()  outOfTreeRegistry := make(runtime.Registry)for _, option := range outOfTreeRegistryOptions {if err := option(outOfTreeRegistry); err != nil {return nilnil, err    }  }// recorderFactory代表生成eventRecorder的工厂函数  recorderFactory := getRecorderFactory(&cc)}

追踪getRecorderFactory

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\server.gofunc getRecorderFactory(cc *schedulerserverconfig.CompletedConfig) profile.RecorderFactory {return func(name string) events.EventRecorder {return cc.EventBroadcaster.NewRecorder(name)  }}

追踪NewRecorder

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\events\event_broadcaster.go// NewRecorder returns an EventRecorder that records events with the given event source.func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {  hostname, _ := os.Hostname()  reportingInstance := reportingController + "-" + hostnamereturn &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}}

1.3.3、开启event事件广播器

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\events\event_broadcaster.go// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler)error {if cz, err := configz.New("componentconfig"); err == nil {    cz.Set(cc.ComponentConfig)  } else {return fmt.Errorf("unable to register configz: %s", err)  }// 开启`event`事件广播器    cc.EventBroadcaster.StartRecordingToSink(ctx.Done())}

追踪StartRecordingToSink

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\events\event_broadcaster.go// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)go wait.Until(e.finishSeries, finishTime, stopCh)  e.startRecordingEvents(stopCh)}

追踪refreshExistingEventSeries,它的作用是给重复事件续期,让它一直显示在 kubectl describe 里

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\events\event_broadcaster.go// refreshExistingEventSeries 刷新事件系列的TTL,给事件“续期”func (e *eventBroadcasterImpl) refreshExistingEventSeries() {// 加锁,保证并发安全(操作eventCache)  e.mu.Lock()defer e.mu.Unlock()// 遍历所有缓存的事件for isomorphicKey, event := range e.eventCache {// 只处理【有系列信息】的重复事件(重复发生的同类型事件)if event.Series != nil {// 发送事件到 APIServer(更新计数、刷新最后观察时间)if recordedEvent, retry := recordEvent(e.sink, event); !retry {// 更新成功,把最新的事件存回缓存if recordedEvent != nil {          e.eventCache[isomorphicKey] = recordedEvent        }      }    }  }}

追踪finishSeries,它的作用是事件太久没发生,那么清理掉释放内存

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\events\event_broadcaster.go// finishSeries 检查事件是否已经长期不发生// 1. 有系列的事件:写入最终计数后删除// 2. 普通事件:直接删除func (e *eventBroadcasterImpl) finishSeries() {// 加锁保护缓存  e.mu.Lock()defer e.mu.Unlock()// 遍历缓存中的所有事件for isomorphicKey, event := range e.eventCache {    eventSerie := event.Seriesif eventSerie != nil {// 如果【系列事件】最后一次发生已经超过了 finishTimeif eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {// 最后上报一次最终状态(总发生次数)if _, retry := recordEvent(e.sink, event); !retry {// 上报成功 → 从缓存删除delete(e.eventCache, isomorphicKey)        }      }    } else {// 如果是【普通单件事件】,且已过期if event.EventTime.Time.Before(time.Now().Add(-finishTime)) {// 直接从缓存删除delete(e.eventCache, isomorphicKey)      }    }  }}

其中EventSeries是用来事件聚合使用的

// C:\git\kubernetes-1.22.3\vendor\k8s.io\api\events\v1\types.gotype Event struct {// series is data about the Event series this event represents or nil if it's a singleton Event.// +optional/*    作用:事件聚合(系列)信息    存放重复事件的:计数、最后观察时间    比如 “镜像拉取失败” 发生 100 次,只存一条事件,用 Series 记录次数。  */  Series *EventSeries `json:"series,omitempty" protobuf:"bytes,3,opt,name=series"`}type EventSeries struct {// count is the number of occurrences in this series up to the last heartbeat time.  Count int32 `json:"count" protobuf:"varint,1,opt,name=count"`// lastObservedTime is the time when last Event from the series was seen before last heartbeat.  LastObservedTime metav1.MicroTime `json:"lastObservedTime" protobuf:"bytes,2,opt,name=lastObservedTime"`}

继续追踪startRecordingEvents

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\events\event_broadcaster.gofunc (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) {  eventHandler := func(obj runtime.Object) {    event, ok := obj.(*eventsv1.Event)if !ok {      klog.Errorf("unexpected type, expected eventsv1.Event")return    }    e.recordToSink(event, clock.RealClock{})  }  stopWatcher := e.StartEventWatcher(eventHandler)go func() {    <-stopCh    stopWatcher()  }()}

其中需要关注两个函数StartEventWatcherrecordToSink,先追踪StartEventWatcher

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\events\event_broadcaster.go// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.// The return value is used to stop recordingfunc (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)func() {  watcher := e.Watch()go func() {defer utilruntime.HandleCrash()for {      watchEvent, ok := <-watcher.ResultChan()if !ok {return      }      eventHandler(watchEvent.Object)    }  }()return watcher.Stop}

也就是说从broadcasterWatcher.result这个事件的Eventchan中读取Event,然后调用eventHandler处理事件,内部也就是调用recordToSink处理事件

// C:\git\kubernetes-1.22.3\vendor\k8s.io\apimachinery\pkg\watch\mux.go// Watch adds a new watcher to the list and returns an Interface for it.// Note: new watchers will only receive new events. They won't get an entire history// of previous events. It will block until the watcher is actually added to the// broadcaster.func (m *Broadcaster) Watch() Interface {var w *broadcasterWatcher  m.blockQueue(func() {    id := m.nextWatcher    m.nextWatcher++    w = &broadcasterWatcher{      result:  make(chan Event, m.watchQueueLength),      stopped: make(chan struct{}),      id:      id,      m:       m,    }    m.watchers[id] = w  })if w == nil {// The panic here is to be consistent with the previous interface behavior// we are willing to re-evaluate in the future.panic("broadcaster already stopped")  }return w}

追踪recordToSink

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\events\event_broadcaster.go// recordToSink 事件聚合 + 上报入口// 核心作用:相同事件合并计数,不同事件才发送到 APIServerfunc (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.Clock) {// 深拷贝事件,防止多协程并发修改(安全)  eventCopy := event.DeepCopy()// 启动协程异步处理,不阻塞主线程go func() {// 第一步:加锁检查缓存,做【事件聚合】    evToRecord := func() *eventsv1.Event {      e.mu.Lock()defer e.mu.Unlock()// 生成事件唯一 Key(相同事件会生成同一个key)      eventKey := getKey(eventCopy)// 从缓存中查找是否已经存在【相同事件】      isomorphicEvent, isIsomorphic := e.eventCache[eventKey]if isIsomorphic {// 【情况1】缓存里已有相同事件,那么聚合,只更新计数和时间if isomorphicEvent.Series != nil {// 已有系列信息,那么次数+1,更新最后时间          isomorphicEvent.Series.Count++          isomorphicEvent.Series.LastObservedTime = metav1.MicroTime{Time: clock.Now()}// 返回 nil,那么代表【只聚合,不上报】return nil        }// 第一次聚合:给事件加上 Series 信息        isomorphicEvent.Series = &eventsv1.EventSeries{          Count:            1,          LastObservedTime: metav1.MicroTime{Time: clock.Now()},        }// 返回事件,那么代表【需要上报】return isomorphicEvent      }// 【情况2】缓存里没有,那么新事件,加入缓存      e.eventCache[eventKey] = eventCopy// 返回事件,那么代表【需要上报】return eventCopy    }()// 第二步:如果需要上报(不是纯聚合),就发送到 APIServerif evToRecord != nil {// 真正执行上报(调用 recordEvent → Patch/Create)      recordedEvent := e.attemptRecording(evToRecord)// 上报成功,用 APIServer 返回的最新事件更新缓存if recordedEvent != nil {        recordedEventKey := getKey(recordedEvent)        e.mu.Lock()defer e.mu.Unlock()        e.eventCache[recordedEventKey] = recordedEvent      }    }  }()}

追踪getKey

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\events\event_broadcaster.gofunc getKey(event *eventsv1.Event) eventKey {  key := eventKey{    action:              event.Action,    reason:              event.Reason,    reportingController: event.ReportingController,    regarding:           event.Regarding,  }if event.Related != nil {    key.related = *event.Related  }return key}type eventKey struct {  action              string  reason              string  reportingController string  regarding           corev1.ObjectReference  related             corev1.ObjectReference}

其中各个字段含义如下

字段名
对应 Event 字段
类型
作用说明
action
Actionstring
机器可读的操作类型,用于区分相同原因下的不同操作
reason
Reasonstring
人类可读的事件原因,是判断同类事件的核心字段
reportingController
ReportingControllerstring
上报事件的控制器 / 组件,确保只有同一组件产生的事件才聚合
regarding
Regardingcorev1.ObjectReference
事件关联的主对象(Pod/Node 等),确保只聚合同一资源的事件
related
Relatedcorev1.ObjectReference
事件关联的次要对象,精细化区分事件场景

继续追踪attemptRecording

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\events\event_broadcaster.gofunc (e *eventBroadcasterImpl) attemptRecording(event *eventsv1.Event) *eventsv1.Event {  tries := 0for {if recordedEvent, retry := recordEvent(e.sink, event); !retry {return recordedEvent    }    tries++if tries >= maxTriesPerEvent {      klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)return nil    }// Randomize sleep so that various clients won't all be// synced up if the master goes down.    time.Sleep(wait.Jitter(e.sleepDuration, 0.25))  }}

继续追踪recordEvent,调用的就是EventSink.Patch

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\events\event_broadcaster.gofunc recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) {if isEventSeries {    newEvent, err = sink.Patch(event, patch)  }}// EventSink knows how to store events (client-go implements it.)// EventSink must respect the namespace that will be embedded in 'event'.// It is assumed that EventSink will return the same sorts of errors as// client-go's REST client.type EventSink interface {  Create(event *eventsv1.Event) (*eventsv1.Event, error)  Update(event *eventsv1.Event) (*eventsv1.Event, error)  Patch(oldEvent *eventsv1.Event, data []byte) (*eventsv1.Event, error)}

1.3.4、调用 eventRecorder产生的事件

// recordSchedulingFailure 记录 Pod 调度失败事件// 作用:Pod 调度失败 → 发送事件、更新 Pod 状态、设置 nominatedNode(候选节点)func (sched *Scheduler) recordSchedulingFailure(  fwk framework.Framework,  podInfo *framework.QueuedPodInfo,  err error,  reason string,  nominatedNode string,) {// 1. 把调度错误记录到调度器内部日志(错误计数、监控指标)  sched.Error(podInfo, err)// 2. 设置 nominatedNode(推荐下次优先尝试调度的节点)// 把 Pod 标记为“推荐在某个节点重试调度”,避免并发竞争if sched.SchedulingQueue != nil {    sched.SchedulingQueue.AddNominatedPod(podInfo.PodInfo, nominatedNode)  }  pod := podInfo.Pod// 截断错误信息,避免太长  msg := truncateMessage(err.Error())// 3. 发送【调度失败】事件到 Kubernetes Event 系统// Type: Warning// Reason: FailedScheduling// 消息:为什么调度失败(如:节点资源不足、节点亲和性不匹配)  fwk.EventRecorder().Eventf(    pod,nil,    v1.EventTypeWarning,"FailedScheduling","Scheduling",    msg,  )// 4. 更新 Pod 状态:// 设置 Pod 条件 PodScheduled = False// 标记调度失败原因 + 消息if err := updatePod(    sched.client,    pod,    &v1.PodCondition{      Type:    v1.PodScheduled,      Status:  v1.ConditionFalse,      Reason:  reason,      Message: err.Error(),    },    nominatedNode, // 同时写入推荐节点  ); err != nil {    klog.ErrorS(err, "Error updating pod""pod", klog.KObj(pod))  }}

其中EventRecorder就是一个初始化之后的eventRecorder

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\events\interfaces.go// EventRecorder returns an event recorder.func (f *frameworkImpl) EventRecorder() events.EventRecorder {return f.eventRecorder}

1.4、k8sinformer机制

在开始研究 kube-scheduler内对pod的调度机制之前,需要研究informer的原理作为铺垫。

说到informer就必须要祭出这张图

1.4.1、Informer 机制核心作用

  1. 保证消息实时性、可靠性、顺序性

    不依赖中间件,通过 ListWatch长连接监听资源变更。

  2. 大幅降低 APIServer / Etcd 压力

    使用本地缓存 Indexer,避免组件频繁直接请求集群。

  3. 统一资源监听能力

    让 K8s 组件、自定义控制器、Operator 快速获取资源实时状态。


1.4.2、Informer 核心组件

组件
作用
Reflector
与 APIServer 建立长连接,通过 ListWatch 监听资源变化
DeltaFIFO
内部事件队列,存储资源变更事件(Added/Updated/Deleted
Informer
调度核心:消费事件、更新本地缓存、分发事件
Indexer本地缓存
,存储资源全量对象,支持索引查询,不请求 APIServer

1.4.3、Informer 完整工作流程

  1. Reflector 监听 APIServer 资源变化
  2. 变更事件写入 DeltaFIFO
  3. Informer 消费队列,更新 Indexer 本地缓存
  4. Informer 抛出事件回调
  5. 标准生产流程:事件 → Key → WorkQueue → 从 Indexer 取最新对象 → 执行业务

1.4.4、Indexer 本地缓存

Indexer 是Informer 内置的线程安全本地存储,作用:

  • 存储资源最新完整对象
  • 支持按 Key / 标签索引查询
  • 纯本地读取,零 APIServer 压力
  • 与集群状态保持最终一致

Indexer 常用操作

// 按 key 查询对象obj, exists, err := indexer.GetByKey(key)// 列出所有缓存对象list := indexer.List()// 按索引查询objs, err := indexer.ByIndex("indexKey""indexValue")

1.4.5、生产级标准流程:事件 → Key → 队列 → 取最新对象 → 处理

这是 K8s 官方推荐的控制器标准模型,解决:

  • 事件阻塞
  • 重复处理
  • 高并发削峰
  • 失败自动重试
  • 永远使用最新状态

流程说明

  1. 事件:资源新增 / 更新 / 删除触发回调
  2. Key:将对象转为唯一标识(namespace/name 或自定义)
  3. 队列:Key 入队,解耦事件与业务
  4. 取最新对象:从 Indexer 本地缓存获取最新状态
  5. 处理:执行业务调谐逻辑(Reconcile

1.4.6、标准流程Demo

package mainimport ("context""flag""fmt""log""path/filepath""time"// K8s 原生资源定义  v1 "k8s.io/api/core/v1"  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"// 工具包:循环等待"k8s.io/apimachinery/pkg/util/wait"// client-go 核心:Informer、缓存、事件处理"k8s.io/client-go/informers""k8s.io/client-go/kubernetes""k8s.io/client-go/tools/cache""k8s.io/client-go/tools/clientcmd"// 读取用户家目录"k8s.io/client-go/util/homedir"// client-go 队列实现(限速队列、重试队列)"k8s.io/client-go/util/workqueue")// ===================== 控制器结构体定义 =====================// Controller 自定义控制器,整合四大核心组件:// 1. clientset: 与 K8s APIServer 交互的客户端// 2. informer: 资源监听器,内部包含 Indexer 本地缓存// 3. queue: 工作队列,实现事件解耦、去重、重试、削峰type Controller struct {  clientset *kubernetes.Clientset       // K8s 客户端  informer  cache.SharedIndexInformer   // 共享Informer,内置Indexer缓存  queue     workqueue.RateLimitingInterface // 限速工作队列}// ===================== 创建控制器 =====================// NewController 初始化控制器、队列、Informer、事件处理器func NewController(clientset *kubernetes.Clientset) *Controller {// 1. 创建【限速工作队列】// DefaultControllerRateLimiter:K8s标准限速器,失败后递增重试间隔,避免风暴  queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())// 2. 创建 SharedInformerFactory 工厂// 第二个参数 resync 周期:1分钟全量同步一次集群资源,防止事件丢失、数据不一致  factory := informers.NewSharedInformerFactory(clientset, 1*time.Minute)// 3. 获取 Pod 资源对应的 Informer// Core().V1().Pods() 代表监听 v1 版本的 Pod 资源  podInformer := factory.Core().V1().Pods().Informer()// 4. 注册【资源事件处理器】ResourceEventHandlers// 核心设计:事件回调里**只做一件事**:对象转Key → 放入队列,不执行业务逻辑  podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{// AddFunc:监听到资源新建时触发    AddFunc: func(obj interface{}) {// MetaNamespaceKeyFunc:固定格式生成资源唯一Key:namespace/name      key, err := cache.MetaNamespaceKeyFunc(obj)if err == nil {        log.Println("新增事件,入队 KEY:", key)        queue.Add(key) // Key 加入队列      }    },// UpdateFunc:监听到资源更新时触发(状态、标签、配置变更都会触发)    UpdateFunc: func(oldObj, newObj interface{}) {      key, err := cache.MetaNamespaceKeyFunc(newObj)if err == nil {        log.Println("更新事件,入队 KEY:", key)        queue.Add(key) // Key 加入队列      }    },// DeleteFunc:监听到资源删除时触发    DeleteFunc: func(obj interface{}) {// DeletionHandlingMetaNamespaceKeyFunc:兼容已删除的对象,安全生成Key      key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)if err == nil {        log.Println("删除事件,入队 KEY:", key)        queue.Add(key) // Key 加入队列      }    },  })// 组装并返回控制器实例return &Controller{    clientset: clientset,    informer:  podInformer,    queue:     queue,  }}// ===================== 启动控制器 =====================// Run 启动Informer、同步缓存、启动队列消费协程func (c *Controller) Run(ctx context.Context) {// 程序退出时关闭队列,释放资源defer c.queue.ShutDown()// 1. 启动 Informer// Informer 内部会启动 Reflector,建立 ListWatch 长连接监听 APIServergo c.informer.Run(ctx.Done())// 2. 等待本地缓存(Indexer)全量同步完成// 作用:确保启动时集群现有资源全部加载到本地缓存,再开始处理业务if !cache.WaitForCacheSync(ctx.Done(), c.informer.HasSynced) {    log.Fatal("缓存同步失败,程序退出")  }  log.Println("缓存同步完成,Indexer 本地缓存已就绪")// 3. 启动 Worker 循环:持续消费队列// wait.Until:每隔1秒执行一次 runWorker,ctx 取消则停止go wait.Until(c.runWorker, time.Second, ctx.Done())// 阻塞主线程,等待退出信号  <-ctx.Done()  log.Println("收到退出信号,控制器停止运行")}// ===================== 队列消费主循环 =====================// runWorker 死循环:不断从队列取出任务并处理func (c *Controller) runWorker() {// processNextWorkItem 返回 true 则继续循环,false 退出for c.processNextWorkItem() {  }}// ===================== 处理单个队列任务 =====================// processNextWorkItem:从队列取Key → 调用业务逻辑 → 处理重试/成功标记// 对应流程:ProcessItem 从队列取 Keyfunc (c *Controller) processNextWorkItem() bool {// 从队列获取 Key// quit=true 代表队列已关闭,终止循环  key, quit := c.queue.Get()if quit {return false  }// defer 标记任务完成:无论成功失败,都标记当前Key处理完毕defer c.queue.Done(key)// 调用核心调谐逻辑:根据Key执行业务  err := c.syncHandler(key.(string))if err == nil {// 处理成功:清除该Key的重试记录    c.queue.Forget(key)return true  }// 处理失败:使用限速策略重新入队,自动重试  c.queue.AddRateLimited(key)  log.Printf("处理失败,KEY: %s,错误: %v", key, err)return true}// ===================== 核心业务逻辑(调谐逻辑) =====================// syncHandler:根据Key从 Indexer 读取最新对象 + 执行业务// 对应流程:根据Key从Indexer取最新对象 → HandleObject 业务处理func (c *Controller) syncHandler(key stringerror {// --------------------------// Indexer 本地缓存使用(重点)// GetIndexer() 获取 Informer 内置的本地索引缓存// GetByKey(key):根据 namespace/name 读取对象,**纯本地查询,不请求APIServer**// --------------------------  obj, exists, err := c.informer.GetIndexer().GetByKey(key)if err != nil {return err  }// exists=false:缓存中找不到该对象,说明资源已被删除,直接返回if !exists {    log.Printf("缓存中已无该对象:%s", key)return nil  }// 类型断言:转为 Pod 对象  pod, ok := obj.(*v1.Pod)if !ok {return fmt.Errorf("对象类型错误,非Pod资源")  }// ===================== 自定义业务逻辑区域 =====================  log.Printf("========================================")  log.Printf("【开始处理Pod】命名空间: %s 名称: %s", pod.Namespace, pod.Name)  log.Printf("Pod当前状态: %s", pod.Status.Phase)  log.Printf("Pod创建时间: %s", pod.CreationTimestamp.Format(time.RFC3339))  log.Printf("========================================")// 演示 Indexer 批量查询:列出本地缓存中所有 Pod(依旧本地读取)  allPods := c.informer.GetIndexer().List()  log.Printf("Indexer本地缓存中Pod总数:%d\n"len(allPods))return nil}// ===================== 程序入口 =====================func main() {// 1. 解析命令行参数,读取 kubeconfig 配置文件var kubeconfig *string// 自动适配 Linux/Windows,读取 ~/.kube/configif home := homedir.HomeDir(); home != "" {    kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube""config"), "K8s kubeconfig 文件路径")  }  flag.Parse()// 2. 加载 kubeconfig,生成客户端配置  config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)if err != nil {    log.Fatalf("加载kubeconfig失败: %v", err)  }// 3. 创建 K8s 客户端 clientset  clientset, err := kubernetes.NewForConfig(config)if err != nil {    log.Fatalf("创建clientset客户端失败: %v", err)  }// 4. 初始化控制器  controller := NewController(clientset)// 5. 上下文:用于优雅退出(接收终止信号)  ctx, cancel := context.WithCancel(context.Background())defer cancel()  log.Println("自定义控制器启动成功,开始监听Pod资源变化...")// 6. 启动控制器  controller.Run(ctx)}

1.4.7、ByIndexDemo

从本地Indexer支持创建索引和按索引查询,有如下demo

package mainimport ("context""flag""fmt""log""path/filepath""time"  v1 "k8s.io/api/core/v1""k8s.io/client-go/informers""k8s.io/client-go/kubernetes""k8s.io/client-go/tools/cache""k8s.io/client-go/tools/clientcmd""k8s.io/client-go/util/homedir")// ==========================// 索引函数:按标签 app 建立索引// ==========================func podAppIndexFunc(obj interface{}) ([]stringerror) {  pod, ok := obj.(*v1.Pod)if !ok {return nil, fmt.Errorf("not pod")  }return []string{pod.Labels["app"]}, nil}func main() {// 加载 kubeconfigvar kubeconfig *stringif home := homedir.HomeDir(); home != "" {    kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube""config"), "")  }  flag.Parse()// 创建k8s客户端  config, _ := clientcmd.BuildConfigFromFlags("", *kubeconfig)  clientset, _ := kubernetes.NewForConfig(config)// 创建 informer  factory := informers.NewSharedInformerFactory(clientset, time.Minute)  podInformer := factory.Core().V1().Pods().Informer()// ==========================// 1. 添加索引// ==========================  _ = podInformer.AddIndexers(cache.Indexers{"app": podAppIndexFunc,  })// 启动 informer 同步缓存  ctx := context.Background()go podInformer.Run(ctx.Done())  cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced)  log.Println("✅ 缓存同步完成,每3秒执行一次 ByIndex 查询")// ==========================// 2. 【核心】只演示 ByIndex// ==========================for {// 从本地 Indexer 按索引查询 app=nginx    pods, _ := podInformer.GetIndexer().ByIndex("app""nginx")    log.Printf("===== ByIndex(app=nginx) 找到 %d 个 Pod ====="len(pods))for _, obj := range pods {      pod := obj.(*v1.Pod)      log.Printf("→ %s/%s\n", pod.Namespace, pod.Name)    }    time.Sleep(3 * time.Second)  }}

1.5、在kube-scheduler中的informer源码解读

InformerFactory的初始化如下

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\options\options.go// Config return a scheduler config objectfunc (o *Options) Config() (*schedulerappconfig.Config, error) {  c := &schedulerappconfig.Config{}  c.InformerFactory = scheduler.NewInformerFactory(client, 0)return c, nil}

追踪scheduler.NewInformerFactory(client, 0),其中传入的resync=0,代表不进行周期性的list,而是通过第一次的全量list,加上后续的增量更新。

// C:\git\kubernetes-1.22.3\pkg\scheduler\scheduler.go// NewInformerFactory creates a SharedInformerFactory and initializes a scheduler specific// in-place podInformer.func NewInformerFactory(cs clientset.Interface, resyncPeriod time.Duration) informers.SharedInformerFactory {  informerFactory := informers.NewSharedInformerFactory(cs, resyncPeriod)  informerFactory.InformerFor(&v1.Pod{}, newPodInformer)return informerFactory}

追踪informers.NewSharedInformerFactory,可以看到informers: make(map[reflect.Type]cache.SharedIndexInformer),这个就是各个类型到对应的Informer的映射

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\informers\factory.go// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {  factory := &sharedInformerFactory{    client:           client,    namespace:        v1.NamespaceAll,    defaultResync:    defaultResync,/* 可以想想成为          informers = {                类型(v1.Pod):        Pod 专用的 SharedInformer(唯一),                类型(v1.Node):       Node 专用的 SharedInformer(唯一),                类型(appsv1.Deployment): Deployment 专用的 SharedInformer(唯一),            }    */    informers:        make(map[reflect.Type]cache.SharedIndexInformer),    startedInformers: make(map[reflect.Type]bool),    customResync:     make(map[reflect.Type]time.Duration),  }// Apply all optionsfor _, opt := range options {    factory = opt(factory)  }return factory}

1.5.1、pod-informer初始化

// C:\git\kubernetes-1.22.3\pkg\scheduler\scheduler.go// NewInformerFactory creates a SharedInformerFactory and initializes a scheduler specific// in-place podInformer.func NewInformerFactory(  cs clientset.Interface,   resyncPeriod time.Duration,) informers.SharedInformerFactory {// 1. 创建系统默认的共享 Informer 工厂  informerFactory := informers.NewSharedInformerFactory(cs, resyncPeriod)// 2. 重点:替换/注册 Pod 的 Informer,使用我们自定义的 newPodInformer  informerFactory.InformerFor(&v1.Pod{}, newPodInformer)return informerFactory}func newPodInformer(  cs clientset.Interface,   resyncPeriod time.Duration,) cache.SharedIndexInformer {/*     只监听不是成功、不是失败的Pod,也就是只监听:        Pending,Running,Unknown        已经跑完的、退出的 Pod 不监听、不缓存、不占内存!  */  selector := fmt.Sprintf("status.phase!=%v,status.phase!=%v"    v1.PodSucceeded,     v1.PodFailed,  )// 把过滤条件设置到 FieldSelector  tweakListOptions := func(options *metav1.ListOptions) {    options.FieldSelector = selector  }// 创建【带过滤条件的】Pod Informerreturn coreinformers.NewFilteredPodInformer(    cs,    metav1.NamespaceAll,    resyncPeriod,nil,    tweakListOptions, // 过滤器  )}

追踪informerFactory.InformerFor(&v1.Pod{}, newPodInformer)

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\informers\factory.go// InformerFor 方法的作用:// 为指定的 K8s 资源(Pod/Deployment/Node 等)创建【唯一的一个】SharedIndexInformer// 如果已经创建过,就直接返回旧的,不再创建新的 → 实现共享、节约内存func (f *sharedInformerFactory) InformerFor(    obj runtime.Object,            // 要监听的资源,例如 &v1.Pod{}    newFunc internalinterfaces.NewInformerFunc, // 创建 informer 的函数) cache.SharedIndexInformer {// 加锁:保证多协程并发调用时,不会同时创建同一个资源的 informer    f.lock.Lock()defer f.lock.Unlock()// 拿到资源的反射类型,比如 Pod 类型、Deployment 类型// 这个类型会作为 map 的 key    informerType := reflect.TypeOf(obj)// 从 map 中查看:这个类型的 informer 已经创建过了吗?    informer, exists := f.informers[informerType]// 如果已经创建过 → 直接返回,不再创建!// 这就是共享的核心:一个资源只创建一个 informerif exists {return informer    }// 获取 resync 同步周期// 如果有自定义周期用自定义,没有就用默认的 defaultResync    resyncPeriod, exists := f.customResync[informerType]if !exists {        resyncPeriod = f.defaultResync    }// 调用 newFunc 函数,创建一个新的 informer    informer = newFunc(f.client, resyncPeriod)// 把创建好的 informer 存进 map// 下次有人需要,直接从 map 取    f.informers[informerType] = informer// 返回创建好的 informerreturn informer}

继续追踪coreinformers.NewFilteredPodInformer

// NewFilteredPodInformer 作用:// 创建一个【带过滤功能】的 Pod 专用 SharedIndexInformer// 官方建议:一定要用共享的 informer,不要自己独立创建// 好处:节约内存、减少对 APIServer 的连接数func NewFilteredPodInformer(  client kubernetes.Interface,          // k8s 客户端  namespace string,                     // 监听的命名空间(all 表示所有)  resyncPeriod time.Duration,            // 全量同步周期  indexers cache.Indexers,              // 索引配置(比如按标签索引)  tweakListOptions internalinterfaces.TweakListOptionsFunc, // 过滤条件) cache.SharedIndexInformer {// 创建并返回一个 共享的 SharedIndexInformerreturn cache.NewSharedIndexInformer(// --------------------------// 第1部分:配置 ListWatch// 这是 informer 监听 APIServer 的核心// --------------------------    &cache.ListWatch{// ListFunc:全量拉取数据时调用// 程序启动时会调用一次      ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {// 如果有过滤条件(比如只看 Running 的 Pod),先设置过滤if tweakListOptions != nil {          tweakListOptions(&options)        }// 调用 APIServer 获取 Pod 列表return client.CoreV1().Pods(namespace).List(context.TODO(), options)      },// WatchFunc:监听变化(创建/更新/删除)// 建立长连接,实时接收事件      WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {// 同样可以设置过滤条件if tweakListOptions != nil {          tweakListOptions(&options)        }// 调用 APIServer Watch 接口return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)      },    },// 第2部分:告诉 informer 要监听的资源类型是 Pod    &corev1.Pod{},// 第3部分:同步周期 + 索引配置    resyncPeriod,    indexers,  )}

尽管coreinformers.NewFilteredPodInformer中入参的indexers cache.Indexersnil

coreinformers.NewFilteredPodInformer(    cs,    metav1.NamespaceAll,    resyncPeriod,    nil,    tweakListOptions, // 过滤器  )

但是还是要研究一下Indexers

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\cache\store.go// NewIndexer returns an Indexer implemented simply with a map and a lock.func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {return &cache{    cacheStorage: NewThreadSafeStore(indexers, Indices{}),           keyFunc:      keyFunc,  }}

一、keyFunc 的作用

作用:给每个对象生成一个唯一的key

用途:把对象存进 items map 时,当 map 的 key

例子(默认)

keyFunc(obj) → "default/nginx-pod"

存到 items

items["default/nginx-pod"] = Pod对象

特点

  • 唯一
  • 一个对象只有 一个 key
  • 用来 定位单个对象

indexers入参的作用如下

二、indexers 的作用

作用:给对象生成【分类标签】,建立索引

用途:用来快速查询一批对象

例子

indexers["app"](obj) → "nginx"

存到 indices 索引表里

indices["app"]["nginx"] = { "default/nginx-pod""default/nginx-1" }

特点

  • 可以有 多个索引
  • 一个对象可以属于 多个分类
  • 用来 批量查询

追踪threadSafeMap

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\cache\thread_safe_store.go// threadSafeMap implements ThreadSafeStoretype threadSafeMap struct {  lock  sync.RWMutex  items map[string]interface{}// indexers maps a name to an IndexFunc  indexers Indexers// indices maps a name to an Index  indices Indices}// Index maps the indexed value to a set of keys in the store that match on that valuetype Index map[string]sets.String// Indexers maps a name to a IndexFunctype Indexers map[string]IndexFunc// Indices maps a name to an Indextype Indices map[string]Indextype String map[string]Empty

要理解上面各个字段的用途,可以查看下面的例子,我们现在一共有 4 个 Pod

"default/nginx-pod"    → &Pod{Name:"nginx-pod", Labels:{"app":"nginx"}}"default/nginx-1"      → &Pod{Name:"nginx-1", Labels:{"app":"nginx"}}"default/nginx-2"      → &Pod{Name:"nginx-2", Labels:{"app":"nginx"}}"default/mysql-1"      → &Pod{Name:"mysql-1", Labels:{"app":"mysql"}}

先看 items中存放的数据

items = {"default/nginx-pod":    Pod{Name: "nginx-pod-1", app: nginx},"default/nginx-1":      Pod{Name: "nginx-1", app: nginx},"default/nginx-2":      Pod{Name: "nginx-2", app: nginx},"default/mysql-1":      Pod{Name: "mysql-1", app: mysql},}

再看 indexers(索引规则)

indexers := Indexers{"app": podAppIndexFunc,  }func podAppIndexFunc(obj interface{}) ([]stringerror) {    pod := obj.(*v1.Pod)return []string{pod.Labels["app"]}, nil}

最后看indices 中存放的数据

indices = {"app": {                  // 索引名称"nginx": {            // 索引值"default/nginx-pod": {},  // 指向 items 里的 key"default/nginx-1":   {},"default/nginx-2":   {},        },"mysql": {            // 索引值"default/mysql-1": {},        },    },}

NewIndexer时传入的keyFunc,使用的是MetaNamespaceKeyFunc,也就是对象的namespace/name

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\cache\store.go// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make// keys for API objects which implement meta.Interface.// The key uses the format <namespace>/<name> unless <namespace> is empty, then// it's just <name>.//// TODO: replace key-as-string with a key-as-struct so that this// packing/unpacking won't be necessary.func MetaNamespaceKeyFunc(obj interface{}) (stringerror) {if key, ok := obj.(ExplicitKey); ok {return string(key), nil  }  meta, err := meta.Accessor(obj)if err != nil {return "", fmt.Errorf("object has no meta: %v", err)  }if len(meta.GetNamespace()) > 0 {return meta.GetNamespace() + "/" + meta.GetName(), nil  }return meta.GetName(), nil}

1.5.2、 kube-scheduler给各种 Informer 注册对应的事件处理函数

addAllEventHandlers用于调度器中给各种 Informer 注册对应的事件处理函数,用来增/删/改 同步到调度器缓存。

// C:\git\kubernetes-1.22.3\pkg\scheduler\scheduler.go// New returns a Schedulerfunc New(client clientset.Interface,  informerFactory informers.SharedInformerFactory,  recorderFactory profile.RecorderFactory,  stopCh <-chan struct{},  opts ...Option) (*Scheduler, error) {  addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))return sched, nil}

追踪addAllEventHandlers其中的AddFuncUpdateFuncDeleteFunc

func addAllEventHandlers(  sched *Scheduler, // 调度器实例  informerFactory informers.SharedInformerFactory, // 内置资源 informer 工厂  dynInformerFactory dynamicinformer.DynamicSharedInformerFactory, // 动态资源 informer 工厂  gvkMap map[framework.GVK]framework.ActionType, // 需要监听的资源类型(GVK)与事件类型) {// ====================== 1. 监听【已调度成功的 Pod】 ======================// 作用:维护调度器内部缓存,记录哪些 Pod 已经调度到节点上  informerFactory.Core().V1().Pods().Informer().AddEventHandler(    cache.FilteringResourceEventHandler{// 过滤器:只处理【已经分配了节点的 Pod】      FilterFunc: func(obj interface{})bool {switch t := obj.(type) {case *v1.Pod:/*             判断 Pod 是否已经分配节点(nodeName 不为空)    // assignedPod selects pods that are assigned (scheduled and running).    func assignedPod(pod *v1.Pod) bool {        return len(pod.Spec.NodeName) != 0    }                    */return assignedPod(t)case cache.DeletedFinalStateUnknown:// 处理删除事件的兜底对象if pod, ok := t.Obj.(*v1.Pod); ok {return assignedPod(pod)          }          utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))return falsedefault:          utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))return false        }      },// 事件处理:增/删/改 同步到调度器缓存      Handler: cache.ResourceEventHandlerFuncs{        AddFunc:    sched.addPodToCache,    // 已调度Pod新增 → 加入缓存        UpdateFunc: sched.updatePodInCache, // 已调度Pod更新 → 更新缓存        DeleteFunc: sched.deletePodFromCache, // 已调度Pod删除 → 从缓存删除      },    },  )// ====================== 2. 监听【未调度的 Pod】 ======================// 作用:把新创建、待调度的 Pod 加入调度队列,让调度器开始工作// unscheduled pod queue  informerFactory.Core().V1().Pods().Informer().AddEventHandler(    cache.FilteringResourceEventHandler{      FilterFunc: func(obj interface{})bool {switch t := obj.(type) {case *v1.Pod:return !assignedPod(t) && responsibleForPod(t, sched.Profiles)case cache.DeletedFinalStateUnknown:if pod, ok := t.Obj.(*v1.Pod); ok {return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles)          }          utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))return falsedefault:          utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))return false        }      },      Handler: cache.ResourceEventHandlerFuncs{        AddFunc:    sched.addPodToSchedulingQueue,        UpdateFunc: sched.updatePodInSchedulingQueue,        DeleteFunc: sched.deletePodFromSchedulingQueue,      },    },  )// ====================== 3. 监听 Node 节点变化 ======================// 作用:维护调度器的节点缓存,节点增/删/改都会同步到缓存// ====================== 4. 构造通用事件处理器 ======================// 作用:根据传入的事件类型(Add/Update/Delete),生成对应的事件处理函数// 当集群资源变化时,将等待的 Pod 重新加入调度队列// ====================== 5. 遍历所有需要监听的资源,注册事件 // 监听 PV/PVC/Service/CSI/StorageClass 等影响调度的资源  }

追踪AddEventHandler

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\cache\shared_informer.gofunc (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {  s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)}

追踪AddEventHandlerWithResyncPeriod

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\cache\shared_informer.gofunc (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {    s.startedLock.Lock()defer s.startedLock.Unlock()if s.stopped {       klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)return    }if resyncPeriod > 0 {if resyncPeriod < minimumResyncPeriod {          klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)          resyncPeriod = minimumResyncPeriod       }if resyncPeriod < s.resyncCheckPeriod {if s.started {             klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)             resyncPeriod = s.resyncCheckPeriod          } else {// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners// accordingly             s.resyncCheckPeriod = resyncPeriod             s.processor.resyncCheckPeriodChanged(resyncPeriod)          }       }    }    listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)if !s.started {       s.processor.addListener(listener)return    }// in order to safely join, we have to// 1. stop sending add/update/delete notifications// 2. do a list against the store// 3. send synthetic "Add" events to the new handler// 4. unblock    s.blockDeltas.Lock()defer s.blockDeltas.Unlock()    s.processor.addListener(listener)for _, item := range s.indexer.List() {       listener.add(addNotification{newObj: item})    }}

追踪上面的代码的如下部分

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\cache\shared_informer.golistener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)if !s.started {    s.processor.addListener(listener)return  }

追踪newProcessListener

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\cache\shared_informer.gofunc newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {  ret := &processorListener{    nextCh:                make(chan interface{}),    addCh:                 make(chan interface{}),    handler:               handler,    pendingNotifications:  *buffer.NewRingGrowing(bufferSize),    requestedResyncPeriod: requestedResyncPeriod,    resyncPeriod:          resyncPeriod,  }  ret.determineNextResync(now)return ret}

然后继续追踪processorListener.nextCh是如何被写入事件的,后面processorListener.run会消费这个被写入的事件。

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\cache\shared_informer.go// pop 是 processorListener 的核心事件转发协程// 作用:从 addCh 接收事件,通过环形缓冲削峰,最终转发到 nextCh 交给 run() 处理// 核心设计:// 1. 局部变量 nextCh 是“写入开关”,不是 p.nextCh,但会动态指向 p.nextCh// 2. 第一个事件不会丢弃,会暂存在 notification 变量中,再打开开关发送// 3. 没有事件时,nextCh = nil,让 select 不阻塞、不浪费资源// 4. 事件处理顺序:addCh → 缓冲/暂存 → nextCh → run() → 你的 OnAdd/OnUpdate/OnDeletefunc (p *processorListener) pop() {defer utilruntime.HandleCrash()   // 崩溃保护,防止 panic 导致整个 informer 挂掉defer close(p.nextCh)             // pop 退出时关闭 nextCh,通知 run() 协程停止// 关键:这里的 nextCh 是【局部变量】,不是 p.nextCh// 作用是“动态写入开关”:// - 为 nil 时:case nextCh <- notification 永远不触发(关闭写入)// - 指向 p.nextCh 时:才允许向真正的通道发送事件var nextCh chan<- interface{}// 用来暂存“待发送”的事件,避免直接写入通道导致阻塞var notification interface{}// 无限循环,处理事件for {select {// ==============================================// 【分支1】向 nextCh 写入事件(只有 nextCh != nil 时才会触发)// 此时局部变量 nextCh 已经指向 p.nextCh// ==============================================case nextCh <- notification:// 事件已成功发送到 nextCh,run() 会收到并处理// 从环形缓冲区读取下一个待发送事件var ok bool      notification, ok = p.pendingNotifications.ReadOne()// 如果缓冲区没有更多事件了if !ok {        nextCh = nil // 关闭写入开关:局部 nextCh 变回 nil,不再触发此 case      }// ==============================================// 【分支2】从 addCh 接收新事件(来自 sharedProcessor 分发)// ==============================================case notificationToAdd, ok := <-p.addCh:// addCh 被关闭,直接退出协程if !ok {return      }// ==============================================// 你问过:第一个事件会不会丢?// 答案:绝对不会!这里先暂存,再打开开关// ==============================================if notification == nil {// 优化场景:当前没有待发送事件,缓冲区也为空// 1. 把新事件直接暂存到局部变量 notification        notification = notificationToAdd// 2. 打开开关:让局部 nextCh 指向真正的 p.nextCh// 下一轮循环就会进入上面的 case,把事件发出去        nextCh = p.nextCh      } else {// 当前已有事件等待发送,新事件写入环形缓冲区,防止阻塞        p.pendingNotifications.WriteOne(notificationToAdd)      }    }  }}

上面这段代码,这是 Go 并发编程的神级写法,也是 k8s 高性能的关键!理解它有点迷惑,所以有如下简单的demo

package mainimport ("fmt""time")// 演示:动态把 channel 设为 nil(开关效果)func main() {// 创建一个真实的 channel  ch := make(chan int)// 局部变量:动态开关var dynamicCh chan<- int // 初始 = nilvar currentValue int// 模拟生产者:每隔1秒发一个数据go func() {for i := 1; i <= 3; i++ {      time.Sleep(1 * time.Second)      fmt.Printf("\n 生产者发送数据: %d\n", i)      ch <- i    }// 发送完关闭 channelclose(ch)    fmt.Println("\n 生产者关闭 channel")  }()// 模拟 processorListener.pop() 的核心逻辑  fmt.Println("=== 开始监听 ===")for {select {// ======================================// 【发送端】只有 dynamicCh != nil 时才生效// ======================================case dynamicCh <- currentValue:      fmt.Println(" 数据发送成功到消费者")// 发送完 → 没有数据了 → 关闭开关(设为 nil)      dynamicCh = nil      currentValue = 0      fmt.Println(" 发送完成,关闭开关 (dynamicCh = nil)")// ======================================// 【接收端】接收生产者数据// ======================================case val, ok := <-ch:if !ok {        fmt.Println(" 消费者退出")return      }      fmt.Printf(" 消费者收到数据: %d\n", val)// 拿到数据 → 打开开关!让发送端可以工作      currentValue = val      dynamicCh = ch // 指向真实 channel,打开开关      fmt.Println(" 拿到数据,打开开关 (dynamicCh = ch)")    }  }}

这个https://cloud.tencent.com.cn/developer/article/2623224页面上有关于go语言的这个feature的详细解是。

每次添加一个handler,那么创建一个listener,然后把listener添加到processor。后续启动 listener.run()处理事件(调用用户注册的 AddFunc/UpdateFunc/DeleteFunc

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\cache\shared_informer.gofunc (p *sharedProcessor) addListener(listener *processorListener) {  p.listenersLock.Lock()defer p.listenersLock.Unlock()  p.addListenerLocked(listener)if p.listenersStarted {    p.wg.Start(listener.run)    p.wg.Start(listener.pop)  }}

1.5.3、启动informers

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\server.go// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler)error {// Start all informers.  cc.InformerFactory.Start(ctx.Done())// Wait for all caches to sync before scheduling.  cc.InformerFactory.WaitForCacheSync(ctx.Done())  sched.Run(ctx)return fmt.Errorf("finished without leader elect")}

追踪InformerFactory.Start

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\informers\factory.go// Start 启动所有已经创建好的 informer// 作用:遍历所有已注册的 informer,启动它们的 Run 循环(开始监听 APIServer)func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {  f.lock.Lock()defer f.lock.Unlock()// 遍历工厂中所有已经创建好的 informer// f.informers map[reflect.Type]cache.SharedIndexInformerfor informerType, informer := range f.informers {// 判断这个类型的 informer 是否已经启动过// 没有启动才执行启动if !f.startedInformers[informerType] {// 【关键】启动 informer 的事件循环(后台协程)// informer.Run() 会启动 List + Watch,开始同步数据go informer.Run(stopCh)// 标记为已启动,防止重复启动      f.startedInformers[informerType] = true    }  }}

继续追踪go informer.Run(stopCh)

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\tools\cache\shared_informer.go// Run 方法:启动 sharedIndexInformer,开始监听、同步、处理数据// 这是 Informer 的主循环,一旦启动就会一直运行func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash() // 崩溃处理,防止程序直接挂掉// ====================== 1. 创建 DeltaFIFO 队列 ======================// DeltaFIFO:存储从 APIServer 监听到的事件(Add/Update/Delete)// KnownObjects:s.indexer 就是本地缓存,用于队列同步、去重  fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{    KnownObjects:          s.indexer,    EmitDeltaTypeReplaced: true,  })// ====================== 2. 配置 Controller ======================// Controller 是真正执行 List + Watch 的核心组件  cfg := &Config{    Queue:            fifo,         // 事件队列    ListerWatcher:    s.listerWatcher, // 用来 List/Watch APIServer    ObjectType:       s.objectType,     // 监听的资源类型(Pod/Node等)    FullResyncPeriod: s.resyncCheckPeriod, // 全量同步周期    RetryOnError:     false,    ShouldResync:     s.processor.shouldResync, // 是否需要重新同步    Process:           s.HandleDeltas, // 从队列取出事件后的处理函数    WatchErrorHandler: s.watchErrorHandler, // 监听出错处理  }// ====================== 3. 初始化并创建 Controller ======================func() {    s.startedLock.Lock()defer s.startedLock.Unlock()    s.controller = New(cfg) // 创建 controller    s.controller.(*controller).clock = s.clock    s.started = true // 标记 informer 已启动  }()// ====================== 4. 启动事件处理器 ======================// processorStopCh:专门用来关闭事件处理器  processorStopCh := make(chan struct{})var wg wait.Groupdefer wg.Wait()              // 等待处理器完全退出defer close(processorStopCh) // 退出时关闭处理器// 启动缓存检测器 + 事件分发处理器  wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)  wg.StartWithChannel(processorStopCh, s.processor.run)// 退出时标记已停止defer func() {    s.startedLock.Lock()defer s.startedLock.Unlock()    s.stopped = true  }()// ====================== 5. 【真正启动】Controller 主循环 ======================// 这里会一直阻塞:执行 List + Watch,不断从 APIServer 同步数据到队列  s.controller.Run(stopCh)}

其中sharedIndexInformer

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\tools\cache\shared_informer.gotype sharedIndexInformer struct {// 1. 本地缓存(存储Pod/Node等数据)  indexer    Indexer// 2. 类型:cache.Controller (来自 client-go/tools/cache)//    作用:负责 启动Reflector + 消费队列 + 同步数据  controller Controller// 3. 事件分发器(把变化事件发给scheduler等业务代码)  processor             *sharedProcessor// 4. 定义怎么去APIServer拉取数据(List/Watch函数)  listerWatcher ListerWatcher// 5. 其他都是配置:同步周期、锁、状态...  objectType                       runtime.Object  resyncCheckPeriod                time.Duration  defaultEventHandlerResyncPeriod  time.Duration  clock                            clock.Clock  started, stopped                 bool  startedLock                      sync.Mutex  watchErrorHandler                WatchErrorHandler}sharedIndexInformer   (总控:整个监听器)      │      ├── indexer      (本地缓存)      ├── processor    (事件分发)      ├── listerWatcher(怎么拉数据)      └── controller   【你问的这个】(同步控制器)            │            └── reflector (真正去APIServer拉数据)

追踪s.processor.run,查看事件分发处理。

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\tools\cache\shared_informer.go// run 方法:启动 sharedProcessor(事件分发器)// 作用:启动所有注册的 listener,开始分发事件// 一直运行到 stopCh 收到停止信号func (p *sharedProcessor) run(stopCh <-chan struct{}) {// ====================== 1. 启动所有监听器 ======================// 加读锁,防止遍历期间 listeners 被修改func() {    p.listenersLock.RLock()defer p.listenersLock.RUnlock()// 遍历所有注册的 listener(每个 AddEventHandler 都会创建一个 listener)for _, listener := range p.listeners {// 启动 listener.run():处理事件(调用用户注册的 AddFunc/UpdateFunc/DeleteFunc)      p.wg.Start(listener.run)// 启动 listener.pop():从队列里取事件      p.wg.Start(listener.pop)    }// 标记所有监听器已启动    p.listenersStarted = true  }()// ====================== 2. 等待停止信号 ======================// 阻塞在这里,直到外部关闭 stopCh  <-stopCh //<================程序会卡在这里一直运行listener.run和listener.pop// ====================== 3. 收到停止信号 → 优雅关闭 ======================  p.listenersLock.RLock()defer p.listenersLock.RUnlock()// 关闭每个 listener 的 addCh 通道 → 通知 pop 协程停止for _, listener := range p.listeners {close(listener.addCh)  }// 等待所有 run() 和 pop() 协程全部退出  p.wg.Wait()}

追踪listener.run,前面每次添加一个handler,那么创建一个listener,然后把listener添加到processor。后续启动 listener.run()处理事件(调用用户注册的 AddFunc/UpdateFunc/DeleteFunc

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\tools\cache\shared_informer.go func (p *processorListener) run() {// this call blocks until the channel is closed.  When a panic happens during the notification// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)// the next notification will be attempted.  This is usually better than the alternative of never// delivering again.  stopCh := make(chan struct{})  wait.Until(func() {for next := range p.nextCh {switch notification := next.(type) {case updateNotification:        p.handler.OnUpdate(notification.oldObj, notification.newObj)case addNotification:        p.handler.OnAdd(notification.newObj)case deleteNotification:        p.handler.OnDelete(notification.oldObj)default:        utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))      }    }// the only way to get here is if the p.nextCh is empty and closedclose(stopCh)  }, 1*time.Second, stopCh)}

processorListenerrun中执行eventHandler注册的回调方法

// C:\git\kubernetes-1.22.3\vendor\k8s.io\client-go\tools\cache\shared_informer.go// run 是 processorListener 的消费协程// 职责:从 nextCh 读取事件,分发给用户注册的 OnAdd/OnUpdate/OnDelete// 核心保障:// 1. 即使业务代码(OnAdd)panic,也不会导致整个 informer 挂掉// 2. panic 后会自动恢复,1秒后继续处理下一条事件// 3. 事件处理是串行的,保证顺序func (p *processorListener) run() {// 用于停止内部循环的通道  stopCh := make(chan struct{})// wait.Until 是 Kubernetes 工具函数:// 作用:无限循环执行传入的函数,除非 stopCh 被关闭// 如果函数 panic,会自动 recover,等待1秒后重试  wait.Until(func() {// 核心循环:不断从 nextCh 读取事件// 只有 nextCh 被 close 时,for 循环才会退出for next := range p.nextCh {// 根据事件类型,分发到不同的回调switch notification := next.(type) {case updateNotification:// 调用你写的 OnUpdate 函数        p.handler.OnUpdate(notification.oldObj, notification.newObj)case addNotification:// 调用你写的 OnAdd 函数(例如 addPodToCache)        p.handler.OnAdd(notification.newObj)case deleteNotification:// 调用你写的 OnDelete 函数        p.handler.OnDelete(notification.oldObj)default:// 未知事件,报错但不崩溃        utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))      }    }// 只有 p.nextCh 被关闭,才会走到这里// 关闭 stopCh → 让 wait.Until 停止循环close(stopCh)  }, 1*time.Second, stopCh) // 1秒:panic 后的重试间隔}

继续回到sharedIndexInformer.Run,追踪其中的controller.Run

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\tools\cache\shared_informer.go// Run 方法:启动 sharedIndexInformer,开始监听、同步、处理数据// 这是 Informer 的主循环,一旦启动就会一直运行func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {  // ====================== 5. 【真正启动】Controller 主循环 ======================// 这里会一直阻塞:执行 List + Watch,不断从 APIServer 同步数据到队列  s.controller.Run(stopCh)}

追踪controller.Run

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\tools\cache\controller.gofunc (c *controller) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()go func() {    <-stopCh    c.config.Queue.Close()  }()  r := NewReflector(    c.config.ListerWatcher,    c.config.ObjectType,    c.config.Queue,    c.config.FullResyncPeriod,  )  r.ShouldResync = c.config.ShouldResync  r.WatchListPageSize = c.config.WatchListPageSize  r.clock = c.clockif c.config.WatchErrorHandler != nil {    r.watchErrorHandler = c.config.WatchErrorHandler  }  c.reflectorMutex.Lock()  c.reflector = r  c.reflectorMutex.Unlock()var wg wait.Group  wg.StartWithChannel(stopCh, r.Run)  wait.Until(c.processLoop, time.Second, stopCh)  wg.Wait()}

追踪r.Runr.Run 代表生产,往Queue里放数据

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\tools\cache\reflector.go// Run repeatedly uses the reflector's ListAndWatch to fetch all the// objects and subsequent deltas.// Run will exit when stopCh is closed.func (r *Reflector) Run(stopCh <-chan struct{}) {  klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)  wait.BackoffUntil(func() {if err := r.ListAndWatch(stopCh); err != nil {      r.watchErrorHandler(r, err)    }  }, r.backoffManager, true, stopCh)  klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)}

继续追踪Reflector.ListAndWatch

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\tools\cache\reflector.go// ListAndWatch first lists all items and get the resource version at the moment of call,// and then use the resource version to watch.// It returns error if ListAndWatch didn't even try to initialize watch.func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {    }}

继续追踪Reflector.watchHandler

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\tools\cache\reflector.go// watchHandler watches w and keeps *resourceVersion up to date.func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {  eventCount := 0// Stopping the watcher should be idempotent and if we return from this function there's no way// we're coming back in with the same watch interface.defer w.Stop()loop:for {select {case event, ok := <-w.ResultChan():switch event.Type {case watch.Added:// 添加到队列        err := r.store.Add(event.Object)if err != nil {          utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))        }// 更新队列中事件case watch.Modified:        err := r.store.Update(event.Object)if err != nil {          utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))        }// 删除队列中事件case watch.Deleted:// TODO: Will any consumers need access to the "last known// state", which is passed in event.Object? If so, may need// to change this.        err := r.store.Delete(event.Object)if err != nil {          utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))        }case watch.Bookmark:// A `Bookmark` means watch has synced here, just update the resourceVersiondefault:        utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))      }      *resourceVersion = newResourceVersion      r.setLastSyncResourceVersion(newResourceVersion)if rvu, ok := r.store.(ResourceVersionUpdater); ok {        rvu.UpdateResourceVersion(newResourceVersion)      }      eventCount++    }  }}

那么在哪里消费队列中的事件

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\tools\cache\controller.go// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.// It's an error to call Run more than once.// Run blocks; call via go.func (c *controller) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()go func() {    <-stopCh    c.config.Queue.Close()  }()  r := NewReflector(    c.config.ListerWatcher,    c.config.ObjectType,    c.config.Queue,    c.config.FullResyncPeriod,  )  r.ShouldResync = c.config.ShouldResync  r.WatchListPageSize = c.config.WatchListPageSize  r.clock = c.clockif c.config.WatchErrorHandler != nil {    r.watchErrorHandler = c.config.WatchErrorHandler  }  c.reflectorMutex.Lock()  c.reflector = r  c.reflectorMutex.Unlock()var wg wait.Group// 这里向队列写入事件  wg.StartWithChannel(stopCh, r.Run)// 这里消费写入队列的事件  wait.Until(c.processLoop, time.Second, stopCh)  wg.Wait()}

继续追踪controller.processLoop,追踪消费事件

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\tools\cache\controller.go// processLoop drains the work queue.// TODO: Consider doing the processing in parallel. This will require a little thought// to make sure that we don't end up processing the same object multiple times// concurrently.//// TODO: Plumb through the stopCh here (and down to the queue) so that this can// actually exit when the controller is stopped. Or just give up on this stuff// ever being stoppable. Converting this whole package to use Context would// also be helpful.func (c *controller) processLoop() {for {    obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {if err == ErrFIFOClosed {return      }if c.config.RetryOnError {// This is the safe way to re-enqueue.        c.config.Queue.AddIfNotPresent(obj)      }    }  }}

追踪其中的c.config.Process,也就是sharedIndexInformer.HandleDeltas

// C:\git\kubernetes-1.22.3\staging\src\k8s.io\client-go\tools\cache\shared_informer.go// HandleDeltas:处理从 DeltaFIFO 弹出来的事件// 核心职责:// 1. 维护本地缓存 indexer(Add/Update/Delete)// 2. 将事件分发给所有注册的 handler(listener)// 3. 区分普通事件 和 全量同步(resync)事件func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {// 加锁:防止在添加新 handler 时,事件乱序  s.blockDeltas.Lock()defer s.blockDeltas.Unlock()// 遍历 Deltas(一批事件,按旧→新顺序)for _, d := range obj.(Deltas) {switch d.Type {// 处理:新增 / 更新 / 替换 / 同步 事件case Sync, Replaced, Added, Updated:// 缓存检测(调试用,忽略)      s.cacheMutationDetector.AddObject(d.Object)// 尝试从本地缓存获取旧对象if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {// 1. 【更新本地缓存】if err := s.indexer.Update(d.Object); err != nil {return err        }// 判断是否是【Resync 同步事件】        isSync := falseswitch {case d.Type == Sync:// 明确的 Sync 事件 → 标记为同步          isSync = truecase d.Type == Replaced:// 如果对象被替换但 ResourceVersion 没变 → 视为同步          isSync = meta.GetResourceVersion(old) == meta.GetResourceVersion(d.Object)        }// 2. 【分发 UPDATE 事件】给所有 handler        s.processor.distribute(          updateNotification{oldObj: old, newObj: d.Object},          isSync, // 只传给需要 resync 的 listener        )      } else {// 缓存中不存在 → 新增对象// 1. 【写入本地缓存】if err := s.indexer.Add(d.Object); err != nil {return err        }// 2. 【分发 ADD 事件】        s.processor.distribute(          addNotification{newObj: d.Object},false,        )      }// 处理:删除事件case Deleted:// 1. 【从本地缓存删除】if err := s.indexer.Delete(d.Object); err != nil {return err      }// 2. 【分发 DELETE 事件】      s.processor.distribute(        deleteNotification{oldObj: d.Object},false,      )    }  }return nil}

可以看到上面的代码功能就是,在indexer中判断对象是否存在,存在更新,不存在就新增同时调用distribute函数分发给listener。至此更新store也就是indexer,然后分发事件全部完成。

APIServer  ↓Reflector(List/Watch)  ↓DeltaFIFO(事件队列)  ↓HandleDeltas 【你现在看的这个函数】  1. 更新本地缓存 indexer  2. 构造 add/update/delete 通知  ↓sharedProcessor(事件总线)  ↓每个 handler 对应一个 processorListener  ↓pop() + run()  ↓你的业务代码:OnAdd/OnUpdate/OnDelete,也就是添加到`internalqueue.NewSchedulingQueue`中

1.6、kube-scheduler利用informer机制调度pod

// C:\git\kubernetes-1.22.3\pkg\scheduler\scheduler.go// Scheduler 是 Kubernetes 调度器的核心结构体// 作用:监听未调度的 Pod,为 Pod 选择合适节点,并将绑定结果写回 APIServertype Scheduler struct {// 调度器本地内存缓存:保存所有节点、Pod 的资源与状态信息// 调度器不直接访问 APIServer,而是读缓存,大幅提升调度速度// NodeLister(节点列表)和 Algorithm(调度算法)都会依赖这个缓存  SchedulerCache internalcache.Cache// 调度算法核心:实现 预选(Predicate) + 优选(Priority) 逻辑// 负责从所有节点中过滤、打分,最终选出最优节点  Algorithm ScheduleAlgorithm// 调度扩展器:支持外部 HTTP 调度服务扩展调度逻辑(外挂调度器)  Extenders []framework.Extender// 获取下一个待调度 Pod 的函数:会阻塞直到有新 Pod 可用// 不用 channel 是因为调度耗时可能很长,channel 会导致 Pod 信息过期  NextPod func() *framework.QueuedPodInfo// 调度出错时的回调函数:传入出错的 Pod 和错误信息// 用于打印日志、上报事件、处理重试  Error func(*framework.QueuedPodInfo, error)// 调度器关闭信号:关闭这个通道,调度器会优雅退出  StopEverything <-chan struct{}// 待调度 Pod 优先级队列:所有未调度的 Pod 在这里排队// 支持优先级、退避重试、抢占、nominated 机制  SchedulingQueue internalqueue.SchedulingQueue// 调度配置集合:一个调度器可以支持多套调度策略/插件配置// 不同 Pod 可以使用不同的调度 profile  Profiles profile.Map// K8s 客户端:与 kube-apiserver 通信// 最终调度完成后,通过它将 Pod 与 Node 的绑定关系写入 etcd  client clientset.Interface}

其中的Profiles profile.Map使用KubeSchedulerConfiguration完成,启动 kube-scheduler 时通过命令行参数 --config 指定文件路径:

kube-scheduler --config=/etc/kubernetes/scheduler-config.yaml

其中KubeSchedulerConfiguration配置类似如下

# scheduler-config.yamlapiVersion: kubescheduler.config.k8s.io/v1kind: KubeSchedulerConfigurationclientConnection:kubeconfig: /etc/kubernetes/scheduler.conf# 核心:定义多套调度Profileprofiles:# Profile1:默认调度器,所有不指定schedulerName的Pod走这套规则schedulerName: default-schedulerplugins:filter:enabled:name: NodeNamename: TaintTolerationname: NodeResourcesFitscore:enabled:name: NodeResourcesFitweight: 5# Profile2:GPU专用调度器,Pod指定schedulerName=gpu-scheduler才会使用schedulerName: gpu-schedulerplugins:filter:enabled:name: NodeNamename: GpuFilter # 自定义GPU过滤插件score:enabled:name: GpuPriorityweight: 10

参考:https://kubernetes.io/zh-cn/docs/reference/scheduling/config/

pod调度的时候,可以使用schedulerName去指定调度器

apiVersion: v1kind: Podmetadata:name: gpu-podspec:# 匹配配置文件中 schedulerName: gpu-scheduler 的ProfileschedulerName: gpu-schedulercontainers:name: gpu-containerimage: nvidia/cudaresources:limits:nvidia.com/gpu: 1

继续追踪SchedulingQueue的初始化

// C:\git\kubernetes-1.22.3\pkg\scheduler\scheduler.go// New returns a Schedulerfunc New(client clientset.Interface,  informerFactory informers.SharedInformerFactory,  recorderFactory profile.RecorderFactory,  stopCh <-chan struct{},  opts ...Option) (*Scheduler, error) {var sched *Schedulerif options.legacyPolicySource == nil {// Create the config from component config    sc, err := configurator.create()if err != nil {return nil, fmt.Errorf("couldn't create scheduler: %v", err)    }    sched = sc  } else {// Create the config from a user specified policy source.    policy := &schedulerapi.Policy{}switch {case options.legacyPolicySource.File != nil:if err := initPolicyFromFile(options.legacyPolicySource.File.Path, policy); err != nil {return nil, err      }case options.legacyPolicySource.ConfigMap != nil:if err := initPolicyFromConfigMap(client, options.legacyPolicySource.ConfigMap, policy); err != nil {return nil, err      }    }// Set extenders on the configurator now that we've decoded the policy// In this case, c.extenders should be nil since we're using a policy (and therefore not componentconfig,// which would have set extenders in the above instantiation of Configurator from CC options)    configurator.extenders = policy.Extenders    sc, err := configurator.createFromPolicy(*policy)if err != nil {return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)    }    sched = sc  }// Additional tweaks to the config produced by the configurator.  sched.StopEverything = stopEverything  sched.client = client// Build dynamic client and dynamic informer factoryvar dynInformerFactory dynamicinformer.DynamicSharedInformerFactory// options.kubeConfig can be nil in tests.if options.kubeConfig != nil {    dynClient := dynamic.NewForConfigOrDie(options.kubeConfig)    dynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, v1.NamespaceAll, nil)  }  addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))return sched, nil}

追踪configurator.create()

// C:\git\kubernetes-1.22.3\pkg\scheduler\factory.go// create() 方法:根据注册的插件,创建并返回一个完整的 Scheduler 调度器实例// 这是调度器创建的核心方法,所有组件(队列、算法、profile、缓存)都在这里组装完成func (c *Configurator) create() (*Scheduler, error) {// 定义扩展器切片和被忽略的扩展资源列表var extenders []framework.Extendervar ignoredExtendedResources []string//  1. 初始化并处理 Extenders(外部 HTTP 调度扩展器) // 如果配置了 extenders,遍历创建 HTTP 扩展器//  2. 将 Extender 忽略的资源注入 NodeResourcesFit 插件 // 如果有扩展资源需要忽略,更新每个 Profile 里的 NodeResourcesFit 插件配置//  3. 创建 Pod 提名器(用于 Pod 抢占机制) // nominator 用于记录被抢占的 Pod,调度器会优先重试这些 Pod//  4. 创建 Profiles Map(核心:多套调度规则) // 根据配置文件中的 profiles 数组,创建 profile.Map// key = schedulerName, value = 完整调度框架 Framework//  5. 创建调度队列 SchedulingQueue // 所有 Profile 必须使用同一个队列排序函数(保证全局排序一致)  lessFn := profiles[c.profiles[0].SchedulerName].QueueSortFunc()// 创建优先级调度队列  podQueue := internalqueue.NewSchedulingQueue(    lessFn,    c.informerFactory,    internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),    internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),    internalqueue.WithPodNominator(nominator),    internalqueue.WithClusterEventMap(c.clusterEventMap),  )//  6. 启动缓存调试器(用于观测缓存状态) //  7. 创建调度算法实例(GenericScheduler) // 实现预选 + 优选逻辑//  8. 组装所有组件,返回最终的 Scheduler 结构体 return &Scheduler{    SchedulerCache:  c.schedulerCache,    // 本地内存缓存    Algorithm:       algo,                // 调度算法(预选+优选)    Extenders:       extenders,           // 外部扩展器    Profiles:        profiles,            // 多套调度规则(核心!)    NextPod:         internalqueue.MakeNextPodFunc(podQueue), // 从队列取Pod的函数    Error:           MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache), // 错误处理    StopEverything:  c.StopEverything,    // 退出信号    SchedulingQueue: podQueue,            // 待调度Pod优先级队列  }, nil}

追踪internalqueue.NewSchedulingQueue这是一个带有优先级的队列

// C:\git\kubernetes-1.22.3\pkg\scheduler\internal\queue\scheduling_queue.go// NewSchedulingQueue initializes a priority queue as a new scheduling queue.func NewSchedulingQueue(  lessFn framework.LessFunc,  informerFactory informers.SharedInformerFactory,  opts ...Option) SchedulingQueue {return NewPriorityQueue(lessFn, informerFactory, opts...)}

为什么会有优先级?因为有些pod比较重要,需要优先调度,运行。

关于调度优先级,可以查看这个文档 https://kubernetes.io/zh-cn/docs/concepts/scheduling-eviction/pod-priority-preemption/

# high-priority.yaml 高优先级apiVersion: scheduling.k8s.io/v1kind: PriorityClassmetadata:name: high-priorityvalue: 2000000# 数字越大优先级越高globalDefault: false# 不作为全局默认description: "高优先级,用于核心服务"# low-priority.yaml 低优先级apiVersion: scheduling.k8s.io/v1kind: PriorityClassmetadata:name: low-priorityvalue: 1000000# 数字越小优先级越低globalDefault: falsedescription: "低优先级,用于测试业务"

创建和查看优先级

[root@master]# kubectl apply -f high-priority.yaml[root@master]# kubectl apply -f low-priority.yaml[root@master]# kubectl get   PriorityClassNAME                      VALUE        GLOBAL-DEFAULT   AGElow-priority              1000000      true             12dhigh-priority             2000000      true             12d

使用高优先级

apiVersion: v1kind: Podmetadata:name: nginx-highspec:priorityClassName: high-priority# 使用高优先级containers:name: nginximage: nginxresources:requests:cpu: 100mmemory: 128Mi

继续追踪internalqueue.MakeNextPodFunc(podQueue),也就是NextPod的初始化

// C:\git\kubernetes-1.22.3\pkg\scheduler\internal\queue\scheduling_queue.go// MakeNextPodFunc returns a function to retrieve the next pod from a given// scheduling queuefunc MakeNextPodFunc(queue SchedulingQueue)func() *framework.QueuedPodInfo {return func() *framework.QueuedPodInfo {    podInfo, err := queue.Pop()if err == nil {      klog.V(4).InfoS("About to try and schedule pod""pod", klog.KObj(podInfo.Pod))return podInfo    }    klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")return nil  }}

可以看到就是从 podQueue中 pop一个pod的信息。那么pod是如何添加到队列中的

// C:\git\kubernetes-1.22.3\pkg\scheduler\eventhandlers.gofunc addAllEventHandlers(  sched *Scheduler,  informerFactory informers.SharedInformerFactory,  dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,  gvkMap map[framework.GVK]framework.ActionType,) {// scheduled pod cache  informerFactory.Core().V1().Pods().Informer().AddEventHandler(    cache.FilteringResourceEventHandler{      FilterFunc: func(obj interface{})bool {      },      Handler: cache.ResourceEventHandlerFuncs{        AddFunc:    sched.addPodToCache,        UpdateFunc: sched.updatePodInCache,        DeleteFunc: sched.deletePodFromCache,      },    },  )

通过调用sched.addPodToCachesched.updatePodInCachesched.deletePodFromCache

至此 创建的pod入队出队我们都了解了。

1.6.1、pod的调度过程

接下来我们追踪一下pod的调度过程,从NewSchedulerCommand.Run入口

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\server.go// runCommand runs the scheduler.func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option)error {  verflag.PrintAndExitIfRequested()  cliflag.PrintFlags(cmd.Flags())  ctx, cancel := context.WithCancel(context.Background())defer cancel()go func() {    stopCh := server.SetupSignalHandler()    <-stopCh    cancel()  }()  cc, sched, err := Setup(ctx, opts, registryOptions...)if err != nil {return err  }return Run(ctx, cc, sched)}

追踪Run

// C:\git\kubernetes-1.22.3\cmd\kube-scheduler\app\server.go// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler)error {  sched.Run(ctx)return fmt.Errorf("finished without leader elect")}

继续追踪sched.Run(ctx)

// C:\git\kubernetes-1.22.3\pkg\scheduler\scheduler.go// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.func (sched *Scheduler) Run(ctx context.Context) {  sched.SchedulingQueue.Run()  wait.UntilWithContext(ctx, sched.scheduleOne, 0)  sched.SchedulingQueue.Close()}

继续追踪sched.scheduleOnesched.scheduleOne调度原理如下:

// C:\git\kubernetes-1.22.3\pkg\scheduler\scheduler.go// scheduleOne 为单个Pod执行完整的调度工作流。// 所有调度逻辑都是串行执行的,保证节点筛选的准确性。func (sched *Scheduler) scheduleOne(ctx context.Context) {// 1. 从调度队列中取出下一个要调度的Pod  podInfo := sched.NextPod()// 如果队列为空或Pod已无效,直接退出if podInfo == nil || podInfo.Pod == nil {return  }  pod := podInfo.Pod// 2. 根据Pod的 spec.schedulerName 找到对应的调度框架Profile  fwk, err := sched.frameworkForPod(pod)if err != nil {// 理论上不会走到这里,因为入队的Pod都会匹配到一个profile    klog.ErrorS(err, "Error occurred")return  }// 3. 判断是否需要跳过本次调度(比如Pod已被删除、已调度等)if sched.skipPodSchedule(fwk, pod) {return  }  klog.V(3).InfoS("Attempting to schedule pod""pod", klog.KObj(pod))// ------------------------------// 【调度周期 Scheduling Cycle 开始】// ------------------------------  start := time.Now()// 创建本次调度的上下文状态,存储插件中间数据  state := framework.NewCycleState()  state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)// 存储需要被激活的Pod(由插件填充)  podsToActivate := framework.NewPodsToActivate()  state.Write(framework.PodsToActivateKey, podsToActivate)// 创建调度周期的上下文  schedulingCycleCtx, cancel := context.WithCancel(ctx)defer cancel()// 4. 【核心】调用调度算法:预选(Predicate) + 优选(Priority),选出最优节点  scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod)// ------------------------------// 调度失败处理:没有合适节点// ------------------------------if err != nil {    nominatedNode := ""// 如果是因为所有节点都不满足(FitError)if fitError, ok := err.(*framework.FitError); ok {// 执行 PostFilter 插件(主要做抢占 Preemption)if fwk.HasPostFilterPlugins() {        result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)if status.IsSuccess() && result != nil {          nominatedNode = result.NominatedNodeName        }      }// 记录指标:Pod无法调度      metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))    } else if err == ErrNoNodesAvailable {// 没有可用节点      metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))    } else {// 调度出错      klog.ErrorS(err, "Error selecting node for pod""pod", klog.KObj(pod))      metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))    }// 记录失败,将Pod放回队列等待重试    sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)return  }// 调度算法执行耗时指标  metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))// ------------------------------// 5. 【Assume 假定绑定】乐观预占节点资源// 作用:在真正绑定之前,先在缓存里标记Pod已调度到该节点,避免并发调度超配// ------------------------------  assumedPodInfo := podInfo.DeepCopy()  assumedPod := assumedPodInfo.Pod// 给Pod设置NodeName,并写入SchedulerCache  err = sched.assume(assumedPod, scheduleResult.SuggestedHost)if err != nil {    metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))    sched.recordSchedulingFailure(fwk, assumedPodInfo, err, SchedulerError, "")return  }// ------------------------------// 6. 运行 Reserve 插件:预留节点资源// ------------------------------if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {    metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))// 失败则回滚资源    fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)    sched.SchedulerCache.ForgetPod(assumedPod)    sched.recordSchedulingFailure(fwk, assumedPodInfo, sts.AsError(), SchedulerError, "")return  }// ------------------------------// 7. 运行 Permit 插件:允许/等待/拒绝 调度// ------------------------------  runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {// 插件拒绝本次调度,清理资源并重试    metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))    fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)    sched.SchedulerCache.ForgetPod(assumedPod)    sched.recordSchedulingFailure(fwk, assumedPodInfo, runPermitStatus.AsError(), SchedulerError, "")return  }// 如果有需要激活的Pod,激活它们(从backOff队列放回活跃队列)if len(podsToActivate.Map) != 0 {    sched.SchedulingQueue.Activate(podsToActivate.Map)    podsToActivate.Map = make(map[string]*v1.Pod)  }// ------------------------------// 【绑定周期 Binding Cycle 开始】// 异步执行,不阻塞调度主循环// ------------------------------go func() {    bindingCycleCtx, cancel := context.WithCancel(ctx)defer cancel()    metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc()defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec()// ------------------------------// 等待 Permit(允许) 插件放行// ------------------------------    waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod)if !waitOnPermitStatus.IsSuccess() {      metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))      fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)      sched.SchedulerCache.ForgetPod(assumedPod)      sched.recordSchedulingFailure(fwk, assumedPodInfo, waitOnPermitStatus.AsError(), SchedulerError, "")return    }// ------------------------------// 运行 PreBind 插件:绑定前准备// ------------------------------    preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)if !preBindStatus.IsSuccess() {      metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))      fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)      sched.SchedulerCache.ForgetPod(assumedPod)      sched.recordSchedulingFailure(fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")return    }// ------------------------------// 【真正绑定】调用APIServer将Pod绑定到Node// ------------------------------    err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)if err != nil {// 绑定失败:清理资源、重试      metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))      fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)      sched.SchedulerCache.ForgetPod(assumedPod)      sched.recordSchedulingFailure(fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, "")    } else {// ------------------------------// 绑定成功!调度完成// ------------------------------      klog.InfoS("Successfully bound pod to node""pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost)// 指标统计      metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))      metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))// 运行 PostBind 插件:绑定后清理      fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)    }  }()}

有三个地方需要特殊关注sched.Algorithm.Schedulesched.assumesched.bind三个处理

1.6.1.1、追踪sched.Algorithm.Schedule

整个函数只做 4 件事:

  1. 拍快照

    把集群节点、资源信息复制一份,防止调度中途数据变化。

  2. 预选(Filter

    调用所有过滤插件,筛选出能运行这个 Pod 的节点(FeasibleNodes)。

    • 没有合格节点 → 调度失败
    • 只剩 1 个合格节点 → 直接用它
  3. 优选(Score

    给所有合格节点打分。

  4. 选最高分

    选出分数最高的节点,封装成 ScheduleResult 返回。

// C:\git\kubernetes-1.22.3\pkg\scheduler\generic_scheduler.go// Schedule 尝试将给定的Pod调度到节点列表中的一个。// 成功:返回选中的节点名称// 失败:返回包含失败原因的 FitError 错误func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {// 调度链路追踪,用于打印耗时日志  trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})defer trace.LogIfLong(100 * time.Millisecond)// 1. 对节点和Pod信息做内存快照(保证调度过程中数据不变)if err := g.snapshot(); err != nil {return result, err  }  trace.Step("Snapshotting scheduler cache and node infos done")// 2. 如果集群没有任何节点,直接返回错误if g.nodeInfoSnapshot.NumNodes() == 0 {return result, ErrNoNodesAvailable  }// 3. 【预选阶段】调用过滤插件,找出所有 符合条件的节点(FeasibleNodes)  feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, extenders, fwk, state, pod)if err != nil {return result, err  }  trace.Step("Computing predicates done")// 4. 如果没有任何合格节点,返回调度失败(FitError)if len(feasibleNodes) == 0 {return result, &framework.FitError{      Pod:         pod,      NumAllNodes: g.nodeInfoSnapshot.NumNodes(),      Diagnosis:   diagnosis, // 记录每个节点为什么不合格    }  }// 5. 如果预选后只剩1个节点,直接用它,不用再打分if len(feasibleNodes) == 1 {return ScheduleResult{      SuggestedHost:  feasibleNodes[0].Name,       // 最终节点      EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap), // 总评估节点数      FeasibleNodes:  1,                            // 合格节点数    }, nil  }// 6. 【优选阶段】对所有合格节点进行打分  priorityList, err := prioritizeNodes(ctx, extenders, fwk, state, pod, feasibleNodes)if err != nil {return result, err  }// 7. 选择分数最高的节点  host, err := g.selectHost(priorityList)  trace.Step("Prioritizing done")// 8. 返回调度结果(最终节点、检查过多少节点、合格多少节点)return ScheduleResult{    SuggestedHost:  host,    EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),    FeasibleNodes:  len(feasibleNodes),  }, err}

追踪其中的findNodesThatFitPod,这个方法内部完成了PreFilterFilter操作

// C:\git\kubernetes-1.22.3\pkg\scheduler\generic_scheduler.go// findNodesThatFitPod// 根据调度框架的 filter 插件和外部扩展器(extender)过滤节点,找到适合当前 Pod 的节点。// 也就是:预选阶段 → 筛选出 Feasible Nodes(合格节点)func (g *genericScheduler) findNodesThatFitPod(  ctx context.Context,  extenders []framework.Extender,  fwk framework.Framework,  state *framework.CycleState,  pod *v1.Pod,) ([]*v1.Node, framework.Diagnosis, error) {// 初始化诊断信息:记录每个节点为什么不合格、哪个插件导致调度失败  diagnosis := framework.Diagnosis{    NodeToStatusMap:      make(framework.NodeToStatusMap), // key:节点名 value:失败原因    UnschedulablePlugins: sets.NewString(),                // 导致不可调度的插件列表  }//  1. 运行 PreFilter 插件(预过滤,准备工作) // 提前计算 Pod 亲和性、拓扑等信息,给后面过滤阶段提速  s := fwk.RunPreFilterPlugins(ctx, state, pod)// 获取当前集群所有节点的快照列表  allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()if err != nil {return nil, diagnosis, err  }// 如果 PreFilter 直接失败if !s.IsSuccess() {// 如果不是“不可调度”错误,直接返回真正的错误if !s.IsUnschedulable() {return nil, diagnosis, s.AsError()    }// PreFilter 判定 Pod 无法调度 → 所有节点都标记为失败for _, n := range allNodes {      diagnosis.NodeToStatusMap[n.Node().Name] = s    }// 记录是哪个插件导致的失败    diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())// 返回空合格节点列表return nil, diagnosis, nil  }//  2. 优先检查“提名节点”(Nominated Node)优化 // "NominatedNodeName" 可能是在之前的调度周期中,由于抢占机制而被设置的。// 这个节点很可能是唯一适合该 Pod 的节点,因此我们在遍历所有节点之前,优先尝试这个节点。if len(pod.Status.NominatedNodeName) > 0 && feature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) {// 只检查提名的那一个节点    feasibleNodes, err := g.evaluateNominatedNode(ctx, extenders, pod, fwk, state, diagnosis)if err != nil {      klog.ErrorS(err, "Evaluation failed on nominated node""pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)    }// 如果提名节点合格 → 直接返回,不再检查其他节点if len(feasibleNodes) != 0 {return feasibleNodes, diagnosis, nil    }  }//  3. 【核心】过滤所有节点:运行内置 Filter 插件 // 遍历所有节点,执行 Filter 插件(如资源、污点、亲和性、端口等检查)// 筛选出通过所有内置插件的节点  feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)if err != nil {return nil, diagnosis, err  }//  4. 再次过滤:通过外部 Extender 扩展器检查 // 让外部扩展器再过滤一遍(自定义调度逻辑)  feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)if err != nil {return nil, diagnosis, err  }//  5. 返回最终合格节点 + 诊断信息 return feasibleNodes, diagnosis, nil}

追踪g.evaluateNominatedNode

// C:\git\kubernetes-1.22.3\pkg\scheduler\generic_scheduler.go// evaluateNominatedNode// 作用:**只检查 Pod 提名的那个节点(NominatedNodeName),看看能不能用**// 目的:如果这个节点能用,就直接用,不用检查所有节点,调度更快!func (g *genericScheduler) evaluateNominatedNode(  ctx context.Context,  extenders []framework.Extender,  pod *v1.Pod,  fwk framework.Framework,  state *framework.CycleState,  diagnosis framework.Diagnosis,) ([]*v1.Node, error) {// 1. 拿到 Pod 的【提名节点名字】// 也就是 pod.Status.NominatedNodeName(之前抢占时标记的节点)  nnn := pod.Status.NominatedNodeName// 2. 从缓存里拿到这个提名节点的信息  nodeInfo, err := g.nodeInfoSnapshot.Get(nnn)if err != nil {return nil, err // 节点不存在,直接返回错误  }// 包装成列表,只检查这一个节点  node := []*framework.NodeInfo{nodeInfo}// 3. 【核心】只检查这一个节点是否合格// 调用你刚才学的 findNodesThatPassFilters// 检查:资源、污点、亲和性、两遍 Nominated Pod 安全检查  feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, node)if err != nil {return nil, err  }// 4. 再让外部扩展器检查一下这个节点  feasibleNodes, err = findNodesThatPassExtenders(extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)if err != nil {return nil, err  }// 5. 如果合格,返回这个节点// 调度器就会直接用它,不再检查其他节点return feasibleNodes, nil}

追踪findNodesThatPassFilters

// C:\git\kubernetes-1.22.3\pkg\scheduler\generic_scheduler.go// findNodesThatPassFilters 核心作用:// 遍历所有节点,通过 Filter 插件检查,筛选出【能运行当前 Pod】的合格节点(Feasible Nodes)func (g *genericScheduler) findNodesThatPassFilters(  ctx context.Context,  fwk framework.Framework,  state *framework.CycleState,  pod *v1.Pod,  diagnosis framework.Diagnosis,  nodes []*framework.NodeInfo) ([]*v1.Node, error) {  numNodesToFind := g.numFeasibleNodesToFind(int32(len(nodes)))// Create feasible list with enough space to avoid growing it// and allow assigning.  feasibleNodes := make([]*v1.Node, numNodesToFind)if !fwk.HasFilterPlugins() {    length := len(nodes)for i := range feasibleNodes {      feasibleNodes[i] = nodes[(g.nextStartNodeIndex+i)%length].Node()    }    g.nextStartNodeIndex = (g.nextStartNodeIndex + len(feasibleNodes)) % lengthreturn feasibleNodes, nil  }  errCh := parallelize.NewErrorChannel()var statusesLock sync.Mutexvar feasibleNodesLen int32  ctx, cancel := context.WithCancel(ctx)  checkNode := func(i int) {// We check the nodes starting from where we left off in the previous scheduling cycle,// this is to make sure all nodes have the same chance of being examined across pods.    nodeInfo := nodes[(g.nextStartNodeIndex+i)%len(nodes)]    status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)if status.Code() == framework.Error {      errCh.SendErrorWithCancel(status.AsError(), cancel)return    }if status.IsSuccess() {      length := atomic.AddInt32(&feasibleNodesLen, 1)if length > numNodesToFind {        cancel()        atomic.AddInt32(&feasibleNodesLen, -1)      } else {        feasibleNodes[length-1] = nodeInfo.Node()      }    } else {      statusesLock.Lock()      diagnosis.NodeToStatusMap[nodeInfo.Node().Name] = status      diagnosis.UnschedulablePlugins.Insert(status.FailedPlugin())      statusesLock.Unlock()    }  }  beginCheckNode := time.Now()  statusCode := framework.Successdefer func() {// We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins// function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.// Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.    metrics.FrameworkExtensionPointDuration.WithLabelValues(runtime.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))  }()// Stops searching for more nodes once the configured number of feasible nodes// are found.  fwk.Parallelizer().Until(ctx, len(nodes), checkNode)  processedNodes := int(feasibleNodesLen) + len(diagnosis.NodeToStatusMap)  g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(nodes)  feasibleNodes = feasibleNodes[:feasibleNodesLen]if err := errCh.ReceiveError(); err != nil {    statusCode = framework.Errorreturn nil, err  }return feasibleNodes, nil}

追踪fwk.RunFilterPluginsWithNominatedPods

// C:\git\kubernetes-1.22.3\pkg\scheduler\framework\runtime\framework.go// RunFilterPluginsWithNominatedPods// 作用:在【抢占场景】下对节点做 Filter 检查// 专门处理:这个节点上有 **Nominated Pod(提名Pod,等待抢占的高优先级Pod)**func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(  ctx context.Context,  state *framework.CycleState,  pod *v1.Pod,  info *framework.NodeInfo,) *framework.Status {var status *framework.Status  podsAdded := false // 是否真的添加了 Nominated Pod 进行模拟// ==============================// 核心:最多执行【两遍过滤】// ==============================for i := 0; i < 2; i++ {    stateToUse := state    nodeInfoToUse := info// ==============================// 第一遍检查(i=0)// 把节点上的 **Nominated Pod 假装已经运行**// ==============================if i == 0 {var err error// 把节点上【同优先级 / 更高优先级】的 Nominated Pod// 临时加到节点里,模拟它们已经占用资源      podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info)if err != nil {return framework.AsStatus(err)      }    } else if !podsAdded || !status.IsSuccess() {// 如果没有 Nominated Pod,或者第一遍失败,就不跑第二遍break    }// 执行所有 Filter 插件(NodeName、资源、污点、亲和性等)    statusMap := f.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)    status = statusMap.Merge()// 如果出现系统错误(不是普通不可调度),直接返回if !status.IsSuccess() && !status.IsUnschedulable() {return status    }  }// 返回最终检查结果return status}

Nominated Pod(提名Pod)和Victim Pod(牺牲Pod)是 Kubernetes 调度器在Pod优先级与抢占(Preemption)机制中使用的两个关键角色:

  • Nominated Pod(提名Pod):指一个高优先级Pod因资源不足调度失败后,被调度器 “提名” 到某个节点(设置.status.nominatedNodeName)以等待低优先级Pod被驱逐。它尚未实际运行在该节点上,只是表达 “意图” 在资源释放后优先调度至此;提名不保证最终调度成功(更高优先级Pod可能后来居上,覆盖这个节点的提名)。
  • Victim Pod(牺牲Pod):指被高优先级抢占者选中、将在该节点上被优雅终止(graceful termination)以释放资源的低优先级Pod,由调度器的DefaultPreemption插件根据优先级和驱逐策略选出。

两者完整抢占流程:高优先级Pod(抢占者)→ 遍历节点计算可驱逐资源,选中目标节点并标记自身为Nominated Pod → 在该节点筛选低优先级Pod作为Victim Pod → 下发删除指令给所有Victim Pod → 等待Victim Pod走完优雅终止周期、释放资源后,Nominated Pod才有可能被真正调度绑定节点。

上面代码的,RunFilterPluginsWithNominatedPods是抢占场景下的双重验证

因为 Nominated Pod 是不确定的:

  • 它们还没真正运行
  • 它们可能最终调度到别的节点
  • 它们也可能真的运行在这个节点

所以调度器必须 保守判断:

第一遍检查:假设 Nominated Pod 真的会来这个节点

  • 把它们假装加到节点上
  • 占用资源
  • 检查当前 Pod 还能不能放下

第二遍检查:假设 Nominated Pod 最终不来这个节点

  • 不把它们加进来
  • 再检查一遍当前 Pod 能不能放下

两遍都通过 → 节点安全,任意一遍不通过 → 节点不安全

继续追踪statusMap := f.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)

// C:\git\kubernetes-1.22.3\pkg\scheduler\framework\runtime\framework.go// RunFilterPlugins runs the set of configured Filter plugins for pod on// the given node. If any of these plugins doesn't return "Success", the// given node is not suitable for running pod.// Meanwhile, the failure message and status are set for the given node.// RunFilterPlugins// 作用:对 一个节点 运行 所有 Filter 插件// 返回结果:每个插件的通过/失败状态func (f *frameworkImpl) RunFilterPlugins(  ctx context.Context,  state *framework.CycleState,  // 调度上下文  pod *v1.Pod,                  // 要调度的Pod  nodeInfo *framework.NodeInfo, // 当前正在检查的节点) framework.PluginToStatus {// 用来保存:每个插件 -> 检查结果(成功/失败)  statuses := make(framework.PluginToStatus)// 遍历所有注册的 Filter 插件// 例如:NodeName、资源检查、污点检查、亲和性检查、端口检查...for _, pl := range f.filterPlugins {// 【核心】运行当前这个插件的过滤逻辑(runFilterPlugin)    pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)// ------------------------------// 如果插件执行结果 != 成功// ------------------------------if !pluginStatus.IsSuccess() {// 如果不是“不可调度”,而是真正的错误(比如崩溃、异常)if !pluginStatus.IsUnschedulable() {// 构造错误信息,标记是哪个插件挂了        errStatus := framework.AsStatus(fmt.Errorf("running %q filter plugin: %w", pl.Name(), pluginStatus.AsError())).WithFailedPlugin(pl.Name())// 直接返回错误,终止所有检查return map[string]*framework.Status{pl.Name(): errStatus}      }// 标记:是哪个插件导致的失败      pluginStatus.SetFailedPlugin(pl.Name())// 把失败结果存起来      statuses[pl.Name()] = pluginStatus// 如果配置了【不需要跑完所有插件】// 只要有一个插件失败,立刻停止,直接返回结果(性能优化)if !f.runAllFilters {return statuses      }    }  }// 所有插件都通过了,返回空(或全部成功)return statuses}

追踪runFilterPlugin

// C:\git\kubernetes-1.22.3\pkg\scheduler\framework\runtime\framework.go// runFilterPlugin// 调度框架统一执行 Filter 插件的方法// 作用:调用具体插件的 Filter 逻辑,并统一处理指标统计func (f *frameworkImpl) runFilterPlugin(  ctx context.Context,  pl framework.FilterPlugin,  // 具体要执行的插件,例如 NodeName、TaintToleration  state *framework.CycleState,  pod *v1.Pod,  nodeInfo *framework.NodeInfo,) *framework.Status {// 如果不需要记录插件监控指标// 直接调用插件的 Filter 方法,返回结果if !state.ShouldRecordPluginMetrics() {return pl.Filter(ctx, state, pod, nodeInfo)  }// ===================== 下面是需要记录指标的情况 =====================// 记录插件开始执行时间  startTime := time.Now()// 【真正执行】调用具体插件的 Filter 方法(比如 NodeName.Filter())// 检查 Pod 是否能调度到当前节点  status := pl.Filter(ctx, state, pod, nodeInfo)// 异步记录插件执行耗时(成功/失败 耗时)  f.metricsRecorder.observePluginDurationAsync(Filter, pl.Name(), status, metrics.SinceInSeconds(startTime))// 返回过滤结果(成功 / 不可调度 / 错误)return status}

追踪其中的pl.Filter,可以看到有下面的这些过滤插件,都实现了Filter方法

拿出其中的一个最简单的NodeNameFilter去看看具体是如何实现的

// C:\git\kubernetes-1.22.3\pkg\scheduler\framework\plugins\nodename\node_name.go// Name 返回插件名称,用于日志、识别插件func (pl *NodeName) Name() string {return Name}// Filter 是插件的核心方法// 作用:判断当前 Pod 是否**允许**调度到当前节点// 调度器在预选阶段(Filter)会调用它func (pl *NodeName) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {// 如果节点信息不存在,返回错误if nodeInfo.Node() == nil {return framework.NewStatus(framework.Error, "node not found")  }// 调用 Fits() 检查:Pod 是否能放到这个节点if !Fits(pod, nodeInfo) {// 不满足 → 返回“不可调度”状态return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReason)  }// 满足条件 → 允许调度,返回 nilreturn nil}// Fits 真正的检查逻辑// 检查 Pod 是否允许调度到当前节点func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo)bool {// 满足两个条件之一就算匹配:// 1. Pod 没有指定 nodeName(可以随便调度)// 2. Pod 指定了 nodeName,且和当前节点名字一样return len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == nodeInfo.Node().Name}// New 创建插件实例func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {return &NodeName{}, nil}

追踪NodeName其中的New方法,也就是这个过滤插件被创建的地方

// C:\git\kubernetes-1.22.3\pkg\scheduler\framework\plugins\registry.go// NewInTreeRegistry builds the registry with all the in-tree plugins.// A scheduler that runs out of tree plugins can register additional plugins// through the WithFrameworkOutOfTreeRegistry option.func NewInTreeRegistry() runtime.Registry {  fts := plfeature.Features{    EnablePodAffinityNamespaceSelector: feature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector),    EnablePodDisruptionBudget:          feature.DefaultFeatureGate.Enabled(features.PodDisruptionBudget),    EnablePodOverhead:                  feature.DefaultFeatureGate.Enabled(features.PodOverhead),    EnableReadWriteOncePod:             feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),  }return runtime.Registry{    selectorspread.Name:      selectorspread.New,    imagelocality.Name:       imagelocality.New,    tainttoleration.Name:     tainttoleration.New,// 这里注册了nodename过滤插件    nodename.Name:            nodename.New,    nodeports.Name:           nodeports.New,    nodepreferavoidpods.Name: nodepreferavoidpods.New,    nodeaffinity.Name:        nodeaffinity.New,    podtopologyspread.Name:   podtopologyspread.New,    nodeunschedulable.Name:   nodeunschedulable.New,    noderesources.FitName: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {return noderesources.NewFit(plArgs, fh, fts)    },    noderesources.BalancedAllocationName: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {return noderesources.NewBalancedAllocation(plArgs, fh, fts)    },    noderesources.MostAllocatedName: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {return noderesources.NewMostAllocated(plArgs, fh, fts)    },    noderesources.LeastAllocatedName: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {return noderesources.NewLeastAllocated(plArgs, fh, fts)    },    noderesources.RequestedToCapacityRatioName: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {return noderesources.NewRequestedToCapacityRatio(plArgs, fh, fts)    },    volumebinding.Name: volumebinding.New,    volumerestrictions.Name: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {return volumerestrictions.New(plArgs, fh, fts)    },    volumezone.Name:                volumezone.New,    nodevolumelimits.CSIName:       nodevolumelimits.NewCSI,    nodevolumelimits.EBSName:       nodevolumelimits.NewEBS,    nodevolumelimits.GCEPDName:     nodevolumelimits.NewGCEPD,    nodevolumelimits.AzureDiskName: nodevolumelimits.NewAzureDisk,    nodevolumelimits.CinderName:    nodevolumelimits.NewCinder,    interpodaffinity.Name: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {return interpodaffinity.New(plArgs, fh, fts)    },    nodelabel.Name:       nodelabel.New,    serviceaffinity.Name: serviceaffinity.New,    queuesort.Name:       queuesort.New,    defaultbinder.Name:   defaultbinder.New,    defaultpreemption.Name: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {return defaultpreemption.New(plArgs, fh, fts)    },  }}

继续追踪NewInTreeRegistry,这里又回到了之前分析过的New方法,其中注册了所有的过滤插件。

// C:\git\kubernetes-1.22.3\pkg\scheduler\scheduler.go// New returns a Schedulerfunc New(client clientset.Interface,  informerFactory informers.SharedInformerFactory,  recorderFactory profile.RecorderFactory,  stopCh <-chan struct{},  opts ...Option) (*Scheduler, error) {// 这里注册了所有过滤插件  registry := frameworkplugins.NewInTreeRegistry()if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {return nil, err  }  configurator := &Configurator{    registry:                 registry,  }  addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))return sched, nil}
1.6.1.2、追踪sched.assume
// C:\git\kubernetes-1.22.3\pkg\scheduler\scheduler.go// assume 向调度缓存发送信号:假定这个Pod已经绑定到节点了。// 目的是让后续的绑定操作可以异步进行,不阻塞调度循环。// 注意:这个函数会直接修改传入的 assumed 对象func (sched *Scheduler) assume(assumed *v1.Pod, host stringerror {// 【关键步骤1】给Pod强行设置目标节点名称// 乐观假设:绑定一定会成功,先把NodeName写上// 这样后续调度就会认为这个Pod已经在这个节点上了  assumed.Spec.NodeName = host// 【关键步骤2】把这个“假定已调度”的Pod存入调度缓存// 缓存会更新节点的资源占用(CPU/内存/端口),相当于预占资源// 防止其他Pod同时被调度到同一个节点,导致资源超卖if err := sched.SchedulerCache.AssumePod(assumed); err != nil {    klog.ErrorS(err, "scheduler cache AssumePod failed")return err  }// 【清理工作】如果这个Pod之前是被“提名”的抢占Pod,现在调度成功,从队列里删掉if sched.SchedulingQueue != nil {    sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)  }return nil}

继续追踪sched.SchedulerCache.AssumePod

// C:\git\kubernetes-1.22.3\pkg\scheduler\internal\cache\cache.go// schedulerCache 调度器本地内存缓存// 作用:保存节点、Pod、资源的最新状态,让调度器不用每次都请求 apiserver// 是调度器高性能的核心:所有预选、优选、资源计算都读这个缓存type schedulerCache struct {// 关闭调度器的信号通道  stop   <-chan struct{}// 假定Pod(Assumed Pod)的过期时间// 如果一个Pod假定绑定后,太久没真正Bind,就会自动清理  ttl    time.Duration// 后台定期清理过期Pod的周期  period time.Duration// 全局互斥锁:保证多协程并发操作缓存时安全// 保护下面所有字段  mu sync.RWMutex// 【假定Pod集合】// 存储所有处于 Assume 状态(还没真正Bind)的Pod的key// 作用:快速区分哪些Pod是“假调度”,哪些是真绑定  assumedPods sets.String// Pod状态映射表:key=pod唯一标识,value=pod状态// 记录每个Pod是 假定/已绑定/待删除 状态  podStates map[string]*podState// 节点信息映射表:key=节点名,value=节点完整信息(资源、Pod列表)  nodes     map[string]*nodeInfoListItem// 最近更新过的节点(链表头)// 频繁变动的节点放头部,加速访问(调度器性能优化)  headNode *nodeInfoListItem// 节点树:用于根据标签、亲和性快速筛选节点  nodeTree *nodeTree// 镜像缓存:记录每个节点上有哪些镜像,加速调度决策  imageStates map[string]*imageState}// podState:调度器缓存中,记录【单个 Pod 的状态】// 主要用来管理 Assume(假定)状态的 Pod:是否过期、是否绑定完成type podState struct {// 指向真实的 Pod 对象  pod *v1.Pod// 过期截止时间// 作用:给 Assume 状态的 Pod 设置“有效期”// 如果超过这个时间还没完成绑定,就认为这个 Assume 失效,需要清理  deadline *time.Time// 标记:绑定是否已经完成// 作用:// 1. 如果还在绑定中(bindingFinished=false)→ 不能清理这个 Pod// 2. 如果绑定已经完成(bindingFinished=true)→ 可以正常处理  bindingFinished bool}// nodeInfoListItem holds a NodeInfo pointer and acts as an item in a doubly// linked list. When a NodeInfo is updated, it goes to the head of the list.// The items closer to the head are the most recently updated items.type nodeInfoListItem struct {  info *framework.NodeInfo  next *nodeInfoListItem  prev *nodeInfoListItem}

继续追踪schedulerCache.AssumePod

// C:\git\kubernetes-1.22.3\pkg\scheduler\internal\cache\cache.go// AssumePod// 作用:将一个【假定已调度】的 Pod 加入调度缓存// 逻辑:标记这个 Pod 已经占用了节点资源,防止其他 Pod 重复调度func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {// 1. 获取 Pod 的唯一标识 key(namespace/name)  key, err := framework.GetPodKey(pod)if err != nil {return err  }// 2. 加锁:保证并发安全(调度器多协程操作缓存)  cache.mu.Lock()defer cache.mu.Unlock()// 3. 检查 Pod 已经存在 → 不能重复 Assumeif _, ok := cache.podStates[key]; ok {return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)  }// 4. 【核心】把 Pod 加入节点,更新节点的资源占用  cache.addPod(pod)// 5. 记录 Pod 状态:标记为【假定】状态  ps := &podState{    pod: pod,  }  cache.podStates[key] = ps// 6. 把 Pod 标记为【假定 Pod】(和真实已绑定 Pod 区分开)  cache.assumedPods.Insert(key)return nil}// C:\git\kubernetes-1.22.3\pkg\scheduler\internal\cache\cache.go// addPod// 作用:将 Pod 挂载到对应节点,更新节点信息(已加锁)func (cache *schedulerCache) addPod(pod *v1.Pod) {// 1. 根据 NodeName 找到节点  n, ok := cache.nodes[pod.Spec.NodeName]// 2. 节点不存在就新建一个if !ok {    n = newNodeInfoListItem(framework.NewNodeInfo())    cache.nodes[pod.Spec.NodeName] = n  }// 3. 【核心】把 Pod 加到节点上,更新节点资源  n.info.AddPod(pod)// 4. 把节点移到链表头部:提升下次调度查询速度  cache.moveNodeInfoToHead(pod.Spec.NodeName)}// C:\git\kubernetes-1.22.3\pkg\scheduler\framework\types.go// AddPod// 包装函数,简化调用func (n *NodeInfo) AddPod(pod *v1.Pod) {  n.AddPodInfo(NewPodInfo(pod))}// C:\git\kubernetes-1.22.3\pkg\scheduler\framework\types.go// AddPodInfo// 【最底层核心】把 Pod 资源计入节点,更新节点所有资源信息func (n *NodeInfo) AddPodInfo(podInfo *PodInfo) {// 1. 计算这个 Pod 需要的所有资源:CPU、内存、存储、GPU等  res, non0CPU, non0Mem := calculateResource(podInfo.Pod)// 2. 把 Pod 资源【累加到节点已用资源】上  n.Requested.MilliCPU += res.MilliCPU        // 节点已用 CPU  n.Requested.Memory += res.Memory            // 节点已用内存  n.Requested.EphemeralStorage += res.EphemeralStorage // 节点已用存储// 累加自定义资源(GPU、RDMA 等)for rName, rQuant := range res.ScalarResources {    n.Requested.ScalarResources[rName] += rQuant  }// 3. 记录非零资源(调度算法需要)  n.NonZeroRequested.MilliCPU += non0CPU  n.NonZeroRequested.Memory += non0Mem// 4. 把 Pod 加入节点的 Pod 列表  n.Pods = append(n.Pods, podInfo)// 5. 如果 Pod 有亲和性/反亲和性,单独记录if podWithAffinity(podInfo.Pod) {    n.PodsWithAffinity = append(n.PodsWithAffinity, podInfo)  }if podWithRequiredAntiAffinity(podInfo.Pod) {    n.PodsWithRequiredAntiAffinity = append(n.PodsWithRequiredAntiAffinity, podInfo)  }// 6. 更新节点已占用端口、PVC引用计数  n.updateUsedPorts(podInfo.Pod, true)  n.updatePVCRefCounts(podInfo.Pod, true)// 7. 节点版本+1,代表节点资源发生变化  n.Generation = nextGeneration()}

至此完成了Assume验证解读

1.6.1.3、追踪sched.bind
// C:\git\kubernetes-1.22.3\pkg\scheduler\scheduler.go// bind 真正将 Pod 绑定到目标节点// 绑定优先级:1. 外部扩展器(extenders) 优先  2. 框架插件(framework plugins)// 这个函数会**异步执行**,不阻塞调度主循环func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {// 【收尾函数】无论绑定成功/失败,最后都会执行 finishBinding// 作用:更新缓存状态、记录指标、清理资源defer func() {    sched.finishBinding(fwk, assumed, targetNode, err)  }()// 1. 先尝试让【外部扩展器 Extenders】执行绑定// 如果 Extenders 处理了绑定(bound=true),直接返回结果  bound, err := sched.extendersBinding(assumed, targetNode)if bound {return err  }// 2. 扩展器没处理 → 执行【框架内置 Bind 插件】// 真正向 kube-apiserver 发送绑定请求,将 Pod 与 Node 绑定  bindStatus := fwk.RunBindPlugins(ctx, state, assumed, targetNode)// 绑定成功 → 返回 nilif bindStatus.IsSuccess() {return nil  }// 绑定失败 → 返回错误if bindStatus.Code() == framework.Error {return bindStatus.AsError()  }// 其他异常情况return fmt.Errorf("bind status: %s, %v", bindStatus.Code().String(), bindStatus.Message())}

追踪sched.extendersBinding

// C:\git\kubernetes-1.22.3\pkg\scheduler\scheduler.go// TODO(#87159): Move this to a Plugin.func (sched *Scheduler) extendersBinding(pod *v1.Pod, node string) (boolerror) {for _, extender := range sched.Extenders {if !extender.IsBinder() || !extender.IsInterested(pod) {continue    }return true, extender.Bind(&v1.Binding{      ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},      Target:     v1.ObjectReference{Kind: "Node", Name: node},    })  }return falsenil}

继续追踪extender.Bind

// C:\git\kubernetes-1.22.3\pkg\scheduler\extender.go// Bind 将 Pod 绑定到节点的操作 委托给外部扩展器(extender) 去执行// 也就是说:不自己绑定,而是发请求让外部服务完成绑定func (h *HTTPExtender) Bind(binding *v1.Binding) error {// 定义外部扩展器的绑定结果结构体var result extenderv1.ExtenderBindingResult// 如果当前扩展器没有开启绑定能力(不是binder),直接报错// 理论上不会走到这里,因为只有配置了bind能力才会调用if !h.IsBinder() {return fmt.Errorf("unexpected empty bindVerb in extender")  }// 构造要发送给外部扩展器的参数// 包含:Pod名字、命名空间、Pod唯一ID、目标节点  req := &extenderv1.ExtenderBindingArgs{    PodName:      binding.Name,    PodNamespace: binding.Namespace,    PodUID:       binding.UID,    Node:         binding.Target.Name,  }// 【核心】发送 HTTP 请求给外部扩展器// 让外部服务去执行真正的 Pod 绑定操作if err := h.send(h.bindVerb, req, &result); err != nil {return err  }// 如果外部扩展器返回了错误信息,则返回错误if result.Error != "" {return fmt.Errorf(result.Error)  }// 外部扩展器绑定成功return nil}

这里发出了HTTP 请求给apiserver,让外部服务去执行真正的 Pod 绑定操作。

1.7、参考文献:

https://midbai.com/post/kubernete-event/

https://cloud.tencent.com/developer/article/1731747

https://cloud.tencent.com.cn/developer/article/2623224https://kubernetes.io/zh-cn/docs/reference/scheduling/config/

https://kubernetes.io/zh-cn/docs/reference/scheduling/config/

基本 文件 流程 错误 SQL 调试
  1. 请求信息 : 2026-05-31 22:33:40 HTTP/1.1 GET : https://www.yeyulingfeng.com/a/691247.html
  2. 运行时间 : 0.096025s [ 吞吐率:10.41req/s ] 内存消耗:5,201.54kb 文件加载:145
  3. 缓存信息 : 0 reads,0 writes
  4. 会话信息 : SESSION_ID=cdb88fed5a53482e3749a8ca84431493
  1. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/public/index.php ( 0.79 KB )
  2. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/autoload.php ( 0.17 KB )
  3. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/autoload_real.php ( 2.49 KB )
  4. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/platform_check.php ( 0.90 KB )
  5. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/ClassLoader.php ( 14.03 KB )
  6. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/composer/autoload_static.php ( 6.05 KB )
  7. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/helper.php ( 8.34 KB )
  8. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-validate/src/helper.php ( 2.19 KB )
  9. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/ralouphie/getallheaders/src/getallheaders.php ( 1.60 KB )
  10. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/helper.php ( 1.47 KB )
  11. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/stubs/load_stubs.php ( 0.16 KB )
  12. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Exception.php ( 1.69 KB )
  13. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-container/src/Facade.php ( 2.71 KB )
  14. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/deprecation-contracts/function.php ( 0.99 KB )
  15. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/polyfill-mbstring/bootstrap.php ( 8.26 KB )
  16. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/polyfill-mbstring/bootstrap80.php ( 9.78 KB )
  17. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/var-dumper/Resources/functions/dump.php ( 1.49 KB )
  18. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-dumper/src/helper.php ( 0.18 KB )
  19. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/symfony/var-dumper/VarDumper.php ( 4.30 KB )
  20. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/guzzlehttp/guzzle/src/functions_include.php ( 0.16 KB )
  21. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/guzzlehttp/guzzle/src/functions.php ( 5.54 KB )
  22. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/App.php ( 15.30 KB )
  23. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-container/src/Container.php ( 15.76 KB )
  24. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/container/src/ContainerInterface.php ( 1.02 KB )
  25. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/provider.php ( 0.19 KB )
  26. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Http.php ( 6.04 KB )
  27. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/helper/Str.php ( 7.29 KB )
  28. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Env.php ( 4.68 KB )
  29. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/common.php ( 0.03 KB )
  30. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/helper.php ( 18.78 KB )
  31. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Config.php ( 5.54 KB )
  32. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/alipay.php ( 3.59 KB )
  33. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/facade/Env.php ( 1.67 KB )
  34. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/app.php ( 0.95 KB )
  35. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/cache.php ( 0.78 KB )
  36. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/console.php ( 0.23 KB )
  37. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/cookie.php ( 0.56 KB )
  38. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/database.php ( 2.48 KB )
  39. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/filesystem.php ( 0.61 KB )
  40. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/lang.php ( 0.91 KB )
  41. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/log.php ( 1.35 KB )
  42. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/middleware.php ( 0.19 KB )
  43. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/route.php ( 1.89 KB )
  44. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/session.php ( 0.57 KB )
  45. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/trace.php ( 0.34 KB )
  46. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/config/view.php ( 0.82 KB )
  47. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/event.php ( 0.25 KB )
  48. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Event.php ( 7.67 KB )
  49. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/service.php ( 0.13 KB )
  50. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/AppService.php ( 0.26 KB )
  51. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Service.php ( 1.64 KB )
  52. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Lang.php ( 7.35 KB )
  53. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/lang/zh-cn.php ( 13.70 KB )
  54. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/initializer/Error.php ( 3.31 KB )
  55. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/initializer/RegisterService.php ( 1.33 KB )
  56. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/services.php ( 0.14 KB )
  57. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/service/PaginatorService.php ( 1.52 KB )
  58. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/service/ValidateService.php ( 0.99 KB )
  59. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/service/ModelService.php ( 2.04 KB )
  60. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-trace/src/Service.php ( 0.77 KB )
  61. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Middleware.php ( 6.72 KB )
  62. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/initializer/BootService.php ( 0.77 KB )
  63. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/Paginator.php ( 11.86 KB )
  64. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-validate/src/Validate.php ( 63.20 KB )
  65. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/Model.php ( 23.55 KB )
  66. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/Attribute.php ( 21.05 KB )
  67. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/AutoWriteData.php ( 4.21 KB )
  68. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/Conversion.php ( 6.44 KB )
  69. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/DbConnect.php ( 5.16 KB )
  70. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/ModelEvent.php ( 2.33 KB )
  71. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/concern/RelationShip.php ( 28.29 KB )
  72. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/contract/Arrayable.php ( 0.09 KB )
  73. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/contract/Jsonable.php ( 0.13 KB )
  74. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/model/contract/Modelable.php ( 0.09 KB )
  75. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Db.php ( 2.88 KB )
  76. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/DbManager.php ( 8.52 KB )
  77. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Log.php ( 6.28 KB )
  78. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Manager.php ( 3.92 KB )
  79. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/log/src/LoggerTrait.php ( 2.69 KB )
  80. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/log/src/LoggerInterface.php ( 2.71 KB )
  81. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Cache.php ( 4.92 KB )
  82. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/psr/simple-cache/src/CacheInterface.php ( 4.71 KB )
  83. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/helper/Arr.php ( 16.63 KB )
  84. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/cache/driver/File.php ( 7.84 KB )
  85. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/cache/Driver.php ( 9.03 KB )
  86. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/CacheHandlerInterface.php ( 1.99 KB )
  87. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/Request.php ( 0.09 KB )
  88. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Request.php ( 55.78 KB )
  89. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/middleware.php ( 0.25 KB )
  90. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Pipeline.php ( 2.61 KB )
  91. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-trace/src/TraceDebug.php ( 3.40 KB )
  92. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/middleware/SessionInit.php ( 1.94 KB )
  93. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Session.php ( 1.80 KB )
  94. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/session/driver/File.php ( 6.27 KB )
  95. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/SessionHandlerInterface.php ( 0.87 KB )
  96. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/session/Store.php ( 7.12 KB )
  97. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Route.php ( 23.73 KB )
  98. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/RuleName.php ( 5.75 KB )
  99. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/Domain.php ( 2.53 KB )
  100. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/RuleGroup.php ( 22.43 KB )
  101. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/Rule.php ( 26.95 KB )
  102. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/RuleItem.php ( 9.78 KB )
  103. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/route/app.php ( 3.94 KB )
  104. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/facade/Route.php ( 4.70 KB )
  105. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/dispatch/Controller.php ( 4.74 KB )
  106. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/route/Dispatch.php ( 10.44 KB )
  107. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/controller/Index.php ( 9.87 KB )
  108. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/BaseController.php ( 2.05 KB )
  109. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/facade/Db.php ( 0.93 KB )
  110. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/connector/Mysql.php ( 5.44 KB )
  111. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/PDOConnection.php ( 52.47 KB )
  112. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/Connection.php ( 8.39 KB )
  113. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/ConnectionInterface.php ( 4.57 KB )
  114. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/builder/Mysql.php ( 16.58 KB )
  115. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/Builder.php ( 24.06 KB )
  116. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/BaseBuilder.php ( 27.50 KB )
  117. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/Query.php ( 15.71 KB )
  118. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/BaseQuery.php ( 45.13 KB )
  119. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/TimeFieldQuery.php ( 7.43 KB )
  120. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/AggregateQuery.php ( 3.26 KB )
  121. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/ModelRelationQuery.php ( 20.07 KB )
  122. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/ParamsBind.php ( 3.66 KB )
  123. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/ResultOperation.php ( 7.01 KB )
  124. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/WhereQuery.php ( 19.37 KB )
  125. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/JoinAndViewQuery.php ( 7.11 KB )
  126. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/TableFieldInfo.php ( 2.63 KB )
  127. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-orm/src/db/concern/Transaction.php ( 2.77 KB )
  128. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/log/driver/File.php ( 5.96 KB )
  129. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/LogHandlerInterface.php ( 0.86 KB )
  130. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/log/Channel.php ( 3.89 KB )
  131. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/event/LogRecord.php ( 1.02 KB )
  132. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-helper/src/Collection.php ( 16.47 KB )
  133. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/facade/View.php ( 1.70 KB )
  134. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/View.php ( 4.39 KB )
  135. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/app/controller/Es.php ( 3.30 KB )
  136. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Response.php ( 8.81 KB )
  137. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/response/View.php ( 3.29 KB )
  138. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/Cookie.php ( 6.06 KB )
  139. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-view/src/Think.php ( 8.38 KB )
  140. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/framework/src/think/contract/TemplateHandlerInterface.php ( 1.60 KB )
  141. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-template/src/Template.php ( 46.61 KB )
  142. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-template/src/template/driver/File.php ( 2.41 KB )
  143. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-template/src/template/contract/DriverInterface.php ( 0.86 KB )
  144. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/runtime/temp/c935550e3e8a3a4c27dd94e439343fdf.php ( 31.50 KB )
  145. /yingpanguazai/ssd/ssd1/www/wwww.yeyulingfeng.com/vendor/topthink/think-trace/src/Html.php ( 4.42 KB )
  1. CONNECT:[ UseTime:0.000512s ] mysql:host=127.0.0.1;port=3306;dbname=wenku;charset=utf8mb4
  2. SHOW FULL COLUMNS FROM `fenlei` [ RunTime:0.000667s ]
  3. SELECT * FROM `fenlei` WHERE `fid` = 0 [ RunTime:0.000339s ]
  4. SELECT * FROM `fenlei` WHERE `fid` = 63 [ RunTime:0.000313s ]
  5. SHOW FULL COLUMNS FROM `set` [ RunTime:0.000581s ]
  6. SELECT * FROM `set` [ RunTime:0.000297s ]
  7. SHOW FULL COLUMNS FROM `article` [ RunTime:0.000628s ]
  8. SELECT * FROM `article` WHERE `id` = 691247 LIMIT 1 [ RunTime:0.000925s ]
  9. UPDATE `article` SET `lasttime` = 1780238020 WHERE `id` = 691247 [ RunTime:0.000916s ]
  10. SELECT * FROM `fenlei` WHERE `id` = 64 LIMIT 1 [ RunTime:0.000276s ]
  11. SELECT * FROM `article` WHERE `id` < 691247 ORDER BY `id` DESC LIMIT 1 [ RunTime:0.000496s ]
  12. SELECT * FROM `article` WHERE `id` > 691247 ORDER BY `id` ASC LIMIT 1 [ RunTime:0.000390s ]
  13. SELECT * FROM `article` WHERE `id` < 691247 ORDER BY `id` DESC LIMIT 10 [ RunTime:0.000709s ]
  14. SELECT * FROM `article` WHERE `id` < 691247 ORDER BY `id` DESC LIMIT 10,10 [ RunTime:0.000697s ]
  15. SELECT * FROM `article` WHERE `id` < 691247 ORDER BY `id` DESC LIMIT 20,10 [ RunTime:0.000979s ]
0.097841s