更新时间:2023-12-01 GMT+08:00
分享

Storm-JDBC开发指引

操作场景

本文档主要说明如何使用开源Storm-JDBC工具包,完成Storm和JDBC之间的交互。Storm-JDBC中包含两类Bolt:JdbcInsertBolt和JdbcLookupBolt。其中,JdbcLookupBolt主要负责从数据库中查数据,JdbcInsertBolt主要向数据库中存数据。当然,JdbcLookupBolt和JdbcInsertBolt中也可以增加处理逻辑对数据进行处理。

本章节只适用Storm与JDBC组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。

应用开发操作步骤

  1. 确认产品Storm组件已经安装,且正常运行。
  2. 参考通过开源镜像站获取样例工程,获取样例代码解压目录中“src\storm-examples”目录下的样例工程文件夹storm-examples并将storm-examples导入到IntelliJ IDEA开发环境,参见环境准备
  3. 工程导入后,修改样例工程的“resources/flux-examples”目录下的“jdbc.properties”文件,根据实际环境信息修改相关参数。

    #配置JDBC服务端IP地址
    JDBC_SERVER_NAME=
    #配置JDBC服务端端口
    JDBC_PORT_NUM=
    #配置JDBC登录用户名
    JDBC_USER_NAME=
    #配置JDBC登录用户密码
    #密码明文存储存在安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全
    JDBC_PASSWORD=
    #配置database表名
    JDBC_BASE_TBL=

  4. 在Linux环境下安装Storm客户端。

    集群的Master节点或者Core节点使用客户端可参考集群内节点使用MRS客户端,MRS集群外客户端的安装操作可参考集群外节点使用MRS客户端

数据库配置—Derby数据库配置过程

  1. 首先应下载一个数据库,可根据具体场景选择最适合的数据库。

    该任务以Derby数据库为例。Derby是一个小型的,java编写的,易于使用却适合大多数应用程序的开放源码数据库。

  2. Derby数据库的获取。在官网下载最新版的Derby数据库,将下载下来的数据库将传入Linux客户端(如"/opt"),并解压。
  3. 在Derby的安装目录下,进入bin目录,输入如下命令:

    export DERBY_INSTALL=/opt/db-derby-10.12.1.1-bin

    export CLASSPATH=$DERBY_INSTALL/lib/derbytools.jar:$DERBY_INSTALL\lib\derbynet.jar:.

    export DERBY_HOME=/opt/db-derby-10.12.1.1-bin

    . setNetworkServerCP

    ./startNetworkServer -h 主机名

  4. 执行./ij命令,输入connect 'jdbc:derby://主机名:1527/example;create=true';,建立连接。
  5. 数据库建立好后,可以执行sql语句进行操作,需要建立两张表ORIGINAL和GOAL,并向ORIGINAL中插入一组数据,命令如下:(表名仅供参考,可自行设定)

    CREATE TABLE GOAL(WORD VARCHAR(12),COUNT INT );

    CREATE TABLE ORIGINAL(WORD VARCHAR(12),COUNT INT );

    INSERT INTO ORIGINAL VALUES('orange',1),('pineapple',1),('banana',1),('watermelon',1);

代码样例

SimpleJDBCTopology代码样例(代码中涉及到的IP端口请修改为实际的IP及端口):
public class SimpleJDBCTopology {

    private static final Logger logger = Logger.getLogger(SimpleJDBCTopology.class);
    private static final String WORD_SPOUT = "WORD_SPOUT";

    private static final String JDBC_INSERT_BOLT = "JDBC_INSERT_BOLT";

    private static final String JDBC_LOOKUP_BOLT = "JDBC_LOOKUP_BOLT";

    // 用户创建的源表表名,可自行修改
    private static final String JDBC_ORIGIN_TBL = "ORIGINAL";

    // 用户创建的目标表表名,可自行修改
    private static final String JDBC_INSERT_TBL = "GOAL";

    @SuppressWarnings("rawtypes")
    public static void main(String[] args) throws Exception {
        // 获取配置文件
        Properties properties = new Properties();
        String proPath =
                System.getProperty("user.dir")
                        + File.separator
                        + "src"
                        + File.separator
                        + "main"
                        + File.separator
                        + "resources"
                        + File.separator
                        + "flux-examples"
                        + File.separator
                        + "jdbc.properties";
        try {
            properties.load(new FileInputStream(proPath));
        } catch (IOException e) {
            logger.error("Failed to load properties file.");
            throw e;
        }
        String serverName = properties.getProperty("JDBC_SERVER_NAME");
        String portNum = properties.getProperty("JDBC_PORT_NUM");
        String userName = properties.getProperty("JDBC_USER_NAME");
        String password = properties.getProperty("JDBC_PASSWORD");
        String baseTbl = properties.getProperty("JDBC_BASE_TBL");

        // connectionProvider配置
        Map<String, Object> hikariConfigMap = Maps.newHashMap();
        hikariConfigMap.put("dataSourceClassName", "org.apache.derby.jdbc.ClientDataSource");
        hikariConfigMap.put("dataSource.serverName", serverName);
        hikariConfigMap.put("dataSource.portNumber", portNum);
        hikariConfigMap.put("dataSource.databaseName", baseTbl);
        hikariConfigMap.put("dataSource.user", userName);
        hikariConfigMap.put("dataSource.password", password);
        hikariConfigMap.put("connectionTestQuery", "select COUNT from " + JDBC_INSERT_TBL);

        Config conf = new Config();

        ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);

        // JdbcLookupBolt实例化
        Fields outputFields = new Fields("WORD", "COUNT");
        List<Column> queryParamColumns = Lists.newArrayList(new Column("WORD", Types.VARCHAR));
        SimpleJdbcLookupMapper jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
        String selectSql = "select COUNT from " + JDBC_ORIGIN_TBL + " where WORD = ?";
        JdbcLookupBolt wordLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, jdbcLookupMapper);

        // JdbcInsertBolt实例化
        String tableName = JDBC_INSERT_TBL;
        JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
        JdbcInsertBolt wordInsertBolt =
                new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                        .withTableName(JDBC_INSERT_TBL)
                        .withQueryTimeoutSecs(30);

        JDBCSpout wordSpout = new JDBCSpout();

        // 构造拓扑,wordSpout==>wordLookupBolt==>wordInsertBolt
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(WORD_SPOUT, wordSpout);
        builder.setBolt(JDBC_LOOKUP_BOLT, wordLookupBolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("WORD"));
        builder.setBolt(JDBC_INSERT_BOLT, wordInsertBolt, 1).fieldsGrouping(JDBC_LOOKUP_BOLT, new Fields("WORD"));

        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
}

部署运行及结果查看

  1. 导出本地jar包,请参见打包IntelliJ IDEA代码
  2. 获取下列jar包:

    • 在安装好的db数据库目录下进入lib目录,获取如下jar包:
      • derbyclient.jar
      • derby.jar
    • 在Storm客户端的“streaming-cql-<HD-Version>/lib”目录中获取如下jar包:
      • storm-jdbc-<version>.jar
      • guava-<version>.jar
      • commons-lang3-<version>.jar
      • commons-lang-<version>.jar
      • HikariCP-<version>.jar

  3. 将上述两步中获取的jar包合并统一打出完整的业务jar包,请参见打包业务
  4. 执行命令提交拓扑。提交命令示例(拓扑名为jdbc-test):

    storm jar /opt/jartarget/source.jar com.huawei.storm.example.jdbc.SimpleJDBCTopology jdbc-test

结果查看

当拓扑提交完成后,我们可以去数据库中查看对应表中是否有数据插入,具体过程如下:

执行SQL语句select * from goal; 查询“goal”表中的数据,如果goal表中有数据添加,则表明整个拓扑运行成功。

分享:

    相关文档

    相关产品