页面树结构

2017-11-09 ApacheCN 开源组织,第二期邀请成员活动,一起走的更远 : http://www.apachecn.org/member/209.html


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

开发以及运行 Spark 单词统计应用程序

这部分教程描述了如何在 Spark 支持的三种语言 : Scala,Python 和 Java 中去编写,编译,以及运行一个简单的 Spark wordcount(单词统计)应用。Scala 和 Java 代码 最初开发的 Cloudera 教程由 Sandy Ryza 编写。

继续阅读 : 

编写应用程序

示例的应用程序是一个加强版的 WordCount,权威的 MapReduce 示例。在这个版本的 WordCount 中,我们的目标是学习字母的分布在语料库中最流行的词汇。

实践应用 : 
  1. 创建一个 SparkConf 和 SparkContext。一个 Spark 应用程序对应一个 SparkContext class 的实例。当运行一个 shell 时,SparkContext 已经为您创建好了。
  2. 得到一个单词频率的阀值。
  3. 读取一个输入的文本文档。
  4. 统计每个单词出现的次数。
  5. 过滤掉所有低于出现次数阀值的单词。
  6. 对于剩余的单词,统计每个字母出现的次数。

在 MapReduce 中,这个需要两个 MapReduce 应用程序,以及持久化他们的中间数据到 HDFS 中去,这个应用程序相比使用 MapReduce API 开发需要大约 90% 的代码行数。

 

 

Scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkWordCount {
  def main(args: Array[String]) {
    // create Spark context with Spark configuration
    val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))

    // get threshold
    val threshold = args(1).toInt

    // read in text file and split each document into words
    val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))

    // count the occurrence of each word
    val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

    // filter out words with fewer than threshold occurrences
    val filtered = wordCounts.filter(_._2 >= threshold)

    // count characters
    val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)

    System.out.println(charCounts.collect().mkString(", "))
  }
Python
import sys

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":

  # create Spark context with Spark configuration
  conf = SparkConf().setAppName("Spark Count")
  sc = SparkContext(conf=conf)

  # get threshold
  threshold = int(sys.argv[2])

  # read in text file and split each document into words
  tokenized = sc.textFile(sys.argv[1]).flatMap(lambda line: line.split(" "))

  # count the occurrence of each word
  wordCounts = tokenized.map(lambda word: (word, 1)).reduceByKey(lambda v1,v2:v1 +v2)

  # filter out words with fewer than threshold occurrences
  filtered = wordCounts.filter(lambda pair:pair[1] >= threshold)

  # count characters
  charCounts = filtered.flatMap(lambda pair:pair[0]).map(lambda c: c).map(lambda c: (c, 1)).reduceByKey(lambda v1,v2:v1 +v2)

  list = charCounts.collect()
  print repr(list)[1:-1]
Java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;

public class JavaWordCount {
  public static void main(String[] args) {

    // create Spark context with Spark configuration
    JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));

    // get threshold
    final int threshold = Integer.parseInt(args[1]);

    // read in text file and split each document into words
    JavaRDD<String> tokenized = sc.textFile(args[0]).flatMap(
      new FlatMapFunction() {
        public Iterable call(String s) {
          return Arrays.asList(s.split(" "));
        }
      }
    );

    // count the occurrence of each word
    JavaPairRDD<String, Integer> counts = tokenized.mapToPair(
      new PairFunction() {
        public Tuple2 call(String s) {
          return new Tuple2(s, 1);
        }
      }
    ).reduceByKey(
      new Function2() {
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );

    // filter out words with fewer than threshold occurrences
    JavaPairRDD<String, Integer> filtered = counts.filter(
      new Function, Boolean>() {
        public Boolean call(Tuple2 tup) {
          return tup._2 >= threshold;
        }
      }
    );

    // count characters
    JavaPairRDD<Character, Integer> charCounts = filtered.flatMap(
      new FlatMapFunction<Tuple2<String, Integer>, Character>() {
        @Override
        public Iterable<Character> call(Tuple2<String, Integer> s) {
          Collection<Character> chars = new ArrayList<Character>(s._1().length());
          for (char c : s._1().toCharArray()) {
            chars.add(c);
          }
          return chars;
        }
      }
    ).mapToPair(
      new PairFunction<Character, Character, Integer>() {
        @Override
        public Tuple2<Character, Integer> call(Character c) {
          return new Tuple2<Character, Integer>(c, 1);
        }
      }
    ).reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      }
    );

    System.out.println(charCounts.collect());
  }
}

