页面树结构

2017-07-25 Apache Spark 2.2.0 官方文档中文版发布 : http://spark.apachecn.org/docs/cn/2.2.0/


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

概述

Spark SQL 是 Spark 处理结构化数据的一个模块。与基础的 Spark RDD API 不同,Spark SQL 提供了查询结构化数据及计算结果等信息的接口。在内部,Spark SQL 使用这个额外的信息去执行额外的优化。有几种方式可以跟 Spark SQL 进行交互,包括 SQL 和 Dataset API。当使用相同执行引擎进行计算时,无论使用哪种 API / 语言都可以快速的计算。这种统一意味着开发人员能够在基于提供最自然的方式来表达一个给定的 transformation API 之间实现轻松的来回切换不同的 。

该页面所有例子使用的示例数据都包含在 Spark 的发布中,并且可以使用 spark-shellpyspark,或者 sparkR 来运行。

SQL 

Spark SQL 的功能之一是执行 SQL 查询。Spark SQL 也能够被用于从已存在的 Hive 环境中读取数据。更多关于如何配置这个特性的信息,请参考 Hive 表 这部分。当以另外的编程语言运行 SQL 时,查询结果将以 Dataset / DataFrame 的形式返回。您也可以使用 命令行 或者通过 JDBC / ODBC 与 SQL 接口交互。

Datasets 和 DataFrames

一个 Dataset 是一个分布式的数据集合。Dataset 是在 Spark 1.6 中被添加的新接口,它提供了 RDD 的优点(强类型化,能够使用强大的 lambda 函数)与 Spark SQL 优化的执行引擎的好处。一个 Dataset 可以从 JVM 对象来构造并且使用转换功能(mapflatMapfilter,等等)。Dataset API 在 Scala 和 Java 中是可用的。Python 不支持 Dataset API。但是由于 Python 的动态特性,许多 Dataset API 的有点已经可用了(也就是说,你可能通过 name 天生的 row.columnName 属性访问一行中的字段)。这种情况和 R 相似。

一个 DataFrame 是一个 Dataset 组织成的指定列。它的概念与一个在关系型数据库或者在 R/Python 中的表是相等的,但是有更多的优化。DataFrame 可以从大量的 Source 中构造出来,像 : 结构化的数据文件,Hive 中的表,外部的数据库,或者已存在的 RDDDataFrame API 在 ScalaJavaPython 和 R 中是可用的。在 Scala 和 Java 中,一个 DataFrame 所代表的是一个多个 Row(行)的 Dataset。在 Scala API 中,DataFrame 仅仅是一个 Dataset[Row] 类型的别名 。然而,在 Java API 中,用户需要去使用 Dataset<Row> 来表示 DataFrame

在这个文档中,我们将常常会引用 Scala/Java  的 Dataset 的 Row(行)作为 DataFrame

入门指南

起始点 : SparkSession

Spark 中所有功能的入口点是 SparkSession 类。去创建一个基本的 SparkSession,仅使用 SparkSession.builder() : 

import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder()
  .appName("Spark SQL Example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
 
// 所有的示例代码可以在 Spark repo 的 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中找到。

在 Spark 2.0 中 SparkSession 为 Hive 特性提供了内嵌的支持,包括使用 HiveQL 编写查询的能力,访问 Hive UDF,以及从 Hive 表中读取数据的能力。为了使用这些特性,你不需要去有一个已存在的 Hive 设置。

创建 DataFrames

与一个 SparkSession 一起,应用程序可以从一个 已存在的 RDD   中从一个百度的 R data.frame),或者一个 Hive 表中,或者从 Spark 数据源 中创建 DataFrame

举个例子,下面基于一个 JSON 文件的内容创建一个 DataFrame : 

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
 
// 所有的示例代码可以在 Spark repo 的 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中找到。

无类型 Dataset 操作(aka DataFrame 操作)

DataFrame 提供了一个 DSLdomain-specific language)用于在 ScalaJavaPython 或者 R 中的结构化数据操作。

正如上面提到的一样,在Spark 2.0 ,Scala 和 JavaAPI 中的 DataFrame 仅仅是多个 Row(行)的 Dataset 。这些操作也参考了与强类型的 Scala/Java Datasets 的 “类型转换” 相对应的 “无类型转换”。

这里包括一些使用 Dataset 进行结构化数据处理的示例 : 

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
 
// 所有的示例代码可以在 Spark repo 的 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中找到。

能够在 DataFrame 上被执行的操作类型的完整列表请参考 API 文档

除了简单的列引用和表达式之外,DataFrame 也有丰富的函数库,包括 string 操作,date 算术,常见的 math 操作以及更多。可用的完整列表请参考 DataFrame 函数参考

以编程的方式运行 SQL 查询

// SparkSession 使应用程序的 SQL 函数能够以编程的方式运行 SQL 查询并且将查询结果以一个 DataFrame。
 
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
 
// 所有的示例代码可以在 Spark repo 的 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中找到。

创建 Datasets

Dataset 与 RDD 相似,然而,并不是使用 Java 序列化或者 Kryo,他们使用一个指定的 Encoder(编码器) 来序列化用于处理或者通过网络进行传输的对象。虽然编码器和标准的序列化都负责将一个对象序列化成字节,编码器是动态生成的代码,并且使用了一种允许 Spark 去执行许多像 filteringsorting 以及 hashing 这样的操作,不需要将字节反序列化成对象的格式。

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
 
// 所有的示例代码可以在 Spark repo 的 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中找到。

RDD 的互操作性

Spark SQL 支持两种不同的方法用于转换已存在的 RDD 成为 Dataset

