Redis队列

本文深入分析了Redis用作MQ的可行性,对比分析了List、Pub/Sub、Stream的优缺点,并将Stream和专业MQ进行对比分析。

当我们在使用一个消息队列时,希望它的功能如下:

  • 支持阻塞等待拉取消息
  • 支持发布 / 订阅模式
  • 消费失败,可重新消费,消息不丢失
  • 实例宕机,消息不丢失,数据可持久化
  • 消息可堆积

这里我也列了一个表格,总结了它们各自的优缺点:

compare

List 队列

原理与实践

如果把 List 当作队列,你可以这么来用。

List队列

生产者使用 LPUSH 发布消息:

127.0.0.1:6379> LPUSH queue msg1  
(integer) 1  
127.0.0.1:6379> LPUSH queue msg2  
(integer) 2 

消费者这一侧,使用 BRPOP 拉取消息:

while true:  
    // 没消息阻塞等待,0表示不设置超时时间  
    msg = redis.brpop("queue", 0)  
    if msg == null:  
        continue  
    // 处理消息  
    handle(msg) 

使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个「超时时间」,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 NULL。

这个方案不错,既兼顾了效率,还避免了 CPU 空转问题,一举两得。

注意:如果设置的超时时间太长,这个连接太久没有活跃过,可能会被 Redis Server 判定为无效连接,之后 Redis Server 会强制把这个客户端踢下线。所以,采用这种方案,客户端要有重连机制。

不足之处

  1. 不支持多消费者

消费者拉取消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费,即不支持多个消费者消费同一批数据

  1. 消息丢失

消费者拉取到消息后,如果发生异常宕机,那这条消息就丢失了

Pub/Sub

原理与实践

Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。

pub-sub

假设你想开启 2 个消费者,同时消费同一批数据,就可以按照以下方式来实现。

首先,使用 SUBSCRIBE 命令,启动 2 个消费者,并「订阅」同一个队列。

// 2个消费者 都订阅一个队列  
127.0.0.1:6379> SUBSCRIBE queue  
Reading messages... (press Ctrl-C to quit)  
1) "subscribe"  
2) "queue"  
3) (integer) 1 

此时,2 个消费者都会被阻塞住,等待新消息的到来。

之后,再启动一个生产者,发布一条消息。

127.0.0.1:6379> PUBLISH queue msg1  
(integer) 1 

这时,2 个消费者就会解除阻塞,收到生产者发来的新消息。

127.0.0.1:6379> SUBSCRIBE queue  
// 收到新消息  
1) "message"  
2) "queue"  
3) "msg1" 

看到了么,使用 Pub/Sub 这种方案,既支持阻塞式拉取消息,还很好地满足了多组消费者,消费同一批数据的业务需求。

除此之外,Pub/Sub 还提供了「匹配订阅」模式,允许消费者根据一定规则,订阅「多个」自己感兴趣的队列。

PSUB

// 订阅符合规则的队列  
127.0.0.1:6379> PSUBSCRIBE queue.*  
Reading messages... (press Ctrl-C to quit)  
1) "psubscribe"  
2) "queue.*"  
3) (integer) 1 

这里的消费者,订阅了 queue.* 相关的队列消息。

之后,生产者分别向 queue.p1 和 queue.p2 发布消息。

127.0.0.1:6379> PUBLISH queue.p1 msg1  
(integer) 1  
127.0.0.1:6379> PUBLISH queue.p2 msg2  
(integer) 1 

这时再看消费者,它就可以接收到这 2 个生产者的消息了。

127.0.0.1:6379> PSUBSCRIBE queue.*  
Reading messages... (press Ctrl-C to quit)  
...  
// 来自queue.p1的消息  
1) "pmessage" 
2) "queue.*"  
3) "queue.p1"  
4) "msg1" 
// 来自queue.p2的消息  
1) "pmessage"  
2) "queue.*"  
3) "queue.p2"  
4) "msg2" 

不足之处

我们可以看到,虽然Pub/Sub 支持多组生产者、消费者处理消息,但是没有解决丢数据的问题。

并且,List队列还只在「消费责异常」时存在数据丢失,Pub/Sub 发生数据丢失的情况则更多。

如果发生以下场景,就有可能导致数据丢失:

  1. 消费责异常

