Scala Sample Code
Function Description
In a Flink application, call the API of the flink-connector-kafka module to produce and consume data.
To interconnect with Kafka in security mode, obtain the kafka-client-xx.x.x.jar provided by MRS before application development. You can find the JAR file in the MRS client directory.
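In security mode, the Kafka client also needs Kerberos-related properties. The snippet below is a minimal sketch of commonly used settings, assuming standard Kafka SASL configuration; the property values here are assumptions, so check your MRS cluster configuration for the actual values.

import java.util.Properties

// Sketch only: typical security-mode client properties (values are assumptions).
val secureProps = new Properties()
// Kerberos authentication without SSL encryption; use SASL_SSL if SSL is enabled on the cluster.
secureProps.setProperty("security.protocol", "SASL_PLAINTEXT")
// Service name of the Kafka broker principal.
secureProps.setProperty("sasl.kerberos.service.name", "kafka")

Properties such as these can be merged into the Properties object that is passed to the FlinkKafkaProducer and FlinkKafkaConsumer constructors shown below.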
Sample Code
The following example shows the main logic of the Kafka Producer and Kafka Consumer.
For the complete code, see com.huawei.flink.example.kafka.WriteIntoKafkaScala and com.huawei.flink.example.kafka.ReadFromKafkaScala.
// Kafka Producer code
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object WriteIntoKafkaScala {

  def main(args: Array[String]) {
    // Print the command reference for flink run.
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.flink.example.kafka.WriteIntoKafkaScala" +
      " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")

    // Construct the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism.
    env.setParallelism(1)
    // Parse the running parameters.
    val paraTool = ParameterTool.fromArgs(args)
    // Construct a StreamGraph and write the data generated by the user-defined source to Kafka.
    val messageStream: DataStream[String] = env.addSource(new SimpleStringGeneratorScala)
    messageStream.addSink(new FlinkKafkaProducer(paraTool.get("topic"),
      new SimpleStringSchema, paraTool.getProperties))
    // Invoke execute to trigger the execution.
    env.execute()
  }
}

// User-defined source that generates one message per second.
class SimpleStringGeneratorScala extends SourceFunction[String] {

  var running = true
  var i = 0

  override def run(ctx: SourceContext[String]) {
    while (running) {
      ctx.collect("element-" + i)
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel() {
    running = false
  }
}
// Kafka Consumer code
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object ReadFromKafkaScala {

  def main(args: Array[String]) {
    // Print the command reference for flink run.
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafkaScala" +
      " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")

    // Construct the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism.
    env.setParallelism(1)
    // Parse the running parameters.
    val paraTool = ParameterTool.fromArgs(args)
    // Construct a StreamGraph, read data from Kafka, and print each record on a new line.
    val messageStream = env.addSource(new FlinkKafkaConsumer(
      paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties))
    messageStream
      .map(s => "Flink says " + s + System.getProperty("line.separator")).print()
    // Invoke execute to trigger the execution.
    env.execute()
  }
}
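After packaging the samples into a JAR file (for example, /opt/test.jar), you can submit the two jobs with flink run, matching the usage printed by the samples above; adjust the JAR path, topic, and broker list to your environment:

./bin/flink run --class com.huawei.flink.example.kafka.WriteIntoKafkaScala /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005
./bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafkaScala /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005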