Storm-JDBC开发指引
操作场景
本文档主要说明如何使用开源Storm-JDBC工具包,完成Storm和JDBC之间的交互。Storm-JDBC中包含两类Bolt:JdbcInsertBolt和JdbcLookupBolt。其中,JdbcLookupBolt主要负责从数据库中查数据,JdbcInsertBolt主要向数据库中存数据。当然,JdbcLookupBolt和JdbcInsertBolt中也可以增加处理逻辑对数据进行处理。
本章节只适用Storm与JDBC组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。
应用开发操作步骤
- 确认产品Storm组件已经安装,且正常运行。
- 参考获取MRS应用开发样例工程,获取样例代码解压目录中“src\storm-examples”目录下的样例工程文件夹storm-examples并将storm-examples导入到IntelliJ IDEA开发环境,参见准备Storm应用开发环境。
- 工程导入后,修改样例工程的“resources/flux-examples”目录下的“jdbc.properties”文件,根据实际环境信息修改相关参数。
#配置JDBC服务端IP地址 JDBC_SERVER_NAME= #配置JDBC服务端端口 JDBC_PORT_NUM= #配置JDBC登录用户名 JDBC_USER_NAME= #配置JDBC登录用户密码 #密码明文存储存在安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全 JDBC_PASSWORD= #配置database表名 JDBC_BASE_TBL=
- 在Linux环境下安装Storm客户端。
集群的Master节点或者Core节点使用客户端可参考集群内节点使用MRS客户端,MRS集群外客户端的安装操作可参考集群外节点使用MRS客户端。
数据库配置—Derby数据库配置过程
- 首先应下载一个数据库,可根据具体场景选择最适合的数据库。
该任务以Derby数据库为例。Derby是一个小型的、Java编写的,易于使用却适合大多数应用程序的开放源码数据库。
- Derby数据库的获取。在官网下载最新版的Derby数据库,将下载下来的数据库将传入Linux客户端(如"/opt"),并解压。
- 在Derby的安装目录下,进入bin目录,输入如下命令:
export DERBY_INSTALL=/opt/db-derby-10.12.1.1-bin
export CLASSPATH=$DERBY_INSTALL/lib/derbytools.jar:$DERBY_INSTALL\lib\derbynet.jar:.
export DERBY_HOME=/opt/db-derby-10.12.1.1-bin
. setNetworkServerCP
./startNetworkServer -h 主机名
- 执行./ij命令,输入connect 'jdbc:derby://主机名:1527/example;create=true';,建立连接。
- 数据库建立好后,可以执行sql语句进行操作,需要建立两张表ORIGINAL和GOAL,并向ORIGINAL中插入一组数据,命令如下:(表名仅供参考,可自行设定)
CREATE TABLE GOAL(WORD VARCHAR(12),COUNT INT );
CREATE TABLE ORIGINAL(WORD VARCHAR(12),COUNT INT );
INSERT INTO ORIGINAL VALUES('orange',1),('pineapple',1),('banana',1),('watermelon',1);
代码样例
public class SimpleJDBCTopology {
private static final Logger logger = Logger.getLogger(SimpleJDBCTopology.class);
private static final String WORD_SPOUT = "WORD_SPOUT";
private static final String JDBC_INSERT_BOLT = "JDBC_INSERT_BOLT";
private static final String JDBC_LOOKUP_BOLT = "JDBC_LOOKUP_BOLT";
// 用户创建的源表表名,可自行修改
private static final String JDBC_ORIGIN_TBL = "ORIGINAL";
// 用户创建的目标表表名,可自行修改
private static final String JDBC_INSERT_TBL = "GOAL";
@SuppressWarnings("rawtypes")
public static void main(String[] args) throws Exception {
// 获取配置文件
Properties properties = new Properties();
String proPath =
System.getProperty("user.dir")
+ File.separator
+ "src"
+ File.separator
+ "main"
+ File.separator
+ "resources"
+ File.separator
+ "flux-examples"
+ File.separator
+ "jdbc.properties";
try {
properties.load(new FileInputStream(proPath));
} catch (IOException e) {
logger.error("Failed to load properties file.");
throw e;
}
String serverName = properties.getProperty("JDBC_SERVER_NAME");
String portNum = properties.getProperty("JDBC_PORT_NUM");
String userName = properties.getProperty("JDBC_USER_NAME");
String password = properties.getProperty("JDBC_PASSWORD");
String baseTbl = properties.getProperty("JDBC_BASE_TBL");
// connectionProvider配置
Map<String, Object> hikariConfigMap = Maps.newHashMap();
hikariConfigMap.put("dataSourceClassName", "org.apache.derby.jdbc.ClientDataSource");
hikariConfigMap.put("dataSource.serverName", serverName);
hikariConfigMap.put("dataSource.portNumber", portNum);
hikariConfigMap.put("dataSource.databaseName", baseTbl);
hikariConfigMap.put("dataSource.user", userName);
hikariConfigMap.put("dataSource.password", password);
hikariConfigMap.put("connectionTestQuery", "select COUNT from " + JDBC_INSERT_TBL);
Config conf = new Config();
ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
// JdbcLookupBolt实例化
Fields outputFields = new Fields("WORD", "COUNT");
List<Column> queryParamColumns = Lists.newArrayList(new Column("WORD", Types.VARCHAR));
SimpleJdbcLookupMapper jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
String selectSql = "select COUNT from " + JDBC_ORIGIN_TBL + " where WORD = ?";
JdbcLookupBolt wordLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, jdbcLookupMapper);
// JdbcInsertBolt实例化
String tableName = JDBC_INSERT_TBL;
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
JdbcInsertBolt wordInsertBolt =
new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
.withTableName(JDBC_INSERT_TBL)
.withQueryTimeoutSecs(30);
JDBCSpout wordSpout = new JDBCSpout();
// 构造拓扑,wordSpout==>wordLookupBolt==>wordInsertBolt
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(WORD_SPOUT, wordSpout);
builder.setBolt(JDBC_LOOKUP_BOLT, wordLookupBolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("WORD"));
builder.setBolt(JDBC_INSERT_BOLT, wordInsertBolt, 1).fieldsGrouping(JDBC_LOOKUP_BOLT, new Fields("WORD"));
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
}
部署运行及结果查看
- 导出本地jar包,请参见打包Storm样例工程应用。
- 获取下列jar包:
- 在安装好的db数据库目录下进入lib目录,获取如下jar包:
- derbyclient.jar
- derby.jar
- 在Storm客户端的“streaming-cql-<HD-Version>/lib”目录中获取如下jar包:
- storm-jdbc-<version>.jar
- guava-<version>.jar
- commons-lang3-<version>.jar
- commons-lang-<version>.jar
- HikariCP-<version>.jar
- 在安装好的db数据库目录下进入lib目录,获取如下jar包:
- 将上述两步中获取的jar包合并统一打出完整的业务jar包,请参见打包Storm应用业务。
- 执行命令提交拓扑。提交命令示例(拓扑名为jdbc-test):
storm jar /opt/jartarget/source.jar com.huawei.storm.example.jdbc.SimpleJDBCTopology jdbc-test
结果查看
当拓扑提交完成后,用户可以去数据库中查看对应表中是否有数据插入,具体过程如下:
执行SQL语句select * from goal; 查询“goal”表中的数据,如果goal表中有数据添加,则表明整个拓扑运行成功。