页面树结构

2017-07-25 Apache Spark 2.2.0 官方文档中文版发布 : http://spark.apachecn.org/docs/cn/2.2.0/


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

有许多优化可以在 Spark 中来完成,使每批数据的处理时间最小化。这些都在 优化指南 中详细讨论。本章介绍一些最重要的问题。

数据接收的并行级别

通过网络(像 KafkaFlumesocket,等等)接受数据需要数据反序列化然后存在 Spark 中。如果数据接收成为了系统中的瓶颈,则需要考虑并行的数据接收。注意,每个 Input DStream(输入流)创建一个接受单个数据流的单独的 Receiver(接收器)(运行在一个 Worker 机器上)。接受多个数据流因此可以通过创建多个 Input DStreams 以及配置他们去从数据源(S)的不同分区接收数据流来实现。例如,一个单一的 Kafka Input DStream 接收两个 Topic(主题) 的数据能够被拆分成两个 Kafka 输入流,每个仅接收一个 Topic。这将运行两个 Receiver(接收器),使得数据可以被并行接受,因此将提高整体吞吐量。这些多个 DStream 可以合并在一起以创建一个单独的 DStream。然后被用于在一个单独的 Input DStream 上的 Transformations(转换)可以在统一的流上被应用。按照以下步骤进行 : 

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

应当考虑的另一个参数是 Receiver(接收器)的阻塞间隔,它通 过配置参数 spark.streaming.blockInterval 来决定。对于大部分 Receiver(接收器) 来说,在存储到 SparkMemory(内存)之前时接收的数据被合并成数据 Block(块)。每批 Block(块)的数量确定了将用于处理在一个类似 Map Transformation(转换)的接收数据的任务数量。每个 Receiver(接收器)的每个 Batch 的任务数量大约为(Batch 间隔 / Block 间隔)。例如,200msBlock(块)间隔和每 2sBatch 将创建 10 个任务。如果任务的数量过低(即,小于每台机器的 CoreCPU)数量),那么效率会很低,因为所有可用的 CoreCPU)将不会用来处理数据。以增加给定的 Batch 间隔的任务数量,降低该 Block(间隔)。然而,Block(块)间隔推荐的最低值约为 50ms,低于该推荐值的任务的运行开销可能是一个问题。

与多个 Input Streams / Receivers 接受数据的另一种方法是明确的 Repartition (重分区)输入的数据流(使用 inputStream.repartition(<partition(分区)的数量>))。这样在进一步处理之前就分发接收数据的 Batch 到群集中指定数量的机器上去。

数据处理的并行级别

集群资源的利用率可能会很低,如果在任何计算阶段中并行任务数量的不是很多的话。例如,对于像 reduceByKeyreduceByKeyAndWindow 这样的分布式 Reduce 操作来说,默认的并行任务数量由 spark.default.parallelism 配置属性 控制。您可以传递并行的的参数(请看 PairDStreamFunctions 文档),或者设置 spark.default.parallelism 配置属性 来改变默认值。

数据序列化

可以通过调整序列化的格式来减少数据序列化的开销。在流式传输的情况下,有两种数据类型会被序列化。

  • Input Data(输入的数据): 默认情况下,通过 Receivers(接收器)接收的输入数据被存储在 Executor 的内存与 StorageLevel.MEMORY_AND_DISK_SER_2 中。也就是说,数据被序列化成 Bytes(字节)以降低 GC 开销,以及被复制用于 Executor 的失败容错。此外,数据首先保存在 Memory(内存)中,如果内存不足已容纳所有用于流计算的输入数据将被溢出到硬盘上。这个序列化操作显然也需要开销 - Receiver(接收器)必须反序列化接收到的数据并且使用 Spark 的序列化格式重新序列化它们。
  • 通过 Streaming 操作产生的持久的 RDDs : 通过流计算产生的 RDDs 可能被持久化在内存中。例如,Window(窗口)操作将数据持久化在内存中,因为他们可能被多次处理。然而,不像 Spark Core 默认的 StorageLevel.MEMORY_ONLY,通过流计产生的持久化 RDDs 被使用 StorageLevel.MEMORY_ONLY_SER(也就是序列化)存储,默认 GC 开销降至最低。

在这两种情况下,使用 Kryo 序列化能够减少 CPU 和 内存的开销。更多细节请看 Spark 优化指南。对于 Kyro 来说,考虑注册自定义的 Class,并且禁用 Object(对象)引用跟踪(在 配置指南 中看 Kyro 相关的配置)。

在特定的情况下,需要用于保留 Streaming 应用程序的数据量不是很大,这样也许是可行的,来保存反序列化的数据(两种类型)不用引起过度的 GC 开销。例如,如果您使用 Batch 的间隔有几秒钟并且没有 Window(窗口)操作,然后你可以通过显式地设置存储相应的级别来尝试禁用序列化保存数据。这将减少由于序列化的 CPU 开销,可能不需要太多的 GC 开销就能提升性能。

任务启动开销

如果每秒任务启动的数据很高(比如,每秒 50 个任务或者更多),那么发送 Task(任务) 到 Slave 的负载可能很大,将很难实现 亚秒级 延迟。可以通过下面的改变来降低负载 : 

  • Execution Mode(运行模式): 以 Standlone 模式或者 coarse-grained(粗粒度的)Mesos 模式运行 Spark 会比 fine-grained(细粒度的)Mesos 模式运行 Spark 获得更佳的任务启动时间。更详细的信息请参考 Mesos 运行指南

这些改变也许能减少批处理的时间(100s of milliseconds),因此亚秒级的 Batch 大小是可行的。