Spark的容错机制

摘 要

分布式系统通常在一个机器集群上运行,同时运行的几百台机器中某些出问题的概率大大增加,所以容错设计是分布式系统的一个重要能力。

容错体系概述

Spark以前的集群容错处理模型,像MapReduce,将计算转换为一个有向无环图(DAG)的任务集合,这样可以通过重复执行DAG里的一部分任务来完成容错恢复。但是由于主要的数据存储在分布式文件系统中,没有提供其他存储的概念,容错过程需要在网络上进行数据复制,从而增加了大量的消耗。所以,分布式编程中经常需要做检查点,即将某个时机的中间数据写到存储(通常是分布式文件系统)中。

RDD也是一个DAG,每一个RDD都会记住创建该数据集需要哪些操作,跟踪记录RDD的继承关系,这个关系在Spark里面叫lineage。由于创建RDD的操作是相对粗粒度的变换(如mapfilterjoin),即单一的操作应用于许多数据元素,而不需存储真正的数据,该技巧比通过网络复制数据更高效。当一个RDD的某个分区丢失时,RDD有足够的信息记录其如何通过其他RDD进行计算,且只需重新计算该分区,这是Spark的一个创新。

Spark的lineage也不是完美解决所有问题的,因为RDD之间的依赖分为两种,如下图所示:

Spark的容错机制
根据父RDD分区是对应一个还是多个子RDD分区,依赖分为如下两种。

  • 窄依赖。父分区对应一个子分区。
  • 宽依赖。父分区对应多个子分区。

对于窄依赖,只需要通过重新计算丢失的那一块数据来恢复,容错成本较小。但如果是宽依赖,则当容错重算分区时,因为父分区数据只有一部分是需要重算子分区的,其余数据重算就造成了冗余计算。

所以,不同的应用有时候也需要在适当的时机设置数据检查点。由于RDD的只读特性使得它比常用的共享内存更容易做检查点,具体可以使用doCheckPoint方法。

在有些场景的应用中,容错会更复杂,比如计费服务等,要求零丢失。还有在Spark支持的Streaming计算的应用场景中,系统的上游不断产生数据,容错过程可能造成数据丢失。为了解决这些问题,Spark也提供了预写日志(也称作journal),先将数据写入支持容错的文件系统中,然后才对数据施加这个操作。

另外,Kafka和Flume这样的数据源,接收到的数据只在数据被预写到日志以后,接收器才会收到确认消息,已经缓存但还没有保存的数据在Driver程序重新启动之后由数据源从上一次确认点之后重新再发送一次。

这样,所有的数据要不从日志中恢复,要不由数据源重发,实现了零丢失。

Master节点失效

Spark Master的容错分为两种情况:Standalone集群模式和单点模式。

Standalone集群模式下的Master容错是通过ZooKeeper来完成的,即有多个Master,一个角色是Active,其他的角色是Standby。当处于Active的Master异常时,需要重新选择新的Master,通过ZooKeeper的ElectLeader功能实现。关于ZooKeeper的实现,这里就不展开了,感兴趣的朋友可以参考Paxos。

要使用ZooKeeper模式,你需要在conf/spark-env.sh中为SPARK_DAEMON_JAVA_OPTS添加一些选项,详见下表。

系统属性

说明

spark.deploy.recoveryMode 默认值为NONE。设置为ZOOKEEPER后,可以在Active Master异常之后重新选择一个Active Master
spark.deploy.zookeeper.url ZooKeeper集群地址(比如192.168.1.100:2181,192.168.1.101:2181
spark.deploy.zookeeper.dir 用于恢复的ZooKeeper目录,默认值为/spark

设置SPARK_DAEMON_JAVA_OPTS的实际例子如下:

SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS
    -Dspark.deploy.recoveryMode =ZOOKEEPER"

应用程序启动运行时,指定多个Master地址,它们之间用逗号分开,如下所示:

MASTER=spark://192.168.100.101:7077,spark://192.168.100.102:7077 bin/spark-shell

在ZooKeeper模式下,恢复期间新任务无法提交,已经运行的任务不受影响。

此外,Spark Master还支持一种更简单的单点模式下的错误恢复,即当Master进程异常时,重启Master进程并从错误中恢复。具体方法是设置spark.deploy.recoveryMode属性的值为FILESYSTEM,并为spark.deploy.recoveryDirectory属性设置一个本地目录,用于存储必要的信息来进行错误恢复。

Slave节点失效

Slave节点运行着Worker、执行器和Driver程序,所以我们分三种情况讨论下3个角色分别退出的容错过程。

  • Worker异常停止时,会先将自己启动的执行器停止,Driver需要有相应的程序来重启Worker进程。
  • 执行器异常退出时,Driver没有在规定时间内收到执行器的StatusUpdate,于是Driver会将注册的执行器移除,Worker收到LaunchExecutor指令,再次启动执行器。
  • Driver异常退出时,一般要使用检查点重启Driver,重新构造上下文并重启接收器。第一步,恢复检查点记录的元数据块。第二步,未完成作业的重新形成。由于失败而没有处理完成的RDD,将使用恢复的元数据重新生成RDD,然后运行后续的Job重新计算后恢复。
  • Spark的容错机制已关闭评论
  • 82 views
  • A+
所属分类:未分类
avatar