Spark RDD的Transformation

RDD的Transformation是指由一个RDD生成新RDD的过程,比如前面使用的flatMapmapfilter操作都返回一个新的RDD对象,类型是MapPartitionsRDD,它是RDD的子类。

所有的RDD Transformation都只是生成了RDD之间的计算关系以及计算方法,并没有进行真正的计算。下面还是以WordCount为例进行介绍:

val textFile = sc.textFile("README.md")
val words = textFile.flatMap(line => line.split(" "))
val wordPairs = words.map(word => (word, 1))
val wordCounts = wordPairs.reduceByKey((a, b) => a + b)
wordCounts.collect()

每一个操作都会生成一个新的RDD对象(其类型为RDD子类),它们按照依赖关系串在一起,像一个链表(其实是DAG的简化形式),每个对象有一个指向父节点的指针,以及如何从父节点通过计算生成新对象的信息。下图显示了WordCount计算过程中的RDD Transformation生成的RDD对象的依赖关系。

Spark RDD的Transformation

          RDD Transformation生成的RDD对象的依赖关系

除了RDD创建过程会生成新的RDD外,RDD Transformation也会生成新的RDD,并且设置与前一个RDD的依赖关系。结合每一个RDD的数据和它们之间的依赖关系,每个RDD都可以按依赖链追溯它的祖先,这些依赖链接就是RDD重建的基础。因此,理解了RDD依赖,也就理解了RDD的重建容错机制。

下面以map为例进行介绍。实际上,这就是生成了一个新的RDD对象,其类型是MapPartitionsRDD(它是RDD的子类):

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

MapPartitionsRDD的定义如下:

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
    extends RDD[U](prev) {
    override val partitioner = if (preservesPartitioning)
        firstParent[T].partitioner else None
    override def getPartitions: Array[Partition] = firstParent[T].partitions
    override def compute(split: Partition, context: TaskContext): Iterator[U] =
        f(context, split.index, firstParent[T].iterator(split, context))
}

可以看到,MapPartitionsRDD最主要的工作是用变量f保存传入的计算函数,以便compute调用它来进行计算。其他4个重要属性基本保持不变:分区和优先计算位置没有重新定义,保持不变,依赖关系默认依赖调用的RDD,分区器优先使用上一级RDD的分区器,否则为None

在Spark中,RDD是有依赖关系的,这种依赖关系有两种类型。

  • 窄依赖。依赖上级RDD的部分分区。
  • Shuffle依赖。依赖上级RDD的所有分区。

对应类的关系如下图所示。

Spark RDD的Transformation

                             对应类的关系

之所以这么区分依赖关系,是因为它们之间有本质的区别。使用窄依赖时,可以精确知道依赖的上级RDD的分区。一般情况下,会选择与自己在同一节点的上级RDD分区,这样计算过程都在同一节点进行,没有网络IO开销,非常高效,常见的mapflatMapfilter操作都是这一类。而Shuffle依赖则不同,无法精确定位依赖的上级RDD的分区,相当于依赖所有分区(想想reduceByKey计算,需要对所有的key重新排列)。计算时涉及所有节点之间的数据传输,开销巨大。所以,以Shuffle依赖为分隔,Task被分成Stage,方便计算时的管理。

RDD仔细维护着这种依赖关系和计算方法,使得通过重新计算来恢复RDD成为可能。当然,这也不是万能的。如果依赖链条太长,那么通过计算来恢复的代价就太大了。所以,Spark又提供了一种叫检查点的机制。对于依赖链条太长的计算,对中间结果存一份快照,这样就不需要从头开始计算了。

  • Spark RDD的Transformation已关闭评论
  • 137 views
  • A+
所属分类:未分类
avatar