MQ综述

1. 概念

  • Broker
  • Producer
  • Consumer
  • Topic
  • Queue
  • Message

2. 模式

2.1. 点对点

PTP 点对点: 使用 Queue 作为通信载体

点对点

消息生产者生产消息发送到 Queue 中,然后消息消费者从 Queue 中取出并且消费消息。消息被消费以后,Queue 中不再存储,所以消息消费者不可能消费到已经被消费的消息。Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费

2.2. 发布/订阅

Pub/Sub 发布订阅(广播): 使用 Topic 作为通信载体

发布订阅

消息生产者(发布)将消息发布到 Topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 Topic 的消息会被所有订阅者消费

总结

Queue 实现了负载均衡,将 Producer 生产的消息发送到消息队列中,由多个消费者消费。但一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有一个可用的消费者

Topic 实现了发布和订阅,当你发布一个消息,所有订阅这个 Topic 的服务都能得到这个消息,所以从1到N个订阅者都能得到一个消息的拷贝

3. 协议

3.1. AMQP协议

AMQP 即 Advanced Message Queuing Protocol ,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。兼容 JMS。RabbitMQ 就是基于 AMQP 协议实现的。

优点:可靠、通用

JMS(JAVA Message Service,java 消息服务)是 java 的消息服务,JMS 的客户端之间可以通过 JMS 服务进行异步的消息传输。JMS(JAVA Message Service,Java 消息服务)API 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

ActiveMQ 就是基于 JMS 规范实现的。

JMS vs AMQP

对比方向 JMS AMQP
定义 Java API 协议
跨语言
跨平台
支持消息类型 提供两种消息模型:①Peer-2-Peer;②Pub/sub 提供了五种消息模型:①direct exchange;②fanout exchange;③topic change;④headers exchange;⑤system exchange。本质来讲,后四种和 JMS 的 pub/sub 模型没有太大差别,仅是在路由机制上做了更详细的划分;
支持消息类型 支持多种消息类型 ,我们在上面提到过 byte[](二进制)

3.2. MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是 IBM 开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过 Twitter 让房屋联网)的通信协议。

优点:格式简洁、占用带宽小、移动端通信、PUSH、嵌入式系统

3.3. STOMP协议

STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息协议,是一种为 MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。STOMP 提供一个可互操作的连接格式,允许客户端与任意 STOMP 消息代理(Broker)进行交互。

优点:命令模式(非 Topic\Queue 模式)

3.4. XMPP协议

XMPP(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)是基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时操作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。

优点:通用公开、兼容性强、可扩展、安全性高,但 XML 编码格式占用带宽大

3.5. 基于TCP/IP自定义的协议

有些特殊框架(如:Redis、Kafka、ZeroMq等)根据自身需要未严格遵循 MQ 规范,而是基于 TCP\IP 自行封装了一套协议,通过网络 Socket 接口进行传输,实现了 MQ 的功能

4. 优点

4.1. 解耦

4.2. 异步

4.3. 削峰

5. 缺点

  • 系统可用性降低。在系统中引入 MQ,那么万一 MQ 挂了怎么办呢?一般而言,引入的外部依赖越多,系统越脆弱,每一个依赖出问题都会导致整个系统的崩溃
  • 系统复杂度提高。需要考虑 MQ 的各种情况,比如:消息的重复消费、消息丢失、保证消息传递的顺序性等等
  • 数据一致性问题。比如 A 系统已经给客户返回操作成功,这时候操作 BC 都成功了,操作 D 却失败了,导致数据不一致

6. F&Q

6.1 如何保证消息队列的高可用?

6.1.1 RabbitMQ 的高可用性

RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

1. 单机模式

无高可用。

2. 普通集群模式

无高可用性。

普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。

RabbitMQ-普通集群模式

这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈

而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。

所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

3. 镜像集群模式

有高可用性。

这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。

RabbitMQ-镜像集群模式

这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!第二,这么玩儿,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue。你想,如果这个 queue 的数据量很大,大到这个机器上的容量无法容纳了,此时该怎么办呢?

6.1.2 Kafka 的高可用性

Kafka有partition的概念,每个topic可以划分为多个partition。每个 partition 的数据都会同步到指定数量的其它机器上,形成自己的多个 replica 副本。

所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。只能读写 leader?很简单,要是你可以随意读写每个 follower,那么就要 care 数据一致性的问题,系统复杂度太高,很容易出问题。

如果这个宕机的 broker 上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。这就有所谓的高可用性了。

写数据的时候,生产者就写 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为)

消费的时候,只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。

6.2 如何保证消息不被重复消费?

如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?

例如:kafka有offset的概念,如果消费者已经消费了消息,但是还没提交对应位置offset的时候应用发生重启,重启之后继续消费就有可能发生重复消费。

再例如:也有可能是生产者重复发送消息导致重复消费,例如:RabbitMQ生产者为了保证消息不被丢失而开启了comfirm模式,如果生产者成功发送消息 👉 RabbitMQ成功处理保存了消息 👉 但是因为网络抖动生产者没有收到RabbitMQ的回复ack 👉 生产者重发消息 👉 最终该消息发了两次😔

解决问题的方式有如下三种思路

  • 如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据
  • 如果你拿到这个消息做 Redis 的 Set 的操作,不用解决,因为你无论 Set 几次结果都是一样的,Set 操作本来就算幂等操作
  • 如果上面两种情况还不行,准备一个第三服务方来做消费记录。以 Redis 为例,给消息分配一个全局 ID,只要消费过该消息,将 <Id,Message> 以 K-V 形式写入 Redis。那消费者开始消费前,先去 Redis 中查询有没消费记录即可

6.3 如何保证消息的可靠性传输?

如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题?

6.3.1 RabbitMQ

rabbitmq-message-lose

1. 生产者弄丢了数据

事务 或者 confirm机制

生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。

此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务 channel.txSelect ,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务 channel.txRollback ,然后重试发送消息;如果收到了消息,那么可以提交事务 channel.txCommit

// 开启事务
channel.txSelect
try {
    // 这里发送消息
} catch (Exception e) {
    channel.txRollback

    // 这里再次重发这条消息
}

// 提交事务
channel.txCommit

但是问题是,RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能

所以一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。

所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。

2. RabbitMQ 弄丢了数据

开启 RabbitMQ 的持久化,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack ,你也是可以自己重发的。

3. 消费端弄丢了数据

关闭 RabbitMQ 的自动 ack ,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

rabbitmq-message-lose-solution

6.3.2 Kafka

1. 消费端弄丢了数据

跟RabbitMQ 差不多,关闭自动提交offset。

2. Kafka 弄丢了数据

partition follower 还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据?

此时一般是要求起码设置如下 4 个参数:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。
  • 在 producer 端设置 acks=all :这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了
  • 在 producer 端设置 retries=MAX (很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。
3. 生产者会不会弄丢数据?

如果按照上述的思路设置了 acks=all ,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

6.4 如何保证消息的顺序性?

Rabbitmq:

  1. 一个 Queue,一个 Consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个;
  2. 拆分多个 Queue,生产者把相同的key消息放入同一个Queue,每个 Queue一个 Consumer,就是多一些 Queue 而已,确实是麻烦点;
  3. 或者就一个 Queue 但是对应一个 Consumer,Consumer把相同的key消息放入内存 queue,每个内存 queue对应一个Worker线程
  4. 第2第3结合使用

Kafka:

  • 一个 Topic,一个 Partition,一个 Consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个;

  • 生产者把相同的key消息放入同一个Kafka Partition,每个Partition对应一个Consumer,Consumer把相同的key消息放入内存 queue,每个内存 queue对应一个Worker线程

6.5 如何设计消息队列?

  • 支持可伸缩性,参照一下 kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了?
  • 保证消息不会丢失,持久化、顺序写;与生产者配合,支持消息发生 事务comfirm 机制,达到数据 0 丢失。
  • 高可用,集群模式,少数broker挂掉了,MQ还能继续工作。参考kafka 的高可用保障机制,多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。

6.6 数据是通过 Push 还是 Pull 方式给到消费端,各自有什么弊端

  • Push 模型实时性好,但是因为状态维护等问题,难以应用到消息中间件的实践中,因为在 Broker 端需要维护 Consumer 的状态,不好适用于 Broker 去支持大量的 Consumer 的场景
  • Consumer 的消费速度是不一致的,Broker 进行推送难以处理不同的 Consumer 的状况,Broker 难以应对 Consumer 无法消费消息的情况,因为不知道 Consumer 的宕机是短暂的还是永久的,另外推送消息(量可能会很大)也会加重 Consumer 的负载或者压垮 Consumer,如果对应只有 1 个 Consumer,用 Push 比 Pull 好
  • Pull 模式实现起来会相对简单一些,但是实时性取决于轮训的频率,在对实时性要求高的场景不适合使用

7. 对比

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,吞吐量比RocketMQ和Kafka要低一个数量级 万级,吞吐量比RocketMQ和Kafka要低一个数量级 十万级,RocketMQ也是可以支撑高吞吐的一种MQ 十万级别,Kafka最大优点就是吞吐量大,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
Topic数量对吞吐量的影响 - - Topic可以达到几百、几千个的级别,吞吐量会有小幅度的下降。这是RocketMQ的一大优势,可在同等数量机器下支撑大量的Topic Topic从几十个到几百个的时候,吞吐量会大幅下降。所以在同等机器数量下,Kafka尽量保证Topic数量不要过多。如果支撑大规模Topic需要增加更多的机器
时效性 ms级 微秒级,这是rabbitmq的一大特点,延迟是最低的 ms级 延迟在ms级以内
可用性 高,基于主从架构实现可用性 高,基于主从架构实现可用性 非常高,分布式架构 非常高,Kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 - 经过参数优化配置,可以做到零丢失 经过参数配置,消息可以做到零丢失
功能支持 MQ领域的功能及其完备 基于erlang开发,所以并发性能极强,性能极好,延时低 MQ功能较为完备,分布式扩展性好 功能较为简单,主要支持加单MQ功能
优势 非常成熟,功能强大,在业内大量公司和项目中都有应用 erlang语言开发,性能极好、延时很低,吞吐量万级、MQ功能完备,管理界面非常好,社区活跃;互联网公司使用较多 接口简单易用,阿里出品有保障,吞吐量大,分布式扩展方便、社区比较活跃,支持大规模的Topic、支持复杂的业务场景,可以基于源码进行定制开发 超高吞吐量,ms级的时延,极高的可用性和可靠性,分布式扩展方便
劣势 偶尔有较低概率丢失消息,社区活跃度不高 吞吐量较低,erlang语音开发不容易进行定制开发,集群动态扩展麻烦 接口不是按照标准JMS规范走的,有的系统迁移要修改大量的代码,技术有被抛弃的风险 有可能进行消息的重复消费
应用 主要用于解耦和异步,较少用在大规模吞吐的场景中 都有使用 用于大规模吞吐、复杂业务中 在大数据的实时计算和日志采集中被大规模使用,是业界的标准

参考

MQ了解及对比选型

advanced-java

版权

评论