1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| import redis import threading import time
class RedisPubSub:
    """Publish/subscribe helper bound to a single Redis channel.

    One instance owns one ``redis.Redis`` client and one ``PubSub`` object.
    ``subscribe`` starts a daemon thread that polls for messages and hands
    each payload to a callback; ``stop`` shuts that thread down again.
    """

    # Seconds each poll waits for a message. This bounds how long the
    # listener thread takes to notice that stop() has been called.
    _POLL_INTERVAL = 0.1
    # Upper bound, in seconds, on how long stop() waits for the listener
    # thread to exit (it may be busy inside a slow callback).
    _JOIN_TIMEOUT = 2.0

    def __init__(self, host='localhost', port=6379, db=0, channel='default'):
        """Create the Redis client and its pub/sub object.

        Args:
            host: Redis server host name.
            port: Redis server port.
            db: Redis database index.
            channel: Channel used by ``publish`` and ``subscribe``.
        """
        self.channel = channel
        self.redis = redis.Redis(host=host, port=port, db=db, decode_responses=True)
        self.pubsub = self.redis.pubsub()
        self._running = False
        # Set by subscribe(); None until a listener has been started.
        self.listen_thread = None

    def publish(self, message: str):
        """Publish ``message`` on this instance's channel."""
        self.redis.publish(self.channel, message)

    def subscribe(self, callback):
        """Subscribe to the channel and start the listener thread.

        Args:
            callback: Function called with the ``data`` field of every
                message of type ``'message'`` received on the channel. It
                runs on the listener thread.

        Raises:
            RuntimeError: If a listener thread from an earlier call is
                still alive; two threads must not read the same pub/sub
                object.
        """
        if self.listen_thread is not None and self.listen_thread.is_alive():
            raise RuntimeError(f"Already subscribed to {self.channel}")

        # Issue SUBSCRIBE from the caller's thread so the command has been
        # sent before this method returns. Doing it inside the listener
        # thread lets a publish() or stop() that follows immediately race
        # with a subscription that has not happened yet.
        self.pubsub.subscribe(self.channel)
        print(f"Subscribed to {self.channel}")

        def listen():
            # Poll with a timeout instead of blocking in pubsub.listen(),
            # so the stop flag is re-checked even when the channel is idle.
            while self._running:
                message = self.pubsub.get_message(timeout=self._POLL_INTERVAL)
                if message is not None and message['type'] == 'message':
                    callback(message['data'])

        self._running = True
        self.listen_thread = threading.Thread(target=listen, daemon=True)
        self.listen_thread.start()

    def stop(self):
        """Stop the listener thread and unsubscribe from all channels.

        Safe to call when ``subscribe`` was never called, and safe to call
        from inside the callback (the listener thread does not join itself).
        """
        self._running = False
        worker = self.listen_thread
        # Wait for the listener to leave its polling loop before touching
        # the pub/sub object, so only one thread uses it at a time.
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout=self._JOIN_TIMEOUT)
        self.pubsub.unsubscribe()
        print(f"Unsubscribed from {self.channel}")
|