Updated on 2022-09-22 GMT+08:00

Completely Migrating Storm Services

Scenarios

This section describes how to convert a complete Storm topology developed using the Storm API so that it can be submitted to and run on Flink.

Procedure

  1. Open the Storm service project and modify its POM file to add dependencies on flink-storm_2.11, flink-core, and flink-streaming-java_2.11, as shown in the following example:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-storm_2.11</artifactId>
        <version>1.4.0</version>
        <exclusions>
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.4.0</version>
        <exclusions>
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.4.0</version>
        <exclusions>
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    If the project is not a Maven project, manually collect the preceding JAR packages and add them to the classpath of the project.

  2. Modify the code that submits the topology. The following uses WordCount as an example:

    1. Keep the structure of the Storm topology unchanged, including the Spouts and Bolts developed using the Storm API:
    TopologyBuilder builder = new TopologyBuilder(); 
    builder.setSpout("spout", new RandomSentenceSpout(), 5); 
    builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout"); 
    builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));
    2. Replace the code that submits the topology to Storm. The original submission code is as follows:
    Config conf = new Config();
    conf.setNumWorkers(3);
    StormSubmitter.submitTopology("word-count", conf, builder.createTopology());

    Change it to the following code, which submits the topology to Flink:

    Config conf = new Config();
    conf.setNumWorkers(3);
    // Convert the Storm Config to Flink's StormConfig.
    StormConfig stormConfig = new StormConfig(conf);
    // Construct a FlinkTopology from the Storm TopologyBuilder.
    FlinkTopology topology = FlinkTopology.createTopology(builder);
    // Obtain the stream execution environment.
    StreamExecutionEnvironment env = topology.getExecutionEnvironment();
    // Set StormConfig as the global job parameters so that it is available when the Bolts and Spouts are constructed.
    // If the Bolts and Spouts do not need StormConfig during initialization, you can skip this setting.
    env.getConfig().setGlobalJobParameters(stormConfig);
    // Submit the topology.
    topology.execute();
    3. Repackage the project, and then run the following command to submit the JAR package:

      flink run --class {MainClass} WordCount.jar
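
For readers unfamiliar with the WordCount topology used in step 2, the following plain-Java sketch models its data flow: sentences are split into words, and each word is routed to one of 12 counting tasks so that equal words always reach the same task. It does not use the Storm or Flink API. The class and method names are hypothetical, and the hash-modulo routing is only an assumed, simplified stand-in for what fieldsGrouping on "word" guarantees.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the WordCount data flow (not Storm or Flink code).
class WordCountFlowSketch {
    // Mirrors the parallelism hint of the "count" Bolt in the example topology.
    static final int COUNT_TASKS = 12;

    // Stand-in for SplitSentenceBolt: one sentence in, a list of words out.
    static List<String> split(String sentence) {
        List<String> words = new ArrayList<>();
        for (String w : sentence.trim().split("\\s+")) {
            if (!w.isEmpty()) {
                words.add(w);
            }
        }
        return words;
    }

    // Stand-in for fieldsGrouping on "word": equal words map to the same task index.
    // Assumption: a plain hash-modulo rule, used here only for illustration.
    static int taskFor(String word) {
        return Math.floorMod(word.hashCode(), COUNT_TASKS);
    }

    // Stand-in for WordCountBolt: each task keeps its own running counts.
    static List<Map<String, Integer>> run(List<String> sentences) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < COUNT_TASKS; i++) {
            tasks.add(new HashMap<>());
        }
        for (String sentence : sentences) {
            for (String word : split(sentence)) {
                tasks.get(taskFor(word)).merge(word, 1, Integer::sum);
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "the cow jumped over the moon",
                "the moon is far");
        List<Map<String, Integer>> tasks = run(input);
        // Each word's total lives in exactly one task because of the routing rule.
        System.out.println("the -> " + tasks.get(taskFor("the")).get("the"));
        System.out.println("moon -> " + tasks.get(taskFor("moon")).get("moon"));
    }
}
```

Because every occurrence of a word lands on the same task, no task holds a partial count for that word. Keeping the topology structure unchanged in step 2 preserves this property after the migration.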