RocketMQ-队列负载均衡

队列负载均衡

在 RocketMQ 中,Consumer 端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在 Push 模式只是对 Pull 模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。

在两种基于拉模式的消费方式(Push/Pull)中,均需要 Consumer 端在知道从 Broker 端的哪一个消息队列—队列中去获取消息。因此,有必要在 Consumer 端来做负载均衡,即 Broker 端中多个 MessageQueue 分配给同一个 ConsumerGroup 中的哪些 Consumer 消费。在 Consumer 启动后,它就会通过定时任务不断地向 RocketMQ 集群中的所有 Broker 实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker 端在收到 Consumer 的心跳消息后,会将它维护在 ConsumerManager 的本地缓存变量 ConsumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量 ChannelInfoTable 中,为之后做 Consumer 端的负载均衡提供可以依据的元数据信息。

Consumer 的 RebalanceService 会每隔20s执行一次负载均衡。它会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。因为广播模式,每个 Consumer 都会订阅所有队列的内容,实现很简单,所以这里主要来看下集群模式下的主要处理流程:

从本地缓存变量 TopicSubscribeInfoTable 中,获取该 Topic 主题下的消息消费队列集合

  • 向各个 Broker 端发送获取该消费组下消费者Id列表
  • 先对 Topic 下的消息消费队列、消费者 Id 排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列
  • 根据计算出来的新负载均衡结果,更新本地的队列消费任务

删除已经不由自己负责的队列消费任务添加新的由自己负责的队列消费任务,从 Broker 中读取该队列的消费偏移 Offset,然后开始消费任务

RocketMQ 的负载均衡过程并没有通过选主分配的过程进行,而是各个节点自行计算,我觉得主要是为了实现方便,而且 RocketMQ 也不追求一个消息只被消费一次,如果负载均衡的结果出现了短暂冲突(最终应该会趋于一致),也可以靠 Consumer 实现幂等性解决。

消息消费过程

  1. 当 Consumer 拉取收新的消息时,会将这些消息以 32 个为一组,提交给消息消费者线程池
  2. 线程池进行实际消费时,会确认当前消息队列是否仍然归自己管辖(重新负载均衡时,将该队列分配给了别的消费者)
  3. 恢复延时消息主题名

RocketMQ 将消息存入 CommitLog 文件时,如果发现消息是延时消息,会首先将原主题存入在消息的属性中,然后设置主题名称为 SCHEDULE_TOPIC,以便时间到后重新参与消息消费。

  • 执行具体的消息消费函数,最终将返回 CONSUME_SUCCESS (消费成功)或 RECONSUME_LATER (需要重新消费)
  • 如果业务代码返回 RECONSUME_LATER,根据模式作出不同的处理
    • 广播模式:什么都不处理,只打印log
    • 集群模式:回发消费失败的消息,进行重新消费,如果发送失败,则再次尝试自己消费

根据消费成功的消息,计算消费者线程池的剩余消息数量和大小,然后更新offset

由于可能会出现一组消息只有后半段被消费成功的情况,所以最终的 offset 为剩余消息池中最小的 offset,这就势必会出现重复消费

作者:贝克街的流浪猫
链接:https://zhuanlan.zhihu.com/p/360911990
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

重试消息

RocketMQ 提供了几个重试消息的延时级别: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 1Om 20m 30m 1h 2h,同时也有消息最大重新消费次数,如果超过了最大重新消费次数则会被单独存储起来,等待人工处理。

重试消息会被存入名为"%RETRY%+消费组名称"的主题中,原始主题会存入属性中。然后会基于定时任务机制,在到期时将任务再次拉取出来。

消费进度管理

  • 广播模式: 同一个消费组的所有消息消费者都需要消费主题下的所有消息,也就是同组内的消费者的消息消费行为是对立的,互相不影响,故消息进度需要独立存储,最理想的存储地方应该是与消费者绑定。这些数据最终会存储在 Consumer 节点的磁盘文件中,采用周期性刷盘的形式存储。

  • 集群模式: 同一个消费组内的所有消息消费者共享消息主题下的所有消息,同一条消息(同一个消息消费队列)在同一时间只会被消费组内的一个消费者消费,并且随着消费队列的动态变化重新负载,所以消费进度需要保存在一个每个消费者都能访问到的地方————Broker,在需要更新 Offset 时,会以网络请求的形式更新 Broker 中存储的 Offset。

消费者线程池每处理完一个消息消费任务(ConsumeRequest)时会从 ProcessQueue 中移除本批消费的消息,并返回 ProcessQueue 中最小的偏移量,用该偏移量更新消息队列消费进度,如果 ProcessQueue 中的消息 Offset 分别为 [10,30,40,50],这时候消费了30,40,最后的 Offset 仍然为 10。只有当 Offset = 10 的消息被消费后,Offset 才会变为 50。正因为如此,RocketMQ 才会有根据消息 Offset 跨度进行流量控制的功能。

此外,值得一提的是,当发生重新负载均衡后,如果某一队列被分配给了其他消费者,那么该队列对应的 Offset 也会从本机中消除。

消息队列负载

如果经过消息队列重新负载(分配)后,分配到新的消息队列时,

  • 首先需要尝试向 Broker 发起锁定该消息队列的请求,如果返回加锁成功则创建该消息队列的拉取任务,
  • 否则将跳过,等待其他消费者释放该消息队列的锁,然后在下一次队列重新负载时再尝试加锁。如果重新分配后,发现某一队列已不由自己负责,会主动的释放该队列的锁。
  • 除此之外,锁的最大存活时间是 60s,如果超过 60s 未续锁,则自动释放。

顺序消息消费与并发消息消费的第一个关键区别:

  • 顺序消息在创建消息队列拉取任务时需要在 Broker 服务器锁定该消息队列。
  • 消息拉取方式消息拉取过程中,先会判断该消息队列是否被锁定,如果未被自己锁定,则会延迟一段时间后,再进行拉取任务。

消息消费方式

如果消费模式为集群模式,启动定时任务,默认每隔 20s 锁定一次分配给自己的消息消费队列(锁的保活)。在 ConsumeMessageOrderlyService 消费消息时,先会获取内存中的队列锁。也就是说,一个消息消费队列同一时刻只会被一个消费线程池中一个线程消费。除此之外,其他过程基本和并发消费的过程一致。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注