Kyle's Notebook

Kafka Cheat Sheet

Word count: 6.7kReading time: 28 min
2020/09/24

Kafka Cheat Sheet

Kafka 基本概念

Kafka 为数据生态系统带来了循环系统,在基础设施的各个组件之间传递消息以解耦生产者和消费者,并为客户端提供一致的接口便于集成。
根据业务需要可以动态增删组件,常用于活动跟踪、消息传递、度量指标和日志记录、提交日志、流处理等。

数据系统集成.jpg

工作模型

Kafka 支持两种工作模型:点对点模型(P2P)模型发布 - 订阅(Pub - Sub)模型

点对点模型

即消息队列模型,系统 A 发送得消息只能被系统 B 接收。Kafka 通过消费者组实现点对点模型:多个消费者实例共同组成一个组消费一组主题。这组主题中的每个分区都只会被组内的一个消费者实例消费。

发布 - 订阅(Pub - Sub)模型

区别于点对点模型,它有主题(Topic)的概念,即逻辑消息容器。

  • 发布者把消息分类,接收者对特定类型的消息进行订阅以便接收。

  • 对于多组发布者 - 接收者,引入一或多个中间人解耦,成为多个独立的发布订阅系统。

角色(Role)

在 Kafka 的工作模型中,分为 生产者(Producer)中间人(Broker)消费者(Consumer) 三种角色。

生产者(Producer)

生产者创建消息,默认下把消息分布到所有分区上,但也可以根据键和分区器直接写到指定分区上。

中间人(Broker)

表示一台独立的 Kafka 服务器,需要处理以下请求:

  • 客户端请求

    • 生产者:接收生产者消息,为消息设置偏移量,并提交消息到磁盘保存。

    • 消费者:响应读取分区的请求,返回已经提交到磁盘的消息。

  • 分区副本请求

  • 控制器发送给分区首领的请求

持久化:Broker 会在一定期限内保存消息以及读取进度(偏移量),允许消费者非实时读取。策略是保存一段时间或一定大小(达到期限或上限后清理),可以针对主题配置。

伸缩性:Broker 可以轻松扩展以提高性能,不会影响整体可用性。

数据写入机制:

  • 顺序写:Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个消息物理文件:只能追加写(Append-only),避免了缓慢的随机 I/O 操作,实现高吞吐量。

  • 定期删除:日志段(Log Segment)机制。在 Kafka 底层,一个日志细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存。在后台有定时任务检查老的日志段是否能够被删除,回收磁盘空间。

消费者(Consumer)

消费者订阅主题消息,并按照生成顺序读取,通过检查偏移量(保存在 Zookeeper 或 Kafka 中持久化)判断消息是否已读取。

支持多个消费者,消息可被多个消费者读取,之间互不影响;同时支持分组,消费者组内的共同读取一个主题,每个分区只能被一个消费者读取,即消费者对分区的所有权关系:目的是提升消费者端的吞吐量(TPS)。

消费者通过消费者位移记录自己在分区的消费进度。

消息架构

Kafka 是三层消息架构:

  • 主题层,可以配置 M 个分区,每个分区可以配置 N 个副本。

  • 分区层,每个分区的 N 个副本中只能有一个领导者,对外提供服务;N-1 个追随者副本,提供数据冗余之用。

  • 消息层,分区中包含若干条消息,每条消息的位移从 0 开始依次递增。客户端只能与分区的领导者副本进行交互。

主题(Topic)

消息通过主题分类,类比数据库表。

一个主题可以创建多个分区,需权衡吞吐量、内存及 Leader 副本选举所需时间。

分区(Partition)

一个主题的消息可以分为若干分区,一个分区就是一个顺序追加的提交日志。

  • 分区内消息有序,但多个分区下同一主题消息不能确保整体有序。

  • 通过分区实现数据冗余和伸缩性,由于主题的每个分区可以分布在不同服务器上,可以提供更高的性能。

  • 分区偏移量:分区元数据,不断递增的整数值,在创建消息时添加,在给定的分区内是唯一的,确保消息不被重读。区别于消费者位移(指示该消费者的消费进度),一旦消息写入分区,位移就不再变化。

