Flink Stream SQL Join Java Sample Code
Function Description
In a Flink application, call the API of the flink-connector-kafka module to produce and consume data.
If you need to interconnect with Kafka in security mode before application development, kafka-client-xx.x.x.jar of MRS is required. You can obtain the JAR file in the MRS client directory.
Sample Code
The following example shows the Producer, Consumer, and the main logic code used by Flink Stream SQL Join.
For the complete codes, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.SqlJoinWithSocket.
- Produce a piece of user information in Kafka every second. The user information includes the name, age, and gender.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
// Kafka Producer code import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import java.util.Random; public class WriteIntoKafka4SQLJoin { public static void main(String[] args) throws Exception { // Print the command reference for flink run. System.out.println("use command as: "); System.out.println("./bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005"); System.out.println("./bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin" + " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka"); System.out.println("******************************************************************************************"); System.out.println("<topic> is the kafka topic name"); System.out.println("<bootstrap.servers> is the ip:port list of brokers"); System.out.println("******************************************************************************************"); // Construct the execution environment. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Set parallelism. env.setParallelism(1); // Parse the running parameters. ParameterTool paraTool = ParameterTool.fromArgs(args); // Construct a StreamGraph and write the data generated from self-defined sources to Kafka. DataStream<String> messageStream = env.addSource(new SimpleStringGenerator()); FlinkKafkaProducer producer = new FlinkKafkaProducer<>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties()); messageStream.addSink(producer); // Invoke execute to trigger the execution. env.execute(); } // Customize the sources and generate a message every other second. public static class SimpleStringGenerator implements SourceFunction<String> { static final String[] NAME = {"Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James"}; static final String[] SEX = {"MALE", "FEMALE"}; static final int COUNT = NAME.length; boolean running = true; Random rand = new Random(47); @Override // Use rand to randomly generate a combination of the name, gender, and age. public void run(SourceContext<String> ctx) throws Exception { while (running) { int i = rand.nextInt(COUNT); int age = rand.nextInt(70); String sexy = SEX[rand.nextInt(2)]; ctx.collect(NAME[i] + "," + age + "," + sexy); Thread.sleep(1000); } } @Override public void cancel() { running = false; } } }
- Generate Table1 and Table2, use Join to jointly query Table1 and Table2, and print the output result.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
import org.apache.calcite.interpreter.Row; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.java.StreamTableEnvironment; public class SqlJoinWithSocket { public static void main(String[] args) throws Exception{ final String hostname; final int port; System.out.println("use command as: "); System.out.println("flink run --class com.huawei.flink.example.sqljoin.SqlJoinWithSocket" + " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:9092 --hostname xxx.xxx.xxx.xxx --port xxx"); System.out.println("flink run --class com.huawei.flink.example.sqljoin.SqlJoinWithSocket" + " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka" + "--hostname xxx.xxx.xxx.xxx --port xxx"); System.out.println("******************************************************************************************"); System.out.println("<topic> is the kafka topic name"); System.out.println("<bootstrap.servers> is the ip:port list of brokers"); System.out.println("******************************************************************************************"); try { final ParameterTool params = ParameterTool.fromArgs(args); hostname = params.has("hostname") ? params.get("hostname") : "localhost"; port = params.getInt("port"); } catch (Exception e) { System.err.println("No port specified. Please run 'FlinkStreamSqlJoinExample " + "--hostname <hostname> --port <port>', where hostname (localhost by default) " + "and port is the address of the text server"); System.err.println("To start a simple text server, run 'netcat -l -p <port>' and " + "type the input text into the command line"); return; } StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // Process data based on EventTime. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); ParameterTool paraTool = ParameterTool.fromArgs(args); // Use Stream1 to read data from Kafka. DataStream<Tuple3<String, String, String>> kafkaStream = env.addSource(new FlinkKafkaConsumer<>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties())) .map(new MapFunction<String, Tuple3<String, String, String>>() { @Override public Tuple3<String, String, String> map(String s) throws Exception { String[] word = s.split(","); return new Tuple3<>(word[0], word[1], word[2]); } }); // Register Stream1 as Table1. tableEnv.registerDataStream("Table1", kafkaStream, "name, age, sexy, proctime.proctime"); // Use Stream2 to read data from the socket. DataStream<Tuple2<String, String>> socketStream = env.socketTextStream(hostname, port, "\n") .map(new MapFunction<String, Tuple2<String, String>>() { @Override public Tuple2<String, String> map(String s) throws Exception { String[] words = s.split("\\s"); if (words.length < 2) { return new Tuple2<>(); } return new Tuple2<>(words[0], words[1]); } }); // Register Stream2 as Table2. tableEnv.registerDataStream("Table2", socketStream, "name, job, proctime.proctime"); // Run SQL Join to perform a joint query. Table result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.sexy, t2.job, t2.proctime as shiptime\n" + "FROM Table1 AS t1\n" + "JOIN Table2 AS t2\n" + "ON t1.name = t2.name\n" + "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND"); // Convert the query result into a stream and print the output. tableEnv.toAppendStream(result, Row.class).print(); env.execute(); } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot