Updated: 2023-04-12 GMT+08:00

Java Sample Code

Function Description

In a Flink application, call APIs of the flink-connector-kafka module to produce and consume data.

If you need to interconnect with Kafka in security mode, import kafka-client-xx.x.x.jar of MRS before development. The JAR file can be obtained from the MRS client directory.
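The security-related options shown in the sample run commands below (security.protocol, sasl.kerberos.service.name, ssl.truststore.location, and ssl.truststore.password) are parsed by ParameterTool and handed to the Kafka connector as client properties. The following is a minimal, hand-written sketch of the equivalent java.util.Properties, assuming a SASL_PLAINTEXT setup; the class name, broker address, and truststore values are illustrative placeholders, not values from this document.

    import java.util.Properties;

    // Illustrative sketch only: builds by hand the properties that
    // ParameterTool.fromArgs(args).getProperties() would produce.
    public class KafkaSecurityPropsSketch {
        public static Properties securityProps() {
            Properties props = new Properties();
            // Broker list (placeholder host; the port depends on the protocol in use).
            props.setProperty("bootstrap.servers", "broker-1:21007");
            // Kerberos-authenticated access, matching the SASL_PLAINTEXT sample command.
            props.setProperty("security.protocol", "SASL_PLAINTEXT");
            props.setProperty("sasl.kerberos.service.name", "kafka");
            // For SSL or SASL_SSL, a truststore is also required, for example:
            // props.setProperty("ssl.truststore.location", "/home/truststore.jks");
            // props.setProperty("ssl.truststore.password", "<password>");
            return props;
        }
    }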

Sample Code

The following shows the main logic code of the producer and consumer, and of the Flink Stream SQL join, for demonstration.

For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.SqlJoinWithSocket.

  • Produce one piece of user information to Kafka every second. Each record consists of a name, an age, and a gender. (A hedged standalone consumer sketch for verifying the produced records follows after this list.)
    // Producer code
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    
    import java.util.Random;
    
    public class WriteIntoKafka4SQLJoin {
    
        public static void main(String[] args) throws Exception {
            // Print reference commands for running flink run
            System.out.println("use command as: ");
            System.out.println("./bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");
            System.out.println("./bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
            System.out.println("./bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
            System.out.println("./bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
            System.out.println("******************************************************************************************");
            System.out.println("<topic> is the kafka topic name");
            System.out.println("<bootstrap.servers> is the ip:port list of brokers");
            System.out.println("******************************************************************************************");
    
            // Build the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set the parallelism
            env.setParallelism(1);
            // Parse the run arguments
            ParameterTool paraTool = ParameterTool.fromArgs(args);
            // Build the stream graph: write data generated by the custom source to Kafka
            DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
            FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(paraTool.get("topic"), new SimpleStringSchema(), paraTool.getProperties());
            messageStream.addSink(producer);
            // Call execute to trigger the execution
            env.execute();
        }
    
        // Custom source that continuously generates one message every second
        public static class SimpleStringGenerator implements SourceFunction<String> {
            static final String[] NAME = {"Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James"};
            static final String[] SEX = {"MALE", "FEMALE"};
            static final int COUNT = NAME.length;
            boolean running = true;
            Random rand = new Random(47);
    
            // Randomly generate combinations of name, gender, and age
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (running) {
                    int i = rand.nextInt(COUNT);
                    int age = rand.nextInt(70);
                    String sexy = SEX[rand.nextInt(2)];
                    ctx.collect(NAME[i] + "," + age + "," + sexy);
                    Thread.sleep(1000);
                }
            }
    
            @Override
            public void cancel() {
                running = false;
            }
        }
    }
    
  • Generate Table1 and Table2, use a join to query Table1 and Table2 jointly, and print the result.
    import org.apache.flink.types.Row;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    
    public class SqlJoinWithSocket {
    
        public static void main(String[] args) throws Exception {
    
            final String hostname;
    
            final int port;
    
            System.out.println("use command as: ");
    
            System.out.println("flink run --class com.huawei.flink.example.sqljoin.SqlJoinWithSocket" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:9092 --hostname xxx.xxx.xxx.xxx --port xxx");
    
            System.out.println("flink run --class com.huawei.flink.example.sqljoin.SqlJoinWithSocket" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka"
                    + "--hostname xxx.xxx.xxx.xxx --port xxx");
    
            System.out.println("flink run --class com.huawei.flink.example.sqljoin.SqlJoinWithSocket" +
                    " /opt/test.jar --topic topic-test -bootstrap.servers xxxx.xxx.xxx.xxx:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks "
                    + "--ssl.truststore.password huawei --hostname xxx.xxx.xxx.xxx --port xxx");
    
            System.out.println("******************************************************************************************");
            System.out.println("<topic> is the kafka topic name");
            System.out.println("<bootstrap.servers> is the ip:port list of brokers");
            System.out.println("******************************************************************************************");
    
            try {
                final ParameterTool params = ParameterTool.fromArgs(args);
    
                hostname = params.has("hostname") ? params.get("hostname") : "localhost";
    
                port = params.getInt("port");
    
            } catch (Exception e) {
                System.err.println("No port specified. Please run 'FlinkStreamSqlJoinExample " +
                        "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                        "and port is the address of the text server");
    
                System.err.println("To start a simple text server, run 'netcat -l -p <port>' and " +
                        "type the input text into the command line");
    
                return;
            }
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
            // Process based on event time (note: the tables below use processing-time attributes)
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            env.setParallelism(1);
    
            ParameterTool paraTool = ParameterTool.fromArgs(args);
    
            // Stream1: read data from Kafka
            DataStream<Tuple3<String, String, String>> kafkaStream = env.addSource(new FlinkKafkaConsumer<>(paraTool.get("topic"),
                    new SimpleStringSchema(), paraTool.getProperties()))
                    .map(new MapFunction<String, Tuple3<String, String, String>>() {
                        @Override
                    public Tuple3<String, String, String> map(String s) throws Exception {
                            String[] word = s.split(",");
    
                            return new Tuple3<>(word[0], word[1], word[2]);
                        }
                    });
    
            // Register Stream1 as Table1
            tableEnv.registerDataStream("Table1", kafkaStream, "name, age, sexy, proctime.proctime");
    
            // Stream2: read data from the socket
            DataStream<Tuple2<String, String>> socketStream = env.socketTextStream(hostname, port, "\n")
                    .map(new MapFunction<String, Tuple2<String, String>>() {
                        @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                            String[] words = s.split("\\s");
                            if (words.length < 2) {
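                            // Malformed line: emit an empty tuple (both fields null)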
                                return new Tuple2<>();
                            }
    
                            return new Tuple2<>(words[0], words[1]);
                        }
                    });
    
            // Register Stream2 as Table2
            tableEnv.registerDataStream("Table2", socketStream, "name, job, proctime.proctime");
    
            // Run the SQL join query on the two tables
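            // This is a processing-time interval join: a row from Table1 matches a row
            // from Table2 only when their proctime attributes lie within one second of
            // each other, so the result stream is append-only.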
            Table result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.sexy, t2.job, t2.proctime as shiptime\n" +
                    "FROM Table1 AS t1\n" +
                    "JOIN Table2 AS t2\n" +
                    "ON t1.name = t2.name\n" +
                    "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND");
    
            // Convert the query result to a stream and print it
            tableEnv.toAppendStream(result, Row.class).print();
    
            env.execute();
        }
    }
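
In the samples above, the consumer side is embedded in SqlJoinWithSocket through FlinkKafkaConsumer. To verify what WriteIntoKafka4SQLJoin actually produces, a standalone consumer job can be sketched as follows. This is a minimal sketch using the same flink-connector-kafka API as the samples above; the class name ReadFromKafka4SQLJoin is illustrative and not part of the sample code package.

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    // A minimal consumer sketch; run it with the same --topic and
    // security-related options as the producer.
    public class ReadFromKafka4SQLJoin {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            ParameterTool paraTool = ParameterTool.fromArgs(args);
            // Print each "name,age,gender" record written by WriteIntoKafka4SQLJoin.
            env.addSource(new FlinkKafkaConsumer<>(paraTool.get("topic"),
                    new SimpleStringSchema(), paraTool.getProperties()))
                    .print();
            env.execute();
        }
    }

To drive Table2 during a test, start a text server with netcat -l -p <port> (as the usage message above suggests) and type lines such as "Alen clerk"; when the same name arrives on both streams within the one-second window, the joined row is printed.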