优秀的编程知识分享平台

网站首页 > 技术文章 正文

Spark源码阅读:从Spark-Shell开始(下)

nanyue 2024-08-01 22:51:21 技术文章 7 ℃

在上一篇文章 Spark源码阅读:从spark-shell开始(上) 中,我们发现 spark-shell 是通过 spark-submit 脚本启动的,两个脚本都只是做了一层包装,底层逻辑spark源码里的 Scala单例类 SparkSubmit。object SparkSubmit 负责提交一个spark任务,而 object repl.Main 作为参数。

为了看到真实的提交结果,我们在执行时,设置SPARK_PRINT_LAUNCH_COMMAND=1,执行结果如下:

输出到console的信息可以分为五个部分:

  1. 背后真正的启动命令
  2. 初始化 Logging 的输出
  3. 初始化 repl 命令行的输出:预加载类/配置,初始化SparkSession
  4. 打印Welcome
  5. 交互式命令行开始接收输入

启动命令比较简单,java 执行 object SparkSubmit 的 main函数,SparkSubmit.main 里通过反射获取 object repl.Main 的 main 函数,并触发执行:

/Library/Java/JavaVirtualMachines/jdk1.8.0_301.jdk/Contents/Home/bin/java \
  -cp /Users/user/go/src/github.com/apache/spark/conf/:/Users/user/go/src/github.com/apache/spark/assembly/target/scala-2.12/jars/* \
  -Dscala.usejavacp=true \
  -Xmx1g org.apache.spark.deploy.SparkSubmit \
  --class org.apache.spark.repl.Main \
  --name Spark shell spark-shell

我们再看一下 object repl.Main 的结构。它存储了一些配置信息,还提供了创建SparkSession对象的能力。值得注意的是,sparkSession: SparkSession 和 交互式命令行里的 spark: SparkSession 是同一个对象。交互式命令行的初始化和代码执行均托管给 interp: SparkILoop 对象。

SparkILoop 实现了Spark定制版的交互式命令行。SparkILoop 继承了 scala 编译器自带的 repl 类 ILoop,所以已经具备了命令行的核心功能(读取用户输入,解释/执行代码,返回输出),它只需要定制命令行的启动逻辑即可。startup() 函数中对这段逻辑有完整的实现,后面我们着重阅读这部分代码。

上面这张图左侧是 SparkILoop 的结构,process方法包含了创建命令行的主干逻辑,启动阶段的逻辑包含在 startup 方法中。startup() 阶段,会按顺序创建:

  1. SplashLoop实例:启动后台线程,存储用户输入
  2. scala解释器实例:用来解释&执行用户输入的代码
  3. SparkSession实例:用来执行spark指令

然后返回初始化过程中的用户输入。

上图中右侧的代码中,startup() 返回的字符串,通过 pattern matching 进行检查。

Pattern Matching (模式匹配) 用来检查一个变量是否符合某种pattern,是函数式编程的标配。使用时,它可以取代 Java 里的 switch case,但能力远远不止于此。

如果初始化过程中,用户按了 Ctrl+D ,返回值是 null,命令行会退出执行。

如果输入了其他字符,loop(line) 里的 loop方法会解释并执行这行代码,并等待用户下一轮输入。loop 方法是 SparkILoop 父类 ILoop 的方法。它是一个尾递归函数,除非接收到 null,或者遇到不可恢复的异常,否则不会退出。

// class ILoop
@tailrec final def loop(line: String): LineResult = {
  import LineResults._
  if (line == null) EOF
  else if (try processLine(line) catch crashRecovery) loop(readOneLine())
  else ERR
}

private def readOneLine() = {
  out.flush()
  in readLine prompt
}

processLine 方法会处理新的指令。对于有很多行的代码块,解释器对象判断是否缓存代码,或取出缓存代码触发执行。

readOneLine 会尝试读取新的用户输入,如果没有,则处于等待状态。


命令行启动的主流程基本是这些。刚才我们提到,startup() 阶段,会按顺序创建:

  1. SplashLoop实例:初始化 console reader,并提取reader获取的第一条指令
  2. scala解释器实例:用来解释&执行用户输入的代码
  3. SparkSession实例:用来执行spark指令

下面我们看一下 startup() 的逻辑。

// 创建 SplashLoop,包含SplashReader实例和 prompt
val splash = preLoop

// 创建解释器
createInterpreter()
intp.initializeSynchronous()

// 更新配置,初始化SparkSession,执行预加载文件和代码
loopPostInit()

// 打印welcome
printWelcome()

// 提取第一行,Ctrl+D 停止执行
val line = splash.line

这些步骤走完之后,loop 方法会接管第一行输入,并处理完startup过程中用户输入的指令,同时接收新的用户输入指令。

这里我们重点看下如何初始化 SparkSession。初始化的逻辑定义在 initializeSpark 函数中。IMain intp 是一个 scala 的代码解释器,它通过 quietRun 可以执行代码块。

// class SparkILoop
def initializeSpark(): Unit = {
  if (!intp.reporter.hasErrors) {
    // `savingReplayStack` removes the commands from session history.
    savingReplayStack {
      initializationCommands.foreach(intp quietRun _)
    }
  } else {
    throw new RuntimeException(s"Scala $versionString interpreter encountered " +
      "errors during initialization")
  }
}

// 初始化命令,总共5条
val initializationCommands: Seq[String] = Seq(
  """
  @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
      org.apache.spark.repl.Main.sparkSession
    } else {
      org.apache.spark.repl.Main.createSparkSession()
    }
  @transient val sc = {
    val _sc = spark.sparkContext
    // 打印 WebUI URL 和 application id
    ...
    _sc
  }
  """,
  "import org.apache.spark.SparkContext._",
  "import spark.implicits._",
  "import spark.sql",
  "import org.apache.spark.sql.functions._"
)

initializationCommands 包含五个代码块,第一个用来初始化 spark 和 sc对象,其余用于在命令行里引入必要的package和class。这段代码执行完成,意味着我们可以在命令行里使用 spark 和 sc 对象。

值得注意的是 spark 的初始化是通过 object org.apache.spark.repl.Main 的方法实现的,而 repl.Main 是 spark-shell 的入口单例类。命令行里和的 spark 对象 和 repl.Main.sparkSession 成员变量是同一个 SparkSession,同理对 sc 和 repl.Main.sparkContext 也是一个。

spark-shell 代码的介绍就先到这里。如果你对于spark代码不熟悉,可以走单步调试,看关键步骤的执行是否符合预期。配置方式如下:

第一步:IDEA 增加 Remote JVM Debug 配置,这里我们只选择 use module class=spark-repl,其他使用默认值

第二步:启动 spark-shell前设置环境变量 SPARK_SUBMIT_OPTS

export SPARK_SUBMIT_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
./bin/spark-shell

// spark-shell 命令会被卡在下面这一行,直到执行第三步
Listening for transport dt_socket at address: 5005

第三步:IDE 里启动 Debug

Tags:

最近发表
标签列表