更新时间:2022-02-22 GMT+08:00

完整迁移Storm业务

操作场景

该任务指导用户通过Storm业务完整迁移的方式转换并运行完整的由Storm API开发的Storm拓扑。

操作步骤

  1. 打开Storm业务工程,修改工程的pom文件,增加“flink-storm”“flink-core”“flink-streaming-java_2.11”的引用。如下:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-storm_2.11</artifactId>
        <version>1.4.0</version>
        <exclusions>
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.4.0</version>
        <exclusions>
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.4.0</version>
        <exclusions>
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    如果是非maven工程,则手动收集如上jar包,添加到工程的classpath中。

  2. 修改拓扑提交部分代码,下面以WordCount为例:

    1. Storm拓扑的构造部分保持不变,无需修改,包括使用Storm API开发的Spout和Bolt都无需修改。
    TopologyBuilder builder = new TopologyBuilder(); 
    builder.setSpout("spout", new RandomSentenceSpout(), 5); 
    builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout"); 
    builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));
    1. 拓扑的提交部分需要修改,Storm的提交示例如下:
    Config conf = new Config(); 
    conf.setNumWorkers(3);  
    
    StormSubmitter.submitTopology("word-count", conf, builder.createTopology());

    需要进行如下修改:

    Config conf = new Config();
     conf.setNumWorkers(3);
    
     //将Storm的Config转化为Flink的StormConfig
     StormConfig stormConfig = new StormConfig(conf);
    
     //使用Storm的TopologBuilder构造FlinkTopology
     FlinkTopology topology = FlinkTopology.createTopology(builder);
    
     //获取StreamExecutionEnvironment
     StreamExecutionEnvironment env = topology.getExecutionEnvironment();
    
     //将StormConfig设置到Job的环境变量中,用于构造Bolt和Spout
     //如果Bolt和Spout初始化时不需要config,则不用设置
     env.getConfig().setGlobalJobParameters(stormConfig);
     //执行拓扑提交
     topology.execute();
    1. 重新打包之后使用flink命令行进行提交:

      flink run -class {MainClass} WordCount.jar