
Storm-JDBC Development Guideline

Scenario

This topic describes how to use the open-source Storm-JDBC toolkit to implement interaction between Storm and JDBC. The toolkit includes JdbcInsertBolt and JdbcLookupBolt: JdbcLookupBolt queries data from the database, and JdbcInsertBolt writes data to the database. In addition, data processing logic can be applied in JdbcLookupBolt and JdbcInsertBolt.

This topic applies only to the interaction between the Storm component and the JDBC component. Determine the versions of the JAR packages described in this chapter based on the actual environment.

Procedure for Developing an Application

  1. Verify that the Storm component has been installed and is running properly.
  2. Obtain the sample project folder storm-examples from the src\storm-examples directory of the decompressed sample code. For details, see Obtaining the MRS Application Development Sample Project. Import storm-examples into the IntelliJ IDEA development environment. For details, see Environment Preparation.
  3. After the project is imported, modify the jdbc.properties file in the resources/flux-examples directory of the sample project, setting the following parameters based on the actual environment information (for a way to avoid storing the password in plaintext, see the sketch after this list).

    # Configure the IP address of the JDBC server.
    JDBC_SERVER_NAME=
    # Configure the JDBC server port.
    JDBC_PORT_NUM=
    # Configure the JDBC login user name.
    JDBC_USER_NAME=
    # Configure the password for logging in to the JDBC server.
    # Passwords stored in plaintext pose security risks. Store them in ciphertext in configuration files or environment variables.
    JDBC_PASSWORD=
    # Configure the database name (used as dataSource.databaseName in the sample code).
    JDBC_BASE_TBL=

  4. Install the Storm client in the Linux environment.

    For details about how to use the client on a Master or Core node in the cluster, see Using an MRS Client on Nodes Inside a Cluster. For details about how to install the client outside the MRS cluster, see Using an MRS Client on Nodes Outside a Cluster.
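
As noted in the jdbc.properties comments above, storing the JDBC password in plaintext poses security risks. The following is a minimal sketch of reading the password from an environment variable instead, falling back to the properties file only if the variable is not set. The class name and the environment variable name JDBC_PASSWORD are assumptions for illustration and are not part of the shipped sample.

import java.util.Properties;

public class JdbcPasswordResolver {
    /**
     * Minimal sketch: prefer an environment variable over the plaintext property.
     * The environment variable name JDBC_PASSWORD is an assumption for illustration.
     */
    public static String resolvePassword(Properties properties) {
        String password = System.getenv("JDBC_PASSWORD");
        if (password == null || password.isEmpty()) {
            password = properties.getProperty("JDBC_PASSWORD");
        }
        return password;
    }
}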

Configuring the Database - Configuring the Derby Database

  1. Download a database. Select the most appropriate database based on the actual scenario.

    In this topic, the Derby database is used as an example. Derby is a small, Java-based, open-source database that is easy to use and suitable for most applications.

  2. Obtain the Derby database. Download the latest Derby database package from the official website, use WinSCP to upload the package to the Linux client (for example, to /opt), and decompress it.
  3. In the Derby installation directory, go to the bin directory and run the following commands (adjust the paths to match the actual Derby version and installation directory):

    export DERBY_INSTALL=/opt/db-derby-10.12.1.1-bin

    export CLASSPATH=$DERBY_INSTALL/lib/derbytools.jar:$DERBY_INSTALL/lib/derbynet.jar:.

    export DERBY_HOME=/opt/db-derby-10.12.1.1-bin

    . setNetworkServerCP

    ./startNetworkServer -h <host name>

  4. Run the ./ij command, and enter connect 'jdbc:derby://<host name>:1527/example;create=true'; to create the connection.
  5. After the database is connected, run SQL statements to create table ORIGINAL and table GOAL, and insert a group of data into table ORIGINAL (a standalone connectivity check that reads this data back is sketched after this list). The statement examples are as follows (the table names can be customized):

    CREATE TABLE GOAL(WORD VARCHAR(12),COUNT INT );

    CREATE TABLE ORIGINAL(WORD VARCHAR(12),COUNT INT );

    INSERT INTO ORIGINAL VALUES('orange',1),('pineapple',1),('banana',1),('watermelon',1);
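
Before moving on to the topology code, it can help to confirm from the client machine that the Derby network server is reachable over JDBC and that the data inserted above can be read back. The following is a minimal standalone sketch; the class name is illustrative, the JDBC URL matches the ij connect command above, and derbyclient.jar must be on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DerbyConnectivityCheck {
    public static void main(String[] args) throws Exception {
        // Same JDBC URL as the ij connect command above, without create=true
        // because the database already exists at this point.
        String url = "jdbc:derby://<host name>:1527/example";

        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT WORD, COUNT FROM ORIGINAL")) {
            while (rs.next()) {
                System.out.println(rs.getString("WORD") + " -> " + rs.getInt("COUNT"));
            }
        }
    }
}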

Code Sample

SimpleJDBCTopology code sample: (Change the IP addresses and ports to the actual ones.)

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.sql.Types;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// JDBCSpout is provided by the sample project (assumed to be in the same package or imported accordingly).
public class SimpleJDBCTopology {

    private static final Logger logger = Logger.getLogger(SimpleJDBCTopology.class);
    private static final String WORD_SPOUT = "WORD_SPOUT";

    private static final String JDBC_INSERT_BOLT = "JDBC_INSERT_BOLT";

    private static final String JDBC_LOOKUP_BOLT = "JDBC_LOOKUP_BOLT";

    // Name of the source table created by the user, which can be changed.
    private static final String JDBC_ORIGIN_TBL = "ORIGINAL";

    // Name of the target table created by the user, which can be changed.
    private static final String JDBC_INSERT_TBL = "GOAL";

    @SuppressWarnings("rawtypes")
    public static void main(String[] args) throws Exception {
        // Obtain the configuration file.
        Properties properties = new Properties();
        String proPath =
                System.getProperty("user.dir")
                        + File.separator
                        + "src"
                        + File.separator
                        + "main"
                        + File.separator
                        + "resources"
                        + File.separator
                        + "flux-examples"
                        + File.separator
                        + "jdbc.properties";
        try {
            properties.load(new FileInputStream(proPath));
        } catch (IOException e) {
            logger.error("Failed to load properties file.");
            throw e;
        }
        String serverName = properties.getProperty("JDBC_SERVER_NAME");
        String portNum = properties.getProperty("JDBC_PORT_NUM");
        String userName = properties.getProperty("JDBC_USER_NAME");
        String password = properties.getProperty("JDBC_PASSWORD");
        String baseTbl = properties.getProperty("JDBC_BASE_TBL");

        // The configuration of connectionProvider.
        Map<String, Object> hikariConfigMap = Maps.newHashMap();
        hikariConfigMap.put("dataSourceClassName", "org.apache.derby.jdbc.ClientDataSource");
        hikariConfigMap.put("dataSource.serverName", serverName);
        hikariConfigMap.put("dataSource.portNumber", portNum);
        hikariConfigMap.put("dataSource.databaseName", baseTbl);
        hikariConfigMap.put("dataSource.user", userName);
        hikariConfigMap.put("dataSource.password", password);
        hikariConfigMap.put("connectionTestQuery", "select COUNT from " + JDBC_INSERT_TBL);

        Config conf = new Config();

        ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);

        // JdbcLookupBolt instantiation
        Fields outputFields = new Fields("WORD", "COUNT");
        List<Column> queryParamColumns = Lists.newArrayList(new Column("WORD", Types.VARCHAR));
        SimpleJdbcLookupMapper jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
        String selectSql = "select COUNT from " + JDBC_ORIGIN_TBL + " where WORD = ?";
        JdbcLookupBolt wordLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, jdbcLookupMapper);

        // JdbcInsertBolt instantiation
        String tableName = JDBC_INSERT_TBL;
        JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
        JdbcInsertBolt wordInsertBolt =
                new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                        .withTableName(JDBC_INSERT_TBL)
                        .withQueryTimeoutSecs(30);

        JDBCSpout wordSpout = new JDBCSpout();

        // Construct topology, wordSpout==>wordLookupBolt==>wordInsertBolt
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(WORD_SPOUT, wordSpout);
        builder.setBolt(JDBC_LOOKUP_BOLT, wordLookupBolt, 1).fieldsGrouping(WORD_SPOUT, new Fields("WORD"));
        builder.setBolt(JDBC_INSERT_BOLT, wordInsertBolt, 1).fieldsGrouping(JDBC_LOOKUP_BOLT, new Fields("WORD"));

        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
}
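
The topology above references a JDBCSpout that continuously emits tuples with a WORD field; its implementation is part of the sample project and is not reproduced here. The following is a minimal sketch of what such a spout can look like; the word list and emission interval are assumptions for illustration, and only the WORD output field is required by the downstream bolts.

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

public class JDBCSpout extends BaseRichSpout {
    private static final String[] WORDS = {"orange", "pineapple", "banana", "watermelon"};
    private SpoutOutputCollector collector;
    private int index = 0;

    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Emit one word per second; the lookup bolt joins it against table ORIGINAL.
        Utils.sleep(1000);
        collector.emit(new Values(WORDS[index % WORDS.length]));
        index++;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // The field name must match the Fields("WORD") used when wiring the topology.
        declarer.declare(new Fields("WORD"));
    }
}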

Running and Submitting the Topology

  1. Export the local JAR package. For details, see Packaging IntelliJ IDEA Code.
  2. Obtain the following JAR packages.

    • Go to the lib directory of the Derby installation and obtain the following JAR packages:
      • derbyclient.jar
      • derby.jar
    • Obtain the following JAR files from the streaming-cql-<HD-Version>/lib directory on the Storm client:
      • storm-jdbc-<version>.jar
      • guava-<version>.jar
      • commons-lang3-<version>.jar
      • commons-lang-<version>.jar
      • HikariCP-<version>.jar

  3. Combine the JAR packages obtained in the preceding steps and export a complete service JAR package. For details, see Packaging Services.
  4. Run a command to submit the topology. The submission command example is as follows (the topology name is jdbc-test):

    storm jar /opt/jartarget/source.jar com.huawei.storm.example.jdbc.SimpleJDBCTopology jdbc-test

Viewing Results

After the topology is submitted, go to the database and check whether data is inserted into the related tables.

Run the select * from GOAL; statement to query data in table GOAL. If data has been inserted into table GOAL, the topology has been executed successfully.
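
Alternatively, the check can be done programmatically. The following is a minimal sketch along the same lines as the connectivity check shown earlier, but it reads table GOAL through the org.apache.derby.jdbc.ClientDataSource class configured in the topology sample; the class name and output format are illustrative.

import org.apache.derby.jdbc.ClientDataSource;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

public class GoalTableCheck {
    public static void main(String[] args) throws Exception {
        // Same data source class and connection parameters as in SimpleJDBCTopology.
        ClientDataSource ds = new ClientDataSource();
        ds.setServerName("<host name>");  // Derby network server host
        ds.setPortNumber(1527);           // default Derby network server port
        ds.setDatabaseName("example");    // database created in the ij step

        try (Connection conn = ds.getConnection();
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT WORD, COUNT FROM GOAL")) {
            int rows = 0;
            while (rs.next()) {
                System.out.println(rs.getString("WORD") + " -> " + rs.getInt("COUNT"));
                rows++;
            }
            System.out.println(rows > 0 ? "Data found in GOAL: topology ran successfully." : "GOAL is still empty.");
        }
    }
}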