MQ消息最终一致性解决方案

只有RocketMQ支持事务消息,如果我们的MQ不是RocketMQ,可以采用本地消息+MQ达到同样的效果,并且本地消息表还可以做出独立的服务。

随着分布式服务架构的流行与普及,原来在单体应用中执行的多个逻辑操作,现在被拆分成了多个服务之间的远程调用。虽然服务化为我们的系统带来了水平伸缩的能力,然而随之而来挑战就是分布式事务问题,多个服务之间使用自己单独维护的数据库,它们彼此之间不在同一个事务中,假如A执行成功了,B执行却失败了,而A的事务此时已经提交,无法回滚,那么最终就会导致两边数据不一致性的问题;尽管很早之前就有基于两阶段提交的XA分布式事务,但是这类方案因为需要资源的全局锁定,导致性能极差;因此后面就逐渐衍生出了消息最终一致性、TCC等柔性事务的分布式事务方案,本文主要分析的是基于消息的最终一致性方案

0\. 简单RPC处理存在的一致性问题

在正式开始讲述正题之前,我们先看一下,不依赖任何分布式事务手段,单纯将本地业务逻辑远程调用逻辑放在同一个本地事务中会有什么问题。

我们以订单创建为例,订单系统先创建订单(本地事务),然后RPC调用库存扣减服务。

@Transactionnal
public void processOrder() {
    try{
        // 订单处理(业务操作) 
        orderService.process(); 
        // 库存扣减(RPC远程调用) 
        storageService.deduction();
    }catch(Exception e){
         事务回滚;   
    }
}

如果库存服务由于DB数据量比较大,导致处理超时,订单服务在出现超时异常后,直接回滚本地事务,从而导致订单服务这边没数据,而库存服务那边数据却已经写入了,最终导致两边业务数据的不一致。

即使不存在 “DB数据量比较大” 这种特殊情况,也一定会存在因为网络抖动,订单服务调用库存服务超时而本地回滚,但是库存服务实际操作成功的情况。

其根本的原因就在于:远程调用,结果最终可能为成功、失败、超时;而对于超时的情况,处理方最终的结果可能是成功,也可能是失败,调用方是无法知晓的。

1\. 普通消息的处理流程

普通消息的处理流程

  1. 消息生成者发送消息
  2. MQ收到消息,将消息进行持久化,在存储中新增一条记录
  3. 返回ACK给生产者
  4. MQ push 消息给对应的消费者,然后等待消费者返回ACK
  5. 如果消息消费者在指定时间内成功返回ack,那么MQ认为消息消费成功,在存储中删除消息,即执行第6步;如果MQ在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新push消息,重复执行4、5、6步骤
  6. MQ删除消息

1.2 普通消息处理存在的一致性问题

我们还是以订单创建为例,订单系统先创建订单(本地事务),再发送消息给下游处理;如果订单创建成功,然而消息没有发送出去,那么下游所有系统都无法感知到这个事件,会出现脏数据;

public void processOrder() {
    // 订单处理(业务操作) 
    orderService.process();
    // 发送订单处理成功消息(发送消息) 
    sendBizMsg ();
}

如果先发送订单消息,再创建订单;那么就有可能消息发送成功,但是在订单创建的时候却失败了,此时下游系统却认为这个订单已经创建,也会出现脏数据。

public void processOrder() {
   // 发送订单处理成功消息(发送消息) 
    sendBizMsg ();
    // 订单处理(业务操作) 
    orderService.process();
}

1.3 一个错误的想法

此时可能有同学会想,我们可否将消息发送和业务处理放在同一个本地事务中来进行处理,如果业务消息发送失败,那么本地事务就回滚,这样是不是就能解决消息发送的一致性问题呢?

@Transactionnal
public void processOrder() {
    try{
        // 订单处理(业务操作) 
        orderService.process(); 
        // 发送订单处理成功消息(发送消息) 
        sendBizMsg ();
    }catch(Exception e){
         事务回滚;   
    }
}

这种做法的错误在于,如果订单处理成功,消息成功存储到MQ,但是MQ处理超时,从而ACK确认失败,导致发送方本地事务回滚。 势必造成消息生产者与消费者数据状态不一致。

2\. 事务消息

由于传统的处理方式无法解决消息生成者本地事务处理成功消息发送成功两者的一致性问题,因此事务消息就诞生了,它实现了消息生成者本地事务与消息发送的原子性,保证了消息生成者本地事务处理成功与消息发送成功的最终一致性问题。

