本文最后更新于 2025-11-03,文章内容可能已经过时。

C++ 提供了多种异步编程方式,其核心思想与 Java 的 CompletableFuture 类似,但 API 设计和实现方式有显著差异。以下是 C++ 中实现类似功能的完整知识体系。

一、C++ 异步编程的两大体系

1. C++ 标准库异步编程(C++11+)

这是跨平台的异步编程方式,适用于所有 C++ 应用。

组件作用类似 Java
std::future代表异步计算的结果CompletableFuture
std::async创建异步任务CompletableFuture.supplyAsync
std::packaged_task将可调用对象与 future 关联无直接对应
std::promise用于设置 future 的值无直接对应
std::shared_future允许多个 future 共享结果无直接对应

2. Windows UWP 应用异步编程(PPL)

适用于 Windows UWP 应用,使用 concurrency::task。

组件作用类似 Java 概念
concurrency::task封装异步操作CompletableFuture
create_task创建任务CompletableFuture.supplyAsync
task::then添加延续任务thenApply/thenCompose
IAsyncOperationWindows 运行时异步类型无直接对应

二、C++ 标准库异步编程详解

1. 基本用法

创建异步任务
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

// 模拟耗时任务
int fetch_data(int user_id) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return user_id * 10;
}

int main() {
    // 创建异步任务(在新线程中执行)
    std::future<int> data_future = std::async(std::launch::async, fetch_data, 1);
    
    std::cout << "Main thread continues..." << std::endl;
    
    // 获取结果(阻塞等待)
    int result = data_future.get();
    std::cout << "Result: " << result << std::endl;
    
    return 0;
}
使用 std::packaged_task(更灵活)
#include <iostream>
#include <future>
#include <thread>

int fetch_data(int user_id) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return user_id * 10;
}

int main() {
    // 创建任务包装
    std::packaged_task<int(int)> task(fetch_data);
    std::future<int> result_future = task.get_future();
    
    // 在新线程中执行任务
    std::thread worker(std::move(task), 1);
    
    std::cout << "Main thread continues..." << std::endl;
    
    // 获取结果
    int result = result_future.get();
    std::cout << "Result: " << result << std::endl;
    
    worker.join();
    return 0;
}

2. 任务链式调用

#include <iostream>
#include <future>
#include <vector>
#include <algorithm>

int fetch_data(int user_id) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return user_id * 10;
}

int process_data(int data) {
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    return data + 100;
}

int main() {
    // 创建任务链
    auto data_future = std::async(std::launch::async, fetch_data, 1);
    auto processed_future = data_future.then([](std::future<int> f) {
        return process_data(f.get());
    });
    
    // 获取最终结果
    int result = processed_future.get();
    std::cout << "Result: " << result << std::endl;
    
    return 0;
}

3. 多任务组合

#include <iostream>
#include <future>
#include <vector>
#include <algorithm>

int fetch_user(int id) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return id * 10;
}

int fetch_product(int id) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return id * 100;
}

int main() {
    // 创建多个任务
    auto user_future = std::async(std::launch::async, fetch_user, 1);
    auto product_future = std::async(std::launch::async, fetch_product, 2);
    
    // 等待所有任务完成
    std::vector<std::future<int>> futures = {user_future, product_future};
    std::vector<int> results;
    
    for (auto& f : futures) {
        results.push_back(f.get());
    }
    
    std::cout << "User result: " << results[0] 
              << ", Product result: " << results[1] << std::endl;
    
    return 0;
}

4. 异常处理

#include <iostream>
#include <future>
#include <stdexcept>

int risky_operation() {
    if (std::rand() % 2 == 0) {
        throw std::runtime_error("Something went wrong");
    }
    return 1;
}

int main() {
    try {
        std::future<int> future = std::async(std::launch::async, risky_operation);
        int result = future.get();
        std::cout << "Result: " << result << std::endl;
    } catch (const std::exception& e) {
        std::cout << "Error: " << e.what() << std::endl;
    }
    
    return 0;
}

5. 超时控制

#include <iostream>
#include <future>
#include <thread>
#include <chrono>

int long_running_task() {
    std::this_thread::sleep_for(std::chrono::seconds(5));
    return 42;
}

