Kafka Cheat Sheet
Kafka 基本概念
Kafka 为数据生态系统带来了循环系统,在基础设施的各个组件之间传递消息以解耦生产者和消费者,并为客户端提供一致的接口便于集成。
根据业务需要可以动态增删组件,常用于活动跟踪、消息传递、度量指标和日志记录、提交日志、流处理等。
工作模型
Kafka 支持两种工作模型:点对点模型(P2P)模型 与 发布 - 订阅(Pub - Sub)模型。
点对点模型
即消息队列模型,系统 A 发送得消息只能被系统 B 接收。Kafka 通过消费者组实现点对点模型:多个消费者实例共同组成一个组消费一组主题。这组主题中的每个分区都只会被组内的一个消费者实例消费。
发布 - 订阅(Pub - Sub)模型
区别于点对点模型,它有主题(Topic)的概念,即逻辑消息容器。
发布者把消息分类,接收者对特定类型的消息进行订阅以便接收。
对于多组发布者 - 接收者,引入一或多个中间人解耦,成为多个独立的发布订阅系统。
角色(Role)
在 Kafka 的工作模型中,分为 生产者(Producer)、中间人(Broker)、消费者(Consumer) 三种角色。
生产者(Producer)
生产者创建消息,默认下把消息分布到所有分区上,但也可以根据键和分区器直接写到指定分区上。
中间人(Broker)
表示一台独立的 Kafka 服务器,需要处理以下请求:
客户端请求
生产者:接收生产者消息,为消息设置偏移量,并提交消息到磁盘保存。
消费者:响应读取分区的请求,返回已经提交到磁盘的消息。
分区副本请求
控制器发送给分区首领的请求
持久化:Broker 会在一定期限内保存消息以及读取进度(偏移量),允许消费者非实时读取。策略是保存一段时间或一定大小(达到期限或上限后清理),可以针对主题配置。
伸缩性:Broker 可以轻松扩展以提高性能,不会影响整体可用性。
数据写入机制:
顺序写:Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个消息物理文件:只能追加写(Append-only),避免了缓慢的随机 I/O 操作,实现高吞吐量。
定期删除:日志段(Log Segment)机制。在 Kafka 底层,一个日志细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存。在后台有定时任务检查老的日志段是否能够被删除,回收磁盘空间。
消费者(Consumer)
消费者订阅主题消息,并按照生成顺序读取,通过检查偏移量(保存在 Zookeeper 或 Kafka 中持久化)判断消息是否已读取。
支持多个消费者,消息可被多个消费者读取,之间互不影响;同时支持分组,消费者组内的共同读取一个主题,每个分区只能被一个消费者读取,即消费者对分区的所有权关系:目的是提升消费者端的吞吐量(TPS)。
消费者通过消费者位移记录自己在分区的消费进度。
消息架构
Kafka 是三层消息架构:
主题层,可以配置 M 个分区,每个分区可以配置 N 个副本。
分区层,每个分区的 N 个副本中只能有一个领导者,对外提供服务;N-1 个追随者副本,提供数据冗余之用。
消息层,分区中包含若干条消息,每条消息的位移从 0 开始依次递增。客户端只能与分区的领导者副本进行交互。
主题(Topic)
消息通过主题分类,类比数据库表。
一个主题可以创建多个分区,需权衡吞吐量、内存及 Leader 副本选举所需时间。
分区(Partition)
一个主题的消息可以分为若干分区,一个分区就是一个顺序追加的提交日志。
分区内消息有序,但多个分区下同一主题消息不能确保整体有序。
通过分区实现数据冗余和伸缩性,由于主题的每个分区可以分布在不同服务器上,可以提供更高的性能。
分区偏移量:分区元数据,不断递增的整数值,在创建消息时添加,在给定的分区内是唯一的,确保消息不被重读。区别于消费者位移(指示该消费者的消费进度),一旦消息写入分区,位移就不再变化。
副本是在分区的层级定义的,分区下可配置若干个副本:1 个 Leader、N-1 个 Follower。
副本(Replica)
副本是 Kafka 的备份机制:把相同的数据拷贝到多台机器上。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
领导者(Leader):对外提供服务,即与客户端程序进行交互。
追随者(Follower):只与 Leader 交互,异步拉取消息写入自己的提交日志中,不对外提供服务。
模式(Schema)
使用额外的结构定义消息内容,使之易于理解。
前提是要确保数据格式的一致性,否则容易耦合,升级时需要同时处理两种数据格式:
JSON、XML:易用且可读性好,但缺乏强类型处理能力,不同版本之间兼容性也不好;
Avro:紧凑的序列化格式,模式和消息体分开,因此模式变化不需要重新生成代码;支持强类型和模式进化,能前后兼容。
消息批次(Batch)
Kafka 的数据单元是消息,由字节数组组成。当消息以可控的方式写入不同的分区,会基于键生成散列值、对分区数取模,为消息选取分区。
批次即同属一个主题和分区的一组消息,分批次写入 Kafka 可以减少网络开销、提高效率:批次越大、单位时间内处理的消息越多、单个消息的传输时间越长。可通过对批次数据压缩,提高数据传输和存储能力。
集群与服务协调
随着 Kafka 部署数量的增加,可部署多个集群,实现数据类型分离、安全隔离和多数据中心功能(需另外提供集群之间复制消息的功能,可通过 MirrorMaker 实现)。
Kafka 使用 ZooKeeper 保存集群 Broker 和主题的元数据信息、消费者元数据及分区偏移量,实现服务协调。
ZooKeeper 集群使用一致性协议,建议部署奇数个节点,确保大多数节点可用。
常用脚本
Kafka 部署包中提供了数十个脚本(Windows 的 bat 和 Linux 的 Shell),直接运行会提示基本用法,比较常用的有:
序号 | 名称 | 描述 |
---|---|---|
1 | connect-standalone connect-distributed |
Kafka Connect 组件的启动脚本,分别为单节点 Standalone 模式和多节点 Distributed 模式。 |
2 | kafka-acls | 设置 Kafka 访问权限。 |
3 | kafka-broker-api-versions | 验证不同 Kafka 版本之间服务器和客户端的适配性: 在 0.10.2.0 之前 Kafka 向下兼容; 自 0.10.2.0 版本开始支持双向兼容。 |
4 | kafka-configs | 配置 Kafka broker 参数。 |
5 | kafka-console-producer kafka-console-consumer kafka-verifiable-producer kafka-verifiable-consumer |
测试 Kafka 生产者、消费者。 |
6 | kafka-producer-perf-test kafka-consumer-perf-test |
测试 Kafka 生产者、消费者性能。 |
7 | kafka-consumer-groups | 设置 Kafka 消费者位移。 |
8 | kafka-delegation-tokens | 管理 Delegation Token(一种轻量级的认证机制,补充了现有的 SASL 认证机制)。 |
9 | kafka-delete-records | 删除 Kafka 的分区消息(Kafka 有自动消息删除策略)。 |
10 | kafka-dump-log | 查看 Kafka 消息文件的内容,包括消息的各种元数据信息,甚至是消息体本身。 |
11 | kafka-log-dirs | 查询各个 Broker 上的各个日志路径的磁盘占用情况。 |
12 | kafka-mirror-maker | 实现 Kafka 集群间的消息同步。 |
13 | kafka-preferred-replica-election | 执行 Preferred Leader 选举。 |
14 | kafka-reassign-partitions | 执行分区副本迁移以及副本文件路径迁移。 |
15 | kafka-topics | 执行所有的主题管理操作。 |
16 | kafka-run-class | 执行任何带 main 方法的 Kafka 类。 |
17 | kafka-server-start kafka-server-stop |
启动和停止 Kafka Broker 进程。 |
18 | kafka-streams-application-reset | Kafka Streams 应用程序重设位移。 |
19 | zookeeper-* | 管理和运维 ZooKeeper。 |
20 | trogdor | Kafka 的测试框架,用于执行各种基准测试和负载测试。 |
生产消息
1 | bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic --request-required-acks -1 --producer-property compression.type=lz4 |
消费消息
1 | bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port --topic test-topic --group test-group --from-beginning --consumer-property enable.auto.commit=false |
测试生产者性能
1 | bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4 |
输出测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时。
通常会关注 99th 分位(604ms),表明测试生产者生产的消息中,有 99% 消息的延时都在 604ms 以内,可作为 SLA 参考。
1 | 2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency. |
测试消费者性能
1 | bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic |
输出消费者的吞吐量数据(1723MB/s,但没有不同分位数下的分布情况)。
1 | start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec |
查看主题消息总数
Kafka 自带命令没有提供这种功能,可以使用 Kafka 工具类 GetOffsetShell :
1 | bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -2 --topic test-topic |
计算出给定主题特定分区当前的最早位移和最新位移,将两者的差值累加起来,就能得到该主题当前总的消息数(5500000 + 5500000 == 11000000)
查看消息文件数据
1 | bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration |
输出消息批次(RecordBatch)或消息集合(MessageSet)的元数据信息(创建时间、使用的压缩算法、CRC 校验值等)。
1 | Dumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.log |
查看消费者组消息位移
1 | bin/kafka-consumr-groups.sh --bootstrap-server kafka-host:port --describe --group test-group |
其中输出的 CURRENT-OFFSET 表示该消费者当前消费的最新位移,LOG-END-OFFSET 表示对应分区最新生产消息的位移,LAG 列是两者的差值。
1 | TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID |
主题创建
指定分区、副本个数和主题名称
1 | bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1 |
Kafka 2.2 版本开始,社区推荐用 --bootstrap-server
参数替换 --zookeeper
参数(详见官方文档: http://kafka.apache.org/22/documentation.html ):
The
bin/kafka-topics.sh
command line tool is now able to connect directly to brokers with--bootstrap-server
instead of zookeeper. The old--zookeeper
option is still available for now. Please read KIP-377 for more information.
使用
--zookeeper
会绕过 Kafka 的安全体系:即使为 Kafka 集群设置了安全认证,限制了主题的创建,如果使用 –zookeeper 的命令依然能不受约束成功创建任意主题。使用
--bootstrap-server
与集群进行交互,逐渐成为使用 Kafka 标准做法,不必同时维护 ZooKeeper 和 Broker 的连接信息。
主题查询
注意用户只能查看自己可见的 Kafka 主题。
1 | bin/kafka-topics.sh --bootstrap-server broker_host:port --list |
使用 --zookeeper
会绕过 Kafka 的安全体系,默认返回集群中所有的主题详细数据。
修改分区
只能增加数量,否则抛出 InvalidPartitionsException
异常。
1 | bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数> |
修改主题级别参数
注意必须使用 --zookeeper
参数
1 | bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760 |
修改主题限速
即设置 Leader 副本和 Follower 副本使用的带宽。
先设置 Broker 端参数 leader.replication.throttled.rate
和 follower.replication.throttled.rate
1 | bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0 |
为该主题设置要限速的副本
1 | bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test |
删除主题
注意删除是异步操作,返回的是“已删除”的状态标记。
1 | bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name> |
常见错误排查
主题删除失效
由于删除是异步操作,有可能删除不成功,最常见的原因:
副本所在的 Broker 宕机:重启 Broker 后自动恢复
待删除主题的部分分区依然在执行迁移过程。
对于后者,需要依次操作:
手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。
手动删除该主题在磁盘上的分区目录。
在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。
__consumer_offsets 占用太多的磁盘
__consumer_offsets
和 __transaction_state
是 Kafka 内置的特殊主题,一旦发现前者消耗了过多的磁盘空间,请显式地用 jstack
命令查看一下 kafka-log-cleaner-thread
前缀的线程状态。通常情况下都是因为该线程异常导致无法及时清理此内部主题,此时只能重启 Broker(具体原因根据错误日志排查)。
动态配置
Kafka 社区于 1.1.0 版本中正式引入了动态 Broker 参数(Dynamic Broker Configs)。即修改后无需重启 Broker 就能立即生效,而之前在 server.properties 中配置的参数则称为静态参数(Static Configs),详情参考官方文档: http://kafka.apache.org/11/documentation.html
动态配置的应用场景一般有以下几种:
修改Broker 端各种线程池大小,实时应对突发流量(或减少资源浪费);
调整 Broker 端连接信息或安全配置信息;
更新 SSL Keystore 有效期;
调整 Broker 端 Compact 操作性能;
实时变更 JMX 指标收集器(JMX Metrics Reporter)。
参数保存
在 Broker 配置表中有一列 DYNAMIC UPDATE MODE(更新模式),共有以下三种:
read-only:静态配置,需要重启 Broker 才能生效。
per-broker:动态配置,修改后只会在对应的 Broker 上生效。
cluster-wide:动态配置,修改后会在整个集群上生效。
Kafka 将动态 Broker 参数保存在 ZooKeeper 的 /config
路径中(都是持久化节点):
1 | /config |
配置的优先级为:per-broker 参数 > cluster-wide 参数 > static 参数 > Kafka 默认值。
参数配置
使用 Kafka 自带的 kafka-configs 脚本,以配置 cluster-wide 范围参数为例:
1 | bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --add-config unclean.leader.election.enable=true |
要设置 cluster-wide 范围的动态参数,即在集群层面设置全局值,需要显式指定 entity-default
,检查配置是否成功:
1 | bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --describe |
如要设置 per-broker 范围(ID 为 1 的 Broker)参数,则执行:
1 | bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false |
检查配置是否成功:
1 | bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --describe |
在 Broker 1 层面上,该参数被设置成了 false,表明配置成功;
在全局层面上,该参数值依然是 true,表明之前设置的 cluster-wide 范围参数值依然有效。
最后需要分别从 Broker 和全局层面删除配置:
1 | 删除cluster-wide范围参数 |
常用参数
序号 | 参数名称 | 备注 |
---|---|---|
1 | log.retention.ms | 由于不可能完美地预估所有业务的消息留存时长,修改日志留存时间是一个比较高频的操作。可以设置对应的主题级别或在全局层面上动态变更参数。 |
2 | num.io.threads num.network.threads |
磁盘与网络 IO 线程池。在实际生产环境中,Broker 端请求处理能力经常要按需扩容。 |
3 | ssl.keystore.type ssl.keystore.location ssl.keystore.password ssl.key.password |
由于允许动态实时调整 SSL 相关参数,就能创建过期时间很短的 SSL 证书。每当调整时,Kafka 底层会重新配置 Socket 连接通道并更新 Keystore。新的连接会使用新的 Keystore,阶段性地调整这组参数,有利于增加安全性。 |
4 | num.replica.fetchers | Follower 副本拉取速度慢常见的做法是增加该参数值,确保有充足的线程可以执行 Follower 副本向 Leader 副本的拉取。使用动态参数,则不需要重启 Broker 就能立即在 Follower 端生效。 |
与其他传统消息中间件不同,Kafka 是基于日志结构(log-based)的消息引擎,消费者在消费消息时,只是从磁盘文件上读取数据,因此消费者不会删除消息数据。而且由于消息的位移是由消费者控制的,因此可以通过修改位移的值,重复消费历史数据(replayable)。
位移重置
位移策略
直接把消费者的位移值重设成期望的位移值
Earliest:调整到当前最早位移处,即 重新消费主题所有消息;
Latest:调整到当前最新位移处,即 跳过所有历史消息;
Current:调整到当前最新提交位移处,一般用于 回滚变更前重设到消费者重启时的位置;
Specified-Offset:调整为指定位移,一般用于 消费者在处理某条错误消息时,可手动地跳过此消息而规避错误;
Shift-By-N:调整到当前位移 +N 处(可以为负值);
时间策略
给定一个时间,让消费者把位移调整成大于该时间的最小位移
DateTime:调整到大于给定时间的最小位移处;
Duration:调整到距离当前时间指定间隔的位移处。
其中 Duration 策略则是指给定相对的时间间隔,将位移调整到距离当前给定时间间隔的位移处,格式:PnDTnHnMnS(ISO-8601 规范,参考 Java Duration
类),即以字母 P 开头,后面由 4 部分组成,D(天)、H(小时)、M(分钟)、S(秒)。
如将位移调回到 15 分钟前,可以指定为:PT0H15M0S。
消费者 API
可以通过消费者 API 重设位移,其中 Java API:
1 | // 除了 Earliest 和 Latest 以外,其余都使用 seek |
基本实现(注意调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0))
):
1 | Properties consumerProperties = new Properties(); |
Earliest 策略(Latest 同理)
1 | // 需要一次性构造主题的所有分区对象 |
Current 策略
1 | // 借助 KafkaConsumer 的 committed 方法获取当前提交的最新位移 |
Specified-Offset 策略
1 | long targetOffset = 1234L; |
Shift-By-N 策略(向前跳 123 条消息)
1 | for (PartitionInfo info : consumer.partitionsFor(topic)) { |
DateTime 策略
1 | // 重设位移到 2019 年 6 月 20 日晚上 8 点 |
Duration 策略
1 | // 将位移调回 30 分钟前 |
命令行脚本
对于 Kafka 0.11 及以上的版本,可以使用 kafka-consumer-groups 脚本设置位移。
Earliest 策略
1 | bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute |
Latest 策略
1 | bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute |
Current 策略
1 | bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute |
Specified-Offset 策略
1 | bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute |
Shift-By-N 策略
1 | bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute |
Duration 策略
1 | bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute |
生产部署
最近需要规划一套 Kafka 集群,由于之前一直没有做过生产环境的资源评估,想尝试整理出一份技术方案,希望可以为后续的项目提供参考。
需求描述
基本需求如下(非真实数据):
15 个业务系统 Topic
留存时间:1 个月
加载平均值:1,000 条 /s,峰值:5,000 条 /s
资源评估
主要是磁盘容量和服务器节点数量。
磁盘规划
无随机访问需求(Kafka 日志系统采用顺序访问),且软件层面的多副本机制可弥补磁盘易损坏的缺陷,可采用 HDD 机械硬盘 + RAID 0 的组合。
单个节点磁盘容量规划如下:
新增消息数:加载平均值 1,000 条 /s,60 * 60 * 24 * 1000 约 1 千万条。
新增消息大小:预计每条消息 2KB,即每天消息总大小为 10,000,000 条 * 2KB / 1000 / 1000 约 20GB。
备份数:建议 3 副本,20GB * 3 = 60GB。
预留空间:预留 10% 左右磁盘空间用于索引等其他数据,每天总存储容量 66GB。
留存时间:1 个月,66GB * 31 约 2TB。
数据压缩:使用 LZ4 压缩算法,压缩比 0.75,2TB * 0.75 = 1.5TB。
单节点数据盘总用量 1.5TB。
节点规划
使用千兆网卡(1Gbps),且 Kafka 部署在专属服务器。
最大带宽:为保证有充分带宽留作其他进程使用,且确保较低丢包率,Kafka 服务需预留 70% 带宽,即 700Mbps。
平均带宽:按最大值 1/4-1/3 保守换算(其余预留),即单个节点平均可使用 175-240 Mbps。
数据量:传输数据需要按峰值算,5,000 条 /s * 2KB * 8bit = 80Mbps,80Mbps / 175Mbps = 1 台。
备份数:3 个副本,1 台 * 3 = 3 台。
因此共需要 3 台 CPU 4 核+、内存 16GB+ 服务器,搭载系统 CentOS 7.6。