第一种方法是使用反射去推断一个包含指定的对象类型的 RDD 的 Schema。在你的 Spark 应用程序中当你已知 Schema 时这个基于方法的反射可以让你的代码更简洁。

第二种用于创建 Dataset 的方法是通过一个允许你构造一个 Schema 然后把它应用到一个已存在的 RDD 的编程接口。然而这种方法更繁琐,当列和它们的类型知道运行时都是未知时它允许你去构造 Dataset

使用反射推断 Schema

// Spark SQL 的 Scala 接口支持自动转换一个包含 case classes 的 RDD 为 DataFrame。 Case class 定义了表的 Schema。Case class 的参数名使用反射读取并且成为了列名。
// Case class 也可以是嵌套的或者包含像 SeqS 或者 ArrayS 这样的复杂类型。这个 RDD 能够被隐式转换成一个 DataFrame 然后被注册为一个表。表可以用于后续的 SQL 语句。
 
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
implicit val stringIntMapEncoder: Encoder[Map[String, Int]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
 
// 所有的示例代码可以在 Spark repo 的 “examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala” 中找到。

以编程的方式指定 Schema

当 case class 不能够在执行之前被定义(例如,records 记录的结构在一个 string 字符串中被编码了,或者一个 text 文本 dataset 将被解析并且不同的用户投影的字段是不一样的)。一个 DataFrame 可以使用下面的三步以编程的方式来创建。

  1. 从原始的 RDD 创建 RDD 的 RowS(行)。
  2. Step 1 被创建后,创建 Schema 表示一个 StructType 匹配 RDD 中的 Rows(行)的结构。
  3. 通过 SparkSession 提供的 createDataFrame 方法应用 Schema 到 RDD 的 RowS(行)。

例如 : 

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row


// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

数据源

Spark SQL 支持通过 DataFrame 接口操作多种数据源。一个 DataFrame 可以通过关联转换来操作,也可以被创建为一个临时的 view。注册一个 DataFrame 作为一个临时的 view 就可以允许你在数据集上运行 SQL 查询。本节介绍了一些通用的方法通过使用 Spark Data Sources 来加载和保存数据以及一些可用的内置数据源的特定选项。

通用的 Load/Save 函数

在最简单的方式下,默认的数据源(parquet 除非另外配置通过spark.sql.sources.default)将会用于所有的操作。

val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

完整的示例代码在 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"。

手动指定选项

你也可以手动的指定数据源,并且将与你想要传递给数据源的任何额外选项一起使用。数据源由其完全限定名指定(例如 : org.apache.spark.sql.parquet),不过对于内置数据源你也可以使用它们的缩写名(json, parquet, jdbc)。使用下面这个语法可以将从任意类型数据源加载的DataFrames 转换为其他类型。

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

 

完整的示例代码在 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"。

 

直接在文件上运行 SQL

你也可以直接在文件上运行 SQL 查询来替代使用 API 将文件加载到 DataFrame 再进行查询。

val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

完整代码示例在 "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"。

保存模式

Save 操作可以使用 SaveMode,可以指定如何处理已经存在的数据。这是很重要的要意识到这些保存模式没有利用任何锁并且也不是原子操作。另外,当执行 Overwrite,新数据写入之前会先将旧数据删除。

Scala / JavaAny LanguageMeaning(含义)
SaveMode.ErrorIfExists(default)"error" (default)在保存 DataFrame 到一个数据源时,如果数据已经存在,将会抛出异常。
SaveMode.Append"append"在保存 DataFrame 到一个数据源时,如果数据/表已经存在,DataFrame 的内容将会追加到已存在的数据后面。
SaveMode.Overwrite"overwrite"Overwrite 模式意味着当保存 DataFrame 到一个数据源,如果数据/表已经存在,那么已经存在的数据将会被 DataFrame 的内容覆盖。
SaveMode.Ignore"ignore"Ignore 模式意味着当保存 DataFrame 到一个数据源,如果数据已经存在,save 操作不会将 DataFrame 的内容保存,也不会修改已经存在的数据。这个和 SQL 中的 'CREATE TABLE IF NOT EXISTS' 类似 。


保存为持久化的表

DataFrames 也可以通过 saveAsTable 命令来保存为一张持久表到 Hive metastore 中。值得注意的是对于这个功能来说已经存在的 Hive 部署不是必须的。Spark 将会为你创造一个默认的本地 Hive metastore(使用 Derby)。不像 createOrReplaceTempView 命令那样,saveAsTable 将会持久化 DataFrame 中的内容并在 Hive metastore 中创建一个指向数据的指针。持久化的表将会一直存在甚至当你的 Spark 应用已经重启,只要保持你的连接是和一个相同的 metastore。一个相对于持久化表的 DataFrame 可以通过在 SparkSession 调用 table 方法创建。

默认情况下 saveAsTable 操作将会创建一个 managed table”,意味着数据的位置将会被 metastore 控制。Managed tables 在表 drop 后也数据也会自动删除。

Parquet 文件

Parquet 是一个列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL 支持对 Parquet 文件的读写还可以自动的保存源数据的模式

以编程的方式加载数据

import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
peopleDF.write.parquet("people.parquet")
val parquetFileDF = spark.read.parquet("people.parquet")
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

分区发现

在系统中,比如 Hive,表分区是一个很常见的优化途径。在一个分区表中 ,数据通常存储在不同的文件目录中,对每一个分区目录中的途径按照分区列的值进行编码。Parquet 数据源现在可以自动地发现并且推断出分区的信息。例如,我们可以将之前使用的人口数据存储成下列目录结构的分区表,两个额外的列,gender 和 country 作为分区列 : 

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

通过向 SparkSession.read.parquet 或 SparkSession.read.load 中传入 path/to/tableSpark SQL 将会自动地从路径中提取分区信息。现在返回的 DataFrame 模式变成 : 

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
需要注意的是分区列的数据类型是自动推导出的。当前,支持数值数据类型以及 string 类型。有些时候用户可能不希望自动推导出分区列的数据类型。对于这些使用场景,自动类型推导功能可以通过 spark.sql.sources.partitionColumnTypeInference.enabled 来配置,默认值是 true。当自动类型推导功能禁止,分区列的数据类型是 string

从 Spark 1.6.0 开始,分区发现只能发现在默认给定的路径下的分区。对于上面那个例子,如果用户向 SparkSession.read.parquet 或 SparkSession.read.loadgender 传递 path/to/table/gender=male 将不会被当做分区列。如果用户需要指定发现的根目录,可以在数据源设置 basePath 选项。比如,将 path/to/table/gender=male 作为数据的路径并且设置 basePath 为 path/to/table/gender 将会作为一个分区列。

Schema 合并

类似 ProtocolBufferAvro,以及 ThriftParquet 也支持 schema 演变。用户可以从一个简单的 schema 开始,并且根据需要逐渐地向 schema 中添加更多的列。这样,用户最终可能会有多个不同但是具有相互兼容 schema 的 Parquet 文件。Parquet 数据源现在可以自动地发现这种情况,并且将所有这些文件的 schema 进行合并。

Hive metastore Parquet 表转换

当从 Hive metastore 里读写 Parquet 表时,为了更好地提升新能 Spark SQL 会尝试用自己支持的 Parquet 替代 Hive SerDe。这个功能通过 spark.sql.hive.convertMetastoreParquet 选项来控制,默认是开启的。

Hive/Parquet Schema Reconciliation

从 Hive 和 Parquet 处理表 schema 过程的角度来看有两处关键的不同。

  1. Hive 对大小写不敏感,而 Parquet 不是。
  2. Hive 认为所有列都是 nullable 可为空的,在 Parquet 中为 nullability 是需要显示声明的。

由于这些原因,当我们将 Hive metastore Parquet table 转换为Spark SQLtable 时必须使 Hive metastore schema 与 Parquet schema 相兼容。兼容规则如下 : 

  1. 除了nullability相同 schema 的字段的数据类型必须相同。要兼容的字段应该具有 Parquet 的数据类型,因此 nullability 是被推崇的。
  2. reconciled schema 包含了这些 Hive metastore schema 里定义的字段。 
    • 任何字段只出现在 Parquet schema 中会被 reconciled schema 排除。
    • 任何字段只出现在 Hive metastore schema 中会被当做 nullable 字段来添加到 reconciled schema 中。

Metadata 刷新

为了提高性能 Spark SQL 缓存了 Parquet metadata。当 Hive metastore Parquet table 转换功能开启,这些转换后的元数据信息也会被缓存。如果这些表被 Hive 或者其他外部的工具更新,你需要手动刷新以确保元数据信息保持一致。

spark.catalog.refreshTable("my_table")


配置

Parquet 的配置可以使用 SparkSession 的 setConf 来设置或者通过使用 SQL 运行 SET key=value 命令。

属性名

默认值

含义

spark.sql.parquet.binaryAsString

false

 其他的一些产生 Parquet 的系统,特别是 Impala SparkSQL 的老版本,当将 Parquet 模式写出时不会区分二进制数据和字符串。这个标志告诉 Spark SQL 将二进制数据解析成字符串,以提供对这些系统的兼容。

spark.sql.parquet.int96AsTimestamp

true

其他的一些产生 Parquet 的系统,特别是 Impala ,将时间戳存储为 INT96 的形式。Spark 也将时间戳存储为 INT96,因为我们要避免纳秒级字段的精度的损失。这个标志告诉 Spark SQL INT96 数据解析为一个时间戳,以提供对这些系统的兼容。

spark.sql.parquet.cacheMetadata

true

打开 Parquet模式的元数据的缓存。能够加快对静态数据的查询。

spark.sql.parquet.compression.codec

gzip

设置压缩编码解码器,当写入一个 Parquet文件时。可接收的值包括 : uncompressedsnappygziplzo。

spark.sql.parquet.filterPushdown

false

打开 Parquet 过滤器的后进先出存储的优化。这个功能默认是被关闭的,因为一个 Parquet 中的一个已知的 bug 1.6.0rc3 (PARQUET-136)。然而,如果你的表中不包含任何的可为空的(nullable)字符串或者二进制列,那么打开这个功能是安全的。

spark.sql.hive.convertMetastoreParquet

true

当设置成 falseSpark SQL 会为 parquet 表使用 Hive SerDe(Serialize/Deserilize)。

spark.sql.parquet.mergeSchemafalse 当设置 true 时,Parquet 数据源从所有的数据文件中合并 schemas,否则 schema 来自 summary file 或随机的数据文件当 summary file不可得时。

JSON Datasets

Spark SQL 可以自动的推断出 JSON 数据集的 schema 并且将它作为 DataFrame 进行加载。这个转换可以通过使用 SparkSession.read.json() 在字符串类型的 RDD 中或者 JSON 文件。

注意作为 json file 提供的文件不是一个典型的 JSON 文件。每一行必须包含一个分开的独立的有效 JSON 对象。因此,常规的多行 JSON 文件通常会失败。

 // A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// | name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string
val otherPeopleRDD = spark.sparkContext.makeRDD(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+


Hive 表

Spark SQL 还支持在 Apache Hive 中读写数据。然而,由于 Hive 依赖项太多,这些依赖没有包含在默认的 Spark 发行版本中。如果在 classpath 上配置了 Hive 依赖,那么 Spark 会自动加载它们。注意,Hive 依赖也必须放到所有的 worker 节点上,因为如果要访问 Hive 中的数据它们需要访问 Hive 序列化和反序列化库(SerDes)。

Hive 配置是通过将 hive-site.xmlcore-site.xml(用于安全配置)以及 hdfs-site.xml(用于HDFS 配置)文件放置在conf/目录下来完成的。

如果要使用 Hive, 你必须要实例化一个支持 Hive 的 SparkSession,包括连接到一个持久化的 Hive metastore, 支持 Hive 序列化反序列化库以及 Hive 用户自定义函数。即使用户没有安装部署 Hive 也仍然可以启用 Hive 支持。如果没有在 hive-site.xml 文件中配置,Spark 应用程序启动之后,上下文会自动在当前目录下创建一个 metastore_db 目录并创建一个由 spark.sql.warehouse.dir 配置的、默认值是当前目录下的 spark-warehouse 目录的目录。请注意 : 从 Spark 2.0.0 版本开始,hive-site.xml 中的 hive.metastore.warehouse.dir 属性就已经过时了,你可以使用 spark.sql.warehouse.dir 来指定仓库中数据库的默认存储位置。你可能还需要给启动 Spark 应用程序的用户赋予写权限。

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DaraFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a HiveContext. 
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...

与不同版本的 Hive Metastore 交互

Spark SQL 对 Hive 最重要的一个支持就是可以和 Hive metastore 进行交互,这使得 Spark SQL 可以访问 Hive 表的元数据。从 Spark 1.4.0 版本开始,通过使用下面描述的配置,Spark SQL 一个简单的二进制编译版本可以用来查询不同版本的 Hive metastore。注意,不管用于访问 metastore 的 Hive 是什么版本,Spark SQL 内部都使用 Hive 1.2.1 版本进行编译,并且使用这个版本的一些类用于内部执行(serdesUDFsUDAFs等)。

下面的选项可用来配置用于检索元数据的Hive 版本 : 

属性名

默认值

含义

spark.sql.hive.metastore.version

1.2.1

Hive metastore 版本。可用选项从 0.12.0 1.2.1

spark.sql.hive.metastore.jars

builtin

存放用于实例化 HiveMetastoreClient jar 包位置。这个属性可以是下面三个选项之一 :

  1. builtin : 使用 Hive 1.2.1 版本,当启用 -Phive 时会和 Spark 一起打包。如果使用了这个选项,那么 spark.sql.hive.metastore.version 要么是 1.2.1,要么就不定义。
  2. maven : 使用从 Maven 仓库下载的指定版本的 Hive jar 包。生产环境部署通常不建议使用这个选项。
  3. 标准格式的 JVM classpath。这个 classpath 必须包含所有 Hive 及其依赖的 jar 包,并且包含正确版本的 hadoop。这些 jar 包只需要部署在 driver 节点上,但是如果你使用 yarn cluster 模式运行,那么你必须要确保这些 jar 包是和应用程序一起打包的。

spark.sql.hive.metastore.sharedPrefixes

com.mysql.jdbc,

org.postgresql,

com.microsoft.sqlserver,

oracle.jdbc

一个逗号分隔的类名前缀列表,这些类使用 classloader 加载,且可以在 Spark SQL 和特定版本的 Hive 间共享。一个共享类的示例就是用来访问 Hive metastoreJDBC driver。其它需要共享的类,是需要与已经共享的类进行交互的。例如,log4j 使用的自定义 appender

spark.sql.hive.metastore.barrierPrefixes

(empty)

一个逗号分隔的类名前缀列表,这些类需要在 Spark SQL 访问的每个 Hive 版本中显式地重新加载。例如,在一个共享前缀列表

org.apache.spark.*)中声明的 Hive UDF通常需要被共享。


JDBC 连接其它数据库

Spark SQL 还有一个能够使用 JDBC 从其他数据库读取数据的数据源。当使用 JDBC 访问其它数据库时,应该首选 JdbcRDD。这是因为结果是以数据框(DataFrame)返回的,且这样 Spark SQL操作轻松或便于连接其它数据源。因为这种 JDBC 数据源不需要用户提供 ClassTag,所以它也更适合使用 Java 或 Python 操作。(注意,这与允许其它应用使用 Spark SQL 执行查询操作的 Spark SQL JDBC 服务器是不同的)。

使用 JDBC 访问特定数据库时,需要在 spark classpath 上添加对应的 JDBC 驱动配置。例如,为了从 Spark Shell 连接 postgres,你需要运行如下命令 : 

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

通过调用数据源API,远程数据库的表可以被加载为DataFrame Spark SQL临时表。支持的参数有 : 

属性名含义
url

要连接的 JDBC URL

dbtable要读取的 JDBC 表。 注意,一个 SQL 查询的 From 分语句中的任何有效表都能被使用。例如,既可以是完整表名,也可以是括号括起来的子查询语句
driver用于连接 URL JDBC 驱动的类名。
partitionColumn, lowerBound, upperBound, numPartitions这几个选项,若有一个被配置,则必须全部配置。它们描述了当从多个 worker 中并行的读取表时,如何对它分区。partitionColumn 必须时所查询表的一个数值字段。注意,lowerBound 和 upperBound 都只是用于决定分区跨度的,而不是过滤表中的行。因此,表中的所有行将被分区并返回。
fetchSizeJDBC fetch size,决定每次读取多少行数据。 默认将它设为较小值(如,Oracle上设为 10)有助于 JDBC 驱动上的性能优化。

代码示例如下 : 

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename"
)

