项目开发中,很多应用场合都会出现对 kafka 的应用,比如,不同的微服务 DB 数据同步、服务间的异步通知等等。笔者在此做个系统的总结,方便日后复习,希望对大家的面试笔试都有帮助。
2.1、windows 环境
1)下载安装包,解压安装包,eg:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.12-2.6.0.tgz
2)检查修改配置文件,安装包的解压路径,eg:D:\install\kafka\kafka_2.12-2.6.0\config,两个文件 server.properties & zookeeper.properties
分别修改配置文件的日志存放目录参数:(server.properties)
A comma separated list of directories under which to store log fileslog.dirs=D:\install\kafka\log\kafka
修改配置文件的日志存放目录参数::(zookeeper.properties)
dataDir=D:\install\kafka\log\zk
3)启动 kafka 内置的 zk,因为 kafka 内部集成了 zookeeper 进行集群管理,需要优先启动 zk
在 bin 目录的 window 文件夹下有 win 版本的.bat 批处理脚本,打开终端,选中 zookeeper-server-start.bat。
执行命令:zookeeper-server-start.bat ....\config\zookeeper.properties
4)启动 kafka,打开终端,选中 kafka-server-start.bat,
执行命令:kafka-server-start.bat ....\config\server.properties
5)测试:topic 创建 &日志生产 &日志消费
选中 kafka-topics.bat,执行命令(创建 topic):
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-test-topic
选中 kafka-topics.bat,执行命令(查看 topic):
kafka-topics.bat --list --zookeeper localhost:2181
选中 kafka-console-producer.bat,执行命令(生产消息):
kafka-console-producer.bat --broker-list localhost:9092 --topic kafka-test-topic
选中 kafka-console-consumer.bat,执行命令(消费消息):
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic kafka-test-topic --from-beginning
2.2、linux 环境
步骤和 windows 几乎无异,差异只是配置文件的日志目录参数要修改为虚拟机的路径(分隔符要注意,windows 系统和 linux 系统不一样),执行对应的.bat 文件要修改为.sh 文件
Apache Kafka is an open-source distributed event streaming platform
Apache Kafka 是分布式发布-订阅消息系统,在 kafka 官网上对 kafka 的定义:一个分布式发布-订阅消息传递系统。
它最初由 LinkedIn 公司开发,Linkedin 于 2010 年贡献给了 Apache 基金会并成为顶级开源项目。Kafka 是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
目前开源消息中间件不止 kafka 一种,如下图:
- 解耦
快递小哥手上有很多快递需要送,他每次都需要先电话一一确认收货人是否有空、哪个时间段有空,然后再确定好送货的方案。这样完全依赖收货人了!如果快递一多,快递小哥估计得忙疯了……
如果有了便利店,快递小哥只需要将同一个小区的快递放在同一个便利店,然后通知收货人来取货就可以了,这时候快递小哥和收货人就实现了解耦!
- 异步
快递小哥打电话给我后需要一直在你楼下等着,直到我拿走你的快递他才能去送其他人的。
快递小哥将快递放在小芳便利店后,又可以干其他的活儿去了,不需要等待你到来而一直处于等待状态,提高了工作的效率。
- 削峰
假设双十一我买了不同店里的各种商品,而恰巧这些店发货的快递都不一样,有中通、圆通、申通、各种通等……更巧的是他们都同时到货了!
中通的小哥打来电话叫我去北门取快递、圆通小哥叫我去南门、申通小哥叫我去东门。我一时手忙脚乱……
我们能看到在系统需要交互的场景中,使用消息队列中间件真的是好处多多,基于这种思路,就有了丰巢、菜鸟驿站等比小芳便利店更专业的“中间件”了。
Kafka 的独特优势在于:
- 高吞吐量:计算机集群真实消息传递场景,网络延迟低至 2ms。
- 高扩展性:将生产集群扩展到 1000 个代理、每天数万亿条消息、数 PB 的数据和数十万个分区。弹性扩展和收缩存储和处理。
- 永久储存:Kafka 将数据流安全地存储在分布式、持久、容错的集群中(硬盘)。
- 高可用性:在可用性区域上高效地扩展集群,或者跨地理区域连接单独的集群。
上图描述了:producer4,broker2,topic2,consumer4,consumer-group2,zk1。
下面是相关概念详细描述:
- Producer:Producer 即生产者,消息的产生者,是消息的入口。
- Kafka Cluster:
- Broker:Broker 是 Kafka 实例,每个服务器上有一个或多个 Kafka 的实例,我们姑且认为每个 Broker 对应一台服务器。每个 Kafka 集群内的 Broker 都有一个不重复的编号,如图中的 Broker-0、Broker-1 等……
- Topic:消息的主题,可以理解为消息的分类,Kafka 的数据就保存在 Topic。在每个 Broker 上都可以创建多个 Topic。(若没有手动创建 topic)kafka 的 topic 创建时机,consumer/prodocer 发生时,通过 topic-list 会找到已创建的记录,都会创建这个主题。
- Partition:Topic 的分区,每个 Topic 可以有多个分区,分区的作用是做负载,提高 Kafka 的吞吐量。同一个 Topic 在不同的分区的数据是不重复的,Partition 的表现形式就是一个一个的文件夹!
- Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为 Leader。在 Kafka 中默认副本的最大数量是 10 个,且副本的数量不能大于 Broker 的数量,Follower 和 Leader 绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
- Message:每一条发送的消息主体。
- Consumer:消费者,即消息的消费方,是消息的出口。
- Consumer Group:我们可以将多个消费者组成一个消费者组。
在 Kafka 的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个 Topic 的不同分区的数据,这也是为了提高 Kafka 的吞吐量!
(eg:多机部署下,多台服务器跑同一份代码,对于同一个 topic 的数据,它们分别消费了不同 partition 的消息,从而提高了系统整体消息消费的吞吐量)
- Zookeeper:Kafka 集群依赖 Zookeeper 来保存集群的的元信息,来保证系统的可用性。
我们看上面的架构图中,Producer 就是生产者,是数据的入口。注意看图中的红色箭头,Producer 在写入数据的时候永远在找 Leader,不会直接将数据写入 Follower!那 Leader 怎么找呢?写入的流程又是什么样的呢?我们看下图:
发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入 Leader 后,Follower 是主动的去 Leader 进行同步的!Producer 采用 Push 模式将数据发布到 Broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!写入消息到分区的示意图如下:
上面说到数据会写入到不同的分区,那 Kafka 为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:
-
方便扩展。因为一个 Topic 可以有多个 Partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
-
提高并发。以 Partition 为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。
7.1 kafka 分区策略
熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器。那在 Kafka 中,如果某个 Topic 有多个 Partition,Producer 又怎么知道该将数据发往哪个 Partition 呢?
Kafka 中的三个原则:
-
Partition 在写入的时候可以指定需要写入的 Partition,如果有指定,则写入对应的 Partition。
-
如果没有指定 Partition,但是设置了数据的 Key,则会根据 Key 的值 Hash 出一个 Partition。
-
如果既没指定 Partition,又没有设置 Key,则会轮询选出一个 Partition。
我们可以看下官方 API 文档,找到 org.apache.kafka.clients.producer.KafkaProducer<K,V>
public Future send(ProducerRecord<K,V> record)
Asynchronously send a record to a topic. Equivalent to
send(record, null)
.
ProducerRecord 是底层封装的发送消息对象,该类有多个重载的构造器方法,可见完全符合以上的三个原则:
Constructors
Constructor and Description
ProducerRecord(String topic, Integer partition, K key, V value)
Creates a record to be sent to a specified topic and partition
ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
Creates a record to be sent to a specified topic and partition
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
Creates a record with a specified timestamp to be sent to a specified topic and partition
ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
Creates a record with a specified timestamp to be sent to a specified topic and partition
ProducerRecord(String topic, K key, V value)
Create a record to be sent to Kafka
ProducerRecord(String topic, V value)
Create a record with no key
7.1.1 分区数据结构(Segment)
Producer 将数据写入 Kafka 后,集群就需要对数据进行保存了!Kafka 将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。
Kafka 初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。
前面说过了每个 Topic 都可以分为一个或多个 Partition,如果你觉得 Topic 比较抽象,那 Partition 就是比较具体的东西了:
-
Partition 在服务器上的表现形式就是一个一个的文件夹,每个 Partition 的文件夹下面会有多组 Segment 文件。
-
每组 Segment 文件又包含 .index 文件、.log 文件、.timeindex 文件(早期版本中没有)三个文件。
-
Log 文件就是实际存储 Message 的地方,而 Index 和 Timeindex 文件为索引文件,用于检索消息。
如上图,这个 Partition 有三组 Segment 文件,每个 Log 文件的大小是一样的,但是存储的 Message 数量是不一定相等的(每条的 Message 大小不一致)。
文件的命名是以该 Segment 最小 Offset 来命名的,如 000.index 存储 Offset 为 0~368795 的消息,Kafka 就是利用分段+索引的方式来解决查找效率的问题。
举个例子:我们一开头进行的测试,topic=first-topic-0,此时 kafka 会默认分配一个 partition,该分区的最小单位 Segment 以一个文件夹的形式保存了相关文件( .index 文件、.log 文件、.timeindex 文件)
7.2 消息确保不丢包(ACK 应答机制)
其实上面的写入流程图中有描述出来,那就是通过 ACK 应答机制!
在生产者向队列写入数据的时候可以设置参数来确定是否确认 Kafka 接收到数据,这个参数可设置的值为 0、1、all:
- 0 代表 Producer 往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
- 1 代表 Producer 往集群发送数据只要 Leader 应答就可以发送下一条,只确保 Leader 发送成功。
- all 代表 Producer 往集群发送数据需要所有的 Follower 都完成从 Leader 的同步才会发送下一条,确保 Leader 发送成功和所有的副本都完成备份。安全性最高,但是效率最低。
- 最后要注意的是,如果往不存在的 Topic 写数据,能不能写入成功呢?答案是:Kafka 会自动创建 Topic,分区和副本的数量根据默认配置都是 1。
7.2.1 消息数据结构(message data structure)
上面说到 Log 文件就实际是存储 Message 的地方,我们在 Producer 往 Kafka 写入的也是一条一条的 Message。
那存储在 Log 中的 Message 是什么样子的呢?消息主要包含消息体、消息大小、Offset、压缩类型……等等!我们重点需要知道的是下面三个:
- Offset:Offset 是一个占 8byte 的有序 id 号,它可以唯一确定每条消息在 Parition 内的位置!
- 消息大小:消息大小占用 4byte,用于描述消息的大小。
- 消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。
7.2.2 消息存储策略
无论消息是否被消费,Kafka 都会保存所有的消息。那对于旧数据有什么删除策略呢?
- 基于时间, 默认配置是 168 小时(7 天)。
- 基于大小, 默认配置是 1073741824。
- 需要注意的是,Kafka 读取特定消息的时间复杂度是 O(1),所以这里删除过期的文件并不会提高 Kafka 的性能!
7.2.3 消费消息寻址
在保存数据的小节里面,我们聊到了 Partition 划分为多组 Segment,每个 Segment 又包含 .log、.index、.timeindex 文件,存放的每条 Message 包含 Offset、消息大小、消息体…… 同时,我们多次提到 Segment 和 Offset,查找消息的时候是怎么利用 Segment+Offset 配合查找的呢?
7.2.3.1 一条消息被消费掉的过程
假如现在需要查找一个 Offset 为 368801 的 Message 是什么样的过程呢?我们先看看下面的图:
-
先找到 Offset 的 368801message 所在的 Segment 文件(利用二分法查找),这里找到的就是在第二个 Segment 文件。【找到 Segment 文件夹】
-
打开找到的 Segment 中的 .index 文件(也就是 368796.index 文件,该文件起始偏移量为 368796+1。我们要查找的 Offset 为 368801 的 Message 在该 Index 内的偏移量为 368796+5=368801,所以这里要查找的相对 Offset 为 5)。【找到 Segment 文件夹下的 index 文件,确定 offset】
-
由于该文件采用的是稀疏索引的方式存储着相对 Offset 及对应 Message 物理偏移量的关系,所以直接找相对 Offset 为 5 的索引找不到。这里同样利用二分法查找相对 Offset 小于或者等于指定的相对 Offset 的索引条目中最大的那个相对 Offset,所以找到的是相对 Offset 为 4 的这个索引。【根据稀疏索引,寻址到数据文件】
带稀疏索引的文件。这类文件是将所有数据记录关键字值分成许多组,每组一个索引项,这种索引称为稀疏索引。这类文件的数据记录要求按关键字顺序排列。因此,其特点是索引项少,管理方便,但插入、删除记录代价较大。
- 根据找到的相对 Offset 为 4 的索引确定 Message 存储的物理偏移位置为 256。打开数据文件,从位置为 256 的那个地方开始顺序扫描直到找到 Offset 为 368801 的那条 Message。【从数据文件中找到指定的 Message】
因此,这套机制是建立在 Offset 为有序的基础上,利用 Segment+有序 Offset+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据!至此,消费者就能拿到需要处理的数据进行处理了。
7.2.3.2 消费者记录消费位置
在早期的版本中,消费者将消费到的 Offset 维护在 Zookeeper 中,Consumer 每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的 Offset 已经直接维护在 Kafka 集群的 __consumer_offsets 这个 Topic 中!(见下图所示的 Segments)
7.2.3.3 Kafka 消费 Offset 原理
在通过 Client 端消费 Kafka 中的消息时,消费的消息会同时在 Zookeeper 和 Kafka Log 中保存,如上图红线所示。当手动删除 Kafka 某一分片上的消息日志时,如上图蓝线所示,此是只是将 Kafka Log 中的信息清 0 了,但是 Zookeeper 中的 Partition 和 Offset 数据依然会记录。
当重新启动 Kafka 后,我们会发现如下二种情况:
- 客户端无法正常用消费;
- 在使用 Kafka Consumer Offset Monitor 工具进行 Kafka 监控时会发现 Lag(还有多少消息数未读取(Lag=logSize-Offset))为负数;其中此种情况的删除操作需要我们重点关注,后面我们也会详细介绍其对应的操作步骤。
一般正常情况,如果想让 Kafka 客户端正常消费,那么需要 Zookeeper 和 Kafka Log 中的记录保持如上图黄色所示。
消息存储在 Log 文件后,消费者就可以进行消费了。消费模式比如:点对点模式和发布订阅模式。
8.1 消息消费模式
8.2 消费原理
而 Kafka 采用的是点对点的模式,消费者主动的去 Kafka 集群拉取消息,与 Producer 相同的是,消费者在拉取消息的时候也是找 Leader 去拉取。
kafka 的 consumer 消费原则有两个:
- 多个消费者可以组成一个消费者组(Consumer Group),每个消费者组都有一个组 id!
- 同一个消费组者的消费者可以消费同一 Topic 下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!
是不是有点绕?我们看下图:
图示是消费者组内的消费者小于 Partition 数量的情况,所以会出现某个消费者消费多个 Partition 数据的情况,消费的速度也就不及只处理一个 Partition 的消费者的处理速度!
如果是消费者组的消费者多于 Partition 的数量,那会不会出现多个消费者消费同一个 Partition 的数据呢?
答:上面已经提到过不会出现这种情况!
举例子,在上面的图:
ConsumerGroup2,多出来的消费者 Consumer-3 就是不消费任何 Partition 的数据。
所以在实际的应用中,建议消费者组的 Consumer 的数量与 Partition 的数量一致!
《源码系列》
《经典书籍》
《Java并发编程实战:第2章 影响线程安全性的原子性和加锁机制》
《Java并发编程实战:第3章 助于线程安全的三剑客:final & volatile & 线程封闭》
《服务端技术栈》
《算法系列》
《设计模式》