Java Sample Code
Function Description
In a Flink application, call API of the flink-connector-kafka module to produce and consume data.
Sample Code
If you want to use FusionInsight in security mode, ensure that the kafka-clients-*.jar is obtained from the FusionInsight client directory.
Following is the main logic code of Kafka Consumer and Kafka Producer.
For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.ReadFromKafka in the FlinkKafkaJavaExample project.
//producer code public class WriteIntoKafka { public static void main(String[] args) throws Exception { //Print the reference command of flink run. System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092"); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka"); System.out.println ("******************************************************************************************"); System.out.println("<topic> is the kafka topic name"); System.out.println("<bootstrap.servers> is the ip:port list of brokers"); System.out.println ("******************************************************************************************"); //Build the execution environment. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //Configure the parallelism. env.setParallelism(1); //Parse the execution parameter. ParameterTool paraTool = ParameterTool.fromArgs(args); //Build the StreamGraph and write data generated by customized source into Kafka. DataStream<String> messageStream = env.addSource(new SimpleStringGenerator()); messageStream.addSink(new FlinkKafkaProducer<>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties())); //Call execute to trigger the execution. env.execute(); } //Customize source to continuously generate messages every one second. public static class SimpleStringGenerator implements SourceFunction<String> { private static final long serialVersionUID = 2174904787118597072L; boolean running = true; long i = 0; @Override public void run(SourceContext<String> ctx) throws Exception { while (running) { ctx.collect("element-" + (i++)); Thread.sleep(1000); } } @Override public void cancel() { running = false; } } } //consumer code public class ReadFromKafka { public static void main(String[] args) throws Exception { //Print the reference command of flink run. System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092"); System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka"); System.out.println ("******************************************************************************************"); System.out.println("<topic> is the kafka topic name"); System.out.println("<bootstrap.servers> is the ip:port list of brokers"); System.out.println ("******************************************************************************************"); //Build the execution environment. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //Configure the parallelism. env.setParallelism(1); //Parse the execution parameter. ParameterTool paraTool = ParameterTool.fromArgs(args); //Build the StreamGraph, read data from Kafka and print the result in another row. DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer<>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties())); messageStream.rebalance().map(new MapFunction<String, String>() { @Override public String map(String s) throws Exception { return "Flink says " + s + System.getProperty("line.separator"); } }).print(); //Call execute to trigger the execution. env.execute(); } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot