Flink Join Sample Program (Scala)

This section applies to MRS 3.3.0 or later.

Function

In a Flink application, call the APIs of the flink-connector-kafka module to produce and consume data, and use Flink SQL to join the resulting Kafka stream with a socket stream.

Sample Code

To use Kafka in security mode, import the kafka-clients-*.jar file of FusionInsight before development. You can obtain the JAR file from the Kafka client directory.

The following example shows the Producer, the Consumer, and the main logic code used by Flink Stream SQL Join.

For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.SqlJoinWithSocket.
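
Both jobs read their Kafka settings from the command line: ParameterTool.fromArgs(args).getProperties turns every --key value argument into a Kafka client property. In security mode, the command lines shown in the code below are therefore equivalent to the following explicit configuration (a minimal sketch; the broker address is a placeholder):

    import java.util.Properties

    // Equivalent of passing -bootstrap.servers, --security.protocol, and
    // --sasl.kerberos.service.name on the command line (placeholder address).
    val kafkaProps = new Properties
    kafkaProps.setProperty("bootstrap.servers", "xxx.xxx.xxx.xxx:21007")
    kafkaProps.setProperty("security.protocol", "SASL_PLAINTEXT")
    kafkaProps.setProperty("sasl.kerberos.service.name", "kafka")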

  1. Produce one piece of user information (name, age, and gender) in Kafka every second.
    // Kafka Producer code: generates one "name,age,gender" record per second
    // and writes it to the specified topic.
    import java.util.Random

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

    object WriteIntoKafka {
      def main(args: Array[String]): Unit = {
        System.out.println("use command as: ")
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka"
          + " /opt/test.jar --topic topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21005")
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/test.jar --topic"
          + " topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT"
          + " --sasl.kerberos.service.name kafka")
        System.out.println("******************************************************************************************")
        System.out.println("<topic> is the kafka topic name")
        System.out.println("<bootstrap.servers> is the ip:port list of brokers")
        System.out.println("******************************************************************************************")

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        val paraTool = ParameterTool.fromArgs(args)

        // The remaining command-line options (security.protocol and so on) are
        // passed through to the Kafka producer as client properties.
        val messageStream = env.addSource(new WriteIntoKafka.SimpleStringGenerator)
        val producer = new FlinkKafkaProducer[String](paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties)

        producer.setWriteTimestampToKafka(true)

        messageStream.addSink(producer)
        env.execute()
      }

      /**
       * String source class
       */
      object SimpleStringGenerator {
        private[examples] val NAME = Array("Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James")
        private[examples] val SEX = Array("MALE", "FEMALE")
        private[examples] val COUNT = NAME.length
      }

      class SimpleStringGenerator extends SourceFunction[String] {
        private[examples] var running = true
        private[examples] val rand = new Random(47)

        @throws[Exception]
        override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
          while (running) {
            // Emit one random "name,age,gender" record per second.
            val i = rand.nextInt(SimpleStringGenerator.COUNT)
            val age = rand.nextInt(70)
            val sexy = SimpleStringGenerator.SEX(rand.nextInt(2))
            ctx.collect(SimpleStringGenerator.NAME(i) + "," + age + "," + sexy)
            Thread.sleep(1000)
          }
        }

        override def cancel(): Unit = {
          running = false
        }
      }
    }
  2. Generate Table1 and Table2, join the two tables on the name field, and print the result. A sample input and output follows the code.
    // Flink Stream SQL Join main logic: Table1 comes from Kafka, Table2 comes
    // from a socket text server, and the two are joined on the user name.
    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.java.tuple.{Tuple2, Tuple3}
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.Expressions.$
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    import org.apache.flink.types.Row

    object SqlJoinWithSocket {
      def main(args: Array[String]): Unit = {
        var hostname: String = null
        var port = 0
        System.out.println("use command as: ")
        System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/test.jar --topic"
          + " topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21005 --hostname xxx.xxx.xxx.xxx --port xxx")
        System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/test.jar --topic"
          + " topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT"
          + " --sasl.kerberos.service.name kafka --hostname xxx.xxx.xxx.xxx --port xxx")
        System.out.println("******************************************************************************************")
        System.out.println("<topic> is the kafka topic name")
        System.out.println("<bootstrap.servers> is the ip:port list of brokers")
        System.out.println("******************************************************************************************")

        try {
          val params = ParameterTool.fromArgs(args)
          hostname = if (params.has("hostname")) params.get("hostname") else "localhost"
          port = params.getInt("port")
        } catch {
          case e: Exception =>
            System.err.println("No port specified. Please run 'FlinkStreamSqlJoinExample "
              + "--hostname <hostname> --port <port>', where hostname (localhost by default) "
              + "and port are the address of the text server")
            System.err.println("To start a simple text server, run 'netcat -l -p <port>' and "
              + "type the input text into the command line")
            return
        }

        val fsSettings = EnvironmentSettings.newInstance.inStreamingMode.build
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tableEnv = StreamTableEnvironment.create(env, fsSettings)

        env.getConfig.setAutoWatermarkInterval(200)
        env.setParallelism(1)
        val paraTool = ParameterTool.fromArgs(args)

        // Table1: "name,age,gender" records consumed from Kafka.
        val kafkaStream = env
          .addSource(new FlinkKafkaConsumer[String](paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties))
          .map(new MapFunction[String, Tuple3[String, String, String]]() {
            @throws[Exception]
            override def map(str: String): Tuple3[String, String, String] = {
              val word = str.split(",")
              new Tuple3[String, String, String](word(0), word(1), word(2))
            }
          })

        tableEnv.createTemporaryView("Table1", kafkaStream, $("name"), $("age"), $("sexy"), $("proctime").proctime)

        // Table2: "name job" pairs typed into the socket text server.
        val socketStream = env.socketTextStream(hostname, port, "\n")
          .map(new MapFunction[String, Tuple2[String, String]]() {
            @throws[Exception]
            override def map(str: String): Tuple2[String, String] = {
              val words = str.split("\\s")
              // Ignore malformed lines by emitting an empty tuple.
              if (words.length < 2) return new Tuple2[String, String]
              new Tuple2[String, String](words(0), words(1))
            }
          })

        tableEnv.createTemporaryView("Table2", socketStream, $("name"), $("job"), $("proctime").proctime)

        // Interval join: match rows of the two tables whose processing times
        // are within one second of each other.
        val result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.sexy, t2.job, t2.proctime as shiptime\n"
          + "FROM Table1 AS t1\n"
          + "JOIN Table2 AS t2\n"
          + "ON t1.name = t2.name\n"
          + "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND")

        tableEnv.toAppendStream(result, classOf[Row]).print
        env.execute()
      }
    }
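
     For example, if the producer writes the record Carry,50,MALE to the topic and you type Carry engineer into the netcat session within one second of that record, the job prints a joined row similar to the following (the age is random, and the exact Row format depends on the Flink version):

        Socket input (Table2):  Carry engineer
        Kafka record (Table1):  Carry,50,MALE
        Printed join result:    +I[Carry, 50, MALE, engineer, <shiptime>]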