Python asyncio 实战:从回调地狱到协程优雅
先搞清楚:该用哪种并发
| 场景 | 推荐方案 |
|---|---|
| I/O 密集(网络请求、文件读写) | asyncio / 多线程 |
| CPU 密集(计算、图像处理) | 多进程(multiprocessing) |
| 简单并发、兼容老代码 | 多线程(threading) |
| 高性能异步 I/O | asyncio |
asyncio 的核心优势:单线程处理大量并发 I/O,内存开销远小于多线程。
Event Loop 工作原理
┌──────────────────────────────────────────────────────┐
│                      Event Loop                      │
│                                                      │
│  ┌────────────┐    ┌─────────────┐    ┌──────────┐   │
│  │ Coroutine  │    │    Task     │    │  Future  │   │
│  │ (async def)│──▶ │ (Task包装   │──▶ │ (底层    │   │
│  │            │    │  coroutine) │    │  结果)   │   │
│  └────────────┘    └─────────────┘    └──────────┘   │
│                                                      │
│  等待 I/O 时挂起 → 切换到其他任务 → I/O 完成后恢复   │
└──────────────────────────────────────────────────────┘
- Coroutine:`async def` 定义的函数,调用后返回协程对象
- Task:对 coroutine 的封装,由 event loop 调度
- Future:代表一个异步操作的最终结果
基础语法
import asyncio
# Define a coroutine.
async def fetch_data(url: str, delay: float = 1.0) -> str:
    """Simulate fetching *url* and return a placeholder payload.

    Args:
        url: Address to "request"; it is only echoed in the output.
        delay: Seconds to suspend for, standing in for network latency.
            Defaults to 1.0, the value the example originally hard-coded.

    Returns:
        The string ``"data from <url>"``.
    """
    print(f"开始请求 {url}")
    # Simulated I/O wait: awaiting hands control back to the event loop.
    await asyncio.sleep(delay)
    return f"data from {url}"
# Run the coroutine.
async def main() -> None:
    """Await one fetch_data() call and print what it returns."""
    result = await fetch_data("https://api.example.com")
    print(result)


# asyncio.run() is the program entry point: it runs main() to completion.
asyncio.run(main())
asyncio.gather():并发执行
import asyncio
import time
async def task(name: str, delay: float) -> str:
    """Suspend for *delay* seconds, then return ``"<name> done"``."""
    await asyncio.sleep(delay)
    return f"{name} done"
async def main() -> None:
    """Run three one-second tasks concurrently; print results and elapsed time."""
    # perf_counter() is monotonic, so the measured interval cannot be
    # skewed by system clock adjustments the way time.time() can.
    start = time.perf_counter()
    # Sequential execution would take about 3 seconds:
    # r1 = await task("A", 1)
    # r2 = await task("B", 1)
    # r3 = await task("C", 1)
    # Concurrent execution takes about 1 second.
    results = await asyncio.gather(
        task("A", 1),
        task("B", 1),
        task("C", 1),
    )
    print(results)  # ['A done', 'B done', 'C done']
    print(f"耗时: {time.perf_counter() - start:.2f}s")  # about 1.00s


asyncio.run(main())
gather() 异常处理
async def risky(name: str) -> str:
    """Return ``"<name> ok"``, except for the name ``"B"``, which fails.

    Raises:
        ValueError: If *name* is ``"B"``.
    """
    if name == "B":
        raise ValueError("B failed!")
    return f"{name} ok"
async def main() -> None:
    """Gather three risky() calls and report each outcome separately."""
    # return_exceptions=True: exceptions are returned as results instead of
    # being raised, so one failure does not interrupt the other tasks.
    results = await asyncio.gather(
        risky("A"),
        risky("B"),
        risky("C"),
        return_exceptions=True,
    )
    for r in results:
        # Check BaseException rather than Exception: asyncio.CancelledError
        # derives from BaseException and would otherwise be printed as "OK".
        if isinstance(r, BaseException):
            print(f"Error: {r}")
        else:
            print(f"OK: {r}")


asyncio.run(main())
create_task():后台任务
import asyncio
async def background_job(n: int, delay: float = 0.5) -> None:
    """Suspend for *delay* seconds, then print that job *n* is complete.

    Args:
        n: Job number shown in the completion message.
        delay: Seconds to wait before finishing. Defaults to 0.5, the
            value the example originally hard-coded.
    """
    await asyncio.sleep(delay)
    print(f"Job {n} complete")
async def main() -> None:
    """Start two background jobs, do other work, then wait for both jobs."""
    # create_task() returns immediately; the tasks run in the background.
    # Keeping t1/t2 holds a reference to each task until it is awaited.
    t1 = asyncio.create_task(background_job(1))
    t2 = asyncio.create_task(background_job(2))
    print("做其他事情...")
    await asyncio.sleep(0.1)
    print("继续做其他事情...")
    # Wait for the tasks to finish.
    await t1
    await t2


asyncio.run(main())
asyncio.Queue:生产者消费者
import asyncio
async def producer(queue: asyncio.Queue, items: list[str]) -> None:
    """Put every item on *queue*, then put ``None`` as the stop signal.

    Args:
        queue: Queue shared with the consumers.
        items: Items to enqueue, in order; 0.1 s pause after each one.
    """
    for item in items:
        await queue.put(item)
        print(f"生产: {item}")
        await asyncio.sleep(0.1)
    # Termination signal: consumers stop when they receive None.
    await queue.put(None)
async def consumer(queue: asyncio.Queue, name: str) -> None:
    """Take items from *queue* and process them until ``None`` arrives.

    Args:
        queue: Queue shared with the producer and the other consumers.
        name: Label printed in front of each consumed item.
    """
    while True:
        item = await queue.get()
        if item is None:
            # Put the stop signal back so the other consumers see it too.
            await queue.put(None)
            break
        print(f"{name} 消费: {item}")
        await asyncio.sleep(0.3)
        queue.task_done()
async def main() -> None:
    """Run one producer and two consumers over a bounded queue."""
    # maxsize=5: put() suspends the producer while the queue is full.
    queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=5)
    items = [f"task_{i}" for i in range(6)]
    await asyncio.gather(
        producer(queue, items),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
    )


asyncio.run(main())
aiohttp:异步 HTTP 请求
import asyncio
import aiohttp # pip install aiohttp
async def fetch(session: aiohttp.ClientSession, url: str) -> tuple[str, int]:
    """GET *url* through *session* and return ``(url, HTTP status)``."""
    async with session.get(url) as resp:
        return url, resp.status
async def fetch_all(urls: list[str]) -> list[tuple[str, int]]:
    """Fetch all *urls* concurrently over one shared session.

    Returns:
        One ``(url, status)`` pair per input URL, as produced by fetch().
    """
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)
async def main() -> None:
    """Fetch three httpbin URLs and print each status next to its URL."""
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
    ]
    results = await fetch_all(urls)
    for url, status in results:
        print(f"{status} - {url}")


asyncio.run(main())
asyncio.Semaphore:限制并发数
import asyncio
import aiohttp
async def fetch_with_limit(
    session: aiohttp.ClientSession,
    url: str,
    sem: asyncio.Semaphore,
) -> str:
    """GET *url* and return the response body, holding *sem* meanwhile.

    Args:
        session: Session used to issue the request.
        url: Address to request.
        sem: Semaphore shared by all callers to bound concurrency.
    """
    # The semaphore caps how many requests are in flight at once; the
    # actual limit is whatever value the caller constructed it with.
    async with sem:
        async with session.get(url) as resp:
            return await resp.text()
async def main() -> None:
    """Request 50 URLs with at most 10 in flight, then print the count."""
    urls = [f"https://httpbin.org/get?n={i}" for i in range(50)]
    sem = asyncio.Semaphore(10)  # concurrency limit: 10
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, sem) for url in urls]
        results = await asyncio.gather(*tasks)
    print(f"获取了 {len(results)} 个响应")


asyncio.run(main())
超时控制:asyncio.wait_for()
import asyncio
async def slow_operation(delay: float = 5.0) -> str:
    """Suspend for *delay* seconds and return ``"done"``.

    Args:
        delay: Seconds to wait. Defaults to 5.0, the value the example
            originally hard-coded.
    """
    await asyncio.sleep(delay)
    return "done"
