OpenClaw工具拆解之cron+nodes
一、cron 工具
1.1 工具概述
功能:管理定时任务和唤醒事件
核心特性:
- 仅所有者可用(ownerOnly=true)
- 8 个 actions(status/list/add/update/remove/run/runs/wake)
- 3 种调度类型(at/every/cron)
- 2 种负载类型(systemEvent/agentTurn)
- 4 种会话目标(main/isolated/current/session:xxx)
- 支持 webhook 回调
1.2 Schema 定义
位置:第 24963 行
/**
 * Parameter schema for the `cron` tool.
 *
 * Only `action` is required by the schema; the per-action requirements
 * (for example `jobId` for update/remove/run/runs, `text` for wake) are
 * enforced at runtime by `createCronTool`.
 *
 * `additionalProperties: true` is deliberate: `createCronTool` accepts job and
 * patch fields passed flat at the top level (e.g. `schedule`, `payload`) and
 * folds them into `job` / `patch` itself.
 */
const CronToolSchema = Type.Object({
  action: stringEnum(CRON_ACTIONS),
  gatewayUrl: Type.Optional(Type.String()),
  gatewayToken: Type.Optional(Type.String()),
  timeoutMs: Type.Optional(Type.Number()),
  includeDisabled: Type.Optional(Type.Boolean()),
  // Free-form objects: their shape is normalized at runtime, not by the schema.
  job: Type.Optional(Type.Object({}, { additionalProperties: true })),
  jobId: Type.Optional(Type.String()),
  // Legacy alias of `jobId`, still accepted for backward compatibility.
  id: Type.Optional(Type.String()),
  patch: Type.Optional(Type.Object({}, { additionalProperties: true })),
  text: Type.Optional(Type.String()),
  mode: optionalStringEnum$3(CRON_WAKE_MODES),
  runMode: optionalStringEnum$3(CRON_RUN_MODES),
  contextMessages: Type.Optional(Type.Number({
    minimum: 0,
    maximum: REMINDER_CONTEXT_MESSAGES_MAX
  }))
}, { additionalProperties: true });
1.3 完整执行代码
位置:第 25058 行
/** Top-level argument names that may be folded into a synthetic `job` for `action: "add"`. */
const CRON_FLAT_JOB_KEYS = new Set([
  "name", "schedule", "sessionTarget", "wakeMode", "payload", "delivery",
  "enabled", "description", "deleteAfterRun", "agentId", "sessionKey",
  "message", "text", "model", "thinking", "timeoutSeconds", "allowUnsafeExternalContent"
]);

/** Top-level argument names that may be folded into a synthetic `patch` for `action: "update"`. */
const CRON_FLAT_PATCH_KEYS = new Set([
  "name", "schedule", "payload", "delivery", "enabled", "description",
  "deleteAfterRun", "agentId", "sessionKey", "sessionTarget", "wakeMode",
  "failureAlert", "allowUnsafeExternalContent"
]);

/** True when `value` is falsy or an object without own enumerable keys. */
function isAbsentOrEmptyObject(value) {
  return !value || (typeof value === "object" && Object.keys(value).length === 0);
}

/** Copies every allowed key of `params` whose value is not `undefined` into a new object. */
function pickDefinedCronParams(params, allowedKeys) {
  const picked = {};
  for (const key of Object.keys(params)) {
    if (allowedKeys.has(key) && params[key] !== void 0) {
      picked[key] = params[key];
    }
  }
  return picked;
}

/** Reads the job id from `jobId` (preferred) or the legacy `id`; throws when neither is set. */
function requireCronJobId(params) {
  const id = readStringParam$1(params, "jobId") ?? readStringParam$1(params, "id");
  if (!id) {
    throw new Error("jobId required (id accepted for backward compatibility)");
  }
  return id;
}

/**
 * Builds the owner-only `cron` tool, which forwards scheduler operations and
 * wake events to the Gateway.
 *
 * @param {object} [opts] - Tool options; only `agentSessionKey` is read here.
 * @param {object} [deps] - Optional dependency overrides; `callGatewayTool`
 *   replaces the default Gateway caller.
 * @returns {object} Tool definition (`label`, `name`, `ownerOnly`,
 *   `displaySummary`, `description`, `parameters`, `execute`).
 *
 * `execute(_toolCallId, args)` resolves to `jsonResult(...)` of the Gateway
 * response and rejects with an `Error` when a required argument is missing,
 * a webhook delivery has no valid http(s) URL, or the action is unknown.
 * `args` itself is never written to.
 */
function createCronTool(opts, deps) {
  const callGateway = deps?.callGatewayTool ?? callGatewayTool;
  return {
    label: "Cron",
    name: "cron",
    ownerOnly: true,
    displaySummary: "Schedule and manage cron jobs and wake events.",
    description: `Manage Gateway cron jobs (status/list/add/update/remove/run/runs) and send wake events.
ACTIONS:
- status: Check cron scheduler status
- list: List jobs (use includeDisabled:true to include disabled)
- add: Create job (requires job object)
- update: Modify job (requires jobId + patch object)
- remove: Delete job (requires jobId)
- run: Trigger job immediately (requires jobId)
- runs: Get job run history (requires jobId)
- wake: Send wake event (requires text, optional mode)
JOB SCHEMA:
{
"name": "string (optional)",
"schedule": { ... }, // Required: when to run
"payload": { ... }, // Required: what to execute
"delivery": { ... }, // Optional: announce/webhook
"sessionTarget": "main" | "isolated" | "current" | "session:<custom-id>",
"enabled": true | false // Optional, default true
}
SCHEDULE TYPES:
- "at": One-shot at absolute time
{ "kind": "at", "at": "<ISO-8601 timestamp>" }
- "every": Recurring interval
{ "kind": "every", "everyMs": <interval-ms>, "anchorMs": <optional> }
- "cron": Cron expression
{ "kind": "cron", "expr": "<cron-expression>", "tz": "<optional-timezone>" }
PAYLOAD TYPES:
- "systemEvent": Injects text as system event
{ "kind": "systemEvent", "text": "<message>" }
- "agentTurn": Runs agent with message
{ "kind": "agentTurn", "message": "<prompt>", "model": "<optional>", "thinking": "<optional>" }
CRITICAL CONSTRAINTS:
- sessionTarget="main" REQUIRES payload.kind="systemEvent"
- sessionTarget="isolated" | "current" | "session:xxx" REQUIRES payload.kind="agentTurn"`,
    parameters: CronToolSchema,
    execute: async (_toolCallId, args) => {
      const params = args;
      // 1. The action is mandatory.
      const action = readStringParam$1(params, "action", { required: true });
      // 2. Gateway call options; the timeout falls back to 60 seconds.
      const gatewayOpts = {
        ...readGatewayCallOptions(params),
        timeoutMs: Number.isFinite(params.timeoutMs) ? params.timeoutMs : 6e4
      };
      switch (action) {
        case "status":
          return jsonResult(await callGateway("cron.status", gatewayOpts, {}));

        case "list":
          return jsonResult(await callGateway("cron.list", gatewayOpts, {
            includeDisabled: Boolean(params.includeDisabled)
          }));

        case "add": {
          // 3. Resolve the job input. When `job` is missing or empty, accept job
          //    fields passed flat at the top level, but only if they actually
          //    describe a job (schedule/payload/message/text present).
          let jobInput = params.job;
          if (isAbsentOrEmptyObject(jobInput)) {
            const synthetic = pickDefinedCronParams(params, CRON_FLAT_JOB_KEYS);
            const describesJob = synthetic.schedule !== void 0 || synthetic.payload !== void 0 ||
              synthetic.message !== void 0 || synthetic.text !== void 0;
            if (describesJob) {
              jobInput = synthetic;
            }
          }
          if (!jobInput || typeof jobInput !== "object") {
            throw new Error("job required");
          }
          // 4. Normalize; fall back to the raw input when the normalizer yields nullish.
          const job = normalizeCronJobCreate(jobInput, {
            sessionContext: { sessionKey: opts?.agentSessionKey }
          }) ?? jobInput;
          // 5. Fill in agentId / sessionKey from the calling session when the job omits them.
          if (job && typeof job === "object") {
            const cfg = loadConfig();
            const { mainKey, alias } = resolveMainSessionAlias(cfg);
            const resolvedSessionKey = opts?.agentSessionKey
              ? resolveInternalSessionKey({ key: opts.agentSessionKey, alias, mainKey })
              : void 0;
            if (!("agentId" in job)) {
              const agentId = opts?.agentSessionKey
                ? resolveSessionAgentId({ sessionKey: opts.agentSessionKey, config: cfg })
                : void 0;
              if (agentId) job.agentId = agentId;
            }
            if (!("sessionKey" in job) && resolvedSessionKey) {
              job.sessionKey = resolvedSessionKey;
            }
          }
          // 6. Delivery handling for agentTurn jobs created from within a session.
          if (opts?.agentSessionKey && job && typeof job === "object" &&
            "payload" in job && job.payload?.kind === "agentTurn") {
            const deliveryValue = job.delivery;
            const delivery = isRecord$3(deliveryValue) ? deliveryValue : void 0;
            const mode = (typeof delivery?.mode === "string" ? delivery.mode : "").trim().toLowerCase();
            if (mode === "webhook") {
              const webhookUrl = normalizeHttpWebhookUrl(delivery?.to);
              if (!webhookUrl) {
                throw new Error('delivery.mode="webhook" requires delivery.to to be a valid http(s) URL');
              }
              if (delivery) delivery.to = webhookUrl;
            }
            // Infer a delivery target from the session key only when the caller
            // named neither a channel nor a recipient and the mode is unset or
            // "announce".
            const hasTarget = (typeof delivery?.channel === "string" && delivery.channel.trim()) ||
              (typeof delivery?.to === "string" && delivery.to.trim());
            if ((deliveryValue == null || delivery) &&
              (mode === "" || mode === "announce") && !hasTarget) {
              const inferred = inferDeliveryFromSessionKey(opts.agentSessionKey);
              if (inferred) {
                job.delivery = { ...delivery, ...inferred };
              }
            }
          }
          // 7. Append conversation context to the reminder text (systemEvent payloads only).
          const contextMessages = Number.isFinite(params.contextMessages) ? params.contextMessages : 0;
          if (job && typeof job === "object" && "payload" in job && job.payload?.kind === "systemEvent") {
            const payload = job.payload;
            if (typeof payload.text === "string" && payload.text.trim()) {
              const contextLines = await buildReminderContextLines({
                agentSessionKey: opts?.agentSessionKey,
                gatewayOpts,
                contextMessages,
                callGatewayTool: callGateway
              });
              if (contextLines.length > 0) {
                payload.text = `${stripExistingContext(payload.text)}${REMINDER_CONTEXT_MARKER}${contextLines.join("\n")}`;
              }
            }
          }
          return jsonResult(await callGateway("cron.add", gatewayOpts, job));
        }

        case "update": {
          const id = requireCronJobId(params);
          // When `patch` is missing or empty, accept patch fields passed flat at the top level.
          let patchInput = params.patch;
          if (isAbsentOrEmptyObject(patchInput)) {
            const synthetic = pickDefinedCronParams(params, CRON_FLAT_PATCH_KEYS);
            if (Object.keys(synthetic).length > 0) {
              patchInput = synthetic;
            }
          }
          if (!patchInput || typeof patchInput !== "object") {
            throw new Error("patch required");
          }
          return jsonResult(await callGateway("cron.update", gatewayOpts, {
            id,
            patch: normalizeCronJobPatch(patchInput) ?? patchInput
          }));
        }

        case "remove": {
          const id = requireCronJobId(params);
          return jsonResult(await callGateway("cron.remove", gatewayOpts, { id }));
        }

        case "run": {
          const id = requireCronJobId(params);
          return jsonResult(await callGateway("cron.run", gatewayOpts, {
            id,
            // Anything other than "due" is sent as "force".
            mode: params.runMode === "due" ? "due" : "force"
          }));
        }

        case "runs": {
          const id = requireCronJobId(params);
          return jsonResult(await callGateway("cron.runs", gatewayOpts, { id }));
        }

        case "wake": {
          const text = readStringParam$1(params, "text", { required: true });
          return jsonResult(await callGateway("wake", gatewayOpts, {
            // Anything other than "now" is sent as "next-heartbeat".
            mode: params.mode === "now" ? "now" : "next-heartbeat",
            text
          }, { expectFinal: false }));
        }

        default:
          throw new Error(`Unknown action: ${action}`);
      }
    }
  };
}
1.4 调度类型详解
// 1. One-shot schedule (at)
{
"kind": "at",
"at": "2026-03-30T09:00:00+08:00" // ISO-8601 format
}
// 2. Recurring interval schedule (every)
{
"kind": "every",
"everyMs": 3600000, // 1 hour
"anchorMs": 1711716000000 // optional: start time
}
// 3. Cron expression
{
"kind": "cron",
"expr": "0 9 * * *", // every day at 09:00
"tz": "Asia/Shanghai" // optional: time zone
}
1.5 负载类型详解
// 1. System event (injected into the session)
{
"kind": "systemEvent",
"text": "提醒:开会时间到了"
}
// 2. Agent run (isolated session)
{
"kind": "agentTurn",
"message": "分析今天的股票数据",
"model": "bailian/qwen3.5-plus", // optional
"thinking": "high", // optional
"timeoutSeconds": 300 // optional; 0 means no timeout
}
1.6 会话目标约束
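| sessionTarget | 必须使用的 payload.kind | 说明 |
| --- | --- | --- |
| main | systemEvent | 将文本作为系统事件注入 |
| isolated | agentTurn | 以 message 运行 Agent |
| current | agentTurn | 以 message 运行 Agent |
| session:xxx | agentTurn | 以 message 运行 Agent |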
1.7 执行流程图
cron 工具调用
↓
1. 解析 action(必填)
↓
2. 根据 action 执行
├─ status → 获取调度器状态
├─ list → 获取任务列表
├─ add → 创建任务
│ ├─ 构建 job 对象
│ ├─ 标准化 job
│ ├─ 自动填充 agentId/sessionKey
│ ├─ 处理 webhook URL
│ └─ 添加上下文消息
├─ update → 更新任务
├─ remove → 删除任务
├─ run → 立即执行
├─ runs → 获取执行历史
└─ wake → 发送唤醒事件
↓
3. 返回结果
二、nodes 工具
2.1 工具概述
功能:管理配对的节点设备
核心特性:
- 仅所有者可用(ownerOnly=true)
- 支持多种操作(status/describe/pairing/notify/camera/photos/screen/location/notifications/run/invoke)
- 支持摄像头截图(前置/后置/双摄)
- 支持获取最新照片
- 支持通知管理
2.2 Schema 定义
位置:第 103339 行
/**
 * Parameter schema for the `nodes` tool (excerpt).
 *
 * Only `action` is required by the schema; which of the optional fields an
 * action needs (for example `node`, `requestId`, `notificationKey`) is checked
 * at runtime by `createNodesTool`.
 */
const NodesToolSchema = Type.Object({
  action: stringEnum(NODE_ACTIONS),
  node: Type.Optional(Type.String()),
  // Pairing (approve / reject)
  requestId: Type.Optional(Type.String()),
  // notify
  title: Type.Optional(Type.String()),
  body: Type.Optional(Type.String()),
  sound: Type.Optional(Type.String()),
  priority: Type.Optional(Type.String()),
  delivery: Type.Optional(Type.String()),
  // camera_snap / photos_latest
  facing: Type.Optional(Type.String()),
  maxWidth: Type.Optional(Type.Number()),
  quality: Type.Optional(Type.Number()),
  delayMs: Type.Optional(Type.Number()),
  deviceId: Type.Optional(Type.String()),
  limit: Type.Optional(Type.Number()),
  // notifications_action
  notificationKey: Type.Optional(Type.String()),
  notificationAction: Type.Optional(Type.String()),
  notificationReplyText: Type.Optional(Type.String())
  // ... more parameters (omitted from this excerpt)
});
2.3 完整执行代码(部分)
位置:第 103378 行
/** Returns `value` when it is a finite number, otherwise `fallback`. */
function finiteNumberOr(value, fallback) {
  return Number.isFinite(value) ? value : fallback;
}

/** Returns `value` when it is a non-null object, otherwise a fresh empty object. */
function objectOrEmpty(value) {
  return value && typeof value === "object" ? value : {};
}

/**
 * Classifies an image format reported by a node.
 *
 * @param {string} format - Format string from the node payload.
 * @param {string} commandLabel - Command name used in the error message.
 * @returns {boolean} `true` for jpg/jpeg, `false` for png.
 * @throws {Error} For any other format.
 */
function isJpegNodeImage(format, commandLabel) {
  const normalized = format.toLowerCase();
  if (normalized === "jpg" || normalized === "jpeg") return true;
  if (normalized === "png") return false;
  throw new Error(`unsupported ${commandLabel} format: ${format}`);
}

/**
 * Builds the owner-only `nodes` tool, which discovers and controls paired
 * nodes through the Gateway (partial listing: not every action advertised in
 * the description is shown here).
 *
 * @param {object} [options] - Tool options. Read here: `agentSessionKey`,
 *   `agentChannel`, `currentChannelId`, `agentAccountId`, `currentThreadTs`,
 *   `config`, `modelHasVision`.
 * @returns {object} Tool definition (`label`, `name`, `ownerOnly`,
 *   `description`, `parameters`, `execute`).
 *
 * `execute(_toolCallId, args)` reads `action` and the Gateway options outside
 * the try block, so a missing `action` is not converted into a result. Any
 * failure while running the action itself is caught and returned as
 * `jsonResult({ ok: false, error })`.
 */
function createNodesTool(options) {
  // NOTE(review): the next six values are not referenced by the actions shown
  // in this excerpt; presumably they are consumed by actions omitted from the
  // listing (the description advertises run/invoke) — confirm against the full source.
  const sessionKey = options?.agentSessionKey?.trim() || void 0;
  const turnSourceChannel = options?.agentChannel?.trim() || void 0;
  const turnSourceTo = options?.currentChannelId?.trim() || void 0;
  const turnSourceAccountId = options?.agentAccountId?.trim() || void 0;
  const turnSourceThreadId = options?.currentThreadTs;
  const agentId = resolveSessionAgentId({
    sessionKey: options?.agentSessionKey,
    config: options?.config
  });
  const imageSanitization = resolveImageSanitizationLimits(options?.config);
  return {
    label: "Nodes",
    name: "nodes",
    ownerOnly: true,
    description: "Discover and control paired nodes (status/describe/pairing/notify/camera/photos/screen/location/notifications/run/invoke).",
    parameters: NodesToolSchema,
    execute: async (_toolCallId, args) => {
      const params = args;
      // The action is mandatory.
      const action = readStringParam$1(params, "action", { required: true });
      const gatewayOpts = readGatewayCallOptions(params);
      try {
        switch (action) {
          case "status":
            return jsonResult(await callGatewayTool("node.list", gatewayOpts, {}));

          case "describe":
            return jsonResult(await callGatewayTool("node.describe", gatewayOpts, {
              nodeId: await resolveNodeId(gatewayOpts, readStringParam$1(params, "node", { required: true }))
            }));

          case "pending":
            return jsonResult(await callGatewayTool("node.pair.list", gatewayOpts, {}));

          case "approve":
            return jsonResult(await callGatewayTool("node.pair.approve", gatewayOpts, {
              requestId: readStringParam$1(params, "requestId", { required: true })
            }));

          case "reject":
            return jsonResult(await callGatewayTool("node.pair.reject", gatewayOpts, {
              requestId: readStringParam$1(params, "requestId", { required: true })
            }));

          case "notify": {
            const node = readStringParam$1(params, "node", { required: true });
            const title = typeof params.title === "string" ? params.title : "";
            const body = typeof params.body === "string" ? params.body : "";
            if (!title.trim() && !body.trim()) {
              throw new Error("title or body required");
            }
            await callGatewayTool("node.invoke", gatewayOpts, {
              nodeId: await resolveNodeId(gatewayOpts, node),
              command: "system.notify",
              params: {
                title: title.trim() || void 0,
                body: body.trim() || void 0,
                sound: typeof params.sound === "string" ? params.sound : void 0,
                priority: typeof params.priority === "string" ? params.priority : void 0,
                delivery: typeof params.delivery === "string" ? params.delivery : void 0
              },
              idempotencyKey: crypto$1.randomUUID()
            });
            return jsonResult({ ok: true });
          }

          case "camera_snap": {
            const resolvedNode = await resolveNode(gatewayOpts,
              readStringParam$1(params, "node", { required: true }));
            const nodeId = resolvedNode.nodeId;
            // Which cameras to shoot: "both" expands to front then back; the default is front.
            const facingRaw = typeof params.facing === "string" ? params.facing.toLowerCase() : "front";
            let facings;
            if (facingRaw === "both") {
              facings = ["front", "back"];
            } else if (facingRaw === "front" || facingRaw === "back") {
              facings = [facingRaw];
            } else {
              throw new Error("invalid facing (front|back|both)");
            }
            const maxWidth = finiteNumberOr(params.maxWidth, 1600);
            const quality = finiteNumberOr(params.quality, 0.95);
            const delayMs = finiteNumberOr(params.delayMs, void 0);
            const deviceId = typeof params.deviceId === "string" && params.deviceId.trim()
              ? params.deviceId.trim()
              : void 0;
            if (deviceId && facings.length > 1) {
              throw new Error("facing=both is not allowed when deviceId is set");
            }
            const content = [];
            const details = [];
            // One snapshot per requested facing, taken sequentially.
            for (const facing of facings) {
              const payload = parseCameraSnapPayload((await callGatewayTool("node.invoke", gatewayOpts, {
                nodeId,
                command: "camera.snap",
                params: {
                  facing,
                  maxWidth,
                  quality,
                  format: "jpg",
                  delayMs,
                  deviceId
                },
                idempotencyKey: crypto$1.randomUUID()
              }))?.payload);
              const isJpeg = isJpegNodeImage(payload.format, "camera.snap");
              const filePath = cameraTempPath({
                kind: "snap",
                facing,
                ext: isJpeg ? "jpg" : "png"
              });
              await writeCameraPayloadToFile({
                filePath,
                payload,
                expectedHost: resolvedNode.remoteIp,
                invalidPayloadMessage: "invalid camera.snap payload"
              });
              // Inline the image only when the model can see it and base64 data is present.
              if (options?.modelHasVision && payload.base64) {
                content.push({
                  type: "image",
                  data: payload.base64,
                  mimeType: imageMimeFromFormat(payload.format) ?? (isJpeg ? "image/jpeg" : "image/png")
                });
              }
              details.push({
                facing,
                path: filePath,
                width: payload.width,
                height: payload.height
              });
            }
            return await sanitizeToolResultImages({
              content,
              details: {
                snaps: details,
                media: { mediaUrls: details.map((entry) => entry.path).filter((path) => typeof path === "string") }
              }
            }, "nodes:camera_snap", imageSanitization);
          }

          case "photos_latest": {
            const resolvedNode = await resolveNode(gatewayOpts,
              readStringParam$1(params, "node", { required: true }));
            const nodeId = resolvedNode.nodeId;
            const limitRaw = Number.isFinite(params.limit) ? Math.floor(params.limit) : DEFAULT_PHOTOS_LIMIT;
            const raw = await callGatewayTool("node.invoke", gatewayOpts, {
              nodeId,
              command: "photos.latest",
              params: {
                // Clamp the requested count into [1, MAX_PHOTOS_LIMIT].
                limit: Math.max(1, Math.min(limitRaw, MAX_PHOTOS_LIMIT)),
                maxWidth: finiteNumberOr(params.maxWidth, DEFAULT_PHOTOS_MAX_WIDTH),
                quality: finiteNumberOr(params.quality, DEFAULT_PHOTOS_QUALITY)
              },
              idempotencyKey: crypto$1.randomUUID()
            });
            const payload = raw?.payload && typeof raw.payload === "object" &&
              !Array.isArray(raw.payload) ? raw.payload : {};
            const photos = Array.isArray(payload.photos) ? payload.photos : [];
            if (photos.length === 0) {
              return await sanitizeToolResultImages({
                content: [],
                details: []
              }, "nodes:photos_latest", imageSanitization);
            }
            const content = [];
            const details = [];
            for (const [index, photoRaw] of photos.entries()) {
              const photo = parseCameraSnapPayload(photoRaw);
              const isJpeg = isJpegNodeImage(photo.format, "photos.latest");
              const filePath = cameraTempPath({
                kind: "snap",
                ext: isJpeg ? "jpg" : "png",
                id: crypto$1.randomUUID()
              });
              await writeCameraPayloadToFile({
                filePath,
                payload: photo,
                expectedHost: resolvedNode.remoteIp,
                invalidPayloadMessage: "invalid photos.latest payload"
              });
              if (options?.modelHasVision && photo.base64) {
                content.push({
                  type: "image",
                  data: photo.base64,
                  mimeType: imageMimeFromFormat(photo.format) ?? (isJpeg ? "image/jpeg" : "image/png")
                });
              }
              const createdAt = photoRaw && typeof photoRaw === "object" &&
                !Array.isArray(photoRaw) ? photoRaw.createdAt : void 0;
              details.push({
                index,
                path: filePath,
                width: photo.width,
                height: photo.height,
                // Only forwarded when the node reported it as a string.
                ...(typeof createdAt === "string" ? { createdAt } : {})
              });
            }
            return await sanitizeToolResultImages({
              content,
              details: {
                photos: details,
                media: { mediaUrls: details.map((entry) => entry.path).filter((path) => typeof path === "string") }
              }
            }, "nodes:photos_latest", imageSanitization);
          }

          // Read-only actions: the node command is looked up per action name.
          case "camera_list":
          case "notifications_list":
          case "device_status":
          case "device_info":
          case "device_permissions":
          case "device_health": {
            const node = readStringParam$1(params, "node", { required: true });
            const command = NODE_READ_ACTION_COMMANDS[action];
            const payloadRaw = await invokeNodeCommandPayload({
              gatewayOpts,
              node,
              command
            });
            return jsonResult(objectOrEmpty(payloadRaw));
          }

          case "notifications_action": {
            const node = readStringParam$1(params, "node", { required: true });
            const notificationKey = readStringParam$1(params, "notificationKey", { required: true });
            const notificationAction = typeof params.notificationAction === "string"
              ? params.notificationAction.trim().toLowerCase()
              : "";
            if (notificationAction !== "open" && notificationAction !== "dismiss" &&
              notificationAction !== "reply") {
              throw new Error("notificationAction must be open|dismiss|reply");
            }
            const notificationReplyText = typeof params.notificationReplyText === "string"
              ? params.notificationReplyText.trim()
              : void 0;
            if (notificationAction === "reply" && !notificationReplyText) {
              throw new Error("notificationReplyText required when notificationAction=reply");
            }
            const payloadRaw = await invokeNodeCommandPayload({
              gatewayOpts,
              node,
              command: "notifications.actions",
              commandParams: {
                key: notificationKey,
                action: notificationAction,
                replyText: notificationReplyText
              }
            });
            return jsonResult(objectOrEmpty(payloadRaw));
          }

          default:
            throw new Error(`Unknown action: ${action}`);
        }
      } catch (err) {
        // Anything can be thrown, not only Error instances; keep the message
        // meaningful instead of reporting `undefined`.
        return jsonResult({
          ok: false,
          error: err instanceof Error ? err.message : String(err)
        });
      }
    }
  };
}
2.4 支持的 Actions
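| Action | 功能 | 必填参数 |
| --- | --- | --- |
| status | 列出节点(node.list) | 无 |
| describe | 描述节点(node.describe) | node |
| pending | 列出配对请求(node.pair.list) | 无 |
| approve | 批准配对请求(node.pair.approve) | requestId |
| reject | 拒绝配对请求(node.pair.reject) | requestId |
| notify | 发送系统通知(system.notify) | node,且 title / body 至少提供一个 |
| camera_snap | 拍摄照片(camera.snap) | node |
| photos_latest | 获取最新照片(photos.latest) | node |
| camera_list | 只读查询,命令取自 NODE_READ_ACTION_COMMANDS | node |
| notifications_list | 只读查询,命令取自 NODE_READ_ACTION_COMMANDS | node |
| notifications_action | 对通知执行 open / dismiss / reply(notifications.actions) | node、notificationKey、notificationAction(reply 时还需 notificationReplyText) |
| device_status | 只读查询,命令取自 NODE_READ_ACTION_COMMANDS | node |
| device_info | 只读查询,命令取自 NODE_READ_ACTION_COMMANDS | node |
| device_permissions | 只读查询,命令取自 NODE_READ_ACTION_COMMANDS | node |
| device_health | 只读查询,命令取自 NODE_READ_ACTION_COMMANDS | node |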
2.5 执行流程图
nodes 工具调用
↓
1. 解析 action(必填)
↓
2. 解析 node(如果需要)
↓
3. 根据 action 执行
├─ status → 列出节点
├─ describe → 描述节点
├─ notify → 发送通知
├─ camera_snap → 拍摄照片
│ ├─ 解析摄像头方向
│ ├─ 调用摄像头
│ ├─ 保存临时文件
│ └─ 返回图片(如果有视觉模型)
├─ photos_latest → 获取最新照片
└─ 其他 → 调用节点命令
↓
4. 返回结果
三、关键机制对比
3.1 权限控制
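| 特性 | cron | nodes |
| --- | --- | --- |
| 所有者限制 | ownerOnly=true | ownerOnly=true |
| Gateway 调用 | callGateway(可通过 deps.callGatewayTool 注入) | 直接调用 callGatewayTool |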
3.2 操作类型
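| 特性 | cron | nodes |
| --- | --- | --- |
| actions 数量 | 8 | 本文列出 15 个 |
| 创建操作 | add(创建任务) | 无 |
| 批量操作 | 无(update/remove/run/runs 每次只针对一个 jobId) | camera_snap 的 facing=both 一次拍摄前后两个摄像头;photos_latest 通过 limit 一次返回多张照片 |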
3.3 媒体处理
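| 特性 | cron | nodes |
| --- | --- | --- |
| 图片支持 | 无 | camera_snap / photos_latest |
| 视觉模型 | 无 | modelHasVision 为真且存在 base64 数据时,将图片加入返回内容 |
| 图片清理 | 无 | 通过 sanitizeToolResultImages 处理返回结果 |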
夜雨聆风