
Scala Sample Code

Function Description

In a Flink application, call the APIs of the flink-connector-kafka module to produce and consume data.
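To compile the sample against this connector, the project must declare a dependency on flink-connector-kafka. The following is a minimal sbt sketch; the Flink version and dependency scopes are assumptions, so align them with your cluster:

//build.sbt (sketch; the version 1.12.2 is an assumption -- match your cluster)
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.12.2" % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % "1.12.2" % "provided",
  "org.apache.flink" %% "flink-connector-kafka" % "1.12.2"
)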

Sample Code

The following is the main logic of the Kafka Consumer and Kafka Producer code.

For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.ReadFromKafka.
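Both examples rely on ParameterTool to turn the command-line flags into Kafka client properties. The following standalone sketch (hypothetical, not part of the sample project) shows what paraTool.get and paraTool.getProperties return for the flags used in these examples:

import org.apache.flink.api.java.utils.ParameterTool

object ParameterToolDemo {
  def main(args: Array[String]): Unit = {
    //Simulate the flags passed on the flink run command line.
    val paraTool = ParameterTool.fromArgs(
      Array("--topic", "topic-test", "--bootstrap.servers", "10.91.8.218:9092"))
    println(paraTool.get("topic"))  //topic-test
    //Every parsed flag becomes an entry in java.util.Properties, which the
    //Kafka connector reads directly (for example, bootstrap.servers).
    println(paraTool.getProperties)
  }
}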

//Imports shared by the producer and consumer examples below.
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

//Code of producer
object WriteIntoKafka {
  def main(args: Array[String]) { 
    //Print the reference command of flink run.
    System.out.println("use command as: ") 
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + 
      " /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:9092") 
    System.out.println("******************************************************************************************") 
    System.out.println("<topic> is the kafka topic name") 
    System.out.println("<bootstrap.servers> is the ip:port list of brokers") 
    System.out.println("******************************************************************************************") 
 
    //Build the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    //Configure the parallelism.
    env.setParallelism(1) 
    //Parse the execution parameter. 
    val paraTool = ParameterTool.fromArgs(args) 
    //Build the StreamGraph and write the data generated by the customized source into Kafka.
    val messageStream: DataStream[String] = env.addSource(new SimpleStringGenerator) 
    messageStream.addSink(new FlinkKafkaProducer( 
      paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties)) 
    //Call execute to trigger the execution.
    env.execute()
  } 
} 
 
//Customized source that continuously generates one message per second.
class SimpleStringGenerator extends SourceFunction[String] {
  //cancel() is invoked from a different thread than run(), so mark the flag volatile.
  @volatile var running = true
  var i = 0
 
  override def run(ctx: SourceContext[String]) { 
    while (running) { 
      ctx.collect("element-" + i) 
      i += 1 
      Thread.sleep(1000) 
    } 
  } 
 
  override def cancel() { 
    running = false 
  } 
} 
 
//Code of consumer
object ReadFromKafka { 
  def main(args: Array[String]) { 
    //Print the reference command of flink run. 
    System.out.println("use command as: ") 
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" + 
      " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092") 
    System.out.println("******************************************************************************************") 
    System.out.println("<topic> is the kafka topic name") 
    System.out.println("<bootstrap.servers> is the ip:port list of brokers") 
    System.out.println("******************************************************************************************") 
 
    //Build the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    //Configure the parallelism. 
    env.setParallelism(1) 
    //Parse the execution parameter. 
    val paraTool = ParameterTool.fromArgs(args) 
    //Build the StreamGraph: read data from Kafka and print each record on its own line.
    val messageStream = env.addSource(new FlinkKafkaConsumer( 
      paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties)) 
    messageStream 
      .map(s => "Flink says " + s + System.getProperty("line.separator")).print() 
    //Call execute to trigger the execution.
    env.execute() 
  } 
}
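When both jobs are running, the consumer's print() sink writes one line per record to the TaskManager out file, for example "Flink says element-0". Note that the Kafka consumer also expects a group.id property, which can be supplied the same way as the other flags, for example --group.id test-group.

To verify the custom source without a Kafka cluster, a minimal local smoke test (hypothetical, not part of the sample project) can wire SimpleStringGenerator straight to a print sink:

import org.apache.flink.streaming.api.scala._

object LocalSourceSmokeTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //Print one generated element per second; stop the job with Ctrl+C.
    env.addSource(new SimpleStringGenerator).print()
    env.execute("local-source-smoke-test")
  }
}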