Updated: 2024-06-05 GMT+08:00

Flink Kafka Sample Program (Java)

Function Description

In a Flink application, call the APIs of the flink-connector-kafka module to produce and consume data.

Code Sample

If the application needs to connect to Kafka in security mode, import FusionInsight's kafka-clients-*.jar before development. The JAR file can be obtained from the Kafka client directory.

The main logic of the producer and consumer code is listed below for demonstration.

For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.ReadFromKafka.
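
In both samples, the --key value pairs of the flink run command are parsed with ParameterTool and forwarded as a Properties object to the Kafka connector. The following standalone snippet is a minimal sketch of that mechanism only; the class name ShowKafkaProperties and its main method are made up for illustration and are not part of the shipped sample, while the argument values are taken from the reference commands printed by the samples.

import java.util.Properties;
import org.apache.flink.api.java.utils.ParameterTool;

public class ShowKafkaProperties {
    public static void main(String[] args) {
        // Arguments in the same style as the samples' reference flink run commands
        String[] sampleArgs = {
            "--topic", "topic-test",
            "--bootstrap.servers", "10.91.8.218:21007",
            "--security.protocol", "SASL_PLAINTEXT",
            "--sasl.kerberos.service.name", "kafka"
        };
        ParameterTool paraTool = ParameterTool.fromArgs(sampleArgs);
        // getProperties() exposes every parsed key/value pair; the Kafka connector then
        // picks up the keys it understands (bootstrap.servers, security.protocol, ...)
        Properties props = paraTool.getProperties();
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}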

// Producer code
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class WriteIntoKafka {

    public static void main(String[] args) throws Exception {
        // Print reference commands for executing flink run
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
        System.out.println("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println("******************************************************************************************");

        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        // Parse the run arguments
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        // Build the stream graph: write the data generated by the custom source into Kafka
        DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
        messageStream.addSink(new FlinkKafkaProducer<>(paraTool.get("topic"),
            new SimpleStringSchema(),
            paraTool.getProperties()));
        // Call execute to trigger the execution
        env.execute();
    }

    // Custom source that keeps generating one message per second
    public static class SimpleStringGenerator implements SourceFunction<String> {

        private static final long serialVersionUID = 2174904787118597072L;

        boolean running = true;

        long i = 0;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                ctx.collect("element-" + (i++));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

// Consumer code
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReadFromKafka {

    public static void main(String[] args) throws Exception {
        // Print reference commands for executing flink run
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
        System.out.println("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println("******************************************************************************************");

        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        // Parse the run arguments
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        // Build the stream graph: read data from Kafka and print each record on its own line
        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer<>(paraTool.get("topic"),
            new SimpleStringSchema(),
            paraTool.getProperties()));
        messageStream.rebalance().map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                return "Flink says " + s + System.getProperty("line.separator");
            }
        }).print();
        // Call execute to trigger the execution
        env.execute();
    }
}
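
If the Kafka connection settings are fixed, the connector configuration can also be built directly in code instead of being passed on the command line. The following sketch is a hypothetical variant of ReadFromKafka, not part of the shipped sample; the broker address, port, and security values are the example values from the reference commands above, and depending on the cluster additional Kerberos client configuration may still be required.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReadFromKafkaHardcoded {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Connector configuration built in code; the keys mirror the flags of the
        // security-mode flink run command shown in the samples (example values).
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.91.8.218:21007");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.kerberos.service.name", "kafka");

        DataStream<String> messageStream =
            env.addSource(new FlinkKafkaConsumer<>("topic-test", new SimpleStringSchema(), props));
        messageStream.print();
        env.execute("ReadFromKafkaHardcoded");
    }
}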

Related Documents