本文最后更新于 2025-10-31,文章内容可能已经过时。

一、基本概念

CompletableFuture是Java 8引入的异步编程工具,实现了CompletionStageFuture接口。它解决了传统Future的阻塞和回调地狱问题,提供了非阻塞、函数式、可编排的异步编程能力。

核心优势

  • 非阻塞回调:提供thenApplythenAccept等方法,任务完成后自动触发回调
  • 链式编程:支持将多个异步任务以链式(流水线)方式组合
  • 多任务组合:轻松等待所有任务完成(allOf)或任意一个任务完成(anyOf
  • 优雅的异常处理:专门的异常处理方法
  • 手动完成:可以手动设置结果或异常

二、CompletableFuture核心机制

任务状态

CompletableFuture有三种状态:

  1. 未完成(Incomplete)
  2. 正常完成(Completed Normally)
  3. 异常完成(Completed Exceptionally)

状态一旦完成(正常/异常),就不可再改变。

线程池

  • 默认:使用ForkJoinPool.commonPool()(公共线程池)
  • 推荐:生产环境中使用自定义线程池,避免阻塞公共线程池
  • 自定义线程池:通过supplyAsync(supplier, executor)runAsync(runnable, executor)指定

三、创建异步任务

1. 有返回值任务(supplyAsync)

// 使用默认线程池
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("执行有返回值任务,线程:" + Thread.currentThread().getName());
    return "Hello from async";
});

// 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("执行有返回值任务,自定义线程池,线程:" + Thread.currentThread().getName());
    return "Hello from custom thread pool";
}, executor);

2. 无返回值任务(runAsync)

// 使用默认线程池
CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
    System.out.println("执行无返回任务,线程:" + Thread.currentThread().getName());
});

// 使用自定义线程池
CompletableFuture<Void> future4 = CompletableFuture.runAsync(() -> {
    System.out.println("执行无返回任务,自定义线程池,线程:" + Thread.currentThread().getName());
}, executor);

3. 手动创建已完成的CompletableFuture(completedFuture)

CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("Already completed");
String result = completedFuture.get(); // 直接返回"Already completed"

四、链式任务编排

1. thenApply(同步转换结果)

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> resultFuture = future.thenApply(s -> s + " World");
String result = resultFuture.get(); // "Hello World"

2. thenApplyAsync(异步转换结果)

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> resultFuture = future.thenApplyAsync(s -> {
    try {
        Thread.sleep(1000); // 模拟耗时操作
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return s + " World";
});
String result = resultFuture.get(); // "Hello World"

3. thenAccept(消费结果,无返回)

CompletableFuture.supplyAsync(() -> "Hello")
    .thenAccept(s -> System.out.println("Result: " + s)); // 输出: Result: Hello

4. thenRun(不关心结果,只执行后续逻辑)

CompletableFuture.supplyAsync(() -> "Hello")
    .thenRun(() -> System.out.println("Task completed")); // 输出: Task completed

5. thenCompose(串行组合,前一个结果用于下一个)

CompletableFuture<String> getUser = CompletableFuture.supplyAsync(() -> "Alice");
CompletableFuture<String> getOrder = getUser.thenCompose(name -> 
    CompletableFuture.supplyAsync(() -> "Order for " + name));

String result = getOrder.get(); // "Order for Alice"

6. whenComplete(执行副作用,如日志)

CompletableFuture.supplyAsync(() -> "Hello")
    .whenComplete((result, ex) -> {
        if (ex != null) {
            System.err.println("Error: " + ex.getMessage());
        } else {
            System.out.println("Result: " + result);
        }
    });

五、任务组合

1. thenCombine(并行合并两个任务)

CompletableFuture<Double> price = CompletableFuture.supplyAsync(() -> 100.0);
CompletableFuture<Double> discount = CompletableFuture.supplyAsync(() -> 0.9);

CompletableFuture<Double> finalPrice = price.thenCombine(discount, (p, d) -> p * d);
double result = finalPrice.get(); // 90.0

2. allOf(等待所有任务完成)

CompletableFuture<String> productFuture = CompletableFuture.supplyAsync(() -> "Product");
CompletableFuture<String> stockFuture = CompletableFuture.supplyAsync(() -> "Stock");
CompletableFuture<String> reviewFuture = CompletableFuture.supplyAsync(() -> "Review");

CompletableFuture<Void> allFutures = CompletableFuture.allOf(productFuture, stockFuture, reviewFuture);
String result = allFutures.thenApply(v -> 
    String.format("Product: %s, Stock: %s, Review: %s", 
        productFuture.join(), 
        stockFuture.join(), 
        reviewFuture.join()
)).get();

// 输出: Product: Product, Stock: Stock, Review: Review

3. anyOf(等待任意一个任务完成)

CompletableFuture<String> fastFuture = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(1000);
    return "Fast result";
});

CompletableFuture<String> slowFuture = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(2000);
    return "Slow result";
});

CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(fastFuture, slowFuture);
String result = (String) anyFuture.get(); // 会先获取fastFuture的结果

六、异常处理

1. exceptionally(仅处理异常,返回默认值)

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("Something went wrong");
});

String result = future.exceptionally(ex -> {
    System.out.println("Exception caught: " + ex.getMessage());
    return "Default value";
}).get(); // "Default value"

2. handle(无论成功或异常都会执行,可同时获取结果和异常)

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("Something went wrong");
});

String result = future.handle((s, ex) -> {
    if (ex != null) {
        System.out.println("Exception caught: " + ex.getMessage());
        return "Default value";
    }
    return s;
}).get(); // "Default value"

七、超时控制

1. orTimeout(设置超时时间,超时后抛出TimeoutException)

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(3000);
    return "Result";
});

try {
    String result = future.orTimeout(2, TimeUnit.SECONDS).get();
} catch (TimeoutException ex) {
    System.out.println("Operation timed out");
}

2. completeOnTimeout(设置超时后的默认值,避免长时间阻塞)

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(3000);
    return "Result";
});

String result = future.completeOnTimeout("Timeout value", 2, TimeUnit.SECONDS).get();
// 输出: Timeout value

八、实战场景示例

场景1:并行聚合独立任务(商品详情页)

// 1. 使用自定义线程池,避免使用默认的公共线程池
ExecutorService executor = Executors.newFixedThreadPool(3);

// 2. 并行发起异步调用
CompletableFuture<String> productFuture = CompletableFuture.supplyAsync(() -> fetchProductInfo("123"), executor);
CompletableFuture<String> stockFuture = CompletableFuture.supplyAsync(() -> fetchStockInfo("123"), executor);
CompletableFuture<String> reviewFuture = CompletableFuture.supplyAsync(() -> fetchReviewInfo("123"), executor);

// 3. 使用 allOf 等待所有任务完成,然后合并结果
String aggregatedResult = CompletableFuture.allOf(productFuture, stockFuture, reviewFuture)
    .thenApply(v -> {
        // 由于所有任务已完成,join() 不会阻塞,立即返回结果
        String product = productFuture.join();
        String stock = stockFuture.join();
        String review = reviewFuture.join();
        return String.format("商品: %s, 库存: %s, 评价: %s", product, stock, review);
    }).join(); // 阻塞主线程等待最终聚合结果

System.out.println(aggregatedResult);

// 模拟服务调用
private String fetchProductInfo(String id) {
    try {
        Thread.sleep(300);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return "iPhone 13";
}

private String fetchStockInfo(String id) {
    try {
        Thread.sleep(200);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return "In Stock";
}

private String fetchReviewInfo(String id) {
    try {
        Thread.sleep(400);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return "Excellent product";
}

效果:总耗时从串行的900ms(300+200+400)缩短到约400ms。

场景2:链式异步任务编排(用户订单流程)

// 1. 获取用户信息
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("Fetching user data...");
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return new User("Alice", "alice@example.com");
});

// 2. 根据用户信息获取订单
CompletableFuture<Order> orderFuture = userFuture.thenCompose(user -> 
    CompletableFuture.supplyAsync(() -> {
        System.out.println("Fetching order for " + user.getName());
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return new Order(user.getName(), "Order #123", 100.0);
    })
);

// 3. 获取支付信息
CompletableFuture<Payment> paymentFuture = orderFuture.thenCompose(order -> 
    CompletableFuture.supplyAsync(() -> {
        System.out.println("Processing payment for " + order.getOrderNumber());
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return new Payment(order.getOrderNumber(), 100.0, "Paid");
    })
);

// 4. 最终结果
CompletableFuture<String> resultFuture = paymentFuture.thenApply(payment -> 
    String.format("Payment successful for order %s: $%.2f", 
        payment.getOrderNumber(), payment.getAmount())
);

// 阻塞获取最终结果
String result = resultFuture.get();
System.out.println(result);

场景3:并行组合与异常处理(多服务调用)

// 1. 创建多个异步任务
CompletableFuture<String> serviceA = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(300);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return "Service A result";
});

