全体架构

RocketMQ 是一个典型的发布订阅体系,经过 Broker 节点中转和耐久化数据、解耦上下游。Broker 是真实存储数据的节点,由多个水平布置但不必定完全对等的副本组构成,单个副本组的不同节点的数据会到达最终共同。RocketMQ 优异的功能体现,绕不开其优秀的存储模型。

RocketMQ 存储机制浅析

存储机制设计

在存储方法上,RocketMQ/Kafka/RabbitMQ 均选用的是音讯刷盘至所布置虚拟机/物理机的文件体系做耐久化。ActiveMQ(默许选用的 KahaDB 做音讯存储)可选用 JDBC 做音讯耐久化,经过简略的 xml 装备信息即可完成 JDBC 音讯存储。运用文件体系做耐久化的状况下,可获得更高效的 I/O 读写。

  • Broker Store 目录结构
storePathRootDir=/cache1/rocketmq/broker/data
├── abort // 该文件在 Broker 发动后会主动创建,正常封闭 Broker,该文件会主动消失。若在没有发动 Broker 的状况下,发现这个文件是存在的,则说明之前 Broker 的封闭对错正常封闭
├── checkpoint // 其间存储着 commitlog、consumequeue、index 文件的最终刷盘时刻戳
├── commitlog // 其间寄存着 commitlog 文件,而音讯是写在 commitlog 文件中的
│   ├── 00000000000000000000
│   ├── 00000000001073741824
│   └── 00000000002147483648
├── config // 寄存着 Broker 运行期间的一些装备数据
│   ├── consumerFilter.json // 消费者的过滤器
│   ├── consumerFilter.json.bak
│   ├── consumerOffset.json // offsetTable记载消费进展偏移量
│   ├── consumerOffset.json.bak
│   ├── delayOffset.json
│   ├── delayOffset.json.bak
│   ├── subscriptionGroup.json // 消费者订阅联系
│   ├── subscriptionGroup.json.bak
│   ├── topics.json // topic装备
│   └── topics.json.bak
├── consumequeue // 其间寄存着 consumequeue 文件,行列就寄存在这个目录中
│   ├── TopicTest1
│      ├── 0
│          └── 00000000000000000000  
│      └── 1
│          └── 00000000000000000000    
│   └── TopicTest2
├── index // 音讯索引文件 indexFile,加快音讯查询速度
│   └── 20230902163452641 //文件名以创建时刻戳命名
└── lock // 运行期间运用到的全局资源锁

CommitLog

RocketMQ Broker 单个实例下所有的 Topic 都运用同一个 CommitLog 来存储,即单个实例音讯全体有序。CommitLog 单个文件巨细默许 1G,文件文件名是起始偏移量,一共 20 位,左边补零,起始偏移量是 0。假定文件依照默许巨细 1G 来算:

  • 第一个文件的文件名为 00000000000000000000 ,当第一个文件被写满之后,开端写入第二个文件;
  • 第二个文件的文件名为 00000000001073741824 ,1G=1073741824=102410241024;
  • 第三个文件的文件名是 00000000002147483648,(文件名相差1G=1073741824=102410241024)。

CommitLog 依照上述命名的优点是给出任意一个音讯的物理偏移量,能够经过二分法进行查找,快速定位这个文件的方位,然后用音讯物理偏移量减去地点文件的称号,得到的差值就是在该文件中的绝对地址。

音讯存储格式

RocketMQ 存储机制浅析

  • MagicCode:MagicCode 是一个特殊的字段,它能够标志 Buffer 中的某个 CommitLog 是一个正常的CommitLog,仍是由于 Buffer 没有剩余的空间寄存该 CommitLog,导致该 CommitLog 是一个空的 CommitLog。MagicCode 有两个值,如下所示:
// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586   8;
// End of file empty MAGIC CODE cbd43194
private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586   8;
  • BodyCRC:CRC 即循环冗余校验码,是数据通信范畴中最常用的一种查错校验码,经过 CRC 就能够知道数据的正确性和完整性。RocketMQ 经过 CRC 来校验音讯部分:
if (checkCRC) {
    int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
    if (crc != bodyCRC) {
       log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
       return new DispatchRequest(-1, false/* success */);
    }
}
  • QueueId:音讯发往哪个行列,QueueId 在 Producer 发送音讯时会挑选出来。
  • QueueOffset:寄存了音讯记载应该在 ConsumerQueue 中的方位,这样构建 ConsumerQueue 的时分,就知道该条记载在 ConsummerQueue 的方位次序,在消费音讯的时分很有用途。
  • PhysicalOffset:音讯在 CommitLog 中的物理方位。需求注意的是,我们 CommitLog 对应着磁盘上的多个文件,这里的偏移量不是从某个文件开端算的,而是从第一个文件偏移开端算起的。
  • SysFlag:是 RocketMQ 内部运用的符号位,经过位运算进行符号。例如是否对音讯进行了紧缩、是否属于业务音讯。SysFlag 初始值为 0,可与下面的符号进行位运算。
  • BornTimestamp:Producer 发送音讯的时刻。
  • BornHost:Producer 发送音讯运用的套接字地址。
  • StoreTimestamp:音讯在 Broker 上存储时刻。
  • StoreHostAddress:Broker 的套接字地址,存储方法同 BornHost。
  • ReconsumeTimes:重复消费次数,初始为 0。Broker 重试的时分,这个 ReconsumeTimes 就会 1,默许最大重试次数是 16 次。
  • PreparedTransactionOffset:业务音讯相关的一个特点(RocketMQ 业务音讯依据两阶段提交)。
  • Properties:寄存了 RocketMQ 内部用到的一些特点,也寄存了用户的一些特点。

