
Java Sample Code

The following is the main logic code for demonstration.

For details about the complete code, see the following:

  • com.huawei.bigdata.flink.examples.UserSource.
  • com.huawei.bigdata.flink.examples.TestPipelineNettySink.
  • com.huawei.bigdata.flink.examples.TestPipelineNettySource1.
  • com.huawei.bigdata.flink.examples.TestPipelineNettySource2.
  1. The publisher customizes a source operator to generate data.
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 
import java.io.Serializable;
 
public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable {
 
    private boolean isRunning = true;
 
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
    }

    /**
     * Data generation function, which generates 10,000 records per second.
     */
    public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
 
        while(isRunning) {
            for (int i = 0; i < 10000; i++) {
                ctx.collect(Tuple2.of(i, "hello-" + i));
            }
            Thread.sleep(1000);
        }
    }
 
    public void close() {
        isRunning = false;
    }
 
    public void cancel() {
        isRunning = false;
    }
}
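The following is a minimal local check that wires UserSource directly to a print sink so that the generated tuples can be inspected before the Netty pipeline is involved. It uses only the standard Flink streaming API; the class name LocalUserSourceCheck is hypothetical and not part of the sample project.

package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
// Hypothetical helper class for local verification only.
public class LocalUserSourceCheck {
 
    public static void main(String[] args) throws Exception {
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
 
        // Print the tuples produced by the customized source operator.
        env.addSource(new UserSource())
                .print();
 
        env.execute("UserSource local check");
    }
}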
  2. Code for the publisher:
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.sink.NettySink;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
 
public class TestPipelineNettySink {
 
    public static void main(String[] args) throws Exception{
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism of the job to 2.
        env.setParallelism(2);

        // Create a ZookeeperRegisterServerHandler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
        // Add a customized source operator.
        env.addSource(new UserSource())
                .keyBy(0)
                .map(new MapFunction<Tuple2<Integer, String>, byte[]>() {
                    // Transform the to-be-sent data into a byte array.
                    @Override
                    public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                        return integerStringTuple2.f1.getBytes();
                    }
                }).addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2)); // NettySink transmits the data.
 
        env.execute();
 
    }
}
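Note that the map function above sends only the value field (f1) and relies on the platform default charset, while the subscribers decode the bytes as UTF-8. If both tuple fields are needed downstream, a variant mapper along the following lines could be used instead of the anonymous MapFunction. This is an illustrative sketch; the class name TupleToBytesMapper is not part of the original sample.

package com.huawei.bigdata.flink.examples;
 
import java.nio.charset.StandardCharsets;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
 
// Hypothetical variant mapper, not part of the original sample.
public class TupleToBytesMapper implements MapFunction<Tuple2<Integer, String>, byte[]> {
 
    // Serialize both tuple fields as "<key>:<value>" in UTF-8 so the subscriber can recover the key.
    @Override
    public byte[] map(Tuple2<Integer, String> record) throws Exception {
        return (record.f0 + ":" + record.f1).getBytes(StandardCharsets.UTF_8);
    }
}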
  3. Code for the first subscriber:
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

import java.nio.charset.Charset;
 
public class TestPipelineNettySource1 {
 
    public static void main(String[] args) throws Exception{
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism of the job to 2.
        env.setParallelism(2);

        // Create a ZookeeperRegisterServerHandler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
        // Add a NettySource operator to receive messages from the publisher.
        env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
                    // Transform the received byte array into a character string.
                    @Override
                    public String map(byte[] bytes) {
                        return new String(bytes, Charset.forName("UTF-8"));
                    }
                }).print();
 
        env.execute();
    }
}
  4. Code for the second subscriber:
package com.huawei.bigdata.flink.examples;
 
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
 
import java.nio.charset.Charset;

public class TestPipelineNettySource2 {
 
    public static void main(String[] args) throws Exception {
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism of the job to 2.
        env.setParallelism(2);

        // Create a ZookeeperRegisterServerHandler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
        // Add a NettySource operator to receive messages from the publisher.
        env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
                    // Transform the received byte array into a character string.
                    @Override
                    public String map(byte[] bytes) {
                        return new String(bytes, Charset.forName("UTF-8"));
                    }
                }).print();
 
        env.execute();
    }
}
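Both subscriber jobs use the same anonymous MapFunction to turn the received byte array into a string. If preferred, that conversion could be factored into a single reusable class shared by TestPipelineNettySource1 and TestPipelineNettySource2, as in the following sketch; the class name BytesToStringMapper is hypothetical and not part of the original sample.

package com.huawei.bigdata.flink.examples;
 
import java.nio.charset.StandardCharsets;
 
import org.apache.flink.api.common.functions.MapFunction;
 
// Hypothetical shared decoder, not part of the original sample.
public class BytesToStringMapper implements MapFunction<byte[], String> {
 
    // Decode the received byte array as a UTF-8 character string.
    @Override
    public String map(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

With such a class in place, the map(...) call in each subscriber would become .map(new BytesToStringMapper()).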