页面树结构

2017-11-09 ApacheCN 开源组织,第二期邀请成员活动,一起走的更远 : http://www.apachecn.org/member/209.html


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

从 Spark 应用中访问 Avro 数据文件

Spark SQL 支持从各种各样的 数据源 加载和保存到 DataFrame。使用 spark-avro 库,您可以使用 Spark 访问 Avro 格式编码的数据。

spark-avro 库支持大部分 Spark SQL 和 Avro 记录之间的转换,making Avro a first-class citizen in Spark。这个库自动执行 Schema 转换。Spark SQL 读取数据并且转换它为 Spark 内部的表示。Avro 转换仅在读取以及写入数据是被执行。

默认情况下,指定一个目录时,read 方法默默的跳过没有 .avro 扩展名的文件。为了包含所有的文件,设置 avro.mapred.ignore.inputs.without.extension = false。请看 配置 Spark 应用

继续阅读 : 

写入压缩的数据文件

为了用于写数据时设置压缩类型,配置 spark.sql.avro.compression.codec 属性 : 

sqlContext.setConf("spark.sql.avro.compression.codec","codec") 

支持的 codec 值是 uncompressed,snappy,和 deflate。在 spark.sql.avro.deflate.level 中指定使用 deflate 的使用级别。示例请看 写入 Deflate 压缩记录

访问分区的数据文件

spark-avro 库支持读写分区数据。像你在写入 Parquet 一样,通过分区的列来写入。示例请看 写入分区数据 和 读取分区数据

指定 Record Name 和 Namespace

当写入硬盘时通过传递 recordName 和 recordNamespace 作为可选的参数来使用时指定 record name 和 namespace。

Spark SQL

您可以编写 SQL 查询来查询一个 Avro 文件。您必须首先创建一个临时表,然后才可以查询它。