次序写

RocketMQ 的 Commitlog 文件、Consumequeue 文件都是次序写入的。磁盘次序写入速度能够到达几百兆/s,而随机写入速度只有几百 KB /s,相差上千倍。

PageCache 机制

Broker 在将音讯次序写入 CommitLog,大大提升功能。但还不够,毕竟仍是磁盘 I/O 操作,要想进一步提升功能,须利用内存。所以 Broker 将数据写入 CommitLog 文件时,不是直接写入物理磁盘文件,而是先进入 OS 的 PageCache 内存缓存,后续由 OS 后台线程异步将 PageCache 数据刷入底层磁盘文件。消费音讯时,选用随机读的方法,由于 PageCache 局部性热门原理且全体状况下仍是从旧到新的有序读,大部分 Case 音讯还能直接从 Page Cache 读,不会产生太多缺页(Page Fault)中断而从磁盘读取。

RocketMQ 存储机制浅析

  • 异步刷盘若 Broker 将音讯写入 PageCahe 并响应给生产者后突然宕机,此刻音讯在缓存中没有写入底层磁盘文件,就会形成音讯丢掉:生产者认为发送成功,实际上音讯写入失利。
  • 遇到 OS 进行脏页回写,内存收回,内存 Swap 等状况时,或许引起较大的音讯读写延迟。

扩展: Java NIO 依据零拷贝的完成

mmap:

  • FileChannel#map():把文件对象映射到虚拟内存。

  • MappedByteBuffer/DirectByteBuffer.get(): 获取内存数据。

    • 因其占用虚拟内存(非 JVM 的堆内存),不受 JVM -Xmx 参数约束,但其巨细也遭到 OS 虚拟内存巨细约束。一般一次只能映射 1.5~2G 的文件至用户态的虚拟内存空间,这也是为何 RocketMQ 默许设置单 CommitLog 日志数据文件为 1G。

sendfile:

  • FileChannel.transferFrom()/transferTo():底层调用了sendfile()内核函数。

RocketMQ 挑选 mmap 原因:

(1) sendfile 在用户态不行见,而当时场景下有读有写。

(2) 在 Linux 体系中关于 1G 的文件,mmap 处理的功能要优于 sendfile。

ConsumeQueue

ConsumeQueue 不担任存储音讯,只担任记载它所属 Topic 的音讯在 CommitLog 中的偏移量,这样当消费者从 Broker 拉取音讯的时分,就能够快速依据偏移量定位到音讯。

ConsumeQueue 存储的格式如下,包含起始物理方位偏移量音讯长度音讯Tag的哈希值,一共 20B:

RocketMQ 存储机制浅析

每个 ConsumeQueue 都有一个 QueueId,QueueId 的值为 0 到 TopicConfig 装备的行列数量。比如某个 Topic 的消费行列数量为 4,那么四个 ConsumeQueue 的 QueueId 就分别为 0、1、2、3。

消费者消费时会先从 ConsumeQueue 中查找音讯在 CommitLog 中的 Offset,再去 CommitLog 中找原始音讯数据。假如某个音讯只在 CommitLog 中有数据而没在 ConsumeQueue 中则消费者无法进行消费。

ConsumeQueue 类对应的是每个topic和queuId下面的所有文件。默许存储路径是$HOME/store/consumequeue/{topic}/{queueId}/{fileName},每个文件由 30w 条数据组成,单个文件的巨细是 30w x 20Byte,即每个文件为 600w 字节,单个消费行列的文件巨细约为 5.722M=(600w/(1024*1024))。

ConsumeQueue 构建流程:

RocketMQ 存储机制浅析

IndexFile

Broker 除了经过 ConsumeQueue 提供给 Consumer 消费之外,还支持经过 MsgID 或者 MessageKey 来查询音讯;运用 ID 查询时,由于 ID 就是用 broker offset 生成的(这里 MsgID 指的是服务端的),所以很容易就找到对应的 CommitLog 文件来读取音讯。关于用 MessageKey 来查询音讯,MessageStore 经过构建一个 Index 来提高读取速度。IndexFile 结构如下图:

RocketMQ 存储机制浅析

Checkpoint

Checkpoint 主要记载 CommitLog、ConsumeQueue、Index 文件的刷盘时刻点,假如在上一次 Broker 反常结束时,会依据 StoreCheckpoint 的数据进行康复。


火山引擎依据字节跳动内部的大规模实践,推出的音讯行列产品包括音讯行列 Kafka / RabbitMQ / RocketMQ 版云原生音讯引擎 BMQ,欢迎咨询了解!

参考资料

深度解读 RocketMQ 存储机制

zhuanlan.zhihu.com/p/574215600…

来源团队|字节跳动 IBF 业财研发部 伍楼华