Flink Job Pipeline样例程序(Java)
- 发布者Job的自定义Source算子(用于产生数据)
下面代码片段仅为演示,完整代码参见FlinkPipelineJavaExample样例工程下的com.huawei.bigdata.flink.examples.UserSource:
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.io.Serializable;

/**
 * Custom parallel source that emits {@code (i, "hello-" + i)} tuples in batches.
 *
 * <p>Each iteration of {@link #run} emits up to {@value #RECORDS_PER_BATCH} records and then
 * sleeps for {@value #BATCH_INTERVAL_MILLIS} ms, until {@link #cancel()} or {@link #close()}
 * clears the running flag.
 */
public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable {

    /** Number of records emitted per batch. */
    private static final int RECORDS_PER_BATCH = 10000;

    /** Pause between two batches, in milliseconds. */
    private static final long BATCH_INTERVAL_MILLIS = 1000L;

    // volatile: cancel() is normally invoked from a different thread than the one executing
    // run(); without it the loop may never observe the flag change.
    private volatile boolean isRunning = true;

    /**
     * Delegates to the parent implementation; no extra initialization is performed here.
     *
     * @param configuration configuration passed through to {@code super.open}
     * @throws Exception if the parent implementation fails
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
    }

    /**
     * Data generation loop: emits a batch of records, then sleeps, and repeats while running.
     *
     * @param ctx context used to emit the generated tuples
     * @throws Exception if the sleep is interrupted or emitting a record fails
     */
    @Override
    public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
        while (isRunning) {
            // Re-check the flag inside the batch so a stop request takes effect promptly
            // instead of only after the remaining records of the batch were emitted.
            for (int i = 0; i < RECORDS_PER_BATCH && isRunning; i++) {
                ctx.collect(Tuple2.of(i, "hello-" + i));
            }
            Thread.sleep(BATCH_INTERVAL_MILLIS);
        }
    }

    /** Clears the running flag so that the generation loop terminates. */
    @Override
    public void close() {
        isRunning = false;
    }

    /** Requests the generation loop in {@link #run} to stop. */
    @Override
    public void cancel() {
        isRunning = false;
    }
}
- 发布者代码
下面代码片段仅为演示,完整代码参见FlinkPipelineJavaExample样例工程下的com.huawei.bigdata.flink.examples.TestPipeline_NettySink:
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.sink.NettySink;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

import java.nio.charset.StandardCharsets;

/**
 * Publisher job: reads tuples from {@link UserSource}, converts the string field to bytes and
 * sends them out through a {@link NettySink} registered under topic {@code "TOPIC-2"}.
 */
public class TestPipeline_NettySink {

    /**
     * Builds and executes the publisher pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the output buffer flush timeout to 2 ms.
        // NOTE(review): the original comment here said "set the job parallelism to 2", which
        // does not match this call; the subscriber jobs call setParallelism(2). Confirm whether
        // setParallelism(2) was intended for the publisher as well.
        env.setBufferTimeout(2);

        // Create the ZooKeeper register-server handler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

        // Add the custom source operator.
        env.addSource(new UserSource())
            .keyBy(0)
            .map(new MapFunction<Tuple2<Integer, String>, byte[]>() {
                // Convert the message to be sent into a byte array.
                @Override
                public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                    // Encode with an explicit charset so the bytes do not depend on the
                    // default charset of the JVM that happens to run this task.
                    return integerStringTuple2.f1.getBytes(StandardCharsets.UTF_8);
                }
            })
            // Send the data out through NettySink.
            // NOTE(review): the last argument (2) is presumably the number of subscribing jobs
            // (two subscribers exist in this sample) — confirm against the NettySink API.
            .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2));

        env.execute();
    }
}
- 第一个订阅者
下面代码片段仅为演示,完整代码参见FlinkPipelineJavaExample样例工程下的com.huawei.bigdata.flink.examples.TestPipeline_NettySource1:
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

import java.nio.charset.StandardCharsets;

/**
 * First subscriber job: receives byte arrays from a {@link NettySource} on topic
 * {@code "TOPIC-2"}, converts them to strings and prints them.
 */
public class TestPipeline_NettySource1 {

    /**
     * Builds and executes the subscriber pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the job parallelism to 2.
        env.setParallelism(2);

        // Create the ZooKeeper register-server handler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

        // Add the NettySource operator to receive messages from the publisher.
        env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
            .map(new MapFunction<byte[], String>() {
                // Convert the received byte array into a string.
                @Override
                public String map(byte[] b) {
                    // Decode with an explicit charset rather than the platform default, so the
                    // result does not depend on the JVM that happens to run this task.
                    return new String(b, StandardCharsets.UTF_8);
                }
            })
            .print();

        env.execute();
    }
}
- 第二个订阅者
下面代码片段仅为演示,完整代码参见FlinkPipelineJavaExample样例工程下的com.huawei.bigdata.flink.examples.TestPipeline_NettySource2:
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

import java.nio.charset.StandardCharsets;

/**
 * Second subscriber job: receives byte arrays from a {@link NettySource} on topic
 * {@code "TOPIC-2"}, converts them to strings and prints them.
 */
public class TestPipeline_NettySource2 {

    /**
     * Builds and executes the subscriber pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the job parallelism to 2.
        env.setParallelism(2);

        // Create the ZooKeeper register-server handler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

        // Add the NettySource operator to receive data from the publisher.
        env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
            .map(new MapFunction<byte[], String>() {
                // Convert the received byte array into a string.
                @Override
                public String map(byte[] b) {
                    // Decode with an explicit charset rather than the platform default, so the
                    // result does not depend on the JVM that happens to run this task.
                    return new String(b, StandardCharsets.UTF_8);
                }
            })
            .print();

        env.execute();
    }
}