Kafka 手札(1)消费者组 消息压缩

kafka 高吞吐量 :顺序io,消费者组
Log Segment 日志段
消费者组(Consumer Group)。所谓的消费者组,指的是多个消费者实例共同组成一个组来消费一组主题,这组主题中的每个分区都只会被组内的一个消费者实例消费,其他消费者实例不能消费它
auto.create.topics.enable:false 即不允许自动创建 Topic
unclean.leader.election.enable:false, 不允许Unclean Leader 选举
auto.leader.rebalance.enable:false 不允许 定期换Leader
log.retention.{hours|minutes|ms} :默认是 hours=168 保存7天数据
log.rentention.bytes: 这是指定 Broker 为消息保存的总磁盘容量大小 -1 表示没限制
message.max.bytes: 控制 Broker 能够接收的最大消息大小。默认 1000012=976KB

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
topic下说明一条消息只能存在于一个分区,
而不会在多个分区中被保存多份。分区主要是实现系统的伸缩性
所谓分区策略是决定生产者将消息发送到哪个分区的算法
轮询:默认策略
随机:随意放置
消息键序:key-ordering
消息压缩:
kafka 保证消息不丢失:
1 持久化消息 写入日志
2 Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)
3 维持先消费 再更新位移(offset) 最大限度的保证消息不丢失,有可能会产生消息重复消费
4 配置参数acks=all 所有的副本都要接收到消息才算已经提交 Producer 一个参数
5 retries 设置稍微大一点
6 unclean.leader.election.enable=false 落后太多的broker 消息也是没有的
7 replication.factor >= 3 冗余 Broker端参数
8 min.insync.replicas > 1 消息至少要被写入多个副本 才算已提交,Broker端参数
9 确保 replication.factor 大于min.insync.replicas
10 consumer端函数 enable.auto.commit = false
kafka 消息交付可靠性保障以及精确处理一次语义的实现
最多一次 at most once 消息可能会丢失,但绝对不会被重复发送
至少一次 at least once 消息不会丢失,但有可能被重复发送 (默认)
精确一次 exactly once 消息不会丢失,也不会被重复发送
问题1 以上三种模式怎么配置。
kafka 怎么做到精确一次:
幂等性:Idempotence
生产者 Producer 开启enable.idempotence 被设置成true,
大概原理 即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。
只能保证单分区幂等性,无法实现多个分区的
只能实现单会话幂等性,不能实现跨会话幂等性
事务:Transaction
实现多分区、多会话的幂等性。
设置:
enable.idempotence = true
设置Producer端参数 transactional. id
消费端 Consumer 参数 设置isolation.level 参数值 read_committed
消费者组
consumer group
消费者组:定义->kafka 提供可扩展且具有容错性的消费者机制。
1 重要特诊
消费者组内多个消费者,他们共享一个GroupId 组内所有的消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)。
当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。
GroupId 是一个字符串 在一个kafka集群中,它标识唯一的一个consumer Group.
理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。
kafka 如何管理Consumer Group的位移呢(offset)?
每个 Consumer 实例怎么知道应该消费订阅主题的哪些分区呢?这就需要分配策略的协助了。当前 Kafka 默认提供了 3 种分配策略.
1,重要特征:
A:组内可以有多个消费者实例(Consumer Instance)。
B:消费者组的唯一标识被称为Group ID,组内的消费者共享这个公共的ID。
C:消费者组订阅主题,主题的每个分区(partion)只能被组(Group)内的一个消费者(Consumer)消费
D:消费者组机制,同时实现了消息队列模型和发布/订阅模型。
2,重要问题:
A:消费组中的实例与分区的关系:
消费者组中的实例个数,最好与订阅主题的分区数相同,否则多出的实例只会被闲置。一个分区只能被同一个分组内的一个消费者实例订阅。
B:消费者组的位移管理方式:
(1)对于Consumer Group而言,位移是一组KV对,Key是分区,V对应Consumer消费该分区的最新位移。
(2)Kafka的老版本消费者组的位移保存在Zookeeper中,好处是Kafka减少了Kafka Broker端状态保存开销。但ZK是一个分布式的协调框架,不适合进行频繁的写更新,这种大吞吐量的写操作极大的拖慢了Zookeeper集群的性能。
(3)Kafka的新版本采用了将位移保存在Kafka内部主题的方法。
C:消费者组的重平衡:
(1)重平衡:本质上是一种协议,规定了消费者组下的每个消费者如何达成一致,来分配订阅topic下的每个分区。
(2)触发条件:
a,组成员数发生变更
b,订阅主题数发生变更
c,定阅主题分区数发生变更
(3)影响:
Rebalance 的设计是要求所有consumer实例共同参与,全部重新分配所有用分区。并且Rebalance的过程比较缓慢,这个过程消息消费会中止
位移主题offset
__consumer_offsets
1 重要特征
A:老版本位移保存在zk上,当Consumer重启后,他能自动从zk中读取位移数据。好处是减少Broker需要吃油的状态空间,有利于高伸缩性
B:新版本位移管理机制:将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中
C:位移主题就是普通的kafka主题,消息格式是kafka自己定义的,不能自己修改,否则会Broker崩溃
D:位移主题的存的格式是kv,kafka集群把标识哪一个consumer的位移标志放在key里面,key包括Group ID 用来标识消费组,主题名,分区号。
E: 墓碑消息也称delete mark 主要特点是 消息体是null,用来删除Group信息。
2 重要问题
A:位移主题怎么创建
会在kafka集中第一个Consumer程序启动的时候,自动创建位移主题,也可以手动创建。
1)相关参数:offsets.topic.num.partitions 默认50个分区
2)备份因子:offsets.topic.replication.factor 默认是3
B:Consumer 怎么提交位移的呢
1)自动提交位移
consumer端 参数:enable.auto.commit=true 定期提交位移,定期时间多少由
auto.commit.interval.ms 来控制,默认5000ms 也就是5秒
优点: 省事,不用自己操作位移
缺点:没法把控C端的位移管理,会一直写入位移,浪费磁盘
2)手动提交位移
consumer端 参数:enable.auto.commit=false调用consumer.commitSync来手动提交位移
优点:能掌控位移,不会一直写入
缺点:自己调用API
C:怎么删除位移主题中的过期消息
Compaction 压紧策略,
Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起
tip:过期消息,对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息
Kafka 提供了专门的后台线程(Log Cleaner)定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。
消费者组的重平衡 怎么避免
重要特征:
A: 重平衡的定义: Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程
B:协调者 Coodinator 它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。
重要问题:
A:CG 如何确定为它服务的Coordinator在哪个broker上,先找分区id,再找分区的leader副本在哪个Broker上,那个broker 就是Coordinator所在的。
B:Rebalance 弊端有三点,触发时机有三点
a> 弊端
1)Rebalance 影响 Consumer 端 TPS,Rebalance 期间所有实例都不能消费消息
2)很慢
3)效率低
b>触发时机
1)组成员数量发生变化,无论增减
  • consumer端参数,session.timeout.ms 决定了 Consumer 存活性的时间间隔,默认是10秒及10000ms。建议是6s。
  • heartbeat.interval.ms 发送心跳请求频率的参数,这个值越小 心跳的越快,额外小号带宽,更加快速的知晓当前是否开启rebalance。建议2s。
  • max.poll.interval.ms 它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔,默认是5分钟。有效时间内无法消费完poll方法,返回消息,C主动发起离开,C开启新一轮Rebalance。
