
Java Sample Code

Function Description

In a Flink application, call the APIs of the flink-connector-kafka module to produce and consume data.

Sample Code

If you want to use FusionInsight in security mode, ensure that kafka-clients-*.jar is obtained from the FusionInsight client directory.
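In security mode, the extra flags shown in the flink run commands printed by the samples below are simply forwarded to the Kafka client as ordinary properties. A minimal sketch (not part of the sample project; the values mirror the SASL_PLAINTEXT example command and are illustrative only):

import java.util.Properties;

//Illustrative only: the security-related command-line flags become plain
//Kafka client properties. Values mirror the SASL_PLAINTEXT example command.
Properties securityProps = new Properties();
securityProps.setProperty("bootstrap.servers", "10.91.8.218:21007");
securityProps.setProperty("security.protocol", "SASL_PLAINTEXT");
securityProps.setProperty("sasl.kerberos.service.name", "kafka");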

The following is the main logic code of the Kafka producer and the Kafka consumer.

For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.ReadFromKafka in the FlinkKafkaJavaExample project.

//producer code
//The imports below assume the universal flink-connector-kafka module (Flink 1.7+).
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class WriteIntoKafka {

    public static void main(String[] args) throws Exception {
        //Print the reference commands of flink run.
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9093 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
        System.out.println("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println("******************************************************************************************");

        //Build the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Configure the parallelism.
        env.setParallelism(1);
        //Parse the execution parameters.
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        //Build the StreamGraph and write the data generated by the customized source into Kafka.
        DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
        messageStream.addSink(new FlinkKafkaProducer<>(paraTool.get("topic"),
            new SimpleStringSchema(),
            paraTool.getProperties()));
        //Call execute to trigger the execution.
        env.execute();
    }

    //Customized source that generates one message per second.
    public static class SimpleStringGenerator implements SourceFunction<String> {

        private static final long serialVersionUID = 2174904787118597072L;

        boolean running = true;
        long i = 0;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                ctx.collect("element-" + (i++));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
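The connector configuration above comes entirely from the command line: ParameterTool.fromArgs parses keys prefixed with - or --, and getProperties() exposes them as a java.util.Properties object that can be handed straight to the Kafka connector. A minimal sketch of that mapping (the demoArgs values are illustrative, taken from the commands above):

import java.util.Properties;

import org.apache.flink.api.java.utils.ParameterTool;

public class ParameterToolSketch {
    public static void main(String[] args) {
        //Flags as they appear in the flink run commands above.
        String[] demoArgs = {"--topic", "topic-test", "-bootstrap.servers", "10.91.8.218:9092"};
        ParameterTool paraTool = ParameterTool.fromArgs(demoArgs);
        System.out.println(paraTool.get("topic"));                  //topic-test
        Properties props = paraTool.getProperties();
        System.out.println(props.getProperty("bootstrap.servers")); //10.91.8.218:9092
    }
}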

//consumer code
//The imports below assume the universal flink-connector-kafka module (Flink 1.7+).
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReadFromKafka {

    public static void main(String[] args) throws Exception {
        //Print the reference commands of flink run.
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9093 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
        System.out.println("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println("******************************************************************************************");

        //Build the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Configure the parallelism.
        env.setParallelism(1);
        //Parse the execution parameters.
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        //Build the StreamGraph: read data from Kafka and print each record on its own row.
        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer<>(paraTool.get("topic"),
            new SimpleStringSchema(),
            paraTool.getProperties()));
        messageStream.rebalance().map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                return "Flink says " + s + System.getProperty("line.separator");
            }
        }).print();
        //Call execute to trigger the execution.
        env.execute();
    }
}
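If the producer sample is writing to the same topic, the consumer's print() output should resemble the following (with parallelism 1 there is no subtask prefix, and the appended line separator leaves a blank line after each record):

Flink says element-0

Flink says element-1

Flink says element-2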