故障排除

  • 在客户端会话(client session)中或者所有 executor 上,JDBC 驱动类必须可见于原生的类加载器。这是因为 Java 的驱动管理(DriverManager )类在打开一个连接之前会做一个安全检查,这就导致它忽略了所有对原生类加载器不可见的驱动。一个方便的方法,就是修改所有 worker 节点上的 compute_classpath.sh 以包含你的驱动 Jar 包。
  • 一些数据库,如 H2,会把所有的名称转为大写。在 Spark SQL 中你也需要使用大写来引用这些名称。

性能调优

对一些工作负载,可能的性能改进的方式,不是把数据缓存在内存里,就是调整一些试验选项。

缓存数据到内存

Spark SQL 可以通过调用 spark.cacheTable("tableName") 或者 dataFrame.cache() 以列存储格式缓存表到内存中。随后,Spark SQL 将会扫描必要的列,并自动调整压缩比例,以减少内存占用和 GC 压力。你可以调用 spark.uncacheTable("tableName") 来删除内存中的表。

你可以在 SparkSession 使用 setConf 方法或在 SQL 语句中运行 SET key=value 命令,来配置内存中的缓存。

属性名

默认值含义
spark.sql.inMemoryColumnarStorage.compressedtrue

当设置为 true 时,Spark SQL 将会基于数据的统计信息自动地为每一列选择单独的压缩编码方式。

