RocketMQ-CheckPoint、文件恢复、刷盘机制

CheckPoint文件

checkpoint 的作用是记录 CommitLog、ConsumeQueue、Index文件的刷盘时间点,文件固定长度为 4k,其中只用该文件的前面 24 个字节,其存储格式如下图所示。

  • physicMsgTimestamp: CommitLog文件刷盘时间点。
  • logicsMsgTimestamp: 消息消费队列文件刷盘时间点。
  • indexMsgTimestamp: 索引文件刷盘时间点

更新ConsumeQueue和IndexFile

消息消费队列文件、消息属性索引文件都是基于 CommitLog 文件构建的,当消息生产者提交的消息存储在 CommitLog 文件中,ConsumeQueue、IndexFile需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。RocketMQ 通过开启一个线程 ReputMessageService 来准时转发 CommitLog 文件更新事件,相应的任务处理器根据转发的消息及时更新 ConsumeQueue、IndexFile文件。

文件恢复

由于 RocketMQ 存储首先将消息全量存储在 CommitLog 文件中,然后异步生成转发任务更新 ConsumeQueue、Index 文件。如果消息成功存储到 CommitLog 文件中,转发任务未成功执行,此时消息服务器 Broker 由于某个原因宕机,导致 CommitLog、ConsumeQueue、IndexFile文件数据不一致。如果不加以人工修复的话,会有一部分消息即便在 CommitLog 文件中存在,但由于并没有转发到 ConsumeQueue,这部分消息将永远不会被消费者消费。

接下来我们看一看 RocketMQ Broker 的启动过程:

  • 判断上一次退出是否正常。
    • Broker在启动时创建abort文件,在退出时通过注册 JVM 钩子函数删除 abort 文件。如果下一次启动时存在 abort 文件。 说明 Broker 是异常退出的,CommitLog 与 ConsumeQueue 数据有可能不一致,需要进行修复。
    • 加载延迟队列,RocketMQ 定时消息相关。
    • 加载所有 CommitLog 文件,如果文件大小和配置单文件大小不一致则忽略,创建好了将wrotePosition、flushedPosition, committedPosition三个指针都指向文件结尾。后面的恢复过程会将这些指针修正。
    • 加载消息 ConsumeQueue文件。与加载 CommitLog 大致相同。
    • 加载存储检测点,检测点主要记录 commitLog 文件、ConsumeQueue 文件、Index 索引文件的刷盘点。
    • 加载索引文件,如果上次异常退出,而且索引文件上次刷盘时间小于该索引文件最大的消息时间戳该文件将立即销毁。
    • 根据 Broker 是否是正常停止执行不同的恢复策略,下文将分别介绍异常停止、正常停止的文件恢复机制。
    • 恢复 ConsumeQueue 文件后,将在 CommitLog 实例中保存每个消息消费队列当前的存储逻辑偏移量,这也是消息中不仅存储主题、消息队列 ID 还存储了消息队列偏移量的关键所在。

Broker 正常停止

  • Broker正常停止再重启时,从倒数第三个文件开始进行恢复,如果不足 3 个文件,则从第一个文件开始恢复。
  • 从要恢复的 CommitLog 中,按照读到的消息大小读出消息正文,然后使用CRC(循环冗余校验)判断消息是否正确。
  • 遍历 CommitLog 文件,每次取出一条消息,如果检查结果为 true 并且消息的长度大于 0 表示消息正确,校验指针移动到本条消息的末尾;如果查找结果为 true 并且消息的长度等于 0,表示已到该文件的末尾,如果还有下一个文件需要检查,则循环步骤3,否则跳出循环; 如果查找结构为 false,表明该文件未填满所有消息,跳出循环,结束遍历文件。
  • 通过步骤 3,最终会得到一个校验通过的偏移 offset,通过它来更新 commit 指针和 flush 指针。
  • 删除 offset 之后的所有文件。