2)订阅主题数量发生变化
3)订阅主题的分区数发生变化
C: 粘性分区策略,0.11.0.0 版本推出了 StickyAssignor。是指每次 Rebalance 时,该策略会尽可能地保留之前的分配方案,尽量实现分区分配的最小变动
位移提交
重要特征
A :Consumer 消费位移,它记录Consumer要消费的下一条消息的位移。
B:Consumer 需要向Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。Consumer 需要为分配给他的每个分区提交各自的位移数据。
C: 位移提交分为自动和手动提交,自动提交可能会出现重重复消费,可以通过减少 auto.commit.interval.ms 的值来提高提交频率,可以缩小重复消费的时间窗口,但不能完全消除。
D: poll返回的消息如果很多,可以批量提交,直接提交最新一条消息的位移。更加精细化管理,可以分批提交,加入返回500条可以每100条提交一次。
位移提交异常
CommitFailedException 是指Consumer客户端在提交位移时出现了错误或异常,并且并不可恢复的严重异常。
A:导致原因:
(1)消费者端处理的总时间超过预设的max.poll.interval.ms参数值
(2)出现一个Standalone Consumerd的独立消费者,配置的group.id重名冲突。
B :防止这个异常出现,大概有两个参数可以调整:
1 )max.poll.interval.ms=5 秒 设置大一点
2 )减少 poll 方法一次性返回的消息数量,即减少 max.poll.records 参数值
C:优化方法
1) 缩短单条消息处理的时间
2) Consumer 端允许下游系统消费一批消息的最大时长
max.poll.interval.ms 提高参数值默认是5分钟。
3) 减少下游系统一次性消费的消息总数
max.poll.records该参数规定了单次 poll 方法能够返回的消息总数的上限 默认是500。
4)下游系统使用多线程来加速消费
Standalone 和其他消费者组有相同的group.id值,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常,因为 Kafka 无法识别这个具有相同 group.id 的消费者实例,于是就向它返回一个错误,表明它不是消费者组内合法的成员。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注