kafka 高水位(HW)和 Leader Epoch机制
A 高水位的特征
用消息位移来表征
B 高水位作用
1) 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。
2)帮助kafka完成副本同步。
蓝色部分是已提交信息,红色是未提交信息
高水位(HW)以下的是已提交,高水位以上就是未提交的消息。
位移值等于高水位的消息也属于未提交消息,也不能被消费。
C :日志末端位移(LEO),表示副本写入下一条消息的位移值。注意15所在的方框 是虚线,说明只有15条消息(0-14),显然介于高水位和LEO之间的消息就属于未提交消息。
1)同一个副本对象,其高水位(HW)值不会大于LEO值。
2)kafka 所有副本都有对应的高水位和LEO ,不仅Leader副本有。
3)分区的高水位就是其Leader副本的高水位。
D: 高水位更新机制
Leader副本所在的Broker上 还保存了其他Follwer副本的LEO值和HW值。

远程副本:broker0上保存的这些Follower又称为远程副本(Remote Replica)。帮助Leader副本确定其高水位,也就是分区高水位。
kafka副本机制 不会再更新图中灰色HW,其他都会更新,包括灰色HW 下面的LEO。
LEO 更新时机
1) Follower副本从Leader副本拉取消息,写入到本地磁盘后,会更新LEO值。
2) Leader副本收到Producer发送的消息,写入到本地磁盘后,会更新其LEO值。
3) Follower副本从Leader副本拉取消息时,会告诉Leader副本,我会从哪个位移处开始拉取。Leader副本会把这个位移值作为相应远程副本的LEO。
HW 高水位更新时机
1) Follower 副本成功更新完LEO之后,比较其LEO值与Leader副本发来的高水位值,并用两者较小值更新自己的高水位。
2)Leader副本高水位(分区高水位)更新时机有两个,一个是Leader副本更新其LEO之后;另一个是更新完远程副本LEO之后。具体算法是:取Leader副本和所有与Leader同步的远程副本LEO中的最小值。
E :Leader Epoch 组成
a> Follower 副本的高水位更新需要一轮额外的拉取请求才能实现,所以Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这样会丢失数据,或者数据不一致。
1)Epoch:一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
2) 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。
举栗说明:
我举个例子来说明一下 Leader Epoch。
假设现在有两个 Leader Epoch<0, 0> 和 <1, 120>,那么,第一个 Leader Epoch 表示版本号是 0,这个版本的 Leader 从位移 0 开始保存消息,一共保存了 120 条消息。之后,Leader 发生了变更,版本号增加到 1,新版本的起始位移是 120。
Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。
当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。
如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新。这样,每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况。
Kafka 主题管理
A 主题日常管理
1)创建主题
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1
2)查询主题
bin/kafka-topics.sh --bootstrap-server broker_host:port --list
详细主题命令:
bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>
3) 修改主题
a> 修改主题分区
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数> 新分区数要比原分区数大
b> 修改主题级别参数
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
修改动态级别参数用--zookeeper
c> 变更副本数
kafka-reassign-partitions 脚本
d> 修改主题限速
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
这条命令结尾处的 --entity-name 就是 Broker ID。倘若该主题的副本分别在 0、1、2、3 多个 Broker 上,那么你还要依次为 Broker 1、2、3 执行这条命令
设置好这个参数之后,我们还需要为该主题设置要限速的副本。在这个例子中,我们想要为所有副本都设置限速,因此统一使用通配符 * 来表示,命令如下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test
e> 删除主题:异步操作
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>
4) 特殊主题的管理与运维
a) 增加副本值
首先创建json文件
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
{"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},
...
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}`
执行 脚本命令
bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute
b) 查看内部主题的消息
查看位移提交数据
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
读取该主题消息,查看消费者组的状态信息
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
事务状态查看
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter " --from-beginning
B 常见问题
1 删除主题失败
原因
a>副本所在的 Broker 宕机了: 重启broker
b> 待删除主题的部分分区依然在执行迁移过程
解决方案:
手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。
手动删除该主题在磁盘上的分区目录
在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存
2 __consumer_offsets 占用太多的磁盘
jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态,如果挂掉了重启相应的broker