spark.sql.inMemoryColumnarStorage.batchSize10000

控制列式缓存批量的大小。当缓存数据时,增大批量大小可以提高内存利用率和压缩率,但同时也会带来 OOMOut Of Memory)的风险。

其它配置选项

下面的选项也可以用来提升查询执行的性能。随着 Spark 自动地执行越来越多的优化操作,这些选项在未来的发布版本中可能会过时。

属性名默认值含义
spark.sql.files.maxPartitionBytes134217728 (128 MB)读取文件时单个分区可容纳的最大字节数。
spark.sql.files.openCostInBytes4194304 (4 MB)

打开文件的估算成本,按照同一时间能够扫描的字节数来测量。当往一个分区写入多个文件的时候会使用。高估更好, 这样的话小文件分区将比大文件分区更快 (先被调度)。

spark.sql.autoBroadcastJoinThreshold10485760 (10 MB)

配置一个表在执行 join 操作时能够广播给所有 worker 节点的最大字节大小。通过将这个值设置为-1,可以禁用广播。注意,目前的数据统计仅支持已经运行了 ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令的 Hive metastore

spark.sql.shuffle.partitions200配置为连接或聚合操作混洗(shuffle)数据时使用的分区数。

分布式 SQL引擎

通过使用 Spark SQ L的 JDBC/ODBC 或者命令行接口,它还可以作为一个分布式查询引擎。在这种模式下,终端用户或应用程序可以运行 SQL 查询来直接与 Spark SQL 交互,而不需要编写任何代码。

