欢迎访问我的博客,你的支持,是我最大的动力!

Kafka权威指南(三)

Linux 小马奔腾 74℃ 评论
目录:
[显示]

管理Kafka

管理Kafka

kafka-topics.sh 工具,可用来创建、修改、删除和查看集群里的主题
使用该工具需要 --zookeeper参数提供zk的连接字符串
# kafka大部分命令行工具是直接操作zookeeper上的元数据,并不会连接到broker上,要确保所使用的工作的版本与集群的broker版本相匹配

创建主题

必选参数:主题名,复制系数,分区数
主题名不建议使用下划线和句号,因为主题名会用在度量指标上,句号会被替换为下划线
topic.1 -> topic_1
示例:
kafka-topics.sh --zookeeper <zk_url>:2181/kafka-cluster --create --topic <string> --replication-factor <int> --partitions <int>
禁用基于机架信息的分配策略 --disable-rack-aware
忽略重复创建主题的错误 --if-not-exists

增加分区

主题基于分区进行伸缩和复制
增加分区可扩容或降低单个分区吞吐量;若在单个消费者群组内运行更多消费者,也需要增加分区数量
#一个分区只能由群组里的一个消费者读取
#对于基于键的主题,增加分区会影响键到分区之间的映射,建议开始就设置好分区数量

在使用 --alter修改主题时,若指定 --if-exists参数,可以忽略主题不存在时报错
(主题不存在时,应该创建主题,而该参数隐藏了错误,不建议使用)
示例:
kafka-topics.sh --zookeeper <zk_url>:2181/kafka-cluster --alter --topic my-topic --partitions 16

如果一定要减少分区数量,只能删除整个主题,再重新创建它

删除主题

删除主题可释放磁盘空间和文件句柄
删除主题会丢弃主题里所有数据,是不可逆的
broker的delete.topic.enable需要设置为true 若为false删除主题的请求会被忽略
示例:
kafak-topics.sh --zookeeper <zk_url>:2181/kafka-cluster --delete --topic my-topic

列出集群中所有主题

示例:
kafak-topics.sh --zookeeper <zk_url>:2181/kafka-cluster --list

列出主题详细信息

信息包含分区数、主题的覆盖配置、每个分区的副本清单
示例:
kafak-topics.sh --zookeeper <zk_url>:2181/kafka-cluster --describe
--topic 只显示特定主题的信息
--topics-with-overrides 找出所有包含覆盖配置的主题
--under-replicated-partitions 列出所有包含不同步副本的分区
--unavailable-partitions 列出所有没有首领的分区(这些分区已处于离线状态,对生产者和消费者都不可用)
kafak-topics.sh --zookeeper <zk_url>:2181/kafka-cluster --describe --under-replicated-patitions

消费者群组

保存消费者群组的位置
旧版:zookeeper
新版:broker
kafka-consumer-groups.sh 可列出消费者群组
旧版:删除消费者群组和偏移量 --zookeeper
新版: --bootstrap-server

列出并描述群组

旧版:
kafka-consumer-groups.sh --zookeeper <zk_url>:2181/kafka-cluster --list
新版:
kafka-consumer-groups.sh --new-consumer --bootstrap-server <kafka_ip>:9092/kafka-cluster --list
获取详细信息
列出群组里所有主题信息和每个分区的偏移量
用 --describe 代替 --list 并通过 --group 指定特定群组
kafka-consumer-groups.sh --zookeeper <zk_url>:2181/kafka-cluster --describe --group testgroup
输出字段含义:
# CURRENT-OFFSET 消费者群组最近提交偏移量,消费者当前读取位置
# LOG-END-OFFSET broker中当前最高偏移量
# LAG 差距 (LOG-END-OFFSET - CURRENT-OFFSET的差值)
# OWNER 消费者ID

删除群组

仅旧版才支持删除群组操作,操作前必须关闭所有消费者,否则会导致消费者不现不可预测的行为
kafak-consumer-groups.sh --zookeeper <zk_url>:2181/kafka-cluster --delete --group testgroup
# 从消费者群组中删除my-topic主题的偏移量
kafak-consumer-groups.sh --zookeeper <zk_url>:2181/kafka-cluster --delete --group testgroup --topic my-topic

偏移量管理

