开发以及运行 Spark 单词统计应用程序
这部分教程描述了如何在 Spark 支持的三种语言 : Scala,Python 和 Java 中去编写,编译,以及运行一个简单的 Spark wordcount(单词统计)应用。Scala 和 Java 代码 最初开发的 Cloudera 教程由 Sandy Ryza 编写。
继续阅读 :
编写应用程序
示例的应用程序是一个加强版的 WordCount,权威的 MapReduce 示例。在这个版本的 WordCount 中,我们的目标是学习字母的分布在语料库中最流行的词汇。
- 创建一个 SparkConf 和 SparkContext。一个 Spark 应用程序对应一个 SparkContext class 的实例。当运行一个 shell 时,SparkContext 已经为您创建好了。
- 得到一个单词频率的阀值。
- 读取一个输入的文本文档。
- 统计每个单词出现的次数。
- 过滤掉所有低于出现次数阀值的单词。
- 对于剩余的单词,统计每个字母出现的次数。
在 MapReduce 中,这个需要两个 MapReduce 应用程序,以及持久化他们的中间数据到 HDFS 中去,这个应用程序相比使用 MapReduce API 开发需要大约 90% 的代码行数。
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SparkWordCount { def main(args: Array[String]) { // create Spark context with Spark configuration val sc = new SparkContext(new SparkConf().setAppName("Spark Count")) // get threshold val threshold = args(1).toInt // read in text file and split each document into words val tokenized = sc.textFile(args(0)).flatMap(_.split(" ")) // count the occurrence of each word val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _) // filter out words with fewer than threshold occurrences val filtered = wordCounts.filter(_._2 >= threshold) // count characters val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _) System.out.println(charCounts.collect().mkString(", ")) }
import sys from pyspark import SparkContext, SparkConf if __name__ == "__main__": # create Spark context with Spark configuration conf = SparkConf().setAppName("Spark Count") sc = SparkContext(conf=conf) # get threshold threshold = int(sys.argv[2]) # read in text file and split each document into words tokenized = sc.textFile(sys.argv[1]).flatMap(lambda line: line.split(" ")) # count the occurrence of each word wordCounts = tokenized.map(lambda word: (word, 1)).reduceByKey(lambda v1,v2:v1 +v2) # filter out words with fewer than threshold occurrences filtered = wordCounts.filter(lambda pair:pair[1] >= threshold) # count characters charCounts = filtered.flatMap(lambda pair:pair[0]).map(lambda c: c).map(lambda c: (c, 1)).reduceByKey(lambda v1,v2:v1 +v2) list = charCounts.collect() print repr(list)[1:-1]
import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.*; import org.apache.spark.SparkConf; import scala.Tuple2; public class JavaWordCount { public static void main(String[] args) { // create Spark context with Spark configuration JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count")); // get threshold final int threshold = Integer.parseInt(args[1]); // read in text file and split each document into words JavaRDD<String> tokenized = sc.textFile(args[0]).flatMap( new FlatMapFunction() { public Iterable call(String s) { return Arrays.asList(s.split(" ")); } } ); // count the occurrence of each word JavaPairRDD<String, Integer> counts = tokenized.mapToPair( new PairFunction() { public Tuple2 call(String s) { return new Tuple2(s, 1); } } ).reduceByKey( new Function2() { public Integer call(Integer i1, Integer i2) { return i1 + i2; } } ); // filter out words with fewer than threshold occurrences JavaPairRDD<String, Integer> filtered = counts.filter( new Function, Boolean>() { public Boolean call(Tuple2 tup) { return tup._2 >= threshold; } } ); // count characters JavaPairRDD<Character, Integer> charCounts = filtered.flatMap( new FlatMapFunction<Tuple2<String, Integer>, Character>() { @Override public Iterable<Character> call(Tuple2<String, Integer> s) { Collection<Character> chars = new ArrayList<Character>(s._1().length()); for (char c : s._1().toCharArray()) { chars.add(c); } return chars; } } ).mapToPair( new PairFunction<Character, Character, Integer>() { @Override public Tuple2<Character, Integer> call(Character c) { return new Tuple2<Character, Integer>(c, 1); } } ).reduceByKey( new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } } ); System.out.println(charCounts.collect()); } }
因为 Java 7 不支持匿名函数,这个 Java 应用程序比 Scala 和 Python 更复杂,但是仍然需要编写一小部分的代码等价于 MapReduce 应用程序。Java 8 支持匿名函数并且它们使用更深的 Java 应用程序的 streamline。
编译以及打包 Scala 和 Java 应用程序
这个教程使用 Maven 来编译以及打包 Scala 和 Java 应用程序。本教程的 pom.xml 摘录如下。为了更好的练习使用 Maven 来构建 Spark 应用程序,请看 构建 Spark 应用程序。
去编译 Scala,包含下列的 Scala 插件工具程序 :
<plugin> <groupId>org.scala-tools</groupId> <xrefrtifactId>maven-scala-plugin</artifactId> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin>
它需要 scala-tools 插件在仓库中。
<pluginRepositories> <pluginRepository> <id>scala-tools.org</id> <name>Scala-tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </pluginRepository> </pluginRepositories>
同样的,也需要包含 Scala 和 Spark 作为依赖 :
<dependencies> <dependency> <groupId>org.scala-lang</groupId> <xrefrtifactId>scala-library</artifactId> <version>2.10.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <xrefrtifactId>spark-core_2.10</artifactId> <version>1.6.0-cdh5.7.0</version> <scope>provided</scope> </dependency> </dependencies>
为了生成应用程序的 JAR,运行 :
$ mvn package
在 target 目录中去创建 sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar。
运行应用程序
应用程序的输入是一个大型文本文件,每一行包含一个文档中的所有单词,删除了标点符号。放入一个 input 文件到 HDFS 目录上去。你可以使用教程中的 示例输入文件 :
$ wget --no-check-certificate .../inputfile.txt $ hdfs dfs -put inputfile.txt
- 使用 spark-sbumit 运行应用程序 :
Scala - 在本地进程中运行,threshold 为 2。
$ spark-submit --class com.cloudera.sparkwordcount.SparkWordCount \ --master local --deploy-mode client --executor-memory 1g \ --name wordcount --conf "spark.app.id=wordcount" \ sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://namenode_host:8020/path/to/inputfile.txt 2
如果你使用示例的输入文件,输出的结果应该如下所示 :
(e,6), (p,2), (a,4), (t,2), (i,1), (b,1), (u,1), (h,1), (o,2), (n,4), (f,1), (v,1), (r,2), (l,1), (c,1)
Java - 在本地进程中运行,threshold 为 2。
$ spark-submit --class com.cloudera.sparkwordcount.JavaWordCount \ --master local --deploy-mode client --executor-memory 1g \ --name wordcount --conf "spark.app.id=wordcount" \ sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://namenode_host:8020/path/to/inputfile.txt 2
如果你使用示例的输入文件,输出的结果应该如下所示 :
[(u'a', 4), (u'c', 1), (u'e', 6), (u'i', 1), (u'o', 2), (u'u', 1), (u'b', 1), (u'f', 1), (u'h', 1), (u'l', 1), (u'n', 4), (u'p', 2), (u'r', 2), (u't', 2), (u'v', 1)]