页面树结构

2017-11-09 ApacheCN 开源组织,第二期邀请成员活动,一起走的更远 : http://www.apachecn.org/member/209.html


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

为了消费Kafka 0.8.x里的数据,Storm提供了core Storm spout和Trident spout的实现。

Spouts

我们支持Trident spout和core Storm spout。对于两个spout的实现,我们用一个BrokerHost接口去追踪Kafka broker的主机分区映射和控制Kafka相关参数的配置文件。

BrokerHosts

为了初始化你的Kafka spout/emitter,你需要构建一个标记 BrokerHosts接口的实例。目前我们支持以下两种实现方式:

ZkHosts

当你想要动态的追踪 Kafka broker的分区映射时,你可以使用ZkHosts。这个类使用Kafka的ZooKeeper跟踪brokerHost - >partition映射。你可以通过调用实例化一个对象

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

brokerZkStr是IP:端口(例如:localhost:2181)。brokerZKPath是存储所有主题和分区信息的根目录。默认情况下是/brokers,Kafka默认使用的。默认情况下broker-partition映射每隔60秒刷新一次从Zookeeper,你可以将它改变为你想要的值,通过设置host.refreshFreqSecs。

StaticHosts

这是一种可替代的实现,其中brokerHost - >partition·信息是静态的。为了构建这个类的一个实例,您需要先构建GlobalPartitionInformation的一个实例。

Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);

KafkaConfig

构建KafkaSpout的第二件事是KafkaConfig实例。

public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)

BrokerHosts可以是如上所述BrokerHosts接口的任何实现。topic是Kafka主题。可选的clientId被用作ZooKeeper路径的一部分,去存储spout当前的消费偏移量。这两种扩展目前在使用。SpoutConfig是KafkaConfig的一个扩展,它支持与ZooKeeper的连接信息和控制KafkaSpout的特定行为相关的字段。ZKroot将被用作根目录去存储你的消费者偏移量。该编号应该唯一识别你的spout。

public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
public SpoutConfig(BrokerHosts hosts, String topic, String id);

你需要为不同的场景使用正确的SpoutConfig:

Core KafkaSpout只接受SpoutConfig实例。

TridentKafkaConfig是另一种KafkaConfig的扩展。

TridentKafkaEmitter只接受TridentKafkaConfig。

除了这些参数,SpoutConfig包含控制KafkaSpout行为方式的以下字段:

// setting for how often to save the current Kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;

// Exponential back-off retry settings.  These are used when retrying messages after a bolt
// calls OutputCollector.fail().
// Note: be sure to set org.apache.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
// resubmitting the message while still retrying.
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
public long retryDelayMaxMs = 60 * 1000;

// if set to true, spout will set Kafka topic as the emitted Stream ID
public boolean topicAsStreamId = false;

KafkaConfig类也有一些控制应用程序行为的公共变量。以下是默认设置:

public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;

它们大多数是不用显示声明的除了MultiScheme。

MultiScheme

MultiScheme是一个接口,用来规定从Kafka消费的字节数组如何转化为Storm tuple,它也控制着你输出字段的名字。

public Iterable<List<Object>> deserialize(byte[] ser);
public Fields getOutputFields();

默认的RawMultiScheme消费字节数组,然后返回一个字节数组类型的tuple,输出字段的名字是“bytes”。这里有可用选择的实现方式如SchemeAsMultiScheme和KeyValueSchemeMultiScheme,它们将字节数组转化为String类型。

Examples

Core Spout

BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Trident Spout

TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

KafkaSpout如何存储一个Kafka主题的偏移量,并且在失败的情况下重新获得

如上面的KafkaConfig属性所示,你能控制spout从Kafka topic读取的位置,通过设置KafkaConfig.startOffsetTime,如下所示:

1、kafka.api.OffsetRequest.EarliestTime():从主题最开始的位置读取(即从最原始的消息开始)

2、kafka.api.OffsetRequest.LatestTime():从主题结尾处的位置读取(即任何正在写入主题的新消息)

3、Unix时间戳aka秒(例如通过System.currentTimeMillis()):详细看Kafka FAQ里的"How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?"

当Topology运行时,Kafka spout跟踪它读取和发送的消息偏移量,并把状态信息存储ZooKeeper路径下SpoutConfig.zkRoot+ "/" + SpoutConfig.id。在失败的情况下从ZooKeeper恢复最后写入的偏移量。

