python协程
协程是什么?
协程是一种用户级别的线程,由程序(而不是操作系统)调度。在Python种,由async def定义,使用await暂停和恢复执行。适合处理大量IO阻塞任务(如Redis、HTTP、文件、数据库等)。
为什么要用协程?
目前主流语言基本上都选择了多线程作为并发设施,与线程相关的概念就是抢占式多任务(Preemptive multitasking),而与协程相关的是协作式多任务。其实不管是进程还是线程,每次阻塞、切换都需要陷入系统调用(system call),先让CPU跑操作系统的调度程序,然后再由调度程序决定该跑哪一个进程(线程)。
而且由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点)。因为协程是用户自己来编写调度逻辑的,对于我们的CPU来说,协程其实是单线程,所以CPU不用去考虑怎么调度、切换上下文,这就省去了CPU的切换开销,所以协程在一定程度上又好于多线程。
简单协程示例
1
2
3
4
5
6
7
8import asyncio
async def say_hello():
print("Hello ...")
await asyncio.sleep(1)
print("... World!")
asyncio.run(say_hello())运行Redis订阅监听、HTTP并发请求、异步文件写入、异步数据库查询的协程驱动系统代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64import asyncio
import aiohttp
import aiofiles
import aiomysql
import redis.asyncio as aioredis
# === Redis 订阅监听 ===
async def redis_listener():
redis = aioredis.Redis()
pubsub = redis.pubsub()
await pubsub.subscribe("demo")
print("[Redis] Subscribed to 'demo'")
async for msg in pubsub.listen():
if msg["type"] == "message":
print(f"[Redis] Received: {msg['data'].decode()}")
# === 并发 HTTP 请求 ===
async def fetch(session, url, i):
async with session.get(url) as resp:
text = await resp.text()
print(f"[HTTP] #{i}: {len(text)} bytes from {url}")
async def http_worker():
urls = ["https://example.com"] * 3
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url, i) for i, url in enumerate(urls)]
await asyncio.gather(*tasks)
# === 异步写文件 ===
async def file_writer():
for i in range(3):
async with aiofiles.open(f"output_{i}.txt", "w") as f:
await f.write(f"[File] Written by task {i}\n")
print(f"[File] Written file {i}")
await asyncio.sleep(0.5)
# === 异步 MySQL 查询 ===
async def mysql_worker():
try:
conn = await aiomysql.connect(
host='localhost', port=3306,
user='your_user', password='your_pass',
db='your_db', autocommit=True
)
async with conn.cursor() as cur:
await cur.execute("SELECT SLEEP(1), 'Hello from DB'")
result = await cur.fetchone()
print(f"[MySQL] Result: {result}")
conn.close()
except Exception as e:
print("[MySQL] Error:", e)
# === 主函数:并发运行所有任务 ===
async def main():
await asyncio.gather(
redis_listener(),
http_worker(),
file_writer(),
mysql_worker(),
)
if __name__ == "__main__":
asyncio.run(main())