Python asyncio 实战:从回调地狱到协程优雅

asyncio 是 Python 异步编程的核心,搞懂 event loop、Task、gather 这些概念才能写出真正高效的异步代码。

$1.3k 字/约 8 min👁— views

Python asyncio 实战:从回调地狱到协程优雅

先搞清楚:该用哪种并发

场景 推荐方案
I/O 密集(网络请求、文件读写) asyncio / 多线程
CPU 密集(计算、图像处理) 多进程(multiprocessing)
简单并发、兼容老代码 多线程(threading)
高性能异步 I/O asyncio

asyncio 的核心优势:单线程处理大量并发 I/O,内存开销远小于多线程

Event Loop 工作原理

┌─────────────────────────────────────────────┐
│                Event Loop                    │
│                                              │
│  ┌──────────┐    ┌──────────┐    ┌────────┐ │
│  │ Coroutine│    │  Task    │    │ Future │ │
│  │(async def)│──▶│(Task包装 │──▶│(底层   │ │
│  │          │    │ coroutine│    │ 结果   │ │
│  └──────────┘    └──────────┘    └────────┘ │
│                                              │
│  等待 I/O 时挂起 → 切换到其他任务 → I/O 完成后恢复  │
└─────────────────────────────────────────────┘
  • Coroutineasync def 定义的函数,调用后返回协程对象
  • Task:对 coroutine 的封装,由 event loop 调度
  • Future:代表一个异步操作的最终结果

基础语法

import asyncio

# 定义协程
async def fetch_data(url: str) -> str:
    print(f"开始请求 {url}")
    await asyncio.sleep(1)  # 模拟 I/O 等待
    return f"data from {url}"

# 运行协程
async def main():
    result = await fetch_data("https://api.example.com")
    print(result)

asyncio.run(main())

asyncio.gather():并发执行

import asyncio
import time

async def task(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    start = time.time()

    # 顺序执行:约 3 秒
    # r1 = await task("A", 1)
    # r2 = await task("B", 1)
    # r3 = await task("C", 1)

    # 并发执行:约 1 秒
    results = await asyncio.gather(
        task("A", 1),
        task("B", 1),
        task("C", 1),
    )

    print(results)  # ['A done', 'B done', 'C done']
    print(f"耗时: {time.time() - start:.2f}s")  # 约 1.00s

asyncio.run(main())

gather() 异常处理

async def risky(name: str) -> str:
    if name == "B":
        raise ValueError("B failed!")
    return f"{name} ok"

async def main():
    # return_exceptions=True:异常作为结果返回,不中断其他任务
    results = await asyncio.gather(
        risky("A"),
        risky("B"),
        risky("C"),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(f"OK: {r}")

asyncio.run(main())

create_task():后台任务

import asyncio

async def background_job(n: int) -> None:
    await asyncio.sleep(0.5)
    print(f"Job {n} complete")

async def main():
    # 创建任务后立即返回,任务在后台运行
    t1 = asyncio.create_task(background_job(1))
    t2 = asyncio.create_task(background_job(2))

    print("做其他事情...")
    await asyncio.sleep(0.1)
    print("继续做其他事情...")

    # 等待任务完成
    await t1
    await t2

asyncio.run(main())

asyncio.Queue:生产者消费者

import asyncio

async def producer(queue: asyncio.Queue, items: list[str]) -> None:
    for item in items:
        await queue.put(item)
        print(f"生产: {item}")
        await asyncio.sleep(0.1)
    await queue.put(None)  # 终止信号

async def consumer(queue: asyncio.Queue, name: str) -> None:
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # 传递终止信号给其他 consumer
            break
        print(f"{name} 消费: {item}")
        await asyncio.sleep(0.3)
        queue.task_done()

async def main():
    queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=5)
    items = [f"task_{i}" for i in range(6)]

    await asyncio.gather(
        producer(queue, items),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
    )

asyncio.run(main())

aiohttp:异步 HTTP 请求

import asyncio
import aiohttp  # pip install aiohttp

async def fetch(session: aiohttp.ClientSession, url: str) -> tuple[str, int]:
    async with session.get(url) as resp:
        return url, resp.status

async def fetch_all(urls: list[str]) -> list[tuple[str, int]]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
    ]
    results = await fetch_all(urls)
    for url, status in results:
        print(f"{status} - {url}")

asyncio.run(main())

asyncio.Semaphore:限制并发数

import asyncio
import aiohttp

async def fetch_with_limit(
    session: aiohttp.ClientSession,
    url: str,
    sem: asyncio.Semaphore,
) -> str:
    async with sem:  # 同时最多 10 个请求
        async with session.get(url) as resp:
            return await resp.text()

async def main():
    urls = [f"https://httpbin.org/get?n={i}" for i in range(50)]
    sem = asyncio.Semaphore(10)  # 并发上限 10

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, sem) for url in urls]
        results = await asyncio.gather(*tasks)
    print(f"获取了 {len(results)} 个响应")

asyncio.run(main())

超时控制:asyncio.wait_for()

import asyncio

async def slow_operation() -> str:
    await asyncio.sleep(5)
    return "done"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("操作超时!")

asyncio.run(main())

常见坑:在 async 里调用阻塞函数

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# ❌ 错误:阻塞整个 event loop
async def bad():
    time.sleep(2)  # 阻塞!其他协程无法运行
    return "done"

# ✅ 正确:放到线程池里跑
async def good():
    loop = asyncio.get_running_loop()
    # 在线程池中运行阻塞函数
    result = await loop.run_in_executor(None, time.sleep, 2)
    return "done"

# 或者用 asyncio.to_thread(Python 3.9+)
async def better():
    await asyncio.to_thread(time.sleep, 2)
    return "done"

实际案例:并发抓取 100 个 URL

import asyncio
import time
import aiohttp
import requests  # pip install requests

URLS = [f"https://httpbin.org/get?n={i}" for i in range(20)]

# 同步版本
def sync_fetch_all() -> None:
    for url in URLS:
        requests.get(url)

# 异步版本
async def async_fetch_all() -> None:
    sem = asyncio.Semaphore(10)

    async def fetch(session: aiohttp.ClientSession, url: str) -> None:
        async with sem:
            async with session.get(url) as r:
                await r.read()

    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*[fetch(session, url) for url in URLS])

def main():
    # 同步
    t0 = time.time()
    sync_fetch_all()
    sync_time = time.time() - t0
    print(f"同步: {sync_time:.2f}s")

    # 异步
    t0 = time.time()
    asyncio.run(async_fetch_all())
    async_time = time.time() - t0
    print(f"异步: {async_time:.2f}s")
    print(f"加速比: {sync_time / async_time:.1f}x")

if __name__ == "__main__":
    main()
# 典型输出:
# 同步: 12.34s
# 异步: 1.23s
# 加速比: 10.0x

总结

API 用途
async def / await 定义和等待协程
asyncio.run() 程序入口,运行主协程
asyncio.gather() 并发运行多个协程
asyncio.create_task() 创建后台任务
asyncio.Queue 生产者消费者
asyncio.Semaphore 限制并发数
asyncio.wait_for() 超时控制
loop.run_in_executor() 阻塞函数异步化

asyncio 适合 I/O 密集型场景。CPU 密集型任务请用 multiprocessing,两者可以配合使用。