RocketMQ-进阶

RocketMQ架构模式组成:
NameServer、Broker、Producer、Consumer

概念

  • 回溯消息 回溯消息是指消息消费端已经消费成功的消息,由于业务要求需要重新消费消息。RocketMQ支持按时间回溯消息,时间维度可精确到毫秒,可以向前或向后回溯。

  • 消息堆积 消息中间件的主要功能是异步解耦,必须具备应对前端的数据洪峰,提高后端系统的可用性,必然要求消息中间件具备一定的消息堆积能力。RocketMQ消息存储使用磁盘文件(内存映射机制),并且在物理布局上为多个大小相等的文件组成逻辑文件组,可以无限循环使用。RocketMQ消息存储文件并不是永久存储在消息服务器端,而是提供了过期机制,默认保留3天。

  • 定时消息 定时消息是指消息发送到Broker后,不能被消息消费端立即消费,要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意精度的定时消息消费,必须在消息服务端对消息进行排序,势必带来很大的性能损耗,故RocketMQ不支持任意精度的定时消息,而只支持特定延迟级别。

  • 消息重试机制 消息重试是指消息在消费时,如果发送异常,消息中间件需要支持消息重新投递,RocketMQ支持消息重试机制

  • 消息存储
    消息中间件的一个核心实现是消息的存储对消息存储一般有如下两个维度的考量: 消息堆积能力和消息存储性能。RocketMQ追求消息存储的高性能,引人内存映射机制,所有主题的消息顺序存储在同一个文件中。同时为了避免消息无限在消息存储服务器中累积,引入了消息文件过期机制与文件存储空间报警机制

  • 消息过滤:
    消息过滤 消息过滤是指在消息消费时,消息消费者可以对同一主题下的消息按照规则只消费自己感兴趣的消息。RocketMQ消息过滤支持在服务端与消费端的消息过滤机制。

    • 消息在Broker端过滤。Broker只将消息消费者感兴趣的消息发送给消息消费者。
    • 消息在消息消费端过滤,消息过滤方式完全由消息消费者自定义,但缺点是有很多无用的消息会从Broker传输到消费端。

Nameserver

Broker消息服务器在启动时向所有NameServer注册,消息生产者(Producer)在发送消息之前先从NameServer获取Broker服务器地址列表,然后根据负载算法从列表中选择一台消息服务器进行消息发送。
NameServer与每台Broker服务器保持长连接,并间隔30s检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者,这是为了降低NameServer实现的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性。
NameServer本身的高可用可通过部署多台NameServer服务器来实现,但彼此之间互不通信,也就是NameServer服务器之间在某一时刻的数据并不会完全相同,但这对消息发送不会造成任何影响,这也是RocketMQ NameServer设计的一个亮点,RocketMQ NameServer设计追求简单高效。

存储内容

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

topicQueueTable:Topic消息队列路由信息,消息发送时根据路由表进行负载均衡。
brokerAddrTable:Broker基础信息,包含brokerName、所属集群名称、主备Broker地址。
clusterAddrTable:Broker集群信息,存储集群中所有Broker名称。brokerLiveTable:Broker状态信息,NameServer每次收到心跳包时会替换该信息。
filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤

路由注册

路由注册RocketMQ路由注册是通过Broker与NameServer的心跳功能实现的。Broker启动时向集群中所有的NameServer发送心跳语句,每隔30s向集群中所有NameServer发送心跳包,NameServer收到Broker心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdateTimestamp,然后NameServer每隔10s扫描brokerLiveTable,如果连续120s没有收到心跳包,NameServer将移除该Broker的路由信息同时关闭Socket连接。

心跳包

  • brokerAddr:broker地址。
  • brokerId:brokerId,0:Master;大于0:Slave。
    brokerName:broker名称。
  • clusterName:集群名称。
  • haServerAddr:master地址,初次请求时该值为空,slave向NameServer注册后返回其MasterAddr。
  • requestBody:filterServerList:消息过滤服务器列表。
  • topicConfigWrapper:主题配置。从心跳包内容我们会发现,每次心跳包中都会包含所有的topic信息,如果一个broker上topic非常多的话,心跳包就会比较大,如果正好赶上网络不好的时候,可能就会导致broker下线。
  • NameServer与Broker保持长连接,Broker状态存储在brokerLiveTable中,NameServer每收到一个心跳包,将更新brokerLiveTable中关于Broker的状态信息以及路由表(topicQueueTable、 brokerAddrTable、 brokerLiveTable、 filterServerTable)。

Broker每隔30s向NameServer发送一个心跳包,心跳包中包含BrokerId、Broker地址、Broker名称、Broker所属集群名称、Broker关联的FilterServer列表。但是如果Broker宕机,NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker呢? NameServer会每隔1Os扫描brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker失效,移除该Broker, 关闭与Broker连接,并同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。

RocketMQ有两个触发点来触发路由删除:

  • NameServer定时扫描brokerLiveTable检测上次心跳包与当前系统时间的时间差,如果时间戳大于120s,则需要移除该Broker信息。
  • Broker在正常被关闭的情况下,会执行unRegisterBroker指令,主动删除NameServer中关于自己的信息。

路由发现

RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不主动推送给客户端,而是由客户端定时拉取主题最新的路由。

消息发送

前面我们说过,RocketMQ 发送消息有三种模式

  • 同步(sync)

  • 异步(async)

  • 单向 (oneway)

  • 同步:发送者向MQ执行发送消息API时,同步等待,直到消息服务器返回发送结果。

  • 异步:发送者向MQ执行发送消息API时,指定消息发送成功后的回调函数,然后调用消息发送API后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行。

  • 单向:消息发送者向MQ执行发送消息API时,直接返回,不等待消息服务器的结果,也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上

消息内容

private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
private String transactionId;

Message的基础属性主要包括消息所属主题topic,消息Flag,扩展属性,消息体,事务ID。消息Flag的定义如下,可以看出其主要和事务支持有关,关于RocketMQ的事务机制,我们后面会介绍:

public final static int COMPRESSED_FLAG = 0x1;
public final static int MULTI_TAGS_FLAG = 0x1 << 1;
public final static int TRANSACTION_NOT_TYPE = 0;
public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

Message 扩展属性主要包含下面几个。

  • tag:消息TAG,用于消息过滤。
  • keys:Message索引键,多个用空格隔开,RocketMQ可以根据这些key快速检索到消息。
  • waitStoreMsgOK:消息发送时是否等消息存储完成后再返回。
  • delayTimeLevel:消息延迟级别,用于定时消息或消息重试。这些扩展属性存储在Message的properties中。

消息发送流程

发送流程消息发送流程主要的步骤:

  • 验证消息
  • 查找路由
  • 消息发送(包含异常处理机制)

消息验证消息发送之前,首先确保生产者处于运行状态,然后验证消息是否符合相应的规范,具体的规范要求是主题名称、消息体不能为空、消息长度不能等于0且默认不能超过允许发送消息的最大长度4M(maxMessageSize=102410244)。

  • 查找路由消息发送之前,首先需要获取主题的路由信息,只有获取了这些信息我们才知道消息要发送到具体的Broker节点。
  • 如果生产者中缓存了topic的路由信息,如果该路由信息中包含了消息队列,则直接返回该路由信息,如果没有缓存或没有包含消息队列,则向NameServer查询该topic的路由信息。
  • 如果最终未找到路由信息,则抛出异常:无法找到主题相关路由信息异常。

这里就有一个问题,如果整个消息队列服务刚运行,各个topic的路由信息是如何创建出来的?一般来说会有两个方案:

  • 在生产者发送消息之前,就人工创建好各个topic的路由信息,
    • 优点:可以根据该topic消息的实际需求,分配合适的broker数量和消息队列数量。
    • 一般来说,生产环境的服务都推荐以这种方式进行。
  • 可以配置各个Broker,打开其自动创建topic的功能(BrokerConfig#autoCreateTopicEnable),这样就会在发送第一个消息时,动态的创建该topic的路由信息。

自动创建topic路由的过程如下:

  • Broker如果开启了自动创建topic功能,则创建默认主题路由信息,并通过心跳包告知NameServer
  • Producer查询本地路由缓存,未找到新topic的路由信息Producer查询
  • NameServer,未找到新topic的路由信息
  • Producer查询NameServer,找到默认主题路由信息
  • Producer根据默认主题路由信息,将消息发送到默认主题的其中一个Broker
  • 收到默认主题消息的Broker,根据消息的原始topic,创建相应的路由信息,并通过心跳包告知NameServerProducer
  • 下次发送该topic的消息时如果已经存在该消息的路由(定时拉取):
    • 则直接根据路由发送消息如果该消息的路由还没来得及同步:
    • 否则继续发送到默认主题

路由选择

路由选择根据路由信息选择消息队列,返回的消息队列按照broker、序号排序。举例说明,如果topicA在broker-a,broker-b上分别创建了4个队列,那么返回的消息队列如下:

[
  {
    "brokerName":"broker-a",
    "queueId": 0
  },
  {
    "brokerName":"broker-a",
    "queueId": 1
  },
  {
    "brokerName":"broker-a",
    "queueId": 2
  },
  {
    "brokerName":"broker-a",
    "queueId": 3
  },
  {
    "brokerName":"broker-b",
    "queueId": 0
  },
  {
    "brokerName":"broker-b",
    "queueId": 1
  },
  {
    "brokerName":"broker-b",
    "queueId": 2
  },
  {
    "brokerName":"broker-b",
    "queueId": 3
  }
]

首先消息发送端采用重试机制,由retryTimesWhenSendFailed指定同步方式重试次数,异步重试机制在收到消息发送请求后,执行回调之前进行重试,由retryTimesWhenSendAsyncFailed指定。接下来就是循环执行:
选择消息队列、发送消息,发送成功则返回,收到异常则重试。
如果在一次消息发送的过程中消息发送失败了,那么在下次重试的过程中,会排除掉上次失败的Broker。

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) { //空表示第一次发送
        return selectOneMessageQueue(); // 循环选择一个队列
    } else {
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {// 遍历所有队列
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {// 排除上一次的broker
                return mq;
            }
        }
        return selectOneMessageQueue(); // 如果只有一个broker,则继续循环选择套路
    }
}

public MessageQueue selectOneMessageQueue() { // 循环选择
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

故障延迟机制

我们前面已经说过,如果Broker宕机,可能会花费很长时间才能同步到各个Producer,那么怎么在Broker宕机的信息同步到Producer之前,绕开它而将消息发送到别的正常Broker上呢,这就不得不提RocketMQ的另一个容错机制————故障延迟机制。首先我们需要知道RocketMQ在哪里存储失败的Broker,这些信息都存在LatencyFaultToleranceImpl中,其中有一个ConcurrentHashMap<String, FaultItem> faultItemTable存储了所有失败节点的信息。FaultItem中存储了如下内容:// FaultItem fields

private final String name;
private volatile long currentLatency; //请求该节点的耗时
private volatile long startTimestamp; //预估下次可用的时间点

public boolean isAvailable() { // 当前时间大于等于预估可用点时,则认为可用
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}

FaultItem的信息,在每次消息发送成功,和消息发送失败时,进行更新。如果消息发送成功,currentLatency赋值为本次请求的实际耗时消息发送失败,currentLatency赋值为30sstartTimestamp是根据currentLatency进行设置的,RocketMQ将currentLatency分成了不同的档位,不同档位的currentLatency会对应不同的notAvailableDuration,然后:

startTimestamp=System.currentTimeMillis() + notAvailableDuration currentLatency与notAvailableDuration的对应关系如下图:private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
// 按从大到小的顺序,找到 currentLatency 所在的区间,然后输出该区间对应的不可用时长
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }
    return 0;
}

