Updated on 2022-08-16 GMT+08:00

Storm-JDBC Development Guideline

Scenario

This topic describes how to use the open-source Storm-JDBC toolkit to connect Storm to a database through JDBC. The toolkit provides two bolts: JdbcLookupBolt, which queries data from the database, and JdbcInsertBolt, which stores data in the database. Both bolts can be combined with custom data processing logic in a topology.

This topic applies only to access between the Storm component and the JDBC component. Select the versions of the JAR packages described in this topic based on your actual environment.

Procedure for Developing an Application

  1. Verify that the Storm component has been installed and is running properly.
  2. Obtain the sample project folder storm-examples in the src\storm-examples directory where the sample code is decompressed. For details, see Obtaining Sample Projects from Huawei Mirrors. Import storm-examples to the IntelliJ IDEA development environment. For details, see Environment Preparation.
  3. Install the Storm client on a Linux host.

    For details about how to use the client on a Master or Core node in the cluster, see Using an MRS Client on Nodes Inside a Cluster. For details about how to install the client outside the MRS cluster, see Using an MRS Client on Nodes Outside a Cluster.

Configuring the Database - Configuring the Derby Database

  1. Download a database. Select the most appropriate database based on the actual scenario.

    In this topic, the Derby database is used as an example. Derby is a small, Java-based, open-source database that is easy to use and suitable for most applications.

  2. Obtain the Derby database. Download the latest Derby database package from the official website, use WinSCP to upload it to a directory on the Linux client (for example, /opt), and decompress the package.
  3. Go to the bin directory in the Derby installation directory and run the following commands. The commands use version 10.12.1.1 as an example; replace the paths with those of the version you downloaded.

    export DERBY_INSTALL=/opt/db-derby-10.12.1.1-bin

    export CLASSPATH=$DERBY_INSTALL/lib/derbytools.jar:$DERBY_INSTALL/lib/derbynet.jar:.

    export DERBY_HOME=/opt/db-derby-10.12.1.1-bin

    . setNetworkServerCP

    ./startNetworkServer -h Host name

  4. Run the ./ij command, and enter connect 'jdbc:derby://Host name:1527/example;create=true'; to create the connection.
  5. After the database is connected, run SQL statements to create table ORIGINAL and table GOAL and insert a group of data into table ORIGINAL. The statement examples are as follows (the table names can be customized):

    CREATE TABLE GOAL(WORD VARCHAR(12),COUNT INT );

    CREATE TABLE ORIGINAL(WORD VARCHAR(12),COUNT INT );

    INSERT INTO ORIGINAL VALUES('orange',1),('pineapple',1),('banana',1),('watermelon',1);
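
The connection string entered in ij in step 4 is made up of the host name, the port, the database name, and the optional create=true attribute. The following is a minimal sketch of how these parts fit together. DerbyUrlSketch and buildUrl are hypothetical names used only for illustration and are not part of Derby or Storm-JDBC. The host 192.168.0.1 is the example address used in this topic.

```java
// Hypothetical helper for illustration only; not part of Derby or Storm-JDBC.
final class DerbyUrlSketch {

    // Builds a Derby network-client URL such as
    // jdbc:derby://<host>:<port>/<database>;create=true
    static String buildUrl(String host, int port, String database, boolean create) {
        if (host == null || host.trim().isEmpty()) {
            throw new IllegalArgumentException("host must not be empty");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        if (database == null || database.trim().isEmpty()) {
            throw new IllegalArgumentException("database must not be empty");
        }
        StringBuilder url = new StringBuilder("jdbc:derby://")
                .append(host.trim())
                .append(':')
                .append(port)
                .append('/')
                .append(database.trim());
        if (create) {
            // Asks Derby to create the database if it does not exist yet.
            url.append(";create=true");
        }
        return url.toString();
    }

    public static void main(String[] args) {
        // Prints: jdbc:derby://192.168.0.1:1527/example;create=true
        System.out.println(buildUrl("192.168.0.1", 1527, "example", true));
    }
}
```

The same host, port, and database name are later passed to the connection provider in the code sample as dataSource.serverName, dataSource.portNumber, and dataSource.databaseName.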

Code Sample

SimpleJDBCTopology code sample: (Change the IP addresses and ports to the actual ones.)

//The imports below assume Storm 1.x or later (org.apache.storm packages). Adjust them to your Storm version.
//WordSpout is assumed to come from the storm-examples sample project.
import java.sql.Types;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class SimpleJDBCTopology
{
  private static final String WORD_SPOUT = "WORD_SPOUT";
  private static final String COUNT_BOLT = "COUNT_BOLT";
  private static final String JDBC_INSERT_BOLT = "JDBC_INSERT_BOLT";
  private static final String JDBC_LOOKUP_BOLT = "JDBC_LOOKUP_BOLT";
  @SuppressWarnings ("unchecked")
  public static void main(String[] args) throws Exception{
    //connectionProvider configuration
    Map hikariConfigMap = Maps.newHashMap();
    hikariConfigMap.put("dataSourceClassName", "org.apache.derby.jdbc.ClientDataSource");
    hikariConfigMap.put("dataSource.serverName", "192.168.0.1");//Set this parameter to the actual IP address.
    hikariConfigMap.put("dataSource.portNumber", "1527");//Set this parameter to the actual port number.

    hikariConfigMap.put("dataSource.databaseName", "example");
    hikariConfigMap.put("dataSource.user", "app");
    hikariConfigMap.put("dataSource.password", "mine");
    hikariConfigMap.put("connectionTestQuery", "select COUNT from GOAL"); //The table name must match the name used when the table was created.
    Config conf = new Config();

    ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
    //JdbcLookupBolt instantiation
    Fields outputFields = new Fields("WORD", "COUNT");
    List<Column> queryParamColumns = Lists.newArrayList(new Column("WORD", Types.VARCHAR));
    SimpleJdbcLookupMapper jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
    String selectSql = "select COUNT from ORIGINAL where WORD = ?";
    JdbcLookupBolt wordLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, jdbcLookupMapper);
    //JdbcInsertBolt instantiation
    String tableName = "GOAL";
    JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
    JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper).withTableName("GOAL").withQueryTimeoutSecs(30);
    WordSpout wordSpout = new WordSpout();
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(WORD_SPOUT, wordSpout);
    builder.setBolt(JDBC_LOOKUP_BOLT, wordLookupBolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("WORD"));
    builder.setBolt(JDBC_INSERT_BOLT, userPersistanceBolt, 1).fieldsGrouping(JDBC_LOOKUP_BOLT, new Fields("WORD"));
    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
  }
}
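
In the sample, the lookup bolt declares the output fields WORD and COUNT, but the query selects only COUNT. The following plain-Java sketch illustrates how such a tuple can be assembled, assuming the mapper fills each output field from the input tuple when the field is present there and from the query result row otherwise. LookupMergeSketch and merge are hypothetical names and do not use the Storm API. Verify the actual behavior against the storm-jdbc version you use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; this class is not part of Storm-JDBC.
final class LookupMergeSketch {

    // Produces one output row per query result row. Each output field is taken
    // from the input tuple if present, otherwise from the result row.
    static List<List<Object>> merge(List<String> outputFields,
                                    Map<String, Object> inputTuple,
                                    List<Map<String, Object>> resultRows) {
        List<List<Object>> output = new ArrayList<>();
        for (Map<String, Object> row : resultRows) {
            List<Object> values = new ArrayList<>();
            for (String field : outputFields) {
                if (inputTuple.containsKey(field)) {
                    values.add(inputTuple.get(field));
                } else {
                    values.add(row.get(field));
                }
            }
            output.add(values);
        }
        return output;
    }

    public static void main(String[] args) {
        // Input tuple as emitted by the spout: only WORD is known.
        Map<String, Object> input = new LinkedHashMap<>();
        input.put("WORD", "orange");

        // One row returned by: select COUNT from ORIGINAL where WORD = ?
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("COUNT", 1);
        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(row);

        // Prints: [[orange, 1]]
        System.out.println(merge(Arrays.asList("WORD", "COUNT"), input, rows));
    }
}
```

Under this assumption, a word that has no matching row in table ORIGINAL produces no output tuple, so nothing is inserted into table GOAL for that word.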

Running and Submitting the Topology

  1. Export the local JAR package. For details, see Packaging IntelliJ IDEA Code.
  2. Obtain the following JAR packages.

    • Go to the lib directory in the Derby installation directory, and obtain the following JAR packages:
      • derbyclient.jar
      • derby.jar
    • Obtain the following JAR packages from the streaming-cql-<HD-Version>/lib directory on the Storm client:
      • storm-jdbc-<version>.jar
      • guava-<version>.jar
      • commons-lang3-<version>.jar
      • commons-lang-<version>.jar
      • HikariCP-<version>.jar

  3. Combine the JAR packages obtained in the preceding steps and export a complete service JAR package. For details, see Packaging Services.
  4. Run a command to submit the topology. The submission command example is as follows (the topology name is jdbc-test):

    storm jar /opt/jartarget/source.jar com.huawei.storm.example.jdbc.SimpleJDBCTopology jdbc-test
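
The sample code reads the topology name from args[0], so omitting the last argument of the submission command causes an ArrayIndexOutOfBoundsException. The following is a minimal sketch of a check that could be added at the start of main. TopologyArgsSketch and topologyName are hypothetical names and are not part of the sample project.

```java
// Hypothetical helper for illustration only; not part of the sample project.
final class TopologyArgsSketch {

    // Returns the topology name passed on the command line,
    // or fails with a usage message if it is missing.
    static String topologyName(String[] args) {
        if (args == null || args.length < 1 || args[0].trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Usage: storm jar <jar> <main-class> <topology-name>");
        }
        return args[0].trim();
    }

    public static void main(String[] args) {
        // Prints: jdbc-test
        System.out.println(topologyName(new String[] {"jdbc-test"}));
    }
}
```

The returned value could then be passed to StormSubmitter.submitTopology in place of args[0].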

Viewing Results

After the topology is submitted, connect to the database and check whether data has been inserted into the target table.

Run the select * from GOAL; statement to query table GOAL. If the table contains data, the topology ran successfully.