页面树结构

2017-11-09 ApacheCN 开源组织,第二期邀请成员活动,一起走的更远 : http://www.apachecn.org/member/209.html


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

优化 Spark 应用

本主题介绍在 Spark 应用优化中各个方面。在优化期间您应该监控应用程序的行为来判断优化操作是否有效。

关于监控 Spark 应用的信息,请看 监控 Spark 应用

继续阅读 : 

Shuffle 概述

一个 Spark dataset 由固定的 partition(分区)数组成,其中每一个包含多个 record(记录)。通过 narrow transformation(窄依赖转换)的 dataset,像 map 和 filter 操作,record 需要在一个驻留在 parent partition 的 single partition(分区)中的单独的 partition 来计算 record。每个对象仅仅依赖 parent 上一个单独的对象。像 coalesce 这样的操作会处理多个输入的分区,但是该 transformation 仍然被认为是 narrow(窄依赖),因为用于计算的任何单个输出记录的输入记录仍然可以仅驻留在 partition(分区)的一个有限子集中。

Spark 也支持 wide(宽依赖)的转换,像 groupByKey 和 reduceByKey。在这些依赖中,用于在一个单独的 partition(分区)中计算的数据可以驻留在 parent dataset 的许多 partition 中。为了执行这些 transformation,所有拥有相同 key 的 tuple 必须在相同的 partition 中,被相同的 task 处理。为了满足这个需求,Spark 执行了一个 shuffle,在群集中传输数据的结果是有一组新 partition 的新的 stage

举个例子,思考下面的代码 : 

sc.textFile("someFile.txt").map(mapFunc).flatMap(flatMapFunc).filter(filterFunc).count()

它运行了一个单独的 action,count,它依赖从一个 text file(文本文件)的 dataset 上衍生的 3  个 transformation。这份代码运行在一个单独的 satge 中,因为这 3 个依赖来自不同 partition 并且超过它们的 input(输入)的数据的 transformation 没有输出。

相对的,这份 scala 代码在一个 text file(文本文件)中找到了超过 1000  次所有出现的单词 : 

val tokenized = sc.textFile(args(0)).flatMap(_.split(' '))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)
charCounts.collect()

这个例子有 3 个 satge。两个 transformation 各自触发了 stage 的边界,因为计算它们的输出需要通过 key 来 repartition。

最后一个例子是这样一个复杂的 transformation 图,它包括了一个有多个依赖的 join transformation。

这个粉色的框框展示了用来运行结果的 stage 图形 : 

在每个 stage 边界上,输入通过 parent stage 写入到硬盘,然后被 child stage 中的 task 通过网络抓取。因为这样所以导致了很高的硬盘和网络 I/O,stage 边界被认为是代价很昂贵的并且应该尽可能的避免它们。在 parent stage 中数据 partition 的数量也许不同与 child satge 中的 partition 数量。transformation 能够触发一个 stage 边界,通常接受一个 numPartitions 参数,它指定了在 child stage 中用来拆分数据的 partition 数量。正如在 MapReduce Job 中 reducer 的数量是一个重要的参数一样,stage 边界上的 partition 数量也能够决定一个应用的性能。优化 partition 的数量 描述了如何去调整这个数值。

选择 Transformation 来最小化 Shuffle

您通常可以从产生相同结果的 action 和 transformation 来选择许多设置。然而,不是所有的设置都会产生相同的性能。避免这个普通的陷阱并选择合适的设置可以显著的提高一个应用的性能。

什么时候选择一个 transformation 的设置,最小化 shuffle 的数量和数据 shuffle 的数量。shuffle 是极其昂贵的操作。所有 shuffle 的数据必须被写入到硬盘,然后通过网络传输。repartition,join,cogroup 和任何的 *By 或者 *ByKey transformation 都会导致 shuffle。并不是所有的这些 transformation 都是相等的,然后,您应该避免下面的套路 : 

  • groupByKey : 当执行一个 reducive 相关的操作时,例如,rdd.groupByKey().mapValues(_.sum) 与 rdd.reduceByKey(_ + _) 产生相同的结果。然而,前面的那个会通过网络传输整个 dataset,虽然后面那个每个 partition 中的每个 key 计算了本地的 sum 并在 shuffle 后 combine 这些本地的 sum 到一个更大的 sum。
  • reduceByKey : 当输入和输出的 value 的类型不同时,例如,考虑编写一个找出所有每个 key 相同的 string 的 transformation。您可以使用 map 以 transformation 每个元素到一个 Set 中,然后用 reduceByKey 来 combine 该 Set : 

    rdd.map(kv => (kv._1, new Set[String]() + kv._2)).reduceByKey(_ ++ _)

    这会造成一个不必要的对象被创建,因为一个新的 set 必须为每个 record 分配。
    相反,使用 aggregateByKey,它更高效的执行了 map 端的聚合。

    val zero = new collection.mutable.Set[String]()
    rdd.aggregateByKey(zero)((set, v) => set += v,(set1, set2) => set1 ++= set2)
  • flatMap-join-groupBy : 当两份 dataset 已经通过 key 分组并且您想要 join 它们以及保持它们的分组,请使用 cogroup。这将避免 unpacking 和 repacking the groups 的开销。

 

