RocketMQ-消息存储

消息存储

Broker介绍到消息后,是怎么存储消息的。
RocketMQ主要存储的文件包括CommitLog文件、ConsumeQueue文件、IndexFile文件。
RocketMQ将所有主题的消息存储在同一个文件中,确保消息发送时顺序写文件,尽最大的能力确保消息发送的高性能与高吞吐量。但由于消息中间件一般是基于消息主题的订阅机制,这样便给按照消息主题检索消息带来了极大的不便。为了提高消息消费的效率,RocketMQ引入了ConsumeQueue消息队列文件,每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件。IndexFile索引文件,其主要设计理念就是为了加速消息的检索性能,根据消息的属性快速从CommitLog文件中检索消息。

磁盘有时候会比你想象的快很多,有时候也会比你想象的慢很多,关键在如何使用,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s,超过了一般网卡的传输速度,这是磁盘比想象的快的地方。但是磁盘随机写的速度只有大概1OOKB/s, 和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。

存储方案

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,消息真正的物理存储文件是CommitLog, ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

CommitLog以物理文件的方式存放,每台Broker上的CommitLog被本机器所有ConsumeQueue共享。在CommitLog中,一个消息的存储长度是不固定的,RocketMQ采取一些机制,尽量向CommitLog中顺序写,但是随机读。ConsumeQueue的内容也会被写到磁盘里作持久存储,只不过是通过异步刷盘的方式进行。

思考点:为什么这样设计?

  1. CommitLog顺序写,可以大大提高写入效率。接收到消息时,只有CommitLog是需要同步刷盘的(根据配置,可能也不需要同步刷盘),其他文件都是异步保存,如果发生了宕机,RocketMQ可以根据CommitLog恢复ConsumeQueue文件和IndexFile。
  2. 虽然CommitLog是随机读,但是利用操作系统的page cache机制,可以批量地从磁盘读取,cache存到内存中之后,加速后续的读取速度。
  3. 为了保证完全的顺序写,需要ConsumeQueue这个中间结构,因为ConsumeQueue里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的ConsumeQueue能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证CommitLog和ConsumeQueue的一致性,CommitLog里存储了Consume Queues、Message keys、Tags等所有信息,即使ConsumeQueue丢失,也可以通过commitLog完全恢复出来。

下图是一个Broker在文件系统中存储的各个文件。我们可以看到CommitLog文件夹、ConsumeQueue文件夹,还有在config文件夹中Topic、Consumer的相关信息。最下面那个文件夹index存的是索引文件,这个文件用来加快消息查询的速度。

commitlog:消息存储目录。

  • config: 运行期间一些配置信息,主要包括下列信息
    • consumerFilter.json:主题消息过滤信息。
    • consumerOffset.json:集群消费模式消息消费进度。
    • delayOffset.json:延时消息队列拉取进度。
    • subscriptionGroup:消息消费组配置信息。
    • topic.json:topic配置属性。
  • consumequeue:消息消费队列存储目录。
  • index:消息索引文件存储目录。
  • abort:如果存在 abort文件说明Broker非正常关闭,该文件默认启动时创建,正常退出之前删除。
  • checkpoint:文件检测点存储commitlog文件最后一次刷盘时间戳
  • consumequeue最后一次刷盘时间、index索引文件最后一次刷盘时间戳。

存储流程

  1. 存储流程如果当前Broker停止工作或Broker为SLAVE角色或当前Broker不支持写入则拒绝消息写入;如果消息主题长度超过256个字符、消息属性长度超过65536个字符将拒绝该消息写入。
  2. 如果消息的延迟级别大于0,将消息的原主题名称与原消息队列ID存入消息属性中,用延迟消息主题SCHEDULE_TOPIC、消息队列ID更新原先消息的主题与队列。
  3. 获取当前可以写入的CommitLog文件
  4. 在写入CommitLog之前,先申请putMessageLock,也就是将消息存储到CommitLog文件中是串行的
  5. 设置消息的存储时间,如果CommitLog文件不存在就需要创建新的文件
  6. 创建全局唯一消息ID
  7. 获取该消息在消息队列的偏移量
  8. 计算消息总长度,并写入CommitLog
  9. 如果计算发现CommitLog无法存储所有内容,则创建新的CommitLog,文件名为即将插入消息的偏移
  10. 将消息内容写入CommitLog文件后根据配置进行同步或者异步刷盘
  11. 更新逻辑偏移量,并释放putMessageLock
  12. 根据CommitLog偏移量,消息存储大小,tag的hash值插入一条Message到ConsumeQueue
  13. 根据Key的hash值,CommitLog偏移量,插入一条数据到IndexFile
  14. ConsumerQueue每隔一段时间自动刷盘、IndexFile在每次创建新indexFile时刷盘之前的索引文件、checkpoint文件在刷盘CommotLog,ConsumeQueue和IndexFile时进行更新

