Kafka的集群响应10-30s罪魁祸首居然是它?


本文转载:https://mp.weixin.qq.com/s/aCWgOwafhcvz3gn2ez-i-A
作者:丁威
公众号:中间件兴趣圈
本文转载之前已经得到作者授权。

正文部分:

先说现象:
大促期间,kafka的集群响应时间飙升到了10-30s,严重影响消息的写入。
随着对日志层层的抽丝剥茧,最终发现了罪魁祸首竟是它...

zk(zookeeper ) 在kafka中的举足轻重的作用

controller和brokers都关联kafka的核心工作机制

controller
控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群。
集群中任意一台 Broker 都能充当控制器的角色,但是,在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。

Broker启动的时候,会尝试去Zk中创建/controller节点。
Kafka 当前选举控制器的规则:第一个成功创建/controller 节点的Broker 会被指定为控制器。

控制器的职责:
1 主题创建、删除、增加分区 
2 分区重分配
3 preferred 领导者选举
4 集群成员管理
5 数据服务

详细:https://www.liritian.com/archives/679
  • /controller Kafka控制器的消息,kafka控制器的选举依靠zookeeper

  • /brokers/ids/{id} 在持久节点/brokers/ids下创建众多的临时节点,每一个节点,表示一个Broker节点,节点的内容存储了Broker的基本信息,例如端口、版本、监听地址等。

  • /brokers/topics/{topic}/partitions/{partition}/state

  • 在kafka2.8版本一下,Kafka中topic中的路由信息最终持久化在zookeeper中,每一个broker节点启动后会在内存中缓存一份数据。/brokers节点每一个子节点表示一个具体的主题,主题的元数据主要包括分区的个数与每一个分区的状态信息。每一个分区的状态信息主要包括:

  • controller_epoch 当前集群控制器的epoch,表示controller选举的次数,我们可以理解为controller的“版本号”。
    leader 当前分区Leader所在的broker id。

  • Leader_epoch 分区的leader_epoch,表示分区Leader选举的次数,从0开始,每发生一次分区leader选举该值就会加一,kafka通过引入leader epoch机制解决低版本依靠依赖水位线表示副本进度可能造成的数据丢失与数据不一致问题,这个将在后续文章中深入剖析。

  • isr 分区的isr集合。

  • version 存储状态分区状态数据结构的版本号,这个字段大家可以忽略

在Zookeeper中有一种同样的“设计模式”(watch机制),就是可以通过在zookeeper中创建临时节点+事件监听机制,从而实现数据的实时动态感知,以/brokers/ids为例进行阐述:

  • Kafka broker进程启动时会向zookeeper创建一个临时节点/brokers/ids/{id},其中id为broker的编号

  • Kafka Broker进程停止后,创建的临时节点在broker与zookeeper的会话超时后会被自动删除,产生节点删除事件

  • Kafka controller 会自动监听/brokers/ids 目录的节点新增与删除事件,一旦broker下线、上线,controller都会实时感知,从而采取必要处理。

经过上面的初步介绍,Kafka对zookeeper的依赖还是非常大的,特别是Kafka控制器的选举、broker节点的存活状态等都依赖zookeeper。

Kafka 控制器可以看出是整个kafka集群的“大脑”,如果它出现异动,其影响范围之广,影响程度之大可想而知,接下来的故障分析会给出更直观的展现。

rebalance 重平衡

  • 重平衡的定义: Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程。
  • 触发时机
    • 1)组成员数量发生变化,无论增减
      consumer端参数,session.timeout.ms 决定了 Consumer 存活性的时间间隔,默认是10秒及10000ms。建议是6s。
      heartbeat.interval.ms 发送心跳请求频率的参数,这个值越小 心跳的越快,额外消耗带宽,更加快速的知晓当前是否开启rebalance。建议2s。
      max.poll.interval.ms 它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔,默认是5分钟。有效时间内无法消费完poll方法,返回消息,C主动发起离开,C开启新一轮Rebalance。
    • 2)订阅主题(topic)数量发生变化
    • 3)订阅主题的分区(partition)数发生变化