运行 Thrift JDBC/ODBC server

这里实现的 Thrift JDBC/ODBC server 对应于 Hive 1.2.1 版本中的 HiveServer2。你可以使用 Spark 或者 Hive 1.2.1 自带的 beeline 脚本来测试这个 JDBC server

要启动 JDBC/ODBC server, 需要在 Spark 安装目录下运行如下命令 : 

./sbin/start-thriftserver.sh   

这个脚本能接受所有 bin/spark-submit 命令行选项,外加一个用于指定 Hive 属性的 --hiveconf 选项。你可以运行 ./sbin/start-thriftserver.sh --help 来查看所有可用选项的完整列表。默认情况下,这启动的 server 将会在 localhost:10000 上进行监听。你可以覆盖该行为,比如使用以下环境变量 : 

 

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> 
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...

或者系统属性 : 

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...   

现在你可以使用 beeline 来测试这个Thrift JDBC/ODBC server : 

./bin/beeline  

在 beeline 中使用以下命令连接到 JDBC/ODBC server : 

beeline> !connect jdbc:hive2://localhost:10000

Beeline 会要求你输入用户名和密码。在非安全模式下,只需要输入你本机的用户名和一个空密码即可。对于安全模式,请参考 beeline 文档中的指示。

将 hive-site.xmlcore-site.xml 以及 hdfs-site.xml 文件放置在 conf 目录下可以完成 Hive 配置。

你也可以使用 Hive 自带的 beeline 的脚本。

Thrift JDBC server 还支持通过 HTTP 传输来发送 Thrift RPC 消息。使用下面的设置作为系统属性或者对 conf 目录中的 hive-site.xml 文件配置来启用 HTTP 模式 : 

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice     

为了测试,在 HTTP 模式中使用 beeline 连接到 JDBC/ODBC server : 

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

运行 Spark SQL CLI

Spark SQL CLI 是一个很方便的工具,它可以在本地模式下运行 Hive metastore 服务,并且执行从命令行中输入的查询语句。注意 : Spark SQL CLI 无法与 Thrift JDBC server 通信。

要启动 Spark SQL CLI,可以在 Spark 安装目录运行下面的命令 :

./bin/spark-sql 

将 hive-site.xmlcore-site.xml 以及 hdfs-site.xml 文件放置在 conf 目录下可以完成 Hive 配置。你可以运行 ./bin/spark-sql --help 来获取所有可用选项的完整列表。

迁移指南

