Java Sample Code
Function Description
In a Flink application, call API of the flink-connector-kafka module to produce and consume data.
Sample Code
Following is the main logic code of Kafka Consumer and Kafka Producer.
For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.ReadFromKafka.
//producer code
public class WriteIntoKafka {
public static void main(String[] args) throws Exception {
//Print the reference command of flink run.
System.out.println("use command as: ");
System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
" /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:9092");
System.out.println("******************************************************************************************");
System.out.println("<topic> is the kafka topic name");
System.out.println("<bootstrap.servers> is the ip:port list of brokers");
System.out.println("******************************************************************************************");
//Build the execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Configure the parallelism.
env.setParallelism(1);
//Parse the execution parameter.
ParameterTool paraTool = ParameterTool.fromArgs(args);
//Build the StreamGraph and write data generated by customized source into Kafka.
DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
messageStream.addSink(new FlinkKafkaProducer<>(paraTool.get("topic"),
new SimpleStringSchema(),
paraTool.getProperties()));
//Call execute to trigger the execution.
env.execute();
}
//Customize source to continuously generate messages every one second.
public static class SimpleStringGenerator implements SourceFunction<String> {
private static final long serialVersionUID = 2174904787118597072L;
boolean running = true;
long i = 0;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (running) {
ctx.collect("element-" + (i++));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}
}
//consumer code
public class ReadFromKafka {
public static void main(String[] args) throws Exception {
//Print the reference command of flink run.
System.out.println("use command as: ");
System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
" /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092");
System.out.println("******************************************************************************************");
System.out.println("<topic> is the kafka topic name");
System.out.println("<bootstrap.servers> is the ip:port list of brokers");
System.out.println("******************************************************************************************");
//Build the execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Configure the parallelism.
env.setParallelism(1);
//Parse the execution parameter.
ParameterTool paraTool = ParameterTool.fromArgs(args);
//Build the StreamGraph, read data from Kafka and print the result in another row.
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer<>(paraTool.get("topic"),
new SimpleStringSchema(),
paraTool.getProperties()));
messageStream.rebalance().map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
return "Flink says " + s + System.getProperty("line.separator");
}
}).print();
//Call execute to trigger the execution.
env.execute();
}
}
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.