页面树结构

2017-11-09 ApacheCN 开源组织,第二期邀请成员活动,一起走的更远 : http://www.apachecn.org/member/209.html


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

1、概述

Kafk Streams是一个处理和分析存储在Kafka中数据的客户端库,可以将处理结果写回Kafka或者输出到一个外部的系统。它构建在一些重要的流处理概念之上:比如合理区分事件时间和处理时间、窗口支持、简单高效的应用状态管理。

 

Kafka Streams 上手门槛很低:

用户可以快速的编写并在单台机器上运行一个小规模的验证程序;只需要在多台机器运行应用程序的多个实例来按比例放大生产能力。

Kafka streams 借用kafka的并行模型,透明的处理同一个应用程序的多个实例的负载均衡。

 

Kafka-Streams的一些亮点:

  • 被设计为一个简单、轻量的客户端库,可以很容易的嵌入到任意的Java应用程序中并与现有的包、部署操作工具整合。
  • 除了使用Apache Kafka自身作为内部消息层,在系统上也不需要外部依赖;尤其的,它使用Kafka的分区模型水平伸缩处理能力同时保证强有序性。
  • 支持容错的local state,使得可以执行快速高效的有状态的操作,如基于窗口的连接和聚合。
  • 采用一次处理一条记录的方式来实现毫秒级处理延迟,支持基于事件时间[event time]的窗口操作
  • 提供必要的流处理源语,以及一个高级的 Streams-DSL和一个低级的 Processor-API

2、核心概念

首先总结几个Kafka Streams的关键的概念。

2.1、Stream Processing Topology

  • stream Kafka Streams提供的非常重要的抽象概念:它代表无界的、持续的更新的数据集。Stream是一个有序的、可重播的、容错的、不可变的数据记录的序列。数据记录[data record]被定义为一个key-value对。
  • stream processor processor-topology的一个节点;他代表一个转换streams中数据的处理步骤:每次从他的上游processors接收一个输入记录,对数据记录应用自己的操作逻辑,接下来可能生成一条或者多条数据记录给他的下游processors
  • processor topology是所有的stream processor(节点)被Stream(边)连接起来的一个张图。
  • stream processing application 是任何利用Kafka Streams库的程序。它通过一个或者多个processor topology来定义其计算逻辑。

 

topology中有两种特殊的processor

  • Source Processor:它是一种特殊的stream processor,没有上游processor。它通过消费一个或者多个topic中的记录为自身的拓扑生成输入流,然后将他们转发到下游processor
  • Sink Processors:也是一种特殊的stream processor,他没有下游processor。它发送从上游processor接收到的所有记录到一个特定的Kakfa Topic

Kafka Streams 提供两种定义 stream processing topology 的方式:

  • Kafka Streams DSL提供一般的开箱即用的数据转换操作,如map、filter、joinaggregations
  • Processor API 允许用户定义和连接自定义processor ,以及与state stores交互。

一个processor toplogy 只不过是一个流处理代码的逻辑抽象。在运行时,逻辑拓扑在应用程序内部被实例化并复制,用于并行处理(具体请查看:Stream Partitions and Tasks)。

2.2 、Time

在流处理中关键的一点是 Time 的概念,以及它是如何模型化并被整合的。例如,一些操作如窗口就是基于时间边界来定义。

 

关于流中的Time的一些概念:

  •  event time – 一个事件或者数据记录发生的时间点,换句话说,是“在源头”被最初创建的时间。例子:如果事件是一个车载GPS传感器上报的地理位置的更新,这时关联的event time就是GPS抓取到位置改变时的时间。
  • processing time – 数据记录被流应用程序处理的时间点,换句还说,就是这条数据记录是何时被消费的。processing time可能比event time晚几毫秒、几小时、几天不等。例子:想象一个分析程序,从车载传感器上报的数据中读取并处理地理位置信息,显示到车队的管理仪面板上。这里,在这个分析程序中processing time可能比event-time晚几毫秒或者几秒(例如基于Apache KafkaKafka Streams的实时管道)或者几小时(例如基于Apache Hadoop或者Apache Spark的批量管道)。
  • ingestion time – 数据记录被Kakfa borker保存到 topic partition中的时间点。与event time不同的是这个ingestion时间戳是当记录被保存到目标topic中的时候由kafka broker生成的,不是消息在源头被创建时的时间点。与processing time不同的是,processing time是应用程序处理这条消息的时间点。例如,如果一个记录从未被处理过,这条记录就没有processing time,但是仍然有ingestion time

 

event-timeingestion-time的选择事实上通过Kafka(不是Kafka Streams)的配置来完成。从Kafka 0.10.x往前,时间戳自动包含在Kafka messages中。根据Kafka的配置,这些时间戳表现为event-time或者ingestion-time。可以在borker级别或者每个topic上分别指定各自的Kafka配置的设置。Kafka Streams中默认时间戳抽取器会原样获取这些嵌入的时间戳。因此,你的应用程序中生效的时间定义,取决于生效的嵌入时间戳相关的Kafka配置。

 

Kafka Streams assigns a timestamp to every data record via the TimestampExtractor interface. Concrete implementations of this interface may retrieve or compute timestamps based on the actual contents of data records such as an embedded timestamp field to provide event-time semantics, or use any other approach such as returning the current wall-clock time at the time of processing, thereby yielding processing-time semantics to stream processing applications. Developers can thus enforce different notions of time depending on their business needs. For example, per-record timestamps describe the progress of a stream with regards to time (although records may be out-of-order within the stream) and are leveraged by time-dependent operations such as joins.

 

最后,无论何时Kafka Streams应用程序向Kafka中写入记录,它也会为这些新的记录赋予时间戳。赋予时间戳的方式取决于context

 

当处理一些输入记录生成一个新记录时,例如,在process( ) 函数调用中,触发了context.forward(),输出记录的时间戳直接从输入记录的时间戳继承。

当输出记录通过周期执行的函数如punctuate( )生成时,输出记录时间戳使用当前流task的内部时间(通过context.timestamp( )

对于聚合操作,聚合结果更新记录的时间戳,应该是最新到达的输入记录触发更新操作时的时间戳。

2.3、状态

一些流处理应用不需要状态,意味着一个消息的处理与其它所有消息的处理是相互独立的。尽管如此,能够保持状态的特性,为很多复杂巧妙的流处理应用开启了很多可能性:你可以联结输入流,或者分组、聚合数据记录。Kafka Stream DSL提供许多这种有状态的操作。

 

Kafka Stream提供一个所谓的state-stores,可以被流流处理应用程序用于存储和查询数据。,这对实现有状态的操作来说是一个很重要的能力。Kafka Streams中的每个任务含有一个或者多个state stores,可以通过APIs来存储或者查询程序处理所需要的数据。这些state store可以是一个持久化的key-value存储、一个内存中hashmap或者其它便捷的数据结构。Kafka Streamslocal state stores提供容错和自动恢复。

 

Kafka Streams allows direct read-only queries of the state stores by methods, threads, processes or applications external to the stream processing application that created the state stores. This is provided through a feature called Interactive Queries. All stores are named and Interactive Queries exposes only the read operations of the underlying implementation.

3、架构

Kafka Streams基于Kafkaproducerconsumer库来简化应用开发,借用Kafka的原生功能来提供并行、分布式协调、容错、操作的简化。在这一节,我们讲解Kafka Streams在底层是如何工作的。

 

下图是应用程序利用Kafka Streams库的原理解析。我们来简单的探讨一些细节。

 

3.1、Stream Partition and Tasks

Kafka partition的消息层用于存储和传输,Kafka Streams分区用于处理。这两种情形下,这种分区组织方式可以使数据本地化、弹性化、可扩展化、高性能、容错。Kafka Streams基于Kafka topic partition, 利用分区和任务的概念作为它并行模型的逻辑单元。Kafka StreamsKafka在并行的环境中关联紧密。

  • 每一个stream partition是一个完全有序的的数据记录的序列,映射到一个Kafka topic partition
  • stream中的数据记录[data record] 映射到这个topic中的一个Kafka消息。
  • 数据记录的keys决定数据在kafkaKafka Streams中的分区,也就是说,数据是如何被路由到topics中指定的分区的。

 

一个应用程序的processor topology通过把自己分为多个task来进行伸缩。更具体的说,Kafka Streams基于输入流的分区为应用程序创建固定数量的任务,每个任务分配一个输入流(也就是,Kafka Topics)的分区的列表。任务与分区的分配关系永不改变,因此每个任务是应用程序的并行度的固定单元。任务可以基于分配的partitions实例化他们所属的processor topology。他们为每一个分配的分区保持一个缓存,然后从这些记录缓存中逐条处理消息。因此,stream任务可以不需要人工的干预的情况下独立并行的被处理。

 

要明白Kafka Streams不是一个资源管理器,而是一个可以在任何流处理应用程序运行的地方执行的一个类库。应用程序的多实例可以在一台机器或者跨多台机器执行,任务会被这个库自动分布到这些运行的应用实例上。任务与分区的分配关系永远不变;如果一个应用实例故障,所有承载的任务会自动在其他的实例上自动重新启动,来继续从同一个stream partitinons中消费。

 

下图显示了两个task,每个task都分配了同一个输入流的其中一个partition

3.2 、线程模型

Every stream task in a Kafka Streams application may embed one or more local state stores that can be accessed via APIs to store and query data required for processing. Kafka Streams offers fault-tolerance and automatic recovery for such local state stores.

 

Kafka Streams允许用户配置线程的数量,被这个库用于在一个应用实例中进行并行化处理。每一个线程都可以与他的processor topology独立的执行一个或者多个任务。例如,下图显示了一个stream线程包含2个stream任务。

 

Starting more stream threads or more instances of the application merely amounts to replicating the topology and having it process a different subset of Kafka partitions, effectively parallelizing processing. It is worth noting that there is no shared state amongst the threads, so no inter-thread coordination is necessary. This makes it very simple to run topologies in parallel across the application instances and threads. The assignment of Kafka topic partitions amongst the various stream threads is transparently handled by Kafka Streams leveraging Kafka's coordination functionality.

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal

 

As we described above, scaling your stream processing application with Kafka Streams is easy: you merely need to start additional instances of your application, and Kafka Streams takes care of distributing partitions amongst tasks that run in the application instances. You can start as many threads of the application as there are input Kafka topic partitions so that, across all running instances of an application, every thread (or rather, the tasks it runs) has at least one input partition to process.

3.3、本地状态存储Local State Stores

Kakfa Streams 提供所谓的state stores,可以被stream processing application用来存储和查询数据,是实现有状态操作的重要的能力。 Kafka Streams DSL,当调用有状态的操作的时候自动创建和管理这种state stores,有状态的操作如join() 或者 aggregate(),或者当你窗口化一个stream时.

 

Kafka Stream应用程序中的中的每一个stream task都可以包含一个或者多个local state store,可以通过APIs来存储或者查询处理过程中需要的数据。Kafka Streams为这种local state stores提供容错和自动的恢复机制。

 

下图显示了2个stream tasks与他们专有的local state stores

 

 

 

3.4、Fault Tolerance

Kafka Streams的容错机制建立在Kafka原生的容错能力之上。Kafka partitions是高可用和可复制的;因此当stream data被保存到Kafka以后,哪怕应用程序失败了并且需要重新处理,也是可以重新获取到这些数据的。Kafka Stream中的task拥有Kafka Consumer Client提供的容错能力来处理失败的情况。如果一个运行在一台主机上的task失败了,Kafka Streams 会在仍然正常运行的应用程序的实例上自动重启这个任务。

 

另外,Kafka Streams 确保对于local state stores失败情况也具有健壮性。对于每个state store,保持一个可复制的changelog Kafka topic用于跟踪state的任何变更。这些changelog topic同样是被分区的(These changelog topics are partitioned as well so that each local state store instance, and hence the task accessing the store, has its own dedicated changelog topic partition)。在changelog topic中会开启日志压缩,比较老的数据可以被安全的清理,防止topic的无限制的增长。如果运行在一台主机上的task失败了,并且在另一台机器上被重新启动,Kafka Streams可以确保恢复他们相关联的state stores到失败之前的内容,通过在新启动的task恢复处理之前重播对应的changelog topic。因此,失败处理对于最终用户来说是完全透明的。

 

注意,task初始化(或者重新初始化)的耗时通常主要取决于通过重播state stores来恢复状态的时间。为了减少恢复时间,用户可以配置他们的应用拥有一个备用的local states的副本(也就是说,一个state副本的完全拷贝)。当发生了一个task迁移,Kafka Streams试图将task分配到一个应用程序的实例上,这个实例上已经存在一个备用的副本用于最小化task初始化的时间消耗。

 

请在Kafka Streams Configs部分查看num.standby.replicas选项。

http://kafka.apache.org/0102/documentation/#streamsconfigs

4、开发指南

这里有一个快速入门的例子,提供了如何运行一个使用Kafka Streams库开发的stream处理程序。这一节主要聚焦于如何编写、配置、执行一个Kafka Streams应用程序。

 

如上面提到的,kafka Streams应用程序的计算逻辑被定义为一个 处理器拓扑[processor topology]

 

当前的Kafka Streams 提供两套API 来定义processor topology,下面会详细讲解。

 

快速入门例子:

http://kafka.apache.org/0102/documentation/#quickstart_kafkastreams

Processor Toplogy:

http://kafka.apache.org/documentation/streams#streams_topology

4.1、低级Processor API

4.1.1、Processor

正如在核心概念[Core Concepts]中提到的,一个steam processor是一个processor topology中的一个节点,代表单个处理步骤。使用 Processor API,开发者可以定义任意的stream processors用来一次处理一条接收到的记录,使用与之相关联的state stores连接这些processors 来组成一个processor topology,作为他们自定义的处理逻辑。

 

Processor 接口提供2个主要的API方法:processpunctuate

process会针对每一条接受记录执行;punctuate方法会基于时延定期执行。

另外,processor可以维护在init方法中初始化的当前ProcessContext实例变量,使用这个context来定义punctuation周期(context().schedule),将修改过的或者新key-value对转发到下游的processorscontext().forward),提交当前的处理进度(context().commit),等等。

 

下面的例子Processor实现定义了一个简单的word-count算法:

public class MyProcessor extends Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // keep the processor context locally because we need it in punctuate() and commit()
        this.context = context;

        // call this processor's punctuate() method every 1000 milliseconds.
        this.context.schedule(1000);

        // retrieve the key-value store named "Counts"
        this.kvStore = (KeyValueStore<String, Long>) context.getStateStore("Counts");
    }

    @Override
    public void process(String dummy, String line) {
        String[] words = line.toLowerCase().split(" ");

        for (String word : words) {
            Long oldValue = this.kvStore.get(word);

            if (oldValue == null) {
                this.kvStore.put(word, 1L);
            } else {
                this.kvStore.put(word, oldValue + 1L);
            }
        }
    }


    @Override
    public void punctuate(long timestamp) {
        KeyValueIterator<String, Long> iter = this.kvStore.all();

        while (iter.hasNext()) {
            KeyValue<String, Long> entry = iter.next();
            context.forward(entry.key, entry.value.toString());
        }

        iter.close();
        // commit the current processing progress
        context.commit();
    }

    @Override
    public void close() {
        // close the key-value store
        this.kvStore.close();
    }
};


上面的程序中,执行了一下的动作: 

  • init方法中,每1秒调度一次punctuation,使用名字 “Count” 获取local state store
  • process方法中,当收到每条记录,将字符串分割成单词,更新他们的数量到state store(后面再详细讨论这个东西)
  • punctuate方法中,遍历local state store,将所有聚合完成的counts发送到下游processor,并提交当前stream state

4.1.2 Processor Topology

当自定义的processor使用Processor API开发完成后,开发者可以使用TopologyBuilder 连接起所有的processor来构建一个processor topology

 

TopologyBuilder builder = new TopologyBuilder();

builder.addSource("SOURCE", "src-topic")
    // add "PROCESS1" node which takes the source processor "SOURCE" as its upstream processor
    .addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")
    // add "PROCESS2" node which takes "PROCESS1" as its upstream processor
    .addProcessor("PROCESS2", () -> new MyProcessor2(), "PROCESS1")
    // add "PROCESS3" node which takes "PROCESS1" as its upstream processor
    .addProcessor("PROCESS3", () -> new MyProcessor3(), "PROCESS1")
    // add the sink processor node "SINK1" that takes Kafka topic "sink-topic1"
    // as output and the "PROCESS1" node as its upstream processor
    .addSink("SINK1", "sink-topic1", "PROCESS1")
    // add the sink processor node "SINK2" that takes Kafka topic "sink-topic2"
    // as output and the "PROCESS2" node as its upstream processor
    .addSink("SINK2", "sink-topic2", "PROCESS2")
    // add the sink processor node "SINK3" that takes Kafka topic "sink-topic3"
    // as output and the "PROCESS3" node as its upstream processor
    .addSink("SINK3", "sink-topic3", "PROCESS3");


上面的代码构建一个topology,有多个步骤,下面快速浏览一下:

  • 首先,使用addSource方法将一个名为 “SOURCE”source节点添加到topology,由一个名为 “src-topic”Kafka Topic提供数据。
  • 然后,使用addProcessor方法添加3个processor节点。第一个processor是一个“SOURCE“ 节点的子节点,但是是另外两个processor的父节点。
  • 最后,使用addSink方法添加三个sink节点到topology,将每一个processor节点的流出写入到各自的topic中。

4.1.3、State Stores

