Kafka 手札(4)-重设消费者位移

重设消费者位移
A 消息重演
RabbimitMQ 或者ActiveMQ 这样的传统消息中间件,处理和响应消息的方式是破坏性的(destructive)即一旦消息被成功处理,就会被从 Broker 上删除。
kafka 基于日志控制的log_based,消费者在消费消息时,仅仅从磁盘文件上读取数据而已,是只读操作,因此消费者不会删除消息数据。
选型:
传统的消息中间件:消息处理逻辑非常复杂,处理代价很高,同时你又不关心消息之间的顺序。
Kafka: 如果你的场景需要较高的吞吐量,但每条消息的处理时间很短,同时你又很在意消息的顺序。
B 重设位移
类型
1) 位移维度
根据位移值来重设,直接把消费者的位移值重设成我们给定的位移值。
2) 时间维度
我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;
也可以给出一段时间间隔,比如30分钟前,然后让消费者直接将位移调回30分钟之前的位移值。
7种重设策略
应用场景:
a> Earliest : 重新消费主题的所有消息
b> Latest: 想要跳过所有历史消息,打算从最新的消息处开始消费的时候。
c> Current: 表示将位移调整成消费者当前提交的最新位移。尤其是当你回滚了代码的时候
d> Specified-Offset:消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理
e> Shift-By-N 策略指定的就是位移的相对数值,可以往前也可以往后跳,你想把位移重设成当前位移的前 100 条位移处,此时你需要指定 N 为 -100。
时间维度策略
a> DateTime 允许指定一个时间,然后将位移重置到该时间之后的最早位移处。常见的使用场景是,你想重新消费昨天的数据,那么你可以使用该策略重设位移到昨天 0 点。
b> Duration 指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS。 以P表示开始标记。(2016-03-12T 00:00:00)。如果想调回15分钟前,那么你就可以指定PT0H15M0S
设置方式
    • 通过消费者API 来实现
    • 通过kafka-consumer-groups 命令行脚本来实现
命令行方式设置
1)bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
2)bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
3)bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
4) bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
5) bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute
6) bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2016-03-12T20:00:00.000 --execute
7) bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute
常见脚本简短说明
kafka-producer-perf-test
kafka-consumer-perf-test
它们分别是生产者和消费者的性能测试工具
kafka-delegation-tokens 它是管理 Delegation Token 的。基于 Delegation Token 的认证是一种轻量级的认证机制,补充了现有的 SASL 认证机制
kafka-delete-records 脚本用于删除 Kafka 的分区消息
kafka-dump-log 它能查看 Kafka 消息文件的内容,包括消息的各种元数据信息,甚至是消息体本身
kafka-log-dirs 脚本是比较新的脚本,可以帮助查询各个 Broker 上的各个日志路径的磁盘占用情况。
kafka-mirror-maker 脚本是帮助你实现 Kafka 集群间的消息同步的
kafka-preferred-replica-election 脚本是执行 Preferred Leader 选举的。它可以为指定的主题执行“换 Leader”的操作
kafka-reassign-partitions 脚本用于执行分区副本迁移以及副本文件路径迁移。
kafka-server-start 和 kafka-server-stop 启动和停止 Kafka Broker 进程的。
重点脚本
a> 生产消息:
启动和停止 Kafka Broker 进程的。
$ bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic --request-required-acks -1 --producer-property compression.type=lz4
b> 消费消息
$ bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port --topic test-topic --group test-group --from-beginning --consumer-property enable.auto.commit=false
如果没有消息 该命令输出为空
c> 性能测试
$ bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4
2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency.
4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency.
10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th。
d: 查看主题消息总数
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -2 --topic test-topic
test-topic:0:0
test-topic:1:0
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -1 --topic test-topic
test-topic:0:5500000
test-topic:1:5500000
将两者的差值累加起来,就能得到该主题当前总的消息数 1100万条
e:查看消息文件数据
$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
Dumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1561597044933 size: 1237 magic: 2 compresscodec: LZ4 crc: 646766737 isvalid: true
baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1237 CreateTime: 1561597044934 size: 1237 magic: 2 compresscodec: LZ4 crc: 3751986433 isvalid: true
f: 查看消息内容
$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration --print-data-log
g: 查询消费者组位移
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注