async def main() -> None:
    """Run slow_operation() under a 2-second timeout and report the outcome."""
    try:
        # wait_for() cancels the awaited operation once the timeout expires
        # and then raises TimeoutError.
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("操作超时!")


asyncio.run(main())
常见坑:在 async 里调用阻塞函数
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
# ❌ Wrong: this blocks the whole event loop.
async def bad(seconds: float = 2.0) -> str:
    """Anti-pattern demo: call blocking time.sleep() inside a coroutine.

    Args:
        seconds: How long to block. Defaults to 2.0, the value the example
            originally hard-coded.
    """
    # Blocking call: no other coroutine can run until it returns.
    time.sleep(seconds)
    return "done"
# ✅ Correct: run the blocking call in a thread pool.
async def good(seconds: float = 2.0) -> str:
    """Run blocking time.sleep() in an executor so the loop stays free.

    Args:
        seconds: How long the worker thread sleeps. Defaults to 2.0, the
            value the example originally hard-coded.
    """
    loop = asyncio.get_running_loop()
    # Passing None as the executor selects the loop's default executor.
    # time.sleep() returns nothing useful, so the result is not kept.
    await loop.run_in_executor(None, time.sleep, seconds)
    return "done"
# Alternatively, use asyncio.to_thread (Python 3.9+).
async def better(seconds: float = 2.0) -> str:
    """Run blocking time.sleep() in a separate thread via asyncio.to_thread().

    Args:
        seconds: How long the worker thread sleeps. Defaults to 2.0, the
            value the example originally hard-coded.
    """
    await asyncio.to_thread(time.sleep, seconds)
    return "done"
实际案例:并发抓取 20 个 URL
import asyncio
import time
import aiohttp
import requests # pip install requests
# Benchmark targets: 20 httpbin GET URLs that differ only in the n query parameter.
URLS = [f"https://httpbin.org/get?n={i}" for i in range(20)]
# Synchronous version.
def sync_fetch_all() -> None:
    """Request every URL in URLS one after another; responses are discarded."""
    for url in URLS:
        # requests applies no timeout by default, so a stalled server would
        # hang the benchmark forever; bound each request explicitly.
        requests.get(url, timeout=10)
# Asynchronous version.
async def async_fetch_all() -> None:
    """Request every URL in URLS concurrently, at most 10 at a time."""
    sem = asyncio.Semaphore(10)

    async def fetch(session: aiohttp.ClientSession, url: str) -> None:
        """Read the full body of *url* while holding the shared semaphore."""
        async with sem:
            async with session.get(url) as r:
                await r.read()

    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*[fetch(session, url) for url in URLS])
def main() -> None:
    """Time the synchronous and asynchronous fetchers and print the speed-up."""
    # perf_counter() is monotonic, so both intervals are immune to system
    # clock adjustments that could distort time.time() differences.
    # Synchronous run.
    t0 = time.perf_counter()
    sync_fetch_all()
    sync_time = time.perf_counter() - t0
    print(f"同步: {sync_time:.2f}s")
    # Asynchronous run.
    t0 = time.perf_counter()
    asyncio.run(async_fetch_all())
    async_time = time.perf_counter() - t0
    print(f"异步: {async_time:.2f}s")
    print(f"加速比: {sync_time / async_time:.1f}x")


if __name__ == "__main__":
    main()
# 典型输出:
# 同步: 12.34s
# 异步: 1.23s
# 加速比: 10.0x
总结
| API | 用途 |
|---|---|
| `async def` / `await` | 定义和等待协程 |
| `asyncio.run()` | 程序入口,运行主协程 |
| `asyncio.gather()` | 并发运行多个协程 |
| `asyncio.create_task()` | 创建后台任务 |
| `asyncio.Queue` | 生产者消费者 |
| `asyncio.Semaphore` | 限制并发数 |
| `asyncio.wait_for()` | 超时控制 |
| `loop.run_in_executor()` | 阻塞函数异步化 |
asyncio 适合 I/O 密集型场景。CPU 密集型任务请用 multiprocessing,两者可以配合使用。