页面树结构

2017-07-25 Apache Spark 2.2.0 官方文档中文版发布 : http://spark.apachecn.org/docs/cn/2.2.0/


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

一个 Streaming 应用程序必须全天候运行,所有必须能够解决应用程序逻辑无关的故障(如系统错误,JVM 崩溃等)。为了使这成为可能,Spark Streaming 需要 checkpoint 足够的信息到容错存储系统中, 以使系统从故障中恢复。

  • Metadata checkpointing : 保存流计算的定义信息到容错存储系统如 HDFS 中。这用来恢复应用程序中运行 worker 的节点的故障。元数据包括 : 
    • Configuration : 创建 Spark Streaming 应用程序的配置信息。
    • DStream operations : 定义 Streaming 应用程序的操作集合。
    • Incomplete batches : 操作存在队列中的未完成的批。
  • Data checkpointing : 保存生成的 RDD 到可靠的存储系统中,这在有状态 transformation(如结合跨多个批次的数据)中是必须的。在这样一个 transformation 中,生成的 RDD 依赖于之前批的 RDD,随着时间的推移,这个依赖链的长度会持续增长。在恢复的过程中,为了避免这种无限增长。有状态的 transformation 的中间 RDD 将会定时地存储到可靠存储系统中,以截断这个依赖链。

元数据 checkpoint 主要是为了从 driver 故障中恢复数据。如果 transformation 操作被用到了,数据 checkpoint 即使在简单的操作中都是必须的。

何时启用 CheckPointing

应用程序在下面两种情况下必须开启 checkpoint

  • 使用有状态的 ransformation。如果在应用程序中用到了updateStateByKey 或者 reduceByKeyAndWindowcheckpoint 目录必需提供用以定期 checkpoint RDD。
  • 从运行应用程序的 driver 的故障中恢复过来。使用元数据 checkpoint 恢复处理信息。

注意,没有前述的有状态的 transformation 的简单流应用程序在运行时可以不开启 checkpoint。在这种情况下,从 driver 故障的恢复将是部分恢复(接收到了但是还没有处理的数据将会丢失)。 这通常是可以接受的,许多运行的 Spark Streaming 应用程序都是这种方式。

如何配置 CheckPointing

在容错、可靠的文件系统(HDFSs3 等)中设置一个目录用于保存 checkpoint 信息。这可以通过 streamingContext.checkpoint(checkpointDirectory) 方法来做。这运行你用之前介绍的 有状态 transformation。另外,如果你想从 driver 故障中恢复,你应该以下面的方式重写你的Streaming 应用程序。

  • 当应用程序是第一次启动,新建一个 StreamingContext,启动所有 Stream,然后调用 start() 方法。
  • 当应用程序因为故障重新启动,它将会从 checkpoint 目录 checkpoint 数据重新创建 StreamingContext

这种配置的方式很简单,通过使用 StreamingContext.getOrCreate 即可,如下所示 : 

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

如果 checkpointDirectory 存在,上下文将会利用 checkpoint 数据重新创建。如果这个目录不存在,将会调用 functionToCreateContext 函数创建一个新的上下文,建立 DStream。 请看RecoverableNetworkWordCount 例子。

除了使用 getOrCreate,开发者必须保证在故障发生时,driver 处理自动重启。只能通过部署运行应用程序的基础设施来达到该目的。在部署章节将有更进一步的讨论。

注意,RDDcheckpointing 有存储成本。这会导致批数据(包含的 RDDcheckpoint)的处理时间增加。因此,需要小心的设置批处理的时间间隔。在最小的批容量(包含 秒的数据)情况下,checkpoint 每批数据会显著的减少 操作的吞吐量。相反,checkpointing 太少会导致谱系以及任务大小增大,这会产生有害的影响。因为有状态的 transformation 需要 RDD checkpoint。默认的间隔时间是批间隔时间的倍数,最少 10 秒。它可以通过 dstream.checkpoint 来设置。典型的情况下,设置 checkpoint 间隔是 DStream 的滑动间隔的 5-10 大小是一个好的尝试。