欢迎访问我的博客,你的支持,是我最大的动力!

Kafka权威指南(一)生产者/消费者/术语

Linux 小马奔腾 284℃ 评论
目录:
[显示]

优秀资源

Kafka 集群在马蜂窝大数据平台的优化与应用扩展

Kafka架构原理,也就这么回事

虚位以待...

安装Kafka

可以保证消息在单个分区内是顺序

紧凑型日志主题只为每个键保留一个变更数据,可长时间使用,无需担心过期问题

1、Java 运行时环境
2、安装Zookeeper 保存集群元数据信息和消费者信息 如 3.4.6版本
zk启动后的验证:
telnet localhost 2181
srvr (打印状态信息)
zoo集群不建议超过7个节点,节点个数需为奇数个

conf/zoo.cfg 配置文件说明
tickTime=2000 #2秒
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20 #从节点与主节点间建立初始化连接的时间上限,tickTime的倍数 20*2000=40s
syncLimit=5 #从节点与主节点处于不同步状态的时间上限,tickTime的倍数 5*2000=10s
#serer.X=hostname:peerPort:leaderPort
server.1=zoo1.example.com:2888:3888
#X 服务器id 数据目录下 myid 文件内的数字
#peerPort 节点间通信的端口 2888
#leaderPort 首领选举的端口 3888

3、安装kafka broker
kafka_2.11-0.9.0.1.tgz kafka版本0.9.0.1 Scala版本2.11
bin/kafka-server-start.sh -daemon
conf/server.properties

4、验证
# 创建主题
kafka/bin/kafka-topics.sh --create --zookerper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 查看主是
kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
# 发布消息
kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 读取消息
kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

broker配置

broker.id 标识符,默认值是0,可设置为任意整数,唯一
port 默认9092
zookeeper.connect 指定zoo的地址 hostname:port/path hostname:port服务器组使用分号分隔,其中path是kafka集群的chroot环境,可自动创建,不指定则使用根路径
log.dirs 一组用逗号分隔的路径,存放日志数据;指定多个目录时,会往拥有最少数目分区的目录新增分区,而不会考虑磁盘空间
num.recovery.threads.per.data.dir 线程池大小,默认每个日志目录只使用一个线程,可设置多个,以达到并发处理的目的
这些线程只有在服务启动和关闭及崩溃后重启时用到;若设置值为8,log.dir指定了3个目录,则总共需要24个线程
auto.create.topics.enable true/false 是否允许自动创建主题

主题的默认配置
num.partitions 默认分区数,注意,主题一但创建,分区数可增加,但不可减小
如果消息是按键写入分区的,那么为已有主题增分区会很困难
可以按生产者吞吐量和消费者吞吐量估算分区数
经验:把分区大小限制在25GB以内,可得到比较理想的效果
log.retention.ms 决定消息多久被删除 (若使用管理工具在服务器间移动了分区,最后修改时间会不准确) 24h=86400000
log.retention.bytes 单个分区的最大大小
log.segment.bytes 单个日志片段的大小 (日志片段被关闭之前是不会过期的)
对于消息量比较小的主题,设置日志片段大小需要注意,因为过期只有在日志片段被关闭才开始计算,而填满一个日志片段可能需要很长时间
log.segment.ms 设置日志片段多长时间后需要关闭,默认为空,即仅按日志片段大小来关闭
message.max.bytes 单个消息大小,默认是1MB (1000000) 注意这指的是消息压缩后的大小,而不是消息实际的大小
这个值需要与客户端的 fetch.message.max.bytes 相一致,否则会导致大消息不能消费造成消费者阻塞

硬件的选择

- 磁盘吞吐量 影响生产者
- 磁盘容量
- 内存 影响消费者,通常消费都都紧追在生产者后,即数据是在内存中的,而不是在磁盘中获取;kafka需要较大的页面缓存
- 网络
- CPU 要救相对较低,主要用于消息的批量解压设置偏移量然后再批量压缩

Kafka集群

把一个broker加入到集群,需要配置两个地方:
- 配置相同的zookeeper.connect
- 为broker.id设置一个唯一的值

操作系统调优