可用于需要跳过偏移量时
管理功能只对提交到zk的偏移量进行修改,不能修改提义到kafka broker的偏移量
1、导出偏移量到文件
kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect <zk_url>:2181/kafka-cluster --group testgroup --output-file ExportZkOffsets
2、导入偏移量(需要关闭消费者)
导入时不需要 --group 参数,因为文件中已经有该值
kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect <zk_url>:2181/kafka-cluster --input-file offsets

动态配置变更

在集群运行状态覆盖主题配置和客户端的配额参数
一旦设置完成,它们会成为集群的永久配置,并被保存在zk中,broker在启动时会读取它们

覆盖主题的默认配置

示例:
kafka-configs.sh --zookeeper <zk_url>:2181/kafka-cluster --alter --entity-type topics --entity-name <topic-name> --add-config <key>=<value>[,<key>=<value>...]
# 设置主题消息保留时间为1小时 3600000ms
kafka-configs.sh --zookeeper <zk_url>:2181/kafka-cluster --alter --entity-type topics --entity-name my-topic --add-config retention.ms=3600000

覆盖客户端的默认配置

只能覆盖生产者和消费者的配额参数
若有5个broker,生产者配额是10MB/s,那么总的速率可达到50MB/s
示例:
kafka-configs.sh --zookeeper <zk_url>:2181/kafka-cluster --alter --entity-type clients --entity-name <client-id> --add-config <key>=<value>[,<key>=<value>...]
配置项:
producer_bytes_rate 单个生产者每秒可以向单个broker上生成消息的字节数
consumer_bytes_tare 单个消费者每秒钟可从单个broker上读取消息的字节数

列出被覆盖的配置

# 列出主题所有被覆盖的配置(不会显示集群默认配置)
kafka-configs.sh --zookeeper <zk_url>:2181/kafka-cluster --describe --entity-type topic --entity-name my-topic

移除被覆盖的配置

动态配置可以被移除,恢复到集群默认配置
kafka-configs.sh --zookeeper <zk_url>:2181/kafka-cluster --alter --entity-type topics --entity-name my-topic --delete-config retention.ms

分区管理

首选的首领选举

kafka将副本清单里第一个同步副本选为首领,但在关闭并重启broker后,并不会自动恢复原先首领的身份
通过触发首选的副本选举,可以让broker重新获得首领
kafka-preferred-replica-election.sh 可手动触发选举
示例:
kafka-preferred-replica-election.sh --zookeeper <zk_url>:2181/kafka-cluster
如果元数据超过节点允许的大小,默认1MB,选举会失败,此时可以使用partitions.json指定分区清单来启动副本选举
kafka-preferred-replica-election.sh --zookeeper <zk_url>:2181/kafka-cluster --path-to-json-file partitions.json
# partitions.json {"patitions":[{"partition":1,"topic":"foo"},{"partition":2,"topic":"bar"}]}

修改分区副本

适用场景
- 主题分区在集群分布不均造成负载不均
- broker离线造成分区不同步
- 新加入的broker需要从集群里获得负载

kafka-reassign-partitions.sh工具
1、根据broker清单和主题清单生成迁移步骤
2、执行迁移步骤
3、使用迁移步骤验证分区重分配的进度和完成情况(可选)
示例:
# 将主题foo,bar迁移到broker 0和broker 1上
第一步:
topics.json {"topics":[{"topic":"foo"},{"topic":"bar"}],"version":1}
kafka-reassign-partitions.sh --zookeeper <zk_url>:2181/kafka-cluster --generate --topics-to-move-json-file topics.json --broker-list 0,1
上述命令会输出两个json对象,分别是当前分区分配情况及建议的分区分配方案
将第一个json对象保存,以便在必要时进行回滚;第二个json对象保存到文件,如 reassign.json ,作为第二步的输入
第二步:
kafka-reassign-partitions.sh --zookeeper <zk_url>:2181/kafka-cluster --execute --reassignment-json-file reassign.json
第三步:
kafka-reassign-partitions.sh --zookeeper <zk_url>:2181/kafka-cluster --verify --reassignment-json-file reassign.json

分批重分配
分区重分配对集群性能有很大影响,会引起内存缓存发生变化,并占用额外的网络和磁盘资源
可以将重分配拆分成多个小步骤以降低影响

将单个broker移出集群
先关闭或者重启broker,这样该broker就不再是任何一个分区的首领,它上面的分区就可以被分配给集群里其他broker

