Kafka Startup Flow

From kafka-server-start.sh we can see that the startup class is kafka.Kafka, so let's look at that class directly:

def main(args: Array[String]): Unit = {
  try {
    // load the broker startup configuration from the command-line arguments / properties file
    val serverProps = getPropsFromArgs(args)
    // the startable wrapper that drives the broker's startup
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread() {
      override def run() = {
        kafkaServerStartable.shutdown
      }
    })

    kafkaServerStartable.startup
    kafkaServerStartable.awaitShutdown
  }
  catch {
    case e: Throwable =>
      fatal(e)
      System.exit(1)
  }
  System.exit(0)
}
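
Two details in main are worth calling out. The shutdown hook turns Ctrl-C / SIGTERM into an orderly kafkaServerStartable.shutdown, and awaitShutdown parks the main thread until that shutdown has finished; inside KafkaServer it simply blocks on the shutdownLatch that startup() creates (see below). A minimal standalone sketch of the same park-until-shutdown idiom (illustrative only, not Kafka's code):

import java.util.concurrent.CountDownLatch

// Sketch of the "shutdown hook + awaitShutdown" idiom; the names here are made up for illustration.
object ParkUntilShutdown {
  private val shutdownLatch = new CountDownLatch(1)

  def startup(): Unit = println("started")           // start components here

  def shutdown(): Unit = {
    println("shutting down")                         // stop components here
    shutdownLatch.countDown()                        // release the main thread
  }

  def awaitShutdown(): Unit = shutdownLatch.await()  // block until shutdown() has run

  def main(args: Array[String]): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = shutdown()          // triggered by Ctrl-C / SIGTERM
    })
    startup()
    awaitShutdown()                                  // main thread parks here
  }
}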

Startup is delegated to a KafkaServerStartable object, but KafkaServerStartable is only a thin wrapper: internally it holds a KafkaServer and simply forwards startup, shutdown and awaitShutdown to it.
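
A rough sketch of that delegation (simplified; the exact constructor arguments and error handling in the real KafkaServerStartable vary slightly between versions):

import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.Logging

// Simplified sketch of how KafkaServerStartable wraps KafkaServer (not the verbatim source).
class KafkaServerStartableSketch(val serverConfig: KafkaConfig) extends Logging {
  private val server = new KafkaServer(serverConfig)

  def startup() {
    try server.startup()
    catch {
      case _: Throwable =>
        // KafkaServer.startup() has already logged the fatal error, so just exit
        fatal("Exiting Kafka.")
        System.exit(1)
    }
  }

  def shutdown() {
    try server.shutdown()
    catch {
      case _: Throwable =>
        fatal("Exiting Kafka.")
        System.exit(1)
    }
  }

  def awaitShutdown(): Unit = server.awaitShutdown()
}

The real work therefore happens in KafkaServer.startup():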

  /**
   * Start up API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   */
  def startup() {
    try {
      info("starting")

      if (isShuttingDown.get)
        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

      if (startupComplete.get)
        return

      val canStartup = isStartingUp.compareAndSet(false, true)
      if (canStartup) {
        // move the broker state to Starting
        brokerState.newState(Starting)

        /* start the task scheduler */
        kafkaScheduler.startup()

        /* setup zookeeper: initialize the ZK connection and create the root (chroot) path if needed */
        zkUtils = initZk()

        /* Get or create cluster_id */
        _clusterId = getOrGenerateClusterId(zkUtils)
        info(s"Cluster ID = $clusterId")

        /* generate brokerId: resolve the broker id (and the log dirs that belong to this broker) */
        config.brokerId = getBrokerId
        this.logIdent = "[Kafka Server " + config.brokerId + "], "

        /* create and configure metrics: the metrics registry used internally by Kafka */
        val reporters = config.getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter],
          Map[String, AnyRef](KafkaConfig.BrokerIdProp -> (config.brokerId.toString)).asJava)
        reporters.add(new JmxReporter(jmxPrefix))
        val metricConfig = KafkaServer.metricConfig(config)
        metrics = new Metrics(metricConfig, reporters, time, true)

        // quota management, e.g. throttling how fast producers may send data
        quotaManagers = QuotaFactory.instantiate(config, metrics, time)
        notifyClusterListeners(kafkaMetricsReporters ++ reporters.asScala)

        /* start log manager: build and start the LogManager, an important set of broker-side threads */
        logManager = createLogManager(zkUtils.zkClient, brokerState)
        logManager.startup()

        // per-partition state cache; every broker asynchronously maintains the same cache,
        // with updates pushed from the controller
        metadataCache = new MetadataCache(config.brokerId)
        credentialProvider = new CredentialProvider(config.saslEnabledMechanisms)
        // create and start the NIO socket server
        socketServer = new SocketServer(config, metrics, time, credentialProvider)
        socketServer.startup()

        /* start replica manager: initialize and start the replica manager */
        replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager,
          isShuttingDown, quotaManagers.follower)
        replicaManager.startup()

        /* start kafka controller: initialize and start the controller */
        kafkaController = new KafkaController(config, zkUtils, brokerState, time, metrics, threadNamePrefix)
        kafkaController.startup()

        adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)

        /* start group coordinator: manages consumer group membership and offsets */
        // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
        groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
        groupCoordinator.startup()

        /* Get the authorizer and initialize it if one is specified. */
        authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
          val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
          authZ.configure(config.originals())
          authZ
        }

        /* start processing requests: KafkaApis dispatches every type of Kafka request */
        apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator,
          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
          clusterId, time)
        // the pool of request handler (I/O) threads
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
          config.numIoThreads)

        Mx4jLoader.maybeLoad()

        /* start dynamic config manager: handlers for topic, client, user and broker config change requests */
        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
          ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
          ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
          ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

        // Create the config manager. Start listening to config change notifications
        // and dispatch them to the handlers above
        dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
        dynamicConfigManager.startup()

        /* tell everyone we are alive: register this broker in ZK so its liveness can be tracked */
        val listeners = config.advertisedListeners.map { endpoint =>
          if (endpoint.port == 0)
            endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
          else
            endpoint
        }
        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
          config.interBrokerProtocolVersion)
        kafkaHealthcheck.startup()

        // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
        checkpointBrokerId(config.brokerId)

        /* register broker metrics */
        registerStats()
        // move the broker state to RunningAsBroker
        brokerState.newState(RunningAsBroker)
        shutdownLatch = new CountDownLatch(1)
        // startup is complete, flip the state flags
        startupComplete.set(true)
        isStartingUp.set(false)
        AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
        info("started")
      }
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        isStartingUp.set(false)
        shutdown()
        throw e
    }
  }
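
Note also how re-entry is prevented at the top of startup(): the three AtomicBoolean flags (isShuttingDown, startupComplete, isStartingUp) plus the compareAndSet on isStartingUp guarantee that only one caller ever executes the body, and that calling startup() on an already running server is a no-op. A small self-contained sketch of that guard idiom (hypothetical class, not Kafka's exact fields):

import java.util.concurrent.atomic.AtomicBoolean

// Illustrative sketch of the startup-guard idiom used above (names are hypothetical).
class GuardedService {
  private val isShuttingDown  = new AtomicBoolean(false)
  private val startupComplete = new AtomicBoolean(false)
  private val isStartingUp    = new AtomicBoolean(false)

  def startup(): Unit = {
    if (isShuttingDown.get)
      throw new IllegalStateException("still shutting down, cannot re-start")
    if (startupComplete.get)
      return                                          // already running: startup() is idempotent
    // only the first concurrent caller wins this CAS and runs the body
    if (isStartingUp.compareAndSet(false, true)) {
      try {
        // ... bring components up here ...
        startupComplete.set(true)
      } finally {
        isStartingUp.set(false)
      }
    }
  }
}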

To summarize, the KafkaServer startup sequence brings up the following modules, in order:

  1. kafkaScheduler: the scheduler, which runs Kafka's internal periodic and one-off background tasks (see the sketch after this list)
  2. zkUtils: the ZooKeeper utility class is initialized (the connection plus the root/chroot path)
  3. metrics: the internal metrics registry is instantiated
  4. quotaManagers: the quota module, used e.g. to throttle producer/consumer throughput
  5. logManager: the log management module, an important set of broker-side threads
  6. socketServer: the NIO network server
  7. replicaManager: the replica manager
  8. kafkaController: the Kafka controller
  9. adminManager: handles administrative requests from clients, such as topic creation and deletion
  10. groupCoordinator: the consumer group coordinator (group membership and offsets)
  11. apis and requestHandlerPool: the request dispatch layer (KafkaApis) and the request handler thread pool
  12. dynamicConfigManager: dynamic configuration management
  13. kafkaHealthcheck: registers the broker in ZooKeeper so the rest of the cluster knows it is alive
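
As a concrete illustration of the first item, this is roughly the kind of periodic and one-off background scheduling the scheduler module provides; the sketch below uses plain java.util.concurrent rather than Kafka's actual KafkaScheduler class:

import java.util.concurrent.{Executors, TimeUnit}

// Illustrative stand-in for the broker's background-task scheduler (not Kafka's KafkaScheduler).
object SchedulerSketch {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newScheduledThreadPool(2)

    // a periodic task, e.g. log-flush or log-cleanup style work
    executor.scheduleAtFixedRate(
      new Runnable { def run(): Unit = println("periodic background task") },
      0, 5, TimeUnit.SECONDS)

    // a one-off delayed task
    executor.schedule(
      new Runnable { def run(): Unit = println("one-off delayed task") },
      10, TimeUnit.SECONDS)

    Thread.sleep(15000)   // let the tasks run for demonstration purposes
    executor.shutdown()   // during broker shutdown the scheduler is stopped the same way
  }
}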