Updated on 2023-08-31 GMT+08:00

Java Sample Code

  1. The publisher customizes source operators to generate data.

    For the complete code, see com.huawei.bigdata.flink.examples.UserSource in the FlinkPipelineJavaExample project.

    package com.huawei.bigdata.flink.examples;
     
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
     
    import java.io.Serializable;
     
    public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable {
     
        private boolean isRunning = true;
     
        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
     
        }
    /**
        * Data generation function, which is used to generate 10000 pieces of data each second.
       */
        public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
     
            while(isRunning) {
                for (int i = 0; i < 10000; i++) {
                    ctx.collect(Tuple2.of(i, "hello-" + i));
                }
                Thread.sleep(1000);
            }
        }
     
        public void close() {
            isRunning = false;
        }
     
        public void cancel() {
            isRunning = false;
        }
    }
  1. Code for the publisher:

    For the complete code, see com.huawei.bigdata.flink.examples.TestPipeline_NettySink in the FlinkPipelineJavaExample project.

    package com.huawei.bigdata.flink.examples;
     
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.netty.sink.NettySink;
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
     
    public class TestPipeline_NettySink {
     
        public static void main(String[] args) throws Exception{
     
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //Set the concurrency of job to 2.
            env.setBufferTimeout(2);
     
    //Create a ZookeeperRegisterServerHandler.
            ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
    // Add a customized source operator.
            env.addSource(new UserSource())
                    .keyBy(0)
                    .map(new MapFunction<Tuple2<Integer,String>, byte[]>() {
                        //Transform the to-be-sent data into a byte array.
    
    @Override
                        public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                            return integerStringTuple2.f1.getBytes();
                        }
                    }).addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2));//NettySink transmits the data.
     
            env.execute();
     
        }
    }
  1. Code for the first subscriber.

    For the complete code, see com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 in the FlinkPipelineJavaExample project.

    package com.huawei.bigdata.flink.examples;
     
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.netty.source.NettySource;
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
     
    public class TestPipeline_NettySource1 {
     
        public static void main(String[] args) throws Exception{
     
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Set the concurrency of job to 2.
            
    env.setParallelism(2);
     
    // Create a ZookeeperRegisterServerHandler.
            ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
    //Add a NettySource operator to receive messages from the publisher.
            env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
                    .map(new MapFunction<byte[], String>() {
                      // Transform the received byte stream into character strings  
        @Override
                        public String map(byte[] b) {
                            return new String(b);
                        }
                    }).print();
     
            env.execute();
        }
    }
  1. Code for the second subscriber.

    For the complete code, see com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 in the FlinkPipelineJavaExample project.

    package com.huawei.bigdata.flink.examples;
     
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.netty.source.NettySource;
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
     
    public class TestPipeline_NettySource2 {
     
        public static void main(String[] args) throws Exception {
     
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Set the concurrency of job to 2.       
     env.setParallelism(2);
     
    //Create a ZookeeperRegisterServerHandler.
            ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
    //Add a NettySource operator to receive messages from the publisher.
            env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
                    .map(new MapFunction<byte[], String>() {
              //Transform the received byte array into character strings.
                        @Override
                        public String map(byte[] b) {
                            return new String(b);
                        }
                    }).print();
     
            env.execute();
        }
    }