副本是在分区的层级定义的,分区下可配置若干个副本:1 个 Leader、N-1 个 Follower。

副本(Replica)

副本是 Kafka 的备份机制:把相同的数据拷贝到多台机器上。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。

  • 领导者(Leader):对外提供服务,即与客户端程序进行交互。

  • 追随者(Follower):只与 Leader 交互,异步拉取消息写入自己的提交日志中,不对外提供服务。

模式(Schema)

使用额外的结构定义消息内容,使之易于理解。

前提是要确保数据格式的一致性,否则容易耦合,升级时需要同时处理两种数据格式:

  • JSON、XML:易用且可读性好,但缺乏强类型处理能力,不同版本之间兼容性也不好;

  • Avro:紧凑的序列化格式,模式和消息体分开,因此模式变化不需要重新生成代码;支持强类型和模式进化,能前后兼容。

消息批次(Batch)

Kafka 的数据单元是消息,由字节数组组成。当消息以可控的方式写入不同的分区,会基于键生成散列值、对分区数取模,为消息选取分区。

批次即同属一个主题和分区的一组消息,分批次写入 Kafka 可以减少网络开销、提高效率:批次越大、单位时间内处理的消息越多、单个消息的传输时间越长。可通过对批次数据压缩,提高数据传输和存储能力。

集群与服务协调

随着 Kafka 部署数量的增加,可部署多个集群,实现数据类型分离、安全隔离和多数据中心功能(需另外提供集群之间复制消息的功能,可通过 MirrorMaker 实现)。

Kafka 使用 ZooKeeper 保存集群 Broker 和主题的元数据信息、消费者元数据及分区偏移量,实现服务协调。

ZooKeeper 集群使用一致性协议,建议部署奇数个节点,确保大多数节点可用。

常用脚本

Kafka 部署包中提供了数十个脚本(Windows 的 bat 和 Linux 的 Shell),直接运行会提示基本用法,比较常用的有:

序号 名称 描述
1 connect-standalone
connect-distributed
Kafka Connect 组件的启动脚本,分别为单节点 Standalone 模式和多节点 Distributed 模式。
2 kafka-acls 设置 Kafka 访问权限。
3 kafka-broker-api-versions 验证不同 Kafka 版本之间服务器和客户端的适配性:
在 0.10.2.0 之前 Kafka 向下兼容;
自 0.10.2.0 版本开始支持双向兼容。
4 kafka-configs 配置 Kafka broker 参数。
5 kafka-console-producer
kafka-console-consumer
kafka-verifiable-producer
kafka-verifiable-consumer
测试 Kafka 生产者、消费者。
6 kafka-producer-perf-test
kafka-consumer-perf-test
测试 Kafka 生产者、消费者性能。
7 kafka-consumer-groups 设置 Kafka 消费者位移。
8 kafka-delegation-tokens 管理 Delegation Token(一种轻量级的认证机制,补充了现有的 SASL 认证机制)。
9 kafka-delete-records 删除 Kafka 的分区消息(Kafka 有自动消息删除策略)。
10 kafka-dump-log 查看 Kafka 消息文件的内容,包括消息的各种元数据信息,甚至是消息体本身。
11 kafka-log-dirs 查询各个 Broker 上的各个日志路径的磁盘占用情况。
12 kafka-mirror-maker 实现 Kafka 集群间的消息同步。
13 kafka-preferred-replica-election 执行 Preferred Leader 选举。
14 kafka-reassign-partitions 执行分区副本迁移以及副本文件路径迁移。
15 kafka-topics 执行所有的主题管理操作。
16 kafka-run-class 执行任何带 main 方法的 Kafka 类。
17 kafka-server-start
kafka-server-stop
启动和停止 Kafka Broker 进程。
18 kafka-streams-application-reset Kafka Streams 应用程序重设位移。
19 zookeeper-* 管理和运维 ZooKeeper。
20 trogdor Kafka 的测试框架,用于执行各种基准测试和负载测试。

