RocketMQ -初识

RocketMQ 是由阿里用java语言开发的一款高性能、高吞吐量的分布式消息中间件,于2017年正式捐赠 Apache 基金会并成为顶级开源项目。

RocketMQ的组成:Nameserver、Broker、Producer、Consumer


图1 RocketMQ架构图

NameServer

负责对于数据源的管理,包括了对于Topic和路由信息的管理.
没有状态,可横向扩展集群部署。节点之间无任何信息同步。

作用
在RocketMQ中提供服务,更新和发现Broker服务
接收Borker的请求,注册Broker的路由信息
接收client(producer/consumer)的请求,根据某个topic获取到Broker的路由信息

主要就是说 对于namersever来说每一个都是互相独立的,节点之间无任何信息同步;NameServer 的压力不会很大,在平时主要用于维护心跳检测,和提供Topic-Broker之间的关系数据。

但是需要注意的是对于Broker在向NameServer发送心跳的时候,会带上自己当前所负责的所有的Topic信息,这些信息如果太多的话,可能在一次的心跳检测的过程中出现网络波动,导致失败,最终就会导致NameServer 误认为Broker心跳检测失败。

对于NameServer 来说 是被设计成为无状态的,可以进行横向的扩展,节点之间相互并没有数据的通行,通过部署多台机器可以伪装成一个集群。

对于有状态和无状态的解释:
无状态就是说不会保存过多的数据信息,例如我们的Web站点就被设计成为是无状态的,每一次的登陆都需要进行信息的验证,但是对于有状态的寓意会保存数据信息,也就是说 服务端会记录每次回话的客户端信息,从而在客户端接入链接的时候能够识别客户端。典型的设计就是tomcat中的session。

Broker

Broker是消息的中转中心,负责消息的存储以及转发

单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时(30s)将Topic信息注册到NameServer,
Broker负责消息存储,以Topic为维度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。

对于每一个Broker 来说 在启动的时候,都会到NameServer中进行一个注册,随后每隔30秒定期向NameServer上报Topic路由信息,无论对于生产者还是消费者,在进行对应的操作之前会根据对应的Topic到NameServer中获取到Broker的路由信息

Topic

Topic(主题)可以看做消息的规类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。

Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。

一个 Topic 也可以被 0个、1个、多个消费者订阅

Producer

消息的生产者
与NameServer的某一个节点建立长连接(Keep-alive),定期从NameServer读取Topic路由信息,并向提供Topic服务的Broker建立长连接,且定期向Broker发送心跳。

Producer由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。

RocketMQ 提供了三种方式发送消息:同步、异步和单向

  • 同步发送:
  • 特征:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。
    • 缺点:吞吐量很低
    • 优点:可以确保消息发送成功
    • 场景:重要通知邮件、营销短信。
  • 异步发送:
    • 特点:生产者将消息发出,不等接收方发回响应而是通过服务器回调的方式得到发送结果。可以设置发送失败重试次数。
      • 缺点:消息有丢失风险
      • 优点:吞吐量高
    • 场景:一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。
  • 单向发送:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。
    • 缺点:消息有丢失风险,
    • 优点:吞吐量非常高
    • 场景:日志收集

Consumer

消息消费者,负责消费消息,一般是后台系统负责异步消费。
Consumer也由用户部署,支持PUSH和PULL两种消费模式,支持集群消费和广播消息,提供实时的消息订阅机制。

  • Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型。
  • Push:推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息。

实现

  • Broker在启动的时候去向所有的NameServer注册,并保持长连接,每30s发送一次心跳。
  • Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择一台服务器来发送消息。
  • Conusmer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费

consumer 与NameServer的某一个节点建立长连接(Keep-alive),定期从NameServer读取Topic路由信息,并向提供Topic服务的Broker建立长连接,且定期向Broker发送心跳。Consumer既可以从Master节点订阅消息,也可以从Slave节点订阅消息,消费者在向Master拉去消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读取老消息,产生读I/O),以及从服务器是否可读等因素判断建议下一次是从Master还是Slave拉取

Message

消息类型:

  • 普通消息

  • 顺序消息

    • 消息局部有序,整体无序。发送时将消息推送到同一个队列,消费时横向消费保证消息局部有序。要保证最终消费到的消息是有序的,需要从Producer、Broker、Consumer三个步骤都保证消息有序才行。
    • 实现:Rocket MQ给我们提供了MessageQueueSelector接口,可以自己重写里面的接口,实现自己的算法,举个最简单的例子:判断i % 2 == 0,那就都放到queue1里,否则放到queue2里;
  • 广播消息

    • 把消息发给了所有订阅了对应主题的消费者,而不管消费者是不是同一个消费者组。
  • 延时消息

    • 消息并不会立即发送出去,而是会等一段时间再发送出去。开源部分消息延迟时间分为18个等级,1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  • 批量消息

    • 多条消息合并成一个批量消息,一次发送出去。这样的好处是可以减少网络IO,提升吞吐量。建议消息不要超过1M,最大长度4M。超过4M拆分消息再发送
  • 过滤消息

    • 消费者通过tag或者sql过滤消息。直接在Broker通过过滤条件过滤减少IO耗时。
  • 事务消息

    • 生产者先发送一个half消息给Broker,Broker收到消息之后给予反馈。生产者收到反馈之后开始执行本地事务逻辑,执行完之后会再给Broker发送一条消息告诉Broker是该commit还是rollback。

Tag

Tag(标签)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。

标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。

Group

分组,一个组可以订阅多个Topic。

分为ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务可以作为Group,同一个Group一般来说发送和消费的消息都是一样的

Queue

在Kafka中叫Partition,每个Queue内部是有序的,在RocketMQ中分为读和写两种队列,一般来说读写队列数量一致,如果不一致就会出现很多问题。

Message Queue

Message Queue(消息队列),主题被划分为一个或多个子主题,即消息队列。

一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下的所有队列将消息发出去。

消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化,具有了水平扩展能力。

Offset

在RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset 来访问,Offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限。

也可以认为 Message Queue 是一个长度无限的数组,Offset 就是下标。

小结:

  • nameserver 节点相互之间没有通信
  • producer 跟所有的nameserver保持连接
  • broker 跟nameserver 每30s上报心跳
  • 所有的消息队列都是持久化的
  • Rocket队列只有写队列和读队列
  • 消息有多种类型

如何保证消息的可靠性

生产者丢失

对于生产者丢失的情况出现在程序发送失败抛出来之后没有做重试处理,或者说发送的过程是成功的,但是网络闪退,MQ并没有收到信息,这个时候消息发送就失败了。

由于同步发送的一般不会出现这样使用方式,所以我们就不考虑同步发送的问题,我们基于异步发送的场景来说。

异步发送分为两个方式:异步有回调和异步无回调,无回调的方式,生产者发送完后不管结果可能就会造成消息丢失,而通过异步发送+回调通知+本地消息表的形式我们就可以做出一个解决方案。以下单的场景举例。

下单后先保存本地数据和MQ消息表,这时候消息的状态是发送中,如果本地事务失败,那么下单失败,事务回滚。
下单成功,直接返回客户端成功,异步发送MQ消息
MQ回调通知消息发送结果,对应更新数据库MQ发送状态
JOB轮询超过一定时间(时间根据业务配置)还未发送成功的消息去重试
在监控平台配置或者JOB程序处理超过一定次数一直发送不成功的消息,告警,人工介入。
一般而言,对于大部分场景来说异步回调的形式就可以了,只有那种需要完全保证不能丢失消息的场景我们做一套完整的解决方案。

MQ丢失

如果生产者保证消息发送到MQ,而MQ收到消息后还在内存中,这时候宕机了又没来得及同步给从节点,就有可能导致消息丢失。

比如RocketMQ:

RocketMQ分为同步刷盘和异步刷盘两种方式,默认的是异步刷盘,就有可能导致消息还未刷到硬盘上就丢失了,可以通过设置为同步刷盘的方式来保证消息可靠性,这样即使MQ挂了,恢复的时候也可以从磁盘中去恢复消息。

虽然我们可以通过配置的方式来达到MQ本身高可用的目的,但是都对性能有损耗,怎样配置需要根据业务做出权衡。

消费者丢失
消费者丢失消息的场景:消费者刚收到消息,此时服务器宕机,MQ认为消费者已经消费,不会重复发送消息,消息丢失。

