Completely Migrating Storm Services
Scenarios
This section describes how to convert and run a complete Storm topology developed using Storm API.
Procedure
- Open the Storm service project, modify the POM file of the project, and add the reference of flink-storm_2.11, flink-core, and flink-streaming-java_2.11. The following figure shows an example.
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-storm_2.11</artifactId> <version>1.4.0</version> <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.4.0</version> <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.4.0</version> <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency>
If the project is not a non-Maven project, manually collect the preceding JAR packages and add them to the classpath environment variable of the project.
- Modify the code for submission of the topology. The following uses WordCount as an example:
- Keep the structure of the Storm topology unchanged, including the Spout and Bolt developed using Storm API.
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));
- Modify the code for submission of the topology. An example is described as follows:
Config conf = new Config(); conf.setNumWorkers(3); StormSubmitter.submitTopology("word-count", conf, builder.createTopology());
Perform the following operations:
Config conf = new Config(); conf.setNumWorkers(3); //converts Storm Config to StormConfig of Flink. StormConfig stormConfig = new StormConfig(conf); //Construct FlinkTopology using TopologBuilder of Storm. FlinkTopology topology = FlinkTopology.createTopology(builder); //Obtain the Stream execution environment. StreamExecutionEnvironment env = topology.getExecutionEnvironment(); //Set StormConfig to the environment variable of Job to construct Bolt and Spout. //If StormConfig is not required during the initialization of Bolt and Spout, you do not need to set this parameter. env.getConfig().setGlobalJobParameters(stormConfig); //Submit the topology. topology.execute();
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.