页面树结构

2017-11-09 ApacheCN 开源组织,第二期邀请成员活动,一起走的更远 : http://www.apachecn.org/member/209.html


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

Storm 操作 HDFS文件系统组件。

用法

下面的例子将 tuple 字段以 "|" 分割,写入 HDFS 目录 hdfs://localhost:54310/foo 。每 1000 个 tuples 同步一次,写入到文件系统,是这些数据对其它 HDFS 客户端可见。当文件大小到达 5 M 时,它将轮流写入文件。

// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);

将 Topology 打包

当给你的 Topology 打包的时候,相比 maven-assembly-plugin ,更重要的是使用 maven-shade-plugin 。

shade plugin 提供了合并 JAR 列表的工具,而Hadoop 客户端利用 URL scheme 解决。

如果你有如下错误:

java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs

这表示你的 Topology jar 文件没有打包正确。

如果你用 maven 去给你的 Topology 打 jar 包,你应该用下面的 maven-shade-plugin 配置文件去打 jar 包:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

指定一个 Hadoop 版本

默认地,storm-hdfs 使用下列 Hadoop 依赖:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

如果你使用不同版本的 Hadoop ,你应该排除 storm-hdfs 依赖里的 Hadoop 依赖库,并且在 pom 里加入你选择版本的依赖。

Hadoop客户端版本不兼容,显示如下错误:

com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)

自定义

记录格式

记录格式可以通过实现 org.apache.storm.hdfs.format.RecordFormat 接口来控制:

public interface RecordFormat extends Serializable {
    byte[] format(Tuple tuple);
}

提供的 org.apache.storm.hdfs.format.DelimitedRecordFormat 格式是能够生产的格式,如 CSV 文件和制表符分割文件。

文件命名

文件命名可以通过实现 org.apache.storm.hdfs.format.FileNameFormat 接口来控制:

public interface FileNameFormat extends Serializable {
    void prepare(Map conf, TopologyContext topologyContext);
    String getName(long rotation, long timeStamp);
    String getPath();
}

提供的 org.apache.storm.hdfs.format.DefaultFileNameFormat 将创建下列格式的文件名:

 {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}

例如:

MyBolt-5-7-1390579837830.txt

默认情况下,prefix 为空, 文件扩展名为 ".txt"。

同步策略

同步策略可以通过实现 org.apache.storm.hdfs.sync.SyncPolicy 接口来控制将缓存数据刷新到低层文件系统(这样客户端就能够读取到这些数据)。

接口:

public interface SyncPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}

HdfsBolt 将会为每个它处理的 tuple 调用 mark() 方法,当返回 true 的时候,将触发 HdfsBolt 去执行同步写入,之后将调用 reset() 方法。

指定数目的 tuples 被处理完后,org.apache.storm.hdfs.sync.CountSyncPolicy 类将会触发一次同步操作。

文件循环策略

类似于同步策略,文件循环策略可以通过实现 org.apache.storm.hdfs.rotation.FileRotation  接口来控制数据文件的循环。

public interface FileRotationPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}

当数据文件到达一个指定的文件大小,org.apache.storm.hdfs.rotation.FileSizeRotationPolicy 实现类会触发文件循环操作:

FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

文件循环动作

Hdfs Bolt 和 Trident State 实现类都可以注册任意数量的 RotationActions。RotationActions 提供一个触发器,这让你在文件循环之后,可以执行某个操作。例如,移动文件或文件重命名。

public interface RotationAction extends Serializable {
    void execute(FileSystem fileSystem, Path filePath) throws IOException;
}

Storm-HDFS 包含一个简单的动作,在循环之后,它将移动文件:

public class MoveFileAction implements RotationAction {
    private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);

    private String destination;

    public MoveFileAction withDestination(String destDir){
        destination = destDir;
        return this;
    }

    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        Path destPath = new Path(destination, filePath.getName());
        LOG.info("Moving file {} to {}", filePath, destPath);
        boolean success = fileSystem.rename(filePath, destPath);
        return;
    }
}

如果你使用 Trident 和序列化文件,你可以按如下操作:

HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
                .withFileNameFormat(fileNameFormat)
                .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
                .withRotationPolicy(rotationPolicy)
                .withFsUrl("hdfs://localhost:54310")
                .addRotationAction(new MoveFileAction().withDestination("/dest2/"));

支持 HDFS 序列化文件