Pub/Sub 从缓冲区取走数据之后,数据就从 Redis 缓冲区删除了,消费者发生异常,自然也无法再次重新消费。

  1. 消费者下线

整个过程中,没有任何的数据存储,一切都是实时转发的。如果所有消费者都下线了,那生产者发布的消息,因为找不到任何一个消费者,也会全部「丢弃」。

所以,当你在使用 Pub/Sub 时,一定要注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失。

  1. Redis 宕机

因为 Pub/Sub 没有基于任何数据类型实现,所以它也不具备「数据持久化」的能力。

也就是说,Pub/Sub 的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,Pub/Sub 的数据也会全部丢失。

  1. 消息堆积

每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个「缓冲区」,这个缓冲区其实就是一块内存。

当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中。

之后,消费者不断地从缓冲区读取消息,处理消息。

pub-sub消息堆积

但是,问题就出在这个缓冲区上。

因为这个缓冲区其实是有「上限」的(可配置),如果消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。如果超过了缓冲区配置的上限,此时,Redis 就会「强制」把这个消费者踢下线。这时消费者就会消费失败,也会丢失数据。

Stream

首先,Stream 通过 XADD 和 XREAD 完成最简单的生产、消费模型:

  • XADD:发布消息
  • XREAD:读取消息

生产者发布 2 条消息:

// *表示让Redis自动生成消息ID  
127.0.0.1:6379> XADD queue * name zhangsan  
"1618469123380-0"  
127.0.0.1:6379> XADD queue * name lisi  
"1618469127777-0" 

使用 XADD 命令发布消息,其中的「*」表示让 Redis 自动生成唯一的消息 ID。

这个消息 ID 的格式是「时间戳-自增序号」。

消费者拉取消息:

// 从开头读取5条消息,0-0表示从开头读取  
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0  
1) 1) "queue"  
   2) 1) 1) "1618469123380-0"  
         2) 1) "name"  
            2) "zhangsan"  
      2) 1) "1618469127777-0"  
         2) 1) "name"  
            2) "lisi" 

如果想继续拉取消息,需要传入上一条消息的 ID:

127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0

(nil)

没有消息,Redis 会返回 NULL。

stream

以上就是 Stream 最简单的生产、消费。

这里不再重点介绍 Stream 命令的各种参数,我在例子中演示时,凡是大写的单词都是「固定」参数,凡是小写的单词,都是可以自己定义的,例如队列名、消息长度等等,下面的例子规则也是一样,为了方便你理解,这里有必要提醒一下。

下面我们来看,针对前面提到的消息队列要求,Stream 都是如何解决的?

1. Stream 是否支持「阻塞式」拉取消息?

可以的,在读取消息时,只需要增加 BLOCK 参数即可。

// BLOCK 0 表示阻塞等待,不设置超时时间  
127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0 

这时,消费者就会阻塞等待,直到生产者发布新的消息才会返回。

2. Stream 是否支持发布 / 订阅模式?

也没问题,Stream 通过以下命令完成发布订阅:

  • XGROUP:创建消费者组
  • XREADGROUP:在指定消费组下,开启消费者拉取消息

下面我们来看具体如何做?

首先,生产者依旧发布 2 条消息:

127.0.0.1:6379> XADD queue * name zhangsan  
"1618470740565-0"  
127.0.0.1:6379> XADD queue * name lisi 
"1618470743793-0" 

之后,我们想要开启 2 组消费者处理同一批数据,就需要创建 2 个消费者组:

// 创建消费者组1,0-0表示从头拉取消息  
127.0.0.1:6379> XGROUP CREATE queue group1 0-0  
OK  
// 创建消费者组2,0-0表示从头拉取消息  
127.0.0.1:6379> XGROUP CREATE queue group2 0-0  
OK 

消费者组创建好之后,我们可以给每个「消费者组」下面挂一个「消费者」,让它们分别处理同一批数据。

第一个消费组开始消费:

// group1的consumer开始消费,>表示拉取最新数据  
127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue >  
1) 1) "queue"  
   2) 1) 1) "1618470740565-0"  
         2) 1) "name"  
            2) "zhangsan"  
      2) 1) "1618470743793-0"  
         2) 1) "name"  
            2) "lisi" 

同样地,第二个消费组开始消费:

// group2的consumer开始消费,>表示拉取最新数据  
127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue >  
1) 1) "queue"  
   2) 1) 1) "1618470740565-0"  
         2) 1) "name"  
            2) "zhangsan"  
      2) 1) "1618470743793-0"  
         2) 1) "name"  
            2) "lisi" 

我们可以看到,这 2 组消费者,都可以获取同一批数据进行处理了。

这样一来,就达到了多组消费者「订阅」消费的目的。

XREADGROUP

3. 消息处理时异常,Stream 能否保证消息不丢失,重新消费?

除了上面拉取消息时用到了消息 ID,这里为了保证重新消费,也要用到这个消息 ID。

当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。

// group1下的 1618472043089-0 消息已处理完成  
127.0.0.1:6379> XACK queue group1 1618472043089-0 

stream-ack

如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息。

待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。

// 消费者重新上线,0-0表示重新拉取未ACK的消息  
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0  
// 之前没消费成功的数据,依旧可以重新消费  
1) 1) "queue"  
   2) 1) 1) "1618472043089-0"  
         2) 1) "name"  
            2) "zhangsan"  
      2) 1) "1618472045158-0"  
         2) 1) "name"  
            2) "lisi" 

4. Stream 数据会写入到 RDB 和 AOF 做持久化吗?

Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

我们只需要配置好持久化策略,这样的话,就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。

5. 消息堆积时,Stream 是怎么处理的?

其实,当消息队列发生消息堆积时,一般只有 2 个解决方案:

  1. 生产者限流:避免消费者处理不及时,导致持续积压
  2. 丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息

而 Redis 在实现 Stream 时,采用了第 2 个方案。

在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。

// 队列长度最大10000  
127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan  
"1618473015018-0" 

当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。

Stream与专业MQ对比

我们从以下两个方面对比:

  • 消息不丢
  • 消息可堆积

消息不丢

消息是否会发生丢失,其重点也就在于以下 3 个环节:

  1. 生产者会不会丢消息?
  2. 消费者会不会丢消息?
  3. 队列中间件会不会丢消息?

1. 生产者会不会丢消息?

无论是Stream还是专业MQ,要保证生产者不丢基本靠重发,消费者端应保证幂等。

从这个角度来看,Redis 也是合格的

可以是生产者发送后阻塞等待ack,也可以是MQ回调生产者的ack接口,无论哪种确认形式,只要没有被ack,就重发,当然这两种情况都可能导致消费重复发生。

2. 消费者会不会丢消息?

要解决这个问题,消费者在处理完消息后,必须「告知」队列中间件,队列中间件才会把标记已处理,否则仍旧把这些数据发给消费者。

这种方案需要消费者和中间件互相配合,才能保证消费者这一侧的消息不丢。

无论是 Redis 的 Stream,还是专业的队列中间件,例如 RabbitMQ、Kafka,其实都是这么做的。

所以,从这个角度来看,Redis 也是合格的

3. 队列中间件会不会丢消息?

在这个方面,Redis 其实没有达到要求

Redis 在以下 2 个场景下,都会导致数据丢失。

  1. AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能
  2. 主从复制也是异步的,主从切换时,也存在丢失数据的可能(从库还未同步完成主库发来的数据,就被提成主库)

基于以上原因我们可以看到,Redis 本身的无法保证严格的数据完整性。

所以,如果把 Redis 当做消息队列,在这方面是有可能导致数据丢失的。

再来看那些专业的消息队列中间件是如何解决这个问题的?

像 RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时,一般是部署一个集群,生产者在发布消息时,队列中间件通常会写「多个节点」,以此保证消息的完整性。这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。

也正因为如此,RabbitMQ、Kafka在设计时也更复杂。毕竟,它们是专门针对队列场景设计的。

但 Redis 的定位则不同,它的定位更多是当作缓存来用,它们两者在这个方面肯定是存在差异的。

消息积压怎么办

因为 Redis 的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。

所以,Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。

但 Kafka、RabbitMQ 这类消息队列就不一样了,它们的数据都会存储在磁盘上,磁盘的成本要比内存小得多,当消息积压时,无非就是多占用一些磁盘空间,相比于内存,在面对积压时也会更加「坦然」。

总结

综上,我们可以看到,把 Redis 当作队列来使用时,始终面临的 2 个问题:

  1. Redis 本身可能会丢数据
  2. 面对消息积压,Redis 内存资源紧张

参考

把Redis当作队列来用,真的合适吗?

版权

评论