更新时间:2024-08-03 GMT+08:00

Flink向Kafka生产并消费数据Java样例代码

功能简介

在Flink应用中,调用flink-connector-kafka模块的接口,生产并消费数据。

用户在开发前如需使用对接安全模式的Kafka,则需要引入MRS的kafka-client-xx.x.x.jar,该jar包可在MRS client目录下获取。

样例代码

下面列出producer和consumer主要逻辑代码作为演示。

完整代码参见com.huawei.bigdata.flink.examples.WriteIntoKafka和com.huawei.flink.example.kafka.ReadFromKafka。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// producer code
public class WriteIntoKafka {
    /**
     * Builds and executes a Flink streaming job that writes strings emitted by a
     * custom source into a Kafka topic through {@code FlinkKafkaProducer010}.
     *
     * @param args command-line arguments parsed by {@code ParameterTool}; expects
     *             {@code --topic} and {@code -bootstrap.servers}, plus optional
     *             security properties for a secured cluster (see the printed usage)
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Print reference commands for "flink run"
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
        System.out.println("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println("******************************************************************************************");

        // Build the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        // Parse the runtime parameters
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        // Build the stream graph: write data produced by the custom source into Kafka
        DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
        messageStream.addSink(new FlinkKafkaProducer010<>(paraTool.get("topic"),
            new SimpleStringSchema(),
            paraTool.getProperties()));
        // Trigger the execution
        env.execute();
    }

    /**
     * Custom source that emits one "element-N" string per second until cancelled.
     */
    public static class SimpleStringGenerator implements SourceFunction<String> {
        private static final long serialVersionUID = 2174904787118597072L;
        // volatile: cancel() is invoked from a different thread than run(), so the
        // write to this flag must be visible to the emitting loop, otherwise the
        // source may never observe cancellation
        volatile boolean running = true;
        long i = 0;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                ctx.collect("element-" + (i++));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
// consumer code
public class ReadFromKafka {
    /**
     * Builds and executes a Flink streaming job that consumes string records
     * from a Kafka topic and prints each one on its own line.
     *
     * @param args command-line arguments parsed by {@code ParameterTool}; expects
     *             {@code --topic} and {@code -bootstrap.servers}, plus optional
     *             security properties for a secured cluster (see the printed usage)
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Show reference commands for "flink run"
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");
        System.out.println("./bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
        System.out.println("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println("******************************************************************************************");

        // Set up the streaming environment with a parallelism of 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Parse the command-line options
        ParameterTool params = ParameterTool.fromArgs(args);

        // Source: consume the configured Kafka topic as plain strings
        DataStream<String> kafkaStream = env.addSource(new FlinkKafkaConsumer010<>(params.get("topic"),
            new SimpleStringSchema(),
            params.getProperties()));

        // Rebalance the stream, prefix every record, and print it followed by a line break
        kafkaStream.rebalance().map(new MapFunction<String, String>() {
            @Override
            public String map(String record) throws Exception {
                return "Flink says " + record + System.getProperty("line.separator");
            }
        }).print();

        // Trigger the execution
        env.execute();
    }
}