页面树结构

2017-11-09 ApacheCN 开源组织,第二期邀请成员活动,一起走的更远 : http://www.apachecn.org/member/209.html


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

Storm/Trident 集成 JDBC。这个包包含  core bolts 和 trident states,它使 Storm Topology 要么向一个数据库表里插入 tuples,要么在数据库执行查询语句,写入Storm tuples。

注:在下面的例子中,我们利用 om.google.common.collect.Lists 和com.google.common.collect.Maps.

插入数据库

为了插入数据到数据库表,这个包里的  bolt 和 trident state 被绑定到一张表。

ConnectionProvider

可以被不同连接池机制实现的接口

public interface ConnectionProvider extends Serializable {
    /**
     * method must be idempotent.
     */
    void prepare();

    /**
     *
     * @return a DB connection over which the queries can be executed.
     */
    Connection getConnection();

    /**
     * called once when the system is shutting down, should be idempotent.
     */
    void cleanup();
}

org.apache.storm.jdbc.common.HikariCPConnectionProvider 是已经实现的类,用 HikariCP 实现的。

JdbcMapper

用 JDBC 向一张表里插入数据的主要 API 是 org.apache.storm.jdbc.mapper.JdbcMapper 接口:

public interface JdbcMapper  extends Serializable {
    List<Column> getColumns(ITuple tuple);
}

getColumns() 方法定义了 Storm tuple 如何映射到数据库每一行的列里。返回的列表的顺序是重要的。语句里的占位符被返回的列表的顺序决定。例如,如果用户提供的插入语句是 insert into user(user_id, user_name, create_date) values (?,?, now()) ,getColumns() 方法返回的列表的第一个值将映射到第一个位置,第二个值映射到第二个位置等等。我们不分析提供的执行语句,通过列名解决占位符。未作出有关执行语句语法任何假设允许该连接被非标准 SQL 框架使用,例如 Pheonix  只支持 upsert into。

JdbcInsertBolt

使用 JdbcInsertBolt ,要构建一个实例,指定一个 ConnectionProvider 实现类和一个将 tuple 转化为 DB 行的 JdbcMapper 实现类。除此之外,你必须提供一个表名用 withTableName 方法,或者是一个插入语句用 withInsertQuery 方法。如果你指定一个插入语句,你需要保证你的 JdbcMapper 实现类返回的列的列表和插入语句的顺序相同。你可以选择指定一个语句执行超时参数,它指定了插入语句可以花费的最大时间。 默认地被设置为 topology.message.timeout.secs 的值,如果值为-1,表明没有执行超时时间设置。你应该设置语句执行超时时间小于等于 topology.message.timeout.secs。

Map hikariConfigMap = Maps.newHashMap();
hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
hikariConfigMap.put("dataSource.user","root");
hikariConfigMap.put("dataSource.password","password");
ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);

String tableName = "user_details";
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);

JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                                    .withTableName("user")
                                    .withQueryTimeoutSecs(30);
                                    Or
JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                                    .withInsertQuery("insert into user values (?,?)")
                                    .withQueryTimeoutSecs(30);  

SimpleJdbcMapper

storm-jdbc 包含一个通用的 JdbcMapper 实现类,叫做 SimpleJdbcMapper,它可以将 Storm tuple 映射到 Database 行。SimpleJdbcMapper 假设 Storm tuple 字段名与你打算写入的数据库表列名相同。

使用 SimpleJdbcMapper,你只需要告诉它你想写入的表名和提供一个 connectionProvider实例。

下面代码创建了一个 SimpleJdbcMapper 实例:

1.mapper 将 Storm tuple 转化为表 test.user_details 一行的各个列

2.使用提供的 HikariCP 配置来建立连接池与指定的数据库配置,并自动计算出你打算写列名和相应的数据类型的表。请查看 https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby 去了解更多关于 hikari 的配置属性。

Map hikariConfigMap = Maps.newHashMap();
hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
hikariConfigMap.put("dataSource.user","root");
hikariConfigMap.put("dataSource.password","password");
ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
String tableName = "user_details";
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);

上面的例子里初始化的 mapper 假设 Storm tuple 拥有将要写入的数据库表的列的所有值。它的 getColumn 方法返回的列的顺序将按照 Jdbc 连接实例 connection.getMetaData().getColumns(); 方法返回的列的顺序。如果你在 JdbcInsertBolt 中指定自己的插入语句,你必须实例化 SimpleJdbcMapper 用明确的columnschema,schema 的列与你插入语句里的列顺序相同。例如,你的插入语句是 Insert into user (user_id, user_name) values (?,?) ,SimpleJdbcMapper 应该用下面的声明初始化:

java List<Column> columnSchema = Lists.newArrayList( new Column("user_id", java.sql.Types.INTEGER), new Column("user_name", java.sql.Types.VARCHAR));
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);

如果你的 Storm tuple 只有数据库表列的一部分字段,比如如果一些列有默认值,并且你只想插入没有默认值的列,你可以通过用明确的 columnschema 实例化 SimpleJdbcMapper 来实现这样的操作。例如,你有一张 user_details 表:

List<Column> columnSchema = Lists.newArrayList(
    new Column("user_id", java.sql.Types.INTEGER),
    new Column("user_name", java.sql.Types.VARCHAR),
    new Column("dept_name", java.sql.Types.VARCHAR));
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);

JdbcTridentState

在 Trident Topology 中,我们支持  trident persistent state。为了创建一个 jdbc persistent trident state,你需要用表名、插入语句、JdbcMapper 实例、ConnectionProvider实例初始化。看下面的例子:

