Hermes源码学习:定时任务系统是如何驱动「无人值守」运转的
01 | 先抛一个问题
定时任务这件事,不同系统有不同的实现方式:
●Linux Cron:系统级守护进程,crond 一直在跑
●Airflow:大一统平台,光启动就要 3 分钟
●Python-APScheduler:需要自己写 daemon,或挂靠到某个长驻进程
那 Hermes Agent 的定时任务,靠什么驱动?
答案是:gateway 进程本身。
没有独立 cron daemon,没有系统级服务,只依赖 gateway 运行时的一个后台线程。
今天把源码翻了一遍,把这个机制彻底讲清楚。
02 | 核心文件就三个
整个 cron 系统位于 hermes-agent/cron/ 目录,结构极简:
~/.hermes/cron/├── jobs.json # 任务定义(持久化存储)├── output/ # 执行结果(每任务每时间戳一个md文件)└── .tick.lock # 文件锁(防止并发tick)对应的源码:
cron/jobs.py | |
cron/scheduler.py | tick()run_job() 执行 |
gateway/run.py | _start_cron_ticker() |
三件事搞清楚,这套系统就理解了。
03 | 任务是怎么定义和调度的
三种调度类型
jobs.py 里 parse_schedule() 解析三种格式:
"30m" # 一次性,30分钟后跑"every 2h" # 循环,每2小时跑一次"0 9 * * *" # 标准cron表达式,每天9点解析后存入 jobs.json,结构大概是这样:
{ "id": "a1b2c3d4e5f6", "name": "早间资讯", "prompt": "帮我整理今日热榜...", "skills": ["daily-article"], "schedule": { "kind": "cron", "expr": "0 9 * * *" }, "deliver": "origin", # 投递回创建任务的对话 "next_run_at": "2026-05-15T09:00:00+08:00", "last_run_at": null, "repeat": {"times": null, "completed": 0} # null=永久循环}next_run 是怎么算的
核心逻辑在 compute_next_run():
●一次性:从当前时间加 duration,或解析 ISO 时间戳
●循环interval:last_run + interval 分钟
●Cron表达式:用 croniter 库计算下一次触发时间
有趣的是,为了防止系统重启后任务"穿越"重复触发,compute_next_run() 有一个容错窗口:
# 如果一个循环任务本该在1小时前触发,但系统停机了,# 会自动快进到"最近一次该触发的时间点",而不是从现在重新计数# 窗口上限是2小时(MAX_GRACE = 7200秒)04 | tick() 是怎么工作的
scheduler.py 里的 tick() 是调度核心:
def tick(verbose: bool = True, adapters=None, loop=None) -> int: # ① 文件锁,防止多进程同时tick(比如gateway和手动CLI同时跑) lock_fd = open(_LOCK_FILE, "w") fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # ② 查所有"到期"的任务 due_jobs = get_due_jobs() for job in due_jobs: # ③ 对循环任务,先把 next_run_at 更新到"下一次" # 这样如果进程中途崩掉,不会重复触发 advance_next_run(job["id"]) # ④ 真正执行 success, output, final_response, error = run_job(job) # ⑤ 保存输出到 ~/.hermes/cron/output/ save_job_output(job["id"], output) # ⑥ 投递结果 _deliver_result(job, final_response, adapters, loop)一个细节:在 tick() 里面,执行前就先 advance_next_run() 更新了下一轮时间。这是一种"预占位"策略——如果进程在执行到一半崩掉,重启后不会误判任务还在到期状态。
05 | run_job() 里面发生了什么
run_job() 启动一个完全独立的 AIAgent 实例:
def run_job(job: dict): _cron_session_id = f"cron_{job_id}_{timestamp}" agent = AIAgent( model=..., session_id=_cron_session_id, enabled_toolsets=["terminal", "file", "web", ...], ) result = agent.run_conversation(prompt)几个关键点:
1. 隔离 session:每个 cron run 是新会话,ID 格式 cron_{job_id}_{YYYYMMDD_HHMMSS},不会污染主对话上下文。
2. 技能预加载:如果 job 定义了 skills,执行前会把 skill 内容拼接到 prompt 前面——相当于每次任务开始时"注入"了特定技能上下文。
# _build_job_prompt() 内部if skills: for skill_name in skills: content = skill_view(skill_name) # 读取 SKILL.md prompt = f"[SYSTEM: skill {skill_name} loaded]\n{content}\n\n{prompt}"3. 前置脚本:如果定义了 script,会在 agent 运行前先跑一个 Python 脚本,把 stdout 作为上下文注入。这常用于"先抓数据,再分析"类任务。
if script: success, script_output = _run_job_script(script) prompt = f"## Script Output\n{script_output}\n\n{prompt}"4. 投递结果:agent 返回后,_deliver_result() 负责把结果送到目标:
●deliver=origin:回到任务创建时的聊天
●deliver=local:只存文件,不推送
●deliver=telegram:CHAT_ID:推送到指定平台
06 | Gateway 是怎么把这一切串起来的
这是最优雅的部分。
gateway 启动时,在 start_gateway() 里面直接起了一个后台线程:
# gateway/run.py 约 line 8934cron_stop = threading.Event()cron_thread = threading.Thread( target=_start_cron_ticker, args=(cron_stop,), kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()}, daemon=True, name="cron-ticker",)cron_thread.start()_start_cron_ticker() 的循环体非常干净:
def _start_cron_ticker(stop_event, adapters, loop, interval=60): while not stop_event.is_set(): cron_tick(verbose=False, adapters=adapters, loop=loop) stop_event.wait(timeout=interval)每 60 秒一次,不依赖任何外部进程。
07 | 不同启动方式下的行为差异
hermes gateway run | |||
hermes gateway install | |||
hermes cron list/create | |||
hermes cron run ID |
关键结论:gateway 进程和 cron ticker 是耦合关系——gateway 活着,cron 就跑;gateway 关了,cron 就停。没有进程守护的复杂性,但确实有这个隐含依赖。
08 | 两个投递路径的细节
_deliver_result() 对"gateway 在不在"很敏感:
路径一:Live Adapter(gateway 在跑)
# 通过 run_coroutine_threadsafe 把消息投递进 gateway 的事件循环# 这样 E2EE 加密房间(Matrix)等需要实时加密的场景才能工作future = asyncio.run_coroutine_threadsafe( runtime_adapter.send(chat_id, text, metadata=send_metadata), loop,)路径二:Standalone(gateway 没在跑)
# 在新线程里起独立 asyncio.run()# 无法使用 live adapter,但能正常发送非加密消息result = asyncio.run(_send_to_platform(platform, pconfig, chat_id, ...))09 | 回头看这个设计
翻完源码,回头想这个系统的设计:
优点:
●零外部依赖,只需要 gateway 一个进程
●文件锁保证多实例安全
●投递路径自动适配"gateway 在不在"两种状态
●任务隔离,互不干扰
局限:
●gateway 关了,所有 cron 静默暂停——没有告警,用户可能感知不到
●60 秒的 tick 精度,不适合秒级任务
●投递失败没有自动重试机制
理解了这个核心架构,再去看 Hermes 的其他子系统(session 管理、记忆体系、工具注册),会发现它们都遵循同一个隐含模式:保持简单,让一个中心进程串联一切。
行动提示
如果你已经在用 Hermes Agent,想验证这套机制:
# 查看当前所有定时任务hermes cron list# 看某条任务最近一次输出ls ~/.hermes/cron/output/<job_id>/ | tail -3# 手动触发一次(立即执行,不等 schedule)hermes cron run <job_id>后台挂着 hermes gateway run 的时候,~/.hermes/logs/gateway.log 里可以看到 cron ticker 的执行记录:
grep "Cron ticker\|Running job\|completed successfully" ~/.hermes/logs/gateway.log | tail -20本文是 Hermes 源码学习系列第 N 篇,欢迎关注。
夜雨聆风