org.apache.storm.hdfs.bolt.SequenceFileBolt 类可以将 Storm 数据写入 HDFS 序列化文件:

 // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);

        // rotate files when they reach 5MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withExtension(".seq")
                .withPath("/data/");

        // create sequence format instance.
        DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence");

        SequenceFileBolt bolt = new SequenceFileBolt()
                .withFsUrl("hdfs://localhost:54310")
                .withFileNameFormat(fileNameFormat)
                .withSequenceFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy)
                .withCompressionType(SequenceFile.CompressionType.RECORD)
                .withCompressionCodec("deflate");

SequenceFileBolt 需要你提供一个 org.apache.storm.hdfs.bolt.format.SequenceFormat 将 tuples 映射为 key/value 对。

public interface SequenceFormat extends Serializable {
    Class keyClass();
    Class valueClass();

    Writable key(Tuple tuple);
    Writable value(Tuple tuple);
}

Trident API

storm-hdfs 包含一个 Trident state 实现类,将数据写入HDFS,有一个 API 密切反映 bolts。

Fields hdfsFields = new Fields("field1", "field2");

         FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                 .withPath("/trident")
                 .withPrefix("trident")
                 .withExtension(".txt");

         RecordFormat recordFormat = new DelimitedRecordFormat()
                 .withFields(hdfsFields);

         FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

        HdfsState.Options options = new HdfsState.HdfsFileOptions()
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(recordFormat)
                .withRotationPolicy(rotationPolicy)
                .withFsUrl("hdfs://localhost:54310");

         StateFactory factory = new HdfsStateFactory().withOptions(options);

         TridentState state = stream
                 .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());

如果用 sequence file State 实现类,可以用 HdfsState.SequenceFileOptions:

HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
                .withFileNameFormat(fileNameFormat)
                .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
                .withRotationPolicy(rotationPolicy)
                .withFsUrl("hdfs://localhost:54310")
                .addRotationAction(new MoveFileAction().toDestination("/dest2/"));

操作具有安全性的 HDFS

如果 Topology 要操作具有安全性的 HDFS,bolts/states 需要 NameNode 认证。目前有两种方法解决。

使用 HDFS 授权 tokens

你的管理员可以配置 nimbus 代表 Topology 提交的用户自动获取授权 tokens 。nimbus 需要以下配置:

nimbus.autocredential.plugins.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"]

nimbus.credential.renewers.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"]

hdfs.keytab.file: "/path/to/keytab/on/nimbus"  (这是 HDFS 超级用户的密钥表,超级用户具有最高权限可以管理其它用户)

hdfs.kerberos.principal: "superuser@EXAMPLE.com" nimbus.credential.renewers.freq.secs : 82800 (23小时,HDFS tokens 每24小时更新一次,所以这个值要小于24小时)

topology.hdfs.uri:"hdfs://host:port" (这是一个可选配置,默认情况下,我们将使用 Hadoop core-site.xml 里指定的 fs.defaultFS 属性的值)

你的 Topology 配置应该有 topology.auto-credentials :["org.apache.storm.hdfs.common.security.AutoHDFS"]

如果 nimbus 没有上面的配置,你需要添加它,并重启。确保 Hadoop 的配置文件(core-site.xml and hdfs-site.xml)、storm-hdfs jar 和所有的依赖都在 nimbus 的 classpath 中。nimbus 将用配置文件中指定的 keytab 和 principal 与 NameNode 进行验证。从此,每次提交 Topology,nimbus 将模拟 Topology 提交的用户,并且代表 Topology 提交的用户去获取 tokens 。如果 Topology 中的 topology.auto-credentials 在开始时被设置为 AutoHDFS ,nimbus 将发送授权 tokens 给所有的 worker,hdfs bolt/state 将用这些 tokens 与 NameNode 进行认证。

要使 nimbus 能代表 Topology 提交用户,你必须保证 storm.kerberos.principal 指定的用户有权代表其他用户去获取 tokens。为了做到这一点,你需要按照这个链接配置 http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html

你可以从这个链接里了解到关于搭建 Secure HDFS: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html

在所有的 worker 主机上使用密匙表

你可以使用这种方法,如果你已经将 HDFS 用户密匙表文件分配给所有潜在的 worker 主机。

你应该用方法 HdfsBolt/State.withconfigKey("somekey") 指定一个 HDFS 配置键,并且这个键映射的值应该包含下面两个属性:

hdfs.keytab.file: "/path/to/keytab/"

hdfs.kerberos.principal: "user@EXAMPLE.com"

在 worker 主机上运行的 bolt/trident-state 代码将用配置文件中配置的 principal 提供的密匙表与 NameNode 进行认证。这种方法是有点危险的,因为你需要保证每个 worke 在同一个目录下拥有密匙表,并且当你在集群中引入新的主机时,你需要记住去做同样的操作。


  • 无标签