RocketMQ 消息可靠原理

一、链路上的可靠性保障

1. 消息持久化

基础介绍

分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。

  • MQ收到一条消息后,需要向生产者返回一个ACK响应,并将消息存储起来。(告诉生产者已经将消息进行存储)
  • MQ Push一条消息给消费者后,等待消费者的ACK响应,需要将消息标记为已消费。如果没有标记为已消费,MQ会不断的尝试往消费者推送这条消息。
  • MQ需要定期删除一些过期的消息,这样才能保证服务一直可用。

RocketMQ采用直接用磁盘文件来保存消息,而不需要借助MySQL这一类索引工具。rocketmq默认文件–commitLog。

abort文件: 这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作。

CommitLog

CommitLog:存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog 是由多个 MappedFile 组成的逻辑日志,每个 MappedFile 对应一个固定大小(默认 1GB)的物理文件。以起始偏移量为文件名,固定 20 位,通过消息的物理 offset,就可以直接计算出所在的 MappedFile 以及文件内偏移位置,实现 O(1) 的定位。这种设计配合顺序写,大幅提升了磁盘 IO 性能,是 RocketMQ 高吞吐的基础。

ConsumerQueue

ConsumerQueue:消息消费队列,可以认为是 CommitLog 中消息的索引。一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。因为 CommitLog 是糅合了所有主题的消息,所以通过索引才能更加高效的查找消息。ConsumeQueue 存储的条目是固定大小,只会存储 8 字节的 commitlog 物理偏移量,4 字节的消息长度和 8 字节 Tag 的哈希值,固定 20 字节。在实际存储中,ConsumeQueue 对应的是一个Topic 下的某个 Queue,每个文件最大约 5.72M,由 30w 条数据组成。消费者是先从 ConsumeQueue 来得到消息真实的物理地址,然后再去 CommitLog 获取消息。

为了提高效率,会为每个Topic在~/store/consumequeue中创建一个目录,目录名为Topic名称。在该Topic目录下,会再为每个该Topic的Queue建立一个目录,目录名为queueId,每个目录中存放着若干ConsumeQueue文件,ConsumeQueue文件是commitlog的索引文件,可以根据ConsumeQueue定位到具体的消息。一个consumequeue文件中所有消息的Topic一定是相同的。但每条消息的Tag可能是不同的。

Broker 先通过 ConsumeQueue 定位消息在 CommitLog(MappedFile)中的位置,然后再读取消息内容并发送给 Consumer。

2. 生产者确认 ACK

刷盘机制

RocketMQ需要将消息存储到磁盘上,这样才能保证断电后消息不会丢失。同时这样才可以让存储的消息量可以超出内存的限制。

RocketMQ为了提高性能,会尽量保证磁盘的顺序写。消息在写入磁盘时,有两种写磁盘的方式,同步刷盘和异步刷盘。

同步刷盘: 在返回写成功状态时,消息已经被写入磁盘。

异步刷盘: 在返回写成功状态时,消息可只是被写入了内存,当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

在 RocketMQ 中,消息的”确认时机”取决于发送模式和刷盘策略:

  • 同步刷盘模式: Broker 会先将消息持久化到 CommitLog 再返回发送成功,保证消息安全性。
  • 异步刷盘模式: Broker 写入内存映射后立即返回确认,消息刷盘到磁盘由后台线程异步完成。

3. 消费者确认 ACK

4. 重试机制

首先对于广播模式的消息, 是不提供可靠的消息重试的机制的,即消息消费失败后,不会再重新进行发送,而只是继续消费新的消息。而对于集群模式的消息,当消费者消费消息失败后(例如消费业务发生异常),会自动重试。

重试消息如何处理: 重试的消息会进入一个 “%RETRY%” + ConsumeGroup 的队列中,注意:ConsumeGroup为消费组名称。RocketMQ默认允许每条消息最多重试16次。

如果消息重试16次后仍然失败,消息将不再投递。转为进入死信队列。另外一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。然后关于这个重试次数,RocketMQ可以进行定制。例如通过maxReconsumeTimes;将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2小时。

5. 事务消息

消息的发送与本地事务操作保持一致,消息要么最终被发送(Commit),要么被回滚(Rollback),保证分布式事务一致性。

RocketMQ 的事务消息分为 三种状态:

1. Prepared(准备)

Producer 发送消息到 Broker,但 消息状态为半消息,不对 Consumer 可见。

2. Commit(提交)

本地事务执行成功,Producer 告诉 Broker 将消息状态改为可消费。

3. Rollback(回滚)

本地事务失败,Producer 告诉 Broker 删除消息,Consumer 不可见。

本地事务只能保证单系统原子性,无法解决分布式场景下”本地事务成功但消息未发送,或者消息已发送但本地事务失败”的问题。

RocketMQ 的事务消息通过 半消息 + 本地事务 + Broker 回查 机制保证:

1️⃣ 本地事务成功 → 消息提交 → Consumer 可消费

2️⃣ 本地事务失败 → 消息回滚 → 不被消费

3️⃣ 异常情况下 Broker 回查 Producer,最终状态一致

从而实现本地事务和消息发送的最终一致性,解决分布式事务问题。

异常情况

在 RocketMQ 事务消息中,如果 Producer 崩溃或网络异常导致 Broker 不确定消息状态,Broker 会对半消息进行定期回查,向 Producer 请求事务状态。Producer 根据本地事务日志返回 Commit 或 Rollback。通过这个回查机制,即使在异常情况下,也能保证消息最终状态与本地事务一致,从而实现分布式事务的最终一致性。

如果超过配置的最大回查次数或回查时间仍未得到明确回复,则触发 回查超时策略:Broker 可以默认回滚消息,或记录异常等待人工干预,从而防止半消息无限积压,保证系统健康和最终一致性。

二、业务上的可靠性保障

1. 顺序消费

消息的顺序指的是消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照这个顺序消费才有意义。但同时订单之间又是可以并行消费的。RocketMQ是通过将”相同ID的消息发送到同一个队列,而一个队列的消息只由一个消费者处理”来实现顺序消息。这样对于同一个订单的创建、付款和完成消息,确保按照这一顺序被发送和消费。

对于同一个业务标识(例如订单ID)的消息,消费时必须保持发送顺序。不同业务标识之间的消息可以并行消费,不影响整体吞吐。生产端保证路由一致性,同 ID 消息路由到同一个队列,Producer 在发送消息时可以指定 MessageQueueSelector。

2. 幂等性 / 数据唯一性问题

在MQ系统中,对于消息幂等有三种实现语义:

  • at most once 最多一次: 每条消息最多只会被消费一次
  • at least once 至少一次: 每条消息至少会被消费一次
  • exactly once 刚刚好一次: 每条消息都只会确定的消费一次

这三种语义都有他适用的业务场景。其中,at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。而at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证。而这个exactly once是MQ中最理想也是最难保证的一种语义,需要有非常精细的设计才行。RocketMQ只能保证at least once,保证不了exactly once。所以,使用RocketMQ时,需要由业务系统自行保证消息的幂等性,避免消息重复消费问题。

消息重复场景

在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:

1. 发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

2. 投递时消息重复

消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

幂等性保障方案

从上面的分析中,我们知道,在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。

而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。