Updated on 2024-12-10 GMT+08:00

Storm-JDBC Development Guideline

Scenario

This section describes how to use the open-source Storm-JDBC toolkit to implement the interaction between Storm and JDBC. JdbcInsertBolt and JdbcLookupBolt are included. JdbcLookupBolt is used to query data from the database. JdbcInsertBolt is used to store data to the database. In addition, JdbcLookupBolt and JdbcInsertBolt can be used to process data using the data processing logic.

This section applies only to the access between the Storm component and the JDBC component of MRS. Determine the versions of the JAR files described in this section based on the actual situation.

Procedure for Developing an Application

  1. Verify that the Storm component has been installed and is running correctly.
  2. Download the Storm client and import the Storm sample project to the Eclipse development environment. For details, see Importing and Configuring Storm Sample Projects.
  3. Use WinScp to import the Storm client to the Linux environment and install the client. For details, see Preparing a Linux Client Environment.

Configuring the Database – Configuring the Derby Database

  1. Download a database. Select the best suitable database based on the actual scenario.

    In this section, the Derby database is used as an example. The Derby database is a Java-based small-sized open-source database that is easy to use and suitable to most applications.

  2. Obtain the Derby database. Download the Derby database package of the latest version (10.14.1.0 is used in this example) from the official website, use WinSCP to upload the database package to the Linux client, and decompress the package.
  3. In the Derby installation directory, go to the bin directory, and run the following commands:

    export DERBY_INSTALL=/opt/db-derby-10.14.1.0-bin

    export CLASSPATH=$DERBY_INSTALL/lib/derbytools.jar:$DERBY_INSTALL\lib\derbynet.jar:.

    export DERBY_HOME=/opt/db-derby-10.14.1.0-bin

    . setNetworkServerCP

    ./startNetworkServer -h Host name

  4. Run the ./ij command and enter connect 'jdbc:derby://Host name:1527/example;create=true'; to create the connection.

    Before running the ./ij command, ensure that java_home has been configured. You can run the which java command to check whether java_home has been configured.

  5. After the database is connected, run SQL statements to create table ORIGINAL and table GOAL and insert a group of data into table ORIGINAL. The statement examples are as follows (the table names can be customized):

    CREATE TABLE GOAL(WORD VARCHAR(12),COUNT INT );

    CREATE TABLE ORIGINAL(WORD VARCHAR(12),COUNT INT );

    INSERT INTO ORIGINAL VALUES('orange',1),('pineapple',1),('banana',1),('watermelon',1);

Sample Code

SimpleJDBCTopology sample code: (Change the IP addresses and ports to the actual ones.)

public class SimpleJDBCTopology 
 { 
   private static final String WORD_SPOUT = "WORD_SPOUT"; 
   private static final String COUNT_BOLT = "COUNT_BOLT"; 
   private static final String JDBC_INSERT_BOLT = "JDBC_INSERT_BOLT"; 
   private static final String JDBC_LOOKUP_BOLT = "JDBC_LOOKUP_BOLT"; 
   @SuppressWarnings ("unchecked") 
   public static void main(String[] args) throws Exception{ 
     //connectionProvider configuration 
     Map hikariConfigMap = Maps.newHashMap(); 
     hikariConfigMap.put("dataSourceClassName", "org.apache.derby.jdbc.ClientDataSource"); 
     hikariConfigMap.put("dataSource.serverName", "192.168.0.1");//Set this parameter to the actual IP address. 
     hikariConfigMap.put("dataSource.portNumber", "1527");//Set this parameter to the actual port number. 

     hikariConfigMap.put("dataSource.databaseName", "example"); 
     hikariConfigMap.put("connectionTestQuery", "select COUNT from  GOAL"); //The table name must be consistent with that used during table creation. 
     Config conf = new Config(); 

     ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap); 
     //JdbcLookupBolt instantiation 
     Fields outputFields = new Fields("WORD", "COUNT"); 
     List<Column> queryParamColumns = Lists.newArrayList(new Column("WORD", Types.VARCHAR)); 
     SimpleJdbcLookupMapper jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns); 
     String selectSql = "select COUNT from ORIGINAL where WORD = ?"; 
     JdbcLookupBolt wordLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, jdbcLookupMapper); 
     //JdbcInsertBolt instantiation 
     String tableName = "GOAL"; 
     JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider); 
     JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper).withTableName("GOAL").withQueryTimeoutSecs(30); 
     WordSpout wordSpout = new WordSpout();TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout(WORD_SPOUT, wordSpout); 
     builder.setBolt(JDBC_LOOKUP_BOLT, wordLookupBolt, 1).fieldsGrouping(WORD_SPOUT,new Fields("WORD")); 
     builder.setBolt(JDBC_INSERT_BOLT, userPersistanceBolt,1).fieldsGrouping(JDBC_LOOKUP_BOLT,new Fields("WORD"));StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); 
   } 
 }

Running the Application

  1. In the root directory of Storm sample code, run the mvn package command. After the command is executed successfully, the storm-examples-1.0.jar file is generated in the target directory.
  2. Run the related command to submit the topology. The submission command example is as follows (the topology name is jdbc-test):

    storm jar /opt/jartarget/storm-examples-1.0.jar com.huawei.storm.example.jdbc.SimpleJDBCTopology jdbc-test

Viewing Results

After the topology is submitted, go to the database and check whether data is inserted into the related tables.

Run the select * from goal; statement to query data in table GOAL. If data is inserted into table GOAL, the topology is executed successfully.