Flink Join Sample Program (Scala)
This section applies to MRS 3.3.0 or later.
Function
In a Flink application, call the API of the flink-connector-kafka module to produce and consume data.
Sample Code
If Kafka is used in security mode, import the kafka-clients-*.jar package of FusionInsight before development. You can obtain the JAR file from the Kafka client directory.
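The sample programs below build their Kafka connection properties from the command line via ParameterTool.getProperties. For reference, a minimal sketch of the equivalent client properties in security mode is shown here (the broker address is a placeholder; 21007 is the security-mode port and 21005 the normal-mode port used in this document):

// Minimal sketch: Kafka client properties equivalent to the security-mode
// command-line options used in the samples below.
import java.util.Properties

val kafkaSecurityProps = new Properties()
kafkaSecurityProps.setProperty("bootstrap.servers", "xxx.xxx.xxx.xxx:21007")
kafkaSecurityProps.setProperty("security.protocol", "SASL_PLAINTEXT")
kafkaSecurityProps.setProperty("sasl.kerberos.service.name", "kafka")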
The following example shows the producer, the consumer, and the main logic of the Flink stream SQL join.
For the complete codes, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.SqlJoinWithSocket.
- Produce a piece of user information in Kafka every second. The user information includes the name, age, and gender.
// Kafka producer code
object WriteIntoKafka {
  def main(args: Array[String]): Unit = {
    // Print the command reference
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
      " /opt/test.jar --topic topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21005")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/test.jar --topic" +
      " topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT" +
      " --sasl.kerberos.service.name kafka")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")

    // Construct the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism
    env.setParallelism(1)
    // Parse the running parameters
    val paraTool = ParameterTool.fromArgs(args)
    // Construct a StreamGraph and write the data generated by the self-defined source to Kafka
    val messageStream = env.addSource(new WriteIntoKafka.SimpleStringGenerator)
    val producer = new FlinkKafkaProducer[String](paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties)
    producer.setWriteTimestampToKafka(true)
    messageStream.addSink(producer)
    // Invoke execute to trigger the execution
    env.execute
  }

  /**
   * String source class
   */
  object SimpleStringGenerator {
    private[examples] val NAME = Array("Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James")
    private[examples] val SEX = Array("MALE", "FEMALE")
    private[examples] val COUNT = NAME.length
  }

  class SimpleStringGenerator extends SourceFunction[String] {
    private[examples] var running = true
    private[examples] val rand = new Random(47)

    @throws[Exception]
    override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
      while (running) {
        val i = rand.nextInt(SimpleStringGenerator.COUNT)
        val age = rand.nextInt(70)
        val sexy = SimpleStringGenerator.SEX(rand.nextInt(2))
        // Emit one "name,age,sex" record per second
        ctx.collect(SimpleStringGenerator.NAME(i) + "," + age + "," + sexy)
        Thread.sleep(1000)
      }
    }

    override def cancel(): Unit = {
      running = false
    }
  }
}
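To quickly confirm that the producer is writing records, a simple consumer job can read the same topic and print each record. The sketch below is illustrative and not part of the sample project; the class name ReadFromKafka is hypothetical, and it reuses the command-line style of WriteIntoKafka (--topic and -bootstrap.servers, plus the security options in security mode).

// Illustrative verification consumer (not part of the sample project)
object ReadFromKafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val paraTool = ParameterTool.fromArgs(args)
    // Print every "name,age,sex" record produced by WriteIntoKafka
    env.addSource(new FlinkKafkaConsumer[String](paraTool.get("topic"),
        new SimpleStringSchema, paraTool.getProperties))
      .print()
    env.execute()
  }
}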
- Generate Table1 and Table2, join them on the name field with a SQL query, and print the result.
object SqlJoinWithSocket {
  def main(args: Array[String]): Unit = {
    var hostname: String = null
    var port = 0
    // Print the command reference
    System.out.println("use command as: ")
    System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/test.jar --topic" +
      " topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21005 --hostname xxx.xxx.xxx.xxx --port xxx")
    System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/test.jar --topic" +
      " topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT" +
      " --sasl.kerberos.service.name kafka --hostname xxx.xxx.xxx.xxx --port xxx")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")

    // Parse the hostname and port of the text server that feeds Table2
    try {
      val params = ParameterTool.fromArgs(args)
      hostname = if (params.has("hostname")) params.get("hostname") else "localhost"
      port = params.getInt("port")
    } catch {
      case e: Exception =>
        System.err.println("No port specified. Please run 'FlinkStreamSqlJoinExample " +
          "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
          "and port is the address of the text server")
        System.err.println("To start a simple text server, run 'netcat -l -p <port>' and " +
          "type the input text into the command line")
        return
    }

    val fsSettings = EnvironmentSettings.newInstance.inStreamingMode.build
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env, fsSettings)
    // Set the interval for generating watermarks
    env.getConfig.setAutoWatermarkInterval(200)
    // Set the parallelism
    env.setParallelism(1)
    // Parse the running parameters
    val paraTool = ParameterTool.fromArgs(args)

    // Read the Kafka topic as a stream of (name, age, sex) tuples
    val kafkaStream = env
      .addSource(new FlinkKafkaConsumer[String](paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties))
      .map(new MapFunction[String, Tuple3[String, String, String]]() {
        @throws[Exception]
        override def map(str: String): Tuple3[String, String, String] = {
          val word = str.split(",")
          new Tuple3[String, String, String](word(0), word(1), word(2))
        }
      })
    // Register Table1 with the fields name, age, sexy and a processing-time attribute proctime
    tableEnv.createTemporaryView("Table1", kafkaStream, $("name"), $("age"), $("sexy"), $("proctime").proctime)

    // Read the socket text stream as a stream of (name, job) tuples
    val socketStream = env
      .socketTextStream(hostname, port, "\n")
      .map(new MapFunction[String, Tuple2[String, String]]() {
        @throws[Exception]
        override def map(str: String): Tuple2[String, String] = {
          val words = str.split("\\s")
          if (words.length < 2) return new Tuple2[String, String]
          new Tuple2[String, String](words(0), words(1))
        }
      })
    // Register Table2 with the fields name, job and a processing-time attribute proctime
    tableEnv.createTemporaryView("Table2", socketStream, $("name"), $("job"), $("proctime").proctime)

    // Join Table1 and Table2 on name within a one-second processing-time interval and print the result
    val result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.sexy, t2.job, t2.proctime as shiptime\n" +
      "FROM Table1 AS t1\n" +
      "JOIN Table2 AS t2\n" +
      "ON t1.name = t2.name\n" +
      "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL" +
      " '1' SECOND")
    tableEnv.toAppendStream(result, classOf[Row]).print
    env.execute
  }
}
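To exercise the join, first submit WriteIntoKafka so that Table1 is continuously fed with "name,age,sex" records, then start a text server on the host and port passed to SqlJoinWithSocket (for example with netcat -l -p <port>, as the help text above suggests) and type lines of the form "name job" into that session to feed Table2. Records from the two tables that share the same name and whose processing times lie within one second of each other are joined, and each result row (name, age, sexy, job, shiptime) is emitted by the print sink; in a cluster deployment this output typically appears in the TaskManager's .out log.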