页面树结构

2017-07-25 Apache Spark 2.2.0 官方文档中文版发布 : http://spark.apachecn.org/docs/cn/2.2.0/


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

输出操作允许 DStream 的操作推到如数据库、文件系统等外部系统中。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是 DStream 转换。目前,定义了下面几种输出操作 : 

输出操作含义
print()DStream 的每个批数据中打印前 10 条元素,这个操作在开发和调试中都非常有用。在 Python API 中调用 pprint()
saveAsObjectFiles(prefix, [suffix])

保存 DStream 的内容为一个序列化的文件 SequenceFile。每一个批间隔的文件的文件名基于 prefix suffix 生成。

"prefix-TIME_IN_MS[.suffix]",在 Python API 中不可用。

saveAsTextFiles(prefix, [suffix])保存 DStream 的内容为一个文本文件。每一个批间隔的文件的文件名基于 prefix suffix 生成。"prefix-TIME_IN_MS[.suffix]"
saveAsHadoopFiles(prefix, [suffix])

保存 DStream 的内容为一个 hadoop 文件。每一个批间隔的文件的文件名基于 prefix suffix 生成。

"prefix-TIME_IN_MS[.suffix]",在 Python API 中不可用。

foreachRDD(func)

在从流中生成的每个 RDD 上应用函数 func 的最通用的输出操作。

这个函数应该推送每个 RDD 的数据到外部系统,例如保存 RDD 到文件或者通过网络写到数据库中。

需要注意的是,func 函数在驱动程序中执行,并且通常都有 RDD action 在里面推动 RDD 流的计算。

foreachRDD 设计模式的使用

dstream.foreachRDD 是一个强大的原语,发送数据到外部系统中。然而,明白怎样正确地、有效地用这个原语是非常重要的。下面几点介绍了如何避免一般错误。

经常写数据到外部系统需要建一个连接对象(例如到远程服务器的 TCP 连接),用它发送数据到远程系统。为了达到这个目的,开发人员可能不经意的在 Spark 驱动中创建一个连接对象,但是在 Spark worker 中 尝试调用这个连接对象保存记录到 RDD 中,如下 : 

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

这是不正确的,因为这需要先序列化连接对象,然后将它从 driver 发送到 worker 中。这样的连接对象在机器之间不能传送。它可能表现为序列化错误(连接对象不可序列化)或者初始化错误(连接对象应该在 worker 中初始化)等等。正确的解决办法是在 worker 中创建连接对象。

然而,这会造成另外一个常见的错误 - 为每一个记录创建了一个连接对象。例如 : 

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

需要注意的是,池中的连接对象应该根据需要延迟创建,并且在空闲一段时间后自动超时。这样就获取了最有效的方式发生数据到外部系统。

其它需要注意的地方 : 

  • 输出操作通过懒执行的方式操作 DStream,正如 RDD action 通过懒执行的方式操作 RDD。具体地看,RDD actionDStream 输出操作接收数据的处理。因此,如果你的应用程序没有任何输出操作或者 用于输出操作 dstream.foreachRDD(),但是没有任何 RDD action 操作在 dstream.foreachRDD() 里面,那么什么也不会执行。系统仅仅会接收输入,然后丢弃它们。
  • 默认情况下,DStreams 输出操作是分时执行的,它们按照应用程序的定义顺序按序执行。