sqlContext.sql("CREATE TEMPORARY TABLE table_name USING com.databricks.spark.avro OPTIONS (path "input dir"))
df = sqlContext.sql("SELECT * FROM table_name")

Avro 到 Spark SQL 的转换

spark-avro 库支持的所有用于转换的 Avro 数据类型 : 

  • boolean -> BooleanType
  • int -> IntegerType
  • long -> LongType
  • float -> FloatType
  • double -> DoubleType
  • bytes -> BinaryType
  • string -> StringType
  • record -> StructType
  • enum -> StringType
  • array -> ArrayType
  • map -> MapType
  • fixed -> BinaryType

spark-avro 库支持下列的 union 类型 : 

  • union(int, long) -> LongType
  • union(float, double) -> DoubleType
  • union(any, null) -> any

该库不支持复杂的 union 类型。

当它们被加载到 Spark 中时所有的 doc,aliases,和其它的 fields 被丢弃了。

Spark SQL 到 Avro 的转换

每一个被支持的 Spark SQL 类型 : 

  • BooleanType -> boolean
  • IntegerType -> int
  • LongType -> long
  • FloatType -> float
  • DoubleType -> double
  • BinaryType -> bytes
  • StringType -> string
  • StructType -> record
  • ArrayType -> array
  • MapType -> map
  • ByteType -> int
  • ShortType -> int
  • DecimalType -> string
  • BinaryType -> bytes
  • TimestampType -> long

局限性

由于 Spark 是转换数据类型,请记住以下几点 : 

  • Enumerated types are erased(枚举类型被擦出) - 当它们被读入 Spark 时 Avro 的枚举类型变成字符串,因为 Spark 不支持枚举类型。
  • Unions on output()- 当 union 指定的类型时 Spark 总是使用 null 选项代替写入。
  • Avro schema changes - Spark 读取一切成为内部表示。即使你只是读取,然后写数据,输出的 Schema(模式)是不同的。
  • Spark schema reordering - 当写入它们到硬盘上去时 Spark 记录了 element(元素)在它的 Schema 中的位置,所以 element(元素)到最后一个 element(元素)上。举一个例子,请看 写入分区数据

API 示例

这部分提供了使用 spark-avro API 所支持语言的示例。

Scala 示例

在 Spark 应用程序中最早与 Avro 数据文件工作的方式是通过使用 DataFrame API。spark-avro 库包括了在 SQLContext 中的 avro 方法用于读写 Avro 文件 : 

Scala 示例与 Function(函数)

import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = sqlContext.read.avro("input dir")
df.filter("age > 5").write.avro("output dir")

您也可以在 “format” 方法中指定 “com.databricks.spark.avro” : 

Scala 示例与 Format(格式)

import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

val df = sqlContext.read.format("com.databricks.spark.avro").load("input dir")

df.filter("age > 5").write.format("com.databricks.spark.avro").save("output dir")

写入 Deflate 压缩的记录

import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

// configuration to use deflate compression
sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
sqlContext.setConf("spark.sql.avro.deflate.level", "5")

val df = sqlContext.read.avro("input dir")

// writes out compressed Avro records
df.write.avro("output dir")

写入分区数据

import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

val df = Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")

df.write.partitionBy("year", "month").avro("output dir")

这份代码输出了一个类似于这样的目录结构 : 

-rw-r--r--   3 hdfs supergroup          0 2015-11-03 14:58 /tmp/output/_SUCCESS
drwxr-xr-x   - hdfs supergroup          0 2015-11-03 14:58 /tmp/output/year=2011
drwxr-xr-x   - hdfs supergroup          0 2015-11-03 14:58 /tmp/output/year=2011/month=7
-rw-r--r--   3 hdfs supergroup        229 2015-11-03 14:58 /tmp/output/year=2011/month=7/part-r-00001-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro
drwxr-xr-x   - hdfs supergroup          0 2015-11-03 14:58 /tmp/output/year=2012
drwxr-xr-x   - hdfs supergroup          0 2015-11-03 14:58 /tmp/output/year=2012/month=7
-rw-r--r--   3 hdfs supergroup        231 2015-11-03 14:58 /tmp/output/year=2012/month=7/part-r-00001-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro
drwxr-xr-x   - hdfs supergroup          0 2015-11-03 14:58 /tmp/output/year=2012/month=8
-rw-r--r--   3 hdfs supergroup        246 2015-11-03 14:58 /tmp/output/year=2012/month=8/part-r-00000-9b89f1bd-7cf8-4ba8-910f-7587c0de5a90.avro

读取分区数据

import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.avro("input dir")

df.printSchema()
df.filter("year = 2011").collect().foreach(println)

这份代码自动检测分区的数据并把它们 join 在一起,所以它对待它们像未分区的数据一样。这也只查询所需要的目录,降低硬盘 I/O。

root
|-- title: string (nullable = true)
|-- rating: double (nullable = true)
|-- year: integer (nullable = true)
|-- month: integer (nullable = true)

[Git,2.0,2011,7]

指定一个记录名

import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.avro("input dir")

val name = "AvroTest"
val namespace = "com.cloudera.spark"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).avro("output dir")

Java 示例

在 Java 中使用 DataFrame API 来查询 Avro 文件。这个例子几乎与 Scala 示例与 Format(格式)一样 : 

import org.apache.spark.sql.*;

SQLContext sqlContext = new SQLContext(sc);

// Creates a DataFrame from a file
DataFrame df = sqlContext.read().format("com.databricks.spark.avro").load("input dir");

// Saves the subset of the Avro records read in
df.filter("age > 5").write().format("com.databricks.spark.avro").save("output dir");

Python 示例

在 Python 中使用 DataFrame API 来查询 Avro 文件。这个例子几乎与 Scala 示例与 Format(格式)一样 : 

# Creates a DataFrame from a directory
df = sqlContext.read.format("com.databricks.spark.avro").load("input dir")

#  Saves the subset of the Avro records read in
df.where("age > 5").write.format("com.databricks.spark.avro").save("output dir")