Skip to content

asyncio — 异步 I/O 核心

asyncio 是 Python 异步编程的基础设施。理解事件循环原理,才能写出正确高效的异步代码。

为什么需要异步

同步 I/O(3 个请求,串行):
  请求1: ████████░░░░░░░░░░░░░░░░  (等待 I/O)
  请求2:                 ████████░░░░░░░░░░░░░░░░
  请求3:                                 ████████
  总耗时: 3s

异步 I/O(3 个请求,并发):
  请求1: ████░░░░░░░░░░░░░░░░████  (等待时切换)
  请求2:     ████░░░░░░░░░░░░████
  请求3:         ████░░░░░░░░████
  总耗时: ~1s

异步适合 I/O 密集型任务(网络请求、文件读写、数据库查询),不适合 CPU 密集型。

事件循环原理

python
import asyncio

# 事件循环是 asyncio 的核心
# 它维护一个任务队列,不断轮询:
# 1. 哪些任务可以继续执行(I/O 就绪)
# 2. 执行到下一个 await 点
# 3. 切换到其他就绪任务

async def main():
    """Print the currently running event loop and its running state."""
    # Inside a coroutine, get_running_loop() is the documented way to obtain
    # the loop; get_event_loop() is discouraged/deprecated in this context.
    loop = asyncio.get_running_loop()
    print(f"事件循环: {loop}")
    print(f"运行中: {loop.is_running()}")

asyncio.run(main())

协程基础

python
import asyncio

async def say_hello(name: str, delay: float):
    """Announce start, sleep *delay* seconds, announce completion, return a greeting."""
    print(f"开始: {name}")
    # Suspension point: the event loop may run other coroutines here.
    await asyncio.sleep(delay)
    print(f"完成: {name}")
    greeting = f"Hello, {name}!"
    return greeting

# 运行单个协程
result = asyncio.run(say_hello("Alice", 1.0))
print(result)

Task — 并发执行

python
import asyncio
import time

async def fetch_data(url: str, delay: float) -> str:
    """Simulate fetching *url*: sleep *delay* seconds, then return fake data."""
    print(f"  → 请求 {url}")
    # The sleep stands in for the network round-trip; control is yielded here.
    await asyncio.sleep(delay)
    print(f"  ← 完成 {url}")
    payload = f"data from {url}"
    return payload

async def main():
    """Contrast two ways to run coroutines concurrently: gather vs create_task."""
    start = time.perf_counter()

    # Approach 1: gather — run concurrently, wait for ALL results (in order).
    results = await asyncio.gather(
        fetch_data("api1.com", 1.0),
        fetch_data("api2.com", 1.5),
        fetch_data("api3.com", 0.5),
    )
    print(f"gather 耗时: {time.perf_counter() - start:.2f}s")
    # ~1.5 s (the slowest coroutine), not 3 s — the waits overlap.

    # Approach 2: create_task — schedules immediately; we are free to do
    # other work before awaiting the results.
    start = time.perf_counter()
    task1 = asyncio.create_task(fetch_data("api1.com", 1.0))
    task2 = asyncio.create_task(fetch_data("api2.com", 1.5))

    # Both tasks are already running in the background at this point.
    print("任务已提交,做其他事...")
    await asyncio.sleep(0.1)

    result1 = await task1
    result2 = await task2
    print(f"create_task 耗时: {time.perf_counter() - start:.2f}s")

asyncio.run(main())

asyncio.gather vs asyncio.wait

python
import asyncio

async def task(name, delay, fail=False):
    """Sleep *delay* seconds, then return a done message or raise ValueError if *fail*."""
    await asyncio.sleep(delay)
    if not fail:
        return f"{name} 完成"
    raise ValueError(f"{name} 失败")

async def main():
    """Contrast asyncio.gather with asyncio.wait for error handling and control."""
    # gather (default): the FIRST exception propagates to the awaiter.
    # NOTE: contrary to a common misconception, the other awaitables are
    # NOT cancelled — they keep running in the background.
    try:
        results = await asyncio.gather(
            task("A", 1),
            task("B", 0.5, fail=True),
            task("C", 1.5),
        )
    except ValueError as e:
        print(f"gather 异常: {e}")

    # gather with return_exceptions=True: nothing is raised; every outcome
    # (result or exception object) is collected positionally.
    results = await asyncio.gather(
        task("A", 1),
        task("B", 0.5, fail=True),
        task("C", 1.5),
        return_exceptions=True
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"异常: {r}")
        else:
            print(f"成功: {r}")

    # wait: finer-grained control — returns (done, pending) sets of Tasks.
    tasks = [asyncio.create_task(task(f"T{i}", i*0.3)) for i in range(5)]
    done, pending = await asyncio.wait(
        tasks,
        timeout=0.8,                          # stop waiting after 0.8 s
        return_when=asyncio.FIRST_COMPLETED   # or ALL_COMPLETED / FIRST_EXCEPTION
    )
    print(f"完成: {len(done)}, 未完成: {len(pending)}")
    # Cancel whatever did not finish so no task is left dangling.
    for t in pending:
        t.cancel()

asyncio.run(main())

超时控制

python
import asyncio

async def slow_operation():
    """Simulate long-running work: sleep ten seconds, then report completion."""
    # Deliberately longer than any timeout used with it, so timeouts fire.
    work_seconds = 10
    await asyncio.sleep(work_seconds)
    return "完成"

async def main():
    """Demonstrate the two standard ways to bound an await with a timeout."""
    # Option 1: asyncio.timeout() context manager (Python 3.11+).
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
    except asyncio.TimeoutError:
        print("操作超时")

    # Option 2: asyncio.wait_for() — also works on older Python versions.
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("操作超时")

asyncio.run(main())

异步上下文管理器和迭代器

python
import asyncio

class AsyncDatabase:
    """Toy async database client illustrating the async context-manager protocol."""

    async def __aenter__(self):
        # Simulate establishing a connection.
        print("连接数据库")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, *args):
        # Simulate tearing the connection down; exceptions are not suppressed.
        print("关闭数据库连接")
        await asyncio.sleep(0.05)

    async def query(self, sql):
        # Simulate query latency, then hand back canned rows.
        await asyncio.sleep(0.1)
        rows = [{"id": n} for n in (1, 2)]
        return rows

async def async_generator(n):
    """Asynchronously yield 0..n-1, pausing 0.1 s before each value."""
    produced = 0
    while produced < n:
        await asyncio.sleep(0.1)
        yield produced
        produced += 1

async def main():
    """Exercise async context managers, async iteration and async comprehensions."""
    # Async context manager: __aenter__/__aexit__ may await (connect/close).
    async with AsyncDatabase() as db:
        rows = await db.query("SELECT * FROM users")
        print(rows)

    # Async iterator: each value may only be available after an await.
    async for value in async_generator(5):
        print(value)

    # Async list comprehension (PEP 530).
    results = [v async for v in async_generator(3)]
    print(results)

asyncio.run(main())

信号量:限制并发数

python
import asyncio
import httpx

async def fetch(client, url, semaphore):
    """GET *url* with *client*, holding *semaphore* to cap concurrent requests."""
    # The semaphore bounds how many coroutines are inside this block at once.
    async with semaphore:
        resp = await client.get(url)
        return resp.text

async def main():
    """Fire 50 requests, but allow at most 10 to be in flight at any moment."""
    urls = [f"https://httpbin.org/delay/1?n={i}" for i in range(50)]
    semaphore = asyncio.Semaphore(10)  # concurrency cap: 10

    # All 50 fetches are started by gather; the shared semaphore throttles
    # how many actually issue a request simultaneously.
    async with httpx.AsyncClient() as client:
        tasks = [fetch(client, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)

asyncio.run(main())

队列:生产者-消费者模式

python
import asyncio
import random

async def producer(queue: asyncio.Queue, n: int):
    """Put *n* items on *queue*, then a single None sentinel marking the end."""
    for seq in range(n):
        item = f"item-{seq}"
        await queue.put(item)
        print(f"生产: {item}")
        # Random pause models an uneven production rate.
        await asyncio.sleep(random.uniform(0.1, 0.3))
    # End-of-stream marker. NOTE: exactly ONE sentinel is sent.
    await queue.put(None)

async def consumer(queue: asyncio.Queue, name: str):
    """Drain *queue* until a None sentinel arrives, acknowledging every item."""
    while True:
        item = await queue.get()
        if item is None:
            # Acknowledge the sentinel too, so queue.join() can complete.
            queue.task_done()
            return
        print(f"消费者 {name} 处理: {item}")
        # Random pause models per-item processing cost.
        await asyncio.sleep(random.uniform(0.2, 0.5))
        queue.task_done()

async def main():
    """Run one producer against two consumers.

    Bug fix: the producer enqueues only ONE None sentinel, but there are TWO
    consumers — with the original gather() the second consumer waited on
    queue.get() forever and main() never returned (deadlock). We enqueue one
    extra sentinel so every consumer receives its own shutdown signal.
    """
    queue = asyncio.Queue(maxsize=5)  # bounded queue applies back-pressure

    consumers = [
        asyncio.create_task(consumer(queue, "A")),
        asyncio.create_task(consumer(queue, "B")),
    ]
    await producer(queue, 10)   # enqueues 10 items plus one sentinel
    await queue.put(None)       # second sentinel: one per consumer
    await asyncio.gather(*consumers)

asyncio.run(main())

在同步代码中运行异步

python
import asyncio

async def async_work():
    """Sleep for one second, then report completion."""
    outcome = "done"
    await asyncio.sleep(1)
    return outcome

# Option 1: asyncio.run — the recommended entry point (Python 3.7+).
result = asyncio.run(async_work())

# Option 2: inside an already-running loop (e.g. Jupyter) — nest_asyncio
# patches asyncio so asyncio.run() can be nested.
import nest_asyncio
nest_asyncio.apply()
result = asyncio.run(async_work())

# Option 3: manual loop management (legacy style) — close the loop yourself.
loop = asyncio.new_event_loop()
result = loop.run_until_complete(async_work())
loop.close()

常见陷阱

  1. 不要在协程中调用阻塞函数(如 time.sleep、同步文件 I/O),会阻塞整个事件循环
  2. CPU 密集型任务用 loop.run_in_executor 放到线程池/进程池
  3. 不要创建多个事件循环,一个程序一个循环
python
# 正确:CPU 密集型任务放到进程池
import asyncio
from concurrent.futures import ProcessPoolExecutor

async def main():
    """Off-load a CPU-bound function to a worker process so the loop stays free."""
    # get_running_loop() is the correct call inside a coroutine
    # (get_event_loop() is deprecated/discouraged in this context).
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # run_in_executor awaits the result without blocking the event loop.
        result = await loop.run_in_executor(pool, cpu_intensive_task, data)

本站内容由 褚成志 整理编写,仅供学习参考