/etc/sysctl.conf
1、虚拟内存
对交换分区和处理方式和内存脏页进行调整
vm.swappiness 参数设置小一点,如1 (设置为0会完全禁用内存交换)
vm.dirty_background_ratio 设置小于10的值 不应该设置为0(会频繁刷新脏页)
vm.dirty_ratio 增加被内核进程刷新到磁盘前的脏页数量 可设置为大于20的值 如 60~80
若调高了该值,会有一些风险,建议启用kafka的复制功能,避免因系统崩溃造成数据丢失
检查当前脏页数量 cat /proc/vmstat | egrep "dirty|writeback"
2、磁盘
建议使用xfs文件系统
对挂载点设置 noatime 参数,因为该参数对kafka没什么用 atime -> 最后访问时间
3、网络
对大流量的支持
对分配给socket读写缓冲区的内存大小作调整
net.core.wmem_default 131072 (128KB)
net.core.rmem_default 131072 (128KB)
net.core.wmem_max 2097152 (2MB)
net.core.rmem_max 2097152 (2MB)
TCP socket读写缓冲区
net.ipv4.tcp_wmem 4096 65536 2048000 最小值4KB 默认值64KB 最大值2MB
net.ipv4.tcp_rmem 4096 65536 2048000 最小值4KB 默认值64KB 最大值2MB
net.ipv4.tcp_window_scaling 1 #启用TCP时间窗口扩展,可提升客户端传输数据的效率
net.ipv4.tcp_max_syn_backlog 比默认值1024更大的值,可接受更多的并发连接
net.core.netdev_max_backlog 比默认值1000更大的值,有助于应对网络流量的爆发

生产环境的注意事项

垃圾回收器选项
使用G1垃圾回收器
MaxGCPauseMillis 每次垃圾回收的默认停顿时间,不是固定的,默认值是 200ms
InitiatingHeapOccupancyPercent 启动新一轮垃圾回收前可使用的堆内存百分比,默认值45,包括新生代和老年代
对64G的服务器,堆内存为5G,参考配置:MaxGCPauseMillis 20ms; InitiatingHeapOccupancyPercent 35

Kafka启动脚本并没有启用G1,而是使用Parallel New和CMS
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"

部署kafka集群时,需要把broker安放在不同的机架/区域

Kafka 对zookeeper的写操作不多,因此可以共享zookeeper集群,每个kafka集群使用一个chroot路径 (不推荐这么做)
# kafka 0.9.0.0 以前,消息者会向zookeeper写数据,保存消费者群组信息、主题信息、消费分区的偏移量;到0.9.0.0版本,允许broker直接维护这些信息
# 消费者可选将偏移量提交到zookeeper还是kafka,提交时间间隔默认是一分钟,这对zookeeper来说会造成一定压力

生产者-向Kafka写入数据

ProducerRecord对象包含目标主题和要发送的内容,还可以指定键或分区
经过分区器选定好分区后,记录会被添加到一个记录批次里(该批次将发往同一个主题和分区上)
服务器收到消息后,若成功,会返回一个RecordMetaData对象,包含主题、分区和记录的偏移量;若失败会重试,多次后返回错误信息

创建生产者
必选属性:
- bootstrap.servers broker地址清单,至少要现两台broker
- key.serializer 序列化方式,序列化器,ByteArraySerializer/StringSerializer/IntegerSerializer等
- value.serializer 序列化器

发送消息的方式
- fire-and-forget 发送并忘记,不关心消息是否到达,有时会丢失一些消息
- 同步发送 可以知道消息是否发送成功 send -> get
- 异步发送 指定回调函数

对生产者内存使用、性能和可靠性方面有影响的参数
- acks 必须多少个分区副本收到消息,生产者才会认为消息写入是成功的,对消息丢失的可能性有影响
0 不等待服务器响应,可以达到很高的吞吐量
1 只要首领节点收到消息,即认为发送成功。如果一个没有收到消息的节点成为新首领,消息会丢失
all 所有参与复制的节点全部收到消息时,才认为发送成功,最安全,延迟最高
- buffer.memory 生产者内存缓冲区大小,缓冲要发送以服务器的消息
- compression.type 消息压缩方式,snappy/gzip/lz4 snappy占用cpu较小,压缩比也不错
- retries 重发次数,默认重试等待时间为 100ms 由 retry.backoff.ms 定义
- batch.size 批次大小,按字节数计算而不是消息条数 由于并不一定是批次满了才发送,所以批量设置得比较大并不会造成延迟
- linger.ms 批次的发送间隔时间,增大会增加延迟但也会提升吞吐量
- client.id 任意字符串,识别消息来源
- max.in.flight.requests.per.connection 在收到服务器响应前可以发送多少个消息,设置为1可以保证消息是按发送的顺序写入服务器的,即使发生了重试
- timeout.ms 等待同步副本返回消息确诊的时间
- request.timeout.ms 发送数据时等待服务器响应的时间
- metadata.fetch.timeout.ms 获取元数据时等待服务器响应的时间
- max.block.ms 获取元数据时的阻塞时间
- max.request.size 发送请求的大小 (注意,broker对接收消息的最大值也有限制 message.max.bytes)
- receive.buffer.bytes send.buffer.bytes TCPsocket接收和发送数据包的缓冲区大小,-1则使用操作系统默认值

