Updated on 2022-12-09 GMT+08:00

Java Sample Code

Description

Currently, a data source that continuously generates four strings is implemented based on the random number generator for writing data.

Sample Code

The following code snippet is used as an example. For complete code, see com.huawei.bigdata.flink.examples.

public class FlinkEsSinkExample {
  public static void main(String[] args) throws Exception {

    System.out.println("use command as:");
    System.out.println(
        "flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.FlinkEsSinkExample /<JAR_PATH>/FlinkEsSinkExample-1.0.jar");
    System.out.println(
        "******************************************************************************************");
    System.out.println("<JAR_PATH> is the path of jar file");
    System.out.println(
        "******************************************************************************************");
    //init flink execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    //create a dataSource according to RandomDataGenerator
    //of course, you can create any datasource from kafka, mysql, text file and so on
    DataGeneratorSource<String> dataGeneratorSource =
        new DataGeneratorSource<>(new DataGenerator<String>() {
          RandomDataGenerator generator;

          @Override
          public void open(String name, FunctionInitializationContext context,
              RuntimeContext runtimeContext) throws Exception {
            generator = new RandomDataGenerator();
          }

          @Override
          public boolean hasNext() {
            return true;
          }

          @Override
          public String next() {
            return generator.nextHexString(4);
          }
        });

    //create HtppHost list which are needed by elasticsearch sink
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("172.16.0.117", 9200, "http"));
    //httpHosts.add(new HttpHost("172.16.0.xxx", 9200, "http"));
    ElasticsearchSink.Builder<String> esSinkBuilder =
        new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<String>() {
          public IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);
            //init index request,put add real data as index request source
            return Requests.indexRequest().index("my-index").id(element).source(json);
          }

          @Override
          public void process(String s, RuntimeContext runtimeContext,
              RequestIndexer requestIndexer) {
            requestIndexer.add(createIndexRequest(s));
          }
        });

    // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
    esSinkBuilder.setBulkFlushMaxActions(1);

    //add data source
    DataStream<String> stringDataStream =
        env.addSource(dataGeneratorSource, "DataGeneratorSource").returns(Types.STRING);
    //add elstic search sink
    stringDataStream.addSink(esSinkBuilder.build());
    env.execute();
  }
}