JdbcState.Options options = new JdbcState.Options()
        .withConnectionProvider(connectionProvider)
        .withMapper(jdbcMapper)
        .withTableName("user_details")
        .withQueryTimeoutSecs(30);
JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);

类似于 JdbcInsertBolt,你可以用 withInsertQuery 指定一个插入语句,而不是指定一个表名。

Lookup from Database

我们支持从数据库中查询数据,写入 Storm tuples。在数据库上使用 JDBC 执行查询语句的主要 API 是 org.apache.storm.jdbc.mapper.JdbcLookupMapper 接口:

void declareOutputFields(OutputFieldsDeclarer declarer);
    List<Column> getColumns(ITuple tuple);
    List<Values> toTuple(ITuple input, List<Column> columns);

declareOutputFields 方法用来表明哪些字段将被发送作为 Storm tuple 的一部分。

getColumns 方法指定了查询语句中的占位符列,它们的 SQL 类型和要用的值。例如上面提及的 user_details 表,如果你正在执行一个查询语句 select user_name from user_details where user_id = ? and create_time > ? ,getColumns 方法以一个 tuple 作为输入,返回包含两个占位符字段的列表。列类型方法 getValue() 的第一个实例将被用作 user_id 的值,第二个实例将被用作 create_time 的值。注:返回的列表的顺序决定的占位符的值。换句话说列表中的第一个值对应查询语句中的第一个?,列表中的第二个值对应查询语句中的第二个?等等。

toTuple 方法以输入的 tuple 和 DB 查询结果列的列表为参数,返回一个值列表发射到 Topology中。注意,它返回的是 Values 的列表,而不是单个Values。这说明一个 DB 行能够被映射到多个输出 tuples。

SimpleJdbcLookupMapper

storm-jdbc 包含一个通用的 JdbcLookupMapper 实现类,叫 SimpleJdbcLookupMapper。

使用 SimpleJdbcMapper,你需要用 bolt 将要输出的字段和查询语句中占位符列的列表初始化它。下面的例子展示了一个 SimpleJdbcLookupMapper 初始化实例,它声明 user_id,user_name,create_date 作为输出字段 ,user_id 作为查询语句中的占位符列。SimpleJdbcMapper 假设 tuple 中的字段名和占位符列名相同。也就是说 SimpleJdbcMapper 将在输入的 tuple 中寻找 user_id 字段,用它的值作为查询语句中占位符的值。为了构建输出 tuples, SimpleJdbcMapper 将先在 输入 tuple 中查找输入字段中的字段,如果在输入 tuple 中没有找到,它将在查询语句输出结果中查找与输出字段名相同的列。所以在下面的例子中,如果输入 tuple 已经有字段 user_id, create_date,并且查询语句是 select user_name from user_details where user_id = ?, 对于每个输入 tuple ,SimpleJdbcLookupMapper.getColumns(tuple) 将返回 tuple.getValueByField("user_id") ,它将作为查询语句中?的值。对于每个 DB 输出行,SimpleJdbcLookupMapper.toTuple() 将用输入 tuple 中的 user_id,,create_date,查询结果中的 user_name 这三个字段作为一个输出 tuple。

Fields outputFields = new Fields("user_id", "user_name", "create_date");
List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);

JdbcLookupBolt

使用 JdbcLookupBolt,我们用一个 ConnectionProvider 实例,一个  JdbcLookupMapper 实例,要执行的查询语句去实例化它。你可以选择去指定一个语句执行时间超时参数,它指定了查询语句可以花费的最大时间。默认值被设为 topology.message.timeout.secs 的值。你应该设置改值小于等于 topology.message.timeout.secs 。

String selectSql = "select user_name from user_details where user_id = ?";
SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns)
JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, lookupMapper)
        .withQueryTimeoutSecs(30);

JdbcTridentState for lookup

在 Trident Topology 中,我们支持 trident query state。

JdbcState.Options options = new JdbcState.Options()
        .withConnectionProvider(connectionProvider)
        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
        .withSelectQuery("select user_name from user_details where user_id = ?");
        .withQueryTimeoutSecs(30);

例如:

在 src/test/java/topology 目录下有一个可运行的实例。

设置

确保你选择的数据库 JDBC 实现依赖包含在你的 build configuration 中。测试 Topology 执行下列语句,所以为了使测试 Topology 运行,你准备的 DB 必须支持这些语句。

SQL
create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date);
create table if not exists department (dept_id integer, dept_name varchar(100)); create table if not exists user_department (user_id integer, dept_id integer);
insert into department values (1, 'R&D');
insert into department values (2, 'Finance');
insert into department values (3, 'HR');
insert into department values (4, 'Sales');
insert into user_department values (1, 1);
insert into user_department values (2, 2);
insert into user_department values (3, 3);
insert into user_department values (4, 4);
select dept_name from department, user_department where department.dept_id = user_department.dept_id and user_department.user_id = ?;

运行

运行 org.apache.storm.jdbc.topology.UserPersistanceTopology 类用 storm jar 命令。这个类需要5个参数 storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology [topology name] 。

为了使它与 MySQL 正常工作,我们将下面的加入到 pom.xml 中。

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.31</version>
</dependency>

你可以用 mvn assembly plugin 生成一个带有依赖的单一 jar。为了使用 plugin,将下面的加入到 pom.xml 中,并且执行 mvn clean compile assembly:single。

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <mainClass>fully.qualified.MainClass</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>

MySQL 例子:

storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar org.apache.storm.jdbc.topology.UserPersistanceTopology com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology

你可以在表 user 上执行一个查询语句,展示新插入的列:

select * from user;

对于 Trident,你可以查看 org.apache.storm.jdbc.topology.UserPersistanceTridentTopology 。

 

  • 无标签