在RocketMq中默认是需要消费者回复ack确认,若是说出现了消费者对这个数据完成了消费,但是没有返回ack确认信息,重发的机制会根据MQ的类型不同进行不同间隔的时间的重复发送。若是说出现了多次重复发送仍然不能够收到正确的Ack确定信息,就会进入死信队列。这个时候就需要人干预处理。

消息删除
对于在Broker中的消息消费完成之后 并不会立马的删除,每条消息都会持久化到CommitLog 中,每个Consumer连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了。

进入死信队列
在消息重复消费失败之后经过了16次会进入死信队列中,虽然有18个定时区间,但是它从源码上看,是从第三次开始的,也就是说 1s和5s的重试直接跳过的,从10s的时候开始第一次重试;

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName) {
    Message newMsg = new Message();
    // look 3 + 重试次数。他是从第三个开始的,第一次是10s ,第二次是30s。
    newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
    this.mQClientFactory.getDefaultMQProducer().send(newMsg);
 }

当我们选择好了集群模式之后,那么我们需要关心的就是怎么去存储和复制这个数据,RocketMQ对消息的刷盘提供了同步和异步的策略来满足我们的,当我们选择同步刷盘之后,如果刷盘超时会给返回FLUSH_DISK_TIMEOUT,如果是异步刷盘不会返回刷盘相关信息,选择同步刷盘可以尽最大程度满足我们的消息不会丢失。

除了存储有选择之后,我们的主从同步提供了同步和异步两种模式来进行复制,当然选择同步可以提升可用性,但是消息的发送RT时间会下降10%左右。

RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。

而Kafka采用的是独立型的存储结构,每个队列一个文件。

消息重复

由于RocketMQ通过消息消费确认机制(ACK)来确保消息至少被消费一次,但由于ACK消息有可能丢失等其他原因,RocketMQ无法做到消息只被消费一次,有重复消费的可能

消息领域有一个对消息投递的QoS(Quality of Service),服务质量定义,分为:

  • 最多一次(At most once)
  • 至少一次(At least once)
  • 仅一次( Exactly once)

几乎所有的MQ产品都声称自己做到了At least once。

既然是至少一次,那避免不了消息重复,尤其是在分布式网络环境下。

比如:

  • 网络原因闪断;
  • ACK返回失败等等故障;
  • 确认信息没有传送到消息队列;
  • 导致消息队列不知道自己已经消费过该消息了;
  • 再次将该消息分发给其他的消费者;

不同的消息队列发送的确认信息形式不同:
例如RabbitMQ是发送一个ACK确认消息,
RocketMQ是返回一个CONSUME_SUCCESS成功标志,
Kafka实际上有个offset的概念。

RocketMQ没有内置消息去重的解决方案,需要依靠消费者客户端去重;

保证幂等性:
幂等性:就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用,数据库的结果都是唯一的,不可变的。

只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样,需要业务端来实现。

去重策略:保证每条消息都有唯一编号(比如唯一流水号),且保证消息处理成功与去重表的日志同时出现。

建立一个消息表,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息。

消息消费

RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式。

为什么说要主动拉去消息而不是使用事件的监听
事件驱动方式是建立好长连接,由事件(发送数据)的方式来实时推送。

如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在consumer端堆积过多,同时又不能被其他consumer消费的情况。而pull的方式可以根据当前自身情况来pull,不会造成过多的压力而造成瓶颈。所以采取了pull的方式。主要是考虑消费者端的消费能力,灵活的调控消息消费者,确保消息平稳的被消费掉;

RocketMQ刷盘实现

Broker 在消息的存取时直接操作的是内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中。

刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘,如果是同步刷盘的话,在Broker把消息写到CommitLog映射区后,就会等待写入完成。

小结:

  • 消息的可靠性
  • 消费者丢失消息,如果没有收到ack,会持续进行16次重试,超出了就进入死信队列,由人工干预;
  • MQ丢失消息,采用同步刷盘保证消息在磁盘上,重启后从磁盘加载消息
  • producer 消息丢失:把消息插入到数据表,定期轮训,告警人工干预
  • RocketMQ 是拉消息类型,采用的是longpoll机制

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注