页面树结构

2017-11-09 ApacheCN 开源组织,第二期邀请成员活动,一起走的更远 : http://www.apachecn.org/member/209.html


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

Storm/Trident 集成 Apache HBase

用法

操作 HBase 的主要 API 是 org.apache.storm.hbase.bolt.mapper.HBaseMapper 接口:

public interface HBaseMapper extends Serializable {
    byte[] rowKey(Tuple tuple);

    ColumnList columns(Tuple tuple);
}

rowKey 方法是很明确的:输入一个 Storm tuple,返回一个字节数组代表 rowkey。

columns 方法定义了向 HBase 的列写入什么数据。ColumnList 类允许你添加标准的 HBase 列和 HBase 计数器列。

添加一个标准列用 addColumn() 方法:

ColumnList cols = new ColumnList();
cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));

添加一个计数器列,用 addCounter() 方法:

ColumnList cols = new ColumnList();
cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));

当启用远程 HBase 的安全性,Kerberos 的 keytab 和相关的 principal 名称需要提供给 Storm-HBase 连接器。具体地,传递给 Topology 的配置对象应该包含{(“storm.keytab.file”, “$keytab”), ("storm.kerberos.principal", “$principal”)}。例如:

Config config = new Config();
...
config.put("storm.keytab.file", "$keytab");
config.put("storm.kerberos.principal", "$principle");
StormSubmitter.submitTopology("$topologyName", config, builder.createTopology());

使用授权 tokens 操作 Secure HBASE

如果你的 Topology 要操作具有安全性的 HBase,你的 bolts/states 需要被 HBase 认证。上面描述的方法要求所有潜在的主机拥有 storm.keytab.file。如果你的集群上有多个 Topologies,各有不同的 HBase用户,你将必须创建多个 keytab,并将它分发给所有的 workers。你能使用下面的方法替代这种做法:

你的管理员可以配置 nimbus 代表 Topology 提交的用户自动获取授权 tokens 。nimbus 需要以下配置:

nimbus.autocredential.plugins.classes : ["org.apache.storm.hbase.security.AutoHBase"]
nimbus.credential.renewers.classes : ["org.apache.storm.hbase.security.AutoHBase"]
hbase.keytab.file: "/path/to/keytab/on/nimbus"(这是 HBase 超级用户的密钥表,超级用户具有最高权限可以管理其它用户)
hbase.kerberos.principal: "superuser@EXAMPLE.com"
nimbus.credential.renewers.freq.secs : 518400(6天,HBase 授权 tokens  默认7天失效,并且不能够被更新。如果你在 hbase-site.xml 里对 hbase.auth.token.max.lifetime 有自定义的设置,你应该确保这个值至少比那个少一个小时。)

你的 Topology 配置应该有 topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"]。

如果 nimbus 没有上面的配置,你需要添加它,并重启。确保 HBase 的配置文件(core-site.xml,hdfs-site.xml and hbase-site.xml)、storm-hbase jar 和所有的依赖都在 nimbus 的 classpath 中。nimbus 将用配置文件中指定的 keytab 和 principal 与 HBase 进行验证。从此,每次提交 Topology,nimbus 将模拟 Topology 提交的用户,并且代表 Topology 提交的用户去获取 tokens 。如果 Topology 中的 topology.auto-credentials 在开始时被设置为 AutoHBase ,nimbus 将发送授权 tokens 给所有的 worker,HBase bolt/state 将用这些 tokens 进行认证。

要使 nimbus 能代表 Topology 提交用户,你必须保证 storm.kerberos.principal 指定的用户有权代表其他用户去获取 tokens。为了做到这一点,你需要按照这个链接配置

http://hbase.apache.org/book/security.html#security.rest.gateway

你可以从这个链接里了解到关于搭建 Secure HBASE :http://hbase.apache.org/book/security.html

SimpleHBaseMapper

storm-hbase 包含一个通用的 HBaseMapper 实现,叫做 SimpleHBaseMapper,它能够将 Storm  的 tuples 映射到 HBase 标准列和 HBase 计数器列。

使用 SimpleHBaseMapper,你只需要告诉它哪些字段映射到哪些列。

下面的代码创建了一个 SimepleHBaseMapper 实例:

1.用 word 这个字段的值做 row key。

2.为 word 字段增加一个标准列。

