页面树结构

版本比较

标识

  • 该行被添加。
  • 该行被删除。
  • 格式已经改变。
评论: 更新排版格式

RDD 类似,transformation 允许从 input DStream 输入的数据做修改。DStreams 支持很多在 RDD 中可用的 transformation 算子。一些常用的算子如下所示 

Transformation(转换)Meaning(含义)
map(func)利用函数 func 处理原 DStream 的每个元素,返回一个新的 DStream
flatMap(func)map 相似,但是每个输入项可用被映射为 0 个或者多个输出项。
filter(func)返回一个新的 DStream,它仅仅包含源 DStream 中满足函数 func 的项。
repartition(numPartitions)通过创建更多或者更少的 partition 改变这个 DStream 的并行级别(level of parallelism)。
union(otherStream)返回一个新的 DStream,它包含源 DStreamotherStream 的所有元素。
count()通过计算源 DStream 中每个 RDD 的元素数量,返回一个包含单元素(single-element)RDDs 的新 DStream。
reduce(func)

利用函数 func 聚集源 DStream 中每个 RDD 的元素,返回一个包含单元素(single-elementRDDs 的新 DStream

函数应该是相关联的,以使计算可以并行化。

countByValue()这个算子应用于元素类型为 KDStream上,返回一个(K,long)对的新 DStream,每个键的值是在原 DStream 的每个 RDD 中的频率。
reduceByKey(func, [numTasks])

当在一个由 (K,V) 对组成的 DStream 上调用这个算子,返回一个新的由 (K,V) 对组成的 DStream,每一个 key 的值均由给定的 reduce 函数聚集起来。

注意:在默认情况下,这个算子利用了 Spark 默认的并发任务数去分组。你可以用 numTasks 参数设置不同的任务数。

join(otherStream, [numTasks])当应用于两个 DStream(一个包含(K,V)对,一个包含 (K,W) 对),返回一个包含 (K, (V, W)) 对的新 DStream
cogroup(otherStream, [numTasks])当应用于两个 DStream(一个包含(K,V)对,一个包含 (K,W) 对),返回一个包含 (K, Seq[V], Seq[W]) 的元组。
transform(func)通过对源 DStream 的每个 RDD 应用 RDD-to-RDD 函数,创建一个新的 DStream。这个可以在 DStream 中的任何 RDD 操作中使用。
updateStateByKey(func)利用给定的函数更新 DStream 的状态,返回一个新 "state" 的 DStream

最后两 个transformation 算子需要重点介绍一下 : 

UpdateStateByKey 操作

updateStateByKey 操作允许不断用新信息更新它的同时保持任意状态。你需要通过两步来使用它 : 

  1. 定义状态 - 状态可以是任何的数据类型。
  2. 定义状态更新函数 - 怎样利用更新前的状态和从输入流里面获取的新值更新状态。

在每个 batch 中,Spark 会使用更新状态函数为所有的关键字更新状态,不管在 batch 中是否含有新的数据。如果这个更新函数返回一个 none,这个键值对也会被消除。

让我们举个例子说明。在例子中,你想保持一个文本数据流中每个单词的运行次数,运行次数用一个 state 表示,它的类型是整数 : 

Scala:

代码块
languagescala
themeEclipse
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

这里是一个应用于包含 words(单词)的 DStream 上(也就是说,在 更前面的示例 中,该 pairs DStream 包含了 (word, 1) pair)。

代码块
languagescala
themeEclipse
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

更新函数将会被每个单词调用,newValues 拥有一系列的 1(从 (word, 1) 组对应),runningCount 拥有之前的次数。要看完整的代码,见案例。

注意,使用 updateStateByKey 需要配置的检查点的目录,这里是更详细关于讨论 CheckPointing 的部分。

Transform 操作

transform 操作(以及它的变化形式如 transformWith)允许在 DStream 运行任何 RDD-to-RDD 函数。它能够被用来应用任何没在 DStream API 中提供的 RDD 操作(It can be used to apply any RDD operation that is not exposed in the DStream API)。 例如,连接数据流中的每个批(batch)和另外一个数据集的功能并没有在 DStream API 中提供,然而你可以简单的利用 transform 方法做到。如果你想通过连接带有预先计算的垃圾邮件信息的输入数据流 来清理实时数据,然后过了它们,你可以按如下方法来做 : 

代码块
languagescala
themeEclipse
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})

