victory的博客

长安一片月,万户捣衣声

0%

python | Redis实现发布订阅

Python Redis 发布订阅

  1. Redis发布/订阅是一种广播式消息系统:
    • 发布者:向一个频道(channel)发送消息
    • 订阅者:订阅一个或多个频道,等待接受消息
    • 当有消息发布到一个频道,所有订阅该频道的客户端都会立即收到消息
  2. 应用场景示例
    • 实时聊天系统:用户订阅频道,别人发言就能立刻收到
    • 消息推送/通知中心:后台发布消息,前端订阅实时显示
    • 分布式服务通信:多个服务通过Redis通信协调
  3. 特点
    • 高性能:消息实时传递
    • 非持久化:消息不会存储
    • 多对多支持:一个频道支持多个订阅者,一个客户端可订阅多个频道
    • 无确认机制:不想Kafka/RabbitMQ,由消息丢失风险
  4. Redis 发布订阅Python类实现(redis_pubsub.py)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import redis
import threading
import time

class RedisPubSub:
def __init__(self, host='localhost', port=6379, db=0, channel='default'):
self.channel = channel # 频道
self.redis = redis.Redis(host=host, port=port, db=db, decode_responses=True) # redis客户端
self.pubsub = self.redis.pubsub() # redis发布订阅对象
self._running = False # 是否开启订阅

# 向self.channel频道发送消息message
def publish(self, message: str):
"""发布消息"""
self.redis.publish(self.channel, message)

# 订阅self.channel消息
def subscribe(self, callback):
"""
订阅并启动监听线程。
参数 callback: 接收一个函数,在收到消息时被调用。
"""
def listen():
# 订阅self.channel频道
self.pubsub.subscribe(self.channel)
print(f"Subscribed to {self.channel}")

# 监听
for message in self.pubsub.listen():
if not self._running:
break
if message['type'] == 'message':
callback(message['data'])

# 开启订阅
self._running = True

# 创建监听线程
self.listen_thread = threading.Thread(target=listen, daemon=True)
# 启动监听线程
self.listen_thread.start()

def stop(self):
"""停止订阅"""
self._running = False
self.pubsub.unsubscribe()
print(f"Unsubscribed from {self.channel}")
  1. 发布端(publisher_demo.py)
1
2
3
4
5
6
7
8
from redis_pubsub import RedisPubSub
import time

pub = RedisPubSub(channel='chat')
for i in range(5):
pub.publish(f"Message {i}")
print(f"Sent: Message {i}")
time.sleep(1)
  1. 订阅端(subscriber_demo.py)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from redis_pubsub import RedisPubSub
import time

def handle_message(msg):
print(f"Received: {msg}")

sub = RedisPubSub(channel='chat')
sub.subscribe(callback=handle_message)

try:
while True:
time.sleep(1)
except KeyboardInterrupt:
sub.stop()