页面树结构

版本比较

标识

  • 该行被添加。
  • 该行被删除。
  • 格式已经改变。

...

注:在下面的例子中,我们利用 om.google.common.collect.Lists 和com.google.common.collect.Maps.

插入数据库

为了插入数据到数据库表,这个包里的  bolt 和 trident state 被绑定到一张表。

ConnectionProvider

可以被不同连接池机制实现的接口

代码块
languagejava
linenumberstrue
public interface ConnectionProvider extends Serializable {
    /**
     * method must be idempotent.
     */
    void prepare();

    /**
     *
     * @return a DB connection over which the queries can be executed.
     */
    Connection getConnection();

    /**
     * called once when the system is shutting down, should be idempotent.
     */
    void cleanup();
}

org.apache.storm.jdbc.common.HikariCPConnectionProvider 是已经实现的类,用 HikariCP 实现的。

JdbcMapper

用 JDBC 向一张表里插入数据的主要 API 是 org.apache.storm.jdbc.mapper.JdbcMapper 接口:

...

getColumns() 方法定义了 Storm tuple 如何映射到数据库每一行的列里。返回的列表的顺序是重要的。语句里的占位符被返回的列表的顺序决定。例如,如果用户提供的插入语句是 insert into user(user_id, user_name, create_date) values (?,?, now()) ,getColumns() 方法返回的列表的第一个值将映射到第一个位置,第二个值映射到第二个位置等等。我们不分析提供的执行语句,通过列名解决占位符。未作出有关执行语句语法任何假设允许该连接被非标准 SQL 框架使用,例如 Pheonix  只支持 upsert into。

JdbcInsertBolt

使用 JdbcInsertBolt ,要构建一个实例,指定一个 ConnectionProvider 实现类和一个将 tuple 转化为 DB 行的 JdbcMapper 实现类。除此之外,你必须提供一个表名用 withTableName 方法,或者是一个插入语句用 withInsertQuery 方法。如果你指定一个插入语句,你需要保证你的 JdbcMapper 实现类返回的列的列表和插入语句的顺序相同。你可以选择指定一个语句执行超时参数,它指定了插入语句可以花费的最大时间。 默认地被设置为 topology.message.timeout.secs 的值,如果值为-1,表明没有执行超时时间设置。你应该设置语句执行超时时间小于等于 topology.message.timeout.secs。

代码块
languagejava
linenumberstrue
Map hikariConfigMap = Maps.newHashMap();
hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
hikariConfigMap.put("dataSource.user","root");
hikariConfigMap.put("dataSource.password","password");
ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);

String tableName = "user_details";
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);

JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                                    .withTableName("user")
                                    .withQueryTimeoutSecs(30);
                                    Or
JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                                    .withInsertQuery("insert into user values (?,?)")
                                    .withQueryTimeoutSecs(30);  

SimpleJdbcMapper

storm-jdbc 包含一个通用的 JdbcMapper 实现类,叫做 SimpleJdbcMapper,它可以将 Storm tuple 映射到 Database 行。SimpleJdbcMapper 假设 Storm tuple 字段名与你打算写入的数据库表列名相同。

...

代码块
languagejava
linenumberstrue
List<Column> columnSchema = Lists.newArrayList(
    new Column("user_id", java.sql.Types.INTEGER),
    new Column("user_name", java.sql.Types.VARCHAR),
    new Column("dept_name", java.sql.Types.VARCHAR));
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);

JdbcTridentState

在 Trident Topology 中,我们也支持  trident 中,我们支持  trident persistent state。为了创建一个 jdbc persistent trident state,你需要用表名、插入语句、JdbcMapper 实例、ConnectionProvider实例初始化。看下面的例子:

...

类似于 JdbcInsertBolt,你可以用 withInsertQuery 指定一个插入语句,而不是指定一个表名。

Lookup from Database

我们支持从数据库中查询数据,写入 Storm tuples。在数据库上使用 JDBC 执行查询语句的主要 API 是 org.apache.storm.jdbc.mapper.JdbcLookupMapper 接口:

...

toTuple 方法以输入的 tuple 和 DB 查询结果列的列表为参数,返回一个值列表发射到 Topology中。注意,它返回的是 Values 的列表,而不是单个Values。这说明一个 DB 行能够被映射到多个输出 tuples。

SimpleJdbcLookupMapper

storm-jdbc 包含一个通用的 JdbcLookupMapper 实现类,叫 SimpleJdbcLookupMapper。

...

代码块
languagejava
linenumberstrue
Fields outputFields = new Fields("user_id", "user_name", "create_date");
List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);

 

 

 

 

JdbcLookupBolt

使用 JdbcLookupBolt,我们用一个 ConnectionProvider 实例,一个  JdbcLookupMapper 实例,要执行的查询语句去实例化它。你可以选择去指定一个语句执行时间超时参数,它指定了查询语句可以花费的最大时间。默认值被设为 topology.message.timeout.secs 的值。你应该设置改值小于等于 topology.message.timeout.secs 。

代码块
languagejava
linenumberstrue
String selectSql = "select user_name from user_details where user_id = ?";
SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns)
JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, lookupMapper)
        .withQueryTimeoutSecs(30);

JdbcTridentState for lookup

在 Trident Topology 中,我们支持 trident query state。

代码块
languagejava
linenumberstrue
JdbcState.Options options = new JdbcState.Options()
        .withConnectionProvider(connectionProvider)
        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
        .withSelectQuery("select user_name from user_details where user_id = ?");
        .withQueryTimeoutSecs(30);

例如:

在 src/test/java/topology 目录下有一个可运行的实例。

设置

确保你选择的数据库 JDBC 实现依赖包含在你的 build configuration 中。测试 Topology 执行下列语句,所以为了使测试 Topology 运行,你准备的 DB 必须支持这些语句。

代码块
languagejava
linenumberstrue
SQL
create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date);
create table if not exists department (dept_id integer, dept_name varchar(100)); create table if not exists user_department (user_id integer, dept_id integer);
insert into department values (1, 'R&D');
insert into department values (2, 'Finance');
insert into department values (3, 'HR');
insert into department values (4, 'Sales');
insert into user_department values (1, 1);
insert into user_department values (2, 2);
insert into user_department values (3, 3);
insert into user_department values (4, 4);
select dept_name from department, user_department where department.dept_id = user_department.dept_id and user_department.user_id = ?;

运行

运行 org.apache.storm.jdbc.topology.UserPersistanceTopology 类用 storm jar 命令。这个类需要5个参数 storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology [topology name] 。

为了使它与 MySQL 正常工作,我们将下面的加入到 pom.xml 中。

代码块
languagejava
linenumberstrue
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.31</version>
</dependency>

你可以用 mvn assembly plugin 生成一个带有依赖的单一 jar。为了使用 plugin,将下面的加入到 pom.xml 中,并且执行 mvn clean compile assembly:single。

代码块
languagejava
linenumberstrue
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <mainClass>fully.qualified.MainClass</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>

MySQL 例子:

代码块
languagejava
linenumberstrue
storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar org.apache.storm.jdbc.topology.UserPersistanceTopology com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology

你可以在表 user 上执行一个查询语句,展示新插入的列:

代码块
languagejava
linenumberstrue
select * from user;

对于 Trident,你可以查看 org.apache.storm.jdbc.topology.UserPersistanceTridentTopology 。