更新时间:2022-09-30 GMT+08:00
嵌入式迁移Storm业务
操作场景
该任务指导用户通过嵌入式迁移的方式在Flink的DataStream中嵌入Storm的代码,如使用Storm API编写的Spout/Bolt。
操作步骤
- 在Flink中,对Storm拓扑中的Spout和Bolt进行嵌入式转换,将之转换为Flink的Operator,代码示例如下:
//set up the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //get input data final DataStream<String> text = getTextDataStream(env); final DataStream<Tuple2<String, Integer>> counts = text //split up the lines in pairs (2-tuples) containing: (word,1) //this is done by a bolt that is wrapped accordingly .transform("CountBolt", TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), new BoltWrapper<String, Tuple2<String, Integer>>(new CountBolt())) //group by the tuple field "0" and sum up tuple field "1" .keyBy(0).sum(1); // execute program env.execute("Streaming WordCount with bolt tokenizer");
- 修改完成后使用Flink命令进行提交。
flink run -class {MainClass} WordCount.jar
父主题: 迁移Storm业务至Flink