上一篇文章我们从整体架构上认识了 HAMi,知道 HAMi 主要由 MutatingWebhook、Scheduler Extender、Device Plugin 和 HAMi-Core 几部分组成。
这一篇开始进入源码层面,重点分析:cmd/scheduler/main.go。HAMi scheduler 进程启动后,会先通过 Cobra 解析命令行参数,然后初始化 Kubernetes client、加载设备配置、创建 Scheduler 对象、启动后台设备信息同步逻辑、启动 informer 缓存、启动 metrics 服务,最后通过 httprouter 注册 /filter、/bind、/webhook、/healthz、/readyz 等 HTTP 路由,并通过 HTTP 或 HTTPS 对外提供服务。
更准确地说,HAMi scheduler 不是完全替代 kube-scheduler 的独立调度器。它更像是一个同时承担两类职责的 HTTP 服务:
1. 对 kube-apiserver 提供 /webhook用于 Pod 创建 admission 阶段的 mutation。2. 对 kube-scheduler extender 提供 /filter 和 /bind用于 GPU / NPU 等异构设备的调度过滤和绑定。
一、整体启动流程
从 cmd/scheduler/main.go 看,HAMi scheduler 的整体启动流程可以概括为:
程序启动||-- Go 自动执行 init()| || |-- 给 rootCmd 注册 flags| |-- 注册 device 配置相关 flags| |-- 注册 klog flags| |-- 注册 version 子命令||-- main()| || |-- rootCmd.Execute()| || |-- Cobra 解析命令行参数| |-- 匹配当前要执行的 command| |-- 如果执行的是 rootCmd 本身,则进入 RunE| || |-- flag.PrintPFlags(cmd.Flags())| |-- start()||-- start()||-- 设置 node lock timeout|-- 初始化 Kubernetes client|-- 加载 device config 并初始化设备处理逻辑|-- 获取当前 hostname|-- 创建 Scheduler 对象|-- 后台启动 RegisterFromNodeAnnotations()|-- 启动 informer|-- 启动 metrics 服务|-- 创建 httprouter.Router|-- 注册 /filter、/bind、/webhook、/healthz、/readyz|-- 启动 HTTP 或 HTTPS Server
这里最容易混淆的是:
init():注册参数,不是真正解析参数;rootCmd.Execute():真正解析命令行参数,并决定执行哪个 command;RunE:当前 command 真正要执行的业务逻辑;start():HAMi scheduler 的正式启动入口。
二、rootCmd:HAMi scheduler 的命令行入口
在 cmd/scheduler/main.go 中,核心全局变量是 rootCmd:
rootCmd = &cobra.Command{Use: "scheduler",Short: "kubernetes vgpu scheduler",RunE: func(cmd *cobra.Command, args []string) error {flag.PrintPFlags(cmd.Flags())return start()},}
这里的 cobra.Command 可以理解为 Cobra 框架里的“命令对象”。
几个字段含义如下:
Use:命令名称,这里是 scheduler。Short:命令的简短描述。RunE:当前命令真正执行的函数。RunE 和 Run 类似,但 RunE 可以返回 error。
所以当用户执行的是 HAMi scheduler 主命令时,最终会执行 RunE,而 RunE 里面又调用了 start()。
rootCmd.Execute()-> 匹配到 rootCmd-> 执行 rootCmd.RunE()-> 调用 start()
但是要注意:源码里还注册了一个子命令:
rootCmd.AddCommand(version.VersionCmd)所以如果用户执行的是:
scheduler version那么 Cobra 会优先匹配 version 子命令,通常不会进入 rootCmd 的 RunE,也就不会启动 scheduler 服务。
三、init():注册 flags
init() 函数中注册了大量启动参数,例如:
rootCmd.Flags().StringVar(&config.HTTPBind, "http_bind", "127.0.0.1:8080", "http server bind address")rootCmd.Flags().StringVar(&tlsCertFile, "cert_file", "", "tls cert file")rootCmd.Flags().StringVar(&tlsKeyFile, "key_file", "", "tls key file")rootCmd.Flags().StringVar(&config.SchedulerName, "scheduler-name", "", "the name to be added to pod.spec.schedulerName if not empty")rootCmd.Flags().Int32Var(&config.DefaultMem, "default-mem", 0, "default gpu device memory to allocate")rootCmd.Flags().Int32Var(&config.DefaultCores, "default-cores", 0, "default gpu core percentage to allocate")rootCmd.Flags().Int32Var(&config.DefaultResourceNum, "default-gpu", 1, "default gpu to allocate")
这类 StringVar、Int32Var、BoolVar、DurationVar 的作用是:
把一个命令行参数绑定到一个 Go 变量上。比如:
rootCmd.Flags().StringVar(&config.HTTPBind, "http_bind", "127.0.0.1:8080", "http server bind address")意思是:
注册一个命令行参数:--http_bind默认值:127.0.0.1:8080绑定变量:config.HTTPBind
如果启动时不传:
scheduler那么:
config.HTTPBind = "127.0.0.1:8080"如果启动时传:
scheduler --http_bind=0.0.0.0:443那么 Cobra 解析参数后:
config.HTTPBind = "0.0.0.0:443"但是这个赋值不是 init() 完成的,而是在 rootCmd.Execute() 解析命令行参数时完成的。
四、main():真正触发 Cobra 执行
funcmain() {if err := rootCmd.Execute(); err != nil {klog.Fatal(err)}}
这里的关键是:rootCmd.Execute()。Cobra 的 Execute() 会根据命令行参数查找 command tree 中匹配的命令,并解析对应 flags。因此 HAMi scheduler 的启动可以理解为:
main()||-- rootCmd.Execute()||-- 解析命令行参数|-- 找到要执行的 command|-- 执行 command.RunE()||-- start()
五、start():HAMi scheduler 真正启动入口
start() 是 HAMi scheduler 进程真正干活的地方。
核心流程如下:
start()||-- nodelock.NodeLockTimeout = config.NodeLockTimeout||-- client.InitGlobalClient(...)||-- config.InitDevices()||-- os.Hostname()||-- scheduler.NewScheduler()||-- go sher.RegisterFromNodeAnnotations()||-- sher.Start()||-- defer sher.Stop()||-- go initMetrics(...)||-- router := httprouter.New()||-- router.POST("/filter", routes.PredicateRoute(sher))|-- router.POST("/bind", routes.Bind(sher))|-- router.POST("/webhook", routes.WebHookRoute())|-- router.GET("/healthz", routes.HealthzRoute())|-- router.GET("/readyz", routes.ReadyzRoute(sher))||-- http.ListenAndServe(...) 或 server.ListenAndServeTLS(...)
接下来按顺序分析。
5.1、设置 NodeLockTimeout
源码中首先执行:
nodelock.NodeLockTimeout = config.NodeLockTimeoutklog.InfoS("Set node lock timeout", "timeout", nodelock.NodeLockTimeout)
这里是把命令行参数或者配置中解析出来的 config.NodeLockTimeout 赋值给 nodelock.NodeLockTimeout。
HAMi 在调度设备时需要避免多个调度请求同时修改同一个节点的设备分配状态,所以会有节点级别的锁。NodeLockTimeout 就是这个节点锁的超时时间。
5.2、初始化 Kubernetes client
接下来是:
client.InitGlobalClient(client.WithBurst(config.Burst),client.WithQPS(config.QPS),client.WithTimeout(config.Timeout),)
HAMi scheduler 后续需要访问 kube-apiserver,例如:
1. 读取 Pod2. 读取 Node3. 读取 ResourceQuota4. 创建 Binding5. Patch Pod / Node 相关状态
所以启动初期必须先初始化 Kubernetes client。
后续 sher.Start() 中会通过 client.GetClient() 获取这个 client,然后创建 informer。
5.3、初始化异构设备处理逻辑:config.InitDevices()
接下来是:
config.InitDevices()这一步很重要。HAMi 支持 NVIDIA GPU、Ascend NPU、Cambricon MLU、Hygon DCU、Metax、Mthreads、Iluvatar、Kunlun、AWS Neuron、AMD、VastAI 等多类异构设备。
config.InitDevices() 的逻辑可以概括为:
1. 如果 device.DevicesMap 已经有内容,说明设备处理逻辑已经初始化过,直接返回;2. 否则读取 --device-config-file 指定的设备配置文件;3. 调用 LoadConfig(configFile) 解析 YAML;4. 调用 InitDevicesWithConfig(config) 初始化各类设备;5. 把各类设备处理器注册到 device.DevicesMap 和 device.DevicesToHandle 中。
也就是说,HAMi scheduler 不是写死只处理 NVIDIA GPU,而是通过设备配置和各厂商 device handler 实现异构设备支持。
这一步完成后,后续 /webhook、/filter、/bind 才知道:
哪些资源名属于 HAMi 管理;不同设备类型如何解析资源请求;不同设备类型如何参与打分和分配;不同设备类型如何写入 Pod annotation。
5.4、获取 HostName
config.HostName, err = os.Hostname()if err != nil {return fmt.Errorf("unable to get hostname: %v", err)}if config.HostName == "" {return fmt.Errorf("empty hostname returned")}
这里获取的是当前 scheduler 进程所在容器或者宿主机的 hostname。
这个 hostname 后续会用于 leader election 相关逻辑。例如创建 Scheduler 时,如果开启了 leader election,会用 config.HostName 作为当前 scheduler 实例的身份标识。
5.5、创建 Scheduler 对象
sher = scheduler.NewScheduler()这一步创建 HAMi 的 Scheduler 对象,并初始化它的内部状态。
可以把 Scheduler 对象理解成 HAMi scheduler 的核心内存状态管理器,它维护的信息包括:
1. Kubernetes client2. Pod lister3. Node lister4. ResourceQuota lister5. PodManager6. NodeManager7. QuotaManager8. 节点设备状态缓存9. 调度缓存10. leader election 状态11. informer stop channel12. synced 状态
所以这个 Scheduler 对象不是只处理 HTTP 请求,它还维护 HAMi 调度判断所需的本地缓存和状态。后续 /filter 和 /bind 路由最终都会调用这个对象的方法:
5.6、后台同步 Node annotation:registerFromNodeAnnotations()
创建 Scheduler 对象后,执行:
go sher.RegisterFromNodeAnnotations()注意这里有一个 go,说明它是后台 goroutine。RegisterFromNodeAnnotations() 的作用是:持续从 Kubernetes Node annotation 中同步设备信息,并更新 HAMi scheduler 内部的节点设备视图。
因为 HAMi device-plugin 会运行在各个节点上,它负责发现本机 GPU / NPU 等设备,然后把设备信息上报到 Node annotation。HAMi scheduler 再从 Node annotation 中读取这些信息,构建自己的全局设备视图。
这个方法内部不是只执行一次,而是循环运行。它会被几类事件触发:
1. nodeNotify2. leaderNotify3. 15 秒 ticker4. stopCh
可以理解成:
Node 信息变了,触发同步;leader 状态变了,触发同步;即使没有事件,每 15 秒也会周期性检查;收到 stopCh 后退出。
5.7、启动 informer:sher.Start()
接下来执行:
err = sher.Start()if err != nil {return err}defer sher.Stop()
sher.Start() 是 HAMi scheduler 运行前非常关键的一步。
它主要做几件事:
1. 获取 Kubernetes client;2. 创建 SharedInformerFactory;3. 创建 Pod lister;4. 创建 Node lister;5. 创建 ResourceQuota lister;6. 给 Pod informer 注册 Add / Update / Delete 事件处理函数;7. 给 Node informer 注册 Add / Delete 事件处理函数;8. 给 ResourceQuota informer 注册 Add / Update / Delete 事件处理函数;9. 启动 informer;10. 等待 informer cache 同步;11. 如果开启 leader election,则启动 Lease informer;12. 标记 scheduler started。
所以 HAMi 使用 informer 维护本地缓存:
当前有哪些 Pod;Pod 分别占用了哪些设备;当前有哪些 Node;Node 上有哪些设备;namespace 下有哪些 ResourceQuota;设备资源使用量是多少。
后续 kube-scheduler 调用 /filter 时,HAMi 就可以基于本地缓存快速判断:
这个 Pod 能不能放到这个 Node 上?这个 Node 上有没有满足要求的 GPU / NPU?显存是否足够?算力 core 是否足够?ResourceQuota 是否允许?
5.8、启动 metrics 服务
sher.Start() 成功后,执行:
go initMetrics(config.MetricsBindAddress, legacyMetrics)这里同样是后台 goroutine。它的作用是启动 Prometheus metrics 服务。默认监听地址来自参数:
--metrics-bind-address默认值是:
:9395这说明 metrics 服务和 /filter、/bind、/webhook 主 HTTP 服务不是同一个入口。
5.9、创建 HTTP Router:httprouter.New()
router := httprouter.New()这行代码创建了一个 httprouter.Router。httprouter 是 Go 里的一个高性能 HTTP 路由器,也可以叫 mux。它的作用是:根据 HTTP method 和 URL path,把请求分发给对应的 handler。
比如:
POST /filter -> routes.PredicateRoute(sher)POST /bind -> routes.Bind(sher)POST /webhook -> routes.WebHookRoute()GET /healthz -> routes.HealthzRoute()GET /readyz -> routes.ReadyzRoute(sher)
这里要注意:httprouter.New() 只是创建路由器,不是启动 HTTP 服务。
真正开始监听端口的是后面的:
http.ListenAndServe(config.HTTPBind, router)或者 HTTPS 分支中的:
server.ListenAndServeTLS("", "")所以可以把这一段理解成:
httprouter.New() 只是创建“请求分发表”。router.POST(...) 是往请求分发表里注册规则。http.ListenAndServe(...) 才是真正监听端口并处理请求。
5.10、注册 /filter 路由
router.POST("/filter", routes.PredicateRoute(sher))/filter 是 kube-scheduler extender 调用的接口。
当 kube-scheduler 发现某个 Pod 请求了 HAMi 管理的扩展资源时,会根据 extender 配置,把候选节点和 Pod 信息通过 HTTP POST 发送给 HAMi scheduler 的 /filter 接口。
routes.PredicateRoute(sher) 返回的是一个 httprouter.Handle。
它内部大致做这些事:
1. 检查 request body;2. 限制 request body 最大 1MB;3. 把请求体反序列化成 ExtenderArgs;4. 等待 HAMi scheduler cache 同步;5. 调用 s.Filter(extenderArgs);6. 把 ExtenderFilterResult 序列化成 JSON 返回给 kube-scheduler。
5.12、注册 /bind 路由
router.POST("/bind", routes.Bind(sher))/bind 也是 kube-scheduler extender 调用的接口。
它的 handler 内部大致做这些事:
1. 限制 request body 最大 1MB;2. 把请求体反序列化成 ExtenderBindingArgs;3. 调用 s.Bind(extenderBindingArgs);4. 返回 ExtenderBindingResult。
5.13、注册 /webhook 路由
router.POST("/webhook", routes.WebHookRoute())/webhook 和 /filter、/bind 的调用方不一样。
/filter、/bind: 调用方是 kube-scheduler extender。/webhook: 调用方是 kube-apiserver 的 admission webhook 机制。
当用户创建 Pod 时,请求会先到 kube-apiserver。kube-apiserver 在认证、授权之后,持久化 Pod 之前,会进入 admission 阶段。如果集群中配置了 HAMi 的 MutatingWebhookConfiguration,并且这个 Pod 没有被 selector 排除,那么 kube-apiserver 就会调用 HAMi scheduler 的 /webhook。
/webhook 的触发时机是:Pod 创建时,kube-apiserver admission 阶段。它主要负责对 Pod 做 mutation,例如根据 Pod 请求的 HAMi 资源,补充或调整调度相关字段。其中一个很关键的点是:如果 Pod 请求了 HAMi 管理的设备资源,webhook 可以把 pod.spec.schedulerName 改成 HAMi 配置的 schedulerName。
5.14、注册 /healthz 和 /readyz
router.GET("/healthz", routes.HealthzRoute())router.GET("/readyz", routes.ReadyzRoute(sher))
/healthz 比较简单:请求进来后直接返回 HTTP 200。用于存活检查。
/readyz 会检查当前 scheduler 是否是 leader:
如果是 leader,打印 Scheduler extender is leader;如果不是 leader,打印 Scheduler extender has not become leader yet。
5.15、HTTP 和 HTTPS 两种启动方式
HAMi 注册完路由后,会判断是否配置了 TLS 证书:
if len(tlsCertFile) == 0 || len(tlsKeyFile) == 0 {http.ListenAndServe(config.HTTPBind, router)} else {...server.ListenAndServeTLS("", "")}
这里有两个分支。
5.15.1. 没有配置证书:启动普通 HTTP
http.ListenAndServe(config.HTTPBind, router)这表示:监听 config.HTTPBind;收到请求后交给 router 处理。
例如:
config.HTTPBind = "127.0.0.1:8080"那么 HAMi scheduler 就监听:
127.0.0.1:80805.15.2. 配置了证书:启动 HTTPS
如果传入了:
--cert_file--key_file
则进入 HTTPS 分支。
源码中创建了证书 watcher:
certWatcher, err := certwatcher.New(tlsCertFile, tlsKeyFile)然后创建 TLS 配置:
tlsCfg := &tls.Config{GetCertificate: certWatcher.GetCertificate,}
再创建 HTTP Server:
server := &http.Server{Addr: addr,Handler: handler,TLSConfig: tlsCfg,}
最后启动:
server.ListenAndServeTLS("", "")这里 ListenAndServeTLS("", "") 传空字符串,是因为证书不是通过参数文件直接交给 ListenAndServeTLS 加载,而是通过 TLSConfig.GetCertificate 动态提供。
所以这一段可以总结为:
普通模式:http.ListenAndServe(config.HTTPBind, router)HTTPS 模式:certwatcher 监听证书文件变化tls.Config.GetCertificate 动态获取证书http.Server 使用 TLSConfig 启动 HTTPS Server
六、/webhook、/filter、/bind 的调用关系总结
最后把三个核心接口和 Kubernetes 调度链路串起来。
6.1 Pod 创建时:kube-apiserver 调用 /webhook
用户创建 Pod||-- kubectl apply / API 请求||-- kube-apiserver||-- 认证|-- 授权|-- Admission 阶段||-- 调用 HAMi /webhook||-- 检查 Pod 是否请求 HAMi 管理的设备资源|-- 根据需要修改 Pod|-- 可能设置 pod.spec.schedulerName
/webhook 的核心作用是:
在 Pod 进入调度前,对 Pod 做 mutation。6.2 Pod 调度时:kube-scheduler 调用 /filter
Pod 已经进入调度队列||-- kube-scheduler 发现 Pod 使用了 HAMi 管理的扩展资源||-- 根据 extender 配置调用 HAMi /filter||-- HAMi 读取本地缓存|-- 检查候选 Node 的 GPU / NPU 资源|-- 返回哪些节点可用,哪些节点不可用
/filter 的核心作用是:
在 kube-scheduler 的候选节点基础上,进一步按照 HAMi 的设备资源视图过滤节点。6.3 Pod 绑定时:kube-scheduler 调用 /bind
/filter 过滤完成||-- kube-scheduler 选择目标节点||-- 调用 HAMi /bind||-- HAMi 执行绑定逻辑|-- 写入设备分配信息|-- 创建 Binding|-- Pod 最终绑定到目标 Node
/bind 的核心作用是:
完成最终节点绑定,并让 Pod 带上 HAMi 需要的设备分配结果。七、本文总结
本文从 cmd/scheduler/main.go 出发,梳理了 HAMi scheduler 的启动流程。
核心链路如下:
main()||-- rootCmd.Execute()||-- 解析命令行参数|-- 执行 RunE||-- start()||-- 初始化 Kubernetes client|-- 初始化设备处理逻辑|-- 创建 Scheduler 对象|-- 启动 RegisterFromNodeAnnotations()|-- 启动 informer|-- 启动 metrics|-- 创建 httprouter|-- 注册 /webhook、/filter、/bind、/healthz、/readyz|-- 启动 HTTP / HTTPS Server
几个重点结论:
1. init() 只是注册参数,不是解析参数。2. rootCmd.Execute() 才会解析命令行参数并执行 RunE。3. start() 是 HAMi scheduler 真正启动入口。4. config.InitDevices() 负责加载设备配置并初始化各类异构设备处理逻辑。5. scheduler.NewScheduler() 只是创建 Scheduler 对象。6. RegisterFromNodeAnnotations() 是后台同步 Node annotation 设备信息的 goroutine。7. sher.Start() 负责启动 informer 并等待 cache 同步。8. httprouter.New() 只是创建路由器,不是启动 HTTP Server。9. /webhook 由 kube-apiserver admission webhook 调用。10. /filter 和 /bind 由 kube-scheduler extender 调用。11. HAMi scheduler 不是完全替代 kube-scheduler,而是通过 extender 机制与 kube-scheduler 配合。
下面为start()的完整源码
funcstart() error {// Initialize node lock timeout from confignodelock.NodeLockTimeout = config.NodeLockTimeoutklog.InfoS("Set node lock timeout", "timeout", nodelock.NodeLockTimeout)client.InitGlobalClient(client.WithBurst(config.Burst),client.WithQPS(config.QPS),client.WithTimeout(config.Timeout),)config.InitDevices()var err errorconfig.HostName, err = os.Hostname()if err != nil {return fmt.Errorf("unable to get hostname: %v", err)}if config.HostName == "" {return fmt.Errorf("empty hostname returned")}sher = scheduler.NewScheduler()go sher.RegisterFromNodeAnnotations()err = sher.Start()if err != nil {return err}defer sher.Stop()// start monitor metricsgo initMetrics(config.MetricsBindAddress, legacyMetrics)// start http serverrouter := httprouter.New()router.POST("/filter", routes.PredicateRoute(sher))router.POST("/bind", routes.Bind(sher))router.POST("/webhook", routes.WebHookRoute())router.GET("/healthz", routes.HealthzRoute())router.GET("/readyz", routes.ReadyzRoute(sher))klog.Info("listen on ", config.HTTPBind)if enableProfiling {injectProfilingRoute(router)klog.Infof("Profiling enabled, visit %s/debug/pprof/ to view profiles", config.HTTPBind)}if len(tlsCertFile) == 0 || len(tlsKeyFile) == 0 {if err := http.ListenAndServe(config.HTTPBind, router); err != nil {return fmt.Errorf("listen and Serve error, %v", err)}} else {certWatcher, err := certwatcher.New(tlsCertFile, tlsKeyFile)if err != nil {return fmt.Errorf("failed to create cert watcher: %w", err)}tlsCfg := &tls.Config{GetCertificate: certWatcher.GetCertificate,}ctx, cancel := context.WithCancel(context.Background())defer cancel()go func() {if err := certWatcher.Start(ctx); err != nil && err != context.Canceled {klog.ErrorS(err, "cert watcher error")}}()addr := config.HTTPBindhandler := routerserver := &http.Server{Addr: addr,Handler: handler,TLSConfig: tlsCfg,}klog.InfoS("Starting HTTPS server", "address", addr)if err := server.ListenAndServeTLS("", ""); err != nil {return fmt.Errorf("HTTPS server error: %w", err)}}return nil}
夜雨聆风