int main() {
    auto future = std::async(std::launch::async, long_running_task);
    
    // 等待最多2秒
    if (future.wait_for(std::chrono::seconds(2)) == std::future_status::timeout) {
        std::cout << "Task timed out" << std::endl;
        // 可以在这里取消任务
    } else {
        int result = future.get();
        std::cout << "Result: " << result << std::endl;
    }
    
    return 0;
}

三、Windows UWP 应用异步编程(PPL)

1. 基本用法

#include <ppltasks.h>
using namespace concurrency;
using namespace Windows::Devices::Enumeration;

void App::TestAsync() {
    // 调用异步方法
    IAsyncOperation<DeviceInformationCollection^>^ deviceOp = 
        DeviceInformation::FindAllAsync();
    
    // 创建任务
    auto deviceEnumTask = create_task(deviceOp);
    
    // 添加延续任务
    deviceEnumTask.then([this](DeviceInformationCollection^ devices) {
        for (int i = 0; i < devices->Size; i++) {
            DeviceInformation^ di = devices->GetAt(i);
            // 处理设备信息
            std::wcout << L"Device: " << di->Name->Data() << std::endl;
        }
    });
}

2. 任务链式调用

#include <ppltasks.h>
using namespace concurrency;

// 模拟异步操作
concurrency::task<int> fetch_data(int user_id) {
    return concurrency::create_task([=]() {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        return user_id * 10;
    });
}

concurrency::task<int> process_data(int data) {
    return concurrency::create_task([=]() {
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
        return data + 100;
    });
}

void process_order() {
    fetch_data(1)
        .then([](int data) {
            return process_data(data);
        })
        .then([](int result) {
            std::cout << "Order processed: " << result << std::endl;
        });
}

3. 多任务组合

#include <ppltasks.h>
using namespace concurrency;

concurrency::task<int> fetch_user(int id) {
    return concurrency::create_task([=]() {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        return id * 10;
    });
}

concurrency::task<int> fetch_product(int id) {
    return concurrency::create_task([=]() {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        return id * 100;
    });
}

void main() {
    // 并行获取用户和商品
    auto user_task = fetch_user(1);
    auto product_task = fetch_product(2);
    
    // 等待所有任务完成
    concurrency::when_all(user_task, product_task).then([](concurrency::task<int> user_task, concurrency::task<int> product_task) {
        int user_result = user_task.get();
        int product_result = product_task.get();
        std::cout << "User: " << user_result << ", Product: " << product_result << std::endl;
    });
}

4. 异常处理

#include <ppltasks.h>
using namespace concurrency;

concurrency::task<int> risky_operation() {
    return concurrency::create_task([]() {
        if (std::rand() % 2 == 0) {
            throw std::runtime_error("Something went wrong");
        }
        return 1;
    });
}

void handle_exceptions() {
    risky_operation()
        .then([](int result) {
            return result;
        })
        .then_error([](const std::exception& e) {
            std::cout << "Error: " << e.what() << std::endl;
            return 0; // 返回默认值
        });
}

四、C++ 异步编程与 Java CompletableFuture 对比

功能Java CompletableFutureC++ 标准库C++ PPL (UWP)
创建异步任务CompletableFuture.supplyAsync()std::async()create_task()
任务链式调用thenApply()then()(链式调用)then()
多任务组合CompletableFuture.allOf()手动实现concurrency::when_all()
任意任务完成CompletableFuture.anyOf()手动实现concurrency::when_any()
异常处理exceptionally()try-catchthen_error()
超时控制completeOnTimeout()wait_for()with_timeout()
任务取消无直接支持无直接支持task::cancel()

五、高级技巧与最佳实践

1. 任务组合的高级技巧

1.1 多任务并行处理
// 使用 when_all 处理多个任务
auto task1 = fetch_data(1);
auto task2 = fetch_data(2);
auto task3 = fetch_data(3);

concurrency::when_all(task1, task2, task3).then([](auto t1, auto t2, auto t3) {
    int r1 = t1.get();
    int r2 = t2.get();
    int r3 = t3.get();
    // 处理结果
});
1.2 任务条件执行
// 根据第一个任务的结果决定后续操作
fetch_user(1)
    .then([](int user_id) {
        if (user_id > 10) {
            return fetch_premium_data(user_id);
        } else {
            return fetch_regular_data(user_id);
        }
    });

2. 异常处理最佳实践