重要说明:当重新部署Topology的时候,我们要确保SpoutConfig.zkRoot和SpoutConfig.id没有被改变,否则spout将不能够从ZooKeeper读取它之前的消费状态信息(即偏移量),根据你的场景,这将导致不期望的数据丢失。

这意味着当一个设置了KafkaConfig.startOffsetTime的Topology运行一次将不会影响Topology的后续运行,因为现在Topology将依赖ZooKeeper里的消费状态信息去决定从哪里开始读。如果你想要强制spout忽略ZooKeeper里存储的消费状态信息,你可以设置KafkaConfig.ignoreZkOffsets为true。如果为true,spout将总是从上面描述的KafkaConfig.startOffsetTime定义的偏移量开始读取数据。

用不同版本的Scala开发Storm-Kafka

Storm-Kafka的Kafka依赖被定义在maven的提供范围,这意味着它将不作为一个传递依赖了。这允许你使用一个特定版本Scala构建的Kafka。当我们用Storm-Kafka创建一个项目,我们必须显示的添加Kafka依赖。例如,使用构建在Scala 2.10上的Kafka 0.8.1.1,你将在你的pom.xml里使用下面的依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>

注意ZooKeeper和log4j的依赖被排出在外防止与Storm依赖发生版本冲突。

Topology向Kafka里写入数据

你能够创建一个 org.apache.storm.kafka.bolt.KafkaBolt实例,将它作为你Topology的一个组件。如果你用Trident,你可以使用 org.apache.storm.kafka.trident.TridentState,org.apache.storm.kafka.trident.TridentStateFactory和org.apache.storm.kafka.trident.TridentKafkaUpdater。

你需要实现以下两个接口

TupleToKafkaMapper 和 TridentTupleToKafkaMapper

这些接口定义了两个方法:

K getKeyFromTuple(Tuple/TridentTuple tuple);
V getMessageFromTuple(Tuple/TridentTuple tuple);

从名字就可以看出,我们调用这些方法将tuple映射到Kafka的键和消息。如果你只想要一个字段作为key,一个字段作value,你可以使用提供的实现类FieldNameBasedTupleToKafkaMapper.java。在KafkaBolt中,如果你使用默认的构造函数构造FieldNameBasedTupleToKafkaMapper,为了向后兼容的原因,这个实现类总是寻找字段名为key和message的字段。或者你也可以使用非默认构造函数去指定一个名字不同的key和message字段。在TridentKafkaState你必须指定key和message的字段名,因为这儿没有默认的构造器。当创建FieldNameBasedTupleToKafkaMapper实例的时候,这些也应该被指定。

KafkaTopicSelector 和 Trident KafkaTopicSelector

这个接口只有一个方法java public interface KafkaTopicSelector { String getTopics(Tuple/TridentTuple tuple); }。实现这个接口应该返回tuple应该发布到的主题,如果你返回null,这个消息将被忽略。如果你有一个静态的主题名称,你可以用DefaultTopicSelector.java,在构造器中设置主题。

指定Kafka producer的属性

你能指定所有的producer属性,参考http://kafka.apache.org/documentation.html#producerconfigs里的章节"Important configuration properties for the producer"。在你的Storm Topology Config里,你可以指定Kafka的属性通过kafka.broker.properties里的key。

实例展示

bolt:

TopologyBuilder builder = new TopologyBuilder();

Fields fields = new Fields("key", "message");
FixedBatchSpout spout = new FixedBatchSpout(fields,
                                            4,
                                            new Values("storm", "1"),
                                            new Values("trident", "1"),
                                            new Values("needs", "1"),
                                            new Values("javadoc", "1")
);
spout.setCycle(true);
builder.setSpout("spout", spout, 5);
KafkaBolt bolt = new KafkaBolt()
                     .withTopicSelector(new DefaultTopicSelector("test"))
                     .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");

Config conf = new Config();
//set producer properties.
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("request.required.acks", "1");
props.put("serializer.class", "kafka.serializer.StringEncoder");
conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);

StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());

Trident:

Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields,
                                            4,
                                            new Values("storm", "1"),
                                            new Values("trident", "1"),
                                            new Values("needs", "1"),
                                            new Values("javadoc", "1")
);
spout.setCycle(true);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
                                            .withKafkaTopicSelector(new DefaultTopicSelector("test"))
                                            .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());

Config conf = new Config();
//set producer properties.
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("request.required.acks", "1");
props.put("serializer.class", "kafka.serializer.StringEncoder");
conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
  • 无标签