正常停止的时,Broker 会将 IndexFile 和 ConsumeQueue 都更新好,所以如果 Broker 正常停止的话,恢复过程只是修正commit 指针和 flush 指针。Broker 异常停止异常文件恢复的步骤与正常停止文件恢复的流程基本相同,其主要差别有两个。

首先,正常停止默认从倒数第三个文件开始进行恢复,而异常停止则需要从最后一个文件往前走,找到第一个消息存储正常的文件。
其次,如果 CommitLog 目录没有消息文件,如果在消息消费队列 ConsumeQueue 目录下存在文件,则需要销毁。 如何判断一个 CommitLog 文件是正确的呢?

  1. 首先判断文件的魔数
  2. 如果文件中第一条消息的存储时间等于 0,则认为文件无效
  3. 对比文件第一条消息的时间戳与检测点,文件第一条消息的时间戳小于文件检测点 checkpoint 说明该文件部分消息是可靠的,则从该文件开始恢复。
  4. 如果根据前 3 步算法找到了合法的 CommitLog,则遍历 CommitLog 中的消息,验证消息的合法性,并将消息重新转发到消息消费队列与索引文件,这样会造成 ConsumeQueue 的冗余,这需要消息的消费者来实现幂等性。
  5. 如果步骤3未找到有效 CommitLog,则设置 CommitLog 目录的 flush 指针、 commit 指针都为 0,并销毁消息消费队列文件。

异常停止时,不确定 ConsumeQueue 和 IndexFile 是否正确,所以从最后一个有效文件,重新发送 CommitLog 变动事件,从而触发 ConsumeQueue 和 IndexFile 的更新。

刷盘机制

RocketMQ 的存储与读写是基于 JDK NIO 的内存映射机制(MappedByteBuffer)的,消息存储时首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘。

如果是同步刷盘,消息追加到内存后,将同步调用 MappedByteBuffer 的 force 方法;

如果是异步刷盘,在消息追加到内存后立刻返回给消息发送端。RocketMQ 使用一个单独的线程按照某一个设定的频率执行刷盘操作。

通过在 broker 配置文件中配置 flushDiskType 来设定刷盘方式,
可选值为

  • ASYNC_FLUSH (异步刷盘) 默认为异步刷盘
  • SYNC_FLUSH (同步刷盘)

ConsumeQueue、IndexFile 刷盘的实现相对于 CommitLog 刷盘机制来说都很简单,

  • ConsumeQueue 是周期性刷盘,
  • IndexFile索引文件的刷盘并不是采取定时刷盘机制,而是每次想要更新一次索引文件就会将之前的改动刷写到磁盘。

接下来主要介绍 CommitLog 的刷盘过程。

同步刷盘

同步刷盘,指的是在消息追加到内存映射文件的内存中后,立即将数据从内存刷写到磁盘文件;
CommitLog 中有一个刷盘服务 GroupCommitService,所有消息发送线程接收到的同步写入请求,最终都会以请求-回应的方式通知 GroupCommitService 代其进行刷盘操作。
当 GroupCommitService 执行完刷盘任务,或者刷盘任务执行超时时,发送线程才会回复消息的 Producer。

异步刷盘

异步刷盘根据是否开启 transientStorePoolEnable 机制,刷盘实现会有细微差别。

如果 transientStorePoolEnable 为 true, RocketMQ 会单独申请一个与目标物理文件 (CommitLog) 同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到 FileChannel 中,再 flush 到磁盘。 如果 transientStorePoolEnable 为 false,消息直接追加到与物理文件直接映射的内存中,然后刷写到磁盘中。

当 transientStorePoolEnable 为 true时,会有一个 CommitRealTimeService 默认每隔 200ms 将直接内存中的数据提交到 FileChannel,一次提交默认至少要包含 4 个页的数据,否则暂时不提交。

当 transientStorePoolEnable 为 false 时,这个 CommitRealTimeService 实际上什么都没做。

然后是定时刷盘的逻辑,CommitLog 会有一个 FlushRealTimeService 定时将数据刷入磁盘,默认每隔 10s 进行一次刷盘,和 commit 过程一样,刷盘阶段默认也是至少攒够 4 个页的脏数据才进行刷盘,当 transientStorePoolEnable 为 true时,刷盘过程调用的是 FileChannel 的 force,否则调用的是 MappedByteBuffer 的 force。

