# asyncio — Core Asynchronous I/O
asyncio is the foundation of asynchronous programming in Python. Understanding how the event loop works is the key to writing correct, efficient async code.
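One detail that trips people up from day one: calling an `async def` function does not run it; it only creates a coroutine object, which an event loop must then drive. A minimal sketch:

```python
import asyncio

async def greet():
    return "hello"

coro = greet()               # creates a coroutine object; nothing has run yet
print(type(coro).__name__)   # → coroutine
result = asyncio.run(coro)   # the event loop actually executes it
print(result)                # → hello
```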
## Why asynchronous?
Synchronous I/O (3 requests, serial):

```text
Request 1: ████████░░░░░░░░░░░░░░░░ (waiting on I/O)
Request 2: ████████░░░░░░░░░░░░░░░░
Request 3: ████████
Total time: 3s
```

Asynchronous I/O (3 requests, concurrent):

```text
Request 1: ████░░░░░░░░░░░░░░░░████ (switch away while waiting)
Request 2: ████░░░░░░░░░░░░████
Request 3: ████░░░░░░░░████
Total time: ~1s
```

Async shines for I/O-bound work (network requests, file I/O, database queries); it does not speed up CPU-bound work.
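The timeline above can be reproduced with `asyncio.sleep` standing in for a one-second network wait (a toy benchmark, not a real client):

```python
import asyncio
import time

async def fake_request(i: int) -> int:
    await asyncio.sleep(1)  # stands in for ~1s of network I/O
    return i

async def main():
    start = time.perf_counter()
    # all three waits overlap, so total time is ~1s, not 3s
    results = await asyncio.gather(*(fake_request(i) for i in range(3)))
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(f"{len(results)} requests in {elapsed:.2f}s")
```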
## How the event loop works
```python
import asyncio

# The event loop is the core of asyncio.
# It maintains a task queue and keeps polling:
# 1. Which tasks can make progress (I/O ready)?
# 2. Run each one up to its next await point.
# 3. Switch to another ready task.

async def main():
    loop = asyncio.get_running_loop()  # the loop driving this coroutine
    print(f"Event loop: {loop}")
    print(f"Running: {loop.is_running()}")

asyncio.run(main())
```

## Coroutine basics
```python
import asyncio

async def say_hello(name: str, delay: float):
    print(f"Start: {name}")
    await asyncio.sleep(delay)  # yield control to the event loop
    print(f"Done: {name}")
    return f"Hello, {name}!"

# Run a single coroutine
result = asyncio.run(say_hello("Alice", 1.0))
print(result)
```

## Task — concurrent execution
```python
import asyncio
import time

async def fetch_data(url: str, delay: float) -> str:
    print(f" → requesting {url}")
    await asyncio.sleep(delay)  # simulate network I/O
    print(f" ← finished {url}")
    return f"data from {url}"

async def main():
    start = time.perf_counter()

    # Option 1: gather — run concurrently, wait for all to finish
    results = await asyncio.gather(
        fetch_data("api1.com", 1.0),
        fetch_data("api2.com", 1.5),
        fetch_data("api3.com", 0.5),
    )
    print(f"gather took: {time.perf_counter() - start:.2f}s")
    # ~1.5s (the slowest request), not 3s

    # Option 2: create_task — schedule immediately, keep working meanwhile
    start = time.perf_counter()
    task1 = asyncio.create_task(fetch_data("api1.com", 1.0))
    task2 = asyncio.create_task(fetch_data("api2.com", 1.5))
    # Free to do other work while the tasks run
    print("Tasks submitted, doing other work...")
    await asyncio.sleep(0.1)
    result1 = await task1
    result2 = await task2
    print(f"create_task took: {time.perf_counter() - start:.2f}s")

asyncio.run(main())
```

## asyncio.gather vs asyncio.wait
```python
import asyncio

async def task(name, delay, fail=False):
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"{name} failed")
    return f"{name} done"

async def main():
    # gather: by default the first exception propagates immediately;
    # the remaining awaitables are NOT cancelled and keep running
    try:
        results = await asyncio.gather(
            task("A", 1),
            task("B", 0.5, fail=True),
            task("C", 1.5),
        )
    except ValueError as e:
        print(f"gather raised: {e}")

    # gather with return_exceptions: collect all results (including exceptions)
    results = await asyncio.gather(
        task("A", 1),
        task("B", 0.5, fail=True),
        task("C", 1.5),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"error: {r}")
        else:
            print(f"ok: {r}")

    # wait: finer-grained control
    tasks = [asyncio.create_task(task(f"T{i}", i * 0.3)) for i in range(5)]
    done, pending = await asyncio.wait(
        tasks,
        timeout=0.8,  # give up waiting after 0.8s
        return_when=asyncio.FIRST_COMPLETED,  # or ALL_COMPLETED
    )
    print(f"done: {len(done)}, pending: {len(pending)}")
    for t in pending:
        t.cancel()

asyncio.run(main())
```

## Timeout control
```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "done"

async def main():
    # Option 1: asyncio.timeout (Python 3.11+)
    try:
        async with asyncio.timeout(2.0):
            result = await slow_operation()
    except TimeoutError:  # asyncio.TimeoutError is an alias of this since 3.11
        print("operation timed out")

    # Option 2: wait_for (works on older versions)
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("operation timed out")

asyncio.run(main())
```

## Async context managers and iterators
```python
import asyncio

class AsyncDatabase:
    async def __aenter__(self):
        print("connecting to database")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, *args):
        print("closing database connection")
        await asyncio.sleep(0.05)

    async def query(self, sql):
        await asyncio.sleep(0.1)
        return [{"id": 1}, {"id": 2}]

async def async_generator(n):
    for i in range(n):
        await asyncio.sleep(0.1)
        yield i

async def main():
    # Async context manager
    async with AsyncDatabase() as db:
        rows = await db.query("SELECT * FROM users")
        print(rows)

    # Async iteration
    async for value in async_generator(5):
        print(value)

    # Async list comprehension
    results = [v async for v in async_generator(3)]
    print(results)

asyncio.run(main())
```

## Semaphore: limiting concurrency
```python
import asyncio
import httpx  # third-party HTTP client: pip install httpx

async def fetch(client, url, semaphore):
    async with semaphore:  # at most 10 coroutines inside this block at once
        response = await client.get(url)
        return response.text

async def main():
    urls = [f"https://httpbin.org/delay/1?n={i}" for i in range(50)]
    semaphore = asyncio.Semaphore(10)  # cap concurrency at 10
    async with httpx.AsyncClient() as client:
        tasks = [fetch(client, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)

asyncio.run(main())
```

## Queue: the producer–consumer pattern
```python
import asyncio
import random

async def producer(queue: asyncio.Queue, n: int, num_consumers: int):
    for i in range(n):
        item = f"item-{i}"
        await queue.put(item)
        print(f"produced: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.3))
    # one shutdown sentinel per consumer, so every consumer exits
    # (a single None would leave the other consumers waiting forever)
    for _ in range(num_consumers):
        await queue.put(None)

async def consumer(queue: asyncio.Queue, name: str):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"consumer {name} handling: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.5))
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)  # bounded queue: put() waits when full
    await asyncio.gather(
        producer(queue, 10, num_consumers=2),
        consumer(queue, "A"),
        consumer(queue, "B"),
    )

asyncio.run(main())
```

## Running async code from sync code
```python
import asyncio

async def async_work():
    await asyncio.sleep(1)
    return "done"

# Option 1: asyncio.run (recommended, Python 3.7+)
result = asyncio.run(async_work())

# Option 2: inside an already-running loop (e.g. Jupyter)
import nest_asyncio  # third-party: pip install nest_asyncio
nest_asyncio.apply()
result = asyncio.run(async_work())

# Option 3: loop.run_until_complete (old style)
loop = asyncio.new_event_loop()
result = loop.run_until_complete(async_work())
loop.close()
```

## Common pitfalls
- Never call blocking functions (`time.sleep`, synchronous file I/O) inside a coroutine; they stall the entire event loop.
- Offload CPU-bound work to a thread or process pool with `loop.run_in_executor`.
- Don't create multiple event loops: one program, one loop.
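To see the first pitfall in action, a toy comparison (stdlib only): two blocking sleeps run serially because `time.sleep` never yields, while two awaited sleeps overlap.

```python
import asyncio
import time

async def blocking():
    time.sleep(1)           # WRONG: blocks the event loop; nothing else runs

async def cooperative():
    await asyncio.sleep(1)  # RIGHT: yields to the loop while waiting

async def main():
    start = time.perf_counter()
    await asyncio.gather(blocking(), blocking())
    blocked = time.perf_counter() - start       # ~2s: the sleeps serialize

    start = time.perf_counter()
    await asyncio.gather(cooperative(), cooperative())
    overlapped = time.perf_counter() - start    # ~1s: the waits overlap
    return blocked, overlapped

blocked, overlapped = asyncio.run(main())
print(f"blocking: {blocked:.1f}s, cooperative: {overlapped:.1f}s")
```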
```python
# Correct: push CPU-bound work to a process pool
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive_task(data):
    return sum(x * x for x in data)  # stand-in for heavy computation

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_intensive_task, range(10**6))
        print(result)

asyncio.run(main())
```
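For blocking I/O (rather than CPU-bound work), `asyncio.to_thread` (Python 3.9+) is a lighter alternative that runs the call in the default thread pool:

```python
import asyncio
import time

def blocking_io() -> str:
    time.sleep(0.5)  # stand-in for a synchronous file or socket read
    return "io done"

async def main():
    # each call runs in a worker thread; the event loop stays free
    return await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.to_thread(blocking_io),
    )

results = asyncio.run(main())
print(results)  # → ['io done', 'io done']
```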