Storm-JDBC开发指引
操作场景
本文档主要说明如何使用开源Storm-JDBC工具包,完成Storm和JDBC之间的交互。Storm-JDBC中包含两类Bolt:JdbcInsertBolt和JdbcLookupBolt。其中,JdbcLookupBolt主要负责从数据库中查数据,JdbcInsertBolt主要向数据库中存数据。当然,JdbcLookupBolt和JdbcInsertBolt中也可以增加处理逻辑对数据进行处理。
本章节只适用Storm与JDBC组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。
应用开发操作步骤
- 确认华为MRS产品Storm组件已经安装,且正常运行。
- 参考获取MRS应用开发样例工程,获取样例代码解压目录中“src\storm-examples”目录下的样例工程文件夹storm-examples并将storm-examples导入到IntelliJ IDEA开发环境,参见准备Storm应用开发环境。
- 工程导入后,修改样例工程“resources/flux-examples”目录下的“jdbc.properties”文件,根据实际环境信息修改相关参数。
#配置JDBC服务端IP地址 JDBC_SERVER_NAME= #配置JDBC服务端端口 JDBC_PORT_NUM= #配置JDBC登录用户名 JDBC_USER_NAME= #配置JDBC登录用户密码 #密码明文存储存在安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全 JDBC_PASSWORD= #配置database表名 JDBC_BASE_TBL=
- 在Linux环境下安装Storm客户端。
集群的Master节点或者Core节点使用客户端可参考集群内节点使用MRS客户端,MRS集群外客户端的安装操作可参考集群外节点使用MRS客户端。
数据库配置—Derby数据库配置过程
- 首先应下载一个数据库,可根据具体场景选择最适合的数据库。
该任务以Derby数据库为例。Derby是一个小型的,java编写的,易于使用却适合大多数应用程序的开放源码数据库。
- Derby数据库的获取。在官网下载最新版的Derby数据库,将下载下来的数据库将传入Linux客户端(如"/opt"),并解压。
- 在Derby的安装目录下,进入bin目录,输入如下命令:
export DERBY_INSTALL=/opt/db-derby-10.12.1.1-bin
export CLASSPATH=$DERBY_INSTALL/lib/derbytools.jar:$DERBY_INSTALL\lib\derbynet.jar:.
export DERBY_HOME=/opt/db-derby-10.12.1.1-bin
. setNetworkServerCP
./startNetworkServer -h 主机名
- 执行./ij命令,输入connect 'jdbc:derby://主机名:1527/example;create=true';,建立连接。
- 数据库建立好后,可以执行sql语句进行操作,需要建立两张表ORIGINAL和GOAL,并向ORIGINAL中插入一组数据,命令如下:(表名仅供参考,可自行设定)
CREATE TABLE GOAL(WORD VARCHAR(12),COUNT INT );
CREATE TABLE ORIGINAL(WORD VARCHAR(12),COUNT INT );
INSERT INTO ORIGINAL VALUES('orange',1),('pineapple',1),('banana',1),('watermelon',1);
代码样例
public class SimpleJDBCTopology { private static final Logger logger = Logger.getLogger(SimpleJDBCTopology.class); private static final String WORD_SPOUT = "WORD_SPOUT"; private static final String JDBC_INSERT_BOLT = "JDBC_INSERT_BOLT"; private static final String JDBC_LOOKUP_BOLT = "JDBC_LOOKUP_BOLT"; // 用户创建的源表表名,可自行修改 private static final String JDBC_ORIGIN_TBL = "ORIGINAL"; // 用户创建的目标表表名,可自行修改 private static final String JDBC_INSERT_TBL = "GOAL"; @SuppressWarnings("rawtypes") public static void main(String[] args) throws Exception { // 获取配置文件 Properties properties = new Properties(); String proPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "flux-examples" + File.separator + "jdbc.properties"; try { properties.load(new FileInputStream(proPath)); } catch (IOException e) { logger.error("Failed to load properties file."); throw e; } String serverName = properties.getProperty("JDBC_SERVER_NAME"); String portNum = properties.getProperty("JDBC_PORT_NUM"); String userName = properties.getProperty("JDBC_USER_NAME"); String password = properties.getProperty("JDBC_PASSWORD"); String baseTbl = properties.getProperty("JDBC_BASE_TBL"); // connectionProvider配置 Map<String, Object> hikariConfigMap = Maps.newHashMap(); hikariConfigMap.put("dataSourceClassName", "org.apache.derby.jdbc.ClientDataSource"); hikariConfigMap.put("dataSource.serverName", serverName); hikariConfigMap.put("dataSource.portNumber", portNum); hikariConfigMap.put("dataSource.databaseName", baseTbl); hikariConfigMap.put("dataSource.user", userName); hikariConfigMap.put("dataSource.password", password); hikariConfigMap.put("connectionTestQuery", "select COUNT from " + JDBC_INSERT_TBL); Config conf = new Config(); ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap); // JdbcLookupBolt实例化 Fields outputFields = new Fields("WORD", "COUNT"); List<Column> queryParamColumns = Lists.newArrayList(new Column("WORD", Types.VARCHAR)); SimpleJdbcLookupMapper jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns); String selectSql = "select COUNT from " + JDBC_ORIGIN_TBL + " where WORD = ?"; JdbcLookupBolt wordLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, jdbcLookupMapper); // JdbcInsertBolt实例化 String tableName = JDBC_INSERT_TBL; JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider); JdbcInsertBolt wordInsertBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper) .withTableName(JDBC_INSERT_TBL) .withQueryTimeoutSecs(30); JDBCSpout wordSpout = new JDBCSpout(); // 构造拓扑,wordSpout==>wordLookupBolt==>wordInsertBolt TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(WORD_SPOUT, wordSpout); builder.setBolt(JDBC_LOOKUP_BOLT, wordLookupBolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("WORD")); builder.setBolt(JDBC_INSERT_BOLT, wordInsertBolt, 1).fieldsGrouping(JDBC_LOOKUP_BOLT, new Fields("WORD")); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } }
部署运行及结果查看
- 导出本地jar包,请参见打包Storm样例工程应用。
- 获取下列jar包:
- 在安装好的db数据库目录下进入lib目录,获取如下jar包:
- derbyclient.jar
- derby.jar
- 在Storm客户端的“streaming-cql-<HD-Version>/lib”目录中获取如下jar包:
- storm-jdbc-<version>.jar
- guava-<version>.jar
- commons-lang3-<version>.jar
- commons-lang-<version>.jar
- HikariCP-<version>.jar
- 在安装好的db数据库目录下进入lib目录,获取如下jar包:
- 将上述两步中获取的jar包合并统一打出完整的业务jar包,请参见打包Storm业务。
- 执行命令提交拓扑。提交命令示例(拓扑名为jdbc-test):
storm jar /opt/jartarget/source.jar com.huawei.storm.example.jdbc.SimpleJDBCTopology jdbc-test
结果查看
当拓扑提交完成后,可以去数据库中查看对应表中是否有数据插入,具体过程如下:
执行SQL语句select * from goal; 查询“goal”表中的数据,如果goal表中有数据添加,则表明整个拓扑运行成功。