Commit文件

Commit文件commitlog 目录的组织方式在前面已经详细介绍过了,该目录下的文件主要存储消息,其特点是每一条消息长度不相同,CommitLog 文件存储的逻辑视图如下图所示,每条消息的前面4个字节存储该条消息的总长度。整个 CommitLog 文件默认大小为 1G

在查找消息时,

  1. 需要先根据要查找的消息偏移找到消息所在的文件,
  2. 然后根据消息偏移与文件大小取余,得到消息在文件中的位置,
  3. 最后根据消息大小,取出指定长度的消息内容。

public SelectMappedBufferResult getMessage(final long offset, final int size) {
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
    if (mappedFile != null) {
        int pos = (int) (offset % mappedFileSize);
        return mappedFile.selectMappedBuffer(pos, size);
    }
    return null;
}

ConsumeQueue文件RocketMQ 基于主题订阅模式实现消息消费,消费者关心的是一个主题下的所有消息,但由于同一主题的消息不连续地存储在 CommitLog 文件中,试想一下如果消息消费者直接从消息存储文件(CommitLog)中去遍历查找订阅主题下的消息,效率将极其低下,RocketMQ 为了适应消息消费的检索需求,设计了消息消费队列文件(ConsumeQueue),该文件可以看成是 CommitLog 关于消息消费的“索引”文件,消息主题,第二级目录为主题的消息队列

为了加快检索速度,并且减少空间使用,ConsumeQueue 不会存储所有消息正文,只会存储如下内容:

单个 ConsumeQueue 文件中默认包含 30 万个条目,单个文件的长度为 30w × 20 ≈ 6M 字节, 单个 ConsumeQueue 文件可以看出是一个 ConsumeQueue 条目的数组,其下标为 ConsumeQueue 的逻辑偏移量,消息消费进度存储的偏移量即逻辑偏移量。 ConsumeQueue 即为 CommitLog 文件的索引文件, 其构建机制是当消息到达 CommitLog 文件后, 由专门的线程产生消息转发任务,从而构建消息消费队列文件与下文提到的索引文件。


public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    /*startIndex 消息索引*/
    int mappedFileSize = this.mappedFileSize;
    // 根据消息索引 * 20 得到在 ConsumeQueue 中的物理偏移
    long offset = startIndex * CQ_STORE_UNIT_SIZE;
    if (offset >= this.getMinLogicOffset()) {
        // 找到物理索引所在的文件
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
        if (mappedFile != null) {
            // 物理索引与文件大小取余,得到数据存储的位置,然后通过MappedByteBuffer的到内存映射Buffer
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
            return result;
        }
    }
    return null;
}

根据 startIndex 获取准备消费的条目。首先 startIndex * 20 得到在 ConsumeQueue 中的物理偏移量,如果该 offset 小于 minLogicOffset,则返回 null,说明该消息已被删除;

如果大于 minLogicOffset,则根据偏移量定位到具体的物理文件,然后通过 offset 与物理文大小取模获取在该文件的偏移量,最终的到从 startIndex 开始,到该 ConsumeQueue 有效结尾的所有数据对应的 MappedByteBuffer。

除了根据消息偏移量查找消息的功能外,RocketMQ 还提供了根据时间戳查找消息的功能,具体实现逻辑如下:

  • 首先根据时间戳定位到 ConsumeQueue 物理文件,就是从第一个文件开始找到第一个文件更新时间大于该时间戳的文件。
  • 然后对 ConsumeQueue 中的所有项,使用二分查找,查询每条记录对应的 CommitLog 的最后更新时间和要查询的时间戳
  • 最终找到与时间戳对应的 ConsumeQueue 偏移,或者离时间戳最近的消息的 ConsumeQueue 偏移

Index索引文件

