Updated on 2024-12-10 GMT+08:00

Flink Stream SQL Join Java Sample Code

Function Description

In a Flink application, call the API of the flink-connector-kafka module to produce and consume data.

If the application needs to connect to Kafka in security mode, the kafka-client-xx.x.x.jar file provided by MRS is required for application development. You can obtain the JAR file from the MRS client directory.

Sample Code

The following example shows the Producer, Consumer, and the main logic code used by Flink Stream SQL Join.

For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.SqlJoinWithSocket.

  • Write one user record to Kafka every second. Each record contains the name, age, and gender of a user.
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    // Kafka Producer code
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    
    import java.util.Random;
    
    /**
     * Flink job that writes one randomly generated user record per second to a Kafka topic.
     *
     * <p>Each record is a comma-separated string of the form {@code name,age,gender}. The target
     * topic is read from the {@code topic} argument, and all parsed arguments are handed to the
     * Kafka producer as its properties.
     */
    public class WriteIntoKafka4SQLJoin {

        /**
         * Prints the command reference, builds the streaming job and runs it.
         *
         * @param args command-line arguments; {@code topic} is read explicitly and the full
         *             argument set is passed to the Kafka producer as properties
         * @throws Exception if job execution fails
         */
        public static void main(String[] args) throws Exception {
            // Print the command reference for flink run.
            System.out.println("use command as: ");
            System.out.println("./bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");
            System.out.println("./bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
            System.out.println("******************************************************************************************");
            System.out.println("<topic> is the kafka topic name");
            System.out.println("<bootstrap.servers> is the ip:port list of brokers");
            System.out.println("******************************************************************************************");

            // Construct the execution environment.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set parallelism.
            env.setParallelism(1);
            // Parse the running parameters.
            ParameterTool paraTool = ParameterTool.fromArgs(args);
            // Construct a StreamGraph and write the data generated from the self-defined source to Kafka.
            DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
            // Parameterized type instead of the raw FlinkKafkaProducer to keep the sink type-checked.
            FlinkKafkaProducer<String> producer =
                    new FlinkKafkaProducer<>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties());
            messageStream.addSink(producer);
            // Invoke execute to trigger the execution.
            env.execute();
        }

        /**
         * Custom source that emits one {@code name,age,gender} message every second until
         * {@link #cancel()} is called.
         */
        public static class SimpleStringGenerator implements SourceFunction<String> {
            private static final long serialVersionUID = 1L;

            static final String[] NAME = {"Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James"};
            static final String[] GENDER = {"MALE", "FEMALE"};
            static final int COUNT = NAME.length;

            // volatile so that the flag written by cancel() is visible to the loop in run();
            // this is the pattern shown in the Flink SourceFunction documentation.
            volatile boolean running = true;
            // Fixed seed: the generated sequence is the same on every run.
            Random rand = new Random(47);

            /**
             * Uses {@code rand} to generate a random combination of name, age (0-69) and gender,
             * emits it, then sleeps for one second; repeats until the source is cancelled.
             *
             * @param ctx context used to emit the generated records
             * @throws Exception if the sleep between two records is interrupted
             */
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    int i = rand.nextInt(COUNT);
                    int age = rand.nextInt(70);
                    // The local must not reuse the array's name: the original
                    // "String Gender = Gender[...]" shadowed the array and did not compile.
                    String gender = GENDER[rand.nextInt(GENDER.length)];
                    ctx.collect(NAME[i] + "," + age + "," + gender);
                    Thread.sleep(1000);
                }
            }

            /** Stops the generation loop in {@link #run(SourceContext)}. */
            @Override
            public void cancel() {
                running = false;
            }
        }
    }
    
  • Generate Table1 and Table2, use Join to jointly query Table1 and Table2, and print the output result.
      1
      2
      3
      4
      5
      6
      7
      8
      9
     10
     11
     12
     13
     14
     15
     16
     17
     18
     19
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    import org.apache.calcite.interpreter.Row;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    
    public class SqlJoinWithSocket {
    
        public static void main(String[] args) throws Exception{
    
            final String hostname;
    
            final int port;
    
            System.out.println("use command as: ");
    
            System.out.println("flink run --class com.huawei.flink.example.sqljoin.SqlJoinWithSocket" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:9092 --hostname xxx.xxx.xxx.xxx --port xxx");
    
            System.out.println("flink run --class com.huawei.flink.example.sqljoin.SqlJoinWithSocket" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka"
                    + "--hostname xxx.xxx.xxx.xxx --port xxx");
    
            System.out.println("******************************************************************************************");
            System.out.println("<topic> is the kafka topic name");
            System.out.println("<bootstrap.servers> is the ip:port list of brokers");
            System.out.println("******************************************************************************************");
    
            try {
                final ParameterTool params = ParameterTool.fromArgs(args);
    
                hostname = params.has("hostname") ? params.get("hostname") : "localhost";
    
                port = params.getInt("port");
    
            } catch (Exception e) {
                System.err.println("No port specified. Please run 'FlinkStreamSqlJoinExample " +
                        "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                        "and port is the address of the text server");
    
                System.err.println("To start a simple text server, run 'netcat -l -p <port>' and " +
                        "type the input text into the command line");
    
                return;
            }
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
           // Process data based on EventTime.
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            env.setParallelism(1);
    
            ParameterTool paraTool = ParameterTool.fromArgs(args);
    
            // Use Stream1 to read data from Kafka.
            DataStream<Tuple3<String, String, String>> kafkaStream = env.addSource(new FlinkKafkaConsumer<>(paraTool.get("topic"),
                    new SimpleStringSchema(), paraTool.getProperties()))
                    .map(new MapFunction<String, Tuple3<String, String, String>>() {
                        @Override
                        public Tuple3<String, String, String> map(String s) throws Exception
                        {
                            String[] word = s.split(",");
    
                            return new Tuple3<>(word[0], word[1], word[2]);
                        }
                    });
    
            // Register Stream1 as Table1.
            tableEnv.registerDataStream("Table1", kafkaStream, "name, age, Gender, proctime.proctime");
    
           // Use Stream2 to read data from the socket. 
            DataStream<Tuple2<String, String>> socketStream = env.socketTextStream(hostname, port, "\n")
                    .map(new MapFunction<String, Tuple2<String, String>>() {
                        @Override
                        public Tuple2<String, String> map(String s) throws Exception
                        {
                            String[] words = s.split("\\s");
                            if (words.length < 2) {
                                return new Tuple2<>();
                            }
    
                            return new Tuple2<>(words[0], words[1]);
                        }
                    });
    
            // Register Stream2 as Table2.
            tableEnv.registerDataStream("Table2", socketStream, "name, job, proctime.proctime");
    
           // Run SQL Join to perform a joint query.
            Table result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.Gender, t2.job, t2.proctime as shiptime\n" +
                    "FROM Table1 AS t1\n" +
                    "JOIN Table2 AS t2\n" +
                    "ON t1.name = t2.name\n" +
                    "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND");
    
            // Convert the query result into a stream and print the output.
            tableEnv.toAppendStream(result, Row.class).print();
    
            env.execute();
        }
    }