更新时间:2022-02-22 GMT+08:00

嵌入式迁移Storm业务

操作场景

该任务指导用户通过嵌入式迁移的方式在Flink的DataStream中嵌入Storm的代码,如使用Storm API编写的Spout/Bolt。

操作步骤

  1. 在Flink中,对Storm拓扑中的Spout和Bolt进行嵌入式转换,将之转换为Flink的Operator,代码示例如下:

    //set up the execution environment 
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
    
    //get input data 
    final DataStream<String> text = getTextDataStream(env);  
    
    final DataStream<Tuple2<String, Integer>> counts = text 
    
      //split up the lines in pairs (2-tuples) containing: (word,1)            
      //this is done by a bolt that is wrapped accordingly            
      .transform("CountBolt",                     
        TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),                     
        new BoltWrapper<String, Tuple2<String, Integer>>(new CountBolt()))           
      //group by the tuple field "0" and sum up tuple field "1"            
      .keyBy(0).sum(1);  
    // execute program                 
    env.execute("Streaming WordCount with bolt tokenizer");

  2. 修改完成后使用Flink命令进行提交。

    flink run -class {MainClass} WordCount.jar