生产消息

1
2
3
bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic --request-required-acks -1 --producer-property compression.type=lz4

# 指定生产者参数 acks 为 -1,同时启用了 LZ4 的压缩算法。

消费消息

1
2
3
4
bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port --topic test-topic --group test-group --from-beginning --consumer-property enable.auto.commit=false 

# --group 指定消费者组,否则每次运行 Console Consumer,都会自动生成一个新的消费者组来消费(console-consumer)
# --from-beginning 等同于将 Consumer 端参数 auto.offset.reset 设置成 earliest,否则默认为 latest

测试生产者性能

1
2
3
bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4

# 向指定主题发送了 1 千万条消息,每条消息大小是 1KB,还可以设置压缩算法、延时时间等

输出测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时。

通常会关注 99th 分位(604ms),表明测试生产者生产的消息中,有 99% 消息的延时都在 604ms 以内,可作为 SLA 参考。

1
2
3
2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency.
4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency.
10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.

测试消费者性能

1
bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic

输出消费者的吞吐量数据(1723MB/s,但没有不同分位数下的分布情况)。

1
2
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012

查看主题消息总数

Kafka 自带命令没有提供这种功能,可以使用 Kafka 工具类 GetOffsetShell :

1
2
3
4
5
6
7
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -2 --topic test-topic
# test-topic:0:0
# test-topic:1:0

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -1 --topic test-topic
# test-topic:0:5500000
# test-topic:1:5500000

计算出给定主题特定分区当前的最早位移和最新位移,将两者的差值累加起来,就能得到该主题当前总的消息数(5500000 + 5500000 == 11000000)

查看消息文件数据

1
2
3
4
bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration

# 加上 --deep-iteration 可查看每条具体消息
# 加上 --print-data-log 可查看消息里面的实际数据

输出消息批次(RecordBatch)或消息集合(MessageSet)的元数据信息(创建时间、使用的压缩算法、CRC 校验值等)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Dumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1561597044933 size: 1237 magic: 2 compresscodec: LZ4 crc: 646766737 isvalid: true
| offset: 0 CreateTime: 1561597044911 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 1 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 2 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 3 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 4 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 5 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 6 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 7 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 8 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 9 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 10 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 11 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 12 CreateTime: 1561597044932 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 13 CreateTime: 1561597044933 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
| offset: 14 CreateTime: 1561597044933 keysize: -1 valuesize: 1024 sequence: -1 headerKeys: []
baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1237 CreateTime: 1561597044934 size: 1237 magic: 2 compresscodec: LZ4 crc: 3751986433 isvalid: true
......

查看消费者组消息位移

1
bin/kafka-consumr-groups.sh --bootstrap-server kafka-host:port --describe --group test-group

其中输出的 CURRENT-OFFSET 表示该消费者当前消费的最新位移,LOG-END-OFFSET 表示对应分区最新生产消息的位移,LAG 列是两者的差值。

1
2
3
TOPIC		PARTITION	CURRENT-OFFSET	LOG-END-OFFSET	LAG			CONSUMER-ID											HOST		CLIENT-ID
test-topic 0 1112985 5000000 3887015 consumer-1-d2a5e537-aec5-4659-a084-33174e1b3c48 /127.0.0.1 consumer-1
test-topic 0 1095015 5000000 3904985 consumer-1-d2a5e537-aec5-4659-a084-33174e1b3c48 /127.0.0.1 consumer-1

主题创建

指定分区、副本个数和主题名称

1
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name  --partitions 1 --replication-factor 1

