Java Sample Code
The following is the main logic code for demonstration.
For details about the complete code, see the following:
- com.huawei.bigdata.flink.examples.UserSource.
- com.huawei.bigdata.flink.examples.TestPipeline_NettySink.
- com.huawei.bigdata.flink.examples.TestPipeline_NettySource1.
- com.huawei.bigdata.flink.examples.TestPipeline_NettySource2.
- The publisher customizes source operators to generate data.
package com.huawei.bigdata.flink.examples;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.io.Serializable;
public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable {
// Loop flag polled by run(). Declared volatile because cancel()/close() clear it and
// may be invoked from a different thread than the one executing run().
private volatile boolean isRunning = true;
/**
 * Initializes the source. Delegates to the superclass and performs no additional setup.
 *
 * @param configuration configuration handed on to {@code super.open}
 * @throws Exception if the superclass initialization fails
 */
@Override
public void open(Configuration configuration) throws Exception {
    super.open(configuration);
}
/**
 * Data generation function. Emits a batch of 10000 records, sleeps for one second, and
 * repeats until {@code isRunning} is set to false.
 *
 * @param ctx source context used to emit the tuples {@code (i, "hello-" + i)}
 * @throws Exception if the sleep between two batches is interrupted
 */
@Override
public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
    while (isRunning) {
        for (int i = 0; i < 10000; i++) {
            ctx.collect(Tuple2.of(i, "hello-" + i));
        }
        // Pause for one second before producing the next batch.
        Thread.sleep(1000);
    }
}
/**
 * Clears the {@code isRunning} flag so that the loop in {@code run} ends when the
 * function is closed.
 */
@Override
public void close() {
    isRunning = false;
}
/**
 * Requests the source to stop by clearing the {@code isRunning} flag, which the loop in
 * {@code run} checks before starting each batch.
 */
@Override
public void cancel() {
    isRunning = false;
}
}
- Code for the publisher:
package com.huawei.bigdata.flink.examples;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.sink.NettySink;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
public class TestPipeline_NettySink {
/**
 * Entry point of the publisher job. Reads tuples from {@code UserSource}, keys them by the
 * first tuple field, converts the string field to bytes and hands the bytes to a
 * {@code NettySink} registered under the topic "TOPIC-2".
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the job fails to execute
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Set the parallelism of the job to 2, as the subscriber jobs do.
    env.setParallelism(2);
    // Create a ZookeeperRegisterServerHandler.
    ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
    // Add a customized source operator.
    env.addSource(new UserSource())
        .keyBy(0)
        .map(new MapFunction<Tuple2<Integer, String>, byte[]>() {
            // Transform the to-be-sent data into a byte array.
            @Override
            public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                // Encode with an explicit charset so the bytes do not depend on the
                // default charset of the JVM that runs the publisher.
                return integerStringTuple2.f1.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            }
        })
        // NettySink transmits the data.
        .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2));
    env.execute();
}
}
- Code for the first subscriber:
package com.huawei.bigdata.flink.examples;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
public class TestPipeline_NettySource1 {
/**
 * Entry point of the first subscriber job. Receives byte arrays from a {@code NettySource}
 * registered under the topic "TOPIC-2", converts them to strings and prints them.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the job fails to execute
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Set the parallelism of the job to 2.
    env.setParallelism(2);
    // Create a ZookeeperRegisterServerHandler.
    ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
    // Add a NettySource operator to receive messages from the publisher.
    env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
        .map(new MapFunction<byte[], String>() {
            // Transform the received byte array into a character string.
            @Override
            public String map(byte[] b) {
                // Decode with an explicit charset so the result does not depend on the
                // default charset of the JVM that runs the subscriber.
                return new String(b, java.nio.charset.StandardCharsets.UTF_8);
            }
        })
        .print();
    env.execute();
}
}
- Code for the second subscriber:
package com.huawei.bigdata.flink.examples;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
public class TestPipeline_NettySource2 {
/**
 * Entry point of the second subscriber job. Receives byte arrays from a {@code NettySource}
 * registered under the topic "TOPIC-2", converts them to strings and prints them.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the job fails to execute
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Set the parallelism of the job to 2.
    env.setParallelism(2);
    // Create a ZookeeperRegisterServerHandler.
    ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
    // Add a NettySource operator to receive messages from the publisher.
    env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
        .map(new MapFunction<byte[], String>() {
            // Transform the received byte array into a character string.
            @Override
            public String map(byte[] b) {
                // Decode with an explicit charset so the result does not depend on the
                // default charset of the JVM that runs the subscriber.
                return new String(b, java.nio.charset.StandardCharsets.UTF_8);
            }
        })
        .print();
    env.execute();
}
}
Last Article: Scenario
Next Article: Scala Sample Code
Did this article solve your problem?
Thank you for your score! Your feedback would help us improve the website.