目录 |
---|
面板 | ||||
---|---|---|---|---|
| ||||
原文链接 : http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams 译文链接 : http://www.apache.wiki/pages/viewpage.action?pageId=2885990 |
与 RDD 类似,transformation 允许从 input DStream 输入的数据做修改。DStreams 支持很多在 RDD 中可用的 transformation 算子。一些常用的算子如下所示 :
Transformation(转换) | Meaning(含义) |
---|---|
map(func) | 利用函数 func 处理原 DStream 的每个元素,返回一个新的 DStream。 |
flatMap(func) | 与 map 相似,但是每个输入项可用被映射为 0 个或者多个输出项。 |
filter(func) | 返回一个新的 DStream,它仅仅包含源 DStream 中满足函数 func 的项。 |
repartition(numPartitions) | 通过创建更多或者更少的 partition 改变这个 DStream 的并行级别(level of parallelism)。 |
union(otherStream) | 返回一个新的 DStream,它包含源 DStream 和 otherStream 的所有元素。 |
count() | 通过计算源 DStream 中每个 RDD 的元素数量,返回一个包含单元素(single-element)RDDs 的新 DStream。 |
reduce(func) | 利用函数 func 聚集源 DStream 中每个 RDD 的元素,返回一个包含单元素(single-element)RDDs 的新 DStream。 函数应该是相关联的,以使计算可以并行化。 |
countByValue() | 这个算子应用于元素类型为 K 的 DStream上,返回一个(K,long)对的新 DStream,每个键的值是在原 DStream 的每个 RDD 中的频率。 |
reduceByKey(func, [numTasks]) | 当在一个由 (K,V) 对组成的 DStream 上调用这个算子,返回一个新的由 (K,V) 对组成的 DStream,每一个 key 的值均由给定的 reduce 函数聚集起来。 注意:在默认情况下,这个算子利用了 Spark 默认的并发任务数去分组。你可以用 |
join(otherStream, [numTasks]) | 当应用于两个 DStream(一个包含(K,V)对,一个包含 (K,W) 对),返回一个包含 (K, (V, W)) 对的新 DStream。 |
cogroup(otherStream, [numTasks]) | 当应用于两个 DStream(一个包含(K,V)对,一个包含 (K,W) 对),返回一个包含 (K, Seq[V], Seq[W]) 的元组。 |
transform(func) | 通过对源 DStream 的每个 RDD 应用 RDD-to-RDD 函数,创建一个新的 DStream。这个可以在 DStream 中的任何 RDD 操作中使用。 |
updateStateByKey(func) | 利用给定的函数更新 DStream 的状态,返回一个新 "state" 的 DStream。 |
最后两 个transformation 算子需要重点介绍一下 :
UpdateStateByKey 操作
updateStateByKey 操作允许不断用新信息更新它的同时保持任意状态。你需要通过两步来使用它 :
- 定义状态 - 状态可以是任何的数据类型。
- 定义状态更新函数 - 怎样利用更新前的状态和从输入流里面获取的新值更新状态。
在每个 batch 中,Spark 会使用更新状态函数为所有的关键字更新状态,不管在 batch 中是否含有新的数据。如果这个更新函数返回一个 none,这个键值对也会被消除。
让我们举个例子说明。在例子中,你想保持一个文本数据流中每个单词的运行次数,运行次数用一个 state 表示,它的类型是整数 :
Scala:
代码块 | ||||
---|---|---|---|---|
| ||||
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { val newCount = ... // add the new values with the previous running count to get the new count Some(newCount) } |
这里是一个应用于包含 words(单词)的 DStream 上(也就是说,在 更前面的示例 中,该 pairs DStream 包含了 (word, 1) pair)。
代码块 | ||||
---|---|---|---|---|
| ||||
val runningCounts = pairs.updateStateByKey[Int](updateFunction _) |
更新函数将会被每个单词调用,newValues
拥有一系列的 1(从 (word, 1) 组对应),runningCount 拥有之前的次数。要看完整的代码,见案例。
注意,使用 updateStateByKey
需要配置的检查点的目录,这里是更详细关于讨论 CheckPointing 的部分。
Transform 操作
transform
操作(以及它的变化形式如 transformWith
)允许在 DStream 运行任何 RDD-to-RDD 函数。它能够被用来应用任何没在 DStream API 中提供的 RDD 操作(It can be used to apply any RDD operation that is not exposed in the DStream API)。 例如,连接数据流中的每个批(batch)和另外一个数据集的功能并没有在 DStream API 中提供,然而你可以简单的利用 transform
方法做到。如果你想通过连接带有预先计算的垃圾邮件信息的输入数据流 来清理实时数据,然后过了它们,你可以按如下方法来做 :
代码块 | ||||
---|---|---|---|---|
| ||||
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform(rdd => { rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... }) |
请注意,提供的函数在每个批次间隔中被调用。这允许您进行随时间变化的RDD操作,即RDD操作,分区数,广播变量等可以在批次之间更改。
Window (窗口)操作
Spark Streaming 也支持窗口计算,它允许你在一个滑动窗口数据上应用 transformation 算子。下图阐明了这个滑动窗口。
如上图显示,窗口在源 DStream 上滑动,合并和操作落入窗内的源 RDDs,产生窗口化的 DStream 的 RDDs。在这个具体的例子中,程序在三个时间单元的数据上进行窗口操作,并且每两个时间单元滑动一次。 这说明,任何一个窗口操作都需要指定两个参数 :
- window length(窗口长度): 窗口的持续时间。
- sliding interval(滑动的时间间隔): 窗口操作执行的时间间隔。
这两个参数必须是源 DStream 上 batch interval(批时间间隔)的倍数。
下面举例说明窗口操作。例如,你想扩展前面的例子用来计算过去 30 秒的词频,间隔时间是 10 秒。为了达到这个目的,我们必须在过去 30 秒的 pairs
DStream 上应用 reduceByKey
操作。用方法 reduceByKeyAndWindow
实现。
代码块 | ||||
---|---|---|---|---|
| ||||
// Reduce last 30 seconds of data, every 10 seconds val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10)) |
一些常用的窗口操作如下所示,这些操作都需要用到上文提到的两个参数 - 窗口长度和滑动的时间间隔。
转换 | 含义 |
---|---|
window ( windowLength , slideInterval ) | 返回一个新的 DStream,基于窗口的批 DStream 来源。 |
countByWindow ( windowLength , slideInterval ) | 返回一个滑动窗口中的元素计算流。 |
reduceByWindow ( func, windowLength , slideInterval ) | 返回一个新创建的单个元素流,通过聚合元素流了 滑动时间间隔使用 函数 。 函数应该关联和交换,以便它可以计算 正确地并行执行。 |
reduceByKeyAndWindow ( func, windowLength , slideInterval , ( numTasks ]) | 当调用DStream(K、V)对,返回一个新的DStream(K、V) 对每个键的值在哪里聚合使用给定的reduce函数 函数 在一个滑动窗口批次。 注意: 默认情况下,它使用引发的默认数量 并行任务(2为本地模式,在集群模式是由配置数量 财产 你可以通过一个可选的 |
reduceByKeyAndWindow ( func, invFunc , windowLength , slideInterval ,( numTasks ]) | 上面的reduceByKeyAndWindow()的一个更有效的版本,其中每个窗口的reduce值是使用上一个窗口的reduce值递增计算的。 这是通过减少进入滑动窗口的新数据和“反向减少”离开窗口的旧数据来完成的。 一个例子是在窗口滑动时“添加”和“减去”键的计数。 然而,它仅适用于“可逆缩减函数”,即,具有对应的“逆缩减”函数(作为参数invFunc)的那些缩减函数。 像reduceByKeyAndWindow中一样,reduce任务的数量可通过可选参数进行配置。 请注意,必须启用检查点设置才能使用此操作。 |
countByValueAndWindow ( windowLength , slideInterval ,[ numTasks ]) | 当调用DStream(K、V)对,返回一个新的DStream(K,长)对的 每个键的值是它的频率在一个滑动窗口。 就像在 |
Join 操作
最后,Spark streaming 可以很容易与其它的数据源进行 join。
Stream-stream 连接
stream 可以很容易与其他 stream 进行 join
代码块 | ||||
---|---|---|---|---|
| ||||
val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2) |
之类,在每一个批间隔中,生成的抽样 stream1
将与生成的抽样 stream2 进行 join 操作。
也可以做 leftOuterJoin
,
rightOuterJoin
,
fullOuterJoin
。此外,它通常是非常有用的做连接的窗口 (window) stream。 这是非常容易的。
代码块 | ||||
---|---|---|---|---|
| ||||
val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2) |
Stream-dataset连接
这已经被证明在早些时候解释 DStream.transform
操作。 这是另一个例子,加入一个有窗口的流数据集。
代码块 | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) } |
事实上,你也可以动态地改变你想加入的数据集。 提供的函数变换
评估每批间隔,因此将使用当前数据集作为
参考点。
DStream 转换的完整列表可以在 API 文档。Scala API 看到 DStream 和 PairDStreamFunctions 。Java API 明白了 JavaDStream 和 JavaPairDStream 。Python API 看到 DStream。