阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

关于 Redis的订阅发布

173次阅读
没有评论

共计 2843 个字符,预计需要花费 8 分钟才能阅读完成。

文章目录

  1. 1. 为什么做订阅分布?
  2. 2. Redis 中的订阅发布
  3. 3. Redis 生产者消费者
  4. 4. Redis 中订阅发布
  5. 5. Java Jedis 踩过的坑

为什么做订阅分布?

随着业务复杂, 业务的项目依赖关系增强, 使用消息队列帮助系统 降低耦合度.

  • 订阅分布本身也是一种生产者消费者模式, 订阅者是消费者, 发布者是生产者.
  • 订阅发布模式, 发布者发布消息后, 只要有订阅方, 则多个订阅方会收到同样的消息
  • 生产者消费者模式, 生产者往队列里放入消息, 由多个消费者对一条消息进行抢占.

  • 订阅分布模式可以将一些不着急完成的工作放到其他进程或者线程中进行离线处理.

 

Redis 中的订阅发布

Redis 中的订阅发布模式, 当没有订阅者时, 消息会被直接丢弃(Redis 不会持久化保存消息)

Redis 生产者消费者

生产者使用 Redis 中的 list 数据结构进行实现, 将待处理的消息塞入到消息队列中.

1
2
3
4
5
6
7
8
9
10
11
class Producer(object):
 
def __init__(self, host=“localhost”, port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self.key = “test_key”
self.value = “test_value_{id}”
 
def produce(self):
for id in xrange(5):
msg = self.value.format(id=id)
self._conn.lpush(self.key, msg)

消费者使用 redis 中 brpop 进行实现, brpop 会从 list 头部消息, 并能够设置超时等待时间.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Consumer(object):
 
def __init__(self, host=“localhost”, port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self.key = “test_key”
 
def consume(self, timeout=0):
# timeout=0 表示会无线阻塞, 直到获得消息
while True:
msg = self._conn.brpop(self.key, timeout=timeout)
process(msg)
 
 
def process(msg):
print msg
 
if __name__ == ‘__main__’:
consumer = Consumer()
consumer.consume()
# 输出结果
(‘test_key’, ‘test_value_1’)
(‘test_key’, ‘test_value_2’)
(‘test_key’, ‘test_value_3’)
(‘test_key’, ‘test_value_4’)
(‘test_key’, ‘test_value_5’)

Redis 中订阅发布

在 Redis Pubsub 中, 一个频道 (channel) 相当于一个消息队列

1
2
3
4
5
6
7
8
9
10
11
class Publisher(object):
 
def __init__(self, host, port):
self._conn = redis.StrictRedis(host=host, port=port)
self.channel = “test_channel”
self.value = “test_value_{id}”
 
def pub(self):
for id in xrange(5):
msg = self.value.format(id=id)
self._conn.publish(self.channel, msg)

其中 get_message 使用了select IO 多路复用来检查 socket 连接是否是否可读.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Subscriber(object):
 
def __init__(self, host=“localhost”, port=6379):
self._conn = redis.StrictRedis(host=host, port=port)
self._pubsub = self._conn.pubsub() # 生成 pubsub 对象
self.channel = “test_channel”
self._pubsub.subscribe(self.channel)
 
def sub(self):
while True:
msg = self._pubsub.get_message()
if msg and isinstance(msg.get(“data”), basestring):
process(msg.get(“data”))
 
def close(self):
self._pubsub.close()
 
# 输出结果
test_value_1
test_value_2
test_value_3
test_value_4
test_value_5

Java Jedis 踩过的坑

在 Jedis 中订阅方处理是采用同步的方式, 看源码中 PubSub 模块的 process 函数

do-while 循环中, 会等到当前消息处理完毕才能够处理下一条消息, 这样会导致当入队列消息量过大的时候, redis 链接被强制关闭.

解决方案: 将整个处理函数改为异步的方式.

下面关于 Redis 的文章您也��能喜欢,不妨参考下:

Ubuntu 14.04 下 Redis 安装及简单测试 http://www.linuxidc.com/Linux/2014-05/101544.htm

Redis 主从复制基本配置 http://www.linuxidc.com/Linux/2015-03/115610.htm

Redis 集群明细文档 http://www.linuxidc.com/Linux/2013-09/90118.htm

Ubuntu 12.10 下安装 Redis(图文详解)+ Jedis 连接 Redis http://www.linuxidc.com/Linux/2013-06/85816.htm

Redis 系列 - 安装部署维护篇 http://www.linuxidc.com/Linux/2012-12/75627.htm

CentOS 6.3 安装 Redis http://www.linuxidc.com/Linux/2012-12/75314.htm

Redis 安装部署学习笔记 http://www.linuxidc.com/Linux/2014-07/104306.htm

Redis 配置文件 redis.conf 详解 http://www.linuxidc.com/Linux/2013-11/92524.htm

Redis 的详细介绍:请点这里
Redis 的下载地址:请点这里

本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-04/130236.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-22发表,共计2843字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中