文档首页/
MapReduce服务 MRS/
开发指南(普通版_3.x)/
Flink开发指南(普通模式)/
开发Flink应用/
Flink对接云搜索服务(CSS)样例程序/
Flink对接云搜索服务(CSS)样例程序(Java)
更新时间:2024-08-05 GMT+08:00
Flink对接云搜索服务(CSS)样例程序(Java)
功能介绍
本样例基于随机数生成器实现了一个数据源,持续生成长度为4的十六进制字符串,并通过Elasticsearch Sink将这些数据写入云搜索服务(CSS)的索引(样例中为“my-index”)。
样例代码
下面代码片段仅为演示,完整代码参见com.huawei.bigdata.flink.examples包中的FlinkEsSinkExample类。
public class FlinkEsSinkExample { public static void main(String[] args) throws Exception { System.out.println("use command as:"); System.out.println( "flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.FlinkEsSinkExample /<JAR_PATH>/FlinkEsSinkExample-1.0.jar"); System.out.println( "******************************************************************************************"); System.out.println("<JAR_PATH> is the path of jar file"); System.out.println( "******************************************************************************************"); //init flink execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //create a dataSource according to RandomDataGenerator //of course, you can create any datasource from kafka, mysql, text file and so on DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new DataGenerator<String>() { RandomDataGenerator generator; @Override public void open(String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception { generator = new RandomDataGenerator(); } @Override public boolean hasNext() { return true; } @Override public String next() { return generator.nextHexString(4); } }); //create HtppHost list which are needed by elasticsearch sink List<HttpHost> httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("172.16.0.117", 9200, "http")); //httpHosts.add(new HttpHost("172.16.0.xxx", 9200, "http")); ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<String>() { public IndexRequest createIndexRequest(String element) { Map<String, String> json = new HashMap<>(); json.put("data", element); //init index request,put add real data as index request source return Requests.indexRequest().index("my-index").id(element).source(json); } @Override public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { 
requestIndexer.add(createIndexRequest(s)); } }); // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered esSinkBuilder.setBulkFlushMaxActions(1); //add data source DataStream<String> stringDataStream = env.addSource(dataGeneratorSource, "DataGeneratorSource").returns(Types.STRING); //add elstic search sink stringDataStream.addSink(esSinkBuilder.build()); env.execute(); } }