Storm 操作 HDFS文件系统组件。
用法
下面的例子将 tuple 字段以 "|" 分割,写入 HDFS 目录 hdfs://localhost:54310/foo 。每 1000 个 tuples 同步一次,写入到文件系统,是这些数据对其它 HDFS 客户端可见。当文件大小到达 5 M 时,它将轮流写入文件。
代码块 | ||||
---|---|---|---|---|
| ||||
// use "|" instead of "," for field delimiter RecordFormat format = new DelimitedRecordFormat() .withFieldDelimiter("|"); // sync the filesystem after every 1k tuples SyncPolicy syncPolicy = new CountSyncPolicy(1000); // rotate files when they reach 5MB FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB); FileNameFormat fileNameFormat = new DefaultFileNameFormat() .withPath("/foo/"); HdfsBolt bolt = new HdfsBolt() .withFsUrl("hdfs://localhost:54310") .withFileNameFormat(fileNameFormat) .withRecordFormat(format) .withRotationPolicy(rotationPolicy) .withSyncPolicy(syncPolicy); |
...
将 Topology 打包
当给你的 Topology 打包的时候,相比 maven-assembly-plugin ,更重要的是使用 maven-shade-plugin 。
...
代码块 | ||||
---|---|---|---|---|
| ||||
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.4</version> <configuration> <createDependencyReducedPom>true</createDependencyReducedPom> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> |
指定一个 Hadoop 版本
默认地,storm-hdfs 使用下列 Hadoop 依赖:
...
代码块 | ||||
---|---|---|---|---|
| ||||
com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero) |
自定义
记录格式
记录格式可以通过实现 org.apache.storm.hdfs.format.RecordFormat 接口来控制:
...
提供的 org.apache.storm.hdfs.format.DelimitedRecordFormat 格式是能够生产的格式,如 CSV 文件和制表符分割文件。
文件命名
文件命名可以通过实现 org.apache.storm.hdfs.format.FileNameFormat 接口来控制:
...
默认情况下,prefix 为空, 文件扩展名为 ".txt"。
同步策略
同步策略可以通过实现 org.apache.storm.hdfs.sync.SyncPolicy 接口来控制将缓存数据刷新到低层文件系统(这样客户端就能够读取到这些数据)。
...
指定数目的 tuples 被处理完后,org.apache.storm.hdfs.sync.CountSyncPolicy 类将会触发一次同步操作。
文件循环策略
类似于同步策略,文件循环策略可以通过实现 org.apache.storm.hdfs.rotation.FileRotation 接口来控制数据文件的循环。
...
代码块 | ||||
---|---|---|---|---|
| ||||
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB); |
文件循环动作
Hdfs Bolt 和 Trident State 实现类都可以注册任意数量的 RotationActions。RotationActions 提供一个触发器,这让你在文件循环之后,可以执行某个操作。例如,移动文件或文件重命名。
...
代码块 | ||||
---|---|---|---|---|
| ||||
HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions() .withFileNameFormat(fileNameFormat) .withSequenceFormat(new DefaultSequenceFormat("key", "data")) .withRotationPolicy(rotationPolicy) .withFsUrl("hdfs://localhost:54310") .addRotationAction(new MoveFileAction().withDestination("/dest2/")); |
支持 HDFS 序列化文件
org.apache.storm.hdfs.bolt.SequenceFileBolt 类可以将 Storm 数据写入 HDFS 序列化文件:
...
代码块 | ||||
---|---|---|---|---|
| ||||
public interface SequenceFormat extends Serializable { Class keyClass(); Class valueClass(); Writable key(Tuple tuple); Writable value(Tuple tuple); } |
Trident API
storm-hdfs 包含一个 Trident state 实现类,将数据写入HDFS,有一个 API 密切反映 bolts。
...
代码块 | ||||
---|---|---|---|---|
| ||||
HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions() .withFileNameFormat(fileNameFormat) .withSequenceFormat(new DefaultSequenceFormat("key", "data")) .withRotationPolicy(rotationPolicy) .withFsUrl("hdfs://localhost:54310") .addRotationAction(new MoveFileAction().toDestination("/dest2/")); |
操作具有安全性的 HDFS
如果 Topology 要操作具有安全性的 HDFS,bolts/states 需要 NameNode 认证。目前有两种方法解决。
使用 HDFS 授权 tokens
你的管理员可以配置 nimbus 代表 Topology 提交的用户自动获取授权 tokens 。nimbus 需要以下配置:
nimbus.autocredential.plugins.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"]
nimbus.credential.renewers.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"]
hdfs.keytab.file: "/path/to/keytab/on/nimbus" (这是 HDFS 超级用户的密钥表,超级用户具有最高权限可以管理其它用户)
hdfs.kerberos.principal: "superuser@EXAMPLE.com" nimbus.credential.renewers.freq.secs : 82800 (23小时,HDFS tokens 每24小时更新一次,所以这个值要小于24小时)
topology.hdfs.uri:"hdfs://host:port" (这是一个可选配置,默认情况下,我们将使用 Hadoop core-site.xml 里指定的 fs.defaultFS 属性的值)
你的 Topology 配置应该有 topology.auto-credentials :["org.apache.storm.hdfs.common.security.AutoHDFS"]
如果 nimbus 没有上面的配置,你需要添加它,并重启。确保 Hadoop 的配置文件(core-site.xml and hdfs-site.xml)、storm-hdfs jar 和所有的依赖都在 nimbus 的 classpath 中。nimbus 将用配置文件中指定的 keytab 和 principal 与 NameNode 进行验证。从此,每次提交 Topology,nimbus 将模拟 Topology 提交的用户,并且代表 Topology 提交的用户去获取 tokens 。如果 Topology 中的 topology.auto-credentials 在开始时被设置为 AutoHDFS ,nimbus 将发送授权 tokens 给所有的 worker,hdfs bolt/state 将用这些 tokens 与 NameNode 进行认证。
要使 nimbus 能代表 Topology 提交用户,你必须保证 storm.kerberos.principal 指定的用户有权代表其他用户去获取 tokens。为了做到这一点,你需要按照这个链接配置 http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html
你可以从这个链接里了解到关于搭建 Secure HDFS: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html
在所有的 worker 主机上使用密匙表
你可以使用这种方法,如果你已经将 HDFS 用户密匙表文件分配给所有潜在的 worker 主机。
你应该用方法 HdfsBolt/State.withconfigKey("somekey") 指定一个 HDFS 配置键,并且这个键映射的值应该包含下面两个属性:
hdfs.keytab.file: "/path/to/keytab/"
hdfs.kerberos.principal: "user@EXAMPLE.com"
在 worker 主机上运行的 bolt/trident-state 代码将用配置文件中配置的 principal 提供的密匙表与 NameNode 进行认证。这种方法是有点危险的,因为你需要保证每个 worke 在同一个目录下拥有密匙表,并且当你在集群中引入新的主机时,你需要记住去做同样的操作。