kafka的选举

Kafka中的选举大致可以分为三大类:控制器的选举、分区leader的选举以及消费者相关的选举
控制器
在Kafka集群中会有一个或多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态,还有执行分区重新分配的管理等工作。
比如当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。再比如当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
控制器的选举
Kafka Controller的选举是依赖Zookeeper来实现的,Broker启动的时候,会尝试去Zookeeper中创建 /kafka/controller节点。Kafka 当前选举控制器的规则:
1 第一个成功创建 /kafka/controller 节点(这个临时EPHEMERAL)的Broker 会被指定为控制器。
2 其他的代理节点陆续启动时,也会尝试在Zookeeper系统中创建 /kafka/controller节点,但是由于 /kafka//controller节点已经存在,所以会抛出“创建 /kafka/controller节点失败异常”的信息。创建失败的代理节点会根据返回的结果,判断出在Kafka集群中已经有一个控制器被成功创建了,所以放弃创建/controller节点,这样就确保了Kafka集群控制器的唯一性;
3 其他的代理节点,会在控制器上注册相应的监听器,各个监听器负责监听各自代理节点的状态变化。当监听到节点状态发生变化时,会触发相应的监听函数进行处理。
我们来看一下这controller节点长什么样
get -s /kafka/controller {"version":1,"brokerid":1,"timestamp":"1588917580803"} cZxid = 0x500000053 ctime = Thu May 07 22:59:40 PDT 2020 mZxid = 0x500000053 mtime = Thu May 07 22:59:40 PDT 2020 pZxid = 0x500000053 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x10006a34ba10003 [tips:这里不为0就是临时节点]] dataLength = 54 numChildren = 0 如果更换了启动节点的顺序 比如 3 2 1 那么brokerid 就是3.
这个brokerid 会产生变化,如果相应的broker退出了。
当控制器被关闭或者与Zookeeper系统断开连接时,Zookeeper系统上的临时节点就会被清除。Kafka集群中的监听器会接收到变更通知,各个代理节点会尝试到Zookeeper系统中创建一个控制器的临时节点。第一个成功在Zookeeper系统中创建的代理节点,将会成为新的控制器。每个新选举出来的控制器,会在Zookeeper系统中获取一个递增的controller_epoch值
get -s /kafka/controller_epoch 会返回相应的纪元。
分区leader的选举
分区leader副本的选举由Kafka Controller 负责具体实施。
分区leader选举有几种情况:
A 当创建分区(创建主题或增加分区都有创建分区的动作)。
B 分区上线(比如分区中原先的leader副本下线,此时分区需要选举一个新的leader上线来对外提供服务)的时候都需要执行leader的选举动作。
基本思路是
1 按照AR集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。
2 一个分区的AR集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变。
注意这里是根据AR的顺序而不是ISR的顺序进行选举的。这个说起来比较抽象,有兴趣的读者可以手动关闭/开启某个集群中的broker来观察一下具体的变化。
C 分区进行重分配(reassign)的时候也需要执行leader的选举动作。
思路:从重分配的AR列表中找到第一个存活的副本,且这副本在目前的ISR列表中。
D 发生优先副本(preferred replica partition leader election)的选举时,
直接将优先副本设置为leader即可,AR集合中的第一个副本即为优先副本。
E 当某节点被优雅地关闭(也就是执行ControlledShutdown)时,位于这个节点上的leader副本都会下线,所以与此对应的分区需要执行leader的选举。
从AR列表中找到第一个存活的副本,且这个副本在目前的ISR列表中,与此同时还要确保这个副本不处于正在被关闭的节点上。
消费者相关的选举
组协调器GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader,这个选举的算法也很简单,分两种情况分析。如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader。如果某一时刻leader消费者由于某些原因退出了消费组,那么会重新选举一个新的leader,这个重新选举leader的过程又更“随意”了,相关代码如下:
//scala code. private val members = new mutable.HashMap[String, MemberMetadata] var leaderId = members.keys.head 复制代码
解释一下这2行代码:在GroupCoordinator中消费者的信息是以HashMap的形式存储的,其中key为消费者的member_id,而value是消费者相关的元数据信息。leaderId表示leader消费者的member_id,它的取值为HashMap中的第一个键值对的key,这种选举的方式基本上和随机无异。总体上来说,消费组的leader选举过程是很随意的。
分区分配策略的选举
或许你对此有点陌生,但是用过Kafka的同学或许对partition.assignment.strategy(取值为RangeAssignor、RoundRobinAssignor、StickyAssignor等)这个参数并不陌生。每个消费者都可以设置自己的分区分配策略,对消费组而言需要从各个消费者呈报上来的各个分配策略中选举一个彼此都“信服”的策略来进行整体上的分区分配。这个分区分配的选举并非由leader消费者决定,而是根据消费组内的各个消费者投票来决定的。
https://juejin.im/post/5cdec305f265da1b7c60e71c

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注