乐于分享
好东西不私藏

Java 多线程神器 ThreadForge,让多线程从此简单

Java 多线程神器 ThreadForge,让多线程从此简单

前言

你是否曾被多线程代码折磨过?一个简单的「并发调用三个接口」,写起来却要 50 多行代码:创建线程池、提交任务、处理 Future、写 try-finally 确保关闭、加超时逻辑、处理异常传播……每次都要重新思考一遍边界条件。

今天要介绍的 ThreadForge,就是为解决这个痛点而生的。


一、被忽视的并发复杂度

场景重现

产品经理说:「用户详情页太慢了,能不能优化一下?」

你一看代码,三个接口串行调用:先查用户信息(200ms),再查订单列表(200ms),最后查积分余额(200ms),加起来 600ms。

「简单,改成并发调用就行。」你心想。

于是你写出了这样的代码:

ExecutorService executor = Executors.newFixedThreadPool(10);try {    Future<User> userFuture = executor.submit(() -> userService.get(uid));    Future<List<Order>> ordersFuture = executor.submit(() -> orderService.list(uid));    Future<Profile> profileFuture = executor.submit(() -> profileService.get(uid));    User user = userFuture.get(500, TimeUnit.MILLISECONDS);    List<Order> orders = ordersFuture.get(500, TimeUnit.MILLISECONDS);    Profile profile = profileFuture.get(500, TimeUnit.MILLISECONDS);return buildResponse(user, orders, profile);catch (TimeoutException e) {// 超时了,其他任务会被取消吗?    userFuture.cancel(true);    ordersFuture.cancel(true);    profileFuture.cancel(true);thrownew RuntimeException("timeout");catch (ExecutionException e) {// 某个服务挂了,其他任务要取消吗?thrownew BusinessException("service error", e.getCause());finally {    executor.shutdown();// 还要等待 terminated?    executor.awaitTermination(5, TimeUnit.SECONDS);}

这段代码看起来已经够完善了,但测试时还是出现了各种问题:

  • 线程泄漏
  • 异常没有正确传播
  • 某个服务超时后其他任务没有取消
  • 代码重复,难以复用

传统方案的困境

Java 的 ExecutorServiceFutureCompletableFuture 确实非常强大,但也足够啰嗦:

任务
繁琐程度
线程池管理
手动创建、配置、关闭
超时处理
每个任务都要写一遍
失败传播
要么吞掉,要么手动包装
任务取消
需要自己判断和调用 cancel
资源清理
try-finally 必不可少

每次遇到类似场景,这些逻辑都要重新写一遍。

某一天,你猛然惊醒:写并发代码,不应该这么费脑子。


二、ThreadForge:化繁为简的结构化并发

设计哲学

ThreadForge 的核心理念是:先降低认知成本,再追求性能

你可以把它理解成一个结构化并发框架——让你用写同步代码的思维写并发代码,同时自动处理那些容易遗漏的边界情况。也可以把它理解成对 Java 内置并发工具的二次包装,目标是让 Java 并发更简单、更清晰。

核心概念:ThreadScope

在 ThreadForge 中,最核心的概念是 ThreadScope(线程作用域)。所有任务都必须在 Scope 内提交,Scope 负责管理所有任务的生命周期。

try (ThreadScope scope = ThreadScope.open()) {    Task<String> user = scope.submit("load-user", () -> fetchUser());    Task<Integer> orders = scope.submit("load-orders", () -> fetchOrders());    scope.await(user, orders);// 到这里,两个任务肯定都结束了(成功、失败或超时)    String result = user.await() + ":" + orders.await();}// scope 关闭时,所有任务自动取消、资源自动清理

这段代码有几个关键点:

  1. 生命周期有边界:所有任务都绑定在 ThreadScope 内,不会泄漏
  2. 默认安全:默认超时、默认失败传播、自动取消
  3. 结构清晰:代码结构就是任务关系,一眼看出是并发执行

对比传统写法,你需要:

  • 创建线程池,配置核心线程数、队列大小
  • 提交任务,手动处理 Future
  • 写 try-finally 确保 shutdown
  • 手动处理超时和异常传播

ThreadForge 让你省掉这些重复劳动,专注业务逻辑。


三、五大核心设计解析

1. 默认行为就是正确的

// 默认:FAIL_FAST + 30秒超时 + 自动取消其他任务try (ThreadScope scope = ThreadScope.open()) {    Task<Integer> a = scope.submit(() -> riskyRpc());    Task<Integer> b = scope.submit(() -> anotherRpc());    scope.await(a, b);catch (ScopeTimeoutException timeout) {// 超时了,所有任务已被自动取消    fallback();catch (FailurePropagationException failed) {// 某个任务失败了,其他任务已被自动取消    handleError(failed);}

不需要配置,不需要思考,开箱即用。这就是 ThreadForge 的设计理念:最常用的场景,应该用最少的代码实现

2. 失败策略明确且统一

不同场景对失败的容忍度不同,ThreadForge 提供了 5 种明确的策略:

策略
行为
FAIL_FAST
快速失败,立即取消其他任务(默认)
COLLECT_ALL
等所有任务结束,汇总所有失败
SUPERVISOR
不自动取消,失败信息收集到 Outcome
CANCEL_OTHERS
失败后取消其余任务,但不抛异常
IGNORE_ALL
忽略失败,只返回成功的结果
// 场景:批量导入,即使部分失败也要知道哪些成功了try (ThreadScope scope = ThreadScope.open()        .withFailurePolicy(FailurePolicy.SUPERVISOR)) {    List<Task<Void>> tasks = ids.stream()        .map(id -> scope.submit(() -> importData(id)))        .collect(toList());    Outcome outcome = scope.await(tasks);// 明确知道哪些成功、哪些失败    log.info("成功: {}, 失败: {}"        outcome.successCount(), outcome.failureCount());}

3. 并发度控制不再需要手动管理队列

// 场景:调用外部 API,最多同时 50 个请求try (ThreadScope scope = ThreadScope.open()        .withConcurrencyLimit(50)) {    List<Task<Result>> tasks = hugeIdList.stream()        .map(id -> scope.submit(() -> externalApi.call(id)))        .collect(toList());    List<Result> results = scope.awaitAll(tasks);}// 自动限流,不会把外部服务打爆

不需要自己写信号量,不需要手动分批,框架自动处理。这就是声明式并发的魅力。

4. 生命周期观测统一收口

ThreadScope scope = ThreadScope.open()    .withHook(new ThreadHook() {@OverridepublicvoidonStart(TaskInfo info){            metrics.taskStarted(info.name());        }@OverridepublicvoidonSuccess(TaskInfo info, Duration duration){            metrics.taskSuccess(info.name(), duration.toMillis());        }@OverridepublicvoidonFailure(TaskInfo info, Throwable error, Duration duration){            log.error("Task {} failed after {}", info.name(), duration, error);            metrics.taskFailed(info.name());        }    });

一处埋点,全局生效。不需要在每个任务里重复写日志和监控代码。

5. 跨 JDK 版本的一致体验

try (ThreadScope scope = ThreadScope.open()) {// JDK 21+:自动使用虚拟线程// JDK 8-20:自动降级到线程池    Task<String> task = scope.submit(() -> longRunningTask());return task.await();}

不需要分叉代码,不需要写 if-else,框架自动适配。


四、典型应用场景

场景一:并发 RPC 聚合

这是最常见的多线程场景:一次页面请求需要调用多个下游服务。

try (ThreadScope scope = ThreadScope.open()) {    Task<User> user = scope.submit(() -> userService.get(uid));    Task<List<Order>> orders = scope.submit(() -> orderService.list(uid));    Task<Profile> profile = scope.submit(() -> profileService.get(uid));    scope.await(user, orders, profile);return buildResponse(user.await(), orders.await(), profile.await());}

原来串行需要 600ms,现在并发只需 200ms,性能提升 3 倍。

场景二:批量数据处理

处理大批量数据时,需要控制并发度,避免打爆数据库或外部服务。

try (ThreadScope scope = ThreadScope.open()        .withConcurrencyLimit(100)        .withDeadline(Duration.ofMinutes(5))) {    List<Task<Void>> tasks = records.stream()        .map(r -> scope.submit(() -> process(r)))        .collect(toList());    scope.awaitAll(tasks);}

自动限流 100 个并发,5 分钟内必须完成。

场景三:生产者-消费者模式

有时候需要启动多个线程处理同一批数据。

try (ThreadScope scope = ThreadScope.open()) {    Channel<Data> channel = Channel.bounded(1000);// 生产者    scope.submit(() -> {for (Data d : datasource) {            channel.send(d);        }        channel.close();returnnull;    });// 4 个消费者    List<Task<Void>> consumers = IntStream.range(04)        .mapToObj(i -> scope.submit(() -> {for (Data d : channel) {                process(d);            }returnnull;        }))        .collect(toList());    scope.awaitAll(consumers);}

五、快速上手

添加依赖

Maven:

<dependency><groupId>pub.lighting</groupId><artifactId>threadforge-core</artifactId><version>1.0.1</version></dependency>

Gradle:

implementation("pub.lighting:threadforge-core:1.0.1")

最小示例

try (ThreadScope scope = ThreadScope.open()) {    Task<String> task = scope.submit(() -> "Hello, ThreadForge");    System.out.println(task.await());}

小结

ThreadForge 的目标不是取代所有并发工具,而是让 80% 的常见场景变得简单、安全、可维护

当你还在调试并发问题时,当新人看不懂老代码里的线程逻辑时,当你想加个超时却不知道从哪儿改起时——不妨试试 ThreadForge。

让并发回归简单,让代码重新可读。


互动话题

你在日常开发中遇到过哪些多线程的「坑」?欢迎在评论区分享你的经历,一起讨论如何写出更优雅的并发代码。

本站文章均为手工撰写未经允许谢绝转载:夜雨聆风 » Java 多线程神器 ThreadForge,让多线程从此简单

猜你喜欢

  • 暂无文章