Updated on 2022-09-14 GMT+08:00

Java Sample Code

Function

In the Flink application, this code invokes the flink-connector-kafka module's API to generate and consume data.

Sample Code

If you need to interconnect with FusionInsight Kafka in security mode, obtain the kafka-client-*.jar file from the FusionInsight client directory before starting development.

The following lists Producer, Consumer, and the main logic code used by Flink Stream SQL Join.

For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.SqlJoinWithSocket.

  1. A piece of user information is generated in Kafka every second. The user information includes the name, age, and gender.
    //Producer code
    /**
     * Flink job that writes one generated user record per second to a Kafka topic.
     *
     * <p>Each record is a comma-separated string of the form {@code name,age,sex}. The target
     * topic is read from the {@code --topic} argument, and all command-line arguments are
     * passed to the Kafka producer as properties.
     */
    public class WriteIntoKafka {
        /**
         * Prints the usage reference, builds the source-to-Kafka pipeline and executes it.
         *
         * @param args command-line arguments parsed by {@code ParameterTool}; {@code topic} is
         *             read explicitly, the rest are forwarded as producer properties
         * @throws Exception if the job execution fails
         */
        public static void main(String[] args) throws Exception {
            // Print the command reference for flink run.
            System.out.println("use command as: ");
            System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092");
            System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
            System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9093 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
            System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
            System.out.println("******************************************************************************************");
            System.out.println("<topic> is the kafka topic name");
            System.out.println("<bootstrap.servers> is the ip:port list of brokers");
            System.out.println("******************************************************************************************");

            // Construct the execution environment.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set the parallelism.
            env.setParallelism(1);
            // Parse the running parameters.
            ParameterTool paraTool = ParameterTool.fromArgs(args);
            // Build the dataflow: records from the custom source are written to Kafka.
            DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
            FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(paraTool.get("topic"),
                    new SimpleStringSchema(),
                    paraTool.getProperties());
            producer.setWriteTimestampToKafka(true);
            messageStream.addSink(producer);
            // Invoke execute to trigger the execution.
            env.execute();
        }

        /**
         * Custom source that emits one {@code name,age,sex} record per second until cancelled.
         */
        public static class SimpleStringGenerator implements SourceFunction<String> {
            static final String[] NAME = {"Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James"};
            static final String[] SEX = {"MALE", "FEMALE"};
            static final int COUNT = NAME.length;
            // volatile: cancel() is invoked from a different thread than the one looping in run().
            volatile boolean running = true;
            // Fixed seed, so the generated sequence is reproducible across runs.
            Random rand = new Random(47);

            /**
             * Emits a random combination of name, age (0-69) and sex, then sleeps for one second,
             * repeating until {@link #cancel()} clears the running flag.
             *
             * @param ctx context used to emit the generated records
             * @throws Exception if the sleeping thread is interrupted
             */
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    int i = rand.nextInt(COUNT);
                    int age = rand.nextInt(70);
                    String sexy = SEX[rand.nextInt(SEX.length)];
                    ctx.collect(NAME[i] + "," + age + "," + sexy);
                    Thread.sleep(1000);
                }
            }

            /** Requests the emit loop in {@code run} to stop after the current iteration. */
            @Override
            public void cancel() {
                running = false;
            }
        }
    }
  2. Generate Table1 and Table2, use Join to jointly query Table1 and Table2, and print the output result.
    /**
     * Flink job that joins a Kafka stream of {@code name,age,sex} records (Table1) with a socket
     * stream of whitespace-separated {@code name job} lines (Table2) using a processing-time
     * interval join, and prints the joined rows.
     */
    public class SqlJoinWithSocket {
        /**
         * Prints the usage reference, parses the arguments, registers both streams as tables,
         * runs the join query and prints its result.
         *
         * @param args command-line arguments; {@code port} is required, {@code hostname} defaults
         *             to {@code localhost}, {@code topic} names the Kafka topic, and all arguments
         *             are forwarded to the Kafka consumer as properties
         * @throws Exception if the job execution fails
         */
        public static void main(String[] args) throws Exception {
            final ParameterTool paraTool;
            final String hostname;
            final int port;
            System.out.println("use command as: ");
            System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:9092 --hostname xxx.xxx.xxx.xxx --port xxx");
            // The continuation starts with a space so the printed command does not read "kafka--hostname".
            System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka"
                    + " --hostname xxx.xxx.xxx.xxx --port xxx");
            System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:9093 --security.protocol SSL --ssl.truststore.location /home/truststore.jks "
                    + "--ssl.truststore.password huawei --hostname xxx.xxx.xxx.xxx --port xxx");
            System.out.println("******************************************************************************************");
            System.out.println("<topic> is the kafka topic name");
            System.out.println("<bootstrap.servers> is the ip:port list of brokers");
            System.out.println("******************************************************************************************");
            try {
                // Parse the arguments once; the same ParameterTool also configures the Kafka consumer below.
                paraTool = ParameterTool.fromArgs(args);
                hostname = paraTool.has("hostname") ? paraTool.get("hostname") : "localhost";
                port = paraTool.getInt("port");
            } catch (RuntimeException e) {
                System.err.println("No port specified. Please run 'SqlJoinWithSocket " +
                        "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                        "and port is the address of the text server");
                System.err.println("To start a simple text server, run 'netcat -l -p <port>' and " +
                        "type the input text into the command line");
                return;
            }

            EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
            // Set the stream time characteristic to EventTime. The tables registered below
            // expose a processing-time attribute (proctime.proctime), which the join uses.
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
            // Use Stream1 to read data from Kafka and split each record into (name, age, sex).
            DataStream<Tuple3<String, String, String>> kafkaStream = env.addSource(new FlinkKafkaConsumer<>(paraTool.get("topic"),
                    new SimpleStringSchema(),
                    paraTool.getProperties())).map(new MapFunction<String, Tuple3<String, String, String>>() {
                @Override
                public Tuple3<String, String, String> map(String s) throws Exception {
                    // Assumes every record has at least three comma-separated fields.
                    String[] word = s.split(",");
                    return new Tuple3<>(word[0], word[1], word[2]);
                }
            });
            // Register Stream1 as Table1.
            tableEnv.registerDataStream("Table1", kafkaStream, "name, age, sexy, proctime.proctime");
            // Use Stream2 to read data from the socket. Lines with fewer than two tokens are
            // dropped up front instead of being turned into a tuple with null fields: such a
            // row can never satisfy the join condition, and tuples must not carry null fields.
            DataStream<Tuple2<String, String>> socketStream = env.socketTextStream(hostname, port, "\n")
                    .filter(line -> line.split("\\s").length >= 2)
                    .map(new MapFunction<String, Tuple2<String, String>>() {
                        @Override
                        public Tuple2<String, String> map(String s) throws Exception {
                            String[] words = s.split("\\s");
                            return new Tuple2<>(words[0], words[1]);
                        }
                    });
            // Register Stream2 as Table2.
            tableEnv.registerDataStream("Table2", socketStream, "name, job, proctime.proctime");
            // Run SQL Join to perform a combined query: rows match on name when their
            // processing times are within one second of each other.
            Table result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.sexy, t2.job, t2.proctime as shiptime\n" +
                    "FROM Table1 AS t1\n" +
                    "JOIN Table2 AS t2\n" +
                    "ON t1.name = t2.name\n" +
                    "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND");
            // Convert the query result into an append stream and print the output.
            tableEnv.toAppendStream(result, Row.class).print();
            env.execute();
        }
    }