修改复制系数

在分区重分配执行步骤中修改json文件,调整replicas列表的内容,可以调整复制系数

转储日志片段

读取日志片段
# 显示消息概要信息
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000052368601.log
# 显示消息数据内容
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000052368601.log --print-data-log
# 验证日志片段索引文件的正确性
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000052368601.index,00000000000052368601.log --index-sanity-check
# --index-sanity-check 检查无用的索引;--verify-index-only 检查索引匹配度

副本验证

kafka-replica-verification.sh 验证集群分区副本的一致性
副本验证会对集群造成影响,它需要读取所有消息,并且读取过程是并行进行的,使用时要小心
# 对broker 1和2上以 my- 开头的主题副本进行验证
kafka-replica-verification.sh --broker-list x.x.x.x:9092,x.x.x.x:9092 --topic-white-list 'my-.*'

消费和生产

手动读取和生成消息
kafka-console-consumer.sh kafka-console-producer.sh

控制台消费者

kafka-console-consumer.sh 可以从一个或多个主题上读取消息,并打印到标准输出

指定集群:
--zookeeper xxxx:2181/kafka-cluster
--new-consumer --broker-list xxxx:9092,xxxx:9092
指定主题:
--topic
--whitelist 正则
--blacklist 正则
指定参数:
--consumer.config <configfile>
--consumer-property KEY=VALUE
--property 向消息格式化器传递配置信息
其他常用配置:
--formatter CLASSNAME 指定消息格式化器类名,用于解码消息,默认 kafka.tools.DefaultFormatter
可选:kafka.tools.LoggingMessageFormatter 将消息输出到日志;kafka.tools.ChecksumMessageFormatter 只打印校验和;kafka.tools.NoOpMessageFormatter 不打印消息;
--from-beginning 从最旧的偏移量开始读取
--max-messages NUM 最多读取多少个消息
--partition NUM 指定只读取ID为NUM的分区(新版消费者)

旧版:
kafka-console-consumer.sh --zookeeper xxxx:2181/kafka-cluster --topic my-topic

读取偏移量主题
__consumer_offsets主题,了解某个群组提交的偏移量和偏移量提交的频度
需要使用 kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter 这个格式化器

示例:
kafka-console-consumer.sh --zookeeper xxxx:2181/kafka-cluster --topic __consumer_offsets --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter' --max-messages 1

控制台生产者

kafka-console-producer.sh 用于向kafka主题写入消息
默认一行一个消息,消息的键和值以tab分隔,若没有tab则键为null
参数:
--producer.config <configfile> 指定消费者配置文件
--producer-property KEY=VALUE
--key-serializer CLASSNAME 指定消息键编码器,默认 kafka.serializer.DefaultEncoder
--value-serializer CLASSNAME 指定消息值编码器,默认 kafka.serializer.DefaultEncoder
--compression-codec str 指定压缩方式,none,gzip,snappy,lz4 默认gzip
--sync 以同步的方式生成消息

示例:
kafka-console-producer.sh --broker-list xxxx:9092,xxxx:9092 --topic my-topic

客户端ACL

kafka-acls.sh 处理与客户端访问控制相关的问题

不安全的操作

在紧急情况下使用,一般不建议执行

移动集群控制器

集群控制器仅有一个,运行于某个broker上
控制器会将自己注册到zk的一个节点上,该节点位于集群路径最顶层,名为/controller
手动删除该节点会释放当前控制器,集群会进行新的控制器选举

取消分区重分配

分区重分配的流程:
1、发起重分配请求(创建zk节点)
2、集群控制器将分区添加到broker上
3、新的broker开始复制分区,直到副本达到同步状态
4、集群控制器从分区副本清单里移除旧的broker

当重分配进行到一半时,若broker发生故障并无法立即重启,这会导致重分配过程无法结束,进而妨碍其他重分配任务的进行
此时,可以取消分区重分配

取消步骤:
1、从zk上删除 /admin/reassign_partitions节点
2、重新选举控制器

注意:取消重分配后,旧的broker不会从副本清单里移除,分区复制系数会比正常的大;若主题分区包含不一致的复制系数,broker是不允许对其进行操作的(如增加分区)

移除待删除的主题