注意到Processor API不仅限于处理在process() 方法中接收的记录,也可以用于维护处理状态,保持最近收到的记录,用于有状态的操作如窗口的关联和聚合。利用这些状态,用户可以通过实现StateStore接口(Kafka Streams库也有几个扩展的接口如KeyValueStore)来定义一个state store;在实践中,尽管用户通常不需要从零开始自定义这样的一个state store,可以简单的使用Stores工厂来定义一个state store,指定是否应该持久化、日志滚动,等等。在下面的例子中,一个名为 “Counts” 的持久化的key-value store被创建,key类型String,value类型Long。

StateStoreSupplier countStore = Stores.create("Counts")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
.build();


为了利用这些state stores,当开发者构建一个processor topology来创建一个local state,并且把local state与需要访问stateprocessor节点关联起来的时候,可以使用TopologyBuilder.addStateStore方法;或者他们可以使用已经存在的processor节点连接到state store,通过:

TopologyBuilder.connectProcessorAndStateStores. 方法

TopologyBuilder builder = new TopologyBuilder();

builder.addSource("SOURCE", "src-topic")
    .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
    // add the created state store "COUNTS" associated with processor "PROCESS1"
    .addStateStore(countStore, "PROCESS1")
    .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
    // connect the state store "COUNTS" with processor "PROCESS2"
    .connectProcessorAndStateStores("PROCESS2", "COUNTS");
    .addSink("SINK1", "sink-topic1", "PROCESS1")
    .addSink("SINK2", "sink-topic2", "PROCESS2")
    .addSink("SINK3", "sink-topic3", "PROCESS3");


在下一节,我们提供了里能一种构建processor topology的方法:Kafka Streams DSL。

4.2、高级Streams DSL

为了使用 Steams DSL 构建一个 processor topology,开发者可以使用 KStreamBuilder类,KStreamBuilder 继承自TopologyBuilder 类。在Kafka的源代码的streams/examples包下面有一个简单的例子。

 

下面将浏览一些代码来演示一些使Streams DSL来创建topology 的关键的代码,但是建议开发者阅读完整的源代码来了解所有细节。

4.2.1、Streams和Tables的二元性

stream-table duality:

https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying/

 

在讨论比如聚类之类的概念之前,我们需要首先介绍下table,以及更重要的tablestream之间的关系:所谓的 stream-table 二元性。本质上,这个二元性意味着stream可以看作table,反之亦然。例如Kafka的日志压缩特性,就利用这种二元性。

 

table的简单的形式是key-value对的集合,也叫map或者联合数组。这种table看起来像这样:

stream-table 二元性 描述了streamstables的密切关系。

  • Stream as Tablesstream可以被认为是tablechangelogstream中的每一条数据记录捕获一个table的状态变更。stream 因此是伪装的table,可以很容易的转变为“真正的” table,通过从头到尾的回放changelog来重建一个table。类似的,用更一般的比方,聚合stream中的数据记录(例如从一连串的页面浏览事件中根据用户来统计页面浏览量)会返回table(这里的key与value分别对应用户与用户相关联的页面浏览总量)。
  • table as streamtable可以看作是在某个时间点上,stream中的每一个key对应的最新value的一个快照(stream中的数据记录是key-value对)。因此table是伪装的stream,它可以很容易的转变成“真正的” stream,通过遍历table中的key-value。

 

译注:

stream是逐条流动的数据记录,可以通过计算一段数据流,将结果记录在tables里面

table是某个时间点上一个静态的统计结果,可以遍历table的每一条记录,来形成一个stream

stream和table可以互相转化,stream统计结果信息为table,table逐条遍历形成stream。

 

让我们使用一个例子来演示这种关系。设想有一个table,用来跟踪每个用户的浏览页面的总量(下图的第一列)。随着时间流逝,无论何时当一个新的浏览页面事件被处理完,table的状态都会相应的更新。这里,不同的时间点状态变化和table的不同版本表示为一个changelog流(第二列)。

有意思的是,由于 stream-table二元性,可以用同一个stream重建原始的table(第三列)。

同样的机制经常被使用,比如,通过变更数据捕获(CDC)复制数据库,在Kafka streams中为容错的需要跨主机复制所谓的state storesstream-table二元性是如此重要的概念以至于Kafka Streams 通过KStreamKTableGlobalKTable接口,明确的将它作为模型,下一节做介绍。

 

KStream,KTable,和 GlobalKTable

DSL 使用三个主要的抽象。

KStream是一个记录流的抽象,每一个数据记录在一个无界的数据集中表现为一个独立的数据。

