页面树结构

2017-07-25 Apache Spark 2.2.0 官方文档中文版发布 : http://spark.apachecn.org/docs/cn/2.2.0/


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

Accumulators累加器)是一个仅可以执行 “added”(添加)的变量来通过一个关联和交换操作,因此可以高效地执行支持并行。累加器可以用于实现 counter( 计数,类似在 MapReduce 中那样)或者 sums(求和)。原生 Spark 支持数值型的累加器,并且程序员可以添加新的支持类型。

创建 accumulators(累加器)并命名之后,在 SparkUI 界面上将会显示它。这样可以帮助理解正在运行的阶段的运行情况(注意 : 该特性在 Python 中还不支持)。

可以通过调用 SparkContext.longAccumulator() 或 SparkContext.doubleAccumulator() 方法创建数值类型的 accumulator(累加器)以分别累加 LongDouble 类型的值。集群上正在运行的任务就可以使用 add 方法来累计数值。然而,它们不能够读取它的值。只有 driver program(驱动程序)才可以使用 value 方法读取累加器的值。

 下面的代码展示了一个 accumulator(累加器)被用于对一个数字中的元素求和。

Scala
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

上面的代码示例使用的是 Spark 内置的 Long 类型的累加器,程序员可以通过继承 AccumulatorV2 类创建新的累加器类型。AccumulatorV2 抽象类有几个需要 override(重写)的方法 : reset 方法可将累加器重置为 0,add 方法可将其它值添加到累加器中,merge 方法可将其他同样类型的累加器合并为一个。其他需要重写的方法可参考 scala API 文档。 例如,假设我们有一个表示数学上 vectors向量)的 MyVector 类,我们可以写成 : 

Scala
object VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
  val vec_ : MyVector = MyVector.createZeroVector
  def reset(): MyVector = {
    vec_.reset()
  }
  def add(v1: MyVector, v2: MyVector): MyVector = {
    vec_.add(v2)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

注意,在开发者定义自己的  AccumulatorV2 类型时, resulting type(返回值类型)可能与添加的元素的类型不一致。

累加器的更新只发生在 action 操作中,Spark 保证每个任务只更新累加器一次,例如,重启任务不会更新值。在 transformations转换)中, 用户需要注意的是,如果 task(任务)或 job stages(阶段)重新执行,每个任务的更新操作可能会执行多次。

累加器不会改变 Spark lazy evaluation(懒加载)的模式。如果累加器在 RDD 中的一个操作中进行更新,它们的值仅被更新一次,RDD 被作为 action 的一部分来计算。因此,在一个像 map() 这样的 transformation转换)时,累加器的更新并没有执行。下面的代码片段证明了这个特性 : 

Scala
val accum = sc.accumulator(0)
data.map { x => accum += x; x }
// 在这里,accus 仍然为 0, 因为没有 actions(动作)来让 map 操作被计算。