victory的博客

长安一片月,万户捣衣声

0%

python | 协程

python协程

  1. 协程是什么?

    协程是一种用户级别的线程,由程序(而不是操作系统)调度。在Python种,由async def定义,使用await暂停和恢复执行。适合处理大量IO阻塞任务(如Redis、HTTP、文件、数据库等)。

  2. 为什么要用协程?
    目前主流语言基本上都选择了多线程作为并发设施,与线程相关的概念就是抢占式多任务(Preemptive multitasking),而与协程相关的是协作式多任务。

    其实不管是进程还是线程,每次阻塞、切换都需要陷入系统调用(system call),先让CPU跑操作系统的调度程序,然后再由调度程序决定该跑哪一个进程(线程)。
    而且由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点)。

    因为协程是用户自己来编写调度逻辑的,对于我们的CPU来说,协程其实是单线程,所以CPU不用去考虑怎么调度、切换上下文,这就省去了CPU的切换开销,所以协程在一定程度上又好于多线程。

  3. 简单协程示例

    1
    2
    3
    4
    5
    6
    7
    8
    import asyncio

    async def say_hello():
    print("Hello ...")
    await asyncio.sleep(1)
    print("... World!")

    asyncio.run(say_hello())
  4. 运行Redis订阅监听、HTTP并发请求、异步文件写入、异步数据库查询的协程驱动系统代码示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    import asyncio
    import aiohttp
    import aiofiles
    import aiomysql
    import redis.asyncio as aioredis

    # === Redis 订阅监听 ===
    async def redis_listener():
    redis = aioredis.Redis()
    pubsub = redis.pubsub()
    await pubsub.subscribe("demo")
    print("[Redis] Subscribed to 'demo'")

    async for msg in pubsub.listen():
    if msg["type"] == "message":
    print(f"[Redis] Received: {msg['data'].decode()}")

    # === 并发 HTTP 请求 ===
    async def fetch(session, url, i):
    async with session.get(url) as resp:
    text = await resp.text()
    print(f"[HTTP] #{i}: {len(text)} bytes from {url}")

    async def http_worker():
    urls = ["https://example.com"] * 3
    async with aiohttp.ClientSession() as session:
    tasks = [fetch(session, url, i) for i, url in enumerate(urls)]
    await asyncio.gather(*tasks)

    # === 异步写文件 ===
    async def file_writer():
    for i in range(3):
    async with aiofiles.open(f"output_{i}.txt", "w") as f:
    await f.write(f"[File] Written by task {i}\n")
    print(f"[File] Written file {i}")
    await asyncio.sleep(0.5)

    # === 异步 MySQL 查询 ===
    async def mysql_worker():
    try:
    conn = await aiomysql.connect(
    host='localhost', port=3306,
    user='your_user', password='your_pass',
    db='your_db', autocommit=True
    )
    async with conn.cursor() as cur:
    await cur.execute("SELECT SLEEP(1), 'Hello from DB'")
    result = await cur.fetchone()
    print(f"[MySQL] Result: {result}")
    conn.close()
    except Exception as e:
    print("[MySQL] Error:", e)

    # === 主函数:并发运行所有任务 ===
    async def main():
    await asyncio.gather(
    redis_listener(),
    http_worker(),
    file_writer(),
    mysql_worker(),
    )

    if __name__ == "__main__":
    asyncio.run(main())