Last Updated: 2024-08-03 GMT+08:00

Flink Join Sample Program (Scala)

This section applies to MRS 3.3.0 and later versions.

Function Description

In the Flink application, call APIs of the flink-connector-kafka module to produce and consume data.

Sample Code

If the application needs to interconnect with Kafka in security mode, import the FusionInsight kafka-clients-*.jar file before development. You can obtain this JAR file from the Kafka client directory.
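
In security mode, the samples pass the security settings as command-line flags (see the usage strings printed in the code below). The following is a minimal sketch of the equivalent Kafka client properties, assuming a Kerberos-enabled FusionInsight cluster; the object name KafkaSecurityProps and the broker address are placeholders, not part of the sample:

    import java.util.Properties

    object KafkaSecurityProps {
      // Mirrors the security-mode flags in the samples' usage text:
      // --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka
      def build(): Properties = {
        val props = new Properties()
        props.put("bootstrap.servers", "xxx.xxx.xxx.xxx:21007") // 21007: security-mode broker port
        props.put("security.protocol", "SASL_PLAINTEXT")
        props.put("sasl.kerberos.service.name", "kafka")
        props
      }
    }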

The following demonstrates the main logic code of the producer and consumer, and of the Flink Stream SQL Join.

For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.SqlJoinWithSocket.

  1. Produce one piece of user information to Kafka every second. The user information consists of a name, an age, and a gender.
    // Producer code
     
    // Imports required to compile this sample: Flink DataStream API and the
    // flink-connector-kafka module.
    import java.util.Random

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

    object WriteIntoKafka {
       def main(args: Array[String]): Unit = {
         System.out.println("use command as: ")
         System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" + " /opt/test.jar --topic topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21005")
         System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/test.jar --topic" + " topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT" + " --sasl.kerberos.service.name kafka")
    
    
         System.out.println("******************************************************************************************")
         System.out.println("<topic> is the kafka topic name")
         System.out.println("<bootstrap.servers> is the ip:port list of brokers")
         System.out.println("******************************************************************************************")
         val env = StreamExecutionEnvironment.getExecutionEnvironment
         env.setParallelism(1)
    
         val paraTool = ParameterTool.fromArgs(args)
    
         val messageStream = env.addSource(new WriteIntoKafka.SimpleStringGenerator)
         val producer = new FlinkKafkaProducer[String](paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties)
    
         producer.setWriteTimestampToKafka(true)
    
         messageStream.addSink(producer)
         env.execute
       }
    
       /**
        * String source: emits one "name,age,sexy" record per second.
        */
       object SimpleStringGenerator {
         private[examples] val NAME = Array("Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James")
         private[examples] val SEX = Array("MALE", "FEMALE")
         private[examples] val COUNT = NAME.length
       }
    
       class SimpleStringGenerator extends SourceFunction[String] {
         private[examples] var running = true
         private[examples] val rand = new Random(47)
    
         @throws[Exception]
         override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
           while (running) {
             val i = rand.nextInt(SimpleStringGenerator.COUNT)
             val age = rand.nextInt(70)
             val sexy = SimpleStringGenerator.SEX(rand.nextInt(2))
             ctx.collect(SimpleStringGenerator.NAME(i) + "," + age + "," + sexy)
             Thread.sleep(1000)
           }
         }
    
         override def cancel(): Unit = {
           running = false
         }
       }
    
     }
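
    The producer passes paraTool.getProperties straight to the FlinkKafkaProducer constructor, so every command-line flag becomes a Kafka client property. Below is a minimal self-contained sketch of that behavior (the object name ParameterToolDemo is illustrative only, not part of the sample):

       import org.apache.flink.api.java.utils.ParameterTool

       object ParameterToolDemo {
         def main(args: Array[String]): Unit = {
           val tool = ParameterTool.fromArgs(Array(
             "--topic", "topic-test",
             "-bootstrap.servers", "xxx.xxx.xxx.xxx:21005"))
           // Every parsed flag, including the topic name, becomes an entry in
           // the returned java.util.Properties.
           println(tool.getProperties)
         }
       }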
  2. Generate Table1 and Table2, use a Join to query both tables jointly, and print the output result.
    // Imports required to compile this sample: DataStream API, Table API
    // bridge, and the flink-connector-kafka module.
    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.java.tuple.{Tuple2, Tuple3}
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.Expressions.$
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    import org.apache.flink.types.Row

    object SqlJoinWithSocket {
       def main(args: Array[String]): Unit = {
         var hostname: String = null
         var port = 0
         System.out.println("use command as: ")
         System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/test.jar --topic" + " topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21005 --hostname xxx.xxx.xxx.xxx --port xxx")
         System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/test.jar --topic" + " topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT" + " --sasl.kerberos.service.name kafka--hostname xxx.xxx.xxx.xxx --port xxx")
    
         System.out.println("******************************************************************************************")
         System.out.println("<topic> is the kafka topic name")
         System.out.println("<bootstrap.servers> is the ip:port list of brokers")
         System.out.println("******************************************************************************************")
         try {
           val params = ParameterTool.fromArgs(args)
           hostname = if (params.has("hostname")) params.get("hostname") else "localhost"
           port = params.getInt("port")
         } catch {
           case e: Exception =>
             System.err.println("No port specified. Please run 'FlinkStreamSqlJoinExample " + "--hostname <hostname> --port <port>', where hostname (localhost by default) " + "and port is the address of the text server")
             System.err.println("To start a simple text server, run 'netcat -l -p <port>' and " + "type the input text into the command line")
             return
         }
    
         val fsSettings = EnvironmentSettings.newInstance.inStreamingMode.build
         val env = StreamExecutionEnvironment.getExecutionEnvironment
         val tableEnv = StreamTableEnvironment.create(env, fsSettings)
    
         env.getConfig.setAutoWatermarkInterval(200)
         env.setParallelism(1)
         val paraTool = ParameterTool.fromArgs(args)
    
         // Source for Table1: user records "name,age,sexy" consumed from Kafka.
         val kafkaStream = env
           .addSource(new FlinkKafkaConsumer[String](paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties))
           .map(new MapFunction[String, Tuple3[String, String, String]]() {
           @throws[Exception]
           override def map(str: String): Tuple3[String, String, String] = {
             val word = str.split(",")
             new Tuple3[String, String, String](word(0), word(1), word(2))
           }
         })
    
         tableEnv.createTemporaryView("Table1", kafkaStream, $("name"), $("age"), $("sexy"), $("proctime").proctime)
    
         // Source for Table2: "name job" pairs read from the text socket.
         val socketStream = env
           .socketTextStream(hostname, port, "\n")
           .map(new MapFunction[String, Tuple2[String, String]]() {
           @throws[Exception]
           override def map(str: String): Tuple2[String, String] = {
             val words = str.split("\\s")
             // Malformed lines yield an empty tuple (both fields null).
             if (words.length < 2) return new Tuple2[String, String]
             new Tuple2[String, String](words(0), words(1))
           }
         })
    
         tableEnv.createTemporaryView("Table2", socketStream, $("name"), $("job"), $("proctime").proctime)
    
         // Interval join on processing time: match rows whose proctime values
         // lie within one second of each other.
         val result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.sexy, t2.job, t2.proctime as shiptime\n"
           + "FROM Table1 AS t1\n"
           + "JOIN Table2 AS t2\n"
           + "ON t1.name = t2.name\n"
           + "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND")
    
         tableEnv.toAppendStream(result, classOf[Row]).print
         env.execute
       }
    
     }
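
    To drive Table2 without netcat, a hypothetical helper (SocketFeeder below is illustrative, not part of the sample) can listen on the port that SqlJoinWithSocket connects to and emit one "name job" line per second; names that also occur in the Kafka stream satisfy the join condition:

       import java.io.PrintWriter
       import java.net.ServerSocket

       object SocketFeeder {
         def main(args: Array[String]): Unit = {
           val server = new ServerSocket(9999) // assumed port; must match --port
           val client = server.accept() // wait for the job's socketTextStream to connect
           val out = new PrintWriter(client.getOutputStream, true)
           // Each line is "<name> <job>"; the job's map() splits on whitespace.
           for (line <- Seq("Carry teacher", "Alen driver", "Mike nurse")) {
             out.println(line)
             Thread.sleep(1000)
           }
           out.close(); client.close(); server.close()
         }
       }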