redis 发布订阅模型
发布/订阅模型:Pub/Sub
从名字就能看出来,这个模块是 Redis 专门是针对「发布/订阅」这种队列模型设计的。
它正好可以解决前面提到的第一个问题:重复消费。
即多组生产者、消费者的场景,我们来看它是如何做的。
Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。
1 : 使用 SUBSCRIBE 命令,启动 2 个消费者,并「订阅」同一个队列。
// 2个消费者 都订阅一个队列
127.0.0.1:6379> SUBSCRIBE queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1
// 2个消费者 都订阅一个队列
127.0.0.1:6379> SUBSCRIBE queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1
2: 再启动一个生产者,发布一条消息。
127.0.0.1:6379> SUBSCRIBE queue
// 收到新消息
1) "message"
2) "queue"
3) "msg1"
使用 Pub/Sub 这种方案,既支持阻塞式拉取消息,还很好地满足了多组消费者,消费同一批数据的业务需求。
除此之外,Pub/Sub 还提供了「匹配订阅」模式,允许消费者根据一定规则,订阅「多个」自己感兴趣的队列。
// 订阅符合规则的队列
127.0.0.1:6379> PSUBSCRIBE queue.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "queue.*"
3) (integer) 1
生产者分别向 queue.p1 和 queue.p2 发布消息。
127.0.0.1:6379> PUBLISH queue.p1 msg1
(integer) 1
127.0.0.1:6379> PUBLISH queue.p2 msg2
(integer) 1
我们可以看到,Pub/Sub 最大的优势就是,支持多组生产者、消费者处理消息。
缺点
讲完了它的优点,那它有什么缺点呢?
其实,Pub/Sub 最大问题是:丢数据。
如果发生以下场景,就有可能导致数据丢失:
- 消费者下线
- Redis 宕机
- 消息堆积
这其实与 Pub/Sub 的实现方式有很大关系。
Pub/Sub 在实现时非常简单,它没有基于任何数据类型,也没有做任何的数据存储,它只是单纯地为生产者、消费者建立「数据转发通道」,把符合规则的数据,从一端转发到另一端。
整个过程中,没有任何的数据存储,一切都是实时转发的。
这种设计方案,就导致了上面提到的那些问题。
例如,如果一个消费者异常挂掉了,它再重新上线后,只能接收新的消息,在下线期间生产者发布的消息,因为找不到消费者,都会被丢弃掉。
如果所有消费者都下线了,那生产者发布的消息,因为找不到任何一个消费者,也会全部「丢弃」。
所以,当你在使用 Pub/Sub 时,一定要注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失。
另外,因为 Pub/Sub 没有基于任何数据类型实现,所以它也不具备「数据持久化」的能力。
也就是说,Pub/Sub 的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,Pub/Sub 的数据也会全部丢失。
最后,我们来看 Pub/Sub 在处理「消息积压」时,为什么也会丢数据?
当消费者的速度,跟不上生产者时,就会导致数据积压的情况发生。
如果采用 List 当作队列,消息积压时,会导致这个链表很长,最直接的影响就是,Redis 内存会持续增长,直到消费者把所有数据都从链表中取出。
但 Pub/Sub 的处理方式却不一样,当消息积压时,有可能会导致消费失败和消息丢失!
每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个「缓冲区」,这个缓冲区其实就是一块内存。
当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中。
之后,消费者不断地从缓冲区读取消息,处理消息。
因为这个缓冲区其实是有「上限」的(可配置),如果消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。
如果超过了缓冲区配置的上限,此时,Redis 就会「强制」把这个消费者踢下线。
这时消费者就会消费失败,也会丢失数据。
如果你有看过 Redis 的配置文件,可以看到这个缓冲区的默认配置:client-output-buffer-limit pubsub 32mb 8mb 60。
它的参数含义如下:
- 32mb:缓冲区一旦超过 32MB,Redis 直接强制把消费者踢下线
- 8mb + 60:缓冲区超过 8MB,并且持续 60 秒,Redis 也会把消费者踢下线
Pub/Sub 的这一点特点,是与 List 作队列差异比较大的。
从这里你应该可以看出,List 其实是属于「拉」模型,而 Pub/Sub 其实属于「推」模型。
List 中的数据可以一直积压在内存中,消费者什么时候来「拉」都可以。
但 Pub/Sub 是把消息先「推」到消费者在 Redis Server 上的缓冲区中,然后等消费者再来取。
当生产、消费速度不匹配时,就会导致缓冲区的内存开始膨胀,Redis 为了控制缓冲区的上限,所以就有了上面讲到的,强制把消费者踢下线的机制。
总结一下 Pub/Sub 的优缺点:
- 支持发布 / 订阅,支持多组生产者、消费者处理消息
- 消费者下线,数据会丢失
- 不支持数据持久化,Redis 宕机,数据也会丢失
- 消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失
有没有发现,除了第一个是优点之外,剩下的都是缺点。
所以,很多人看到 Pub/Sub 的特点后,觉得这个功能很「鸡肋」。
也正是以上原因,Pub/Sub 在实际的应用场景中用得并不多。
目前只有哨兵集群和 Redis 实例通信时,采用了 Pub/Sub 的方案,因为哨兵正好符合即时通讯的业务场景。
我们再来看一下,Pub/Sub 有没有解决,消息处理时异常宕机,无法再次消费的问题呢?
其实也不行,Pub/Sub 从缓冲区取走数据之后,数据就从 Redis 缓冲区删除了,消费者发生异常,自然也无法再次重新消费。
梳理
现在我们重新梳理一下,我们在使用消息队列时的需求。
当我们在使用一个消息队列时,希望它的功能如下:
- 支持阻塞等待拉取消息
- 支持发布 / 订阅模式
- 消费失败,可重新消费,消息不丢失
- 实例宕机,消息不丢失,数据可持久化
- 消息可堆积
Redis 除了 List 和 Pub/Sub 之外,还有符合这些要求的数据类型吗?
其实,Redis 的作者也看到了以上这些问题,也一直在朝着这些方向努力着。
Redis 作者在开发 Redis 期间,还另外开发了一个开源项目 disque。
这个项目的定位,就是一个基于内存的分布式消息队列中间件。
但由于种种原因,这个项目一直不温不火。
终于,在 Redis 5.0 版本,作者把 disque 功能移植到了 Redis 中,并给它定义了一个新的数据类型:Stream。
下面我们就来看看,它能符合上面提到的这些要求吗?
趋于成熟的队列:Stream
我们来看 Stream 是如何解决上面这些问题的。
我们依旧从简单到复杂,依次来看 Stream 在做消息队列时,是如何处理的?
首先,Stream 通过 XADD 和 XREAD 完成最简单的生产、消费模型:
- XADD:发布消息
- XREAD:读取消息