Kafka 2.2 版本开始,社区推荐用 --bootstrap-server 参数替换 --zookeeper 参数(详见官方文档: http://kafka.apache.org/22/documentation.html ):

The bin/kafka-topics.sh command line tool is now able to connect directly to brokers with --bootstrap-server instead of zookeeper. The old --zookeeper option is still available for now. Please read KIP-377 for more information.

  • 使用 --zookeeper 会绕过 Kafka 的安全体系:即使为 Kafka 集群设置了安全认证,限制了主题的创建,如果使用 –zookeeper 的命令依然能不受约束成功创建任意主题。

  • 使用 --bootstrap-server 与集群进行交互,逐渐成为使用 Kafka 标准做法,不必同时维护 ZooKeeper 和 Broker 的连接信息。

主题查询

注意用户只能查看自己可见的 Kafka 主题。

1
bin/kafka-topics.sh --bootstrap-server broker_host:port --list

使用 --zookeeper 会绕过 Kafka 的安全体系,默认返回集群中所有的主题详细数据。

修改分区

只能增加数量,否则抛出 InvalidPartitionsException 异常。

1
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数>

修改主题级别参数

注意必须使用 --zookeeper 参数

1
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760

修改主题限速

即设置 Leader 副本和 Follower 副本使用的带宽。

先设置 Broker 端参数 leader.replication.throttled.ratefollower.replication.throttled.rate

1
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0

为该主题设置要限速的副本

1
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test

删除主题

注意删除是异步操作,返回的是“已删除”的状态标记。

1
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete  --topic <topic_name>

常见错误排查

主题删除失效

由于删除是异步操作,有可能删除不成功,最常见的原因:

  • 副本所在的 Broker 宕机:重启 Broker 后自动恢复

  • 待删除主题的部分分区依然在执行迁移过程。

对于后者,需要依次操作:

  • 手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。

  • 手动删除该主题在磁盘上的分区目录。

  • 在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。

__consumer_offsets 占用太多的磁盘

__consumer_offsets__transaction_state 是 Kafka 内置的特殊主题,一旦发现前者消耗了过多的磁盘空间,请显式地用 jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。通常情况下都是因为该线程异常导致无法及时清理此内部主题,此时只能重启 Broker(具体原因根据错误日志排查)。

动态配置

Kafka 社区于 1.1.0 版本中正式引入了动态 Broker 参数(Dynamic Broker Configs)。即修改后无需重启 Broker 就能立即生效,而之前在 server.properties 中配置的参数则称为静态参数(Static Configs),详情参考官方文档: http://kafka.apache.org/11/documentation.html

动态配置的应用场景一般有以下几种:

  • 修改Broker 端各种线程池大小,实时应对突发流量(或减少资源浪费);

  • 调整 Broker 端连接信息或安全配置信息;

  • 更新 SSL Keystore 有效期;

  • 调整 Broker 端 Compact 操作性能;

  • 实时变更 JMX 指标收集器(JMX Metrics Reporter)。

参数保存

在 Broker 配置表中有一列 DYNAMIC UPDATE MODE(更新模式),共有以下三种:

  • read-only:静态配置,需要重启 Broker 才能生效。

  • per-broker:动态配置,修改后只会在对应的 Broker 上生效。

  • cluster-wide:动态配置,修改后会在整个集群上生效。

Kafka 将动态 Broker 参数保存在 ZooKeeper 的 /config 路径中(都是持久化节点):

1
2
3
4
5
6
7
8
/config
changes # changes 实时监测动态参数变更,不保存参数值
topics # 主题级别参数
users # 动态调整客户端配额(Quota)
clients # 即限制连入集群的客户端的吞吐量或者是限定使用的 CPU 资源
/brokers
<default> # cluster-wide 范围的动态参数
broker.id # 特定 Broker 的 per-broker 范围参数(可能存在多个)

配置的优先级为:per-broker 参数 > cluster-wide 参数 > static 参数 > Kafka 默认值。

参数配置

使用 Kafka 自带的 kafka-configs 脚本,以配置 cluster-wide 范围参数为例:

1
2
bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --add-config unclean.leader.election.enable=true
# unclean.leader.election.enable

要设置 cluster-wide 范围的动态参数,即在集群层面设置全局值,需要显式指定 entity-default,检查配置是否成功:

1
2
3
4
5
bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --describe

# 有以下输出,表示成功地在全局层面上设置该参数值为 true(其中 sensitive=false 表明要调整的参数不是敏感数据):
Default config for brokers in the cluster are:
unclean.leader.election.enable=true sensitive=false synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true}

