Completely Migrating Storm Services
Scenarios
This section describes how to convert and run a complete Storm topology developed using Storm API.
Procedure
- Open the Storm service project, modify the POM file of the project, and add the reference of flink-storm_2.11, flink-core, and flink-streaming-java_2.11. The following figure shows an example.
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-storm_2.11</artifactId> <version>1.4.0</version> <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.4.0</version> <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.4.0</version> <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency>
If the project is not a non-Maven project, manually collect the preceding JAR packages and add them to the classpath environment variable of the project.
- Modify the code for submission of the topology. The following uses WordCount as an example:
- Keep the structure of the Storm topology unchanged, including the Spout and Bolt developed using Storm API.
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));
- Modify the code for submission of the topology. An example is described as follows:
Config conf = new Config(); conf.setNumWorkers(3); StormSubmitter.submitTopology("word-count", conf, builder.createTopology());
Perform the following operations:
Config conf = new Config(); conf.setNumWorkers(3); //converts Storm Config to StormConfig of Flink. StormConfig stormConfig = new StormConfig(conf); //Construct FlinkTopology using TopologBuilder of Storm. FlinkTopology topology = FlinkTopology.createTopology(builder); //Obtain the Stream execution environment. StreamExecutionEnvironment env = topology.getExecutionEnvironment(); //Set StormConfig to the environment variable of Job to construct Bolt and Spout. //If StormConfig is not required during the initialization of Bolt and Spout, you do not need to set this parameter. env.getConfig().setGlobalJobParameters(stormConfig); //Submit the topology. topology.execute();
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot