Java Sample Code
- The publisher customizes source operators to generate data.
For the complete code, see com.huawei.bigdata.flink.examples.UserSource in the FlinkPipelineJavaExample project.
package com.huawei.bigdata.flink.examples; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import java.io.Serializable; public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable { private boolean isRunning = true; public void open(Configuration configuration) throws Exception { super.open(configuration); } /** * Data generation function, which is used to generate 10000 pieces of data each second. */ public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception { while(isRunning) { for (int i = 0; i < 10000; i++) { ctx.collect(Tuple2.of(i, "hello-" + i)); } Thread.sleep(1000); } } public void close() { isRunning = false; } public void cancel() { isRunning = false; } }
- Code for the publisher:
For the complete code, see com.huawei.bigdata.flink.examples.TestPipeline_NettySink in the FlinkPipelineJavaExample project.
package com.huawei.bigdata.flink.examples; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.netty.sink.NettySink; import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler; public class TestPipeline_NettySink { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //Set the concurrency of job to 2. env.setBufferTimeout(2); //Create a ZookeeperRegisterServerHandler. ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler(); // Add a customized source operator. env.addSource(new UserSource()) .keyBy(0) .map(new MapFunction<Tuple2<Integer,String>, byte[]>() { //Transform the to-be-sent data into a byte array. @Override public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception { return integerStringTuple2.f1.getBytes(); } }).addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2));//NettySink transmits the data. env.execute(); } }
- Code for the first subscriber.
For the complete code, see com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 in the FlinkPipelineJavaExample project.
package com.huawei.bigdata.flink.examples; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.netty.source.NettySource; import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler; public class TestPipeline_NettySource1 { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Set the concurrency of job to 2. env.setParallelism(2); // Create a ZookeeperRegisterServerHandler. ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler(); //Add a NettySource operator to receive messages from the publisher. env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler)) .map(new MapFunction<byte[], String>() { // Transform the received byte stream into character strings @Override public String map(byte[] b) { return new String(b); } }).print(); env.execute(); } }
- Code for the second subscriber.
For the complete code, see com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 in the FlinkPipelineJavaExample project.
package com.huawei.bigdata.flink.examples; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.netty.source.NettySource; import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler; public class TestPipeline_NettySource2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Set the concurrency of job to 2. env.setParallelism(2); //Create a ZookeeperRegisterServerHandler. ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler(); //Add a NettySource operator to receive messages from the publisher. env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler)) .map(new MapFunction<byte[], String>() { //Transform the received byte array into character strings. @Override public String map(byte[] b) { return new String(b); } }).print(); env.execute(); } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.