Updated on 2022-11-18 GMT+08:00
Java Sample Code
The following code shows the main logic of the sample.
For the complete code, see the following classes:
- com.huawei.bigdata.flink.examples.UserSource
- com.huawei.bigdata.flink.examples.TestPipeline_NettySink
- com.huawei.bigdata.flink.examples.TestPipeline_NettySource1
- com.huawei.bigdata.flink.examples.TestPipeline_NettySource2
- The publisher uses a custom source operator to generate data:
    package com.huawei.bigdata.flink.examples;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

    import java.io.Serializable;

    public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable {

        // volatile: cancel() is called from a different thread than run().
        private volatile boolean isRunning = true;

        @Override
        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
        }

        /**
         * Data generation function. Emits 10000 records, then sleeps for one second,
         * and repeats for as long as the source is running.
         */
        @Override
        public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
            while (isRunning) {
                for (int i = 0; i < 10000; i++) {
                    ctx.collect(Tuple2.of(i, "hello-" + i));
                }
                Thread.sleep(1000);
            }
        }

        @Override
        public void close() {
            isRunning = false;
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
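The source stops because cancel() clears a flag that the run() loop polls. Flink calls cancel() from a different thread than the one executing run(), so the flag has to be visible across threads. The following is a minimal plain-Java sketch of that pattern with no Flink dependency. CancellableGenerator, CancellableGeneratorDemo, and runFor are hypothetical names used only for illustration, and the batch size and pause are arbitrary values, not the ones used by UserSource.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Plain-Java sketch of the run/cancel flag pattern (hypothetical class, not part of the sample). */
class CancellableGenerator implements Runnable {

    // volatile so that a write in cancel() is seen by the thread looping in run().
    private volatile boolean running = true;

    private final Consumer<String> out;
    private final int batchSize;
    private final long pauseMillis;

    CancellableGenerator(Consumer<String> out, int batchSize, long pauseMillis) {
        this.out = out;
        this.batchSize = batchSize;
        this.pauseMillis = pauseMillis;
    }

    @Override
    public void run() {
        while (running) {
            // Emit one batch, checking the flag so cancellation takes effect mid-batch.
            for (int i = 0; i < batchSize && running; i++) {
                out.accept("hello-" + i);
            }
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Called from another thread to ask the loop in run() to stop. */
    void cancel() {
        running = false;
    }
}

class CancellableGeneratorDemo {

    /** Runs a generator for the given time, cancels it, and returns how many records it emitted. */
    static long runFor(long millis) {
        AtomicLong count = new AtomicLong();
        CancellableGenerator generator =
                new CancellableGenerator(record -> count.incrementAndGet(), 100, 10);
        Thread worker = new Thread(generator);
        worker.start();
        try {
            Thread.sleep(millis);
            generator.cancel();
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the generator", e);
        }
        if (worker.isAlive()) {
            throw new IllegalStateException("generator did not stop after cancel()");
        }
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println("records emitted before cancel: " + runFor(200));
    }
}
```

Without volatile, the loop thread is not guaranteed to observe the updated flag, so the loop might keep running after cancel() returns.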
- Code for the publisher:
    package com.huawei.bigdata.flink.examples;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.netty.sink.NettySink;
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

    public class TestPipeline_NettySink {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Set the output buffer timeout to 2 ms.
            env.setBufferTimeout(2);

            // Create a ZookeeperRegisterServerHandler.
            ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

            // Add the custom source operator.
            env.addSource(new UserSource())
                .keyBy(0)
                .map(new MapFunction<Tuple2<Integer, String>, byte[]>() {
                    // Convert the data to be sent into a byte array.
                    @Override
                    public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                        return integerStringTuple2.f1.getBytes();
                    }
                })
                // NettySink transmits the data.
                .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2));

            env.execute();
        }
    }
- Code for the first subscriber:
    package com.huawei.bigdata.flink.examples;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.netty.source.NettySource;
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

    public class TestPipeline_NettySource1 {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Set the parallelism of the job to 2.
            env.setParallelism(2);

            // Create a ZookeeperRegisterServerHandler.
            ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

            // Add a NettySource operator to receive messages from the publisher.
            env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
                    // Convert the received byte array into a string.
                    @Override
                    public String map(byte[] b) {
                        return new String(b);
                    }
                })
                .print();

            env.execute();
        }
    }
- Code for the second subscriber:
    package com.huawei.bigdata.flink.examples;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.netty.source.NettySource;
    import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

    public class TestPipeline_NettySource2 {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Set the parallelism of the job to 2.
            env.setParallelism(2);

            // Create a ZookeeperRegisterServerHandler.
            ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

            // Add a NettySource operator to receive messages from the publisher.
            env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
                    // Convert the received byte array into a string.
                    @Override
                    public String map(byte[] b) {
                        return new String(b);
                    }
                })
                .print();

            env.execute();
        }
    }
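The publisher converts each string to a byte array with getBytes(), and each subscriber converts it back with new String(b). Both calls use the JVM's default charset. The sample payload ("hello-" plus a number) is plain ASCII, so this works, but if the publisher and subscriber JVMs had different default charsets, non-ASCII text could be decoded incorrectly. The following is a minimal sketch of the same round trip with an explicit charset. PayloadCodec and its methods are hypothetical names, not part of the sample, and UTF-8 is an assumed choice.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper showing the publisher/subscriber conversion with a fixed charset. */
class PayloadCodec {

    /** Publisher side: string to byte array, independent of the platform default charset. */
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Subscriber side: byte array back to a string, using the same charset. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = encode("hello-42");
        // prints "8 bytes -> hello-42"
        System.out.println(wire.length + " bytes -> " + decode(wire));
    }
}
```

In the sample jobs, the bodies of the two map functions could call such helpers in place of getBytes() and new String(b), as long as both sides agree on the charset.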