Flink Stream SQL Join Java Sample Code
Function Description
In a Flink application, call APIs of the flink-connector-kafka module to produce and consume data.
If you need to interconnect with Kafka in security mode before development, import the MRS kafka-client-xx.x.x.jar. This JAR file can be obtained from the MRS client directory.
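In security mode, the sample passes the security settings as flink run arguments, which ParameterTool later turns into Kafka client properties. The following is a minimal sketch, not part of the shipped sample, of the equivalent properties built directly in Java; the helper class name and the broker address are placeholders for illustration:

    import java.util.Properties;

    // Hypothetical helper (illustration only): builds the Kafka client
    // properties that the secure run command passes on the command line
    // as --security.protocol and --sasl.kerberos.service.name.
    public class SecureKafkaProps {
        public static Properties build() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "10.91.8.218:21007"); // placeholder broker list
            props.setProperty("security.protocol", "SASL_PLAINTEXT");    // security mode protocol
            props.setProperty("sasl.kerberos.service.name", "kafka");    // Kerberos service name
            return props;
        }
    }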
Sample Code
The following example shows the main logic code of the producer, the consumer, and the Flink Stream SQL Join.
For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.SqlJoinWithSocket.
- Produce a piece of user information into Kafka every second. The user information consists of a name, an age, and a gender.
    // Producer code
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    import java.util.Random;

    public class WriteIntoKafka4SQLJoin {
        public static void main(String[] args) throws Exception {
            // Print the reference commands for executing flink run
            System.out.println("use command as: ");
            System.out.println("./bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin"
                + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");
            System.out.println("./bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin"
                + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
            System.out.println("******************************************************************************************");
            System.out.println("<topic> is the kafka topic name");
            System.out.println("<bootstrap.servers> is the ip:port list of brokers");
            System.out.println("******************************************************************************************");

            // Build the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set the parallelism
            env.setParallelism(1);
            // Parse the run parameters
            ParameterTool paraTool = ParameterTool.fromArgs(args);
            // Build the stream graph: write the data generated by the custom source into Kafka
            DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
            FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(paraTool.get("topic"),
                new SimpleStringSchema(), paraTool.getProperties());
            messageStream.addSink(producer);
            // Call execute to trigger the job
            env.execute();
        }

        // Custom source that emits one message every second
        public static class SimpleStringGenerator implements SourceFunction<String> {
            static final String[] NAME = {"Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James"};
            static final String[] GENDER = {"MALE", "FEMALE"};
            static final int COUNT = NAME.length;
            boolean running = true;
            Random rand = new Random(47);

            // Randomly generate combinations of name, age, and gender
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    int i = rand.nextInt(COUNT);
                    int age = rand.nextInt(70);
                    String gender = GENDER[rand.nextInt(2)];
                    ctx.collect(NAME[i] + "," + age + "," + gender);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }
    }
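The producer relies on ParameterTool to turn the flink run arguments into the Kafka producer configuration passed to FlinkKafkaProducer. Below is a minimal, self-contained sketch of that mapping; the class name and argument values are placeholders taken from the reference command, not part of the shipped sample:

    import org.apache.flink.api.java.utils.ParameterTool;

    import java.util.Properties;

    // Illustration only: shows how --topic and -bootstrap.servers arguments
    // become the Properties handed to the Kafka connector.
    public class ParameterToolDemo {
        public static void main(String[] args) {
            String[] demoArgs = {"--topic", "topic-test", "-bootstrap.servers", "10.91.8.218:21005"};
            ParameterTool paraTool = ParameterTool.fromArgs(demoArgs);
            Properties props = paraTool.getProperties();
            System.out.println(paraTool.get("topic"));                // prints topic-test
            System.out.println(props.getProperty("bootstrap.servers")); // prints the parsed broker list
        }
    }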
- Generate Table1 and Table2, use Join to perform a joint query on the two tables, and print the output. (A usage note follows the code.)
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class SqlJoinWithSocket {
        public static void main(String[] args) throws Exception {
            final String hostname;
            final int port;

            System.out.println("use command as: ");
            System.out.println("flink run --class com.huawei.flink.example.sqljoin.SqlJoinWithSocket"
                + " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:9092 --hostname xxx.xxx.xxx.xxx --port xxx");
            System.out.println("flink run --class com.huawei.flink.example.sqljoin.SqlJoinWithSocket"
                + " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka"
                + " --hostname xxx.xxx.xxx.xxx --port xxx");
            System.out.println("******************************************************************************************");
            System.out.println("<topic> is the kafka topic name");
            System.out.println("<bootstrap.servers> is the ip:port list of brokers");
            System.out.println("******************************************************************************************");

            try {
                final ParameterTool params = ParameterTool.fromArgs(args);
                hostname = params.has("hostname") ? params.get("hostname") : "localhost";
                port = params.getInt("port");
            } catch (Exception e) {
                System.err.println("No port specified. Please run 'FlinkStreamSqlJoinExample "
                    + "--hostname <hostname> --port <port>', where hostname (localhost by default) "
                    + "and port is the address of the text server");
                System.err.println("To start a simple text server, run 'netcat -l -p <port>' and "
                    + "type the input text into the command line");
                return;
            }

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

            // Process based on event time
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);

            ParameterTool paraTool = ParameterTool.fromArgs(args);

            // Stream1: read data from Kafka
            DataStream<Tuple3<String, String, String>> kafkaStream =
                env.addSource(new FlinkKafkaConsumer<>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties()))
                    .map(new MapFunction<String, Tuple3<String, String, String>>() {
                        @Override
                        public Tuple3<String, String, String> map(String s) throws Exception {
                            String[] word = s.split(",");
                            return new Tuple3<>(word[0], word[1], word[2]);
                        }
                    });

            // Register Stream1 as Table1
            tableEnv.registerDataStream("Table1", kafkaStream, "name, age, Gender, proctime.proctime");

            // Stream2: read data from the socket
            DataStream<Tuple2<String, String>> socketStream = env.socketTextStream(hostname, port, "\n")
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        String[] words = s.split("\\s");
                        if (words.length < 2) {
                            return new Tuple2<>();
                        }
                        return new Tuple2<>(words[0], words[1]);
                    }
                });

            // Register Stream2 as Table2
            tableEnv.registerDataStream("Table2", socketStream, "name, job, proctime.proctime");

            // Run the SQL join as a joint query
            Table result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.Gender, t2.job, t2.proctime as shiptime\n"
                + "FROM Table1 AS t1\n"
                + "JOIN Table2 AS t2\n"
                + "ON t1.name = t2.name\n"
                + "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND");

            // Convert the query result to a stream and print it
            tableEnv.toAppendStream(result, Row.class).print();
            env.execute();
        }
    }
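To exercise the join, start the text server first (for example, netcat -l -p <port>, as printed in the usage message above), then type whitespace-separated name and job pairs whose names match the ones generated by the producer, for example:

    Alen clerk
    Mike driver

The query joins records on t1.name = t2.name within a one-second proctime window on either side, so a result row such as Alen,35,MALE,clerk,2019-04-02 11:32:15.123 is printed only when a matching user record arrives from Kafka within one second of the socket input. The values in this sample row are illustrative, not actual output.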