更新时间:2024-06-27 GMT+08:00
分享

Flink对接云搜索服务(CSS)样例程序(Java)

功能介绍

当前基于随机数生成器实现了一个持续产生长度为4字符串的数据源用于写入数据。

样例代码

下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.flink.examples。

public class FlinkEsSinkExample {
  public static void main(String[] args) throws Exception {

    System.out.println("use command as:");
    System.out.println(
        "flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.FlinkEsSinkExample /<JAR_PATH>/FlinkEsSinkExample-1.0.jar");
    System.out.println(
        "******************************************************************************************");
    System.out.println("<JAR_PATH> is the path of jar file");
    System.out.println(
        "******************************************************************************************");
    //init flink execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    //create a dataSource according to RandomDataGenerator
    //of course, you can create any datasource from kafka, mysql, text file and so on
    DataGeneratorSource<String> dataGeneratorSource =
        new DataGeneratorSource<>(new DataGenerator<String>() {
          RandomDataGenerator generator;

          @Override
          public void open(String name, FunctionInitializationContext context,
              RuntimeContext runtimeContext) throws Exception {
            generator = new RandomDataGenerator();
          }

          @Override
          public boolean hasNext() {
            return true;
          }

          @Override
          public String next() {
            return generator.nextHexString(4);
          }
        });

    //create HtppHost list which are needed by elasticsearch sink
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("172.16.0.117", 9200, "http"));
    //httpHosts.add(new HttpHost("172.16.0.xxx", 9200, "http"));
    ElasticsearchSink.Builder<String> esSinkBuilder =
        new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<String>() {
          public IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);
            //init index request,put add real data as index request source
            return Requests.indexRequest().index("my-index").id(element).source(json);
          }

          @Override
          public void process(String s, RuntimeContext runtimeContext,
              RequestIndexer requestIndexer) {
            requestIndexer.add(createIndexRequest(s));
          }
        });

    // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
    esSinkBuilder.setBulkFlushMaxActions(1);

    //add data source
    DataStream<String> stringDataStream =
        env.addSource(dataGeneratorSource, "DataGeneratorSource").returns(Types.STRING);
    //add elstic search sink
    stringDataStream.addSink(esSinkBuilder.build());
    env.execute();
  }
}

相关文档