
Scala Sample Code

Function Description

In a Flink application, call the APIs of the flink-connector-kafka module to produce and consume data.

Sample Code

If you use FusionInsight in security mode, ensure that kafka-clients-*.jar is obtained from the FusionInsight client directory.
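As the code below shows, the sample forwards every command-line parameter to the Kafka client through ParameterTool.getProperties, so security-mode client settings can be supplied as extra arguments without code changes. A minimal sketch of such an invocation, assuming a Kerberos-enabled cluster (the port and property values are assumptions and depend on your cluster configuration):

./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka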

The following is the main logic of the Kafka producer and the Kafka consumer.

For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.ReadFromKafka in the FlinkKafkaScalaExample project.

//Code of the producer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object WriteIntoKafka {

  def main(args: Array[String]) {
    //Print the reference command of flink run.
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
      " /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:9092")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")

    //Build the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //Configure the parallelism.
    env.setParallelism(1)
    //Parse the execution parameters.
    val paraTool = ParameterTool.fromArgs(args)
    //Build the StreamGraph and write data generated by the customized source into Kafka.
    val messageStream: DataStream[String] = env.addSource(new SimpleStringGenerator)
    messageStream.addSink(new FlinkKafkaProducer[String](
      paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties))
    //Call execute to trigger the execution.
    env.execute()
  }
}


//Customized source that generates one message per second.
class SimpleStringGenerator extends SourceFunction[String] {

  var running = true
  var i = 0

  override def run(ctx: SourceContext[String]) {
    //Emit a new element once per second until the source is canceled.
    while (running) {
      ctx.collect("element-" + i)
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel() {
    running = false
  }
}
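Once the producer job is running, you can verify that records arrive by reading the topic with the console consumer shipped with Kafka (the broker address and topic match the sample command above; the script path depends on your installation):

bin/kafka-console-consumer.sh --bootstrap-server 10.91.8.218:9092 --topic topic-test --from-beginning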

//Code of the consumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object ReadFromKafka {

  def main(args: Array[String]) {
    //Print the reference command of flink run.
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
      " /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:9092")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")

    //Build the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //Configure the parallelism.
    env.setParallelism(1)
    //Parse the execution parameters.
    val paraTool = ParameterTool.fromArgs(args)
    //Build the StreamGraph: read data from Kafka and print each record on its own row.
    val messageStream = env.addSource(new FlinkKafkaConsumer[String](
      paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties))
    messageStream
      .map(s => "Flink says " + s + System.getProperty("line.separator")).print()
    //Call execute to trigger the execution.
    env.execute()
  }
}
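When both jobs are running, the consumer's stdout (visible in the TaskManager logs) prints one line per record, such as "Flink says element-0". By default, FlinkKafkaConsumer starts reading from the committed group offsets in Kafka; if you want deterministic replays while testing, you can set the start position explicitly. A minimal sketch (the variable names are illustrative, not from the sample project):

val consumer = new FlinkKafkaConsumer[String](
  paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties)
//Re-read the topic from the beginning instead of the committed group offsets.
consumer.setStartFromEarliest()
val messageStream = env.addSource(consumer)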