C++异步编程完全指南:从 std::async 到 PPL
本文最后更新于 2025-11-03,文章内容可能已经过时。
C++ 提供了多种异步编程方式,其核心思想与 Java 的 CompletableFuture 类似,但 API 设计和实现方式有显著差异。以下是 C++ 中实现类似功能的完整知识体系。
一、C++ 异步编程的两大体系
1. C++ 标准库异步编程(C++11+)
这是跨平台的异步编程方式,适用于所有 C++ 应用。
| 组件 | 作用 | 类似 Java |
|---|---|---|
| std::future | 代表异步计算的结果 | CompletableFuture |
| std::async | 创建异步任务 | CompletableFuture.supplyAsync |
| std::packaged_task | 将可调用对象与 future 关联 | 无直接对应 |
| std::promise | 用于设置 future 的值 | 无直接对应 |
| std::shared_future | 允许多个 future 共享结果 | 无直接对应 |
2. Windows UWP 应用异步编程(PPL)
适用于 Windows UWP 应用,使用 concurrency::task。
| 组件 | 作用 | 类似 Java 概念 |
|---|---|---|
| concurrency::task | 封装异步操作 | CompletableFuture |
| create_task | 创建任务 | CompletableFuture.supplyAsync |
| task::then | 添加延续任务 | thenApply/thenCompose |
| IAsyncOperation | Windows 运行时异步类型 | 无直接对应 |
二、C++ 标准库异步编程详解
1. 基本用法
创建异步任务
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
// 模拟耗时任务
int fetch_data(int user_id) {
std::this_thread::sleep_for(std::chrono::seconds(1));
return user_id * 10;
}
int main() {
// 创建异步任务(在新线程中执行)
std::future<int> data_future = std::async(std::launch::async, fetch_data, 1);
std::cout << "Main thread continues..." << std::endl;
// 获取结果(阻塞等待)
int result = data_future.get();
std::cout << "Result: " << result << std::endl;
return 0;
}
使用 std::packaged_task(更灵活)
#include <iostream>
#include <future>
#include <thread>
int fetch_data(int user_id) {
std::this_thread::sleep_for(std::chrono::seconds(1));
return user_id * 10;
}
int main() {
// 创建任务包装
std::packaged_task<int(int)> task(fetch_data);
std::future<int> result_future = task.get_future();
// 在新线程中执行任务
std::thread worker(std::move(task), 1);
std::cout << "Main thread continues..." << std::endl;
// 获取结果
int result = result_future.get();
std::cout << "Result: " << result << std::endl;
worker.join();
return 0;
}
2. 任务链式调用
#include <iostream>
#include <future>
#include <vector>
#include <algorithm>
int fetch_data(int user_id) {
std::this_thread::sleep_for(std::chrono::seconds(1));
return user_id * 10;
}
int process_data(int data) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return data + 100;
}
int main() {
// 创建任务链
auto data_future = std::async(std::launch::async, fetch_data, 1);
auto processed_future = data_future.then([](std::future<int> f) {
return process_data(f.get());
});
// 获取最终结果
int result = processed_future.get();
std::cout << "Result: " << result << std::endl;
return 0;
}
3. 多任务组合
#include <iostream>
#include <future>
#include <vector>
#include <algorithm>
int fetch_user(int id) {
std::this_thread::sleep_for(std::chrono::seconds(1));
return id * 10;
}
int fetch_product(int id) {
std::this_thread::sleep_for(std::chrono::seconds(1));
return id * 100;
}
int main() {
// 创建多个任务
auto user_future = std::async(std::launch::async, fetch_user, 1);
auto product_future = std::async(std::launch::async, fetch_product, 2);
// 等待所有任务完成
std::vector<std::future<int>> futures = {user_future, product_future};
std::vector<int> results;
for (auto& f : futures) {
results.push_back(f.get());
}
std::cout << "User result: " << results[0]
<< ", Product result: " << results[1] << std::endl;
return 0;
}
4. 异常处理
#include <iostream>
#include <future>
#include <stdexcept>
int risky_operation() {
if (std::rand() % 2 == 0) {
throw std::runtime_error("Something went wrong");
}
return 1;
}
int main() {
try {
std::future<int> future = std::async(std::launch::async, risky_operation);
int result = future.get();
std::cout << "Result: " << result << std::endl;
} catch (const std::exception& e) {
std::cout << "Error: " << e.what() << std::endl;
}
return 0;
}
5. 超时控制
#include <iostream>
#include <future>
#include <thread>
#include <chrono>
int long_running_task() {
std::this_thread::sleep_for(std::chrono::seconds(5));
return 42;
}
int main() {
auto future = std::async(std::launch::async, long_running_task);
// 等待最多2秒
if (future.wait_for(std::chrono::seconds(2)) == std::future_status::timeout) {
std::cout << "Task timed out" << std::endl;
// 可以在这里取消任务
} else {
int result = future.get();
std::cout << "Result: " << result << std::endl;
}
return 0;
}
三、Windows UWP 应用异步编程(PPL)
1. 基本用法
#include <ppltasks.h>
using namespace concurrency;
using namespace Windows::Devices::Enumeration;
void App::TestAsync() {
// 调用异步方法
IAsyncOperation<DeviceInformationCollection^>^ deviceOp =
DeviceInformation::FindAllAsync();
// 创建任务
auto deviceEnumTask = create_task(deviceOp);
// 添加延续任务
deviceEnumTask.then([this](DeviceInformationCollection^ devices) {
for (int i = 0; i < devices->Size; i++) {
DeviceInformation^ di = devices->GetAt(i);
// 处理设备信息
std::wcout << L"Device: " << di->Name->Data() << std::endl;
}
});
}
2. 任务链式调用
#include <ppltasks.h>
using namespace concurrency;
// 模拟异步操作
concurrency::task<int> fetch_data(int user_id) {
return concurrency::create_task([=]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
return user_id * 10;
});
}
concurrency::task<int> process_data(int data) {
return concurrency::create_task([=]() {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return data + 100;
});
}
void process_order() {
fetch_data(1)
.then([](int data) {
return process_data(data);
})
.then([](int result) {
std::cout << "Order processed: " << result << std::endl;
});
}
3. 多任务组合
#include <ppltasks.h>
using namespace concurrency;
concurrency::task<int> fetch_user(int id) {
return concurrency::create_task([=]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
return id * 10;
});
}
concurrency::task<int> fetch_product(int id) {
return concurrency::create_task([=]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
return id * 100;
});
}
void main() {
// 并行获取用户和商品
auto user_task = fetch_user(1);
auto product_task = fetch_product(2);
// 等待所有任务完成
concurrency::when_all(user_task, product_task).then([](concurrency::task<int> user_task, concurrency::task<int> product_task) {
int user_result = user_task.get();
int product_result = product_task.get();
std::cout << "User: " << user_result << ", Product: " << product_result << std::endl;
});
}
4. 异常处理
#include <ppltasks.h>
using namespace concurrency;
concurrency::task<int> risky_operation() {
return concurrency::create_task([]() {
if (std::rand() % 2 == 0) {
throw std::runtime_error("Something went wrong");
}
return 1;
});
}
void handle_exceptions() {
risky_operation()
.then([](int result) {
return result;
})
.then_error([](const std::exception& e) {
std::cout << "Error: " << e.what() << std::endl;
return 0; // 返回默认值
});
}
四、C++ 异步编程与 Java CompletableFuture 对比
| 功能 | Java CompletableFuture | C++ 标准库 | C++ PPL (UWP) |
|---|---|---|---|
| 创建异步任务 | CompletableFuture.supplyAsync() | std::async() | create_task() |
| 任务链式调用 | thenApply() | then()(链式调用) | then() |
| 多任务组合 | CompletableFuture.allOf() | 手动实现 | concurrency::when_all() |
| 任意任务完成 | CompletableFuture.anyOf() | 手动实现 | concurrency::when_any() |
| 异常处理 | exceptionally() | try-catch | then_error() |
| 超时控制 | completeOnTimeout() | wait_for() | with_timeout() |
| 任务取消 | 无直接支持 | 无直接支持 | task::cancel() |
五、高级技巧与最佳实践
1. 任务组合的高级技巧
1.1 多任务并行处理
// 使用 when_all 处理多个任务
auto task1 = fetch_data(1);
auto task2 = fetch_data(2);
auto task3 = fetch_data(3);
concurrency::when_all(task1, task2, task3).then([](auto t1, auto t2, auto t3) {
int r1 = t1.get();
int r2 = t2.get();
int r3 = t3.get();
// 处理结果
});
1.2 任务条件执行
// 根据第一个任务的结果决定后续操作
fetch_user(1)
.then([](int user_id) {
if (user_id > 10) {
return fetch_premium_data(user_id);
} else {
return fetch_regular_data(user_id);
}
});
2. 异常处理最佳实践
// 使用 then_error 处理异常
auto task = fetch_data(1);
task.then([](int result) { /* 处理成功 */ })
.then_error([](const std::exception& e) {
std::cout << "Error: " << e.what() << std::endl;
return 0; // 返回默认值
});
3. 资源管理
// 确保任务完成后清理资源
auto task = fetch_data(1);
task.then([](int result) {
// 使用结果
// ...
// 确保资源清理
return result;
})
.then([](int result) {
// 任务完成后的清理
std::cout << "Task completed with result: " << result << std::endl;
});
4. 与同步代码的集成
// 在异步中调用同步函数
concurrency::task<int> async_call() {
return concurrency::create_task([]() {
// 同步调用
int result = sync_function();
return result;
});
}
// 在同步中调用异步函数
int sync_call() {
auto task = async_call();
return task.get(); // 会阻塞,需谨慎使用
}
六、真实场景实战
场景1:电商订单处理(C++ 标准库版)
#include <iostream>
#include <future>
#include <vector>
#include <thread>
#include <chrono>
// 模拟数据库查询
int fetch_user(int id) {
std::this_thread::sleep_for(std::chrono::seconds(1));
return id * 10;
}
int fetch_product(int id) {
std::this_thread::sleep_for(std::chrono::seconds(1));
return id * 100;
}
int calculate_price(int product_price) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return product_price * 0.9;
}
int process_payment(double amount) {
std::this_thread::sleep_for(std::chrono::seconds(1));
return static_cast<int>(amount);
}
int main() {
// 创建任务链
auto user_task = std::async(std::launch::async, fetch_user, 1);
auto product_task = std::async(std::launch::async, fetch_product, 2);
// 获取用户和商品信息
auto user_future = user_task;
auto product_future = product_task;
// 计算价格
auto price_task = product_future.then([](std::future<int> f) {
return calculate_price(f.get());
});
// 支付
auto payment_task = price_task.then([](int price) {
return process_payment(price);
});
// 获取最终结果
int payment_result = payment_task.get();
std::cout << "Order processed successfully. Payment: " << payment_result << std::endl;
return 0;
}
场景2:多数据源竞争获取(C++ PPL 版)
#include <ppltasks.h>
using namespace concurrency;
// 模拟从不同数据源获取数据
concurrency::task<std::string> fetch_from_source1() {
return concurrency::create_task([]() {
std::this_thread::sleep_for(std::chrono::seconds(3));
return "Source 1 result";
});
}
concurrency::task<std::string> fetch_from_source2() {
return concurrency::create_task([]() {
std::this_thread::sleep_for(std::chrono::seconds(2));
return "Source 2 result";
});
}
concurrency::task<std::string> fetch_from_source3() {
return concurrency::create_task([]() {
std::this_thread::sleep_for(std::chrono::seconds(4));
return "Source 3 result";
});
}
int main() {
// 获取最快响应
concurrency::when_any(fetch_from_source1(), fetch_from_source2(), fetch_from_source3())
.then([](concurrency::task<std::string> task) {
std::cout << "Fastest result: " << task.get() << std::endl;
});
// 等待所有任务完成(确保程序不退出)
concurrency::task<void> all_done = concurrency::when_all(
fetch_from_source1(), fetch_from_source2(), fetch_from_source3()
);
all_done.wait();
return 0;
}
七、总结
C++ 的异步编程体系虽然与 Java 的 CompletableFuture 在语法上不同,但其核心思想完全一致:
| Java CompletableFuture | C++ 等效实现 |
|---|---|
| supplyAsync() | std::async() / concurrency::create_task() |
| thenApply() | then() (链式调用) |
| allOf() | concurrency::when_all() |
| anyOf() | concurrency::when_any() |
| exceptionally() | then_error() / try-catch |
| completeOnTimeout() | wait_for() / with_timeout() |
关键差异:
- Java 是基于回调的链式调用
- C++ 标准库是基于
std::future和std::async的任务管理 - C++ PPL (UWP) 是基于
concurrency::task的链式调用
最佳实践:
- I/O 密集型任务:使用
std::async或 PPL - CPU 密集型任务:考虑使用
std::async并指定std::launch::async - 避免在主线程中阻塞:不要在 UI 线程中使用
get() - 合理设置超时:使用
wait_for防止长时间等待 - 任务取消:在需要时使用
task::cancel()(PPL)
通过掌握 std::async 和 concurrency::task,可以在 C++ 中实现与 Java CompletableFuture 同等甚至更强大的异步编程能力。
- 感谢你赐予我前进的力量
赞赏者名单
因为你们的支持让我意识到写文章的价值🙏
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 软件从业者Hort
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果