由此,我们可以算出如果消息发送失败,那么RocketMQ正常来说会禁用该Broker十分钟。知道了RocketMQ如何存储失败节点,在让我们来看看它是如何利用该信息,达到避开失败节点的效果。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, /*通常情况下,指上次请求失败时用到的节点*/final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            // 外层轮询,下次请求时会选择下一个队列
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                // index用于内层轮询,从而排除不可用的节点
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // 验证可用性,latencyFaultTolerance存储了各个Broker发送消息的耗时,已经预估的下次可用时间点
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    // 如果是第一次请求,并且可用,则直接返回
                    // 如果是重试请求,则只使用brokerName等于lastBrokerName相同的,这点大家肯定有疑问:为啥要使用上次失败的,其实这里的lastBrokerName不单单指上次发送失败的节点,
                    // 它还能蕴含推荐节点的信息,本函数的后半段中会看到如何将推荐节点的信息传递到lastBrokerName
                    // 但是我觉得这样写的一个弊端是:当前可用节点,如果和上次失败时所用的节点不一致时,就会被排除掉,这也会影响效率
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            // 走到这一步,代表所有节点都不可用(首次请求)或者上一次失败时用到的节点仍然不可用(重试请求)
            // 随后,将所有不可用的节点,按照潜在可用性(当前可用与否>上次使用该节点时的调用耗时>预估的下次可使用时间),进行了排序,然后选择最优的结果
            // 这里不用担心,连续重试时pickOneAtLeast每次都选择了相同节点,因为其内部也是用了轮训机制,会从最优->次优的顺序给出下一次推荐的节点
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            //getQueueIdByBroker这个函数名也是惊到我了,完全和其功能不匹配,本行代码意在得到推荐节点是否仍然存在可写队列,如果存在,得出队列数
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            // 我们得到的推荐节点存在可写队列
            if (writeQueueNums > 0) {
                // 至此我们已经拿到了一个推荐节点,但是接下来代码的作者并没有简单地根据推荐节点来寻找队列,而是靠外层轮训找了下一个队列
                // 如果再次重试过程中没有发生路由信息更新的话,该队列应该仍然是不可用的,并且很可能仍是最初失败的Broker节点的队列,
                // 为什么这么说:假如消息队列为[BrokerA1,BrokerA2,BrokerA3,BrokerA4,BrokerB1,BrokerB2],只有当上一次轮训到BrokerA4时,这里才会跳过BrokerA而得到BrokerB的队列
                // 而且,可能下次更新路由表时,该信息可能就会成为过期数据而被GC,我猜作者是觉得反正这个数据快没用了,不如把它替换成刚才得到的推荐节点信息,这样可以少new一个对象,还能增加下次查找时的效率
                // 之所以说它能增加效率,是因为这个过程实质上是将一个很可能宕机的节点队列换成了最可能可用的节点信息,那么下次再轮训到这个节点时,实际上就跳过了寻找推荐节点的过程
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    // 把得到的队列的BrokerName改成我们前面得到的推荐节点,这样如果再请求失败并且重试的时候,lastBrokerName其实存储的就是推荐节点的信息了,下次再执行本函数时就会优先使用推荐节点的其他队列
                    mq.setBrokerName(notBestBroker);
                    // 重新计算其队列编号,因为得到的这个队列数可能和推荐节点的队列数不一致,如果用了错误的队列序号,消息发送到Broker那时,肯定会报错
                    // 因此,这里基于外层轮询使用的index,对本次使用的队列编号进行了计算,我觉得这里最终达到的是随机选择的效果
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else { // 推荐节点不存在可写队列了,说明该节点可能已经宕机,并且NameServer已经删除了其路由信息,并且已经同步过来了
                // 这时候可以将该节点从延迟统计表中删除,不在考虑该节点
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        // 如果拿到的推荐节点已经不存在可写队列了,就随机选一个队列
        return tpInfo.selectOneMessageQueue();
    }
    // 默认策略
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

在我看来,RocketMQ的队列选择算法很恐怖,我不确定是特意设计成这样的,还是历史发展出来的奇怪产物。Github仓库中,也有很多人提issue问这个函数的设计深意。简单的说,上述算法按照如下流程工作:

  • 轮训所有队列,通过LatencyFaultTolerance找到可用队列如果未找到任何可用队列,
  • 通过LatencyFaultTolerance存储的信息,按照三个纬度的可用性排序(当前可用与否>上次使用该节点时的调用耗时>预估的下次可使用时间),选出最可能可用的队列
  • 如果上述两个步骤都没有选出队列,则按照最简单的轮询找到下一个队列

消息发送方式

  • 根据MessageQueue获取Broker的网络地址。如果找不到Broker信息,则抛出MQClientException,提示Broker不存在。

为消息分配全局唯一ID,如果消息体默认超过4K(compressMsgBodyOverHowMuch), 会对消息体采用zip压缩,并设置消息的系统标记为MessageSysFlag.COMPRESSED_FLAG。如果是事务Prepared消息,则设置消息的系统标记为MessageSysFlag.TRANSACTION_PREPARED_TYPE。

  • 如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。
  • 构建消息发送请求包。主要包含如下重要信息:生产者组、主题名称、默认创建主题Key、该主题在单个Broker默认队列数、队列ID(队列序号)、消息系统标记(MessageSysFlag)、消息发送时间、消息标记(RocketMQ对消息中的flag不做任何处理,供应用程序使用)、消息扩展属性、消息重试次数、是否是批量消息等。
  • 根据消息发送方式,同步、异步、单向方式进行网络传输。
  • 如果注册了消息发送钩子函数,执行after逻辑。注意,就算消息发送过程中发生RemotingException、MQBrokerException、 InterruptedException时该方法也会执行。
同步发送

消息生产者向 broker 发送消息 ,执行 API 时同步等待 ,直到broker 服务器返回发送结果 。

异步发送

消息异步发送是指消息生产者调用发送的API后,无须阻塞等待消息服务器返回本次消息发送结果,只需要提供一个回调函数,供消息发送客户端在收到响应结果回调。异步方式相比同步方式,消息发送端的发送性能会显著提高,但为了保护消息服务器的负载压力,RocketMQ对消息发送的异步消息进行了并发控制,通过参数clientAsyncSemaphoreValue来控制,默认为65535。异步消息发送虽然也可以通过DefaultMQProducer#retryTimesWhenSendAsyncFailed属性来控制消息重试次数,但是重试的调用入口是在收到服务端响应包时进行的,如果出现网络异常、网络超时等将不会重试。

单向发送

单向发送是指消息生产者调用消息发送的API后,无须等待消息服务器返回本次消息发送结果,并且无须提供回调函数,表示消息发送压根儿就不关心本次消息发送是否成功,其实现原理与异步消息发送相同,只是消息发送客户端在收到响应结果后什么都不做而已,并且没有重试机制。

批量发送

批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。当然,并不是在同一批次中发送的消息数量越多性能就越好,其判断依据是单条消息的长度,如果单条消息内容比较长,则打包多条消息发送会影响其他线程发送消息的响应时间,并且单批次消息发送总长度不能超过DefaultMQProducer#maxMessageSize。批量消息发送要解决的是如何将这些消息编码以便服务端能够正确解码出每条消息的消息内容

小结:

  • RocketMQ 支持回溯消息
  • RocketMQ消息存储文件在磁盘上,默认保留3天
  • RocketMQ 定时消息只支持18个等级,不支持任意精度定时消息,是因为如果这样做,需要在服务端消息排序
  • RocketMQ所有主题消息顺序存储在同一个文件中
  • RocketMQ支持在服务端与消费端的消息过滤机制
  • Broker 每30s向Nameserver发送心跳包,大于120s没有收到心跳,NameServer会移除broker信息
  • Topic创建可以手动创建也支持自动创建,推荐手动创建
  • RocketMQ发送消息方式有三种:同步发送、异步发送、单向发送

参考链接:
https://www.zhihu.com/people/beikejiedeliulangmao/posts?page=3
https://wenku.baidu.com/view/5f7e6f7b383567ec102de2bd960590c69ec3d80d.html?_wkts_=1668925381866&bdQuery=RocketMQ+%E5%90%8C%E6%AD%A5%E5%8F%91%E9%80%81%E6%B6%88%E6%81%AF
https://blog.csdn.net/weixin_45525272/article/details/125940269

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注