Page tree

2017-07-25 Apache Spark 2.2.0 官方文档中文版发布 : http://spark.apachecn.org/docs/cn/2.2.0/


MachineLearning 优酷地址 : http://i.youku.com/apachecn

Skip to end of metadata
Go to start of metadata

在我们详细介绍如何你自己的 Spark Streaming 程序的细节之前,让我们先来看一看一个简单的 Spark Streaming 程序的样子。比方说,我们想要计算从一个监听 TCP socket 的数据服务器接收到的文本数据(text data)中的字数。所有你需要做的就是照着下面的步骤做。

首先,我们导入了 Spark Streaming 类和部分从 StreamingContext 隐式转换到我们的环境的名称,目的是添加有用的方法到我们需要的其他类(如 DStream)。 StreamingContext 是所有流功能的主要入口点。我们创建了一个带有 2 个执行线程和间歇时间为 1 秒的本地 StreamingContext

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ //自从 Spark 1.3 开始,不再是必要的了   
 
// 创建一个具有两个工作线程(working thread)和批次间隔为1秒的本地 StreamingContext 
// master 需要 2 个核,以防止饥饿情况(starvation scenario)。 
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") 
val ssc = new StreamingContext(conf, Seconds(1))

在使用这种背景下,我们可以创建一个代表从 TCP 源流数据的离散流(DStream),指定主机名(hostname)(例如 localhost)和端口(例如 9999)。

// 创建一个将要连接到 hostname:port 的离散流,如 localhost:9999 
val lines = ssc.socketTextStream("localhost", 9999) 

上一步的这个 lines 离散流(DStream)表示将要从数据服务器接收到的数据流。在这个 离散流(DStream)中的每一条记录都是一行文本(text)。接下来,我们想要通过空格字符(space characters)拆分这些数据行(lines)成单词(words)。

// 将每一行拆分成单词 val words = lines.flatMap(_.split(" "))
val words = lines.flatMap(_.split(" "))

flatMap 是一种一对多的离散流(DStream)操作,它会通过在源离散流(source DStream)中根据每个记录(record)生成多个新纪录的形式创建一个新的离散流(DStream)。在这种情况下,在这种情况下,每一行(each line)都将被拆分成多个单词(words)和代表单词离散流(words DStream)的单词流。接下来,我们想要计算这些单词。

import org.apache.spark.streaming.StreamingContext._ // 自从 Spark 1.3 不再是必要的 
// 计算每一个批次中的每一个单词
val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) 
 
// 在控制台打印出在这个离散流(DStream)中生成的每个 RDD 的前十个元素
// 注意 : 必需要触发 action(很多初学者会忘记触发action操作,导致报错:No output operations registered, so nothing to execute) 
wordCounts.print()

上一步的 words 离散流进行了进一步的映射(一对一的转变)为一个 (word, 1)对 的离散流(DStream),这个离散流然后被规约(reduce)来获得数据中每个批次(batch)的单词频率。最后,wordCounts.print() 将会打印一些每秒生成的计数。

请注意,当这些行(lines)被执行的时候, Spark Streaming 只有建立在启动时才会执行计算,在它已经开始之后,并没有真正地处理。为了在所有的转换都已经完成之后开始处理,我们在最后运行 : 

ssc.start() // 启动计算 
ssc.awaitTermination() // 等待计算的终止

完整的代码可以在 Spark Streaming 的例子 NetworkWordCount 中找到。

如果你已经 下载 并且 建立Spark ,你可以运行下面的例子。你首先需要运行 Netcat(一个在大多数类 Unix 系统中的小工具)作为我们使用的数据服务器。

$ nc -lk 9999

然后,在另一个不同的终端,你可以运行这个例子通过执行 : 

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

然后,在运行在 netcat 服务器上的终端输入的任何行(lines),都将被计算,并且每一秒都显示在屏幕上,它看起来就像下面这样 : 

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world
# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...