本文最后更新于 2025-11-03,文章内容可能已经过时。

Python 提供了多种方式来实现异步编程,其核心思想与 Java 的 CompletableFuture 高度相似,但语法和 API 设计更具 Pythonic 风格。以下是 Python 中实现类似 CompletableFuture 功能的完整知识体系。


一、Python 异步编程的三大支柱

Python 的异步编程主要围绕三个核心模块构建:

模块用途对应 Java 概念
concurrent.futures线程/进程池执行器,用于执行阻塞任务ExecutorService
asyncio协程、事件循环、异步 I/O 的核心框架CompletableFuture + EventLoop
asyncio.futures异步 Future 对象,代表异步计算的结果CompletableFuture

二、concurrent.futures:Python 版的 ExecutorService

这是最接近 Java CompletableFuture 同步风格的实现,适用于 CPU 密集型或阻塞 I/O 任务。

1. 基本用法

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

# 模拟耗时任务
def fetch_data(user_id):
    time.sleep(1)
    return f"Data for user {user_id}"

# 使用线程池执行
with ThreadPoolExecutor(max_workers=3) as executor:
    # submit 返回 Future 对象
    future1 = executor.submit(fetch_data, 1)
    future2 = executor.submit(fetch_data, 2)
    
    # 获取结果(阻塞)
    result1 = future1.result()
    result2 = future2.result()
    
    print(result1, result2)  # Data for user 1 Data for user 2

2. 任务组合:as_completed 与 wait

from concurrent.futures import as_completed, wait

futures = [executor.submit(fetch_data, i) for i in range(5)]

# 方式1:按完成顺序获取结果
for future in as_completed(futures):
    print(future.result())

# 方式2:等待所有任务完成
done, not_done = wait(futures, timeout=10)
results = [f.result() for f in done]

3. 异常处理

future = executor.submit(lambda: 1/0)

try:
    result = future.result()
except Exception as e:
    print(f"Task failed: {e}")  # Task failed: division by zero

三、asyncio:真正的异步编程核心

asyncio 是 Python 的异步 I/O 框架,它使用 协程 (coroutine)事件循环 (event loop) 实现非阻塞异步编程,是 CompletableFuture的真正对应物。

1. 核心概念

  • async def:定义协程函数
  • await:等待异步操作完成(类似 get(),但不阻塞)
  • asyncio.run():运行主协程
  • asyncio.create_task():创建任务(类似 CompletableFuture.supplyAsync

2. 基本用法

import asyncio
import aiohttp  # 异步 HTTP 客户端

# 定义异步任务
async def fetch_user(user_id):
    print(f"Fetching user {user_id}...")
    await asyncio.sleep(1)  # 模拟网络请求
    return {"id": user_id, "name": f"User {user_id}"}

# 主函数
async def main():
    # 串行执行
    user1 = await fetch_user(1)
    user2 = await fetch_user(2)
    
    # 并行执行(类似 CompletableFuture.allOf)
    users = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3)
    )
    print(users)

# 运行
asyncio.run(main())

3. 任务编排与链式调用

async def process_order(order_id):
    # 第一步:获取用户
    user = await fetch_user(order_id % 3)
    
    # 第二步:获取商品
    product = await fetch_product(order_id)
    
    # 第三步:计算价格
    price = product['price'] * 0.9
    
    # 第四步:支付
    payment = await make_payment(price)
    
    return {
        "order_id": order_id,
        "user": user,
        "product": product,
        "payment": payment
    }

4. 任务组合:asyncio.gather, asyncio.wait, asyncio.as_completed

方法作用类似 Java 方法
asyncio.gather(*coros)等待所有任务完成,返回结果列表CompletableFuture.allOf().thenApply()
asyncio.wait(coros, return_when=)等待任务,可设置返回条件CompletableFuture.allOf() / anyOf()
asyncio.as_completed(coros)按完成顺序返回结果applyToEither 链式调用
# allOf: 等待所有完成
results = await asyncio.gather(
    fetch_user(1),
    fetch_product(1),
    fetch_stock(1)
)

# anyOf: 等待任意一个完成
done, pending = await asyncio.wait(
    [fetch_from_source1(), fetch_from_source2()],
    return_when=asyncio.FIRST_COMPLETED
)

# 按完成顺序处理
for coro in asyncio.as_completed([task1(), task2(), task3()]):
    result = await coro
    print(f"Got result: {result}")

四、高级异步模式:Python 版的 CompletableFuture 技巧

1. 异常处理:try-except 与 handle

async def risky_operation():
    if random.random() > 0.5:
        raise ValueError("Something went wrong")
    return "Success"

# 方式1:try-except(类似 handle)
async def with_exception_handling():
    try:
        result = await risky_operation()
        return result
    except Exception as e:
        print(f"Error: {e}")
        return "Default value"

# 方式2:使用 asyncio.shield 防止取消
try:
    result = await asyncio.shield(risky_operation())
except Exception as e:
    print(f"Operation failed: {e}")

2. 超时控制:asyncio.wait_for

