Java Sample Code
- The publisher uses a custom source operator to generate data.
For the complete code, see com.huawei.bigdata.flink.examples.UserSource in the FlinkPipelineJavaExample project.
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.io.Serializable;

public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable {

    // Declared volatile because cancel() is called from a different thread than run().
    private volatile boolean isRunning = true;

    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
    }

    /**
     * Data generation function: emits 10000 records, sleeps for one second, and repeats.
     */
    @Override
    public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
        while (isRunning) {
            for (int i = 0; i < 10000; i++) {
                ctx.collect(Tuple2.of(i, "hello-" + i));
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void close() {
        isRunning = false;
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
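The run/cancel pair in UserSource depends on a flag that one thread writes and another thread reads. The following is a minimal sketch of that pattern using plain Java threads instead of Flink; the class name CancelFlagDemo and its helper methods are hypothetical and are not part of the sample project. It illustrates why such a flag is usually declared volatile, so that the write made by cancel() becomes visible to the running loop.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a source function; not part of FlinkPipelineJavaExample.
public class CancelFlagDemo {

    // volatile makes the write in cancel() visible to the thread executing run().
    private volatile boolean isRunning = true;
    private final AtomicLong emitted = new AtomicLong();

    // Emits "records" (here: increments a counter) until cancel() is called.
    void run() throws InterruptedException {
        while (isRunning) {
            emitted.incrementAndGet();
            Thread.sleep(10);
        }
    }

    // Called from another thread to request that run() returns.
    void cancel() {
        isRunning = false;
    }

    long emittedCount() {
        return emitted.get();
    }

    // Starts run() on a worker thread, cancels it, and reports whether it stopped.
    static boolean runAndCancel() {
        CancelFlagDemo source = new CancelFlagDemo();
        Thread worker = new Thread(() -> {
            try {
                source.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            Thread.sleep(100);      // let the loop run for a moment
            source.cancel();        // request shutdown from the calling thread
            worker.join(2000);      // wait for the loop to observe the flag
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive() && source.emittedCount() > 0;
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + runAndCancel());
    }
}
```

In a real Flink job the framework calls cancel() itself; the sketch only mirrors the threading relationship between the two methods.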
- Code for the publisher.
For the complete code, see com.huawei.bigdata.flink.examples.TestPipeline_NettySink in the FlinkPipelineJavaExample project.
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.sink.NettySink;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

public class TestPipeline_NettySink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the buffer timeout to 2 ms. Note that this call does not set the job parallelism.
        env.setBufferTimeout(2);

        // Create a ZookeeperRegisterServerHandler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

        // Add a customized source operator.
        env.addSource(new UserSource())
            .keyBy(0)
            .map(new MapFunction<Tuple2<Integer, String>, byte[]>() {
                // Transform the to-be-sent data into a byte array.
                @Override
                public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                    return integerStringTuple2.f1.getBytes();
                }
            })
            // NettySink transmits the data.
            .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2));

        env.execute();
    }
}
- Code for the first subscriber.
For the complete code, see com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 in the FlinkPipelineJavaExample project.
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

public class TestPipeline_NettySource1 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the concurrency of job to 2.
        env.setParallelism(2);

        // Create a ZookeeperRegisterServerHandler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

        // Add a NettySource operator to receive messages from the publisher.
        env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
            .map(new MapFunction<byte[], String>() {
                // Transform the received byte array into character strings.
                @Override
                public String map(byte[] b) {
                    return new String(b);
                }
            }).print();

        env.execute();
    }
}
- Code for the second subscriber.
For the complete code, see com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 in the FlinkPipelineJavaExample project.
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

public class TestPipeline_NettySource2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the concurrency of job to 2.
        env.setParallelism(2);

        // Create a ZookeeperRegisterServerHandler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

        // Add a NettySource operator to receive messages from the publisher.
        env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
            .map(new MapFunction<byte[], String>() {
                // Transform the received byte array into character strings.
                @Override
                public String map(byte[] b) {
                    return new String(b);
                }
            }).print();

        env.execute();
    }
}
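The publisher converts each string to a byte array with getBytes(), and the subscribers convert it back with new String(b). Both calls use the JVM default charset, so the round trip is only guaranteed to be lossless when the publisher and subscriber JVMs share the same default. The following sketch, which is independent of Flink and uses a hypothetical class name CharsetRoundTrip, shows one way to pin the charset explicitly on both sides; UTF-8 is an assumption here, not something the sample project prescribes.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper; not part of FlinkPipelineJavaExample.
public class CharsetRoundTrip {

    // Publisher side: what the map function before the sink could return.
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Subscriber side: what the map function after the source could return.
    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "hello-42";
        byte[] wire = encode(original);
        // The decoded value equals the original regardless of the JVM default charset.
        System.out.println(decode(wire).equals(original));
    }
}
```

With this approach, the two map functions in the samples would call getBytes(StandardCharsets.UTF_8) and new String(b, StandardCharsets.UTF_8) respectively.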