Scala Sample Code
Function Description
In a Flink application, call API of the flink-connector-kafka module to produce and consume data.
Sample Code
If you want to use FusionInsight in security mode, ensure that the kafka-clients-*.jar is obtained from the FusionInsight client directory.
Following is the main logic code of Kafka Consumer and Kafka Producer.
For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.ReadFromKafka in the FlinkKafkaScalaExample project.
//Code of producer object WriteIntoKafka { def main(args: Array[String]) { //Print the reference command of flink run. System.out.println("use command as: ") System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092") System.out.println ("******************************************************************************************") System.out.println("<topic> is the kafka topic name") System.out.println("<bootstrap.servers> is the ip:port list of brokers") System.out.println ("******************************************************************************************") //Build the execution environment. val env = StreamExecutionEnvironment.getExecutionEnvironment //Configure the parallelism. env.setParallelism(1) //Parse the execution parameter. val paraTool = ParameterTool.fromArgs(args) //Build the StreamGraph and wirte data generated by customized source into Kafka val messageStream: DataStream[String] = env.addSource(new SimpleStringGenerator) messageStream.addSink(new FlinkKafkaProducer( paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties)) //Call execute to trigger the execution. env.execute } } //Customize source to continuously generate messages every one second. class SimpleStringGenerator extends SourceFunction[String] { var running = true var i = 0 override def run(ctx: SourceContext[String]) { while (running) { ctx.collect("element-" + i) i += 1 Thread.sleep(1000) } } override def cancel() { running = false } } //consumer code object ReadFromKafka { def main(args: Array[String]) { //Print the reference command of flink run. System.out.println("use command as: ") System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092") System.out.println ("******************************************************************************************") System.out.println("<topic> is the kafka topic name") System.out.println("<bootstrap.servers> is the ip:port list of brokers") System.out.println ("******************************************************************************************") //Build the execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment //Configure the parallelism. env.setParallelism(1) //Parse the execution parameter. val paraTool = ParameterTool.fromArgs(args) //Build the StreamGraph, read data from Kafka and print the result in another row. val messageStream = env.addSource(new FlinkKafkaConsumer( paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties)) messageStream .map(s => "Flink says " + s + System.getProperty("line.separator")).print() //Call execute to trigger the execution env.execute() } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot