页面树结构

2017-07-25 Apache Spark 2.2.0 官方文档中文版发布 : http://spark.apachecn.org/docs/cn/2.2.0/


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

调优 Spark 应用程序的内存使用情况和 GC 行为已经在 优化指南 中详细讨论了。强烈推荐您阅读它。在这一章,我们讨论在 Spark Streaming 应用程序 Context 中指定的一些优化参数。

Spark Streaming 应用程序在群集中需要的 Memory(内存) 数量取决于使用的 Transformations(转换)上的类型行为。例如,如果您想要在最近 10 分钟的时候上使用 Window(窗口)函数,那么您的群集应有有足够的 Memory 以保存 10 分钟值的数据在内存中。或者您想要在大量的 keys 中使用 updateStateByKey,那么所需要的内存将会更高。与之相反,如果您想要去做一个简单的 map-filter-store 操作,那么所需的内存将会更少。

在一般情况下,从数据通过 Receiver(接收器)被接收时起使用 StorageLevel.MEMORY_AND_DISK_SER_2 存储,数据在内存中放不下时将拆分到硬盘中去。这样也许降低了 Streaming 应用程序的性能,因此建议为您的 Streaming 应用程序提供足够的内存。它最好去试一试,并且在小范围看看 Memory(内存)使用情况然后估算相应的值。

内存调优的另一个方面是垃圾收集。Streaming 应用程序需要低延迟,JVM 垃圾回收造成的大量暂停是不可取的。

这里有一些能够帮助你调整内存使用情况以及 GC 开销的参数 : 

  • Persistence Level of DStreamsDStream 的持久化级别): 像前面所提到的 数据序列化 部分,输入的数据和 RDDs 默认持久化为序列化的字节。和反序列化持久性相比,这样减少了内存使用和 GC 开销。启用 Kryo 序列化进一步减少了序列化大小和内存使用。进一步减少内存使用可以用压缩实现(请看 Spark 配置 spark.rdd.compress),付出的是 CPU 时间。
  • Clearing old data(清除旧数据): 默认情况下,所有的输入数据和通过 DStream transformation(转换)产生的持久化 RDDs 将被自动的清除。Spark Streaming 决定何时清除基于使用 transformation(转换)的数据。例如,如果你使用一个 10 分钟的 Window(窗口)操作,那么 Spark Streaming 将保存最近 10 分钟的数据,并主动扔掉旧的数据。数据也能够通过设置 streamingContext.remeber 保持更久(例如,交互式查询旧数据)。
  • CMS Garbage CollectorCMS 垃圾回收器): 使用并发的 mark-sweep GC 是强烈推荐的用于保持 GC 相关的暂停更低。即使知道并发的 GC 降低了整个系统处理的吞吐量。仍然建议使用,以获得更一致的批处理时间。确定你在 Driver(在 spark-submit 中使用 --driver-java-options)和 Executor(使用 Spark 配置 spark.executor.extraJavaOptions)上设置的 CMS GC
  • Other tips(其它建议): 为了进一步降低 GC 开销,这里有些更多的建议可以尝试。
    • 持久化 RDDs 使用 OFF_HEAP 存储界别。更多详情请看 Spark 编程指南
    • 使用更多的 Executor 和更小的 heap size(堆大小)。这将在每个 JVM heap 内降低 GC 压力。

应该记住的要点

  • 一个 DStream 和一个单独的 Receiver(接收器)关联。为了达到并行的读取多个 Receiver(接收器)。例如,多个 DStreams 需要被创建。一个 Receiver 运行在一个 Executor 内。它占有一个 CoreCPU)。确保在 Receiver Slot 被预定后有足够的 Core 。例如,spark.cores.max 应该考虑 Receiver SlotReceiver 以循环的方式被分配到 Executor
  • 当数据从一个 Stream 源被接收时,Receiver(接收器) 创建了数据块。每个块间隔的毫秒内产生一个新的数据块。N 个数据块在批间隔(N = 批间隔 / 块间隔)的时候被创建。这些 Block(块)通过当前的 ExecutorBlockManager 发布到其它 ExecutorBlockManager。在那之后,运行在 Driver 上的  Network Input Tracker 获取 Block 位置用于进一步处理。
  • 一个 RDD 创建在 Driver 上,因为 Block 创建在 batchInterval(批间隔)期间。Block 在 batchInterval 划分成 RDD 时生成。每个分区是 Spark 中的一个任务。blockInterval == batchInterval 将意味着那是一个单独的分区被创建并且可能它在本地被处理过了。
  • Block 上的 Map 任务在 Executor 中被处理(一个接收 Block,另一个 Block 被复制)无论块的间隔,除非非本地调度死亡。有更大的块间隔意味着更大的块。在本地节点上一个高的值 spark.locality.wait 增加处理 Blcok 的机会。需要发现一个平衡在这两个参数来确保更大的块被本都处理之间。
  • 而不是依靠 batchIntervalblockInterval,你可以通过调用 inputDstream.repartition(n) 来定义分区的数量。这样会 reshuffles RDD 中的数据随机来创建 N 个分区。是的,为了更好的并行,虽然增加了 shuffle 的成本。一个 RDD 的处理通过 DriverJobScheduler 作为一个 Job 来调度。在给定的时间点仅有一个 Job 是活跃的。所以,如果一个 Job 正在执行那么其它的 Job 会排队。
  • 如果你有两个 DStream,那将有两个 RDDs 形成并且将有两个 Job 被创建,他们将被一个一个的调度。为了避免这个,你可以合并两个 DStream。这将确保两个 RDDDStream 形成一个单独的 unionRDD。这个 unionRDD 被作为一个单独的 Job 考虑。然而分区的 RDDs 不受影响。
  • 如果批处理时间超过了批间隔,那么显然 Receiver(接收器)的内存将开始填满,最走将抛出异常(最可能的是 BlockNotFoundException)。当前没有方法去暂停 Receiver。使用 SparkConf 配置  spark.streaming.receiver.maxRateReceiver(接收器)的速率可以被限制。