顺序保证
kafka可保证同一个分区的消息是有序的,即生产者写入的顺序与消费都消费的顺序相同
retries非零,max.in.flight.requests.per.connection 大于1时,会因重试而导致顺序偏差
retries=0,max.in.flight.requests.per.connection=1 可保证就算有重试,消息也顺序到达broker,但吞吐量会很低

序列化器
JSON、Avro、Thrift、Protobuf
Avro通过与语言无关的schema定义,schema通过JSON描述,数据被序列化成二进制文件;shema一般会内嵌在数据文件里
Avro有个特性,当负责写消息的应用程序使用新的schema,负责读消息的应用程序可以继续处理消息而无需改动
schema注册表

分区
ProducerRecord对象可以只包含目标主题和值,键可以设置为默认null
键可以作为分区依据

如果键不为空,且使用了默认的分区器,kafka会对键进行散列,然后根据散列值把消息映射到特定分区上
增加分区会打散原来的映射

实现自定义分区策略

消费者-从kafka读取数据

消费者和消费者群组
一个群组里的消费者订阅同一个主题,每个消费者接收主题一部分分区的消息

如果一个群组中消息者数量超过分区数量,有一部分消费者会闲置,不会接收到任何消息

当一个消费者被关闭或者崩溃时,会离开群组,原本由它读取的分区将由群组里其他消费者读取
增加分区数时,会发生分区重分配

再均衡,指分区所有权从一个消费者转移到另一个消费者
再均衡期间,消费者无法读取消息,会造成整个群组短时间的不可用
当分区重新分配给另一个消费者时,消费者当前读取状态会丢失,需要刷新缓存

消费者向被指派为群组协调器的broker发送心跳,以维持群组从属关系和对分区的所有权关系

创建kafka消费者
1、创建一个KafkaConsumer对象
必要属性:
- bootstrap.servers
- key.deserializer
- value.deserializer
- group.id 消费者所属的群组
2、订阅主题
可同时订阅多个主题,支持正则表达式
3、消息轮询

线程安全
在同一个群组里,无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者
通常是一个线程运行一个消费者

消费者的配置

与消费者性能和可用性相关的参数:
- fetch.min.bytes 获取记录的最小字节数,如果新消息不足,服务端会延迟发送数据,可适当调大该值以降低负载
- fetch.max.wait.ms 指定broker的等待时间,默认 500ms
- max.partition.fetch.bytes 从每个分区接收数据的最大值,默认 1MB 该值必须比broker的 max.message.size 大
- session.timeout.ms 会话超时时间,默认3s 超过该时间没有发送心跳,即认为消费者已离线
增大该值可减少意外的再均衡
session.timeout.ms 必须比 heartbeat.interval.ms 大,通常为3倍
- auto.offset.reset 默认值是latest 在偏移量无效时,从哪个位置获取消息;earliest 从起始位置
- enable.auto.commit 默认值true 是否正动提交偏移量
设置为false,可自己控制,能一定程序上避免出现重复数据和数据丢失
设置为true时,配合 auto.commit.interval.ms 可控制自动提交频率
- partition.assignment.strategy 分区分配策略
Range 把主题的若干个连续分区分配给消费者 分区数不是消费者整数倍时,会出现分配不均
RoundRobin 逐个分配,比较平均
- client.id 可为任意字符串,标识客户端
- max.poll.records 单次返回的记录数量
- receive.buffer.bytes send.buffer.bytes TCPsocket的缓冲区大小,-1将使用系统默认值

提交和偏移量
更新分区当前位置的操作叫提交
消费者向 _consumer_offset 的主题发送消息,消息包含每个分区的偏移量
偏移量用于再均衡后新的消费者接管消费
若提交的偏移量小于客户端处理的最后一个消息的偏移量,这两个偏移量之间的消息会被重复处理
若提交的偏移量大于客户端处理的最后一个消息的偏移量,这两个偏移量之间的消息会丢失 (提交早了,还没处理完)

提交和偏移量
更新分区当前位置的操作叫提交
消费者向 _consumer_offset 的主题发送消息,消息包含每个分区的偏移量
偏移量用于再均衡后新的消费者接管消费
若提交的偏移量小于客户端处理的最后一个消息的偏移量,这两个偏移量之间的消息会被重复处理
若提交的偏移量大于客户端处理的最后一个消息的偏移量,这两个偏移量之间的消息会丢失 (提交早了,还没处理完)