从 Spark SQL 1.6 升级到 2.0

  • SparkSession 现在是 Spark 新的切入点, 它替代了老的 SQLContext 和 HiveContext。注意 : 为了向下兼容,老的 SQLContext 和 HiveContext 仍然保留。可以从 SparkSession 获取一个新的 catalog 接口—— 现有的访问数据库和表的 API,如 listTablescreateExternalTable, dropTempViewcacheTable 都被移到该接口。
  • Dataset API 和 DataFrame API 进行了统一。在 Scala 中,DataFrame 变成了 Dataset[Row] 类型的一个别名,而 Java API 使用者必须将 DataFrame 替换成 Dataset<Row>Dataset 类既提供了强类型转换操作(如 mapfilter 以及 groupByKey)也提供了非强类型转换操作(如 select 和 groupBy)。由于编译期的类型安全不是 Python 和 R 语言的一个特性,Dataset 的概念并不适用于这些语言的 API。相反,DataFrame 仍然是最基本的编程抽象, 就类似于这些语言中单节点数据帧的概念。

  • Dataset 和 DataFrame API 中 unionAll 已经过时并且由 union 替代。
  • Dataset 和 DataFrame API 中 explode 已经过时,作为选择,可以结合 select 或 flatMap 使用 functions.explode() 。
  • Dataset 和 DataFrame API 中 registerTempTable 已经过时并且由 createOrReplaceTempView 替代。

从 Spark SQL 1.5 升级到 1.6

  • Spark 1.6 中,默认情况下服务器在多会话模式下运行。这意味着每个 JDBC / ODBC 连接拥有一份自己的 SQL 配置和临时注册表。缓存表仍在并共享。如果你想在单会话模式服务器运行,请设置选项 spark.sql.hive.thriftServer.singleSession 为 true。您既可以将此选项添加到 spark-defaults.conf,或者通过 --conf 将它传递给 start-thriftserver.sh

    ./sbin/start-thriftserver.sh \
         --conf spark.sql.hive.thriftServer.singleSession=true \
         ...
  •  1.6.1 开始,在 sparkR 中 withColumn 方法支持添加一个新列或更换数据框同名的现有列。
  • 从 Spark 1.6 开始,LongType 强制转换为 TimestampType 秒,而不是微秒。这一变化是为了匹配 Hive 1.2 ,保证从数值类型转换到 TimestampType 的一致性。见 SPARK-11724 了解详情。

从 Spark SQL 1.4 升级到 1.5

  • 使用手动管理的内存优化执行,现在是默认启用的,以及代码生成表达式求值。这些功能既可以通过设置 spark.sql.tungsten.enabled 为 false 来禁止使用。
  • Parquet 的模式合并默认情况下不再启用。它可以通过设置重新启用 spark.sql.parquet.mergeSchema 到 true
  • 字符串在 Python 列的分辨率现在支持使用点(.)来限定列或访问嵌套值。例如 df['table.column.nestedField']。但是,这意味着如果你的列名中包含任何圆点,你现在必须避免使用反引号(如 table.`column.with.dots`.nested)。
  • 在内存中的列存储分区修剪默认是开启的。它可以通过设置 spark.sql.inMemoryColumnarStorage.partitionPruning 为 false 来禁用。
  • 无限精度的小数列不再支持,而不是 Spark SQL 最大精度为 38 。当从 BigDecimal 对象推断模式时,现在使用(38,18)。在 DDL 没有指定精度时,则默认保留 Decimal(10, 0)
  • 时间戳现在存储在 微秒的精度,而不是 纳秒的。
  • 在 sql 语句中,浮点数现在解析为十进制。HiveQL 解析保持不变。
  • SQL/DateFrame 数据帧功能的规范名称现在是小写(e.g. sum vs SUM)
  • JSON 数据源不会自动加载由其他应用程序(未通过 Spark SQL 插入到数据集的文件)创建的新文件。对于 JSON 持久表(即表的元数据存储在 Hive Metastore),用户可以使用 REFRESH TABLE SQL 命令或 HiveContext 的 refreshTable 方法,把那些新文件列入到表中。对于代表一个 JSON 数据集的数据帧,用户需要重新创建数据框,同时数据框中将包括新的文件。
  • PySpark DataFrame 的 withColumn 方法支持添加新的列或替换现有的同名列。

从 Spark SQL 1.3 升级到 1.4

数据帧的数据读/写器接口

根据用户的反馈,我们创建了一个新的更快速的 API 中读取数据 (SQLContext.read)和写入数据(DataFrame.write)。同时废弃的过时的API(例如SQLContext.parquetFileSQLContext.jsonFile)。

请参阅 API 文档 SQLContext.read(ScalaJavaPython )和 DataFrame.write(Scala , JavaPython)的更多信息。

DataFrame.groupBy 保留分组列

根据用户反馈,我们改变的默认行为 DataFrame.groupBy().agg() 保留在 DataFrame 的分组列。为了维持 1.3 的行为特征,设置 spark.sql.retainGroupColumns 为 false

// 在 1.3.x 中, 为了让"department"列得到展示,
// 必须包括明确作为 gg 函数调用的一部分。
df.groupBy("department").agg($"department", max("age"), sum("expense"))

// 在 1.4 以上版本, "department" 列自动包含了.
df.groupBy("department").agg(max("age"), sum("expense"))

// 恢复到 1.3 版本(不保留分组列)
sqlContext.setConf("spark.sql.retainGroupColumns", "false")

在 DataFrame.withColumn 中的改变

之前 1.4 版本中,DataFrame.withColumn() 只支持添加列。该列将始终在 DateFrame 结果中被加入作为新的列,即使现有的列可能存在相同的名称。从 1.4 版本开始,DataFrame.withColumn() 支持添加与所有现有列的名称不同的列或替换现有的同名列。

