一、Topic 介绍
Topic(主题)类似于文件系统中的文件夹,事件就是该文件夹中的文件。Kafka 中的主题总是多生产者和多订阅者:一个主题可以有零个、一个或多个向其写入事件的生产者,以及零个、一个或多个订阅这些事件的消费者。可以根据需要随时读取主题中的事件——与传统消息传递系统不同,事件在消费后不会被删除。相反,您可以通过每个主题的配置设置来定义 Kafka 应该保留您的事件多长时间,之后旧事件将被丢弃。Kafka 的性能在数据大小方面实际上是恒定的,因此长时间存储数据是完全没问题的。
主题是分区的,这意味着一个主题分布在位于不同 Kafka 代理的多个“桶”上。数据的这种分布式放置对于可伸缩性非常重要,因为它允许客户端应用程序同时从/向多个代理读取和写入数据。当一个新事件发布到一个主题时,它实际上被附加到该主题的分区之一。具有相同事件键(例如,客户或车辆 ID)的事件被写入同一分区,并且 Kafka 保证给定主题分区的任何消费者将始终以与写入事件完全相同的顺序读取该分区的事件。
为了使数据具有容错性和高可用性,每个主题都可以复制,甚至可以跨地理区域或数据中心复制,以便始终有多个代理拥有数据副本,以防万一出现问题。常见的生产设置是复制因子为 3,即,你的数据将始终存在三个副本。此复制在主题分区级别执行。
在设置副本时,副本数是必须小于集群的 Broker 数的,副本只有设置在不同的机器上才有作用。
二、Topic 的创建方式
2.1 zookeeper 方式(不推荐)
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --replication-factor 3 --topic topic_test
注:-–zookeeper 后面接的是 kafka 的 zk 配置, 假如你配置的是 localhost:2181/kafka 带命名空间的这种,则不要漏掉了。
2.2 Kafka 版本 >= 2.2 支持下面的方式(推荐)
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 3 --topic topic_test
2.3 Broker 参数 auto.create.topics.enable 创建(不推荐)
Server 端如果 auto.create.topics.enable
设置为 true 时,那么当 Producer 向一个不存在的 topic 发送数据时,该 topic 同样会被创建出来,此时,副本数默认是 1。
三、Topic 的创建流程
3.1 Topic 创建入口
首先我们找到 kafka-topics.sh 这个脚本,看下里面的内容:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
最终执行的是 kafka.admin.TopicCommand 该类,源码中找到该类,用 IDEA 进行断点调试源码。
程序参数:
--create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 --topic topic_test_9
3.2 源码入口
上述入口主要作用:
- 判断参数中有没有传 --zookeeper 参数,如果有传的话,则创建类 ZookeeperTopicService 的对象,也就是上面我们说的 zookeeper 方式创建 topic;如果没有传的话,则创建类 AdminClientTopicService 对象,也就是上面我们说的 Kafka 版本 >= 2.2 推荐的创建 topic 的方式;
- 根据传入的参数判断判断是否有 --create 参数,有的话走创建主题逻辑。
3.3 创建 AdminClientTopicService 对象
object AdminClientTopicService {
def createAdminClient(commandConfig: Properties, bootstrapServer: Option[String]): Admin = {
bootstrapServer match {
case Some(serverList) => commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, serverList)
case None =>
}
Admin.create(commandConfig)
}
def apply(commandConfig: Properties, bootstrapServer: Option[String]): AdminClientTopicService =
new AdminClientTopicService(createAdminClient(commandConfig, bootstrapServer))
}
- 如果传入的参数有 --command-config,则将这个文件里的参数放到 commandConfig 一个 map 里,假如配置文件里面已经有了
bootstrap.servers
配置,那么会将其覆盖。 - 将上面的 commandConfig 作为入参传入 Admin.create(commandConfig) 创建 Admin;从这里也可以看出,我们调用
kafka-topic.sh
脚本实际上 kafka 模拟了一个 client 来创建 topic 的过程。
3.4 AdminClientTopicService.createTopic 创建 Topic
case class AdminClientTopicService private (adminClient: Admin) extends TopicService {
override def createTopic(topic: CommandTopicPartition): Unit = {
// 假如配置了副本数,--replication-factor 一定要在1和32767之间。
if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1))
throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive")
// 假如配置了分区数,--partitions 必须大于0。
if (topic.partitions.exists(partitions => partitions < 1))
throw new IllegalArgumentException(s"The partitions must be greater than 0")
try {
// 假如指定了 --replica-assignment 参数,则按照指定的方式来分配副本。
val newTopic = if (topic.hasReplicaAssignment)
new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))
else {
new NewTopic(
topic.name,
topic.partitions.asJava,
topic.replicationFactor.map(_.toShort).map(Short.box).asJava)
}
// 将配置 --config 解析成一个配置 map
val configsMap = topic.configsToAdd.stringPropertyNames()
.asScala
.map(name => name -> topic.configsToAdd.getProperty(name))
.toMap.asJava
newTopic.configs(configsMap)
// 调用 adminClient 创建 Topic
val createResult = adminClient.createTopics(Collections.singleton(newTopic),
new CreateTopicsOptions().retryOnQuotaViolation(false))
createResult.all().get()
println(s"Created topic ${topic.name}.")
} catch {
case e : ExecutionException =>
if (e.getCause == null)
throw e
if (!(e.getCause.isInstanceOf[TopicExistsException] && topic.ifTopicDoesntExist()))
throw e.getCause
}
}
...
}
- 判断副本数、分区数是否合理。
- 判断是否配置了 --replica-assignment 参数,如果配置了,则按照指定的方式来分配副本。
- 将配置 --config 解析到 configsMap 中,configsMap 再赋值给 NewTopic 对象中的 configs。
- 调用 adminClient 创建 Topic。
我们来跟一下 adminClient.createTopics 的源码:
@Override
public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
final CreateTopicsOptions options) {
final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> topicFutures = new HashMap<>(newTopics.size());
final CreatableTopicCollection topics = new CreatableTopicCollection();
for (NewTopic newTopic : newTopics) {
if (topicNameIsUnrepresentable(newTopic.name())) {
KafkaFutureImpl<TopicMetadataAndConfig> future = new KafkaFutureImpl<>();
future.completeExceptionally(new InvalidTopicException("The given topic name '" +
newTopic.name() + "' cannot be represented in a request."));
topicFutures.put(newTopic.name(), future);
} else if (!topicFutures.containsKey(newTopic.name())) {
topicFutures.put(newTopic.name(), new KafkaFutureImpl<>());
topics.add(newTopic.convertToCreatableTopic());
}
}
if (!topics.isEmpty()) {
final long now = time.milliseconds();
final long deadline = calcDeadlineMs(now, options.timeoutMs());
final Call call = getCreateTopicsCall(options, topicFutures, topics,
Collections.emptyMap(), now, deadline);
runnable.call(call, now);
}
return new CreateTopicsResult(new HashMap<>(topicFutures));
}
直接跟到创建 topic 的核心代码 getCreateTopicsCall 这里来:
private Call getCreateTopicsCall(final CreateTopicsOptions options,
final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> futures,
final CreatableTopicCollection topics,
final Map<String, ThrottlingQuotaExceededException> quotaExceededExceptions,
final long now,
final long deadline) {
return new Call("createTopics", deadline, new ControllerNodeProvider()) {
@Override
public CreateTopicsRequest.Builder createRequest(int timeoutMs) {
return new CreateTopicsRequest.Builder(
new CreateTopicsRequestData()
.setTopics(topics)
.setTimeoutMs(timeoutMs)
.setValidateOnly(options.shouldValidateOnly()));
}
@Override
public void handleResponse(AbstractResponse abstractResponse) {
...
}
private ConfigEntry configEntry(CreatableTopicConfigs config) {
...
}
@Override
void handleFailure(Throwable throwable) {
...
}
};
}
Call 回调函数中的 createRequest 创建请求会使用构建者模式构建 CreateTopicsRequest 请求参数,如下图:
选择 ControllerNodeProvider 这个节点发起网络请求:
创建 Topic 这个操作是需要 Controller 来执行的:
/**
* Provides the controller node.
*/
private class ControllerNodeProvider implements NodeProvider {
@Override
public Node provide() {
if (metadataManager.isReady() &&
(metadataManager.controller() != null)) {
return metadataManager.controller();
}
metadataManager.requestUpdate();
return null;
}
}
3.5 发起网络请求
发起网络请求后续会单独出一篇文章来讲
3.6 Controller 服务端接收客户端请求
服务端接收客户端请求的源码入口: kafka.server.KafkaRequestHandler#run
主要看下 apis.handle(request) 方法,可以看到客户端的请求都在 request.bodyAndSize()
里面:
3.6.1 KafkaApis.handle(request) 根据请求传递 Api 调用不同接口
request.header.apiKey 匹配客户端传来的 CreateTopics
3.6.2 KafkaApis.handleCreateTopicsRequest 处理创建 Topic 的请求
def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
...
val createTopicsRequest = request.body[CreateTopicsRequest]
val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)
// 如果当前 Broker 不是属于 Controller 的话,则抛出异常。
if (!controller.isActive) {
createTopicsRequest.data.topics.forEach { topic =>
results.add(new CreatableTopicResult().setName(topic.name)
.setErrorCode(Errors.NOT_CONTROLLER.code))
}
sendResponseCallback(results)
} else {
createTopicsRequest.data.topics.forEach { topic =>
results.add(new CreatableTopicResult().setName(topic.name))
}
// kafka 相关鉴权省略...
adminManager.createTopics(
createTopicsRequest.data.timeoutMs,
createTopicsRequest.data.validateOnly,
toCreate,
authorizedForDescribeConfigs,
controllerMutationQuota,
handleCreateTopicsResults)
}
}
- 判断当前 Broker 是否属于 Controller,不是的话则抛出异常;CreateTopic 操作必须由 Controller 来进行,因为有可能客户端发起请求的时候 Controller 已经变更。
- kafka 相关鉴权
- 最后调用 adminManager.createTopics()
3.6.3 adminManager.createTopics()
- 关于主题、副本分配、分区的相关校验
- 检查Topic是否存在
- 检查--replica-assignment参数和 (--partitions || --replication-factor) 不能同时使用
- 如果 (--partitions || --replication-factor) 没有设置,则使用Broker的配置(这个Broker肯定是Controller)
- 校验创建topic的参数准确性
- 计算分区副本分配方式
- 创建分区元数据
- adminZkClient.createTopicWithAssignment 把topic相关数据写入到zk中
- 如果 timeout <= 0 或 validateOnly为true 或 没有主题可以立即返回,否则将分配和错误传递给延迟操作并设置键。
- 尝试立即完成请求,否则将其放入炼狱。
3.6.4 把 topic 相关数据写入到 zk 中
我们来看下 adminZkClient.createTopicWithAssignment(topic.name, configs, assignments, validate = false) 这个方法,看看有哪些数据写入到 zookeeper 中。
def createTopicWithAssignment(topic: String,
config: Properties,
partitionReplicaAssignment: Map[Int, Seq[Int]],
validate: Boolean = true): Unit = {
if (validate)
validateTopicCreate(topic, partitionReplicaAssignment, config)
info(s"Creating topic $topic with configuration $config and initial partition " +
s"assignment $partitionReplicaAssignment")
// write out the config if there is any, this isn't transactional with the partition assignments
// 将topic单独的配置写入到zk中
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
// create the partition assignment
// 将topic分区相关信息写入zk中
writeTopicPartitionAssignment(topic, partitionReplicaAssignment.map { case (k, v) => k -> ReplicaAssignment(v) }, isUpdate = false)
}
写入 Topic 配置信息
- 调用 SetDataRequest 请求,往 /config/topics/{TopicName} 写入数据,这里一般会返回 NONODE,没有该节点,则往该节点写入数据,如果该节点存在,则直接覆盖掉。
- 节点不存在的话,则调用 createOrSet 方法,写入数据,并且节点类型是 PERSISTENT 持久节点。
这里写入的数据,也就是入参的时候传的 --config 的那些参数,这里的配置会覆盖默认配置。
写入 Topic 分区信息
- 将分区分配策略写入到 /brokers/topics/{TopicName} 中,节点类型是 PERSISTENT 持久节点
- 与 Zookeeper 交互的地方
- kafka.zookeeper.ZooKeeperClient#send,这里封装了很多与 Zookeeper 的交互:
3.7 Controller 监听 /brokers/topics/{topicName},通知 Broker 将分区写入磁盘
Controller 监听 /brokers/topics/{topicName},该节点有变化,就会通知 Controller 做出相应的处理。
kafka.controller.KafkaController#processTopicChange
private def processTopicChange(): Unit = {
// 如果处理的不是Controller角色就返回
if (!isActive) return
// 从zk中获取 /brokers/topics 所有Topic
val topics = zkClient.getAllTopicsInCluster(true)
// 找出哪些是新增的
val newTopics = topics -- controllerContext.allTopics
// 找出哪些Topic在zk上被删除了
val deletedTopics = controllerContext.allTopics.diff(topics)
controllerContext.setAllTopics(topics)
registerPartitionModificationsHandlers(newTopics.toSeq)
val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)
deletedTopics.foreach(controllerContext.removeTopic)
addedPartitionReplicaAssignment.foreach {
case (topicAndPartition, newReplicaAssignment) =>
controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)
}
info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
s"[$addedPartitionReplicaAssignment]")
if (addedPartitionReplicaAssignment.nonEmpty)
onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
}
- 从 zk 中获取 /brokers/topics 所有 Topic,跟当前 Broker 内存中所有 controllerContext.allTopics 的差异,可以找出哪些是新增的 Topic,哪些 Topic 在 zk 上被删除了。
- 从 zk 中获取
/brokers/topics/{topicName}
给定主题的副本分配。并保存在内存中。
- 执行 onNewPartitionCreation,分区状态开始流转。
3.7.1 onNewPartitionCreation 状态流转
/**
* This callback is invoked by the topic change callback with the list of failed brokers as input.
* It does the following -
* 1. Move the newly created partitions to the NewPartition state
* 2. Move the newly created partitions from NewPartition->OnlinePartition state
*/
private def onNewPartitionCreation(newPartitions: Set[TopicPartition]): Unit = {
info(s"New partition creation callback for ${newPartitions.mkString(",")}")
partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq,
NewReplica)
partitionStateMachine.handleStateChanges(
newPartitions.toSeq,
OnlinePartition,
Some(OfflinePartitionLeaderElectionStrategy(false))
)
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq,
OnlineReplica)
}
3.7.1.1 将待创建的分区状态流转为 NewPartition
3.7.1.2 将待创建的副本状态流转 NewReplica
3.7.1.3 将分区状态从刚刚的 NewPartition 流转为 OnlinePartition
- 获取 leaderIsrAndControllerEpochs,Leader 为副本的第一个;
- 向 zk 中写入 /brokers/topics/{topicName}/partitions/ 持久节点,无数据;
- 向 zk 中写入 /brokers/topics/{topicName}/partitions/{分区号} 持久节点,无数据;
- 向 zk 中写入 /brokers/topics/{topicName}/partitions/{分区号}/state 持久节点,数据为 leaderIsrAndControllerEpoch;
- 向副本所属 Broker 发送 leaderAndIsrRequest 请求;
- 向所有 Broker 发送 UPDATE_METADATA 请求。
3.7.1.4 将副本状态从刚刚的 NewReplica 流转为 OnlineReplica,更新下内存
3.8 Broker 收到 LeaderAndIsrRequest 创建本地 Log
直接定位到 kafka.server.KafkaApis#handleLeaderAndIsrRequest
继续跟 kafka.server.ReplicaManager#becomeLeaderOrFollower -> kafka.server.ReplicaManager#makeLeaders -> kafka.log.LogManager#getOrCreateLog
如果日志已经存在,只返回现有日志的副本,否则如果 isNew=true 或者如果没有离线日志目录,则为给定的主题和给定的分区创建日志,否则抛出 KafkaStorageException。