再均衡监听器
若发生再均衡,可以在即将失去分区所有权时提交偏移量,提交的是最近处理过的偏移量,不是批次的偏移量

从特定偏移量处开始处理记录
向后回退几个消息、向前跳过几个消息
# 把记录和偏移量写到同一个外部系统实现单次语义

反序列化器
kafka中传统的是字节数组,需要反序列化
序列化器和反序列化器需要一一匹配

独立消费者
没有群组的消费者
一个消费者从一个主题的所有分区或某个特定的分区读取数据 (不需要消费群组和再均衡)
# 一个消费者为自己分配分区并从分区里读取消息 (注意,在新增分区后,消费者无法感知)

深入Kafka

集群成员关系
通过zookeeper维护 /brokers/ids

控制器
负责分区首领的选举,在节点加入或离开集群时进行分区首领选举
也是一个broker 但一个集群中只有一个节点是控制器

复制
副本类型:
- 首领副本 所有生产者请求和消费者请求都会经过这个副本,监控哪个跟随者与自己状态一致
- 跟随者副本 不处理来自客户端的请求,从首领复制信息,若首领崩溃则其中一个提升为首领

跟随者若在10s内没有请求首领获取最新消息,则认为不同步 replica.lag.time.max.ms
只有同步的副本才可能被选举为新首领

当前首领和首选首领
第一个副本是首选首领 若第一个副本不是当前首领,若 auto.leader.rebalance.enable=true,则会触发首领选举,让首选首领成为当前首领

处理请求
生产请求和获取请求都必须发送给分区的首领副本
kafka客户端要自己负责把生产请求和获取请求发送到正确的broker上
元数据请求:客户端知道往哪里发送请求的依据,包含客户端感兴趣的主题列表,返回的响应包含这些主题的分区、每个分区的副本、哪个副本是首领副本
元数据请求可发往任意一个broker
客户端缓存的元数据刷新时间间隔 metadata.max.age.mx

生产请求
消息写入分区首领后,broker开始检查acks参数,若是0、1则立即返回响应;若是all,请求会被保存到炼狱缓冲区,直到所有跟随者副本都复制了消息,响应才返回客户端

获取请求
若请求的偏移量不存在,会返回一个错误
客户端可指定获取消息的上下限及超时时间
并不是所有保存到分区首领的数据都可以被客户端读取,只能读取到已经被写入所有同步副本的消息
replica.lag.time.max.ms 副本在复制消息时可被允许的最大延迟时间

其他请求
更新当前偏移量、创建主题等

在升级客户端之前先升级broker,因为新的broker知道如何处理旧的请求,反过来则不然

物理存储
kafka的基本存储单元是分区
分区大小受到单个挂载点可用空间的限制

分区分配
有时需要考虑机架
新的分区总是被添加到数量最小的目录里 (不会考虑占用空间和负载等因素)

文件管理
默认每个日志片段包含1GB或一周的数据,以较小者为准
当前正在写入数据的片段是活跃片段,活跃片段永远不会被删除
broker会为分区里每个片段打开一个文件句柄,哪怕片段是不活跃的 所以操作系统需要适当调整能打开文件句柄数量

文件格式
kafka把消息和偏移量保存在文件中
保存在磁盘上的数据格式与从生产者发送来的消息格式是一样的
除键、值、偏移量外,消息还包括消息大小、校验和、消息格式版本号、压缩算法、时间戳
如果生产者发送的是压缩过的消息,同一批次的消息会被压缩在一起

bin/kafka-run-class.sh kafka.tools.DumpLogSements #查看片段内容,显示每个消息的偏移量、校验和、消息大小、压缩算法等
--deep-iteration 显示被压缩到包装消息里的消息

索引
把偏移量映射到片段文件和偏移量在文件里的位置
如有必要,管理员可以删除索,这是安全的,kafka会自动重新生成索引

清理 compact
启用清理功能 log.cleaner.enabled
清理掉片段中不需要的消息,即同一个键对应的旧的值,如果消息没有键,清理功能会失效,不能对没有键的消息进行清理
compact 策略不会对当前片段进行清理

被删除的事件
当需要删除某个特定的键时,应用程序需要发送包含该键且值为null的消息
清理线程发现该消息后,会删除该键对应的所有消息,但会保留值为null的消息一段时间,该消息称为 墓碑消息
墓碑消息 会保留一段时间,以通知消息者,需要删除该键对应的所有值

 

转载请注明:轻风博客 » Kafka权威指南(一)生产者/消费者/术语

喜欢 (0)or分享 (0)