更新时间:2022-09-30 GMT+08:00
完整迁移Storm业务
操作场景
该任务指导用户通过Storm业务完整迁移的方式转换并运行完整的由Storm API开发的Storm拓扑。
操作步骤
- 打开Storm业务工程,修改工程的pom文件,增加“flink-storm” 、“flink-core”和“flink-streaming-java_2.11”的引用。如下:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-storm_2.11</artifactId> <version>1.4.0</version> <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.4.0</version> <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.4.0</version> <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency>
如果是非maven工程,则手动收集如上jar包,添加到工程的classpath中。
- 修改拓扑提交部分代码,下面以WordCount为例:
- Storm拓扑的构造部分保持不变,无需修改,包括使用Storm API开发的Spout和Bolt都无需修改。
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));
- 拓扑的提交部分需要修改,Storm的提交示例如下:
Config conf = new Config(); conf.setNumWorkers(3); StormSubmitter.submitTopology("word-count", conf, builder.createTopology());
需要进行如下修改:
Config conf = new Config(); conf.setNumWorkers(3); //将Storm的Config转化为Flink的StormConfig StormConfig stormConfig = new StormConfig(conf); //使用Storm的TopologBuilder构造FlinkTopology FlinkTopology topology = FlinkTopology.createTopology(builder); //获取StreamExecutionEnvironment StreamExecutionEnvironment env = topology.getExecutionEnvironment(); //将StormConfig设置到Job的环境变量中,用于构造Bolt和Spout //如果Bolt和Spout初始化时不需要config,则不用设置 env.getConfig().setGlobalJobParameters(stormConfig); //执行拓扑提交 topology.execute();
父主题: 迁移Storm业务至Flink