进入正题,问题分析

一看到消息发送响应时间长,我的第一反应是查看线程栈,是不是有锁阻塞,但查看线程堆栈发现Kafka用于处理请求的线程池大部分都阻塞在获取任务处,表明“无活可干”状态

说明客户端端消息发送请求都没有到达Kafka的排队队列,并且专门用于处理网络读写的线程池也很空闲,那又是为什么呢?
消息发送端延迟超级高,但服务端线程又极度空闲,有点诡异?

继续查看服务端日志,发现了大量主题(甚至连系统主题__consumer_offsets主题【这个主题是用来保存消费者消费消息的offset的】也发生了Leader选举),日志如下:

核心日志:start at Leader Epoch 大量分区在进行Leader选举。

Kafka中只有Leader分区能处理读、写请求,follower分区只是从leader分区复制数据,在Leader节点宕机后参与leader选举,故分区在进行Leader选举时无法处理客户端的写入请求,而发送端又有重试机制,故消息发送延迟很大。

那到底在什么情况下会触发大量主题进行重新选举呢?

我们找到当前集群的Controler节点,查看state-change.log中,发现如下日志

出现了大量分区的状态从OnlinePartition变更为OfflinePartition。继续查看Controler节点下的controller.log中发现关键日志:

核心日志解读

[Controller id=1] Broker failure callback for 8 (kafka.controller.KafkaController) 控制器将节点8从集群的在线中移除,控制器为什么会将节点8移除呢?
接下来顺藤摸瓜,去看一下节点8上的日志如下图所示:

核心日志解读:原来broker与zookeeper的会话超时,导致临时节点被移除。

先不探究会话为什么会超时,我们先来看一下会话超时,会给Kafka集群带来什么严重影响。

/brokers/ids下任意一个节点被删除,Kafka控制器都能及时得到,并执行对应的处理。

这里需要分两种情况考虑。

普通broker节点被移除

  • 处理入口为:KafkaController的onBrokerFailure方法,代码详情如下图所示
  • 一个普通的broker在zk中被移除,Kafka控制器会将该节点上分配的所有分区的状态从OnlinePartition变更为OfflinePartition,从而触发分区的重新选举
  • 扩展知识点:__consumer_offsets分区如果进行Leader重新选举,大面积的消费组会触发重平衡,背后的机制:

    • 消费组需要在Broker端进行组协调器选举,选举算法如下:消费组的名称的hashcode与主题 __ consumer_offsets的队列总数取模,取余数,映射成 __consumer_offsets

    • [确定 consumer group 位移信息写入consumers_offsets 的哪个分区? 具体计算公式:  consumers_offsets partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) 注意:groupMetadataTopicPartitionCount 由 offsets.topic.num.partitions 指定,默认是 50 个分区。该分区 leader 所在的 broker 就是被选定的 coordinator等于每一个 groupid 都有一个自己对应的 broker 作为 coordinator]

    • 分区,该分区的leader在哪个broker节点,该节点则会充当消费组的组协调器。

    • 一旦该分区的Leader发生变化,对应的消费组必须重新选举新的组协调器,从而触发消费组的重平衡(rebalance)。

Controller 节点被移除

由上面开始的内容知道这个controller节点意义非凡。
如果zookeeper中移除的broker id 为 Kafka controller,其影响会更大,主要的入口如下图所示

如果是controller节点会话超时,临时节点/controller节点会被删除,从而会触发Kafka controller选举,最终所有的broker节点都会收到节点/controller的删除、新增或节点数据变化的通知,KafkaController的onControllerFailover方法会被执行,与会将于zookeeper相关的事件监听器重新注册、分区状态机、副本状态机都会停止并重新启动,各个分区会触发自动leader分区选举。

