页面树结构

版本比较

标识

  • 该行被添加。
  • 该行被删除。
  • 格式已经改变。

目录

面板
borderColor#00FFFF
borderStyledashed

原文链接 : httphttps://kafka.apache.org/0101/documentation.html(修改该链接为官网对应的链接)documentation/#quickstart

译文链接 : http://cwiki.apachecn.org/pages/viewpage.action?pageId=4882492(修改该链接为 ApacheCN 对应的译文链接) 1.3 Quick Start(快速开始)

贡献者 : @您的名字, 程威ApacheCNApache中文网

本教程,假设你没有任何kafka知识。并且没有现成的Kafka 和ZooKeeper数据。Kafka的命令行脚本在Windows平台和Unix平台不一样,在Windows平台请用这部分内容假设你是一个初学者,没有 Kafka 和 Zookeeper 的相关知识。因为 Kafka 在 Unix 和 Windows 平台上的控制台脚本不一样,所以在Windows平台使用 bin\windows\  代替 bin/  ,脚本的扩展名请改为脚本的后缀也变为 .bat .

Step 1: 下载代码(Download

步骤1:下载代码(Download the code)

下载 0.11.0.0版本代码,并且解压文件。0 release 版本,并解压代码。

代码块
> tar -xzf kafka_2.11-0.11.0.0.tgz
> cd kafka_2.11-0.11.0.0
Step 2:启动服务(Start

步骤2:启动服务(Start the server)

kafka 依赖于zookeeper,因此首先要启动zookeeper 服务,如果没有安装独立的zookeeper。可以使用Kafka中脚本得到一个应急的单节点zookeeper实例。Kafka 依赖于Zookeeper,如果你没有一个Zookeeper服务,你需要先启动一个Zookeeper 服务。你可以使用Kafka 自带的一个脚本,得到一个应急的单节点Zookeeper实例。

代码块
 > bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

现在启动Kafka服务:现在启动Kafka 服务:

代码块
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

Step 3: 创建一个主题(Create 步骤3: 创建一个topic(Create a topic)

我们建立一个名为“我们创建一个名为 test ”的单个分区单个副本主题:的topic,分配一个分区和一个备份:

代码块
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

如果执行浏览topic的命令,就可以看到刚刚创建的topic如果我们执行list topic 命名,就可以看到这个topic:
代码块
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

当前,除了主动创建topic,你可以配置当消息发布到一个不存在的topic,brokers将自动创建topic。

Step 4: 发送信息(Send

或者,除了主动创建topic,你可以配置broker 自动创建topic,当发布到一个不存在的topic的时候。

步骤4: 发送一些消息(Send some messages)

Kafka有个命令行客户端可以通过文件或标准输入向kafka集群发送消息。默认每行都是一条消息。

启动生产者(启动成功进入命令行阻塞状态,可以输入数据,回车发送)启动生产者,然后在控制台输入一些消息发送到sever端:

代码块
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
Step 5:启动consumer

步骤5:开启一个消费者(Start a consumer)

同样的Kafka有个命令行可以获取消息并标准输出

代码块
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

如果你在另外一个终端运行上面的命令,此时你在producer中发送消息,那么consumer终端会就显示该消息。

所有命令行工具都有很多选项; 不带参数运行命令会显示使用文档。

Step 6:设置broker群集

步骤6:建立一个多broker 集群(Setting up a multi-broker cluster)

到目前为止,我们都在单机上运行Kafka,挺没劲的。虽然多加机器操作上并没有太大改变,不过让我们感受下,让我们将我们的集群扩展到三个节点(仍然在我们的本地机器上)。

首先创建一个配置文件(在Windows中请使用copy命令代替):

代码块
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

编辑两个配置文件如下:

代码块
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

broker.id属性是集群中每个节点的唯一且永久的名称。因为我们正在同一台机器上运行这些,所以端口和日志目录也要修改,否则数据会相互覆盖。

因为Zookeeper 已经在单节点启动,所以我们启动两个新的broker节点即可。

代码块
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

现在创建一个新的主题,并设置3个副本。

代码块
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

现在我们已经创建一个集群,但是我们怎么知道每个broker都做了什么?可以执行”describe toics”查看

代码块
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

这里解释一下上面输出的信息。所有分区信息的概览,后面每一行都是其中一个分区的信息,因为我们只有一个分区因此只有一行

  • “leader” 负责指定分区的所有读写操作,每个分区的Leader都是随机选定的。
  • “replicas” 是复制此分区的日志的节点的列表,无论它们是否为Leader,或者它们当前处于活动状态。
  • “isr” 是“同步中”服务器列表,这个列表中的机器表示其处于活动状态,并且与Leader数据一致。

注意我们单一节点的例子,主题只有一个分区,一个节点,当我们运行”describe toics”查看状态的时候显示如下信息:

代码块
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

跟预料的一样,这个主题没有副本,且只有一个服务器

让我们在新的主题中发送一些消息

代码块
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

消费他们

代码块
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

让我们测一下容错,断掉Leader 的进程

代码块
> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

在windows中使用:

代码块
> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.1.0.jar"  kafka.Kafka config\server-1.properties    644
> taskkill /pid 644 /f

Leader已经被从属者替代,而且也不在in-sync列表里面了:

代码块
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 1,2,0    Isr: 2,0

但是信息仍然可以读取,即使原来的Leader宕机了

代码块
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
Step 7: 使用kafka

步骤7:使用kafka connect 导入/

导出数据

导出数据(Use Kafka Connect to import/export data)

从控制台输入数据虽然比较方便,但是你可能希望从其他数据源导入数据,或者从Kafka导出到其他系统。对于多数系统,您可以使用Kafka Connect来导入或导出数据,而不是自己写代码处理。

Kafka Connect是Kafka自带的一个工具用来导入导出数据。该工具可以通过connectors扩展,实现与其他系统交互。在快速入门中,我们将看到如何使用Kafka Connect运行简单连接器(connectors)将数据从文件导入Kafka主题,并将数据从Kafka主题导出到文件。

首先建一些测试数据:

代码块
> echo -e "foo\nbar" > test.txt

接下来,我们将启动在standalone模式下运行的两个连接器,这意味着它们在单个本地专用进程中运行。 我们提供三个配置文件作为参数。 第一个是Kafka Connect进程的配置,包含常见的配置,比如要连接的Kafka服务器,数据序列化格式。其余的配置文件均指定要创建的连接器。 这些文件包括唯一的连接器名称,实例化的连接器类以及连接器所需的任何其他配置。

代码块
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

这些示例配置文件使用您之前启动的默认本地群集配置,并创建两个连接器:第一个是source connector,从输入文件读取行,然后发送到Kafka主题,第二个是宿连接器(sink connector) 它从Kafka主题读取消息,并将每一行输出到文件。

在启动期间,您将看到一些日志消息,包括一些正在实例化的连接器。 一旦Kafka Connect进程启动,源连接器开始从test.txt读取行并将其生成到主题connect-test,然后sink连接器应该开始从主题connect-test读取消息,并将它们写入文件test.sink.txt。 我们可以通过检查输出文件的内容来验证数据是否已通过整个管道传送:

代码块
> cat test.sink.txt
foo
bar

注意,数据存储在Kafka主题connect-test中,因此我们还可以运行控制台consumer查看主题中的数据(或使用自定义consumer程序代码来处理):

代码块
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

我们可以继续向文件中添加数据,并查看它通过管道中的传送:

代码块
> echo "Another line" >> test.txt
Step 8: 使用kafka Stream处理数据

步骤8:使用kafka Stream处理数据(Use Kafka Streams to process data)

Kafka Streams是Kafka用于实时流处理和分析存储在Kafka服务器中数据的库。 这个快速入门示例演示用词库编写的WordCountDemo程序(代码转为Java 8 lambda表达式方便阅读)。

代码块
KTable wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Ensure the words are available as record keys for the next aggregate operation.
    .map((key, value) -> new KeyValue<>(value, value))

    // Count the occurrences of each word (record key) and store the results into a table named "Counts".
    .countByKey("Counts")

它实现WordCount算法,从输入文本计算单词出现的数量。 但是,与你看到的其他WordCount示例不同,该WordCount演示应用程序设计为对无限的无界数据流进行操作。 与有界变量类似,它有一种有状态算法,用于跟踪和更新单词的计数。 然而,由于它必须假定潜在的无界输入数据,它将周期性地输出其当前状态和结果,同时继续处理更多的数据,因为它不知道它何时处理了“全部”输入数据。

我们现在将准备输入数据到Kafka主题,随后将由Kafka Streams应用程序处理。

代码块
> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

Windows:

代码块
> echo all streams lead to kafka> file-input.txt
> echo hello kafka streams>> file-input.txt
> echo|set /p=join kafka summit>> file-input.txt

接下来,我们使用控制台生成器将输入数据发送到名为streams-file-input的主题(实际上,流数据可能会连续流入Kafka,应用程序将并行运行):

代码块
> bin/kafka-topics.sh --create \
            --zookeeper localhost:2181 \
            --replication-factor 1 \
            --partitions 1 \
            --topic streams-file-input

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt

我们现在可以运行WordCount演示应用程序来处理输入的数据:

代码块
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

除了日志条目,将不会有任何STDOUT输出,因为结果被连续写回Kafka中名为streams-wordcount-output的另一个主题。 演示将运行几秒钟,然后不像典型的流处理应用程序会自动终止。

我们现在可以通过从其输出主题中读取数据来检查WordCount演示应用程序的输出:

代码块
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
            --topic streams-wordcount-output \
            --from-beginning \
            --formatter kafka.tools.DefaultMessageFormatter \
            --property print.key=true \
            --property print.value=true \
            --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
            --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

输出如下:

代码块
all     1
lead    1
to      1
hello   1
streams 2
join    1
kafka   3
summit  1

这里,第一列是Kafka消息键,第二列是消息值,是java.lang.String格式。 注意,输出实际上是连续的更新流,其中每个数据记录(如上面输出中的每一行)是每个单词的更新计数。 对于具有相同键的多个记录,以后的每条统计记录都是前一次的更新。

现在,您可以向streams-file-input主题写入更多输入消息,并观察添加到streams-wordcount-output主题的消息,查看跟新记录。

你可以通过Ctrl-C中断consumer