请注意,提供的函数在每个批次间隔中被调用。这允许您进行随时间变化的RDD操作,即RDD操作,分区数,广播变量等可以在批次之间更改。

Window (窗口)操作

Spark Streaming 也支持窗口计算,它允许你在一个滑动窗口数据上应用 transformation 算子。下图阐明了这个滑动窗口。

如上图显示,窗口在源 DStream 上滑动,合并和操作落入窗内的源 RDDs,产生窗口化的 DStream RDDs。在这个具体的例子中,程序在三个时间单元的数据上进行窗口操作,并且每两个时间单元滑动一次。 这说明,任何一个窗口操作都需要指定两个参数 : 

  • window length(窗口长度): 窗口的持续时间。
  • sliding interval(滑动的时间间隔): 窗口操作执行的时间间隔。

这两个参数必须是源 DStream  batch interval批时间间隔)的倍数。

下面举例说明窗口操作。例如,你想扩展前面的例子用来计算过去 30 秒的词频,间隔时间是 10 秒。为了达到这个目的,我们必须在过去 30 秒的 pairs DStream 上应用 reduceByKey 操作。用方法 reduceByKeyAndWindow 实现。

代码块
languagescala
themeEclipse
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

一些常用的窗口操作如下所示,这些操作都需要用到上文提到的两个参数 - 窗口长度和滑动的时间间隔。

转换含义
window ( windowLength , slideInterval )返回一个新的 DStream,基于窗口的批 DStream 来源。
countByWindow ( windowLength , slideInterval )返回一个滑动窗口中的元素计算流。
reduceByWindow ( func, windowLength , slideInterval )返回一个新创建的单个元素流,通过聚合元素流了 滑动时间间隔使用 函数 。 函数应该关联和交换,以便它可以计算 正确地并行执行。

reduceByKeyAndWindow ( func, windowLength ,

slideInterval , ( numTasks ])

调用DStream(K、V)对,返回一个新的DStream(K、V) 对每个键的值在哪里聚合使用给定的reduce函数 函数 在一个滑动窗口批次。

注意: 默认情况下,它使用引发的默认数量 并行任务(2为本地模式,在集群模式是由配置数量 财产 spark.default.parallelism分组)。

你可以通过一个可选的 numTasks参数设置不同数量的任务。

reduceByKeyAndWindow ( func, invFunc , windowLength ,

slideInterval ,( numTasks ])

上面的reduceByKeyAndWindow()的一个更有效的版本,其中每个窗口的reduce值是使用上一个窗口的reduce值递增计算的。

这是通过减少进入滑动窗口的新数据和“反向减少”离开窗口的旧数据来完成的。 一个例子是在窗口滑动时“添加”和“减去”键的计数。

然而,它仅适用于“可逆缩减函数”,即,具有对应的“逆缩减”函数(作为参数invFunc)的那些缩减函数。

像reduceByKeyAndWindow中一样,reduce任务的数量可通过可选参数进行配置。 请注意,必须启用检查点设置才能使用此操作。

countByValueAndWindow ( windowLength , slideInterval ,[ numTasks ])

调用DStream(K、V)对,返回一个新的DStream(K,长)对的 每个键的值是它的频率在一个滑动窗口。

就像在 reduceByKeyAndWindow,通过一个减少任务的数量是可配置的 可选参数。

Join 操作

最后,Spark streaming 可以很容易与其它的数据源进行 join

Stream-stream 连接

stream 可以很容易与其他 stream 进行 join

代码块
languagescala
themeEclipse
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)

之类,在每一个批间隔中,生成的抽样 stream1 将与生成的抽样 stream2 进行 join 操作。 也可以做 leftOuterJoin rightOuterJoinfullOuterJoin。此外,它通常是非常有用的做连接的窗口 (window) stream。 这是非常容易的。

代码块
languagescala
themeEclipse
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)  
Stream-dataset连接

这已经被证明在早些时候解释  DStream.transform 操作。 这是另一个例子,加入一个有窗口的流数据集。

代码块
languagescala
themeEclipse
titleScala
linenumberstrue
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

事实上,你也可以动态地改变你想加入的数据集。 提供的函数变换 评估每批间隔,因此将使用当前数据集作为参考点。

DStream 转换的完整列表可以在 API 文档。Scala API 看到 DStream PairDStreamFunctions Java API 明白了 JavaDStream JavaPairDStream Python API 看到 DStream