SparkContext初始化过程

SparkContext在构造的过程中,已经完成了各项服务的启动。因为Scala语法的特点,所有构造函数都会调用默认的构造函数,而默认构造函数的代码直接在类定义中。

除了初始化各类配置、日志之外,最重要的初始化操作之一是启动Task调度器和DAG调度器,相关代码如下:

// 创建并启动Task调度器
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.send(TaskSchedulerIsSet)
// 创建DAG调度器,并引用之前创建的Task调度器之后,
// 再启动Task调度器
_taskScheduler.start()

DAG调度与Task调度的区别是,DAG是最高层级的调度,为每个Job绘制出一个有向无环图(简称DAG),跟踪各Stage的输出,计算完成Job的最短路径,并将Task提交给Task调度器来执行。而Task调度器只负责接受DAG调度器的请求,负责Task的实际调度执行,所以DAGScheduler的初始化必须在Task调度器之后。

DAG与Task这种分离设计的好处是,Spark可以灵活设计自己的DAG调度,同时还能与其他资源调度系统结合,比如YARN、Mesos。

Task调度器本身的创建在createTaskScheduler函数中进行。根据Spark程序提交时指定的不同模式,可以启动不同类型的调度器。并且出于容错考虑,createTaskScheduler会返回一主一备两个调度器。以YARN cluster模式为例,主、备调度器对应不同类的实例,但是加载了相同的配置。下面摘录了createTaskScheduler函数的相关实现:

private def createTaskScheduler(
    sc: SparkContext,
    master: String): (SchedulerBackend, TaskScheduler) = {
        // 省略部分代码……
        master match {
        // 省略部分case代码……
        case "yarn-standalone" | "yarn-cluster" =>
            if (master == "yarn-standalone") {
                logWarning("\"yarn-standalone\" is deprecated as of Spark 1.0.
                    Use \"yarn-cluster\" instead.")
             }
         // 主调度器
         val scheduler = try {
             val clazz = Class.forName("org.apache.spark.scheduler.
                 cluster.YarnClusterScheduler")
             val cons = clazz.getConstructor(classOf[SparkContext])
             cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
         } catch {
             case e: Exception => {
                throw new SparkException("YARN mode not available ?", e)
             }
         }
         // 备用调度器
         val backend = try {
             val clazz =

                  Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler Backend")
              val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
              cons.newInstance(scheduler, sc).asInstanceOf [CoarseGrainedSchedulerBackend]
          } catch {
              case e: Exception => {
                  throw new SparkException("YARN mode not available ?", e)
              }
          }
          scheduler.initialize(backend)
          (backend, scheduler)
          // 省略部分case代码……
    }
}
  • SparkContext初始化过程已关闭评论
  • 240 views
  • A+
所属分类:未分类
avatar