请注意,这一变化仅适用于 Scala API,并不适用于 PySpark 和 SparkR

从 Spark SQL 1.0~1.2 升级到 1.3

在 Spark 1.3 中,我们从 Spark SQL 中删除了 "Alpha的标签,作为一部分已经清理过的可用的 API 。从 Spark 1.3 版本以上,Spark SQL 将提供在 1.X 系列的其他版本的二进制兼容性。这种兼容性保证不包括被明确标记为不稳定的(即 DeveloperApi 类或 Experimental)的 API

重命名 SchemaRDD 到 DateFrame

升级到 Spark SQL 1.3 版本时,用户会发现最大的变化是,SchemaRDD 已更名为 DataFrame。这主要是因为 DataFrames 不再从 RDD 直接继承,而是由 RDDS 自己来实现这些功能。DataFrames 仍然可以通过调用 .rdd 方法转换为 RDDS 。

在 Scala 中,有一个从 SchemaRDD 到 DataFrame 类型别名,可以为一些情况提供源代码兼容性。它仍然建议用户更新他们的代码以使用 DataFrame 来代替。Java 和 Python 用户需要更新他们的代码。

在 Java 和 Scala API 的统一

此前 Spark 1.3 有单独的Java兼容类(JavaSQLContext 和 JavaSchemaRDD),借鉴于 Scala API。在 Spark 1.3 中,Java API 和 Scala API 已经统一。两种语言的用户可以使用 SQLContext 和 DataFrame 。一般来说论文类尝试使用两种语言的共有类型(如 Array 替代了一些特定集合)。在某些情况下不通用的类型情况下,(例如,passing in closures 或 Maps)使用函数重载代替。

此外,该 Java 的特定类型的 API 已被删除。Scala 和 Java 的用户可以使用存在于 org.apache.spark.sql.types 类来描述编程模式。

隐式转换和 DSL 包的移除(仅限于 Scala)

许多 Spark 1.3 版本以前的代码示例都以 import sqlContext._ 开始这提供了从 sqlContext 到 cope 的所有功能。在 Spark 1.3 中,我们移除了从 RDDs 到 DateFrame 再到 SQLContext 内部对象的隐式转换。

