页面树结构

2017-07-25 Apache Spark 2.2.0 官方文档中文版发布 : http://spark.apachecn.org/docs/cn/2.2.0/


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

Spark SQL 还支持在 Apache Hive 中读写数据。然而,由于 Hive 依赖项太多,这些依赖没有包含在默认的 Spark 发行版本中。如果在 classpath 上配置了 Hive 依赖,那么 Spark 会自动加载它们。注意,Hive 依赖也必须放到所有的 worker 节点上,因为如果要访问 Hive 中的数据它们需要访问 Hive 序列化和反序列化库(SerDes)。

Hive 配置是通过将 hive-site.xmlcore-site.xml(用于安全配置)以及 hdfs-site.xml(用于HDFS 配置)文件放置在conf/目录下来完成的。

如果要使用 Hive, 你必须要实例化一个支持 Hive SparkSession,包括连接到一个持久化的 Hive metastore, 支持 Hive 序列化反序列化库以及 Hive 用户自定义函数。即使用户没有安装部署 Hive 也仍然可以启用 Hive 支持。如果没有在 hive-site.xml 文件中配置,Spark 应用程序启动之后,上下文会自动在当前目录下创建一个 metastore_db 目录并创建一个由 spark.sql.warehouse.dir 配置的、默认值是当前目录下的 spark-warehouse 目录的目录。请注意 : 从 Spark 2.0.0 版本开始,hive-site.xml 中的 hive.metastore.warehouse.dir 属性就已经过时了,你可以使用 spark.sql.warehouse.dir 来指定仓库中数据库的默认存储位置。你可能还需要给启动 Spark 应用程序的用户赋予写权限。

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession


case class Record(key: Int, value: String)


// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"


val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()


import spark.implicits._
import spark.sql


sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")


// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...


// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+


// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")


// The items in DaraFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...


// You can also use DataFrames to create temporary views within a HiveContext. 
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")


// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...