Kafka 手札(2)消费者组监控

消费者组消费进度监控如何实现
消费滞后的定义
A :所谓滞后程度,就是指消费者当前落后于生产者的程度。消费滞后程度有个专门的名称,Consumer Lag。生产10条 消费2条 Consumer Lag=8条
B: Lag的层级
Lag的单位是消息数,但是Lag的层级是在分区上的,如果要计算主题级别,需要汇总所有的主题分区的Lag ,并累加。
C: 如何监控
1)使用 Kafka 自带的命令行工具 kafka-consumer-groups 脚本
kafka-consumer-groups 脚本是 Kafka 为我们提供的最直接的监控消费者消费进度的工具

 

2)使用 Kafka 自带的 JMX 监控指标。
kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指标。
records-lag-max 和 records-lead-min,它们分别表示此消费者在测试窗口时间内曾经达到的最大的 Lag 值和最小的 Lead 值。
这里的 Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值。
副本机制
A:定义
replication 也可以称之为备份机制,通常是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝。本质就是一个只能追加写消息的提交日志。
B: 好处
1)提供数据冗余
2)提供高伸缩性
横向扩展,提高吞吐量
3)改善数据局部性
C:kafka的副本特征
1) 同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用。
2) kafka的副本分为两类,领导者副本(Leader Replica)和追随着副本(Follower Replica),一对多的关系。
3) 每个分区创建的时候 都会选举一个副本,其他的自动变成追随者副本
4) kafka的副本不提供对外服务即都不能响应消费者和生产者的读写请求。唯一任务就从leader那里异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
5) 领导挂了 zk 就会开启新一轮选举,选民为曾经的追随者。老leader回来 只能作为追随者副本,加入到集群中。
主要问题:
A: 如何保证分区下多个副本的内容一致性
1) 基于领导者的副本机制,
B: 副本机制的好处是什么?
1)Read-your-writes 生产消息后立马就能消费到。
2)方便实现单调读(Monotonic Reads)。多次消费消息时,不会看到某消息时有时无。
a> In-sync Replicas(ISR)ISR副本集合。ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的。
b> Leader 副本在ISR里面,
c> Broker 端参数 replica.lag.time.max.ms Follwer副本能够落后Leader副本的最长时间间隔,默认是10秒。10s内 即使F的消息数量少于L很多,仍然是同步的。如果超过10s 则是不同步 ,ISR集合会自动收缩。等到后面追上来了,又可以加入ISR集合了。
C:Unclean领导者选举(Unclean Leader Election)
1) 如果ISR为空,代表Leader副本也挂了,Kafka需要重新选举,
2) Kafka 把所有不在ISR 中存活副本都称为非同步副本。选局非同步副本的过程叫Unclean 领导选举
3) 相关参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。
4) 优点:提高可用性
缺点:数据丢失
5)禁止之后维护了数据的一致性,避免了消息丢失,牺牲了高可用。

消费者组重平衡全流程解析
重平衡的三个触发条件:
A :1) 组内成员数量发生变化
2)订阅主题数量发生变化
3)订阅主题的分区数发生变化
B:重平衡过程通知机制
1) 通过消费者端的心跳线程(Heartbeat Thread)
旧版本心跳在消费者主线程完成,如果消息处理逻辑在主线程中消耗过长,心跳请求将无法及时发到协调者那里,导致协调者错误的认为消费者已死。
a> 0.10.1.0 版本开始 加入了心跳线程
2) 实现方法
重平衡的通知机制正是通过心跳线程来完成的。
协调者将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。就能立马知道重平衡又开始了,这就是重平衡的通知机制。
3) 频率控制的参数
heartbeat.interval.ms
从字面上看,它就是设置了心跳的间隔时间,但这个参数的真正作用是控制重平衡通知的频率。如果你想要消费者实例更迅速地得到通知,那么就可以给这个参数设置一个非常小的值,这样消费者就能更快地感知到重平衡已经开启了
C: 消费者组状态机(state machine)
1) 五种状态
a> Empty 消费者组内没有任何成员,但消费者组可能存在已提交的位移数据,而且这些位移尚未过期。
b> Dead 消费者组内没有任何成员,但组内的元数据信息已经在协调者端被移除。协调者组件保存着当前向他注册过的所有组信息,所谓的元数据信息就类似于这个注册信息
c>PreparingRebalance 消费者组准备开启重平衡,此时所有成员都要重新请求加入消费者组。
d>CompletingRebalance 消费者组下所有成员已经加入,各个成员正在等待分配方案。该状态在老一点的版本中被称为AwaitingSync ,他和CompletingRebalance 是等价的
e> Stable 消费者组的稳定状态。该状态表明重平衡已经完成,组内各成员能够正常消费数据了。
2) 只有在Empty状态下才会执行过期位移删除的操作。
D: 消费者端重平衡流程
1) 主要参与者
a> 消费者端
b> 协调者组件(Coordinator)
2) 重平衡步骤
a> 消费者端2个步骤
        • 加入组: 对应JoinGroup 请求。第一个发JoinGroup请求的成员自动称为消费领导者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。
        • 等待消费领导者(LeaderConsumer)对应SyncGroup 请求
说明:以上俩个请求都是消费者发送给Coordinator的。领导者选出之后,Coordinator会把Consumer Group 订阅信息封装进Join Group请求的响应体中,然后发给领导者,由领导者统一作出分配方案后,进入到下一步:发送syncGroup
第二个步骤是让协调者接收分配方案,然后以SyncGroup响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。
问题:为什么不让成员直接找消费领导者?
E:Broker 端重平衡的4个场景
1)有新成员入组,Coordinator 收到JoinGroup 后发送心跳请求响应
2)有成员主动离组,发送LeaveGroup 请求,Coordinator 收到LeaveGroup 后发送心跳请求响应。
3)有组成员崩溃离组,指消费者实例出现严重故障,突然宕机导致的离组。
协调者在等待session.timeout.ms时间后才能感知到。
4)每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup 请求发送。
Kafka 控制器组件
A :控制器作用
控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群。集群中任意一台 Broker 都能充当控制器的角色,但是,在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。
B: zooKeeper 是一个提供高可靠性的分布式协调服务框架
1) zk的特征
它使用的数据模型类似于文件系统的树形结构,根目录以“/” 开始。
该结构上的每个节点被称为znode,用来保存一些元数据协调信息
2)znode 持久性划分
持久性znode:不会因为zk的重启而消失
临时性znode:与创建它的ZooKeeper 会话绑定,会话结束自动删除。
它的ephemeralOwner不为0就是临时节点
3)zk的作用
ZooKeeper 赋予客户端监控 znode 变更的能力,即所谓的 Watch 通知功能。一旦 znode 节点被创建、删除,子节点数量发生变化,抑或是 znode 所存的数据本身变更,ZooKeeper 会通过节点变更监听器 (ChangeHandler) 的方式显式通知客户端。
ZooKeeper 常用来实现集群成员管理、分布式锁、领导者选举等功能。
C:控制器的选举
1)Broker启动的时候,会尝试去Zk中创建/controller节点。Kafka 当前选举控制器的规则:第一个成功创建/controller 节点的Broker 会被指定为控制器。
2)控制器是做什么的?
5种职责
        • 主题管理(创建 删除 增加分区)
          • 控制器帮助我们完成对kafka主题的创建,删除,以及分区增加的操作。
        • 分区重分配
          • 分区重分配主要指,kafka-reassign-partitions 脚本 提供的对已有主题分区进行细粒度的分配功能。
        • Preferred 领导者选举
          • Preferred 领导者选举主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案
        • 集群成员管理(新增 Broker、Broker 主动关闭、Broker 宕机)
          • 控制器利用watch机制检查 zk的 /brokers/ids 节点下的子节点数量变更。 broker启动的时候会在/brokers 下创建专属的znode节点,zk通过watch机制通知控制器,这样就能感知到这个变化。同样也会在/brokers/ids 下创建一个临时znode,当broker挂了,会跟zk的会话结束,这个znode就被自动删除了。zk也会告诉控制器 来善后。
        • 数据服务
          • 控制器的最后一大类工作,就是向其他的Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
D:控制器故障转移(Failover)
1) 定义
故障转移指的是,当运行中的控制器突然宕机或意外终止时,Kafka 能够快速地感知到,并立即启用备用控制器来代替之前失败的控制器。这个过程叫FailOver ,是自动完成的。
2)设计原理
单线程加事件队列。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注