KTable 是一个changelog stream的抽象,每条数据记录代表一个更新。更准确的说,数据记录中的值是同一个记录key对应的最新的value值的更新,如果有的话(如果相应的key还不存在,update操作会当作create操作)。

GlobalKTable,与KTable类似,一个GlobalKTable是一个changelog stream的抽象,每条数据记录代表一个更新。但GlobalKTable又不同于KTable,它在每个Kafka Streams实例上完全复制。GlobalKTable还提供使用key查找数据记录当前value的功能。这种表查找功能可以通过连接操作获得。为了演示KStreamsKTable/GlobalKTable的不同,我们设想下面的两条数据已经被发送到stream:

("alice", 1) --> ("alice", 3)


对于这两条记录,KStreamstream processing application会累加这两个值并返回4。

如果这些记录是KTable或者GlobalKTable,将会返回3。因为最后一条记录会被认为是对前一条记录的更新。

4.2.2、从Kafka中创建源Streams

要么是record stream(定义为KStream)要么是一个changelog steam(定义为KTable或者GlobalKTable),可以从一个或者多个Kafak topic创建一个source stream。(对于KTableGlobalKTable,你只能从一个单独的topic来创建source stream)。

KStreamBuilder builder = new KStreamBuilder();

KStream<String, GenericRecord> source1
    = builder.stream("topic1", "topic2");
KTable<String, GenericRecord> source2
    = builder.table("topic3", "stateStoreName");
GlobalKTable<String, GenericRecord> source2
    = builder.globalTable("topic4", "globalStoreName");

 

4.2.3、窗口化Stream

一个stream processor可能需要把数据记录划分到时段中,也就是说,按照时间来窗口化stream。这对于joinaggretation等操作通常是需要的。Kafka Streams当前定义了下面的几种窗口类型:

  • Hopping time widows [ 跳频时间窗 ]:是基于时间间隔的窗口。他们形成固定大小,(可能)互相重叠的窗口。调频窗口通过2个属性来定义:窗口尺寸和它的时间间隔(又叫 “跳”)。时间间隔指定了窗口相对于前一个窗口向前移动多少。例如,你可以配置一个跳频窗口大小为5分钟和时间间隔1分钟。由于跳频窗口可以重叠,一个数据记录可能属于多个窗口。
  • Tumbling time windows [ 滚动时间窗 ]:是跳频时间窗口的一个特例,是基于时间间隔的。他们形成大小固定,不重叠,无间隙的窗口。滚动窗口仅使用一个属性来定义:窗口大小。滚动窗口是一个窗口大小等于时间间隔的跳频窗口。由于滚动窗口永不重叠,一条数据记录永远只属于一个窗口。
  • Sliding windows [ 滑动窗口 ]:形成一个在时间轴上持续滑动固定尺寸的窗口;这里,如果两条数据记录的时间戳的差额在窗口尺寸内,他们将包含在同一个时间窗口中。因此,滑动窗口不是通过epoch来对齐,而是通过数据记录自身时间戳来对齐。在Kafka Streams中,滑动窗口仅用于join操作,可以通过JoinWindows类来指定。
  • Session windows [ 会话窗口 ]:are used to aggregate key-based events into sessions. Sessions represent a period of activity separated by a defined gap of inactivity. Any events processed that fall within the inactivity gap of any existing sessions are merged into the existing sessions. If the event falls outside of the session gap, then a new session will be created. Session windows are tracked independently across keys (e.g. windows of different keys typically have different start and end times) and their sizes vary (even windows for the same key typically have different sizes); as such session windows can't be pre-computed and are instead derived from analyzing the timestamps of the data records.

 

Kafka Streams DSL中用户可以为窗口指定一个保留周期[retention period],这允许Kakfa Streams在一定的时间周期内保持一个旧窗口的桶,为了等迟到的时间戳落在这个窗口中的记录到达。如果一条记录在保留周期之后到达,这条记录就会被丢弃。

 

译注:

意思就是某个时间窗口过去以后,可能还会有一些来的很迟的记录,这些记录本应该是落在这个时间窗口的,系统会保留窗口内数据并等待一段时间,等是否有迟来的记录并加入窗口。时间一过,迟来的记录也不要了。等待的时间就是 保留时间。

 

迟到记录有可能一直在实时数据流中。不管怎样,都取决于实际的时间定义,即如何定义一个记录是迟到的。使用processing-time的情况下,时间的定义是一条数据是何时被处理,意味着“迟到记录”的概念并不适用,根据定义没有记录会迟到。因此,迟到记录事实上可以认为是仅对event-timeingestion-time的时间语义有用。在这两种情况下,Kafka Streams能够合理的处理迟到记录。

 

