消息队列MQ

Kafka 服务器源码

本文主要是介绍Kafka 服务器源码,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

image

4.1 程序入口

Kafka.scala

def main(args: Array[String]): Unit = {
  try {
    // 获取参数相关信息
    val serverProps = getPropsFromArgs(args)
    // 配置服务
    val server = buildServer(serverProps)

    try {
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }

    // attach shutdown handler to catch terminating signals as well as normal termination
    Exit.addShutdownHook("kafka-shutdown-hook", {
      try server.shutdown()
      catch {
        case _: Throwable =>
          fatal("Halting Kafka.")
          // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
          Exit.halt(1)
      }
    })

    // 启动服务
    try server.startup()
    catch {
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }

    server.awaitShutdown()
  }
  catch {
    case e: Throwable =>
      fatal("Exiting Kafka due to fatal exception", e)
      Exit.exit(1)
  }
  Exit.exit(0)
}
这篇关于Kafka 服务器源码的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!