Java Sample Code

Updated on 2022-11-18 GMT+08:00

Function

In the Flink application, this code invokes APIs of the flink-connector-kafka module to produce and consume data.

Sample Code

Before development, if you need to interconnect with FusionInsight Kafka in security mode, obtain the kafka-client-*.jar file from the FusionInsight client directory.

The following shows the main logic code of the Kafka producer, the Kafka consumer, and the Flink Stream SQL Join.

For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.SqlJoinWithSocket.
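In both samples below, Kafka settings arrive on the command line as `--key value` (or `-key value`) pairs; `ParameterTool.fromArgs(args)` collects them, and `paraTool.getProperties()` hands them to the Kafka connector. The following is a simplified, hypothetical sketch of that parsing idea in plain Java (`ArgsToProperties` is an illustration, not Flink's actual `ParameterTool` implementation):

```java
import java.util.Properties;

// Hypothetical, simplified stand-in for ParameterTool.fromArgs:
// each "--key value" (or "-key value") pair on the command line
// becomes one property entry, which the samples then pass on to
// the Kafka connector via getProperties().
public class ArgsToProperties {
    public static Properties fromArgs(String[] args) {
        Properties props = new Properties();
        for (int i = 0; i + 1 < args.length; i += 2) {
            String key = args[i].replaceFirst("^-+", ""); // strip "-" or "--"
            props.setProperty(key, args[i + 1]);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties p = fromArgs(new String[] {
                "--topic", "topic-test",
                "-bootstrap.servers", "10.91.8.218:21007",
                "--security.protocol", "SASL_PLAINTEXT"
        });
        // The Kafka connector reads entries such as bootstrap.servers from here.
        System.out.println(p.getProperty("bootstrap.servers")); // prints 10.91.8.218:21007
    }
}
```

This is why flags such as `--security.protocol` and `--ssl.truststore.location` in the run commands below need no dedicated parsing code: they flow through as Kafka client properties.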

  1. Produce a piece of user information (name, age, and gender) in Kafka every second.
    //Producer code
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    import java.util.Random;

    public class WriteIntoKafka {
          public static void main(String[] args) throws Exception {
          //Print the command reference for flink run. 
            System.out.println("use command as: ");
            System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
               " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092");
            System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
               " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
            System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
               " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9093 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
            System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
               " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
            System.out.println("******************************************************************************************");
            System.out.println("<topic> is the kafka topic name");
            System.out.println("<bootstrap.servers> is the ip:port list of brokers");
            System.out.println("******************************************************************************************");
           
            //Construct the execution environment.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //Set the parallelism.
            env.setParallelism(1);
            //Parse the running parameters.
            ParameterTool paraTool = ParameterTool.fromArgs(args);
            //Construct a flow diagram and write the data generated from self-defined sources to Kafka.
            DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
            FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(paraTool.get("topic"),
               new SimpleStringSchema(),
               paraTool.getProperties());
            producer.setWriteTimestampToKafka(true);
            messageStream.addSink(producer);
            //Invoke execute to trigger the execution. 
            env.execute();
         }
    //Customize the sources and generate a message every second.
    public static class SimpleStringGenerator implements SourceFunction<String> {
            static final String[] NAME = {"Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James"};
            static final String[] SEX = {"MALE", "FEMALE"};
            static final int COUNT = NAME.length;   
            boolean running = true;
            Random rand = new Random(47);
           @Override
            //Use rand to randomly generate a combination of the name, gender, and age.
             public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    int i = rand.nextInt(COUNT);
                    int age = rand.nextInt(70);
                    String sexy = SEX[rand.nextInt(2)];
                    ctx.collect(NAME[i] + "," + age + "," + sexy);
                    Thread.sleep(1000);
                }
        }
           @Override
           public void cancel() {
             running = false;
           }
         }
       }
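The producer above emits each record as one comma-separated line, `name,age,sexy`, which the consumer side later splits back into three fields with `s.split(",")`. A minimal, self-contained sketch of that round trip (plain Java, no Flink dependencies):

```java
// Sketch of the record format used by WriteIntoKafka: a CSV line
// "name,age,sexy", plus the split logic the consumer applies to it.
public class RecordFormat {
    // Build one record the way SimpleStringGenerator's run() does.
    public static String format(String name, int age, String sexy) {
        return name + "," + age + "," + sexy;
    }

    // Split a record back into fields, as the consumer's MapFunction
    // does with s.split(",").
    public static String[] parse(String record) {
        return record.split(",");
    }

    public static void main(String[] args) {
        String[] fields = parse(format("Alen", 25, "MALE"));
        System.out.println(fields[0] + " " + fields[1] + " " + fields[2]); // prints Alen 25 MALE
    }
}
```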
  2. Generate Table1 and Table2, query them jointly using a SQL Join, and print the result.
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class SqlJoinWithSocket {
        public static void main(String[] args) throws Exception{
            final String hostname;
            final int port;
            System.out.println("use command as: ");
            System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:9092 --hostname xxx.xxx.xxx.xxx --port xxx");
            System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka"
                    + " --hostname xxx.xxx.xxx.xxx --port xxx");
            System.out.println("flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:9093 --security.protocol SSL --ssl.truststore.location /home/truststore.jks "
                    + "--ssl.truststore.password huawei --hostname xxx.xxx.xxx.xxx --port xxx");
            System.out.println("******************************************************************************************");
            System.out.println("<topic> is the kafka topic name");
            System.out.println("<bootstrap.servers> is the ip:port list of brokers");
            System.out.println("******************************************************************************************");
            try {
                final ParameterTool params = ParameterTool.fromArgs(args);
                hostname = params.has("hostname") ? params.get("hostname") : "localhost";
                port = params.getInt("port");
            } catch (Exception e) {
                System.err.println("No port specified. Please run 'FlinkStreamSqlJoinExample " +
                        "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                        "and port is the address of the text server");
                System.err.println("To start a simple text server, run 'netcat -l -p <port>' and " +
                        "type the input text into the command line");
                return;
            }
            
            EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
            //Perform processing based on EventTime. 
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
            ParameterTool paraTool = ParameterTool.fromArgs(args);
            //Use Stream1 to read data from Kafka. 
            DataStream<Tuple3<String, String, String>> kafkaStream = env.addSource(new FlinkKafkaConsumer<>(paraTool.get("topic"),
                    new SimpleStringSchema(),
                    paraTool.getProperties())).map(new MapFunction<String, Tuple3<String, String, String>>() {
                @Override
                public Tuple3<String, String, String> map(String s) throws Exception {
                    String[] word = s.split(",");
                    return new Tuple3<>(word[0], word[1], word[2]);
                }
            });
            //Register Stream1 as Table1. 
            tableEnv.registerDataStream("Table1", kafkaStream, "name, age, sexy, proctime.proctime");
            //Use Stream2 to read data from the socket. 
            DataStream<Tuple2<String, String>> socketStream = env.socketTextStream(hostname, port, "\n").
                    map(new MapFunction<String, Tuple2<String, String>>() {
                        @Override
                        public Tuple2<String, String> map(String s) throws Exception {
                            String[] words = s.split("\\s");
                            if (words.length < 2) {
                                return new Tuple2<>();
                            }
                            return new Tuple2<>(words[0], words[1]);
                        }
                    });
            //Register Stream2 as Table2. 
            tableEnv.registerDataStream("Table2", socketStream, "name, job, proctime.proctime");
            //Run SQL Join to perform a combined query.
            Table result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.sexy, t2.job, t2.proctime as shiptime\n" +
                    "FROM Table1 AS t1\n" +
                    "JOIN Table2 AS t2\n" +
                    "ON t1.name = t2.name\n" +
                    "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND");
            //Convert the query result into the stream and print the output.
            tableEnv.toAppendStream(result, Row.class).print();
            env.execute();
        }
    }
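The socket side of the join expects each line typed into the text server (for example, `netcat -l -p <port>`) to carry a name and a job separated by whitespace, such as `Alen clerk`; lines with fewer than two fields become an empty tuple. A plain-Java sketch of that splitting rule, without the Flink wrapping:

```java
// Sketch of the whitespace-split rule from the socket MapFunction:
// "name job" -> (name, job); lines with fewer than two fields map
// to an empty result, mirroring the empty Tuple2 in the sample.
public class SocketLineParser {
    public static String[] parse(String line) {
        String[] words = line.split("\\s");
        if (words.length < 2) {
            return new String[0];
        }
        return new String[] {words[0], words[1]};
    }

    public static void main(String[] args) {
        String[] pair = parse("Alen clerk");
        System.out.println(pair[0] + " -> " + pair[1]); // prints Alen -> clerk
        System.out.println(parse("malformed").length);  // prints 0
    }
}
```

Only names typed here that also appear in the Kafka stream (within the one-second interval window of the SQL Join) produce output rows.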
