Flink Join Sample Program (Scala)
This section applies to MRS 3.3.0 or later.
Function Description
In a Flink application, call the APIs of the flink-connector-kafka module to produce and consume data.
Sample Code
If the application needs to interconnect with Kafka in security mode, import the FusionInsight kafka-clients-*.jar package before development. This JAR file can be obtained from the Kafka client directory.
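For reference, the secure-mode command lines shown in the samples below are equivalent to setting the following Kafka connection properties (in the samples, ParameterTool.fromArgs(args).getProperties collects them from the command line). This is a minimal sketch only: the broker address is a placeholder, and 21007 is the secure port taken from the samples' usage strings.

import java.util.Properties

val secureProps = new Properties()
// Placeholder broker list; 21007 is the secure port from the usage strings below.
secureProps.setProperty("bootstrap.servers", "xxx.xxx.xxx.xxx:21007")
// Secure-mode options, matching --security.protocol and
// --sasl.kerberos.service.name in the sample command lines.
secureProps.setProperty("security.protocol", "SASL_PLAINTEXT")
secureProps.setProperty("sasl.kerberos.service.name", "kafka")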
The following lists the main logic code of the producer, the consumer, and the Flink Stream SQL Join as a demonstration.
For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.SqlJoinWithSocket.
- Produce one piece of user information into Kafka every second. The user information consists of name, age, and gender. (A local preview sketch follows the code listings below.)
// producer code
import java.util.Random

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object WriteIntoKafka {
  def main(args: Array[String]): Unit = {
    // Print usage hints for normal mode (port 21005) and security mode (port 21007).
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
      " /opt/test.jar --topic topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21005")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/test.jar --topic" +
      " topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT" +
      " --sasl.kerberos.service.name kafka")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val paraTool = ParameterTool.fromArgs(args)

    // Generate one "name,age,sex" record per second and write it to Kafka.
    val messageStream = env.addSource(new WriteIntoKafka.SimpleStringGenerator)
    val producer = new FlinkKafkaProducer[String](paraTool.get("topic"),
      new SimpleStringSchema, paraTool.getProperties)
    producer.setWriteTimestampToKafka(true)
    messageStream.addSink(producer)
    env.execute
  }

  /**
   * String source class
   */
  object SimpleStringGenerator {
    private[examples] val NAME = Array("Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James")
    private[examples] val SEX = Array("MALE", "FEMALE")
    private[examples] val COUNT = NAME.length
  }

  class SimpleStringGenerator extends SourceFunction[String] {
    private[examples] var running = true
    private[examples] val rand = new Random(47)

    @throws[Exception]
    override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
      while (running) {
        // Pick a random name, age, and gender, and emit them as one CSV record.
        val i = rand.nextInt(SimpleStringGenerator.COUNT)
        val age = rand.nextInt(70)
        val sexy = SimpleStringGenerator.SEX(rand.nextInt(2))
        ctx.collect(SimpleStringGenerator.NAME(i) + "," + age + "," + sexy)
        Thread.sleep(1000)
      }
    }

    override def cancel(): Unit = {
      running = false
    }
  }
}
- Generate Table1 and Table2, use a Join to query them jointly, and print the output. (A sketch of a simple text server for feeding Table2 also follows the code listings.)
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.tuple.{Tuple2, Tuple3}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.types.Row

object SqlJoinWithSocket {
  def main(args: Array[String]): Unit = {
    var hostname: String = null
    var port = 0
    System.out.println("use command as: ")
    System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/test.jar --topic" +
      " topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21005 --hostname xxx.xxx.xxx.xxx --port xxx")
    System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/test.jar --topic" +
      " topic-test -bootstrap.servers xxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT" +
      " --sasl.kerberos.service.name kafka --hostname xxx.xxx.xxx.xxx --port xxx")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")

    try {
      val params = ParameterTool.fromArgs(args)
      hostname = if (params.has("hostname")) params.get("hostname") else "localhost"
      port = params.getInt("port")
    } catch {
      case e: Exception =>
        System.err.println("No port specified. Please run 'FlinkStreamSqlJoinExample " +
          "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
          "and port is the address of the text server")
        System.err.println("To start a simple text server, run 'netcat -l -p <port>' and " +
          "type the input text into the command line")
        return
    }

    val fsSettings = EnvironmentSettings.newInstance.inStreamingMode.build
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env, fsSettings)
    env.getConfig.setAutoWatermarkInterval(200)
    env.setParallelism(1)
    val paraTool = ParameterTool.fromArgs(args)

    // Table1: "name,age,sex" records consumed from Kafka.
    val kafkaStream = env.addSource(new FlinkKafkaConsumer[String](paraTool.get("topic"),
      new SimpleStringSchema, paraTool.getProperties))
      .map(new MapFunction[String, Tuple3[String, String, String]]() {
        @throws[Exception]
        override def map(str: String): Tuple3[String, String, String] = {
          val word = str.split(",")
          new Tuple3[String, String, String](word(0), word(1), word(2))
        }
      })
    tableEnv.createTemporaryView("Table1", kafkaStream,
      $("name"), $("age"), $("sexy"), $("proctime").proctime)

    // Table2: "name job" pairs read from the socket text server.
    val socketStream = env.socketTextStream(hostname, port, "\n")
      .map(new MapFunction[String, Tuple2[String, String]]() {
        @throws[Exception]
        override def map(str: String): Tuple2[String, String] = {
          val words = str.split("\\s")
          // Skip malformed lines by emitting an empty tuple.
          if (words.length < 2) return new Tuple2[String, String]
          new Tuple2[String, String](words(0), words(1))
        }
      })
    tableEnv.createTemporaryView("Table2", socketStream,
      $("name"), $("job"), $("proctime").proctime)

    // Join the two tables on name within a +/- 1 second processing-time window.
    val result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.sexy, t2.job, t2.proctime as shiptime\n" +
      "FROM Table1 AS t1\n" +
      "JOIN Table2 AS t2\n" +
      "ON t1.name = t2.name\n" +
      "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL" +
      " '1' SECOND")
    tableEnv.toAppendStream(result, classOf[Row]).print
    env.execute
  }
}
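To check the message format produced by WriteIntoKafka without involving a broker, the generator source can be printed locally. The following is a minimal sketch under the assumption that it is compiled in the same project and package as the samples, so that WriteIntoKafka.SimpleStringGenerator is visible; PreviewGeneratedRecords is a hypothetical name, not part of the sample code.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object PreviewGeneratedRecords {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Same source as the producer job; prints records such as "Alen,25,MALE".
    env.addSource(new WriteIntoKafka.SimpleStringGenerator).print()
    env.execute("preview-generated-records")
  }
}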
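The usage hint printed by SqlJoinWithSocket suggests running 'netcat -l -p <port>' as the text server that feeds Table2. Where netcat is unavailable, a minimal stand-in can be sketched as below; SimpleTextServer and the port 12345 are hypothetical, and each line typed on stdin should be a "name job" pair (for example "Alen clerk") so that the join on name can match.

import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.StdIn

object SimpleTextServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(12345) // hypothetical port; pass the same value via --port
    val socket = server.accept()         // wait for the Flink job's socket source to connect
    val out = new PrintWriter(socket.getOutputStream, true)
    // Forward each stdin line ("name job") to Table2's socket source until EOF.
    Iterator.continually(StdIn.readLine()).takeWhile(_ != null).foreach(line => out.println(line))
    socket.close()
    server.close()
  }
}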