如要设置 per-broker 范围(ID 为 1 的 Broker)参数,则执行:

1
bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false

检查配置是否成功:

1
2
3
4
bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --describe

Configs for broker 1 are:
unclean.leader.election.enable=false sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false, DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true, DEFAULT_CONFIG:unclean.leader.election.enable=false}
  • 在 Broker 1 层面上,该参数被设置成了 false,表明配置成功;

  • 在全局层面上,该参数值依然是 true,表明之前设置的 cluster-wide 范围参数值依然有效。

最后需要分别从 Broker 和全局层面删除配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 删除cluster-wide范围参数
bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --delete-config unclean.leader.election.enable
Completed updating default config for brokers in the cluster,


# 删除per-broker范围参数
bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --delete-config unclean.leader.election.enable
Completed updating config for broker: 1.


# 查看cluster-wide范围参数
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --describe
Default config for brokers in the cluster are:


# 查看Broker 1上的动态参数配置
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --describe
Configs for broker 1 are:

常用参数

序号 参数名称 备注
1 log.retention.ms 由于不可能完美地预估所有业务的消息留存时长,修改日志留存时间是一个比较高频的操作。可以设置对应的主题级别或在全局层面上动态变更参数。
2 num.io.threads
num.network.threads
磁盘与网络 IO 线程池。在实际生产环境中,Broker 端请求处理能力经常要按需扩容。
3 ssl.keystore.type
ssl.keystore.location
ssl.keystore.password
ssl.key.password
由于允许动态实时调整 SSL 相关参数,就能创建过期时间很短的 SSL 证书。每当调整时,Kafka 底层会重新配置 Socket 连接通道并更新 Keystore。新的连接会使用新的 Keystore,阶段性地调整这组参数,有利于增加安全性。
4 num.replica.fetchers Follower 副本拉取速度慢常见的做法是增加该参数值,确保有充足的线程可以执行 Follower 副本向 Leader 副本的拉取。使用动态参数,则不需要重启 Broker 就能立即在 Follower 端生效。

与其他传统消息中间件不同,Kafka 是基于日志结构(log-based)的消息引擎,消费者在消费消息时,只是从磁盘文件上读取数据,因此消费者不会删除消息数据。而且由于消息的位移是由消费者控制的,因此可以通过修改位移的值,重复消费历史数据(replayable)。

位移重置

位移策略

直接把消费者的位移值重设成期望的位移值

  • Earliest:调整到当前最早位移处,即 重新消费主题所有消息

  • Latest:调整到当前最新位移处,即 跳过所有历史消息

  • Current:调整到当前最新提交位移处,一般用于 回滚变更前重设到消费者重启时的位置

  • Specified-Offset:调整为指定位移,一般用于 消费者在处理某条错误消息时,可手动地跳过此消息而规避错误

  • Shift-By-N:调整到当前位移 +N 处(可以为负值);

时间策略

给定一个时间,让消费者把位移调整成大于该时间的最小位移

  • DateTime:调整到大于给定时间的最小位移处;

  • Duration:调整到距离当前时间指定间隔的位移处。

其中 Duration 策略则是指给定相对的时间间隔,将位移调整到距离当前给定时间间隔的位移处,格式:PnDTnHnMnS(ISO-8601 规范,参考 Java Duration 类),即以字母 P 开头,后面由 4 部分组成,D(天)、H(小时)、M(分钟)、S(秒)。

