页面树结构

版本比较

标识

  • 该行被添加。
  • 该行被删除。
  • 格式已经改变。

...

在 Spark Streaming 中,无法从 checkpoint 恢复累加器和广播变量。如果启用了检查点并同时使用累加器或广播变量,您必须为累加器和广播变量创建延迟实例化的单例实例,以便在驱动程序因故障重启后能够重新实例化它们。下面的示例展示了这种做法。

代码块
languagescala
themeEclipse
/**
 * Holder for a lazily created, shared broadcast of the word blacklist.
 *
 * The broadcast is built on the first call to `getInstance` instead of eagerly,
 * so a fresh one can be created from whatever SparkContext is passed in.
 */
object WordBlacklist {

  // Stays null until the first getInstance call. @volatile lets the
  // unsynchronized read below see the value assigned inside the lock.
  @volatile private var instance: Broadcast[Seq[String]] = null

  /**
   * Returns the shared blacklist broadcast, creating it on first use.
   *
   * @param sc context used to create the broadcast when none exists yet
   * @return the single broadcast instance held by this object
   */
  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    val existing = instance
    if (existing != null) {
      existing
    } else {
      synchronized {
        // Re-check under the lock: another thread may have created it meanwhile.
        if (instance == null) {
          val words = Seq("a", "b", "c")
          instance = sc.broadcast(words)
        }
        instance
      }
    }
  }
}

/**
 * Holder for a lazily created, shared accumulator that counts dropped words.
 *
 * The accumulator is registered on the first call to `getInstance` instead of
 * eagerly, so a fresh one can be created from whatever SparkContext is passed in.
 */
object DroppedWordsCounter {

  // Stays null until the first getInstance call. @volatile lets the
  // unsynchronized null check see the value assigned inside the lock.
  @volatile private var instance: LongAccumulator = null

  /**
   * Returns the shared accumulator, registering it on first use under the
   * name "WordsInBlacklistCounter".
   *
   * @param sc context used to create the accumulator when none exists yet
   * @return the single accumulator instance held by this object
   */
  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        // Re-check under the lock: another thread may have created it meanwhile.
        if (instance == null) {
          instance = sc.longAccumulator("WordsInBlacklistCounter")
        }
      }
    }
    instance
  }
}

// For every batch RDD: fetch (or lazily create) the shared broadcast and
// accumulator from the RDD's own SparkContext, drop blacklisted words while
// counting them, and format the remaining (word, count) pairs.
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      // Add this word's count to the dropped total and exclude the pair.
      droppedWordsCounter.add(count)
      false
    } else {
      true
    }
  }.collect().mkString("[", ", ", "]")
  // The snippet only builds the report line; nothing here prints or stores it.
  val output = "Counts at time " + time + " " + counts
}

完整代码(源代码)。