更新时间:2024-08-03 GMT+08:00

Storm-JDBC开发指引

操作场景

本文档主要说明如何使用开源Storm-JDBC工具包,完成Storm和JDBC之间的交互。Storm-JDBC中包含两类Bolt:JdbcInsertBolt和JdbcLookupBolt。其中,JdbcLookupBolt主要负责从数据库中查数据,JdbcInsertBolt主要向数据库中存数据。当然,JdbcLookupBolt和JdbcInsertBolt中也可以增加处理逻辑对数据进行处理。

本章节只适用与MRS产品Storm与JDBC组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。

应用开发操作步骤

  1. 确认Storm组件已经安装,且正常运行。
  2. 下载Storm客户端,将Storm样例工程导入到Eclipse开发环境,参见导入并配置Storm样例工程
  3. 用WinScp工具将Storm客户端导入Linux环境并安装,具体请参见准备Linux客户端环境

数据库配置—Derby数据库配置过程

  1. 首先应下载一个数据库,可根据具体场景选择最适合的数据库。

    该任务以Derby数据库为例。Derby是一个小型的,java编写的,易于使用却适合大多数应用程序的开放源码数据库。

  2. Derby数据库的获取。在官网下载最新版的Derby数据库(本示例使用10.14.1.0),通过WinScp等工具传入Linux客户端,并解压。
  3. 在Derby的安装目录下,进入bin目录,输入如下命令。

    export DERBY_INSTALL=/opt/db-derby-10.14.1.0-bin

    export CLASSPATH=$DERBY_INSTALL/lib/derbytools.jar:$DERBY_INSTALL\lib\derbynet.jar:.

    export DERBY_HOME=/opt/db-derby-10.14.1.0-bin

    . setNetworkServerCP

    ./startNetworkServer -h 主机名

  4. 执行./ij命令,输入connect 'jdbc:derby://主机名:1527/example;create=true';,建立连接。

    • 执行./ij命令前,需要确保已配置java_home,可通过which java命令检查是否已配置。

  5. 数据库建立好后,可以执行sql语句进行操作,需要建立两张表ORIGINAL和GOAL,并向ORIGINAL中插入一组数据,命令如下:(表名仅供参考,可自行设定)

    CREATE TABLE GOAL(WORD VARCHAR(12),COUNT INT );

    CREATE TABLE ORIGINAL(WORD VARCHAR(12),COUNT INT );

    INSERT INTO ORIGINAL VALUES('orange',1),('pineapple',1),('banana',1),('watermelon',1);

代码样例

SimpleJDBCTopology代码样例(代码中涉及到的IP端口请修改为实际的IP及端口)

public class SimpleJDBCTopology 
 { 
   private static final String WORD_SPOUT = "WORD_SPOUT"; 
   private static final String COUNT_BOLT = "COUNT_BOLT"; 
   private static final String JDBC_INSERT_BOLT = "JDBC_INSERT_BOLT"; 
   private static final String JDBC_LOOKUP_BOLT = "JDBC_LOOKUP_BOLT"; 
   @SuppressWarnings ("unchecked") 
   public static void main(String[] args) throws Exception{ 
     //connectionProvider配置 
     Map hikariConfigMap = Maps.newHashMap(); 
     hikariConfigMap.put("dataSourceClassName", "org.apache.derby.jdbc.ClientDataSource"); 
     hikariConfigMap.put("dataSource.serverName", "192.168.0.1");//请改为实际的IP 
     hikariConfigMap.put("dataSource.portNumber", "1527");//请改为实际的端口 

     hikariConfigMap.put("dataSource.databaseName", "example"); 
     hikariConfigMap.put("connectionTestQuery", "select COUNT from  GOAL"); //表名需与建表时保持一致 
     Config conf = new Config(); 

     ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap); 
     //JdbcLookupBolt 实例化 
     Fields outputFields = new Fields("WORD", "COUNT"); 
     List<Column> queryParamColumns = Lists.newArrayList(new Column("WORD", Types.VARCHAR)); 
     SimpleJdbcLookupMapper jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns); 
     String selectSql = "select COUNT from ORIGINAL where WORD = ?"; 
     JdbcLookupBolt wordLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, jdbcLookupMapper); 
     //JdbcInsertBolt 实例化 
     String tableName = "GOAL"; 
     JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider); 
     JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper).withTableName("GOAL").withQueryTimeoutSecs(30); 
     WordSpout wordSpout = new WordSpout();TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout(WORD_SPOUT, wordSpout); 
     builder.setBolt(JDBC_LOOKUP_BOLT, wordLookupBolt, 1).fieldsGrouping(WORD_SPOUT,new Fields("WORD")); 
     builder.setBolt(JDBC_INSERT_BOLT, userPersistanceBolt,1).fieldsGrouping(JDBC_LOOKUP_BOLT,new Fields("WORD"));StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); 
   } 
 }

部署运行

  1. 在Storm示例代码根目录执行如下命令打包:"mvn package"。执行成功后,将会在target目录生成storm-examples-1.0.jar。
  2. 执行命令提交拓扑。提交命令示例(拓扑名为jdbc-test)。

    storm jar /opt/jartarget/storm-examples-1.0.jar com.huawei.storm.example.jdbc.SimpleJDBCTopology jdbc-test

结果查看

当拓扑提交完成后,可以去数据库中查看对应表中是否有数据插入,具体过程如下:

执行SQL语句select * from goal; 查询“GOAL”表中的数据,如果GOAL表中有数据添加,则表明整个拓扑运行成功。