更新时间:2024-08-05 GMT+08:00

Flink Kafka样例程序(Java)

功能介绍

在Flink应用中,调用flink-connector-kafka模块的接口,生产并消费数据。

代码样例

下面列出producer和consumer主要逻辑代码作为演示。

完整代码参见com.huawei.bigdata.flink.examples.WriteIntoKafka和com.huawei.bigdata.flink.examples.ReadFromKafka

//producer代码
public class WriteIntoKafka {
  public static void main(String[] args) throws Exception {
    // 打印出执行flink run的参考命令
    System.out.println("use command as: ");
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
        " /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:9092");
    System.out.println("******************************************************************************************");
    System.out.println("<topic> is the kafka topic name");
    System.out.println("<bootstrap.servers> is the ip:port list of brokers");
    System.out.println("******************************************************************************************");

    // 构造执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 设置并发度
    env.setParallelism(1);
    // 解析运行参数
    ParameterTool paraTool = ParameterTool.fromArgs(args);
    // 构造流图,将自定义Source生成的数据写入Kafka
    DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
    messageStream.addSink(new FlinkKafkaProducer<>(paraTool.get("topic"),
        new SimpleStringSchema(),
        paraTool.getProperties()));
    // 调用execute触发执行
    env.execute();
  }

  // 自定义Source,每隔1s持续产生消息
  public static class SimpleStringGenerator implements SourceFunction<String> {
    private static final long serialVersionUID = 2174904787118597072L;
    boolean running = true;
    long i = 0;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
      while (running) {
        ctx.collect("element-" + (i++));
        Thread.sleep(1000);
      }
    }

    @Override
    public void cancel() {
      running = false;
    }
  }
}

//consumer代码
public class ReadFromKafka {
  public static void main(String[] args) throws Exception {
    // 打印出执行flink run的参考命令
    System.out.println("use command as: ");
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
        " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:9092");
    System.out.println("******************************************************************************************");
    System.out.println("<topic> is the kafka topic name");
    System.out.println("<bootstrap.servers> is the ip:port list of brokers");
    System.out.println("******************************************************************************************");

    // 构造执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 设置并发度
    env.setParallelism(1);
    // 解析运行参数
    ParameterTool paraTool = ParameterTool.fromArgs(args);
    // 构造流图,从Kafka读取数据并换行打印
    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer<>(paraTool.get("topic"),
        new SimpleStringSchema(),
        paraTool.getProperties()));
    messageStream.rebalance().map(new MapFunction<String, String>() {
      @Override
      public String map(String s) throws Exception {
        return "Flink says " + s + System.getProperty("line.separator");
      }
    }).print();
    // 调用execute触发执行
    env.execute();
  }
}