主题的删除是通过在zk的/admin/delete_topic节点下创建一个以待删除主题名命名的子节点实现的
删除这个子节点(不要删除/admin/delete_topic父节点)即移除了被挂起的删除请求

手动删除主题

当集群禁用了主题删除功能,可进行手动删除,这要求关闭集群所有broker
注意:在集群运行时修改zk里元数据很危险,会造成集群不稳定

操作步骤:
1、关闭集群所有broker
2、删除zk的/brokers/topics/<topic-name> 注意需要先删除其下的子节点
3、删除每个broker的分区目录
4、重启所有broker

监控Kafka

Kafka提供的所有度量指标都可以通过 JMX 接口进行访问
可使用代理读取kafka的jmx指标并上传到监控系统

zk中/brokers/ids/<ID> 节点包含了json格式的broker信息,里面也包括jmx_port

broker的度量指标

kafka的监控和告警不能依赖kafka本身(针对使用kafka作为监控数据通道的监控系统)

非同步分区

如果broker只有一个可监控指标,一定是非同步分区的数量
JMX MBean -> kafka.server:type=ReplicaManager,name=UnderReplicatedPartitionis

若集群里多个broker的非同步分区数量一直保持不变,说明集群中某个broker已经离线了
若集群非同步分区数量是波动的,或数量稳定但并没有broker离线,说明集群出现性能问题

首领自动再均衡功能建议启用,否则需要手动运行默认的副本选举

# 列出集群非同步分区
kafka-topics.sh --zookeeper xxxx:2181/kafka-cluster --describe --under-replicated

1、集群级别的问题
- 不均衡的负载
- 资源过度消耗
诊断问题需要用到如下指标
- 分区数量
- 首领分区数量
- 主题流入速率
- 主题流出速率
检查每个broker分担的以上负载是否均衡,如果不均衡,需要将负载较重的broker分区移动到负载较轻的broker上
可以使用 kafka-reassign-partitions.sh工具
辅助工具:kafka-assigner工具https://github.com/linkedin/kafka-tools

集群容量瓶颈
- CPU使用
- 网络流入吞吐量
- 网络流出吞吐量
- 磁盘平均等待时间
- 磁盘使用百分比
上述任何一种资源出现过度消耗,都会表现为分区的不同步

All Topics Bytes In Rate最适合用于显示集群的使用情况

2、主机级别的问题
- 硬件问题
- 进程冲突
- 本地配置的不一致
比较常见的问题是磁盘故障,生产者的性能与磁盘的写入速度有直接关系

broker度量指标

活跃控制器数量
该指标表示当前broker是否是集群控制器,1表示是
任何时候,集群正常只会有1个控制器
JMX MBean kafka.controller:type=KafkaController,name=ActiveControllerCount

请求处理器空闲率
处理客户端请求的现两个线程池:
- 网络处理器线程池,负责通过网络读写数据,通常这些线程不会出问题
- 请求处理器线程池,负责处理来自客户端的请求,包括从磁盘读取和写入消息,broker负载增大时对其有很大影响
处理器线程数没必要超过cpu核数

JMX MBean kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent
数值越低,说明broker负载越高;低于20%时,说明存在潜在问题;低于10%说明出现性能问题

主题流入字节
生产者消息流量
可以判断是否应该对集群扩展以及是否存在不均衡问题
JMX MBean kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec

主题流出字节
消费者从broker读取消息的速率,包括复制消费者
JMX MBean kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec

主题流入的消息
消息个数,不考虑消息大小
JMX MBean kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
注意,没有消息流出指标,因为发送出去的是批次,broker并不知道发送了多少条消息,但提供一个指标 每秒获取次数(请求速率)

分区数量
分配给broker的分区总数
JMX MBean kafka.server:type=ReplicaManager,name=PartitionCount

首领数量
broker拥有的首领分区数量,在整个集群内,各个broker上该值应该要保持均等
若出现不均,可能需要运行默认的副本选举,重新均衡集群的物首领
JMX MBean kafka.server:type=ReplicaManager,name=LeaderCount

离线分区
该指标只能由集群控制器提供,表示集群中没有首领分区的数量
离线分区会影响生产者,导致消息丢失、或造成回压
JMX MBean kafka.controller:type=KafkaController,name=OfflinePartitionsCount

