Page tree

2017-07-25 Apache Spark 2.2.0 官方文档中文版发布 : http://spark.apachecn.org/docs/cn/2.2.0/


MachineLearning 优酷地址 : http://i.youku.com/apachecn

Skip to end of metadata
Go to start of metadata

RDD 类似,transformation 允许从 input DStream 输入的数据做修改。DStreams 支持很多在 RDD 中可用的 transformation 算子。一些常用的算子如下所示 

Transformation(转换)Meaning(含义)
map(func)利用函数 func 处理原 DStream 的每个元素,返回一个新的 DStream
flatMap(func)map 相似,但是每个输入项可用被映射为 0 个或者多个输出项。
filter(func)返回一个新的 DStream,它仅仅包含源 DStream 中满足函数 func 的项。
repartition(numPartitions)通过创建更多或者更少的 partition 改变这个 DStream 的并行级别(level of parallelism)。
union(otherStream)返回一个新的 DStream,它包含源 DStreamotherStream 的所有元素。
count()通过计算源 DStream 中每个 RDD 的元素数量,返回一个包含单元素(single-element)RDDs 的新 DStream。
reduce(func)

利用函数 func 聚集源 DStream 中每个 RDD 的元素,返回一个包含单元素(single-elementRDDs 的新 DStream

函数应该是相关联的,以使计算可以并行化。

countByValue()这个算子应用于元素类型为 KDStream上,返回一个(K,long)对的新 DStream,每个键的值是在原 DStream 的每个 RDD 中的频率。
reduceByKey(func, [numTasks])

当在一个由 (K,V) 对组成的 DStream 上调用这个算子,返回一个新的由 (K,V) 对组成的 DStream,每一个 key 的值均由给定的 reduce 函数聚集起来。

注意:在默认情况下,这个算子利用了 Spark 默认的并发任务数去分组。你可以用 numTasks 参数设置不同的任务数。

join(otherStream, [numTasks])当应用于两个 DStream(一个包含(K,V)对,一个包含 (K,W) 对),返回一个包含 (K, (V, W)) 对的新 DStream
cogroup(otherStream, [numTasks])当应用于两个 DStream(一个包含(K,V)对,一个包含 (K,W) 对),返回一个包含 (K, Seq[V], Seq[W]) 的元组。
transform(func)通过对源 DStream 的每个 RDD 应用 RDD-to-RDD 函数,创建一个新的 DStream。这个可以在 DStream 中的任何 RDD 操作中使用。
updateStateByKey(func)利用给定的函数更新 DStream 的状态,返回一个新 "state" 的 DStream

最后两 个transformation 算子需要重点介绍一下 : 

UpdateStateByKey 操作

updateStateByKey 操作允许不断用新信息更新它的同时保持任意状态。你需要通过两步来使用它 : 

  1. 定义状态 - 状态可以是任何的数据类型。
  2. 定义状态更新函数 - 怎样利用更新前的状态和从输入流里面获取的新值更新状态。

在每个 batch 中,Spark 会使用更新状态函数为所有的关键字更新状态,不管在 batch 中是否含有新的数据。如果这个更新函数返回一个 none,这个键值对也会被消除。

让我们举个例子说明。在例子中,你想保持一个文本数据流中每个单词的运行次数,运行次数用一个 state 表示,它的类型是整数 : 

Scala:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

这里是一个应用于包含 words(单词)的 DStream 上(也就是说,在 更前面的示例 中,该 pairs DStream 包含了 (word, 1) pair)。

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

更新函数将会被每个单词调用,newValues 拥有一系列的 1(从 (word, 1) 组对应),runningCount 拥有之前的次数。要看完整的代码,见案例。

注意,使用 updateStateByKey 需要配置的检查点的目录,这里是更详细关于讨论 CheckPointing 的部分。

Transform 操作

transform 操作(以及它的变化形式如 transformWith)允许在 DStream 运行任何 RDD-to-RDD 函数。它能够被用来应用任何没在 DStream API 中提供的 RDD 操作(It can be used to apply any RDD operation that is not exposed in the DStream API)。 例如,连接数据流中的每个批(batch)和另外一个数据集的功能并没有在 DStream API 中提供,然而你可以简单的利用 transform 方法做到。如果你想通过连接带有预先计算的垃圾邮件信息的输入数据流 来清理实时数据,然后过了它们,你可以按如下方法来做 : 

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})