过期删除机制

由于 RocketMQ 操作 CommitLog、ConsumeQueue文件是基于内存映射机制并在启动的时候会加载 CommitLog、ConsumeQueue 目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引人一种机制来删除己过期的文件。

RocketMQ 顺序写 CommitLog 文件、ConsumeQueue 文件,所有写操作全部落在最后一个 CommitLog 或 ConsumeQueue 文件上,之前的文件在下一个文件创建后将不会再被更新。

RocketMQ 清除过期文件的方法是: 如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ 不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为 72 小时 ,通过在 Broker 配置文件中设置 fileReservedTime 来改变过期时间,单位为小时。RocketMQ 会每隔 10s 调度一次清除过程,检测是否需要清除过期文件。

消息消费

消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题,消费组之间有集群模式与广播模式两种消费模式。

  • 集群模式,主题下的同一条消息只允许被其中一个消费者消费。
  • 广播模式,主题下的同一条消息将被集群内的所有消费者消费一次。

消息服务器与消费者之间的消息传送也有两种方式:

  • 推模式
  • 拉模式。

所谓的拉模式,是消费端主动发起拉消息请求,而推模式是消息到达消息服务器后,推送给消息消费者。

RocketMQ 消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。

消息队列负载机制遵循一个通用的思想:
一个消息队列同一时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。

RocketMQ 支持局部顺序消息消费,也就是保证同一个消息队列上的消息顺序消费。

不支持消息全局顺序消费,如果要实现某一主题的全局顺序消息消费,可以将该主题的队列数设置为 1,牺牲高可用性。

RocketMQ 支持两种消息过滤模式:表达式(TAG、SQL92)与类过滤模式。

消费者启动

构建主题订阅信息订阅目标topic订阅重试主题消息。
RocketMQ消息重试是以消费组为单位,而不是主题,消息重试主题名为 %RETRY% + 消费组名。消费者在启动的时候会自动订阅该主题,参与该主题的消息队列负载。初始化消息进度。

如果消息消费是集群模式,那么消息进度保存在 Broker 上;
如果是广播模式,那么消息消费进度存储在消费端。根据是否是顺序消费,创建消费端消费线程服务。ConsumeMessageService 主要负责消息消费,内部维护一个线程池。

消息拉取

我们会基于 PUSH 模型来介绍拉取机制,因为其内部包括了 PULL 模型。
消息消费有两种模式:

  • 广播模式
  • 集群模式

广播模式比较简单,每一个消费者需要去拉取订阅主题下所有消费队列的消息;
在集群模式下,同一个消费组内有多个消息消费者,同一个主题存在多个消费队列,消息队列负载,通常的做法是一个消息队列在同一时间只允许被一个消息消费者消费,一个消息消费者可以同时消费多个消息队列。
RocketMQ 使用一个单独的线程 PullMessageService 来负责消息的拉取。

public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}

PullMessageService 从服务端拉取到消息后,会根据消息对应的消费组,转给该组对应的 ProcessQueue,而 ProcessQueue 是 MessageQueue 在消费端的重现、快照。

PullMessageService 从消息服务器默认每次拉取 32 条消息,按消息的队列偏移量顺序存放在 ProcessQueue 中,PullMessageService 然后将消息提交到消费者消费线程池,消息成功消费后从 ProcessQueue 中移除。

消息拉取分为 3 个主要步骤。

  • 消息拉取客户端消息拉取请求封装。
  • 消息服务器查找并返回消息。
  • 消息拉取客户端处理返回的消息。

发送拉取请求

  1. 判断队列状态,如果不需要拉取则退出

  2. 进行消息拉取流控

    • 消息处理总数
    • 消息偏移量跨度
  3. 查询路由表,找到要发送的目标 Broker 服务器,

  4. 如果没找到就更新路由信息如果消息过滤模式为类过滤,则需要根据主题名称、broker地址找到注册在 Broker上的 FilterServer 地址,从 FilterServer 上拉取消息,否则从 Broker 上拉取消息

  5. 发送消息