CompletableFuture<String> serviceB = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(200);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return "Service B result";
});

CompletableFuture<String> serviceC = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("Service C failed");
});

// 2. 并行组合并处理异常
CompletableFuture<String> combinedResult = CompletableFuture
    .allOf(serviceA, serviceB, serviceC)
    .thenApply(v -> {
        // 由于所有任务已完成,可以安全获取结果
        return String.format("Service A: %s, Service B: %s, Service C: %s",
            serviceA.join(),
            serviceB.join(),
            serviceC.join());
    })
    .exceptionally(ex -> {
        // 处理任何可能的异常
        System.out.println("Error occurred: " + ex.getMessage());
        return "Error processing services";
    });

String result = combinedResult.get();
System.out.println(result);

九、最佳实践

  1. 使用自定义线程池:避免使用默认的公共线程池,防止阻塞其他应用

    ExecutorService executor = Executors.newFixedThreadPool(10);
    
  2. 避免阻塞主线程:在Web应用中,避免在Controller中使用get()join(),使用异步处理

    CompletableFuture.supplyAsync(() -> processRequest(), executor)
         .thenAccept(result -> sendResponse(result));
    
  3. 合理处理异常:总是为可能失败的任务添加异常处理

    .exceptionally(ex -> {
        log.error("Error processing task", ex);
        return defaultValue;
    })
    
  4. 避免回调地狱:使用链式调用而不是嵌套回调

    // 避免
    future1.thenAccept(result1 -> {
        future2.thenAccept(result2 -> {
            // 处理
        });
    });
    
    // 推荐
    future1.thenCombine(future2, (result1, result2) -> {
        // 处理
    });
    
  5. 使用超时控制:防止长时间阻塞

    resultFuture.completeOnTimeout("Timeout", 3, TimeUnit.SECONDS);
    
  6. 测试时使用固定线程池:避免测试时依赖默认的ForkJoinPool

    ExecutorService executor = Executors.newFixedThreadPool(1);
    

十、CompletableFuture方法总结表

方法类别方法作用适用场景
创建任务supplyAsync有返回值的异步任务需要返回结果的异步操作
runAsync无返回值的异步任务不需要返回结果的异步操作
completedFuture创建已完成的CompletableFuture预定义结果
链式操作thenApply同步转换结果对结果进行简单转换
thenApplyAsync异步转换结果耗时操作的转换
thenAccept消费结果,无返回仅需处理结果
thenRun不关心结果,只执行仅需执行后续逻辑
thenCompose串行组合,前一个结果用于下一个依赖前序任务结果
whenComplete执行副作用(如日志)需要记录执行状态
任务组合thenCombine并行合并两个任务合并两个独立任务的结果
allOf等待所有任务完成需要所有任务都完成
anyOf等待任意一个任务完成只需一个任务完成即可
异常处理exceptionally仅处理异常,返回默认值仅需处理异常
handle无论成功或异常都会执行需要同时处理结果和异常
控制与获取join阻塞获取结果需要阻塞等待结果
get阻塞获取结果需要阻塞等待结果
completeOnTimeout设置超时后的默认值避免长时间阻塞
orTimeout设置超时时间,超时后抛出TimeoutException需要超时控制

十一、总结

CompletableFuture是Java 8引入的异步编程核心工具,它通过以下方式解决了传统Future的局限:

  1. 非阻塞回调:提供thenApplythenAccept等方法,任务完成后自动触发回调
  2. 链式编排:将多个异步任务以链式方式组合,避免回调地狱
  3. 多任务组合:轻松处理多个异步任务的依赖关系
  4. 优雅的异常处理:提供专门的异常处理机制
  5. 超时控制:避免长时间阻塞

在实际应用中,应根据场景选择合适的方法:

  • 并行任务:使用supplyAsync + allOf + thenCombine
  • 串行任务:使用thenCompose或链式thenApply
  • 异常处理:总是使用exceptionallyhandle
  • 超时控制:使用orTimeoutcompleteOnTimeout

通过合理使用CompletableFuture,可以构建高效、响应式的Java应用,显著提升程序的并发性能和响应能力。