Java CompletableFuture
本文最后更新于 2025-10-31,文章内容可能已经过时。
一、基本概念
CompletableFuture是Java 8引入的异步编程工具,实现了CompletionStage和Future接口。它解决了传统Future的阻塞和回调地狱问题,提供了非阻塞、函数式、可编排的异步编程能力。
核心优势
- 非阻塞回调:提供
thenApply、thenAccept等方法,任务完成后自动触发回调 - 链式编程:支持将多个异步任务以链式(流水线)方式组合
- 多任务组合:轻松等待所有任务完成(
allOf)或任意一个任务完成(anyOf) - 优雅的异常处理:专门的异常处理方法
- 手动完成:可以手动设置结果或异常
二、CompletableFuture核心机制
任务状态
CompletableFuture有三种状态:
- 未完成(Incomplete)
- 正常完成(Completed Normally)
- 异常完成(Completed Exceptionally)
状态一旦完成(正常/异常),就不可再改变。
线程池
- 默认:使用
ForkJoinPool.commonPool()(公共线程池) - 推荐:生产环境中使用自定义线程池,避免阻塞公共线程池
- 自定义线程池:通过
supplyAsync(supplier, executor)或runAsync(runnable, executor)指定
三、创建异步任务
1. 有返回值任务(supplyAsync)
// 使用默认线程池
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行有返回值任务,线程:" + Thread.currentThread().getName());
return "Hello from async";
});
// 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行有返回值任务,自定义线程池,线程:" + Thread.currentThread().getName());
return "Hello from custom thread pool";
}, executor);
2. 无返回值任务(runAsync)
// 使用默认线程池
CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
System.out.println("执行无返回任务,线程:" + Thread.currentThread().getName());
});
// 使用自定义线程池
CompletableFuture<Void> future4 = CompletableFuture.runAsync(() -> {
System.out.println("执行无返回任务,自定义线程池,线程:" + Thread.currentThread().getName());
}, executor);
3. 手动创建已完成的CompletableFuture(completedFuture)
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("Already completed");
String result = completedFuture.get(); // 直接返回"Already completed"
四、链式任务编排
1. thenApply(同步转换结果)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> resultFuture = future.thenApply(s -> s + " World");
String result = resultFuture.get(); // "Hello World"
2. thenApplyAsync(异步转换结果)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> resultFuture = future.thenApplyAsync(s -> {
try {
Thread.sleep(1000); // 模拟耗时操作
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return s + " World";
});
String result = resultFuture.get(); // "Hello World"
3. thenAccept(消费结果,无返回)
CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(s -> System.out.println("Result: " + s)); // 输出: Result: Hello
4. thenRun(不关心结果,只执行后续逻辑)
CompletableFuture.supplyAsync(() -> "Hello")
.thenRun(() -> System.out.println("Task completed")); // 输出: Task completed
5. thenCompose(串行组合,前一个结果用于下一个)
CompletableFuture<String> getUser = CompletableFuture.supplyAsync(() -> "Alice");
CompletableFuture<String> getOrder = getUser.thenCompose(name ->
CompletableFuture.supplyAsync(() -> "Order for " + name));
String result = getOrder.get(); // "Order for Alice"
6. whenComplete(执行副作用,如日志)
CompletableFuture.supplyAsync(() -> "Hello")
.whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("Error: " + ex.getMessage());
} else {
System.out.println("Result: " + result);
}
});
五、任务组合
1. thenCombine(并行合并两个任务)
CompletableFuture<Double> price = CompletableFuture.supplyAsync(() -> 100.0);
CompletableFuture<Double> discount = CompletableFuture.supplyAsync(() -> 0.9);
CompletableFuture<Double> finalPrice = price.thenCombine(discount, (p, d) -> p * d);
double result = finalPrice.get(); // 90.0
2. allOf(等待所有任务完成)
CompletableFuture<String> productFuture = CompletableFuture.supplyAsync(() -> "Product");
CompletableFuture<String> stockFuture = CompletableFuture.supplyAsync(() -> "Stock");
CompletableFuture<String> reviewFuture = CompletableFuture.supplyAsync(() -> "Review");
CompletableFuture<Void> allFutures = CompletableFuture.allOf(productFuture, stockFuture, reviewFuture);
String result = allFutures.thenApply(v ->
String.format("Product: %s, Stock: %s, Review: %s",
productFuture.join(),
stockFuture.join(),
reviewFuture.join()
)).get();
// 输出: Product: Product, Stock: Stock, Review: Review
3. anyOf(等待任意一个任务完成)
CompletableFuture<String> fastFuture = CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000);
return "Fast result";
});
CompletableFuture<String> slowFuture = CompletableFuture.supplyAsync(() -> {
Thread.sleep(2000);
return "Slow result";
});
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(fastFuture, slowFuture);
String result = (String) anyFuture.get(); // 会先获取fastFuture的结果
六、异常处理
1. exceptionally(仅处理异常,返回默认值)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Something went wrong");
});
String result = future.exceptionally(ex -> {
System.out.println("Exception caught: " + ex.getMessage());
return "Default value";
}).get(); // "Default value"
2. handle(无论成功或异常都会执行,可同时获取结果和异常)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Something went wrong");
});
String result = future.handle((s, ex) -> {
if (ex != null) {
System.out.println("Exception caught: " + ex.getMessage());
return "Default value";
}
return s;
}).get(); // "Default value"
七、超时控制
1. orTimeout(设置超时时间,超时后抛出TimeoutException)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
Thread.sleep(3000);
return "Result";
});
try {
String result = future.orTimeout(2, TimeUnit.SECONDS).get();
} catch (TimeoutException ex) {
System.out.println("Operation timed out");
}
2. completeOnTimeout(设置超时后的默认值,避免长时间阻塞)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
Thread.sleep(3000);
return "Result";
});
String result = future.completeOnTimeout("Timeout value", 2, TimeUnit.SECONDS).get();
// 输出: Timeout value
八、实战场景示例
场景1:并行聚合独立任务(商品详情页)
// 1. 使用自定义线程池,避免使用默认的公共线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 2. 并行发起异步调用
CompletableFuture<String> productFuture = CompletableFuture.supplyAsync(() -> fetchProductInfo("123"), executor);
CompletableFuture<String> stockFuture = CompletableFuture.supplyAsync(() -> fetchStockInfo("123"), executor);
CompletableFuture<String> reviewFuture = CompletableFuture.supplyAsync(() -> fetchReviewInfo("123"), executor);
// 3. 使用 allOf 等待所有任务完成,然后合并结果
String aggregatedResult = CompletableFuture.allOf(productFuture, stockFuture, reviewFuture)
.thenApply(v -> {
// 由于所有任务已完成,join() 不会阻塞,立即返回结果
String product = productFuture.join();
String stock = stockFuture.join();
String review = reviewFuture.join();
return String.format("商品: %s, 库存: %s, 评价: %s", product, stock, review);
}).join(); // 阻塞主线程等待最终聚合结果
System.out.println(aggregatedResult);
// 模拟服务调用
private String fetchProductInfo(String id) {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "iPhone 13";
}
private String fetchStockInfo(String id) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "In Stock";
}
private String fetchReviewInfo(String id) {
try {
Thread.sleep(400);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Excellent product";
}
效果:总耗时从串行的900ms(300+200+400)缩短到约400ms。
场景2:链式异步任务编排(用户订单流程)
// 1. 获取用户信息
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("Fetching user data...");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return new User("Alice", "alice@example.com");
});
// 2. 根据用户信息获取订单
CompletableFuture<Order> orderFuture = userFuture.thenCompose(user ->
CompletableFuture.supplyAsync(() -> {
System.out.println("Fetching order for " + user.getName());
try {
Thread.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return new Order(user.getName(), "Order #123", 100.0);
})
);
// 3. 获取支付信息
CompletableFuture<Payment> paymentFuture = orderFuture.thenCompose(order ->
CompletableFuture.supplyAsync(() -> {
System.out.println("Processing payment for " + order.getOrderNumber());
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return new Payment(order.getOrderNumber(), 100.0, "Paid");
})
);
// 4. 最终结果
CompletableFuture<String> resultFuture = paymentFuture.thenApply(payment ->
String.format("Payment successful for order %s: $%.2f",
payment.getOrderNumber(), payment.getAmount())
);
// 阻塞获取最终结果
String result = resultFuture.get();
System.out.println(result);
场景3:并行组合与异常处理(多服务调用)
// 1. 创建多个异步任务
CompletableFuture<String> serviceA = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Service A result";
});
CompletableFuture<String> serviceB = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Service B result";
});
CompletableFuture<String> serviceC = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Service C failed");
});
// 2. 并行组合并处理异常
CompletableFuture<String> combinedResult = CompletableFuture
.allOf(serviceA, serviceB, serviceC)
.thenApply(v -> {
// 由于所有任务已完成,可以安全获取结果
return String.format("Service A: %s, Service B: %s, Service C: %s",
serviceA.join(),
serviceB.join(),
serviceC.join());
})
.exceptionally(ex -> {
// 处理任何可能的异常
System.out.println("Error occurred: " + ex.getMessage());
return "Error processing services";
});
String result = combinedResult.get();
System.out.println(result);
九、最佳实践
-
使用自定义线程池:避免使用默认的公共线程池,防止阻塞其他应用
ExecutorService executor = Executors.newFixedThreadPool(10); -
避免阻塞主线程:在Web应用中,避免在Controller中使用
get()或join(),使用异步处理CompletableFuture.supplyAsync(() -> processRequest(), executor) .thenAccept(result -> sendResponse(result)); -
合理处理异常:总是为可能失败的任务添加异常处理
.exceptionally(ex -> { log.error("Error processing task", ex); return defaultValue; }) -
避免回调地狱:使用链式调用而不是嵌套回调
// 避免 future1.thenAccept(result1 -> { future2.thenAccept(result2 -> { // 处理 }); }); // 推荐 future1.thenCombine(future2, (result1, result2) -> { // 处理 }); -
使用超时控制:防止长时间阻塞
resultFuture.completeOnTimeout("Timeout", 3, TimeUnit.SECONDS); -
测试时使用固定线程池:避免测试时依赖默认的ForkJoinPool
ExecutorService executor = Executors.newFixedThreadPool(1);
十、CompletableFuture方法总结表
| 方法类别 | 方法 | 作用 | 适用场景 |
|---|---|---|---|
| 创建任务 | supplyAsync | 有返回值的异步任务 | 需要返回结果的异步操作 |
| runAsync | 无返回值的异步任务 | 不需要返回结果的异步操作 | |
| completedFuture | 创建已完成的CompletableFuture | 预定义结果 | |
| 链式操作 | thenApply | 同步转换结果 | 对结果进行简单转换 |
| thenApplyAsync | 异步转换结果 | 耗时操作的转换 | |
| thenAccept | 消费结果,无返回 | 仅需处理结果 | |
| thenRun | 不关心结果,只执行 | 仅需执行后续逻辑 | |
| thenCompose | 串行组合,前一个结果用于下一个 | 依赖前序任务结果 | |
| whenComplete | 执行副作用(如日志) | 需要记录执行状态 | |
| 任务组合 | thenCombine | 并行合并两个任务 | 合并两个独立任务的结果 |
| allOf | 等待所有任务完成 | 需要所有任务都完成 | |
| anyOf | 等待任意一个任务完成 | 只需一个任务完成即可 | |
| 异常处理 | exceptionally | 仅处理异常,返回默认值 | 仅需处理异常 |
| handle | 无论成功或异常都会执行 | 需要同时处理结果和异常 | |
| 控制与获取 | join | 阻塞获取结果 | 需要阻塞等待结果 |
| get | 阻塞获取结果 | 需要阻塞等待结果 | |
| completeOnTimeout | 设置超时后的默认值 | 避免长时间阻塞 | |
| orTimeout | 设置超时时间,超时后抛出TimeoutException | 需要超时控制 |
十一、总结
CompletableFuture是Java 8引入的异步编程核心工具,它通过以下方式解决了传统Future的局限:
- 非阻塞回调:提供
thenApply、thenAccept等方法,任务完成后自动触发回调 - 链式编排:将多个异步任务以链式方式组合,避免回调地狱
- 多任务组合:轻松处理多个异步任务的依赖关系
- 优雅的异常处理:提供专门的异常处理机制
- 超时控制:避免长时间阻塞
在实际应用中,应根据场景选择合适的方法:
- 并行任务:使用
supplyAsync+allOf+thenCombine - 串行任务:使用
thenCompose或链式thenApply - 异常处理:总是使用
exceptionally或handle - 超时控制:使用
orTimeout或completeOnTimeout
通过合理使用CompletableFuture,可以构建高效、响应式的Java应用,显著提升程序的并发性能和响应能力。
- 感谢你赐予我前进的力量
赞赏者名单
因为你们的支持让我意识到写文章的价值🙏
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 软件从业者Hort
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果