如将位移调回到 15 分钟前,可以指定为:PT0H15M0S。

消费者 API

可以通过消费者 API 重设位移,其中 Java API:

1
2
3
4
5
6
7
8
9
10
11
// 除了 Earliest 和 Latest 以外,其余都使用 seek 

// 只能重设一个分区
void seek(TopicPartition partition, long offset);

void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);

// 可重设多个分区
void seekToBeginning(Collection<TopicPartition> partitions);

void seekToEnd(Collection<TopicPartition> partitions);

基本实现(注意调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0))):

1
2
3
4
5
Properties consumerProperties = new Properties();
// 必须禁止自动提交位移
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// ...(省略其他消费者配置)

Earliest 策略(Latest 同理)

1
2
3
4
5
6
7
8
9
10
11
12
// 需要一次性构造主题的所有分区对象
String topic = "test"; // 要重设位移的Kafka主题
try (final KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(consumerProperties)) {
consumer.subscribe(Collections.singleton(topic));
consumer.poll(0);
consumer.seekToBeginning(
consumer.partitionsFor(topic).stream().map(partitionInfo ->
new TopicPartition(topic, partitionInfo.partition()))
.collect(Collectors.toList()));
// 如果是 Latest 则为 seekToEnd
}

Current 策略

1
2
3
4
5
6
7
8
9
// 借助 KafkaConsumer 的 committed 方法获取当前提交的最新位移
consumer.partitionsFor(topic).stream().map(info ->
// 获取给定主题的所有分区,然后依次获取对应分区上的已提交位移,最后通过 seek 方法重设位移到已提交位移处
new TopicPartition(topic, info.partition()))
.forEach(tp -> {
long committedOffset = consumer.committed(tp).offset();
consumer.seek(tp, committedOffset);
});

Specified-Offset 策略

1
2
3
4
5
long targetOffset = 1234L;
for (PartitionInfo info : consumer.partitionsFor(topic)) {
TopicPartition tp = new TopicPartition(topic, info.partition());
consumer.seek(tp, targetOffset);
}

Shift-By-N 策略(向前跳 123 条消息)

1
2
3
4
5
for (PartitionInfo info : consumer.partitionsFor(topic)) {
TopicPartition tp = new TopicPartition(topic, info.partition());
long targetOffset = consumer.committed(tp).offset() + 123L;
consumer.seek(tp, targetOffset);
}

DateTime 策略

1
2
3
4
5
6
7
8
// 重设位移到 2019 年 6 月 20 日晚上 8 点
long ts = LocalDateTime.of(2019, 6, 20, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream().map(info ->
new TopicPartition(topic, info.partition()))
.collect(Collectors.toMap(Function.identity(), tp -> ts));
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}

Duration 策略

1
2
3
4
5
6
7
8
9
// 将位移调回 30 分钟前
Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream()
.map(info -> new TopicPartition(topic, info.partition()))
.collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis() - 30 * 1000 * 60));

for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
consumer.offsetsForTimes(timeToSearch).entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}

命令行脚本

对于 Kafka 0.11 及以上的版本,可以使用 kafka-consumer-groups 脚本设置位移。

Earliest 策略

1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute

Latest 策略

1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute

Current 策略

1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute

Specified-Offset 策略

1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute

Shift-By-N 策略

1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute

Duration 策略

1
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute

生产部署

最近需要规划一套 Kafka 集群,由于之前一直没有做过生产环境的资源评估,想尝试整理出一份技术方案,希望可以为后续的项目提供参考。

需求描述

基本需求如下(非真实数据):

  • 15 个业务系统 Topic

  • 留存时间:1 个月

  • 加载平均值:1,000 条 /s,峰值:5,000 条 /s

资源评估

主要是磁盘容量和服务器节点数量。