请求度量指标
每种请求类型都有8个度量指标,分别体现不同请求处理阶段的细节
- RequestsPerSecond,[重要]速率指标
- Total Time,[重要]处理请求的时间,从收到请求始,到发送完响应止
- Request Queue Time,请求停留在队列的时间,从收到请求始,至开始处理请求止
- Local Time,首领分区花在处理请求上的时间,包括把消息写入磁盘(不一定冲刷)
- Remote Time,请求处理完毕前,等待跟随者的时间
- Throttle Time,搁置响应的时间,用于拖慢请求者,限定在客户端配额范围内
- Response Queue Time,响应发送前停留在队列里的时间
- Response Send Time,发送响应的时间
- Mean 所有请求的平均值
- StdDev 整体请求时间标准偏差
- 百分位[重要] 99% 99.9%

Produce请求99.9%百分位数值快速增长说明出现大规模的性能问题
Consumer的时间会受各种因素影响

主题和分区的度量指标

主题的度量指标可用于识别出造成集群流量增长的主题

1、主题实例的度量指标
指标会基于特定的主题
2、分区实例的度量指标
基于特定的分区
Partition size表示分区当前在磁盘上保留的数据量,如果把它们组合起来,可以表示单个主题保留的数据量
同一个主题若不同分区间数据量存在差异,说明消息分布不均
Log segment count表示保存在磁盘上的日志片段文件数量

Java虚拟机监控

垃圾回收
若使用的G1垃圾回收器,需要监控的指标如下:
Full GC cycles java.lang:type=GarbageCollector,name=G1 Old Generation
Yong GC cycles java.lang:type=GarbageCollector,name=G1 Young Generation
- CollectionCount 自启动以来的垃圾回收次数
- CollectionTime 自启动以来的垃圾回收时间,ms

Java操作系统监控
java.lang:type=OperatingSystem 提供操作系统信息
- MaxFileDescriptorCount JVM能打开的文件句柄数量的最大值
- OpenFileDescriptorCount 目前已经打开的文件句柄数量

操作系统监控

平均负载是对指等待处理器执行的线程数,包括不可中断睡眠状态的线程(如磁盘等待)

日志

可以使用两种日志:
1、 kafka.controller INFO级别,记录集群管制器信息
会记录主题创建和修改、broker状态变更、集群活动、副本选举和分区移动等
2、kafka.server.ClientQuotaManager INFO级别,记录与生产和消费配额活动相关的信息

调试日志
kafka.request.logger 包含发送给broker的每一个请求的详细信息 (日志量很大)

压缩线程状态日志
kafka.log.LogCleaner kafka.log.Cleaner kafka.log.LogCleanerManager (日志量不大)

客户端监控

监控官方的Java客户端

生产者/消费者度量指标

生产者整体度量指标
- record-err-rate 若值大于0表明生产者正在丢弃无法发送的消息(消息达到重试上限后,会被丢弃)
- request-latency-avg 生产者发送一个请求到broker所需要的平均时间,若值变大,说明生产者正在变慢
- outgoing-byte-rate 每秒消息字节数
- record-send-rate t每秒消息数量
- request-rate 每秒发送给broker的请求数
- record-queue-time-avg 消息在发送给kafka之前在生产者客户端等待的时间

消费者度量指标
- fetch-latency-avg 消费者向broker发送请求需要的时间
- bytes-consumed-rate records-consumed-rate 读取消息的字节数和消息个数
消费者群组协调器
- sync-time-avg 同步活动所使用的时间
- sync-rate 同步的频率,通常是0
- commit-latency-avg 提交偏移量所需的平均时间
- assigned-partitions 分配给消费者单个实例的分区数量,可用于判断群组负载是否均衡

配额
kafka可对客户端请求进行限流,防止客户端拖垮整个集群
可对每秒单个端对单个broker的流量字节数进行限制
消费者:kafka.consumer:type=consumer-fetch-manager-metrics,client-id=<ID> 的属性 fetch-throttle-time-avg
生产者:kafka.producer:type=producer-metrics,client-id=<ID> 的属性 produce-throttle-time-avg

延时监控

对消费者,最重要的就是消费延时
即分区最后一个消息和消费者最后读取的消息之间相差的消息个数
使用外部监控比使用客户端自己的监控要好得多

可以使用 Burrow 工具
可用于监控

p201

 

转载请注明:轻风博客 » Kafka权威指南(三)

喜欢 (0)or分享 (0)