
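The previous post was about boundaries: ThreadScope gathers a batch of concurrent tasks into a single lifecycle.
This post goes one layer deeper and follows the execution path itself, with one question in mind:
❝ What exactly does a Task go through between submit(...) and its final success, failure, or cancellation? ❞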
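1. Why Task<T> exists
CompletableFuture is already very powerful on its own, but business code that coordinates concurrent stages usually needs a few more things:
- an explicit task name
- a state machine that can be understood at a glance
- a record of the thread that actually ran the task
- a stable entry point for cancellation
- lifecycle binding
- waiting and exception semantics that sit closer to the business code
So ThreadForge wraps it in an extra layer, Task<T>, and exposes that instead.
The core fields of Task live in src/main/java/io/threadforge/Task.java: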
```java
private final long id;
private final String name;
private final CompletableFuture<T> future;
private final AtomicReference<State> state;
private final AtomicReference<Thread> runnerThread;
```
The states are defined as follows:
```java
public enum State {
    PENDING,
    RUNNING,
    SUCCESS,
    FAILED,
    CANCELLED
}
```
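For now, think of it this way:
❝ Task<T> is a task handle that carries a state machine, metadata, and the ability to cancel. ❞
2. Task state transitions: from PENDING to a terminal state
Five states, five transition methods, and only a few paths between them: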
```mermaid
stateDiagram-v2
    [*] --> PENDING: new Task
    PENDING --> RUNNING: markRunning(thread)
    RUNNING --> SUCCESS: markSuccess()
    RUNNING --> FAILED: markFailed()
    RUNNING --> CANCELLED: markCancelled()
    PENDING --> CANCELLED: cancel before run
    PENDING --> FAILED: rejected / timeout
    SUCCESS --> [*]
    FAILED --> [*]
    CANCELLED --> [*]
```
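All state transitions are entered through methods on Task. They are package-private, so code outside the package can only read the state: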
```java
// Package-private: called by ThreadScope at the right moment
boolean markRunning(Thread runner)
void markSuccess()
void markFailed()
void markCancelled()
void interruptRunner()
```
Start with markRunning(...):
```java
boolean marked = state.compareAndSet(State.PENDING, State.RUNNING);
if (marked) {
    runnerThread.set(runner);
}
```
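It does two things:
1. CAS the state from PENDING to RUNNING, which only one thread can win.
2. On success, record the executing thread, so that a later cancel knows which thread to interrupt.
If the CAS fails (for example, because the task was cancelled while it was still waiting in the queue), the method returns false and the caller skips execution. This is what guards the race between cancel and markRunning: cancel sets the state to CANCELLED first, so the CAS in markRunning fails and the task never starts.
The three terminal-state methods are symmetric: clear the runner reference, then set the state: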
```java
void markSuccess()   { runnerThread.set(null); state.set(State.SUCCESS); }
void markFailed()    { runnerThread.set(null); state.set(State.FAILED); }
void markCancelled() { runnerThread.set(null); state.set(State.CANCELLED); }
```
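Once a task reaches a terminal state, runnerThread is reset to null, so nobody is left holding a reference to a thread that has already finished.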
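The race between cancel and markRunning is easy to try out in isolation. The following is a minimal sketch, not ThreadForge's actual class: MiniTask is a hypothetical stand-in that keeps only the state and the runner reference, and its cancel() only flips the state.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for Task; it keeps only what the CAS guard needs.
final class MiniTask {
    enum State { PENDING, RUNNING, SUCCESS, FAILED, CANCELLED }

    private final AtomicReference<State> state = new AtomicReference<>(State.PENDING);
    private final AtomicReference<Thread> runnerThread = new AtomicReference<>();

    // Only PENDING -> RUNNING is allowed, and only one caller can win it.
    boolean markRunning(Thread runner) {
        boolean marked = state.compareAndSet(State.PENDING, State.RUNNING);
        if (marked) {
            runnerThread.set(runner);
        }
        return marked;
    }

    // Simplified cancel: flips the state only (no interrupt, no future).
    void cancel() {
        state.set(State.CANCELLED);
    }

    State state() {
        return state.get();
    }

    public static void main(String[] args) {
        MiniTask cancelledWhileQueued = new MiniTask();
        cancelledWhileQueued.cancel();
        // The worker arrives late: the CAS fails, so it must skip execution.
        System.out.println(cancelledWhileQueued.markRunning(Thread.currentThread())); // false

        MiniTask normal = new MiniTask();
        System.out.println(normal.markRunning(Thread.currentThread())); // true
        System.out.println(normal.markRunning(Thread.currentThread())); // false: already RUNNING
    }
}
```

The point of using compareAndSet rather than a plain set is that the worker learns whether it won the transition, and can back off when it did not.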
3. cancel: three steps, cooperative termination
The implementation of Task.cancel() is short:
```java
public boolean cancel() {
    state.set(State.CANCELLED);
    Thread runner = runnerThread.get();
    if (runner != null) {
        runner.interrupt();
    }
    return future.cancel(true);
}
```
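There are really just three steps, and their order matters:
1. Set the state to CANCELLED first. Together with the markRunning CAS from the previous section, this pre-emptively stops the task from starting.
2. Then interrupt the runner. If the task is already running, send it an interrupt; if no runner has been recorded yet, skip this step.
3. Finally cancel the underlying future. future.cancel(true) completes it with a CancellationException. For a CompletableFuture the mayInterruptIfRunning flag has no effect, which is why step 2 interrupts the runner explicitly.
Why is this called cooperative termination? Thread.interrupt() only sends a signal; it does not forcibly kill the thread. The task code, the blocking points, and the interrupt-handling logic all have to cooperate before the task actually stops. ThreadForge relies on three cooperating layers:
- Thread.interrupt(): wakes a thread that is blocked
- CancellationToken.throwIfCancelled(): the task code actively checks for cancellation at chosen points
- The business logic's response to the interrupt: catch InterruptedException, clean up, and exit
No single layer is enough; only the three together make cancellation reliable.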
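To make "interrupt is only a signal" concrete, here is a small sketch using nothing but the JDK. CooperativeCancelDemo and its method name are made up for illustration; the worker stops only because its own code catches InterruptedException at a blocking point and exits.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

final class CooperativeCancelDemo {
    // Returns true if the worker observed the interrupt, cleaned up, and exited.
    static boolean runAndInterrupt() {
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean cleanedUp = new AtomicBoolean(false);

        Thread worker = new Thread(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // blocking point: interrupt() wakes it up
            } catch (InterruptedException e) {
                cleanedUp.set(true);                // the task code does its cleanup...
                Thread.currentThread().interrupt(); // ...and keeps the flag set for callers
            }
        });
        worker.start();
        try {
            started.await();
            worker.interrupt(); // only a signal; the worker decides how to stop
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return cleanedUp.get() && !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped cooperatively: " + runAndInterrupt());
    }
}
```

If the worker swallowed the exception and kept looping, interrupt() alone would not stop it, which is the reason for the explicit cancellation checks in the task code.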
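4. The full submit sequence: from argument validation to the scheduler
ThreadScope has a dozen or so submit(...) overloads, all of which converge on one private method. With argument validation and concurrency-permit acquisition stripped out, the core construction logic looks like this: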
```java
final CompletableFuture<T> future = new CompletableFuture<T>();
final Task<T> task = new Task<T>(id, name, future);
final TaskInfo info = new TaskInfo(scopeId, id, name, Instant.now(), scheduler.name());
final ExecutionContextCarrier executionContext = ExecutionContextCarrier.capture();
final ScheduledTask timeoutTask = scheduleTaskTimeout(task, info, taskTimeout);
tasks.add(task);
future.whenComplete((value, throwable) -> {
    tasks.remove(task);
    if (timeoutTask != null) {
        timeoutTask.cancel();
    }
});
```
Then the task is handed to the scheduler:
```java
scheduler.executor().execute(
    Scheduler.prioritized(
        executionContext.wrapRunnable(() ->
            runTask(task, info, callable, taskRetryPolicy,
                    permitAcquired ? semaphore : null)),
        taskPriority,
        id
    )
);
```
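If the scheduler rejects the task (RejectedExecutionException), the framework releases the concurrency permit, marks the task as failed, and fires the hook.
Step by step:
1. Create the CompletableFuture: the carrier of the task's result.
2. Create the Task: the handle with the state machine and metadata.
3. Create the TaskInfo: snapshot data used for observation.
4. Capture the context: both Context and OpenTelemetry.
5. Register the task-level timeout: an optional, independent time budget.
6. Add the task to tasks: the scope can later look up every task it owns.
7. Register the whenComplete callback: whether the task succeeds, fails, or is cancelled, it is removed from the tasks collection when it finishes, and its timeout task is cancelled at the same time.
8. Wrap everything in a prioritized runnable and hand it to the scheduler.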
```mermaid
sequenceDiagram
    participant User
    participant Scope as ThreadScope
    participant Context as ExecutionContextCarrier
    participant Scheduler
    participant Task
    participant Retry as RetryExecutor
    participant Hook as ThreadHook
    participant Metrics
    User->>Scope: submit(name, callable)
    Scope->>Scope: lockConfiguration()
    Scope->>Scope: acquireSubmissionPermit()
    Scope->>Task: new Task(id, name, future)
    Scope->>Context: capture()
    Scope->>Scope: scheduleTaskTimeout()
    Scope->>Scope: tasks.add(task)
    Scope->>Scheduler: executor.execute(prioritizedRunnable)
    Scheduler->>Scope: runTask(task, callable)
    Scope->>Task: markRunning(currentThread)
    Scope->>Hook: onStart(info)
    Scope->>Metrics: recordStart()
    Scope->>Retry: execute(callable, retryPolicy, token)
    alt callable success
        Retry-->>Scope: value
        Scope->>Task: markSuccess()
        Scope->>Hook: onSuccess(info, duration)
        Scope->>Metrics: recordTerminal(SUCCESS)
    else callable failed
        Retry-->>Scope: throwable
        Scope->>Task: markFailed()
        Scope->>Hook: onFailure(info, error, duration)
        Scope->>Metrics: recordTerminal(FAILED)
    else cancelled
        Scope->>Task: markCancelled()
        Scope->>Hook: onCancel(info, duration)
        Scope->>Metrics: recordTerminal(CANCELLED)
    end
```
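The top half is submission, the bottom half is execution, and the sequence ends on one of three paths: success, failure, or cancellation.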
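The self-cleaning registration in the submit path (add to tasks, remove again in whenComplete) can be sketched with plain JDK classes. TaskRegistrySketch is a hypothetical name, and it tracks bare futures rather than Task objects, so treat it as an illustration of the pattern only.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class TaskRegistrySketch {
    private final Set<CompletableFuture<?>> tasks = ConcurrentHashMap.newKeySet();

    // Registers a new future and arranges for it to unregister itself.
    <T> CompletableFuture<T> register() {
        CompletableFuture<T> future = new CompletableFuture<>();
        tasks.add(future);
        // Runs on success, failure, and cancellation alike.
        future.whenComplete((value, throwable) -> tasks.remove(future));
        return future;
    }

    int liveTasks() {
        return tasks.size();
    }

    public static void main(String[] args) {
        TaskRegistrySketch registry = new TaskRegistrySketch();
        CompletableFuture<String> ok = registry.register();
        CompletableFuture<String> failed = registry.register();
        CompletableFuture<String> cancelled = registry.register();
        System.out.println(registry.liveTasks()); // 3

        ok.complete("done");
        failed.completeExceptionally(new IllegalStateException("boom"));
        cancelled.cancel(true);
        System.out.println(registry.liveTasks()); // 0
    }
}
```

Because the cleanup hangs off the future itself, no terminal path can forget to unregister the task.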
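5. runTask(...): the core that actually executes the task
runTask(...) is the execution engine of the whole chain. The full method is about 40 lines; trimmed down, its main structure is: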
```java
long started = System.nanoTime();
CompletableFuture<T> future = task.toCompletableFuture();
try {
    // ① Double cancellation check before starting
    if (task.isCancelled() || token.isCancelled()) {
        completeTaskCancelled(task, future, new CancelledException("..."), info, started);
        return;
    }
    // ② State machine: PENDING → RUNNING
    if (!task.markRunning(Thread.currentThread())) {
        return;
    }
    // ③ Fire observation
    safeHookStart(info);
    token.throwIfCancelled();
    // ④ Actually execute (including retries)
    T value = RetryExecutor.execute(callable, retryPolicy, token);
    if (future.complete(value)) {
        task.markSuccess();
        safeHookSuccess(info, elapsedNanos(started));
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    completeTaskCancelled(task, future, new CancelledException("..."), info, started);
} catch (CancelledException e) {
    completeTaskCancelled(task, future, e, info, started);
} catch (Throwable t) {
    completeTaskFailure(task, future, t, info, started);
} finally {
    if (acquiredSemaphore != null) {
        acquiredSemaphore.release(); // release the concurrency permit
    }
}
```
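Exceptions fork three ways:
- InterruptedException → re-assert the thread's interrupt flag, then take the cancellation path
- CancelledException → take the cancellation path directly
- Any other Throwable → take the failure path (completeTaskFailure writes the exception into the future via future.completeExceptionally(...), then calls task.markFailed() and the hook)
Failure, cancellation, retry, and concurrency permits are all handled inside this one method.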
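The shape of that try/catch/finally can be reduced to a few lines. This is a sketch under stated assumptions: RunTaskSketch and Outcome are hypothetical, and the JDK's CancellationException stands in for ThreadForge's own CancelledException. What it shows is the three-way fork and the fact that the permit is released on every path.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;

final class RunTaskSketch {
    enum Outcome { SUCCESS, FAILED, CANCELLED }

    // Classifies how the callable ended; the permit is released on every path.
    static Outcome run(Callable<?> callable, Semaphore acquiredSemaphore) {
        try {
            callable.call();
            return Outcome.SUCCESS;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // re-assert the flag for code further up
            return Outcome.CANCELLED;
        } catch (CancellationException e) {     // stand-in for CancelledException
            return Outcome.CANCELLED;
        } catch (Throwable t) {
            return Outcome.FAILED;
        } finally {
            if (acquiredSemaphore != null) {
                acquiredSemaphore.release();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Semaphore permits = new Semaphore(1);

        permits.acquire();
        System.out.println(run(() -> "ok", permits)); // SUCCESS

        permits.acquire();
        System.out.println(run(() -> { throw new IllegalStateException("boom"); }, permits)); // FAILED

        permits.acquire();
        System.out.println(run(() -> { throw new CancellationException("stop"); }, permits)); // CANCELLED

        System.out.println(permits.availablePermits()); // 1
    }
}
```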
6. Scheduler: separating execution strategy from ownership
```java
private final ExecutorService executor;
private final boolean ownsExecutor;
private final String name;
private final boolean virtualThreadMode;
```
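Four fields, four responsibilities:
- executor: who the task is handed to for execution
- ownsExecutor: whether the executor should be shut down along with the scope
- name: the identifier used in logs and observation
- virtualThreadMode: whether tasks run on virtual threads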
6.1 Scheduler.detect()
```java
public static Scheduler detect() {
    if (isVirtualThreadSupported()) {
        return virtualThreads();
    }
    return commonPool();
}
```
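isVirtualThreadSupported() uses reflection to probe for Executors.newVirtualThreadPerTaskExecutor(), and virtualThreads() creates the executor lazily with double-checked locking (DCL). If reflection cannot find the method or the call fails, detect() falls back to commonPool() automatically. Callers do not have to change a single line.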
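A reflective probe of this kind might look roughly like the sketch below. DetectSketch and its method names are illustrative, not ThreadForge's code, and the sketch skips the lazy DCL caching. The only JDK facts it relies on are that Executors.newVirtualThreadPerTaskExecutor() is a public static no-arg method on JDKs that have virtual threads, and that ForkJoinPool.commonPool() is always available.

```java
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

final class DetectSketch {
    // Returns a virtual-thread executor when the running JDK offers one, else null.
    static ExecutorService tryVirtualThreads() {
        try {
            Method factory = Executors.class.getMethod("newVirtualThreadPerTaskExecutor");
            return (ExecutorService) factory.invoke(null);
        } catch (ReflectiveOperationException | RuntimeException e) {
            // Method missing (older JDK) or the call itself failed.
            return null;
        }
    }

    // Prefer virtual threads, fall back to the common pool.
    static ExecutorService detect() {
        ExecutorService virtual = tryVirtualThreads();
        return virtual != null ? virtual : ForkJoinPool.commonPool();
    }

    public static void main(String[] args) {
        ExecutorService executor = detect();
        int answer = CompletableFuture.supplyAsync(() -> 21 * 2, executor).join();
        System.out.println(answer); // 42
    }
}
```

Going through reflection keeps the class loadable on older JDKs, where a direct call to the method would not compile.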
6.2 Scheduler.fixed(int size)
```java
int queueCapacity = Math.max(256, size * 100);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    size, size, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>(queueCapacity),
    new NamedThreadFactory("threadforge-fixed"),
    new ThreadPoolExecutor.CallerRunsPolicy()
);
executor.allowCoreThreadTimeOut(true);
```
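- core = max, so the thread count is fixed; once every thread is busy, tasks go into a bounded queue
- queueCapacity is at least 256 and grows linearly with the thread count
- CallerRunsPolicy: when the queue is full, the submitting thread runs the task itself, which creates natural back-pressure
- allowCoreThreadTimeOut(true): core threads that stay idle for more than 60 seconds are reclaimed too, so nothing sits idle after a peak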
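The back-pressure effect of CallerRunsPolicy is easy to observe with a deliberately tiny pool. The sketch below uses one thread and a queue of one (far smaller than the 256-slot minimum above) purely to force saturation; CallerRunsDemo is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallerRunsDemo {
    // Returns the name of the thread that ran a task submitted to a saturated pool.
    static String threadThatRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1), // tiny queue to force saturation
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the single queue slot
            // Pool and queue are full: the policy runs this one on the caller.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("overflow: " + threadThatRanOverflowTask());
    }
}
```

While the caller is busy running the overflow task it cannot submit more work, which is exactly the throttling the policy is chosen for.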
6.3 Scheduler.priority(int size)
This is also a fixed-size thread pool, but the queue is replaced by an unbounded PriorityBlockingQueue:
```java
new ThreadPoolExecutor(
    size, size, 60L, TimeUnit.SECONDS,
    new PriorityBlockingQueue<Runnable>(),
    new NamedThreadFactory("threadforge-priority"),
    new ThreadPoolExecutor.CallerRunsPolicy()
);
```
Because the queue is unbounded, CallerRunsPolicy is normally never triggered. Ordering comes from PrioritizedRunnable.compareTo(...):
```java
int byPriority = Integer.compare(this.taskPriority.rank(), other.taskPriority.rank());
if (byPriority != 0) return byPriority;
return Long.compare(this.sequence, other.sequence);
```
Tasks are ordered by TaskPriority first; within the same priority, by submission order (FIFO).
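The effect of that comparator can be checked with a PriorityBlockingQueue on its own. In this sketch, Item is a hypothetical element type with plain int ranks; since the queue hands out the smallest element first, a lower rank is served earlier, and the sequence number breaks ties in submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

final class PriorityOrderSketch {
    // Hypothetical queue element: rank first, then submission sequence.
    static final class Item implements Comparable<Item> {
        final String label;
        final int rank;
        final long sequence;

        Item(String label, int rank, long sequence) {
            this.label = label;
            this.rank = rank;
            this.sequence = sequence;
        }

        @Override
        public int compareTo(Item other) {
            int byPriority = Integer.compare(this.rank, other.rank);
            if (byPriority != 0) return byPriority;
            return Long.compare(this.sequence, other.sequence);
        }
    }

    // Enqueues four items out of order and returns the order they come out in.
    static List<String> drainOrder() {
        PriorityBlockingQueue<Item> queue = new PriorityBlockingQueue<>();
        queue.add(new Item("low-1", 2, 1));
        queue.add(new Item("high-1", 0, 2));
        queue.add(new Item("low-2", 2, 3));
        queue.add(new Item("high-2", 0, 4));

        List<String> order = new ArrayList<>();
        Item next;
        while ((next = queue.poll()) != null) {
            order.add(next.label);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder()); // [high-1, high-2, low-1, low-2]
    }
}
```

Without the sequence tie-breaker, a PriorityBlockingQueue gives no ordering guarantee among equal-priority elements, so FIFO within a priority has to be encoded in the comparison itself.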
```mermaid
flowchart TD
    A[Scheduler.detect] --> B{JDK supports virtual threads?}
    B -->|yes| C[virtualThreads]
    B -->|no| D[commonPool]
    E[Scheduler.fixed] --> F[ThreadPoolExecutor + LinkedBlockingQueue]
    G[Scheduler.priority] --> H[ThreadPoolExecutor + PriorityBlockingQueue]
    I[Scheduler.from] --> J[External ExecutorService]
    C --> K[executor.execute]
    D --> K
    F --> K
    H --> K
    J --> K
```
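However the scheduler is created, every task ends up going out through executor.execute(...).
7. ExecutionContextCarrier: how context follows a task across threads
After a thread switch, the Context and the OpenTelemetry context from the submitting thread are lost, because both are based on thread-local variables. ExecutionContextCarrier exists to solve exactly this problem: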
```java
// Capture at submit time
static ExecutionContextCarrier capture() {
    return new ExecutionContextCarrier(
        Context.capture(),
        OpenTelemetryBridge.currentContext()
    );
}

// Install at execution time, restore when done
Context.Snapshot previous = Context.install(contextSnapshot);
Object scope = OpenTelemetryBridge.makeCurrent(otelParentContext);
try {
    return callable.call();
} finally {
    OpenTelemetryBridge.closeScope(scope);
    Context.restore(previous);
}
```
Two kinds of context are propagated:
- Context: ThreadForge's own thread-local variables
- OpenTelemetry: the current span of the external tracing system
❝ Capture at submit, install at execution, restore at the end. ❞
```mermaid
sequenceDiagram
    participant Main as Submitting thread
    participant Carrier as ExecutionContextCarrier
    participant Worker as Worker thread
    participant Context
    participant OTel as OpenTelemetryBridge
    Main->>Context: Context.put(traceId)
    Main->>Carrier: capture()
    Carrier->>Context: Context.capture()
    Carrier->>OTel: currentContext()
    Worker->>Carrier: wrapCallable.call()
    Carrier->>Context: install(snapshot)
    Carrier->>OTel: makeCurrent(parentContext)
    Worker->>Worker: run the business callable
    Carrier->>OTel: closeScope()
    Carrier->>Context: restore(previous)
```
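The capture / install / restore cycle works the same way for any thread-local. Here is a minimal sketch with a single ThreadLocal<String> standing in for ThreadForge's Context; ContextCarrierSketch, TRACE_ID, and wrap are hypothetical names, and the OpenTelemetry half is left out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ContextCarrierSketch {
    // Hypothetical stand-in for Context: one thread-local value.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Must be called on the submitting thread: that is where the capture happens.
    static <T> Callable<T> wrap(Callable<T> callable) {
        final String snapshot = TRACE_ID.get();   // capture
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(snapshot);               // install on the worker
            try {
                return callable.call();
            } finally {
                if (previous == null) {           // restore whatever was there before
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }

    // Sets a trace id on the caller and reports what a pooled worker sees.
    static String traceIdSeenByWorker(String traceId) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        TRACE_ID.set(traceId);
        try {
            Callable<String> wrapped = wrap(() -> TRACE_ID.get());
            return worker.submit(wrapped).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(traceIdSeenByWorker("trace-42")); // trace-42
    }
}
```

The restore step matters most on pooled threads: without it, the next unrelated task on the same worker would inherit a stale value.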
8. RetryExecutor: why retries must be aware of cancellation
The core loop of RetryExecutor looks like this:
```java
while (true) {
    token.throwIfCancelled();
    try {
        return callable.call();
    } catch (InterruptedException e) {
        throw e; // interrupts pass straight through, no retry
    } catch (CancelledException e) {
        throw e; // cancellation passes straight through, no retry
    } catch (Throwable failure) {
        if (!retryPolicy.allowsRetry(attempt, failure)) {
            // No more retries: attach every earlier failure as suppressed
            if (previousFailures != null) {
                for (Throwable prev : previousFailures) {
                    if (prev != failure) failure.addSuppressed(prev);
                }
            }
            throw failure;
        }
        previousFailures.add(failure);
        sleepBeforeRetry(retryPolicy.nextDelay(attempt, failure), token);
        attempt++;
    }
}
```
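- Interrupts and cancellation pass straight through: they are not business failures, so they must not be retried, and the framework has to react immediately.
- Every iteration checks the token first: after a failed attempt, the next round still has to check whether the scope has been cancelled.
- The sleep is sliced: it wakes up every 100 ms to check for cancellation.
The implementation of sleepBeforeRetry(...):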
```java
long remainingMillis = delay.toMillis();
while (remainingMillis > 0L) {
    token.throwIfCancelled();
    long chunk = Math.min(remainingMillis, 100L);
    Thread.sleep(chunk);
    remainingMillis -= chunk;
}
```
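❝ Retries also have to obey the scope's cancellation and deadline. ❞
If the task still fails in the end, the exception from every earlier attempt is added to the final exception's suppressed list. When debugging, you see the full failure history, not just the last error message.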
```mermaid
flowchart TD
    A[RetryExecutor.execute] --> B[token.throwIfCancelled]
    B --> C[callable.call]
    C -->|success| D[return result]
    C -->|InterruptedException| E[rethrow]
    C -->|CancelledException| F[rethrow]
    C -->|other exception| G{RetryPolicy allows retry?}
    G -->|no| H[attach suppressed failures]
    H --> I[throw final exception]
    G -->|yes| J[record this failure]
    J --> K[compute nextDelay]
    K --> L[sliced sleep, check cancellation every 100ms]
    L --> M[attempt++]
    M --> B
```
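The sliced sleep is the key. Suppose a retry has to wait 5 seconds: if the thread sleeps through the whole interval, a cancellation signal that arrives midway gets a sluggish response. Split into 100 ms slices with a cancellation check after each one, the retry reacts much sooner once the scope calls a halt.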
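Putting the loop and the sliced sleep together, a self-contained version might look like the sketch below. It is a simplification under stated assumptions: an AtomicBoolean plays the role of the CancellationToken, a fixed attempt count and delay replace RetryPolicy, and the JDK's CancellationException stands in for CancelledException. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;

final class RetrySketch {
    static <T> T execute(Callable<T> callable, int maxAttempts, long delayMillis,
                         AtomicBoolean cancelled) throws Exception {
        List<Exception> previousFailures = new ArrayList<>();
        for (int attempt = 1; ; attempt++) {
            throwIfCancelled(cancelled);
            try {
                return callable.call();
            } catch (InterruptedException | CancellationException e) {
                throw e; // never retried
            } catch (Exception failure) {
                if (attempt >= maxAttempts) {
                    for (Exception prev : previousFailures) {
                        if (prev != failure) failure.addSuppressed(prev);
                    }
                    throw failure;
                }
                previousFailures.add(failure);
                sleepInSlices(delayMillis, cancelled);
            }
        }
    }

    // Sleeps in chunks of at most 100 ms, checking for cancellation before each.
    static void sleepInSlices(long millis, AtomicBoolean cancelled) throws InterruptedException {
        long remaining = millis;
        while (remaining > 0L) {
            throwIfCancelled(cancelled);
            long chunk = Math.min(remaining, 100L);
            Thread.sleep(chunk);
            remaining -= chunk;
        }
    }

    static void throwIfCancelled(AtomicBoolean cancelled) {
        if (cancelled.get()) throw new CancellationException("cancelled");
    }

    // Demo helper: an always-failing task; returns how many failures were suppressed.
    static int suppressedAfterExhaustion(int maxAttempts) {
        int[] calls = {0};
        try {
            execute(() -> { throw new IllegalStateException("attempt " + (++calls[0])); },
                    maxAttempts, 1L, new AtomicBoolean(false));
            return -1;
        } catch (Exception e) {
            return e.getSuppressed().length;
        }
    }

    // Demo helper: with the flag already set, the callable must never run.
    static boolean stopsWhenCancelled() {
        try {
            execute(() -> "never reached", 3, 1L, new AtomicBoolean(true));
            return false;
        } catch (CancellationException e) {
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(suppressedAfterExhaustion(3)); // 2
        System.out.println(stopsWhenCancelled());         // true
    }
}
```

With three attempts, the third failure is the one that gets thrown and the first two ride along as suppressed exceptions, which matches the failure-history behaviour described above.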
9. Two layers of time budget: scope deadline + task timeout
9.1 Scope-level deadline
```java
withDeadline(Duration)
```
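This configures the deadline for the whole ThreadScope. When it is reached:
- deadlineTriggered = true and token.cancel() is called
- every task in the scope receives the cancellation signal
- subsequent waits throw ScopeTimeoutException
9.2 Task-level timeout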
```java
scope.submit("rpc-a", callable, Duration.ofMillis(200));
```
This gives a single task its own budget. Internally, a timeout task is registered:
```java
if (task.toCompletableFuture().completeExceptionally(timeoutException)) {
    task.markFailed();
    task.interruptRunner();
    safeHookFailure(info, timeoutException, timeout.toNanos());
}
```
It only affects the current task: mark it as failed, interrupt the executing thread, fire the hook. Other tasks in the scope are not affected.
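| Level | Configured via | On expiry | Affects |
| --- | --- | --- | --- |
| ThreadScope | withDeadline(Duration) | token.cancel(); later waits throw ScopeTimeoutException | every task in the scope |
| Task | submit(name, callable, Duration) | markFailed(), interruptRunner(), failure hook | only that task |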
One governs the whole, the other governs the part.
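The task-level half of this can be approximated with a ScheduledExecutorService racing against the task's own completion. The sketch below is an assumption-laden illustration (TaskTimeoutSketch and armTimeout are hypothetical, and it skips the markFailed / interrupt / hook steps): whichever side completes the future first wins, and a task that finishes in time cancels its timer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TaskTimeoutSketch {
    // Arms a per-task timeout; whichever side completes the future first wins.
    static <T> ScheduledFuture<?> armTimeout(CompletableFuture<T> future, long millis,
                                             ScheduledExecutorService timer) {
        Runnable onTimeout = () -> future.completeExceptionally(
                new TimeoutException("task timed out after " + millis + " ms"));
        ScheduledFuture<?> timeoutTask = timer.schedule(onTimeout, millis, TimeUnit.MILLISECONDS);
        // A task that finishes first cancels its own timer.
        future.whenComplete((value, error) -> timeoutTask.cancel(false));
        return timeoutTask;
    }

    // A future nobody completes must end with a TimeoutException.
    static boolean slowTaskTimesOut() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> neverCompletes = new CompletableFuture<>();
            armTimeout(neverCompletes, 50, timer);
            neverCompletes.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        } finally {
            timer.shutdownNow();
        }
    }

    // A future completed in time keeps its value and cancels the timer.
    static boolean fastTaskCancelsTimer() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> fast = new CompletableFuture<>();
            ScheduledFuture<?> timeoutTask = armTimeout(fast, 60_000, timer);
            fast.complete("done");
            return timeoutTask.isCancelled() && "done".equals(fast.join());
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(slowTaskTimesOut());     // true
        System.out.println(fastTaskCancelsTimer()); // true
    }
}
```

Using the return value of completeExceptionally, as the excerpt above does, is what lets the timeout path run its side effects only when it actually won the race.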
10. Hook and Metrics: observation built into the execution path
ThreadHook is an interface whose 4 methods are all default methods, so you only override the ones you need:
```java
default void onStart(TaskInfo info) {}
default void onSuccess(TaskInfo info, Duration duration) {}
default void onFailure(TaskInfo info, Throwable error, Duration duration) {}
default void onCancel(TaskInfo info, Duration duration) {}
```
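Multiple hooks are combined with andThen(...). Each callback catches exceptions on its own, so an exception thrown by one hook does not stop the ones after it: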
```java
ThreadHook combined = hook1.andThen(hook2).andThen(hook3);
```
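One way such an isolating andThen could be built is sketched below. This is not ThreadHook's real implementation: Hook is a hypothetical, trimmed-down interface with a single callback and a String in place of TaskInfo. The part worth looking at is that each delegate runs inside its own try/catch.

```java
import java.util.ArrayList;
import java.util.List;

final class HookSketch {
    // Hypothetical, trimmed-down hook with a single callback.
    interface Hook {
        default void onStart(String taskName) {}

        // Chains two hooks; each one runs in its own try/catch.
        default Hook andThen(Hook next) {
            Hook first = this;
            return new Hook() {
                @Override
                public void onStart(String taskName) {
                    safely(() -> first.onStart(taskName));
                    safely(() -> next.onStart(taskName));
                }
            };
        }
    }

    static void safely(Runnable callback) {
        try {
            callback.run();
        } catch (RuntimeException e) {
            // Swallowed in this sketch; real code would at least log it.
        }
    }

    // Chains a throwing hook with two recording hooks and fires onStart once.
    static List<String> fire() {
        List<String> seen = new ArrayList<>();
        Hook failing = new Hook() {
            @Override
            public void onStart(String taskName) {
                throw new IllegalStateException("hook bug");
            }
        };
        Hook recording = new Hook() {
            @Override
            public void onStart(String taskName) {
                seen.add(taskName);
            }
        };
        failing.andThen(recording).andThen(recording).onStart("rpc-a");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(fire()); // [rpc-a, rpc-a]
    }
}
```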
The metadata comes from TaskInfo:
```java
new TaskInfo(scopeId, id, name, Instant.now(), scheduler.name())
```
The built-in metrics are maintained by ScopeMetrics using LongAdder and AtomicLong, which keeps contention overhead low:
```text
started / succeeded / failed / cancelled
totalDurationNanos / maxDurationNanos
```
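As a rough idea of how counters like these can be kept cheap under contention, here is a sketch with LongAdder for the sums and an AtomicLong for the running maximum. MetricsSketch and recordSuccess are hypothetical names; the field names mirror the list above, but the real ScopeMetrics may be organised differently.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

final class MetricsSketch {
    private final LongAdder succeeded = new LongAdder();
    private final LongAdder totalDurationNanos = new LongAdder();
    private final AtomicLong maxDurationNanos = new AtomicLong();

    void recordSuccess(long durationNanos) {
        succeeded.increment();                 // striped counter, cheap under contention
        totalDurationNanos.add(durationNanos);
        // Lock-free "keep the larger value" update.
        maxDurationNanos.accumulateAndGet(durationNanos, Math::max);
    }

    long succeeded() { return succeeded.sum(); }
    long totalDurationNanos() { return totalDurationNanos.sum(); }
    long maxDurationNanos() { return maxDurationNanos.get(); }

    public static void main(String[] args) {
        MetricsSketch metrics = new MetricsSketch();
        metrics.recordSuccess(300);
        metrics.recordSuccess(900);
        metrics.recordSuccess(500);
        System.out.println(metrics.succeeded());          // 3
        System.out.println(metrics.totalDurationNanos()); // 1700
        System.out.println(metrics.maxDurationNanos());   // 900
    }
}
```

LongAdder suits counts and totals because they are only ever added to; a maximum needs a compare-style update, hence the AtomicLong.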
Observation is recorded at the same point where the execution event happens:
```java
safeHookStart(info);
metrics.recordStart();
// ... execute ...
safeHookSuccess(...);
metrics.recordTerminal(Task.State.SUCCESS, ...);
```
```mermaid
flowchart TD
    A[runTask starts] --> B[safeHookStart]
    B --> C[metrics.recordStart]
    C --> D[run the business Callable]
    D -->|success| E[safeHookSuccess]
    E --> F[metrics.recordTerminal SUCCESS]
    D -->|failure| G[safeHookFailure]
    G --> H[metrics.recordTerminal FAILED]
    D -->|cancelled| I[safeHookCancel]
    I --> J[metrics.recordTerminal CANCELLED]
```
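Whichever lifecycle point a task reaches, the hook and the metrics follow it there.
Closing notes
I changed jobs recently and just joined a new team. The pace at my last company was fairly normal; now I work overtime almost every day to keep up with the schedule.
I can't say whether busy or idle is better.
The title went up another step, and the pressure doubled along with it.
Aligning upward, covering for things downward: technology is still the purest part. No status reports, no hierarchy, simple and beautiful.
This post covered the execution path in the ThreadForge source walkthrough. The next one continues with how the higher-level orchestration pieces work together inside a single scope, and will also wrap up the series.
Project: github.com/wuuJiawei/ThreadForge. Issues are welcome, and so are stars.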