Updated on 2022-08-16 GMT+08:00

Java Sample Code

Function Description

In a Flink application, call the APIs of the flink-connector-kafka module to produce and consume data.

Sample Code

The following is the main logic code of the Kafka producer and the Kafka consumer.

For the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.ReadFromKafka.

//producer code 
/**
 * Flink job that writes messages produced by a custom source into a Kafka topic.
 *
 * <p>Expected arguments: {@code --topic <name> --bootstrap.servers <ip:port,...>}. All parsed
 * arguments are forwarded to the Kafka producer as its configuration properties.
 */
public class WriteIntoKafka {
  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments in {@code --key value} form; {@code --topic} is required
   * @throws Exception if the job fails to execute
   */
  public static void main(String[] args) throws Exception {
    printUsage();

    //Build the execution environment.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //Configure the parallelism.
    env.setParallelism(1);
    //Parse the execution parameters.
    ParameterTool paraTool = ParameterTool.fromArgs(args);
    //Fail fast with a clear message when --topic is missing instead of handing null to the sink.
    String topic = paraTool.getRequired("topic");
    //Build the StreamGraph and write data generated by the customized source into Kafka.
    DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
    messageStream.addSink(new FlinkKafkaProducer<>(topic,
        new SimpleStringSchema(),
        paraTool.getProperties()));
    //Call execute to trigger the execution.
    env.execute();
  }

  /** Prints the reference {@code flink run} command for this job. */
  private static void printUsage() {
    System.out.println("use command as: ");
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
        " /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:9092");
    System.out.println("******************************************************************************************");
    System.out.println("<topic> is the kafka topic name");
    System.out.println("<bootstrap.servers> is the ip:port list of brokers");
    System.out.println("******************************************************************************************");
  }

  /**
   * Custom source that emits {@code "element-<n>"} once per second, with {@code n} starting at 0,
   * until {@link #cancel()} is called.
   */
  public static class SimpleStringGenerator implements SourceFunction<String> {
    private static final long serialVersionUID = 2174904787118597072L;

    // volatile: cancel() is invoked from a different thread than the one looping in run(),
    // so the write must be visible to the loop for the source to actually stop.
    private volatile boolean running = true;
    private long i = 0;

    /**
     * Emits one message per second until the source is cancelled.
     *
     * @param ctx context used to emit records
     * @throws Exception if the thread is interrupted while sleeping
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
      while (running) {
        ctx.collect("element-" + (i++));
        Thread.sleep(1000);
      }
    }

    /** Signals the emit loop in {@link #run} to stop after its current iteration. */
    @Override
    public void cancel() {
      running = false;
    }
  }
}
 
//consumer code 
/**
 * Flink job that reads string messages from a Kafka topic and prints them.
 *
 * <p>Expected arguments: {@code --topic <name> --bootstrap.servers <ip:port,...>}. All parsed
 * arguments are forwarded to the Kafka consumer as its configuration properties.
 */
public class ReadFromKafka {
  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments in {@code --key value} form; {@code --topic} is required
   * @throws Exception if the job fails to execute
   */
  public static void main(String[] args) throws Exception {
    printUsage();

    //Build the execution environment.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //Configure the parallelism.
    env.setParallelism(1);
    //Parse the execution parameters.
    ParameterTool paraTool = ParameterTool.fromArgs(args);
    //Fail fast with a clear message when --topic is missing instead of handing null to the source.
    String topic = paraTool.getRequired("topic");
    //Build the StreamGraph: read data from Kafka, prefix each record, and print it followed by
    //a line separator.
    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer<>(topic,
        new SimpleStringSchema(),
        paraTool.getProperties()));
    messageStream.rebalance().map(new MapFunction<String, String>() {
      @Override
      public String map(String s) throws Exception {
        return "Flink says " + s + System.getProperty("line.separator");
      }
    }).print();
    //Call execute to trigger the execution.
    env.execute();
  }

  /** Prints the reference {@code flink run} command for this job. */
  private static void printUsage() {
    System.out.println("use command as: ");
    // Both options use the double-dash form, matching the producer example.
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
        " /opt/test.jar --topic topic-test --bootstrap.servers 10.91.8.218:9092");
    System.out.println("******************************************************************************************");
    System.out.println("<topic> is the kafka topic name");
    System.out.println("<bootstrap.servers> is the ip:port list of brokers");
    System.out.println("******************************************************************************************");
  }
}