页面树结构

2017-07-25 Apache Spark 2.2.0 官方文档中文版发布 : http://spark.apachecn.org/docs/cn/2.2.0/


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

在我们详细介绍如何你自己的 Spark Streaming 程序的细节之前,让我们先来看一看一个简单的 Spark Streaming 程序的样子。比方说,我们想要计算从一个监听 TCP socket 的数据服务器接收到的文本数据(text data)中的字数。所有你需要做的就是照着下面的步骤做。

首先,我们导入了 Spark Streaming 类和部分从 StreamingContext 隐式转换到我们的环境的名称,目的是添加有用的方法到我们需要的其他类(如 DStream)。 StreamingContext 是所有流功能的主要入口点。我们创建了一个带有 2 个执行线程和间歇时间为 1 秒的本地 StreamingContext

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ //自从 Spark 1.3 开始,不再是必要的了   
 
// 创建一个具有两个工作线程(working thread)和批次间隔为1秒的本地 StreamingContext 
// master 需要 2 个核,以防止饥饿情况(starvation scenario)。 
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") 
val ssc = new StreamingContext(conf, Seconds(1))

在使用这种背景下,我们可以创建一个代表从 TCP 源流数据的离散流(DStream),指定主机名(hostname)(例如 localhost)和端口(例如 9999)。

// 创建一个将要连接到 hostname:port 的离散流,如 localhost:9999 
val lines = ssc.socketTextStream("localhost", 9999) 

上一步的这个 lines 离散流(DStream)表示将要从数据服务器接收到的数据流。在这个 离散流(DStream)中的每一条记录都是一行文本(text)。接下来,我们想要通过空格字符(space characters)拆分这些数据行(lines)成单词(words)。

// 将每一行拆分成单词 val words = lines.flatMap(_.split(" "))
val words = lines.flatMap(_.split(" "))

flatMap 是一种一对多的离散流(DStream)操作,它会通过在源离散流(source DStream)中根据每个记录(record)生成多个新纪录的形式创建一个新的离散流(DStream)。在这种情况下,在这种情况下,每一行(each line)都将被拆分成多个单词(words)和代表单词离散流(words DStream)的单词流。接下来,我们想要计算这些单词。

import org.apache.spark.streaming.StreamingContext._ // 自从 Spark 1.3 不再是必要的 
// 计算每一个批次中的每一个单词
val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) 
 
// 在控制台打印出在这个离散流(DStream)中生成的每个 RDD 的前十个元素
// 注意 : 必需要触发 action(很多初学者会忘记触发action操作,导致报错:No output operations registered, so nothing to execute) 
wordCounts.print()

上一步的 words 离散流进行了进一步的映射(一对一的转变)为一个 (word, 1)对 的离散流(DStream),这个离散流然后被规约(reduce)来获得数据中每个批次(batch)的单词频率。最后,wordCounts.print() 将会打印一些每秒生成的计数。

请注意,当这些行(lines)被执行的时候, Spark Streaming 只有建立在启动时才会执行计算,在它已经开始之后,并没有真正地处理。为了在所有的转换都已经完成之后开始处理,我们在最后运行 : 

ssc.start() // 启动计算 
ssc.awaitTermination() // 等待计算的终止

完整的代码可以在 Spark Streaming 的例子 NetworkWordCount 中找到。

如果你已经 下载 并且 建立Spark ,你可以运行下面的例子。你首先需要运行 Netcat(一个在大多数类 Unix 系统中的小工具)作为我们使用的数据服务器。

$ nc -lk 9999

然后,在另一个不同的终端,你可以运行这个例子通过执行 : 

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

然后,在运行在 netcat 服务器上的终端输入的任何行(lines),都将被计算,并且每一秒都显示在屏幕上,它看起来就像下面这样 : 

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world
# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...