在这部分,我们将讨论 Spark Streaming 应用程序在发生故障时的行为。
背景
为了理解 Spark Streaming 提供的语义,让我们记住 Spark RDDs 最基本的容错语义。
- RDD 是不可变的,确定重新计算的,分布式的 DataSet(数据集)。
- 如果任何分区的 RDD 由于 Worker 节点失败而丢失,那么这个分区能够从原来原始容错的 DataSet(数据集)使用操作的 lineage(血统)关系来重新计算。
- 假设所有的 RDD 转换是确定的,在最终转换的 RDD 中的数据总是相同的,无论 Spark 群集中的失败。
Spark 在数据上的操作像 HDFS 或者 S3 这样容错的文件系统一样。因此,所有从容错的数据中产生的 RDDs 也是容错的。然而,这种情况在 Spark Streaming 中不适用,因为在大多数情况下数据通过网络被接收(除非使用 fileStream)。为了对所有产生的 RDDs 实现相同的容错语义属性,接收的数据被复制到群集中 Worker 节点的多个 Spark Executor 之间(默认复制因子是 2)。这将会造成需要在发生故障时去恢复两种文件系统的数据类型 :
- Data received and replicated(数据接收并且被复制)- 这份数据幸存于一个单独的 Worker 节点故障中因为其它的节点复制了它。
- Data received but buffered for replication(数据接收但是缓冲了副本)- 因为这份数据没有被复制,唯一恢复这份数据的方式是从 Source(源/数据源)再次获取它。
此外,我们应该关注两种类型的故障 :
- Failure of a Worker Node(Worker 节点的故障)- 任何运行 Executor 的 Worker 节点都是可以故障,并且这些节点所有在 Memory(内存)中的数据将会丢失。如果任何 Receiver(接收器)运行在故障的节点,那么他们缓存的数据将丢失。
- Failure of the Driver Node(Driver 节点的故障)- 如果 Driver 节点运行的 Spark Streaming 应用程序发生故障,那么很显然 SparkContext 将会丢失,所有 Executor 与它们在内存中的数据都会丢失。
与这个基础知识一起,让我们理解 Spark Streaming 的容错语义。
定义
Streaming 处理系统经常会捕获系统异常并记录执行次数以保障系统容错,其中在这三种条件下可以保障服务,等等。
- 最多一次 : 每个记录将被处理一次或者根本不处理。
- 至少一次 : 每个记录将被处理一次或多次。这主要是在最后一次次,因为它确保数据不会丢失。但也有可能是重复处理的。
- 只有一次 : 每个记录将被处理一次,确保这一次数据完整。这显然是三者的最强保障。
基础语义
在任何流处理系统,从广义上讲,处理数据有三个步骤。
接收数据 : 接受数据或者从其他数据源接受数据。
转换数据 : 所接收的数据使用 DStream 和 RDD 变换转化。
输出数据 : 最后变换的数据被推送到外部系统,如文件系统,数据库,DashBoard 等。
如果一个 Stream 应用程序来实现端到端的数据传输,则每个环节都需要一次性完整保障。也就是说,每个记录都必须被接收正好一次,恰好转化一次,被推到下游系统一次。让我们了解在 Spark Streaming 的情况下这些步骤的语义。
接收数据 : 不同的输入源提供不同的容错。详细过程在下一小节中讨论。
转换数据 : 已经接收将被处理一次的所有数据,这要依靠于 RDDs 提供保证。即使有故障,只要接收到的输入数据是可读的,最终通过 RDDs 的转化将始终具有相同的内容。
输出数据 : 定义了数据至少一次输出,因为它依赖于输出数据操作类型(是否支持转换)和定义的下游系统(是否支持事务)。但是,用户可以实现自己的传输机制,以实现只要一次定义。会在后面小节里有更详细的讨论。
接收数据的语义
不同的输入源提供不同的保障,从至少一次到正好一次。阅读更多的细节。
关于文件
如果所有的输入数据已经存在如 HDFS 的容错文件系统,Spark Streaming 总是可以从任何故障中恢复所有数据。这种定义,在一次处理后就能恢复。
关于基于 Receiver(接收器) 的 Source(源)
对于基于 receiver 的输入源,容错的语义既依赖于故障的情形也依赖于 receiver 的类型。正如之前讨论的,有两种类型的 receiver :
- Reliable Receiver(可靠的接收器): 这些 receivers 只有在确保数据复制之后才会告知可靠源。如果这样一个 receiver 失败了,缓冲(非复制)数据不会被源所承认。如果 receiver 重启,源会重发数 据,因此不会丢失数据。
- Unreliable Receiver(不可靠的接收器): 当 worker 或者 driver 节点故障,这种 receiver 会丢失数据。
选择哪种类型的 receiver 依赖于这些语义。如果一个 worker 节点出现故障,Reliable Receiver 不会丢失数据,Unreliable Receiver 会丢失接收了但是没有复制的数据。如果 driver 节点出现故障,除了以上情况下的数据丢失,所有过去接收并复制到内存中的数据都会丢失,这会影响有状态 transformation 的结果。
为了避免丢失过去接收的数据,Spark 1.2 引入了一个实验性的特征(预写日志机制)write ahead logs,它保存接收的数据到容错存储系统中。有了 write ahead logs 和 Reliable Receiver,我们可以做到零数据丢失以及 exactly-once 语义。
下表总结了根据故障的语义 :
Deployment Scenario | Worker Failure | Driver Failure |
---|---|---|
Spark 1.1 或者更早, 没有 write ahead log 的 Spark 1.2 | 在 Unreliable Receiver 情况下缓冲数据丢失;在 Reliable Receiver 和文件的情况下,零数据丢失 | 在 Unreliable Receiver 情况下缓冲数据丢失;在所有 receiver 情况下,过去的数据丢失;在文件的情况下,零数据丢失 |
带有 write ahead log 的 Spark 1.2 | 在 Reliable Receiver 和文件的情况下,零数据丢失 | 在 Reliable Receiver 和文件的情况下,零数据丢失 |
Kafka Direct API
在 Spark 1.3 中,我们引入了一个新的 kafka Direct 的 API,它可以保证所有 kafka 数据由 spark stream 收到一次。如果实现仅一次输出操作,就可以实现保证终端到终端的一次。这种方法(版本 Spark2.0.2)中进一步讨论 kafka集成指南。
输出操作的语义
输出操作(例如 foreachRDD
)至少被定义一次,即变换后的数据在一次人工故障的情况下可能会不止一次写入外部实体。虽然可以通过操作 saveAs***Files
保存文件到系统上(具有相同的数据将简单地被覆盖),额外的尝试可能是必要的,以实现一次准确的语义。有两种方法。
幂等更新 : 多次尝试写相同的数据。例如,
saveAs***Files
总是写入相同数据生成的文件。事务更新 : 所有更新事务作出这样的更新是恰好遵循原子性。这样做的方法如下:
- 使用批处理时间创建一个标识符(
foreachRDD
)和 RDD 的分区索引。给这个应用定义唯一标识符标识 blob 数据。 更新外部系统与当前事务(即原子性),如果已经被标识过得数据应用已经存在,那么就跳过更新。
dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // use this uniqueId to transactionally commit the data in partitionIterator } }
- 使用批处理时间创建一个标识符(