请注意,提供的函数在每个批次间隔中被调用。这允许您进行随时间变化的RDD操作,即RDD操作,分区数,广播变量等可以在批次之间更改。

Window (窗口)操作

Spark Streaming 也支持窗口计算,它允许你在一个滑动窗口数据上应用 transformation 算子。下图阐明了这个滑动窗口。

如上图显示,窗口在源 DStream 上滑动,合并和操作落入窗内的源 RDDs,产生窗口化的 DStream RDDs。在这个具体的例子中,程序在三个时间单元的数据上进行窗口操作,并且每两个时间单元滑动一次。 这说明,任何一个窗口操作都需要指定两个参数 : 

  • window length(窗口长度): 窗口的持续时间。
  • sliding interval(滑动的时间间隔): 窗口操作执行的时间间隔。

这两个参数必须是源 DStream  batch interval批时间间隔)的倍数。

下面举例说明窗口操作。例如,你想扩展前面的例子用来计算过去 30 秒的词频,间隔时间是 10 秒。为了达到这个目的,我们必须在过去 30 秒的 pairs DStream 上应用 reduceByKey 操作。用方法 reduceByKeyAndWindow 实现。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

一些常用的窗口操作如下所示,这些操作都需要用到上文提到的两个参数 - 窗口长度和滑动的时间间隔。

转换含义
window ( windowLength , slideInterval )返回一个新的 DStream,基于窗口的批 DStream 来源。
countByWindow ( windowLength , slideInterval )返回一个滑动窗口中的元素计算流。
reduceByWindow ( func, windowLength , slideInterval )返回一个新创建的单个元素流,通过聚合元素流了 滑动时间间隔使用 函数 。 函数应该关联和交换,以便它可以计算 正确地并行执行。

reduceByKeyAndWindow ( func, windowLength ,

slideInterval , ( numTasks ])

调用DStream(K、V)对,返回一个新的DStream(K、V) 对每个键的值在哪里聚合使用给定的reduce函数 函数 在一个滑动窗口批次。

注意: 默认情况下,它使用引发的默认数量 并行任务(2为本地模式,在集群模式是由配置数量 财产 spark.default.parallelism分组)。

你可以通过一个可选的 numTasks参数设置不同数量的任务。

reduceByKeyAndWindow ( func, invFunc , windowLength ,

slideInterval ,( numTasks ])

上面的reduceByKeyAndWindow()的一个更有效的版本,其中每个窗口的reduce值是使用上一个窗口的reduce值递增计算的。

这是通过减少进入滑动窗口的新数据和“反向减少”离开窗口的旧数据来完成的。 一个例子是在窗口滑动时“添加”和“减去”键的计数。

然而,它仅适用于“可逆缩减函数”,即,具有对应的“逆缩减”函数(作为参数invFunc)的那些缩减函数。

像reduceByKeyAndWindow中一样,reduce任务的数量可通过可选参数进行配置。 请注意,必须启用检查点设置才能使用此操作。

countByValueAndWindow ( windowLength , slideInterval ,[ numTasks ])

调用DStream(K、V)对,返回一个新的DStream(K,长)对的 每个键的值是它的频率在一个滑动窗口。

就像在 reduceByKeyAndWindow,通过一个减少任务的数量是可配置的 可选参数。

Join 操作

最后,Spark streaming 可以很容易与其它的数据源进行 join

Stream-stream 连接

stream 可以很容易与其他 stream 进行 join

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)

之类,在每一个批间隔中,生成的抽样 stream1 将与生成的抽样 stream2 进行 join 操作。 也可以做 leftOuterJoin rightOuterJoinfullOuterJoin。此外,它通常是非常有用的做连接的窗口 (window) stream。 这是非常容易的。

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)  
Stream-dataset连接

这已经被证明在早些时候解释  DStream.transform 操作。 这是另一个例子,加入一个有窗口的流数据集。

Scala
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

事实上,你也可以动态地改变你想加入的数据集。 提供的函数变换 评估每批间隔,因此将使用当前数据集作为参考点。

DStream 转换的完整列表可以在 API 文档。Scala API 看到 DStream PairDStreamFunctions Java API 明白了 JavaDStream JavaPairDStream Python API 看到 DStream