译注:

迟到记录的保留周期,仅对event-time和ingestion-time的窗口时间的定义方式有效。如果窗口时间使用记录被处理的时间戳,就不会有迟到记录的概念,因为记录的参考时间永远是处理器的当前时间。

4.2.4、join multiple Streams

标题:联合多个streams。

 

join操作基于数据记录的keys合并两个stream,然后产生一个新的stream。数据流的上的join操作通常需要在一个时间窗口的基础上执行,因为要执行join操作就需要保持一定数量的记录,没有时间窗口的话记录数量会无限增长。在Kafka Streams,可以执行如下的join操作:

  • KStream-to-KStream Joins
  • KTable-to-KTable Joins
  • KStream-to-KTable Joins
  • KStream-to-GLobalKTable Joins

4.2.5、Aggregate a stream

标题:聚合一个流

 

aggregation操作获取一个stream输入,通过整合多条输入记录为一个输出记录,最后产生一个新的stream。计算数量或者求和都是aggregation的例子。数据流上的aggregation操作需要在窗口的基础上执行,如果不在窗口上执行的话需要保持的数据量会无限增长。

 

Kafka Streams DSL上,aggragation操作的输入流可以是KStream或者KTable,但是输出流永远是KTable。这使得Kafka Streams可以在聚合值产生并发出后,用新来的数据记录继续更新聚合的值。在接下来的处理中,同一个key的新值会覆盖旧值。

4.2.6、Transform a Stream

标题:转换一个流

 

除了joinaggregation操作,还分别为KStreamKTable提供了一系列的其他转换操作。每一个操作都产生一个或者多个KStreamKTable对象,这些操作同时被翻译成一个或者多个已连接的processor组成一个底层的processor topology。所有的转换方法可以链式串接组成一个复杂的processor topology。由于KStreamKTable是强类型的,所有转换操作都被定义为一个泛型函数,用户可以指定输入和输出类型。

 

在所有的转换操作中,有一部分如filter,map,mapValues等等,都是无状态的转换操作,可以应用到KStreamKTable,用户通常传入一个自定义的函数作为一个参数,如Pridicate作为filter的参数,KeyValueMapper作为map的参数,等等。

// written in Java 8+, using lambda expressions
KStream<String, GenericRecord> mapped = source1.mapValue(record -> record.get("category"));

 


无状态转换
,根据定义,不依赖于任何处理状态,因此就实现而言,他们不需要一个与stream processor相关联的 state store

有状态的转换,需要访问一个相关联的状态用于处理和产生输出。例如在joinaggregate操作中,一个窗口状态经常用于保存接收到的定义在窗口边界内的记录。操作者可以访问这些store中累积的记录,并且基于这些记录做计算。

// written in Java 8+, using lambda expressions
KTable<Windowed<String>, Long> counts = source1.groupByKey().aggregate(
    () -> 0L,  // initial value
    (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
    TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in milliseconds
    Serdes.Long() // serde for aggregated value
);

KStream<String, String> joined = source1.leftJoin(source2,
    (record1, record2) -> record1.get("user") + "-" + record2.get("region");
);

 

4.2.7、Write streams back to Kafka

标题:将流写回Kafka

 

在处理过程的最后,用户可以(持续的)将最终结果流写回Kafka Topic通过KStream。to和KTable.to.

joined.to("topic4");

 

当记录已经通过to方法物化到一个topic中,如果你的应用需要继续读取或者处理这些记录,一个选择是从这个topic中创建一个新的stream;Kafka Streams提供了一个便捷的方法 through:

// equivalent to
//
// joined.to("topic4");
// materialized = builder.stream("topic4");
KStream<String, String> materialized = joined.through("topic4");

 

4.2.8、Application Configuration and Execution

标题:应用程序的配置和执行

 

除了定义一个topology,开发人员在运行它之前,也需要在StreamConfig中做配置。一个完整的Kafka Streams的配置列表可以在这里找到:

http://kafka.apache.org/0102/documentation/#streamsconfigs

 

Kafka Streams中指定配置,与在Kafka ProducerConsumer Client中指定配置相似。通常创建一个java.util.Properties实例,设置必要的参数,然后使用Properties实例构建一个StreamConfig实例。

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Set a few key parameters
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper1:2181");
// Any further settings
settings.put(... , ...);

// Create an instance of StreamsConfig from the Properties instance
StreamsConfig config = new StreamsConfig(settings);


除了Kafka Streams自带的配置项,你也可以为Kafka consumersproducers设定参数作为内部使用,取决于你的应用程序。与Streams设置类似,你可以通过StreamConfig定义任何这种consumer 或者(和)producer的配置。注意,一些consumerproducer配置参数使用相同的参数名字。例如,send.buffer.bytes或者receive.buffer.bytes 用于配置TCP缓冲; request.timeout.msretry.backoff.ms 控制客户端请求的重试次数(还有更多)。如果你想为consumerproducer的这些参数各自定义不同的值,你可以在参数名字前添加前缀comsumer.producer.  :

Properties settings = new Properties();
// Example of a "normal" setting for Kafka Streams
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");

// Customize the Kafka consumer settings
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);

