kafka基础使用指南

开发与运维

0.前言

虽然现如今的互联网行业已经步入衰退期,java应用开发的火热程度已经远不及当年,但是瘦死的骆驼比马大,java开发在编程领域还是占有很大的市场的。今天我们说的主角kafka就是用java开发的一款开源软件,通常用作java应用的消息中间件,并且也是目前最好用的消息中间件之一,今天我们就来部署一个kafka,并且看下kafka有哪些常见的用法。

1.准备工作

还是准备一台rocky9 linux操作系统的虚拟机,配置信息如下:

IP地址CPU内存
192.168.159.1674核8G

当然,其他配置或者操作系统的虚拟机也可以。

2.部署单机版kafka

部署kafka通常需要搭配一个zookeeper(新版本也支持kraft模式),本身kafka自带zookeeper,但为了方便管理,还是单独部署一个zookeeper给kafka使用。
kafka的安装包可以到官网下载:https://kafka.apache.org/downloads

2.1 安装包准备

本次示例我们的安装包如下:
kafka安装包:kafka_2.12-3.2.0.tgz
zookeeper安装包:zookeeper-3.4.5.tar.gz
注意:kafka启动依赖java,需要虚拟机有jdk
安装包准备好之后,根据个人喜好,上传到虚拟机指定目录。

2.2 部署kafka

因为我们下载的是源码包,所以部署就非常简单了,只需要解压安转包存放到指定目录即可。
(1)部署zookeeper命令如下:

cd ~/install/kafka
tar xf zookeeper-3.4.5.tar.gz
mv zookeeper-3.4.5 /usr/local/zookeeper

(2)部署kafka命令如下:

cd ~/install/kafka
tar xf kafka_2.12-3.2.0.tgz
mv zookeeper-3.4.5 /usr/local/kafka

执行完以上几条命令,kafka就算部署完成了。

2.3 配置修改

接着看一下zookeeper和kafka的配置文件,并可以根据需要修改一些配置。 (1)zookeeper的配置文件

mv /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
vim /usr/local/zookeeper/conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/data/zookeeper/data
# the port at which the clients will connect
clientPort=2181
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

zookeeper主要需要关注的是dataDir这个配置,是zookeeper存放数据的目录,原本是在/tmp目录下,线上环境最好指定专门的目录。 (2)kafka配置文件

vim /usr/local/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.159.167:9092
advertised.listeners=PLAINTEXT://192.168.159.167:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/log
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

如果需要从其他服务器访问kafka,需要修改listeners、advertised.listeners配置,指定服务器IP地址,另外可以修改下kafka日志存放目录log.dirs。

2.4 启动服务

配置修改完成之后,就可以启动服务,需要先启动zookeeper,再启动kafka。
(1)启动zookeeper

cd /usr/local/zookeeper/bin/
./zkServer.sh start

(2)启动kafka

cd /usr/local/kafka/bin/
./kafka-server-start.sh -daemon ../config/server.properties

(3)验证服务

ps -ef |grep zookeeper |grep zoo.cfg

如果看到zookeeper进程正常运行,说明zookeeper启动成功。

ps -ef |grep kafka |grep server.properties

如果看到kafka进程正常运行,说明kafka启动成功。
到此,单机版本的kafka就部署完成了。

3.kafka常用术语

kafka涉及到的术语比较多,这里列举一些常用的,说明一下。
(1)broker:kafka集群包括一个或多个服务器,这种服务器叫做broker。broker接受来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker为消费者提供服务,对读取分区的请求作出相应,返回已经提交到磁盘上的信息。
(2)topic(主题):每条发布到kafka的消息都有一个类别,这个类别叫做topic。topic就好比数据库的表或者文件系统中的文件夹。一个主题可以分为多个分区,一个分区就是一个提交日志。消息以追加的方式写入分区,然后以先入先出的顺序读取。注意,由于一个主题可以多个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。其实就是队列存储。
(3)message(消息) :kafka的数据单元。
批次:就是一组消息,这些消息属于同一个主题和分区。为了提高效率,消息被分批次写入kafka。
(4)partition(分区):物理上的概念,一个topic通常有多个partition,类似队列,提高读写的并发。
(5)offset(偏移量):一种元数据,它是一个不断递增的整数值,在创建消息时,kafka会把它添加到消息里。kafka为每条在分区的消息保存这个offset,这也是消费者在分区的位置。比如一个偏移量为10的消费者,表示它已经消费了0-9偏移量的消息,下一个要消费的消息是偏移量为10的。kafka 0.9版本之前存在zookeeper,0.9之后存在kafka。
(6)producer(生产者):消息的生产者,负责发送指定topic的消息到broker。默认情况下把消息均衡地分布到主题的所有分区上。
(7)consumer(消费者):消息读取客户端,通过订阅一组topic的消息从broker拉取消息。
(8)consumer group(消费者群组):消费者是消费者群组的一部分,就是说,会有一个或者多个消费者共同读取一个topic。群组保证每个分区只能被一个消费者使用。可以为消费者指定group name,若不指定则属于默认的group。
(9)rebalance(重平衡):消费者组内某个消费者实例挂了之后,其它消费者实例自动重新分配订阅主题分区的过程。rebalance是kafka消费端实现高可用的重要手段。

4.基础功能测试

kafka部署完毕,基础知识也掌握了一些,接下来就来动手试试kafka生产者如何产生消息,消费者如何消费消息。

4.1 主题相关

(1)创建主题

./kafka-topics.sh --create --bootstrap-server 192.168.159.167:9092  --topic mytest

(2)查看主题

./kafka-topics.sh --bootstrap-server 192.168.159.167:9092 --list --exclude-internal

(3)删除主题

./kafka-topics.sh --bootstrap-server 192.168.159.167:9092 --delete --topic test

注意:如果发现删除主题后又重新出现了,需要在配置文件添加delete.topic.enable=true

4.2 消息相关

(1)生产消息

./kafka-console-producer.sh --bootstrap-server 192.168.159.167:9092 --topic test

基于test主题生产消息,命令会卡住,并提示输入消息,如下所示:

picture.image 我们可以输入一串消息进去,比如:hello,world,hello world

(2)消费消息

./kafka-console-consumer.sh --bootstrap-server 192.168.159.167:9092 --topic test --from-beginning

picture.image 可以看到消费者会消费消息,并打印消费的消息。

5.总结

kafka作为一个老牌消息中间件,有这免费、性能好、社区活跃等优点,目前依然是最常用的消息中间件之一,今天只是讲解了一下kafka的基础知识,还有很多好玩有趣的功能等着大家探索。

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
字节跳动基于 DataLeap 的 DataOps 实践
随着数字化转型的推进以及业务数仓建设不断完善,大数据开发体量及复杂性逐步上升,如何保证数据稳定、正确、持续产出成为数据开发者核心诉求,也成为平台建设面临的挑战之一。本次分享主要介绍字节对于DataOps的理解 以及 DataOps在内部业务如何落地实践。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论