2.1 事务消息处理的流程

image

  1. 事务消息与普通消息的区别就在于消息生产环节,生产者首先预发送一条消息到MQ(这也被称为发送half消息)
  2. MQ接受到消息后,先进行持久化,则存储中会新增一条状态为待发送的消息
  3. 然后返回ACK给消息生产者,此时MQ不会触发消息推送事件
  4. 生产者预发送消息成功后,执行本地事务
  5. 执行本地事务,执行完成后,发送执行结果给MQ
  6. MQ会根据结果删除或者更新消息状态为可发送
  7. 如果消息状态更新为可发送,则MQ会push消息给消费者,后面消息的消费和普通消息是一样的

注意点:由于MQ通常都会保证消息能够投递成功,因此,如果业务没有及时返回ACK结果,那么就有可能造成MQ的重复消息投递问题。因此,对于消息最终一致性的方案,消息的消费者必须要对消息的消费支持幂等,不能造成同一条消息的重复消费的情况。

2.2 支持事务消息的MQ

现在目前较为主流的MQ,比如ActiveMQ、RabbitMQ、Kafka、RocketMQ等,只有RocketMQ支持事务消息。据笔者了解,早年阿里对MQ增加事务消息也是因为支付宝那边因为业务上的需求而产生的。因此,如果我们希望强依赖一个MQ的事务消息来做到消息最终一致性的话,在目前的情况下,技术选型上只能去选择RocketMQ来解决。上面我们也分析了事务消息所存在的异常情况,即MQ存储了待发送的消息,但是MQ无法感知到上游处理的最终结果。对于RocketMQ而言,它的解决方案非常的简单,就是其内部实现会有一个定时任务,去轮训状态为待发送的消息,然后给producer发送check请求,而producer必须实现一个check监听器,监听器的内容通常就是去检查与之对应的本地事务是否成功(一般就是查询DB),如果成功了,则MQ会将消息设置为可发送,否则就删除消息。

由于并非所有的MQ都支持事务消息,假如我们不选择RocketMQ来作为系统的MQ,是否能够做到消息的最终一致性呢?答案是可以的。

3\. 基于本地消息表的最终一致性

image

基于本地消息的最终一致性方案的最核心做法就是在执行业务操作的时候,记录一条消息数据到DB,并且消息数据的记录与业务数据的记录必须在同一个事务内完成,这是该方案的前提核心保障。在记录完成后消息数据后,后面我们就可以通过一个定时任务到DB中去轮训状态为待发送的消息,然后将消息投递给MQ。这个过程中可能存在消息投递失败的可能,此时就依靠重试机制来保证,直到成功收到MQ的ACK确认之后,再将消息状态更新或者消息清除;而后面消息的消费失败的话,则依赖MQ本身的重试来完成,其最后做到两边系统数据的最终一致性。基于本地消息服务的方案虽然可以做到消息的最终一致性,但是它有一个比较严重的弊端,每个业务系统在使用该方案时,都需要在对应的业务库创建一张消息表来存储消息。针对这个问题,我们可以将该功能单独提取出来,做成一个消息服务来统一处理,因而就衍生出了我们下面将要讨论的方案。

4\. 独立消息服务的最终一致性

image

独立消息服务最终一致性本地消息服务最终一致性最大的差异就在于将消息的存储单独地做成了一个RPC的服务,这个过程其实就是模拟了事务消息的消息预发送过程,如果预发送消息失败,那么生产者业务就不会去执行,因此对于生产者的业务而言,它是强依赖于该消息服务的。不过好在独立消息服务支持水平扩容,因此只要部署多台,做成HA的集群模式,就能够保证其可靠性。在消息服务中,还有一个单独地定时任务,它会定期轮训长时间处于待发送状态的消息,通过一个check补偿机制来确认该消息对应的业务是否成功,如果对应的业务处理成功,则将消息修改为可发送,然后将其投递给MQ;如果业务处理失败,则将对应的消息更新或者删除即可。因此在使用该方案时,消息生产者必须同时实现一个check服务,来供消息服务做消息的确认。对于消息的消费,该方案与上面的处理是一样,都是通过MQ自身的重发机制来保证消息被消费。

参考

MQ消息最终一致性解决方案

版权

评论