// Customize a common client setting for both consumer and producer
settings.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 100L);

// Customize different values for consumer and producer
settings.put("consumer." + ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
settings.put("producer." + ProducerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
// Alternatively, you can use
settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 1024 * 1024);
settings.put(StremasConfig.producerConfig(ProducerConfig.RECEIVE_BUFFER_CONFIG), 64 * 1024);

 

可以在应用代码的任何地方调用Kafka Streams。最普通的想法是,在应用程序的main()方法中做这个操作,or some variant thereof。

 

首先,必须创建一个KafkaStreams的实例。KafkaStreams构造函数的第一个参数是一个用于定义一个topologytopology builder(要么是Kafka Streams DSLKStreamBuilder,要么是Processor APITopologyBuilder);第二个参数是一个上面提到的StreamConfig实例。

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TopologyBuilder;


// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

KStreamBuilder builder = ...;  // when using the Kafka Streams DSL
//
// OR
//
TopologyBuilder builder = ...; // when using the Processor API

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);


到这里,内部结构已经被初始化完成,但是处理过程还没有启动。必须通过调用start()方法显式的启动Kafka Streams线程。

// Start the Kafka Streams instance
streams.start();

 

为了捕获到任何意料之外的异常信息,可以在启动应用程序之前设置一个java.lang.Thread.UncaughtExceptionHandler。这个handler 会在stream线程异常终止的任何时刻被调用。

streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    public uncaughtException(Thread t, throwable e) {
        // here you should examine the exception and perform an appropriate action!
    }
);


调用close()方法来停止应用程序:

// Stop the Kafka Streams instance
streams.close();


现在是时候像执行其他的java应用程序一样执行由Kafka Steams库编写的应用程序——在Kafka Streams这边没有什么特殊的戏法或者要求。例如,可以把java应用程序打包成一个胖jar包,然后通过下面方式启动:

# Start the application in class `com.example.MyStreamsApp`
# from the fat jar named `path-to-app-fatjar.jar`.
$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp


当应用程序实例开始运行,预定义的processor topology被初始化为一个或者多个streamtask,这些task可以被实例内部的stream线程并行执行。如果processor topology 定义了任何state stores,这些state stores在相关联的stream tasks初始化期间也会被创建(或者重建)。

需要重点理解,当启动上述的应用程序时,实际上启动了Kafka Streams 所认为的应用程序的一个实例。应用程序的多个实例可以在同一时间启动,事实上他们就是应用程序的并行执行的多个实例(例如,在其他的JVM上或者机器上)。这种情况下,Kafka Streams依据所有现存的实例透明的重新分配任务到刚启动的新实例上。

 

更多细节,请参考上面的章节:

Stream Partitions and Task

http://kafka.apache.org/documentation/streams#streams_architecture_tasks

Threading Model

http://kafka.apache.org/documentation/streams#streams_architecture_threads

 

译注:

主要的步骤可以总结一下,官方文档稍显散乱

5、升级指南与API更改

如果想从0.10.1升级到0.10.2,查看Upgrade Section for 0.10.2

它标注了需要升级代码和应用的一些不兼容的更改。

查看下面的0.10.2 API和语义上的变化的完全列表,可以升级应用程序,简化代码库,使用新的特性。

 

如果想从0.10.0升级到0.10.2,查看Upgrade Section for 0.10.1

它标注了需要升级代码和应用的一些不兼容的更改。

查看下面的0.10.2 API的变化的完全列表,可以升级应用程序,简化代码库,使用新的特性。

 

5.1、Streams API changes in 0.10.2.0

http://kafka.apache.org/documentation/streams#streams_api_changes_0102

5.2 、Streams API changes in 0.10.1.0

http://kafka.apache.org/documentation/streams#streams_api_changes_0101