// 使用 then_error 处理异常
auto task = fetch_data(1);
task.then([](int result) { /* 处理成功 */ })
   .then_error([](const std::exception& e) {
        std::cout << "Error: " << e.what() << std::endl;
        return 0; // 返回默认值
   });

3. 资源管理

// 确保任务完成后清理资源
auto task = fetch_data(1);
task.then([](int result) {
    // 使用结果
    // ...
    // 确保资源清理
    return result;
})
.then([](int result) {
    // 任务完成后的清理
    std::cout << "Task completed with result: " << result << std::endl;
});

4. 与同步代码的集成

// 在异步中调用同步函数
concurrency::task<int> async_call() {
    return concurrency::create_task([]() {
        // 同步调用
        int result = sync_function();
        return result;
    });
}

// 在同步中调用异步函数
int sync_call() {
    auto task = async_call();
    return task.get(); // 会阻塞,需谨慎使用
}

六、真实场景实战

场景1:电商订单处理(C++ 标准库版)

#include <iostream>
#include <future>
#include <vector>
#include <thread>
#include <chrono>

// 模拟数据库查询
int fetch_user(int id) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return id * 10;
}

int fetch_product(int id) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return id * 100;
}

int calculate_price(int product_price) {
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    return product_price * 0.9;
}

int process_payment(double amount) {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return static_cast<int>(amount);
}

int main() {
    // 创建任务链
    auto user_task = std::async(std::launch::async, fetch_user, 1);
    auto product_task = std::async(std::launch::async, fetch_product, 2);
    
    // 获取用户和商品信息
    auto user_future = user_task;
    auto product_future = product_task;
    
    // 计算价格
    auto price_task = product_future.then([](std::future<int> f) {
        return calculate_price(f.get());
    });
    
    // 支付
    auto payment_task = price_task.then([](int price) {
        return process_payment(price);
    });
    
    // 获取最终结果
    int payment_result = payment_task.get();
    
    std::cout << "Order processed successfully. Payment: " << payment_result << std::endl;
    
    return 0;
}

场景2:多数据源竞争获取(C++ PPL 版)

#include <ppltasks.h>
using namespace concurrency;

// 模拟从不同数据源获取数据
concurrency::task<std::string> fetch_from_source1() {
    return concurrency::create_task([]() {
        std::this_thread::sleep_for(std::chrono::seconds(3));
        return "Source 1 result";
    });
}

concurrency::task<std::string> fetch_from_source2() {
    return concurrency::create_task([]() {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        return "Source 2 result";
    });
}

concurrency::task<std::string> fetch_from_source3() {
    return concurrency::create_task([]() {
        std::this_thread::sleep_for(std::chrono::seconds(4));
        return "Source 3 result";
    });
}

int main() {
    // 获取最快响应
    concurrency::when_any(fetch_from_source1(), fetch_from_source2(), fetch_from_source3())
        .then([](concurrency::task<std::string> task) {
            std::cout << "Fastest result: " << task.get() << std::endl;
        });
    
    // 等待所有任务完成(确保程序不退出)
    concurrency::task<void> all_done = concurrency::when_all(
        fetch_from_source1(), fetch_from_source2(), fetch_from_source3()
    );
    
    all_done.wait();
    
    return 0;
}

七、总结

C++ 的异步编程体系虽然与 Java 的 CompletableFuture 在语法上不同,但其核心思想完全一致:

Java CompletableFutureC++ 等效实现
supplyAsync()std::async() / concurrency::create_task()
thenApply()then() (链式调用)
allOf()concurrency::when_all()
anyOf()concurrency::when_any()
exceptionally()then_error() / try-catch
completeOnTimeout()wait_for() / with_timeout()

关键差异

  • Java 是基于回调的链式调用
  • C++ 标准库是基于 std::futurestd::async 的任务管理
  • C++ PPL (UWP) 是基于 concurrency::task 的链式调用

最佳实践

  1. I/O 密集型任务:使用 std::async 或 PPL
  2. CPU 密集型任务:考虑使用 std::async 并指定 std::launch::async
  3. 避免在主线程中阻塞:不要在 UI 线程中使用 get()
  4. 合理设置超时:使用 wait_for 防止长时间等待
  5. 任务取消:在需要时使用 task::cancel()(PPL)

通过掌握 std::asyncconcurrency::task,可以在 C++ 中实现与 Java CompletableFuture 同等甚至更强大的异步编程能力。