Updated on 2022-09-14 GMT+08:00

Java Sample Code

Function Description

In a Flink application, call the APIs of the flink-connector-kafka module to produce and consume data.

Sample Code

If you want to use FusionInsight in security mode, ensure that kafka-clients-*.jar is obtained from the FusionInsight client directory.

The following is the main logic code of the Kafka producer and the Kafka consumer.

For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.ReadFromKafka.

//producer code
/**
 * Sample Flink job that writes messages generated by a custom source into a Kafka topic.
 */
public class WriteIntoKafka {

    /**
     * Prints reference {@code flink run} commands, then builds and executes the job.
     *
     * @param args command-line arguments parsed with {@code ParameterTool.fromArgs}; the
     *             {@code topic} key is required, and all parsed arguments are handed to the
     *             Kafka producer as its properties
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        // Print the reference command of flink run.
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9093 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
        System.out.println
("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println
("******************************************************************************************");

        // Build the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Configure the parallelism.
        env.setParallelism(1);
        // Parse the execution parameter.
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        // Fail fast with a clear message when --topic is missing instead of
        // passing null on to the Kafka producer.
        String topic = paraTool.getRequired("topic");

        // Build the StreamGraph and write data generated by customized source into Kafka.
        DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
        messageStream.addSink(new FlinkKafkaProducer<>(topic,
            new SimpleStringSchema(),
            paraTool.getProperties()));

        // Call execute to trigger the execution.
        env.execute();
    }

    /**
     * Customized source that emits {@code "element-<n>"} once per second, with {@code n}
     * starting at 0, until {@link #cancel()} is called.
     */
    public static class SimpleStringGenerator implements SourceFunction<String> {

        private static final long serialVersionUID = 2174904787118597072L;

        // volatile: Flink's SourceFunction contract invokes cancel() from a different
        // thread than the one running run(), so the write must be visible to the loop.
        volatile boolean running = true;

        // Sequence number appended to each emitted element.
        long i = 0;

        /**
         * Emits one element per second while the source has not been cancelled.
         *
         * @param ctx context used to emit the generated elements
         * @throws Exception if the sleep between two elements is interrupted
         */
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                ctx.collect("element-" + (i++));
                Thread.sleep(1000);
            }
        }

        /** Requests the emit loop in {@code run} to stop after its current iteration. */
        @Override
        public void cancel() {
            running = false;
        }
    }
}

//consumer code
/**
 * Sample Flink job that reads string messages from a Kafka topic and prints them.
 */
public class ReadFromKafka {

    /**
     * Prints reference {@code flink run} commands, then builds and executes the job.
     *
     * @param args command-line arguments parsed with {@code ParameterTool.fromArgs}; the
     *             {@code topic} key is required, and all parsed arguments are handed to the
     *             Kafka consumer as its properties
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        // Print the reference command of flink run.
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9093 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
        System.out.println
("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println
("******************************************************************************************");

        // Build the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Configure the parallelism.
        env.setParallelism(1);
        // Parse the execution parameter.
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        // Fail fast with a clear message when --topic is missing instead of
        // passing null on to the Kafka consumer.
        String topic = paraTool.getRequired("topic");

        // Build the StreamGraph, read data from Kafka and print the result in another row.
        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer<>(topic,
            new SimpleStringSchema(),
            paraTool.getProperties()));

        messageStream.rebalance().map(new MapFunction<String, String>() {
            // Prefix each record and append the platform line separator.
            @Override
            public String map(String s) throws Exception {
                return "Flink says " + s + System.getProperty("line.separator");
            }
        }).print();

        // Call execute to trigger the execution.
        env.execute();
    }
}