此外,隐式转换现在只是通过 toDF 方法增加 RDDs 所组成的一些类型(例如 classes 或 tuples,而不是自动应用。

当使用 DSL 的内部函数(现在由 DataFrame API 代替)的时候,用于一般会导入 org.apache.spark.sql.catalyst.dsl 来代替一些公有的 DataFrame 的 API 函数 : import org.apache.spark.sql.functions._

删除在 org.apache.spark.sql 包中的一些 DataType 别名(仅限于 Scala)

Spark 1.3 移除存在于基本 SQL 包的 DataType 类型别名。开发人员应改为导入类 org.apache.spark.sql.types

UDF 注册迁移到 sqlContext.udf 中 (针对Java和Scala)

用于注册 UDF 的函数,不管是 DataFrame DSL 还是 SQL 中用到的,都被迁移到 SQLContext 中的 udf对象中。

sqlContext.udf.register("strLen", (s: String) => s.length())

Python UDF 注册保持不变。

Python 的 DataType 不再是单例的

在 Python 中使用 DataTypes 时,你需要先构造它们(如:StringType()),而不是引用一个单例对象。

兼容 Apache Hive

Spark SQL 在设计时就考虑到了和 Hive metastoreSerDes 以及 UDF 之间的兼容性。目前 Hive SerDes 和 UDF 都是基于Hive 1.2.1版本,并且Spark SQL 可以连接到不同版本的Hive metastore(从 0.12.0 到 1.2.1,可以参考[与不同版本的Hive Metastore交互])

在现有的 Hive 仓库中部署

Spark SQL Thrift JDBC server 采用了开箱即用的设计以兼容已有的 Hive 安装版本。你不需要修改现有的 Hive Metastore , 或者改变数据的位置和表的分区。

支持 Hive 的特性

Spark SQL 支持绝大部分的 Hive 功能,如 :  

  • Hive 查询语句, 包括 :
    • SELECT 
    • GROUP BY 
    • ORDER BY 
    • CLUSTER BY 
    • SORT BY 
  • 所有的Hive运算符, 包括 :
    • 关系运算符 (=, ⇔, ==, <>, <, >, >=, <=, etc) 
    • 算术运算符 (+, -, *, /, %, etc) 
    • 逻辑运算符 (AND, &&, OR, ||, etc) 
    • 复杂类型构造器 - 数学函数 (signlncos等) 
    • String 函数 (instr, length, printf等) 
  • 用户自定义函数(UDF) 
  • 用户自定义聚合函数(UDAF) 
  • 用户自定义序列化格式(SerDes) 
  • 窗口函数 
  • Joins 
    • JOIN 
    • {LEFT|RIGHT|FULL} OUTER JOIN 
    • LEFT SEMI JOIN - CROSS JOIN 
  • Unions
  • 子查询 
    • SELECT col FROM ( SELECT a + b AS col from t1) t2 
  • 采样 
  • Explain 
  • 分区表,包括动态分区插入 
  • 视图 
  • 所有 Hive DDL 功能, 包括 :
    • CREATE TABLE 
    • CREATE TABLE AS SELECT 
    • ALTER TABLE 
  • 绝大多数Hive数据类型,包括
    • TINYINT 
    • SMALLINT 
    • INT 
    • BIGINT 
    • BOOLEAN 
    • FLOAT 
    • DOUBLE 
    • STRING
    • BINARY 
    • TIMESTAMP 
    • DATE 
    • ARRAY<> 
    • MAP<> 
    • STRUCT<>

不支持的 Hive 功能

以下是目前还不支持的Hive功能列表。在Hive部署中这些功能大部分都用不到。

Hive 核心功能

bucket : bucket 是 Hive 表分区内的一个哈希分区,Spark SQL目前还不支持 bucket

Hive 高级功能
  • UNION 类型
  • Unique join
  • 列统计数据收集 : Spark SQL目前不依赖扫描来收集列统计数据并且仅支持填充 Hive metastore 的 sizeInBytes 字段。
Hive 输入输出格式
  • CLI文件格式 : 对于回显到CLI中的结果,Spark SQL仅支持TextOutputFormat
  • Hadoop archive
Hive 优化

有少数 Hive 优化还没有包含在 Spark 中。其中一些(比如索引)由于 Spark SQL 的这种内存计算模型而显得不那么重要。另外一些在 Spark SQL 未来的版本中会持续跟踪。

  • 块级别位图索引和虚拟列(用来建索引)
  • 自动为 join 和 groupBy 计算 reducer 个数 : 目前在 Spark SQL 中,你需要使用 ”SET spark.sql.shuffle.partitions=[num_tasks];” 
  • 来控制后置混洗的并行程度。 
  • 仅查询元数据 : 对于只需要使用元数据的查询请求,Spark SQL仍需要启动任务来计算结果。
  • 数据倾斜标志 : Spark SQL 不遵循 Hive 中的数据倾斜标志
  • STREAMTABLE join操作提示 : Spark SQL 不遵循 STREAMTABLE 提示。 
  • 对于查询结果合并多个小文件 : 如果返回的结果有很多小文件,Hive 有个选项设置,来合并小文件,以避免超过HDFS的文件数额度限制。Spark SQL 不支持这个。

参考

数据类型

Spark SQL 和 DataFrames 支持下面的数据类型 :  

  • 数值类型 
    • ByteType : 表示 1 字节长的有符号整型,数值范围 : -128 到 127
    • ShortType : 表示 字节长的有符号整型,数值范围 : -32768 到 32767
    • IntegerType : 表示 4 字节长的有符号整型,数值范围 : -2147483648 到 2147483647。 
    • LongType : 表示 字节长的有符号整型,数值范围 : -9223372036854775808 到 9223372036854775807
    • FloatType : 表示 4 字节长的单精度浮点数。 
    • DoubleType : 表示 8 字节长的双精度浮点数。
    • DecimalType : 表示任意精度有符号带小数的数值。内部使用 java.math.BigDecimal,一个BigDecimal 由一个任意精度的整数非标度值和一个 32 位的整数标度 (scale) 组成。
  • 字符串类型 

    • StringType : 表示字符串值

  • 二进制类型 
    • BinaryType : 表示字节序列值 
  • 布尔类型 
    • BooleanType : 表示布尔值 
  • 日期类型 
    • TimestampType : 表示包含年月日、时分秒等字段的日期值 
    • DateType : 表示包含年月日字段的日期值 
  • Complex types(复杂类型) 
    •  ArrayType(elementTypecontainsNull) : 数组类型,表示一个由类型为 elementType 的元素组成的序列,containsNull 用来表示 ArrayType 中的元素是否能为 null 值。

    • MapType(keyType, valueType, valueContainsNull) : 映射类型,表示一个键值对的集合。键的类型由 keyType 表示,值的类型则由 valueType 表示。对于一个 MapType 值,键是不允许为 null值。valueContainsNull 用来表示一个 MapType 的值是否能为 null 值。

    • StructType(fields) : 表示由 StructField 序列描述的结构。

      • StructField(name, datatype, nullable) : 表示 StructType 中的一个字段,name 表示字段名,datatype 是字段的数据类型,nullable 用来表示该字段是否可以为空值。
// Spark SQL的所有数据类型都放在 org.apache.spark.sql.types 这个包下。你可以这样获取它们 : 
import org.apache.spark.sql.types._

完整示例代码参见 Spark 仓库中的 "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala”。

数据类型Scala 中值的类型
用于获取或创建数据类型的 API
ByteTypeByteByteType
ShortTypeShortShortType
IntegerTypeIntIntegerType
LongTypeLongLongType
FloatTypeFloatFloatType
DoubleTypeDoubleDoubleType
DecimalTypejava.math.BigDecimalDecimalType
StringTypeStringStringType
BinaryTypeArray[Byte]BinaryType
BooleanTypeBooleanBooleanType
TimestampTypejava.sql.TimestampTimestampType
DateTypejava.sql.DateDateType
ArrayType

scala.collection.Seq

ArrayType(elementType, [containsNull])

注意 : containsNull 的默认值是 true

MapTypescala.collection.Map

MapType(keyType, valueType, [valueContainsNull])

注意 : valueContainsNull 的默认值是 true

StructTypeorg.apache.spark.sql.Row

StructType(fields)

注意 : fields 是一个 StructField 序列,另外不允许出现名称重复的字段。

StructFieldScala 中该字段的数据类型对应的值类型(例如,如果 StructField 的数据类型为 IntegerType,则 Scala 中其值类型为 Intlist(name=name, type=dataType, nullable=nullable)

NaN 语义

当处理一些不符合标准浮点数语义的 float 或 double 类型时,对于 Not-a-Number(NaN) 需要做一些特殊处理。具体如下 : 

  • NaN = NaN 返回 true。 
  • 在聚合操作中,所有 NaN 值都被分到同一组。 
  • 在 join key 中 NaN 可以当做一个普通的值。 
  • NaN 值在升序排序中排到最后,比任何其他数值都大。