一、首先实现一个redis类,提供给订阅者和发布者调用
import redis
class RedisHelper:
def __init__(self):
# 链接服务端
self.__conn = redis.Redis(host='127.0.0.1')
# 加入两个频道
self.chan_sub = 'fm104.5'
self.chan_pub = 'fm104.5'
def public(self, msg):
#发消息订阅方
# publish发消息加入频道chan_pub
self.__conn.publish(self.chan_pub, msg)
return True
def subscribe(self):
# 开始订阅pubsub()
# 打开收音机
pub = self.__conn.pubsub()
# 调频道 subscribe
pub.subscribe(self.chan_sub)
# 准备接收parse_response()
# 在次调用parse_response() 开始接收
pub.parse_response()
# 返回订阅变量
return pub
二、发布者:可以看出发布的消息a 是一个列表,需要转换成str 进行发送。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# 调用逻辑模块
from redis_class import RedisHelper
# 实例化对象
obj = RedisHelper()
# 发消息加入频道
a = [{'item_id': '12315', 'type_id': 5, 'content': 'redis实现发布订阅功能,发布端负责发送消息。'}]
obj.public(str(a))
三、订阅者:打印msg可以发现其数据格式,是一个列表,列表前两个元素为消息头,无实际意义;第三个元素为一个字符串。
[b'message', b'fm104.5', b"[{'item_id': '1808151751199150000526', 'type_id': 5, 'content': '\xe4\xb8\x8d\xe5\xb0\x91\xe4\xba\xba\xe8\xae\xa4\xe4\xb8\xba\xef\xbc\x8c\xe5\x8f}]"]
因为,需要在接收端进行数据解析。 eval(msg[2])得到的是一个列表,eval(msg[2])[0]得到的是一个字典;最后通过键来获取字典的值,即content的内容。
# 实例化RedisHelper类对象
obj = RedisHelper()
# 赋值订阅变量
redis_sub = obj.subscribe()
i = 0
# 循环执行如下命令
while True:
# 二次调用parse_response() 开始接收,parse_response()接受发布的信息。
msg= redis_sub.parse_response()
print(msg)
try:
temp = eval(msg[2])[0]['content']
print(temp)
if temp is not None:
i_str = str(i)
filename = './new_doc/' + i_str + '.txt'
i += 1
print(i)
f = open(filename, "w", encoding='utf-8')
f.write(temp)
f.close()
except:
print('error')