Storm-JDBC开发指引
操作场景
本文档主要说明如何使用开源Storm-JDBC工具包,完成Storm和JDBC之间的交互。Storm-JDBC中包含两类Bolt:JdbcInsertBolt和JdbcLookupBolt。其中,JdbcLookupBolt主要负责从数据库中查数据,JdbcInsertBolt主要向数据库中存数据。当然,JdbcLookupBolt和JdbcInsertBolt中也可以增加处理逻辑对数据进行处理。
本章节只适用与MRS产品Storm与JDBC组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。
应用开发操作步骤
- 确认Storm组件已经安装,且正常运行。
- 下载Storm客户端,将Storm样例工程导入到Eclipse开发环境,参见导入并配置Storm样例工程。
- 用WinScp工具将Storm客户端导入Linux环境并安装,具体请参见准备Linux客户端环境。
数据库配置—Derby数据库配置过程
- 首先应下载一个数据库,可根据具体场景选择最适合的数据库。
该任务以Derby数据库为例。Derby是一个小型的,java编写的,易于使用却适合大多数应用程序的开放源码数据库。
- Derby数据库的获取。在官网下载最新版的Derby数据库(本示例使用10.14.1.0),通过WinScp等工具传入Linux客户端,并解压。
- 在Derby的安装目录下,进入bin目录,输入如下命令。
export DERBY_INSTALL=/opt/db-derby-10.14.1.0-bin
export CLASSPATH=$DERBY_INSTALL/lib/derbytools.jar:$DERBY_INSTALL\lib\derbynet.jar:.
export DERBY_HOME=/opt/db-derby-10.14.1.0-bin
. setNetworkServerCP
./startNetworkServer -h 主机名
- 执行./ij命令,输入connect 'jdbc:derby://主机名:1527/example;create=true';,建立连接。
- 执行./ij命令前,需要确保已配置java_home,可通过which java命令检查是否已配置。
- 数据库建立好后,可以执行sql语句进行操作,需要建立两张表ORIGINAL和GOAL,并向ORIGINAL中插入一组数据,命令如下:(表名仅供参考,可自行设定)
CREATE TABLE GOAL(WORD VARCHAR(12),COUNT INT );
CREATE TABLE ORIGINAL(WORD VARCHAR(12),COUNT INT );
INSERT INTO ORIGINAL VALUES('orange',1),('pineapple',1),('banana',1),('watermelon',1);
代码样例
SimpleJDBCTopology代码样例(代码中涉及到的IP端口请修改为实际的IP及端口)
public class SimpleJDBCTopology { private static final String WORD_SPOUT = "WORD_SPOUT"; private static final String COUNT_BOLT = "COUNT_BOLT"; private static final String JDBC_INSERT_BOLT = "JDBC_INSERT_BOLT"; private static final String JDBC_LOOKUP_BOLT = "JDBC_LOOKUP_BOLT"; @SuppressWarnings ("unchecked") public static void main(String[] args) throws Exception{ //connectionProvider配置 Map hikariConfigMap = Maps.newHashMap(); hikariConfigMap.put("dataSourceClassName", "org.apache.derby.jdbc.ClientDataSource"); hikariConfigMap.put("dataSource.serverName", "192.168.0.1");//请改为实际的IP hikariConfigMap.put("dataSource.portNumber", "1527");//请改为实际的端口 hikariConfigMap.put("dataSource.databaseName", "example"); hikariConfigMap.put("connectionTestQuery", "select COUNT from GOAL"); //表名需与建表时保持一致 Config conf = new Config(); ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap); //JdbcLookupBolt 实例化 Fields outputFields = new Fields("WORD", "COUNT"); List<Column> queryParamColumns = Lists.newArrayList(new Column("WORD", Types.VARCHAR)); SimpleJdbcLookupMapper jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns); String selectSql = "select COUNT from ORIGINAL where WORD = ?"; JdbcLookupBolt wordLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, jdbcLookupMapper); //JdbcInsertBolt 实例化 String tableName = "GOAL"; JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider); JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper).withTableName("GOAL").withQueryTimeoutSecs(30); WordSpout wordSpout = new WordSpout();TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(WORD_SPOUT, wordSpout); builder.setBolt(JDBC_LOOKUP_BOLT, wordLookupBolt, 1).fieldsGrouping(WORD_SPOUT,new Fields("WORD")); builder.setBolt(JDBC_INSERT_BOLT, userPersistanceBolt,1).fieldsGrouping(JDBC_LOOKUP_BOLT,new Fields("WORD"));StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } }
部署运行
- 在Storm示例代码根目录执行如下命令打包:"mvn package"。执行成功后,将会在target目录生成storm-examples-1.0.jar。
- 执行命令提交拓扑。提交命令示例(拓扑名为jdbc-test)。
storm jar /opt/jartarget/storm-examples-1.0.jar com.huawei.storm.example.jdbc.SimpleJDBCTopology jdbc-test
结果查看
当拓扑提交完成后,可以去数据库中查看对应表中是否有数据插入,具体过程如下:
执行SQL语句select * from goal; 查询“GOAL”表中的数据,如果GOAL表中有数据添加,则表明整个拓扑运行成功。