3.为 count 字段增加一个计数器列。

4.将值写到列簇 cf。

SimpleHBaseMapper mapper = new SimpleHBaseMapper() 
        .withRowKeyField("word")
        .withColumnFields(new Fields("word"))
        .withCounterFields(new Fields("count"))
        .withColumnFamily("cf");

HBaseBolt

要使用 HBaseBolt,我们需要写入表的名字和一个 HBaseMapper 的实现。

HBaseBolt hbase = new HBaseBolt("WordCount", mapper);

HBaseBolt 将使用 mapper 实例将 tuple 数据持久化到 HBase 中。

HBaseValueMapper

这个类可以将 HBase 的查找结果转化为 Storm HBaseLookupBolt 要发出的值。

public interface HBaseValueMapper extends Serializable {
    public List<Values> toTuples(Result result) throws Exception;
    void declareOutputFields(OutputFieldsDeclarer declarer);
}

toTuples 方法以HBase Result为参数,返回一个值列表。返回列表中的每一个值将被 HBaseLookupBolt 发出。

declareOutputFields 方法通常用来声明 HBaseLookupBolt 输出的字段。

在 src/test/java 目录里有一个实现例子。

HBaseProjectionCriteria

这个类可以让你指定 HBase Get 函数的投影列。这是 lookupBolt 的一个可选参数,如果你不指定此类实例,所有的列将会被 HBaseLookupBolt 返回。

public class HBaseProjectionCriteria implements Serializable {
    public HBaseProjectionCriteria addColumnFamily(String columnFamily);
    public HBaseProjectionCriteria addColumn(ColumnMetaData column);

addColumnFamily 方法以列簇为参数,设置这个参数意味着,这个列簇里的所有列将被投影。

addColumn 方法以 columnMetaData 实例为参数,设置这个参数意味着这个列簇只有这一列将会被投影。下面的代码创建了一个 projectionCriteria,它指定了投影条件:

1.列簇 cf 里的 count 列。

2.列簇 cf2 里的所有列。

HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria()
    .addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count"))
    .addColumnFamily("cf2");

 HBaseLookupBolt

创建一个 HBaseLookupBolt 你需要写入的表名,一个 HBaseMapper 实现类和一个 HBaseRowToStormValueMapper 的实现类。你也可以指定一个 HBaseProjectionCriteria。HBaseLookupBolt 用HBaseMapper 获取 rowKey 去查找,用 HBaseProjectionCriteria 去指定哪些列包含在结果里,用 HBaseRowToStormValueMapper 让 bolt 将这些数据发射出去。

你可以在 src/test/java 目录下看到一个 Topology 例子 LookupWordCount.java。

例如:Word Count

在 src/test/java 目录下可找到可运行的例子。

Setup

下面步骤是假定你是在本地运行 HBase,或者是你的 classpath 里有一个hbase-site.xml 指向你的集群。

用 hbase shell 命令创建 schema。

> create 'WordCount', 'cf'

Execution

运行 org.apache.storm.hbase.topology.PersistenWordCount 类(它将运行 Topology 10秒,然后退出)。

当 word count topology 正在运行时,运行 org.apache.storm.hbase.topology.WordCountClient 类去查看存储在HBase里的计数值。你应该看到以下输出:

Word: 'apple', Count: 6867
Word: 'orange', Count: 6645
Word: 'pineapple', Count: 6954
Word: 'banana', Count: 6787
Word: 'watermelon', Count: 6806

示例 Topology 如下,仅供参考:

public class PersistentWordCount {
    private static final String WORD_SPOUT = "WORD_SPOUT";
    private static final String COUNT_BOLT = "COUNT_BOLT";
    private static final String HBASE_BOLT = "HBASE_BOLT";


    public static void main(String[] args) throws Exception {
        Config config = new Config();

        WordSpout spout = new WordSpout();
        WordCounter bolt = new WordCounter();

        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
                .withColumnFields(new Fields("word"))
                .withCounterFields(new Fields("count"))
                .withColumnFamily("cf");

        HBaseBolt hbase = new HBaseBolt("WordCount", mapper);


        // wordSpout ==> countBolt ==> HBaseBolt
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(WORD_SPOUT, spout, 1);
        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
        builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));


        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
            System.exit(0);
        } else {
            config.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        }
    }
}
  • 无标签