Kafka Architecture Distilled
Kafka名词术语:
消息:Record。Kafka是消息引擎,这里的消息就是指Kafka处理的主要对象。
主题:Topic。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。
分区:Partition。一个有序不变的消息序列。每个主题下可以有多个分区。
消息位移:Offset。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
副本:Replica。Kafka中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
生产者:Producer。向主题发布新消息的应用程序。
消费者:Consumer。从主题订阅新消息的应用程序。
消费者位移:Consumer Offset。表征消费者消费进度,每个消费者都有自己的消费者位移。
消费者组:Consumer Group。多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance是Kafka消费者端实现高可用的重要手段。
Kafka运维常用操作:
创建主题:
kafka-topics.sh --create --zookeeper <zookeeper 地址> --replication-factor <副本数> --partitions <分区数> --topic <主题名>
kafka-topics.sh --bootstrap-server 172.16.1.3:9092,172.16.2.3:9092,172.16.3.3:9092 --create --topic test --partitions 3 --replication-factor 2 --config retention.ms=604800000
查看主题列表:
kafka-topics.sh --list --zookeeper <zookeeper 地址>
kafka-topics.sh --zookeeper 172.16.1.3:2181,172.16.2.3:2181,172.16.3.3:2181 --list
查看主题详情:
kafka-topics.sh --describe --zookeeper <zookeeper 地址> --topic <主题名>
kafka-topics.sh --zookeeper 172.16.1.3:2181,172.16.2.3:2181,172.16.3.3:2181 --describe --topic test
kafka-topics.sh --bootstrap-server 172.16.157.3:9092,172.16.157.4:9092,172.16.157.5:9092 --describe --topic test
创建生产者:
kafka-console-producer.sh --broker-list <Kafka 服务器地址> --topic <主题名>
创建消费者:
kafka-console-consumer.sh --bootstrap-server <Kafka 服务器地址> --topic <主题名> --from-beginning
kafka-console-consumer.sh --bootstrap-server 172.16.1.3:9092,172.16.2.3:9092,172.16.3.4:9092 --topic test --from-beginning
查看消费组:
kafka-consumer-groups.sh --bootstrap-server 172.16.1.3:9092,172.16.2.3:9092,172.16.3.3:9092 --list
查看消费者组的消费情况:
kafka-consumer-groups.sh --bootstrap-server <Kafka 服务器地址> --group <消费者组名> --describe
kafka-consumer-groups.sh --bootstrap-server 172.16.1.3:9092,172.16.2.3:9092,172.16.3.3:9092 --describe --group test-group
修改分区数:
kafka-topics.sh --zookeeper 172.16.1.3:2181,172.16.2.3:2181,172.16.3.3:2181 --alter --topic test --partitions 2
kafka-topics.sh --bootstrap-server 172.16.1.3:9092,172.16.2.3:9092,172.16.3.3:9092 --alter --topic test --partitions 2
创建主题时同时设置消息保留时间:
bin/kafka-topics.sh --bootstrap-server 172.16.1.3:9092,172.16.2.3:9092,172.16.3.3:9092 --create --topic test --partitions 3 --replication-factor 2 --config retention.ms=604800000 (7天)
修改topic消息保留时间:
kafka-configs.sh --alter --zookeeper 172.16.1.3:2181,172.16.2.3:2181,172.16.3.3:2181 --entity-type topics --entity-name test --add-config retention.ms=7200000
假设要设置主题级别参数max.message.bytes,那么命令如下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
设置常规的主题级别参数,使用 –zookeeper。设置动态参数,使用 –bootstrap-server。
主题级别参数是在创建或修改主题时设置的,它们会应用于整个主题。这些参数包括分区数、副本因子、数据保留策略等。主题级别参数的设置会影响整个主题的行为,一旦设置完成,除非修改主题配置,否则不会随着时间的推移而改变。修改后生效要重启broker。
动态参数是在运行时通过命令行或API进行设置的参数,可以在不停止或重新启动 Kafka 服务的情况下进行更改。这些参数包括日志清理策略、消息最大字节数、消息最大保留时间等。通过动态参数,可以实时地对 Kafka 服务进行调整和优化,以满足实时需求和变化的业务场景。
修改主题限速:
这里主要是指设置Leader副本和Follower副本使用的带宽。有时候,想要让某个主题的副本在执行副本同步机制时,不要消耗过多的带宽。Kafka提供了这样的功能。假设有个主题,名为test,想让该主题各个分区的Leader副本和Follower副本在处理副本同步时,不得占用超过100MBps的带宽。注意是大写B,即每秒不超过100MB。
要达到这个目的,必须先设置Broker端参数leader.replication.throttled.rate和follower.replication.throttled.rate,命令如下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
这条命令结尾处的 –entity-name就是Broker ID。倘若该主题的副本分别在0、1、2、3多个Broker上,那么你还要依次为Broker 1、2、3执行这条命令。
设置好这个参数之后,还需要为该主题设置要限速的副本。在这个例子中,想要为所有副本都设置限速,因此统一使用通配符*来表示,命令如下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test
主题分区迁移:
同样是使用kafka-reassign-partitions脚本,对主题各个分区的副本进行“手术”般的调整,比如把某些分区批量迁移到其他Broker上。这种变更比较复杂。
删除主题:
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>
删除主题的命令并不复杂,关键是删除操作是异步的,执行完这条命令不代表主题立即就被删除了。它仅仅是被标记成“已删除”状态而已。Kafka会在后台默默地开启主题删除操作。因此,通常情况下,需要耐心地等待一段时间。
位移重设还有另一个重要的途径:通过kafka-consumer-groups脚本。需要注意的是,这个功能是在Kafka 0.11版本中新引入的。这就是说,如果你使用的Kafka是0.11版本之前的,那么你只能使用API的方式来重设位移。
Kafka重设位移操作:
重设位移大致可以从两个维度来进行。
位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成给定的位移值。
时间维度。可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也可以给出一段时间间隔,比如30分钟前,然后让消费者直接将位移调回30分钟之前的位移值。
下面的这张表格罗列了7种重设策略。
Earliest策略表示将位移调整到主题当前最早位移处。这个最早位移不一定就是0,因为在生产环境中,很久远的消息会被Kafka自动删除,所以当前最早位移很可能是一个大于0的值。如果你想要重新消费主题的所有消息,那么可以使用Earliest策略。
Latest策略表示把位移重设成最新末端位移。如果你总共向某个主题发送了15条消息,那么最新末端位移就是15。如果你想跳过所有历史消息,打算从最新的消息处开始消费的话,可以使用Latest策略。
Current策略表示将位移调整成消费者当前提交的最新位移。有时候你可能会碰到这样的场景:你修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置,那么,Current策略就可以帮你实现这个功能。
表中第4行的Specified-Offset策略则是比较通用的策略,表示消费者把位移值调整到你指定的位移处。这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理。在实际使用过程中,可能会出现corrupted消息无法被消费的情形,此时消费者程序会抛出异常,无法继续工作。一旦碰到这个问题,你就可以尝试使用Specified-Offset策略来规避。
如果说Specified-Offset策略要求你指定位移的绝对数值的话,那么Shift-By-N策略指定的就是位移的相对数值,即你给出要跳过的一段消息的距离即可。这里的“跳”是双向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重设成当前位移的前100条位移处,此时你需要指定N为-100。
刚刚讲到的这几种策略都是位移维度的,下面来聊聊从时间维度重设位移的DateTime和Duration策略。
DateTime允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。常见的使用场景是,你想重新消费昨天的数据,那么你可以使用该策略重设位移到昨天0点。
Duration策略则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是PnDTnHnMnS。如果你熟悉Java 8引入的Duration类的话,你应该不会对这个格式感到陌生。它就是一个符合ISO-8601规范的Duration格式,以字母P开头,后面由4部分组成,即D、H、M和S,分别表示天、小时、分钟和秒。举个例子,如果你想将位移调回到15分钟前,那么你就可以指定PT0H15M0S。
目前,重设消费者组位移的方式有两种。
通过消费者API来实现。
通过kafka-consumer-groups命令行脚本来实现。
比起API的方式,用命令行重设位移要简单得多。
Earliest策略直接指定--to-earliest:
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
Latest策略直接指定--to-latest:
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
Current策略直接指定--to-current:
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
Specified-Offset策略直接指定--to-offset:
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
Shift-By-N策略直接指定--shift-by N:
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute
DateTime策略直接指定--to-datetime:
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
最后是实现Duration策略,直接指定--by-duration:
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute