Kafka.scala
/**
 * Entry point of the Kafka server process.
 *
 * Reads the server properties from the command-line arguments, builds the server, registers
 * a shutdown hook, starts the server and then waits on `server.awaitShutdown()`. Calls
 * `Exit.exit(0)` once that wait returns, and `Exit.exit(1)` if any step throws.
 *
 * @param args command-line arguments, handed to `getPropsFromArgs` to obtain the server properties
 */
def main(args: Array[String]): Unit = {
  try {
    // Resolve the server properties from the command-line arguments.
    val serverProps = getPropsFromArgs(args)
    // Build the server from those properties.
    val server = buildServer(serverProps)

    // The signal handler is optional: it is skipped on Windows and on IBM JDKs, and a
    // reflective failure while registering it is only logged as a warning.
    try {
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }

    // attach shutdown handler to catch terminating signals as well as normal termination
    Exit.addShutdownHook("kafka-shutdown-hook", {
      try server.shutdown()
      catch {
        case e: Throwable =>
          // Log the cause as well, otherwise the reason for the failed shutdown is lost.
          fatal("Halting Kafka.", e)
          // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
          Exit.halt(1)
      }
    })

    // Start the server.
    try server.startup()
    catch {
      case e: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        // Log the cause as well, otherwise the reason for the failed startup is lost.
        fatal("Exiting Kafka.", e)
        Exit.exit(1)
    }

    server.awaitShutdown()
  } catch {
    case e: Throwable =>
      fatal("Exiting Kafka due to fatal exception", e)
      Exit.exit(1)
  }
  Exit.exit(0)
}