本文使用的是 Spark 2.4.4 版本。
首先进入 org.apache.spark.deploy.SparkSubmit（伴生对象）的 main 方法：
/**
 * Entry point of `SparkSubmit`.
 *
 * Builds an anonymous `SparkSubmit` subclass whose log output is routed through
 * `printMessage` and whose `doSubmit` converts a `SparkUserAppException` into a call to
 * `exitFn` with the exception's exit code, then passes `args` to that instance.
 *
 * @param args raw command-line arguments
 */
override def main(args: Array[String]): Unit = {
  val launcher = new SparkSubmit() { outer =>

    // The parsed-argument object forwards its own log output to this enclosing instance.
    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      new SparkSubmitArguments(args) {
        override protected def logInfo(msg: => String): Unit = outer.logInfo(msg)

        override protected def logWarning(msg: => String): Unit = outer.logWarning(msg)
      }
    }

    override protected def logInfo(msg: => String): Unit = printMessage(msg)

    override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

    override def doSubmit(args: Array[String]): Unit = {
      try {
        super.doSubmit(args)
      } catch {
        // Hand the application's exit code to exitFn instead of letting the exception escape.
        case userAppFailure: SparkUserAppException => exitFn(userAppFailure.exitCode)
      }
    }
  }

  // Submit.
  launcher.doSubmit(args)
}
下一步，进入 doSubmit 方法：它先解析参数，再根据 action 分发到对应的处理逻辑：
/**
 * Parses the arguments and runs the action they select.
 *
 * @param args raw command-line arguments, handed to `parseArguments`
 */
def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if that has not happened yet. The result is remembered and passed on
  // to `submit`, because logging may need to be reset before the application starts.
  val loggingUninitialized = initializeLogIfNecessary(true, silent = true)

  val parsed = parseArguments(args)
  if (parsed.verbose) logInfo(parsed.toString)

  parsed.action match {
    // Submit.
    case SparkSubmitAction.SUBMIT => submit(parsed, loggingUninitialized)
    // Kill.
    case SparkSubmitAction.KILL => kill(parsed)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(parsed)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}