可以这样形容:一朝天子一朝臣,全部重新来过。

zookeeper 会话超时,到底谁的锅呢?

查看服务端日志,可以看到如下日志:

核心日志解读:Closed socket connection for client ... 表示连接被客户端主动关闭。

那为什么客户端会主动关闭心跳呢?心跳处理的套路就是客户端需要定时向服务端发送心跳包,服务端在指定时间内没有收到或处理心跳包,则会超时。

要想一探究竟,唯一的办法:阅读源码 ,通过研读Zookeeper客户端源码,发现存在这样一个设计:客户端会把所有的请求先放入一个队列中,然后通过一个发送线程(SendThread)从队列中获取请求,发送到服务端,关键代码如下:

如果存在大量的zk更新操作,心跳包可能会处理不及时,而在出现zookeeper session会话超时之前,集群在大面积ISR扩张与收缩,频繁更新zk,从而触发了客户端心跳超时,这个问题也可以通过如下代码进行复现:

经过这波分析,由于zookeeper会话超时,导致大量分区重新选举,最终导致消息发送延迟很大,并且消费组大面积重平衡的根本原因就排查清楚了。

最终解决方案:

适当的调高了
session.timeout.ms
min.insync.replicas
replica.lag.time.max.ms

extra tips:

  • ISR:

  • kafka的ISR代表insync-replicas
    In-sync Replicas(ISR)ISR副本集合。ISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的。

  • Leader 副本在ISR里面,

  • Broker 端参数 replica.lag.time.max.ms Follwer副本能够落后Leader副本的最长时间间隔,默认是10秒。10s内 即使F的消息数量少于L很多,仍然是同步的。如果超过10s 则是不同步 ,ISR集合会自动收缩。等到后面追上来了,又可以加入ISR集合了,这样就算是扩张了。

  • 作用:可以用来实现消息发送阶段消息不丢失;

  • 可以设置acks=all或者-1表示必须ISR集合中的副本全部成功写入消息才会向客户端返回成功,注意:这里并不是说所有副本写入成功。

  • 并且对ISR集合中的副本的个数也有要求,可以在topic级别的配置参数:min.insync.replicas进行设置,该值默认为1。

  • 3个参数:
    zookeeper.connection.timeout.ms 客户端等待与zookeeper建立连接的最大时间。如果没有设置,则使用zookeeper.session.timeout.ms中的值

    zookeeper.session.timeout.ms 默认是18s Zookeeper session timeout
    zookeeper.max.in.flight.requests
    客户端在阻塞前发送给Zookeeper的未确认请求的最大数量

总结:
1 controller 是第一个成功创建临时节点controller的broker
2 controller的实时感知是通过zk的watch机制来完成的
3 broker节点内存中的数据结构
4 kafka的watch机制
5 重平衡的定义与触发机制
6 不同的broker节点与zk超时的2种情况
7 服务端在指定时间内没有处理客户端心跳处理,就会超时

捋一下:消息发送延迟很大的原因是什么?
答:因为Kafka中只有Leader分区能处理读、写请求,follower分区只是从leader分区复制数据,
在Leader节点宕机后参与leader选举,故分区在进行Leader选举时无法处理客户端的写入请求,
而发送端又有重试机制,故消息发送延迟很大。request.timeout.ms:客户端请求超时时间,默认30s

zk写入延迟很高,10-30s原因是什么
答:Leader分区负责读写,如果Leader很忙就会导致写入延迟很高,比如本例中的leader跟zk的会话超时,导致zk一直在选举leader,那么在leader选举的时候,整个kafka集群是不能写入的。
如果写入的策略是acks = all或者-1,那么需要写入的副本数量就比较多,延迟也会很高;

zk为什么一直在选举
答:由于集群在大面的ISR收缩与扩展,导致频繁更新ZK,无暇处理客户端心跳,从而导致客户端心跳超时;

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注