页面树结构

2017-07-25 Apache Spark 2.2.0 官方文档中文版发布 : http://spark.apachecn.org/docs/cn/2.2.0/


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

你可以很容易地使用 DataFrames 和 SQL Streaming 操作数据。 需要使用 SparkContext 或者正在使用的 StreamingContext 创建一个 SparkSession。这样做的目的就是为了使得驱动程序可以在失败之后进行重启。 使用懒加载模式创建单例的 SparkSession 对象。下面的示例所示。在原先的 单词统计 程序的基础上进行修改,使用 DataFramesSQL 生成单词统计。 每个 RDD 转换为 DataFrame,注册为临时表,然后使用 SQL 查询。

/** 流程序中的DataFrame操作 */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // 获取单例的SQLContext
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // 将RDD [String]转换为DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // 注册临时表
  wordsDataFrame.registerTempTable("words")

  // 在DataFrame上使用SQL进行字计数并打印它
  val wordCountsDataFrame = 
    sqlContext.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

这里是完整 源代码

你可以在使用其他线程读取的流数据上进行 SQL 查询(就是说,可以异步运行 StreamingContext)。 只要确保 StreamingContext 可以缓存一定量的数据来满足查询的需求。 否则 StreamingContext,检测不到任何异步 SQL 查询,在完成查询之前将删除旧的数据。 例如,如果您想查询最后一批,但您的查询可以运行需要 5 分钟,然后调用 streamingContext.remember(Minutes(5))Scala 中,或其他语言)。

请看 DataFrames,Datasets 和 SQL  指南学习更多关于 DataFrames 的知识。