何时不产生 Shuffle

在一些情况中,先前描述的 transformation 不会造成 shuffle。当一个先前的 transformation 已经根据相同的 partitioner(分区器)分区数据时 Spark 不会产生 shuffle。

思考下列的 flow : 

rdd1 = someRdd.reduceByKey(...)
rdd2 = someOtherRdd.reduceByKey(...)
rdd3 = rdd1.join(rdd2)

因为没有 partitioner 被传递到 reduceByKey 中,默认的 partitioner 被使用,导致 rdd1 和 rdd2 都是 hash-partition 的。这两个 reduceByKey transformation 产生两个 shuffle。如果 dataset 有相同的 partition,一个 join 操作将不需要额外的 shuffle。因为 dataset 被同样的分区了,在一个单独的 rdd1 的 partition 中的一组 key 都可以仅出现在 rdd2 的一个单独的 partition 中。因此,任何 rdd3 单个输出的分区的内容只依赖 rdd1 和 rdd2 上单个分区的内容,所以第三个 shuffle 是不需要的。

例如,如果 someRdd 有 4 个partition,someOtherRdd 有两个 partition,并且两个的 reduceByKey 操作都使用了 3 个 partition,一组任务运行如下 : 

如果 rdd1 和 rdd2 使用不同的 partition 或者使用了默认的(hash)partitioner(partition 数量却不一样),为了 join 仅仅一个一个 dataset(这个 partition 数量更少)需要 reshuffle : 

为了避免在 join 是产生 shuffle,您可以使用 广播变量。当一个 dataset 可以小到在一个单独的 executor 中放下时,它能够加载到一个 hash 表到 driver 上并且广播到每个 executor 中去。一个 map transformation 可以参考该 hash 表来做查找操作。

何时来添加一个 Shuffle Transformation

最小化 shuffle 数量的规则有一些例外。

当增加并行度时一个额外的 shuffle 是有好处的,例如,如果您的数据到达了几个大的不可拆分的文件,被 InputFormat 描述的 partitioning(分区)也许在每个 partition 中放入打来那个的 record(记录),然而不会生成足够的 partition 数量来使用所有可用的 core。在这种情况中,在加载完数据后用更多的 partition 数量(它将触发一个 shuffle)调用 repartition 方法,使得 transformation 接下来可以使用群集中更多的 CPU。‘

另一个例子使用的时候出现 reduce 或者 aggregate action 来聚合数据到 driver 中。在大量的 partition 上聚合时,在一个 driver 中的单线程合并所有结果的计算很快会变成一个瓶颈。为了减轻 driver 的负载,实现使用 reduceByKey 或者 aggregateByKey 来执行划分 dataset 到一个更小的 partition 数量的一连串的分布式聚合。在每个 partition 中的 value 被并行的与其它的合并,在被发送到 driver 用于最后一个聚合之前,请看  treeReduce 和 treeAggregate 如何去使用的例子。

当聚合已经通过 key 分组时这个方法是很有用的。例如,考虑一个在一个文集中统计每个单词出现次数的应用并拉取结果作为一个 map 到 driver 中去。一种方法是,它能使用 aggregate action 来完成,它在每个  partition 中计算了一个本地 map 并且在 driver 上合并 map。另一种方法是,它能够使用 aggregateByKey 来完成,它使用完全分布式的方式来执行统计,然后简单的调用 collectAsMap 算出结果到 driver 上。

二次排序

优化资源分配

资源优化案例

优化分区数量

减少数据结构的大小

选择数据格式