Python 异步编程完全指南:从 concurrent.futures 到 asyncio
本文最后更新于 2025-11-03,文章内容可能已经过时。
Python 提供了多种方式来实现异步编程,其核心思想与 Java 的
CompletableFuture高度相似,但语法和 API 设计更具 Pythonic 风格。以下是 Python 中实现类似CompletableFuture功能的完整知识体系。
一、Python 异步编程的三大支柱
Python 的异步编程主要围绕三个核心模块构建:
| 模块 | 用途 | 对应 Java 概念 |
|---|---|---|
| concurrent.futures | 线程/进程池执行器,用于执行阻塞任务 | ExecutorService |
| asyncio | 协程、事件循环、异步 I/O 的核心框架 | CompletableFuture + EventLoop |
| asyncio.futures | 异步 Future 对象,代表异步计算的结果 | CompletableFuture |
二、concurrent.futures:Python 版的 ExecutorService
这是最接近 Java CompletableFuture 同步风格的实现,适用于 CPU 密集型或阻塞 I/O 任务。
1. 基本用法
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
# 模拟耗时任务
def fetch_data(user_id):
time.sleep(1)
return f"Data for user {user_id}"
# 使用线程池执行
with ThreadPoolExecutor(max_workers=3) as executor:
# submit 返回 Future 对象
future1 = executor.submit(fetch_data, 1)
future2 = executor.submit(fetch_data, 2)
# 获取结果(阻塞)
result1 = future1.result()
result2 = future2.result()
print(result1, result2) # Data for user 1 Data for user 2
2. 任务组合:as_completed 与 wait
from concurrent.futures import as_completed, wait
futures = [executor.submit(fetch_data, i) for i in range(5)]
# 方式1:按完成顺序获取结果
for future in as_completed(futures):
print(future.result())
# 方式2:等待所有任务完成
done, not_done = wait(futures, timeout=10)
results = [f.result() for f in done]
3. 异常处理
future = executor.submit(lambda: 1/0)
try:
result = future.result()
except Exception as e:
print(f"Task failed: {e}") # Task failed: division by zero
三、asyncio:真正的异步编程核心
asyncio 是 Python 的异步 I/O 框架,它使用 协程 (coroutine) 和 事件循环 (event loop) 实现非阻塞异步编程,是 CompletableFuture的真正对应物。
1. 核心概念
async def:定义协程函数await:等待异步操作完成(类似get(),但不阻塞)asyncio.run():运行主协程asyncio.create_task():创建任务(类似CompletableFuture.supplyAsync)
2. 基本用法
import asyncio
import aiohttp # 异步 HTTP 客户端
# 定义异步任务
async def fetch_user(user_id):
print(f"Fetching user {user_id}...")
await asyncio.sleep(1) # 模拟网络请求
return {"id": user_id, "name": f"User {user_id}"}
# 主函数
async def main():
# 串行执行
user1 = await fetch_user(1)
user2 = await fetch_user(2)
# 并行执行(类似 CompletableFuture.allOf)
users = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3)
)
print(users)
# 运行
asyncio.run(main())
3. 任务编排与链式调用
async def process_order(order_id):
# 第一步:获取用户
user = await fetch_user(order_id % 3)
# 第二步:获取商品
product = await fetch_product(order_id)
# 第三步:计算价格
price = product['price'] * 0.9
# 第四步:支付
payment = await make_payment(price)
return {
"order_id": order_id,
"user": user,
"product": product,
"payment": payment
}
4. 任务组合:asyncio.gather, asyncio.wait, asyncio.as_completed
| 方法 | 作用 | 类似 Java 方法 |
|---|---|---|
| asyncio.gather(*coros) | 等待所有任务完成,返回结果列表 | CompletableFuture.allOf().thenApply() |
| asyncio.wait(coros, return_when=) | 等待任务,可设置返回条件 | CompletableFuture.allOf() / anyOf() |
| asyncio.as_completed(coros) | 按完成顺序返回结果 | applyToEither 链式调用 |
# allOf: 等待所有完成
results = await asyncio.gather(
fetch_user(1),
fetch_product(1),
fetch_stock(1)
)
# anyOf: 等待任意一个完成
done, pending = await asyncio.wait(
[fetch_from_source1(), fetch_from_source2()],
return_when=asyncio.FIRST_COMPLETED
)
# 按完成顺序处理
for coro in asyncio.as_completed([task1(), task2(), task3()]):
result = await coro
print(f"Got result: {result}")
四、高级异步模式:Python 版的 CompletableFuture 技巧
1. 异常处理:try-except 与 handle
async def risky_operation():
if random.random() > 0.5:
raise ValueError("Something went wrong")
return "Success"
# 方式1:try-except(类似 handle)
async def with_exception_handling():
try:
result = await risky_operation()
return result
except Exception as e:
print(f"Error: {e}")
return "Default value"
# 方式2:使用 asyncio.shield 防止取消
try:
result = await asyncio.shield(risky_operation())
except Exception as e:
print(f"Operation failed: {e}")
2. 超时控制:asyncio.wait_for
async def fetch_with_timeout():
try:
result = await asyncio.wait_for(fetch_user(1), timeout=2.0)
return result
except asyncio.TimeoutError:
print("Request timed out")
return "Timeout default"
3. 任务取消:task.cancel()
task = asyncio.create_task(long_running_task())
# 取消任务
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Task was cancelled")
4. 任务依赖与条件执行
async def conditional_execution():
user = await fetch_user(1)
if user['premium']:
order = await process_premium_order(user)
else:
order = await process_regular_order(user)
return order
五、异步与同步的混合编程
1. 在异步中调用同步函数
import asyncio
import requests # 同步 HTTP 库
def sync_fetch(url):
return requests.get(url).text
async def async_with_sync():
# 在异步中调用同步函数,避免阻塞事件循环
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None, # 使用默认线程池
sync_fetch,
"https://httpbin.org/get"
)
return result
2. 在同步中调用异步函数
def sync_wrapper():
return asyncio.run(async_function())
六、真实场景实战
场景1:电商订单处理(异步版)
import asyncio
import aiohttp
async def fetch_user(user_id):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.example.com/users/{user_id}") as resp:
return await resp.json()
async def fetch_product(product_id):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.example.com/products/{product_id}") as resp:
return await resp.json()
async def process_order(order_id):
# 并行获取用户和商品信息
user_task = asyncio.create_task(fetch_user(order_id % 10))
product_task = asyncio.create_task(fetch_product(order_id % 5))
user, product = await asyncio.gather(user_task, product_task)
# 计算价格
price = product['price'] * 0.9
# 支付
payment = await make_payment_async(price)
return {
"order_id": order_id,
"user": user,
"product": product,
"price": price,
"payment": payment
}
async def main():
orders = await asyncio.gather(
*[process_order(i) for i in range(5)]
)
print(f"Processed {len(orders)} orders")
if __name__ == "__main__":
asyncio.run(main())
场景2:多数据源竞争(最快响应)
async def fetch_from_source1():
await asyncio.sleep(0.3)
return "Source 1 result"
async def fetch_from_source2():
await asyncio.sleep(0.2)
return "Source 2 result"
async def fetch_from_source3():
await asyncio.sleep(0.4)
return "Source 3 result"
async def get_fastest_response():
done, pending = await asyncio.wait(
[fetch_from_source1(), fetch_from_source2(), fetch_from_source3()],
return_when=asyncio.FIRST_COMPLETED
)
# 取消剩余任务
for task in pending:
task.cancel()
return done.pop().result()
# 使用
result = asyncio.run(get_fastest_response())
print(result) # Source 2 result
七、asyncio 与 concurrent.futures 对比
| 特性 | concurrent.futures | asyncio |
|---|---|---|
| 编程模型 | 命令式,类似 Java | 协程,声明式 |
| 并发模型 | 多线程/多进程 | 单线程事件循环 |
| 适用场景 | 阻塞 I/O、CPU 密集型 | 高并发 I/O 密集型 |
| 内存开销 | 高(每个线程) | 低(协程轻量) |
| 学习曲线 | 低 | 中高 |
| 异常处理 | Future.exception() | try-except |
| 任务组合 | wait, as_completed | gather, wait, as_completed |
八、最佳实践总结
1. 何时使用 asyncio?
- 高并发网络请求(HTTP、数据库)
- Web 服务器(FastAPI、aiohttp)
- 实时数据处理
- 微服务调用
2. 何时使用 concurrent.futures?
- CPU 密集型任务(科学计算、图像处理)
- 调用阻塞的第三方库
- 需要多进程并行计算
3. 关键原则
- I/O 密集型用
asyncio - CPU 密集型用
ProcessPoolExecutor - 避免在
asyncio中阻塞调用 - 使用
asyncio.gather替代串行await - 合理设置超时:
asyncio.wait_for - 及时取消任务:
task.cancel()
九、常用异步库推荐
| 库 | 用途 |
|---|---|
| aiohttp | 异步 HTTP 客户端/服务器 |
| fastapi | 异步 Web 框架 |
| aiomysql / asyncpg | 异步数据库驱动 |
| aiofiles | 异步文件操作 |
| requests-async | 异步版 requests(已归档,推荐 aiohttp) |
十、总结
Python 的异步编程体系虽然与 Java 的 CompletableFuture 在语法上不同,但其核心思想完全一致:
Java CompletableFuture | Python 等效实现 |
|---|---|
| supplyAsync() | asyncio.create_task() / loop.run_in_executor() |
| thenApply() | await + 协程链式调用 |
| thenCompose() | await 嵌套协程 |
| allOf() | asyncio.gather() |
| anyOf() | asyncio.wait(return_when=FIRST_COMPLETED) |
| exceptionally() | try-except |
| completeOnTimeout() | asyncio.wait_for() |
核心差异:
- Java 是基于回调的链式调用
- Python 是基于
await的顺序式编程,代码更直观
通过掌握 asyncio 和 concurrent.futures,可以在 Python 中实现与 Java CompletableFuture 同等甚至更强大的异步编程能力。
- 感谢你赐予我前进的力量
赞赏者名单
因为你们的支持让我意识到写文章的价值🙏
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 软件从业者Hort
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果