因为 Java 7 不支持匿名函数,这个 Java 应用程序比 Scala 和 Python 更复杂,但是仍然需要编写一小部分的代码等价于 MapReduce 应用程序。Java 8 支持匿名函数并且它们使用更深的 Java 应用程序的 streamline。

 

编译以及打包 Scala 和 Java 应用程序

这个教程使用 Maven 来编译以及打包 Scala 和 Java 应用程序。本教程的 pom.xml 摘录如下。为了更好的练习使用 Maven 来构建 Spark 应用程序,请看 构建 Spark 应用程序

去编译 Scala,包含下列的 Scala 插件工具程序 : 

<plugin>
  <groupId>org.scala-tools</groupId>
      <xrefrtifactId>maven-scala-plugin</artifactId>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
</plugin>

它需要 scala-tools 插件在仓库中。

<pluginRepositories>
<pluginRepository>
    <id>scala-tools.org</id>
    <name>Scala-tools Maven2 Repository</name>
    <url>http://scala-tools.org/repo-releases</url>
  </pluginRepository>
</pluginRepositories>

同样的,也需要包含 Scala 和 Spark 作为依赖 : 

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <xrefrtifactId>scala-library</artifactId>
    <version>2.10.2</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <xrefrtifactId>spark-core_2.10</artifactId>
    <version>1.6.0-cdh5.7.0</version>
    <scope>provided</scope>
  </dependency>
</dependencies>

为了生成应用程序的 JAR,运行 : 

$ mvn package

在 target 目录中去创建 sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar。

运行应用程序

  1. 应用程序的输入是一个大型文本文件,每一行包含一个文档中的所有单词,删除了标点符号。放入一个 input 文件到 HDFS 目录上去。你可以使用教程中的 示例输入文件 : 

    $ wget --no-check-certificate .../inputfile.txt
    $ hdfs dfs -put inputfile.txt
  2. 使用 spark-sbumit 运行应用程序 : 
    • Scala - 在本地进程中运行,threshold 为 2。

      $ spark-submit --class com.cloudera.sparkwordcount.SparkWordCount \
      --master local --deploy-mode client --executor-memory 1g \
      --name wordcount --conf "spark.app.id=wordcount" \
      sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://namenode_host:8020/path/to/inputfile.txt 2

      如果你使用示例的输入文件,输出的结果应该如下所示 : 

      (e,6), (p,2), (a,4), (t,2), (i,1), (b,1), (u,1), (h,1), (o,2), (n,4), (f,1), (v,1), (r,2), (l,1), (c,1)
    • Java - 在本地进程中运行,threshold 为 2。

      $ spark-submit --class com.cloudera.sparkwordcount.JavaWordCount \
      --master local --deploy-mode client --executor-memory 1g \
      --name wordcount --conf "spark.app.id=wordcount" \
      sparkwordcount-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://namenode_host:8020/path/to/inputfile.txt 2

      如果你使用示例的输入文件,输出的结果应该如下所示 : 

      [(u'a', 4), (u'c', 1), (u'e', 6), (u'i', 1), (u'o', 2), (u'u', 1), (u'b', 1), (u'f', 1), (u'h', 1), (u'l', 1), (u'n', 4), (u'p', 2), (u'r', 2), (u't', 2), (u'v', 1)]