页面树结构

版本比较

标识

  • 该行被添加。
  • 该行被删除。
  • 格式已经改变。

Storm 操作 HDFS文件系统组件。

用法

下面的例子将 tuple 字段以 "|" 分割,写入 HDFS 目录 hdfs://localhost:54310/foo 。每 1000 个 tuples 同步一次,写入到文件系统,是这些数据对其它 HDFS 客户端可见。当文件大小到达 5 M 时,它将轮流写入文件。

代码块
languagejava
linenumberstrue
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);

...

Topology 打包

当给你的 Topology 打包的时候,相比 maven-assembly-plugin ,更重要的是使用 maven-shade-plugin 。

...

代码块
languagejava
linenumberstrue
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

指定一个 Hadoop 版本

默认地,storm-hdfs 使用下列 Hadoop 依赖:

...

代码块
languagejava
linenumberstrue
com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)

自定义

记录格式

记录格式可以通过实现 org.apache.storm.hdfs.format.RecordFormat 接口来控制:

...

提供的 org.apache.storm.hdfs.format.DelimitedRecordFormat 格式是能够生产的格式,如 CSV 文件和制表符分割文件。

文件命名

文件命名可以通过实现 org.apache.storm.hdfs.format.FileNameFormat 接口来控制:

...

默认情况下,prefix 为空, 文件扩展名为 ".txt"。

同步策略

同步策略可以通过实现 org.apache.storm.hdfs.sync.SyncPolicy 接口来控制将缓存数据刷新到低层文件系统(这样客户端就能够读取到这些数据)。

...

指定数目的 tuples 被处理完后,org.apache.storm.hdfs.sync.CountSyncPolicy 类将会触发一次同步操作。

文件循环策略

类似于同步策略,文件循环策略可以通过实现 org.apache.storm.hdfs.rotation.FileRotation  接口来控制数据文件的循环。

...

代码块
languagejava
linenumberstrue
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

文件循环动作

Hdfs Bolt 和 Trident State 实现类都可以注册任意数量的 RotationActions。RotationActions 提供一个触发器,这让你在文件循环之后,可以执行某个操作。例如,移动文件或文件重命名。

...

代码块
languagejava
linenumberstrue
HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
                .withFileNameFormat(fileNameFormat)
                .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
                .withRotationPolicy(rotationPolicy)
                .withFsUrl("hdfs://localhost:54310")
                .addRotationAction(new MoveFileAction().withDestination("/dest2/"));

支持 HDFS 序列化文件

org.apache.storm.hdfs.bolt.SequenceFileBolt 类可以将 Storm 数据写入 HDFS 序列化文件:

...

代码块
languagejava
linenumberstrue
public interface SequenceFormat extends Serializable {
    Class keyClass();
    Class valueClass();

    Writable key(Tuple tuple);
    Writable value(Tuple tuple);
}

Trident API

storm-hdfs 包含一个 Trident state 实现类,将数据写入HDFS,有一个 API 密切反映 bolts。

...

代码块
languagejava
linenumberstrue
HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
                .withFileNameFormat(fileNameFormat)
                .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
                .withRotationPolicy(rotationPolicy)
                .withFsUrl("hdfs://localhost:54310")
                .addRotationAction(new MoveFileAction().toDestination("/dest2/"));

操作具有安全性的 HDFS

如果 Topology 要操作具有安全性的 HDFS,bolts/states 需要 NameNode 认证。目前有两种方法解决。

使用 HDFS 授权 tokens

你的管理员可以配置 nimbus 代表 Topology 提交的用户自动获取授权 tokens 。nimbus 需要以下配置:

nimbus.autocredential.plugins.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"]

nimbus.credential.renewers.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"]

hdfs.keytab.file: "/path/to/keytab/on/nimbus"  (这是 HDFS 超级用户的密钥表,超级用户具有最高权限可以管理其它用户)

hdfs.kerberos.principal: "superuser@EXAMPLE.com" nimbus.credential.renewers.freq.secs : 82800 (23小时,HDFS tokens 每24小时更新一次,所以这个值要小于24小时)

topology.hdfs.uri:"hdfs://host:port" (这是一个可选配置,默认情况下,我们将使用 Hadoop core-site.xml 里指定的 fs.defaultFS 属性的值)

你的 Topology 配置应该有 topology.auto-credentials :["org.apache.storm.hdfs.common.security.AutoHDFS"]

如果 nimbus 没有上面的配置,你需要添加它,并重启。确保 Hadoop 的配置文件(core-site.xml and hdfs-site.xml)、storm-hdfs jar 和所有的依赖都在 nimbus 的 classpath 中。nimbus 将用配置文件中指定的 keytab 和 principal 与 NameNode 进行验证。从此,每次提交 Topology,nimbus 将模拟 Topology 提交的用户,并且代表 Topology 提交的用户去获取 tokens 。如果 Topology 中的 topology.auto-credentials 在开始时被设置为 AutoHDFS ,nimbus 将发送授权 tokens 给所有的 worker,hdfs bolt/state 将用这些 tokens 与 NameNode 进行认证。

要使 nimbus 能代表 Topology 提交用户,你必须保证 storm.kerberos.principal 指定的用户有权代表其他用户去获取 tokens。为了做到这一点,你需要按照这个链接配置 http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html

你可以从这个链接里了解到关于搭建 Secure HDFS: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html

在所有的 worker 主机上使用密匙表

你可以使用这种方法,如果你已经将 HDFS 用户密匙表文件分配给所有潜在的 worker 主机。

你应该用方法 HdfsBolt/State.withconfigKey("somekey") 指定一个 HDFS 配置键,并且这个键映射的值应该包含下面两个属性:

hdfs.keytab.file: "/path/to/keytab/"

hdfs.kerberos.principal: "user@EXAMPLE.com"

在 worker 主机上运行的 bolt/trident-state 代码将用配置文件中配置的 principal 提供的密匙表与 NameNode 进行认证。这种方法是有点危险的,因为你需要保证每个 worke 在同一个目录下拥有密匙表,并且当你在集群中引入新的主机时,你需要记住去做同样的操作。