磁盘规划

无随机访问需求(Kafka 日志系统采用顺序访问),且软件层面的多副本机制可弥补磁盘易损坏的缺陷,可采用 HDD 机械硬盘 + RAID 0 的组合。

单个节点磁盘容量规划如下:

  • 新增消息数:加载平均值 1,000 条 /s,60 * 60 * 24 * 1000 约 1 千万条。

  • 新增消息大小:预计每条消息 2KB,即每天消息总大小为 10,000,000 条 * 2KB / 1000 / 1000 约 20GB。

  • 备份数:建议 3 副本,20GB * 3 = 60GB。

  • 预留空间:预留 10% 左右磁盘空间用于索引等其他数据,每天总存储容量 66GB。

  • 留存时间:1 个月,66GB * 31 约 2TB。

  • 数据压缩:使用 LZ4 压缩算法,压缩比 0.75,2TB * 0.75 = 1.5TB。

单节点数据盘总用量 1.5TB。

节点规划

使用千兆网卡(1Gbps),且 Kafka 部署在专属服务器。

  • 最大带宽:为保证有充分带宽留作其他进程使用,且确保较低丢包率,Kafka 服务需预留 70% 带宽,即 700Mbps。

  • 平均带宽:按最大值 1/4-1/3 保守换算(其余预留),即单个节点平均可使用 175-240 Mbps。

  • 数据量:传输数据需要按峰值算,5,000 条 /s * 2KB * 8bit = 80Mbps,80Mbps / 175Mbps = 1 台。

  • 备份数:3 个副本,1 台 * 3 = 3 台。

因此共需要 3 台 CPU 4 核+、内存 16GB+ 服务器,搭载系统 CentOS 7.6。

CATALOG
  1. 1. Kafka Cheat Sheet
    1. 1.1. Kafka 基本概念
      1. 1.1.1. 工作模型
        1. 1.1.1.1. 点对点模型
        2. 1.1.1.2. 发布 - 订阅(Pub - Sub)模型
      2. 1.1.2. 角色(Role)
        1. 1.1.2.1. 生产者(Producer)
        2. 1.1.2.2. 中间人(Broker)
        3. 1.1.2.3. 消费者(Consumer)
      3. 1.1.3. 消息架构
        1. 1.1.3.1. 主题(Topic)
        2. 1.1.3.2. 分区(Partition)
        3. 1.1.3.3. 副本(Replica)
      4. 1.1.4. 模式(Schema)
      5. 1.1.5. 消息批次(Batch)
      6. 1.1.6. 集群与服务协调
    2. 1.2. 常用脚本
      1. 1.2.1. 生产消息
      2. 1.2.2. 消费消息
      3. 1.2.3. 测试生产者性能
      4. 1.2.4. 测试消费者性能
      5. 1.2.5. 查看主题消息总数
      6. 1.2.6. 查看消息文件数据
      7. 1.2.7. 查看消费者组消息位移
      8. 1.2.8. 主题创建
      9. 1.2.9. 主题查询
      10. 1.2.10. 修改分区
      11. 1.2.11. 修改主题级别参数
      12. 1.2.12. 修改主题限速
      13. 1.2.13. 删除主题
    3. 1.3. 常见错误排查
      1. 1.3.1. 主题删除失效
      2. 1.3.2. __consumer_offsets 占用太多的磁盘
    4. 1.4. 动态配置
      1. 1.4.1. 参数保存
      2. 1.4.2. 参数配置
      3. 1.4.3. 常用参数
    5. 1.5. 位移重置
      1. 1.5.1. 位移策略
      2. 1.5.2. 时间策略
      3. 1.5.3. 消费者 API
      4. 1.5.4. 命令行脚本
    6. 1.6. 生产部署
      1. 1.6.1. 需求描述
      2. 1.6.2. 资源评估
        1. 1.6.2.1. 磁盘规划
        2. 1.6.2.2. 节点规划