消息消费队列是 RocketMQ 专门为消息订阅构建的索引文件,提高根据主题与消息队列检索消息的速度,另外 RocketMQ 引入了 Hash 索引机制为消息建立索引,HashMap 的设计包含两个基本点: Hash槽 与 Hash 冲突的链表结构。

从图中可以看出,indexFile 总共包含 IndexHeader、 Hash 槽、 Hash 条目(数据)。IndexHeader IndexHeader头部,包含 40 个字节,记录该 IndexFile 的统计信息,其结构如下。

  • beginTimestamp: 该索引文件中包含消息的最小存储时间。
  • endTimestamp: 该索引文件中包含消息的最大存储时间。
  • beginPhyOffset: 该索引文件中包含消息的最小物理偏移量(CommitLog 文件偏移量)。
  • endPhyOffset:该索引文件中包含消息的最大物理偏移量(CommitLog 文件偏移量)。
  • hashSlotCount: hashSlot个数,并不是 hash 槽使用的个数,在这里意义不大。
  • indexCount: Index条目列表当前已使用的个数,Index条目在Index条目列表中按顺序存储。
  • Hash槽 Hash槽,一个 IndexFile 默认包含500万个 Hash 槽,每个 Hash 槽存储的是落在该 Hash 槽的 hashcode 最新的 Index 的索引。
  • Hash 条目 Index条目列表,默认一个索引文件包含 2000 万个条目,每一个 Index 条目结构如下。
    • hashcode: key 的 hashcode。
    • phyOffset: 消息对应的物理偏移量。
    • timeDif:该消息存储时间与第一条消息的时间戳的差值,小于 0 该消息无效。
    • preIndexNo:该条目的前一条记录的 Index 索引,当出现 hash 冲突时,构建的链表结构。

Index文件的写入步骤如下:

  • 如果当前已使用条目大于等于允许最大条目数时,则返回 false,表示当前索引文件已写满。如果当前索引文件未写满则根据 key 算出 key 的 hashcode,然后 keyHash 对 hash 槽数量取余定位到 hashcode 对应的 hash 槽下标, hashcode对应的hash槽的物理地址 = IndexHeader 头部(40字节) + 下标 * 每个 hash 槽的大小(4字节)。
  • 读取 hash 槽中存储的数据,如果 hash 槽存储的数据小于 0 或大于当前索引文件中存储的最大条目,则将该槽的值设置为 0。
  • 将条目信息存储在 IndexFile 中。计算新添加条目的起始物理偏移量,等于头部字节长度 + hash 槽数量单个 hash 槽大小(4个字节) + 当前 Index 条目个数单个 Index 条目大小(20个字节)。依次将 hashcode、消息物理偏移量、时间差timeDif、原来 Hash 槽的值存入该索引条目中。将新添加的索引条目索引存入 hash 槽中,覆盖原来的值。
  • 更新文件索引头信息。

至此,索引文件的写入套路就已经介绍完了,它通过 hash 槽存储了 hash 冲突链表的头指针,然后每个索引项都保存了前一个索引项的指针,借此,在文件存储中实现了链表的数据结构。
当根据 key 查找消息时,不光可以设置要查找 key 还可以设置最大查找数量,开始时间戳,结束时间戳,操作步骤如下:

  • 根据 key 计算 hashcode,然后 keyHash 对 hash 槽数量取余定位到 hashcode 对应的 hash 槽下标。

  • 如果对应的 Hash 槽中存储的数据小于 1 或大于当前索引条目个数则表示该 HashCode 没有对应的条目,直接返回。

  • 由于会存在 hash 冲突,根据 slotValue 定位该 hash 槽最新的一个 Item 条目,将存储的物理偏移加入到 phyOffsets 中 ,然后继续验证Item条目中存储的上一个 Index 下标,如果大于等于 1 并且小于最大条目数,则继续查找,否则结束查找。

  • 根据 Index 下标定位到条目的起始物理偏移量,然后依次读取 hashcode、 物理偏移量、时间差、上一个条目的Index下标,循环步骤4。

  • 如果存储的时间差小于 0,则直接结束;如果 hashcode 匹配并且消息存储时间介于待查找时间start、 end之间则将消息物理偏移量加入到phyOffsets

  • 验证条目的前一个 Index 索引,如果索引大于等于 1 并且小于Index条目数,则继续查找,否则结束整个查找。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注