async def fetch_with_timeout():
    try:
        result = await asyncio.wait_for(fetch_user(1), timeout=2.0)
        return result
    except asyncio.TimeoutError:
        print("Request timed out")
        return "Timeout default"

3. 任务取消:task.cancel()

task = asyncio.create_task(long_running_task())

# 取消任务
task.cancel()

try:
    await task
except asyncio.CancelledError:
    print("Task was cancelled")

4. 任务依赖与条件执行

async def conditional_execution():
    user = await fetch_user(1)
    
    if user['premium']:
        order = await process_premium_order(user)
    else:
        order = await process_regular_order(user)
        
    return order

五、异步与同步的混合编程

1. 在异步中调用同步函数

import asyncio
import requests  # 同步 HTTP 库

def sync_fetch(url):
    return requests.get(url).text

async def async_with_sync():
    # 在异步中调用同步函数,避免阻塞事件循环
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,  # 使用默认线程池
        sync_fetch,
        "https://httpbin.org/get"
    )
    return result

2. 在同步中调用异步函数

def sync_wrapper():
    return asyncio.run(async_function())

六、真实场景实战

场景1:电商订单处理(异步版)

import asyncio
import aiohttp

async def fetch_user(user_id):
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/users/{user_id}") as resp:
            return await resp.json()

async def fetch_product(product_id):
    async with aiohttp.ClientSession() as session:
        async with session.get(f"https://api.example.com/products/{product_id}") as resp:
            return await resp.json()

async def process_order(order_id):
    # 并行获取用户和商品信息
    user_task = asyncio.create_task(fetch_user(order_id % 10))
    product_task = asyncio.create_task(fetch_product(order_id % 5))
    
    user, product = await asyncio.gather(user_task, product_task)
    
    # 计算价格
    price = product['price'] * 0.9
    
    # 支付
    payment = await make_payment_async(price)
    
    return {
        "order_id": order_id,
        "user": user,
        "product": product,
        "price": price,
        "payment": payment
    }

async def main():
    orders = await asyncio.gather(
        *[process_order(i) for i in range(5)]
    )
    print(f"Processed {len(orders)} orders")

if __name__ == "__main__":
    asyncio.run(main())

场景2:多数据源竞争(最快响应)

async def fetch_from_source1():
    await asyncio.sleep(0.3)
    return "Source 1 result"

async def fetch_from_source2():
    await asyncio.sleep(0.2)
    return "Source 2 result"

async def fetch_from_source3():
    await asyncio.sleep(0.4)
    return "Source 3 result"

async def get_fastest_response():
    done, pending = await asyncio.wait(
        [fetch_from_source1(), fetch_from_source2(), fetch_from_source3()],
        return_when=asyncio.FIRST_COMPLETED
    )
    
    # 取消剩余任务
    for task in pending:
        task.cancel()
        
    return done.pop().result()

# 使用
result = asyncio.run(get_fastest_response())
print(result)  # Source 2 result

七、asyncio 与 concurrent.futures 对比

特性concurrent.futuresasyncio
编程模型命令式,类似 Java协程,声明式
并发模型多线程/多进程单线程事件循环
适用场景阻塞 I/O、CPU 密集型高并发 I/O 密集型
内存开销高(每个线程)低(协程轻量)
学习曲线中高
异常处理Future.exception()try-except
任务组合wait, as_completedgather, wait, as_completed

八、最佳实践总结

1. 何时使用 asyncio?

  • 高并发网络请求(HTTP、数据库)
  • Web 服务器(FastAPI、aiohttp)
  • 实时数据处理
  • 微服务调用

2. 何时使用 concurrent.futures?

  • CPU 密集型任务(科学计算、图像处理)
  • 调用阻塞的第三方库
  • 需要多进程并行计算

3. 关键原则

  • I/O 密集型用 asyncio
  • CPU 密集型用 ProcessPoolExecutor
  • 避免在 asyncio 中阻塞调用
  • 使用 asyncio.gather 替代串行 await
  • 合理设置超时:asyncio.wait_for
  • 及时取消任务:task.cancel()

九、常用异步库推荐

用途
aiohttp异步 HTTP 客户端/服务器
fastapi异步 Web 框架
aiomysql / asyncpg异步数据库驱动
aiofiles异步文件操作
requests-async异步版 requests(已归档,推荐 aiohttp)

十、总结

Python 的异步编程体系虽然与 Java 的 CompletableFuture 在语法上不同,但其核心思想完全一致:

Java CompletableFuturePython 等效实现
supplyAsync()asyncio.create_task() / loop.run_in_executor()
thenApply()await + 协程链式调用
thenCompose()await 嵌套协程
allOf()asyncio.gather()
anyOf()asyncio.wait(return_when=FIRST_COMPLETED)
exceptionally()try-except
completeOnTimeout()asyncio.wait_for()

核心差异

  • Java 是基于回调的链式调用
  • Python 是基于 await 的顺序式编程,代码更直观

通过掌握 asyncioconcurrent.futures,可以在 Python 中实现与 Java CompletableFuture 同等甚至更强大的异步编程能力。