页面树结构

版本比较

标识

  • 该行被添加。
  • 该行被删除。
  • 格式已经改变。

为了消费Kafka 0.8.x里的数据,Storm提供了core Storm spout和Trident spout的实现。

Spouts

我们支持Trident spout和core Storm spout。对于两个spout的实现,我们用一个BrokerHost接口去追踪Kafka broker的主机分区映射和控制Kafka相关参数的配置文件。

BrokerHosts

为了初始化你的Kafka spout/emitter,你需要构建一个标记 BrokerHosts接口的实例。目前我们支持以下两种实现方式:

ZkHosts

当你想要动态的追踪 Kafka broker的分区映射时,你可以使用ZkHosts。这个类使用Kafka的ZooKeeper跟踪brokerHost - >partition映射。你可以通过调用实例化一个对象

...

brokerZkStr是IP:端口(例如:localhost:2181)。brokerZKPath是存储所有主题和分区信息的根目录。默认情况下是/brokers,Kafka默认使用的。默认情况下broker-partition映射每隔60秒刷新一次从Zookeeper,你可以将它改变为你想要的值,通过设置host.refreshFreqSecs。

StaticHosts

这是一种可替代的实现,其中brokerHost - >partition·信息是静态的。为了构建这个类的一个实例,您需要先构建GlobalPartitionInformation的一个实例。

代码块
languagejava
linenumberstrue
Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);

KafkaConfig

构建KafkaSpout的第二件事是KafkaConfig实例。

...

它们大多数是不用显示声明的除了MultiScheme。

MultiScheme

MultiScheme是一个接口,用来规定从Kafka消费的字节数组如何转化为Storm tuple,它也控制着你输出字段的名字。

...

默认的RawMultiScheme消费字节数组,然后返回一个字节数组类型的tuple,输出字段的名字是“bytes”。这里有可用选择的实现方式如SchemeAsMultiScheme和KeyValueSchemeMultiScheme,它们将字节数组转化为String类型。

Examples

Core Spout

代码块
languagejava
linenumberstrue
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Trident Spout

代码块
languagejava
linenumberstrue
TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

KafkaSpout如何存储一个Kafka主题的偏移量,并且在失败的情况下重新获得

如上面的KafkaConfig属性所示,你能控制spout从Kafka topic读取的位置,通过设置KafkaConfig.startOffsetTime,如下所示:

...

这意味着当一个设置了KafkaConfig.startOffsetTime的Topology运行一次将不会影响Topology的后续运行,因为现在Topology将依赖ZooKeeper里的消费状态信息去决定从哪里开始读。如果你想要强制spout忽略ZooKeeper里存储的消费状态信息,你可以设置KafkaConfig.ignoreZkOffsets为true。如果为true,spout将总是从上面描述的KafkaConfig.startOffsetTime定义的偏移量开始读取数据。

用不同版本的Scala开发Storm-Kafka

Storm-Kafka的Kafka依赖被定义在maven的提供范围,这意味着它将不作为一个传递依赖了。这允许你使用一个特定版本Scala构建的Kafka。当我们用Storm-Kafka创建一个项目,我们必须显示的添加Kafka依赖。例如,使用构建在Scala 2.10上的Kafka 0.8.1.1,你将在你的pom.xml里使用下面的依赖:

...

注意ZooKeeper和log4j的依赖被排出在外防止与Storm依赖发生版本冲突。

Topology向Kafka里写入数据

你能够创建一个 org.apache.storm.kafka.bolt.KafkaBolt实例,将它作为你Topology的一个组件。如果你用Trident,你可以使用 org.apache.storm.kafka.trident.TridentState,org.apache.storm.kafka.trident.TridentStateFactory和org.apache.storm.kafka.trident.TridentKafkaUpdater。

你需要实现以下两个接口

TupleToKafkaMapper 和 TridentTupleToKafkaMapper

这些接口定义了两个方法:

代码块
languagejava
linenumberstrue
K getKeyFromTuple(Tuple/TridentTuple tuple);
V getMessageFromTuple(Tuple/TridentTuple tuple);

从名字就可以看出,我们调用这些方法将tuple映射到Kafka的键和消息。如果你只想要一个字段作为key,一个字段作value,你可以使用提供的实现类FieldNameBasedTupleToKafkaMapper.java。在KafkaBolt中,如果你使用默认的构造函数构造FieldNameBasedTupleToKafkaMapper,为了向后兼容的原因,这个实现类总是寻找字段名为key和message的字段。或者你也可以使用非默认构造函数去指定一个名字不同的key和message字段。在TridentKafkaState你必须指定key和message的字段名,因为这儿没有默认的构造器。当创建FieldNameBasedTupleToKafkaMapper实例的时候,这些也应该被指定。

KafkaTopicSelector 和 Trident KafkaTopicSelector

这个接口只有一个方法java public interface KafkaTopicSelector { String getTopics(Tuple/TridentTuple tuple); }。实现这个接口应该返回tuple应该发布到的主题,如果你返回null,这个消息将被忽略。如果你有一个静态的主题名称,你可以用DefaultTopicSelector.java,在构造器中设置主题。

...

你能指定所有的producer属性,参考http://kafka.apache.org/documentation.html#producerconfigs里的章节"Important configuration properties for the producer"。在你的Storm Topology Config里,你可以指定Kafka的属性通过kafka.broker.properties里的key。

实例展示

bolt:

代码块
languagejava
linenumberstrue
TopologyBuilder builder = new TopologyBuilder();

Fields fields = new Fields("key", "message");
FixedBatchSpout spout = new FixedBatchSpout(fields,
                                            4,
                                            new Values("storm", "1"),
                                            new Values("trident", "1"),
                                            new Values("needs", "1"),
                                            new Values("javadoc", "1")
);
spout.setCycle(true);
builder.setSpout("spout", spout, 5);
KafkaBolt bolt = new KafkaBolt()
                     .withTopicSelector(new DefaultTopicSelector("test"))
                     .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");

Config conf = new Config();
//set producer properties.
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("request.required.acks", "1");
props.put("serializer.class", "kafka.serializer.StringEncoder");
conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);

StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());

...