Broker组装消息

1 根据订阅信息,构建消息过滤器

  • tag 过滤器只会过滤 tag 的 hashcode,为了追求高效率
  • SQL 过滤为了避免每次执行 SQL表达式,构建了 BloomFilter,在 Redis 防止缓存击穿那里我们也用过它
  1. 根据主题名称与队列编号获取消息消费队列
  2. 根据拉取消息偏移量,进行校对,
  3. 如何偏移量不合法,则返回相应的错误码如果待拉取偏移量大于 minOffset 并且小于 maxOffs 时,从当前 offset 处尝试拉取 32 条消息,根据消息队列偏移量(ConsumeQueue)从 CommitLog 文件中查找消息
  4. 根据 PullResult 填充 responseHeader 的 nextBeginOffset、 minOffset、 maxOffset
  5. 如果主 Broker 工作繁忙,会设置 flag 建议消费者下次从 Slave 节点拉取消息
  6. 如果 CommitLog 标记可用并且当前节点为主节点,则更新消息消费进度

拓展一下:

Bloom Filter是一种空间效率很高的随机数据结构,它的原理是,当一个元素被加入集合时,通过K个Hash函数将这个元素映射成一个位阵列(Bit array)中的K个点,把它们置为1。

检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检索元素一定不在;

如果都是1,则被检索元素很可能在。这就是布隆过滤器的基本思想。

但Bloom Filter的这种高效是有一定代价的:在判断一个元素是否属于某个集合时,有可能会把不属于这个集合的元素误认为属于这个集合(false positive)。因此,Bloom Filter不适合那些“零错误”的应用场合。而在能容忍低错误率的应用场合下,Bloom Filter通过极少的错误换取了存储空间的极大节省

客户端处理消息

解码成消息列表,并进行消息过滤

  • 这里之所以还要进行过滤,是因为 Broker 为了追求效率只会根据 tag 的 hashcode 进行过滤,真实 key string 的对比,下放到 Consumer 上进行更新

  • PullRequest 的下一次拉取偏移量,如果过滤后没有一条消息的话,则立即触发下次拉取

  • 首先将拉取到的消息存入 ProcessQueue,然后将拉取到的消息提交到 ConsumeMessageService 中供消费者消费,该方法是一个异步方法,也就是 PullCallBack 将消息提交到 ConsumeMessageService 中就会立即返回

  • 根据拉取延时,适时进行下一次拉取

RocketMQ 并没有真正实现推模式,而是消费者主动向消息服务器拉取消息,RocketMQ 推模式是循环向消息服务端发送消息拉取请求,如果消息消费者向 RocketMQ 发送消息拉取时,消息并未到达消费队列,会根据配置产生不同效果:

  • 不启用长轮询机制:
    在服务端等待 shortPollingTimeMills=1s 时间后(挂起)再去判断消息是否已到达消息队列,如果消息未到达则提示消息拉取客户端 PULL_NOT_FOUND (消息不存在)
  • 开启长轮询模式: RocketMQ 一方面会每 5s 轮询检查一次消息是否存在,同时一有新消息到达后立马通知挂起线程再次验证新消息是否是自己感兴趣的消息,
    如果是, 则从 CommitLog 文件提取消息返回给消息拉取客户端,
    否则等到挂起超时,
    超时时间由消息拉取方在消息拉取时封装在请求参数中,PUSH 模式默认为 15s

当新消息达到 CommitLog 时,ReputMessageService 线程负责将消息转发给 ConsumeQueue、IndexFile,如果 Broker 端开启了长轮询模式并且角色主节点,则最终将调用 PullRequestHoldService 线程的 notifyMessageArriving 方法唤醒挂起线程,判断当前消费队列最大偏移量是否大于待拉取偏移量,如果大于则拉取消息。长轮询模式使得消息拉取能实现准实时。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注