更新时间:2024-08-05 GMT+08:00

Flink Job Pipeline样例程序(Java)

  1. 发布Job自定义Source算子产生数据

    下面代码片段仅为演示,完整代码参见FlinkPipelineJavaExample样例工程下的com.huawei.bigdata.flink.examples.UserSource:

    package com.huawei.bigdata.flink.examples;
     
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
     
    import java.io.Serializable;
     
    public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable {
     
        private boolean isRunning = true;
     
        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
     
        }
    /**
        * 数据产生函数,每秒钟产生10000条数据
       */
        public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
     
            while(isRunning) {
                for (int i = 0; i < 10000; i++) {
                    ctx.collect(Tuple2.of(i, "hello-" + i));
                }
                Thread.sleep(1000);
            }
        }
     
        public void close() {
            isRunning = false;
        }
     
        public void cancel() {
            isRunning = false;
        }
    }
  2. 发布者代码

    下面代码片段仅为演示,完整代码参见FlinkPipelineJavaExample样例工程下的com.huawei.bigdata.flink.examples.TestPipeline_NettySink:

    package com.huawei.bigdata.flink.examples;
     
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.netty.sink.NettySink;
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
     
    public class TestPipeline_NettySink {

        public static void main(String[] args) throws Exception {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set the network buffer flush timeout to 2 ms.
            // NOTE(review): the original comment claimed this sets the job
            // parallelism to 2, but setBufferTimeout() does not affect
            // parallelism — use env.setParallelism(2) for that, as the
            // subscriber examples do.
            env.setBufferTimeout(2);

            // Create the ZooKeeper register-server handler.
            ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
            // Add the user-defined source operator.
            env.addSource(new UserSource())
                    .keyBy(0)
                    .map(new MapFunction<Tuple2<Integer, String>, byte[]>() {
                        // Convert each outgoing record to a byte array.
                        // NOTE(review): getBytes() uses the platform default
                        // charset; consider StandardCharsets.UTF_8 so the
                        // subscribers decode consistently — confirm with the
                        // NettySource side.
                        @Override
                        public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                            return integerStringTuple2.f1.getBytes();
                        }
                    })
                    // Publish the records through NettySink.
                    .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2));

            env.execute();

        }
    }
  3. 第一个订阅者

    下面代码片段仅为演示,完整代码参见FlinkPipelineJavaExample样例工程下的com.huawei.bigdata.flink.examples.TestPipeline_NettySource1:

    package com.huawei.bigdata.flink.examples;
     
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.netty.source.NettySource;
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
     
    public class TestPipeline_NettySource1 {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Run the job with a parallelism of 2.
            env.setParallelism(2);

            // Handler that registers this subscriber with ZooKeeper.
            final ZookeeperRegisterServerHandler registerHandler = new ZookeeperRegisterServerHandler();

            // NettySource receives the publisher's messages; decode each
            // received byte array back into a string and print it.
            env.addSource(new NettySource("NettySource-1", "TOPIC-2", registerHandler))
                    .map(new MapFunction<byte[], String>() {
                        @Override
                        public String map(byte[] bytes) {
                            return new String(bytes);
                        }
                    })
                    .print();

            env.execute();
        }
    }
  4. 第二个订阅者

    下面代码片段仅为演示,完整代码参见FlinkPipelineJavaExample样例工程下的com.huawei.bigdata.flink.examples.TestPipeline_NettySource2:

    package com.huawei.bigdata.flink.examples;
     
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.netty.source.NettySource;
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
     
    public class TestPipeline_NettySource2 {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Run the job with a parallelism of 2.
            env.setParallelism(2);

            // Handler that registers this subscriber with ZooKeeper.
            final ZookeeperRegisterServerHandler registerHandler = new ZookeeperRegisterServerHandler();

            // NettySource receives the publisher's messages; decode each
            // received byte array back into a string and print it.
            env.addSource(new NettySource("NettySource-2", "TOPIC-2", registerHandler))
                    .map(new MapFunction<byte[], String>() {
                        @Override
                        public String map(byte[] bytes) {
                            return new String(bytes);
                        }
                    })
                    .print();

            env.execute();
        }
    }