Updated: 2024-08-05 GMT+08:00
Flink Sample Program for Connecting to Cloud Search Service (CSS) (Java)
Function Description
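This sample uses a random number generator to implement a data source that continuously produces 4-character (hexadecimal) strings. These strings are the data written to CSS.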
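The sketch below reproduces the shape of that data source using only the JDK, so you can inspect its output without a Flink cluster. It is a hypothetical stand-in: the class `HexStringSource` is not part of the MRS sample, and it uses `java.util.Random` instead of Apache Commons Math's `RandomDataGenerator`. Like the sample's `DataGenerator`, its `hasNext()` always returns `true`, which makes the source unbounded.

```java
import java.util.Iterator;
import java.util.Random;

/**
 * Hypothetical, JDK-only stand-in for the sample's data source (not part of the MRS sample):
 * an endless iterator of fixed-length lowercase hexadecimal strings.
 */
public class HexStringSource implements Iterator<String> {
    private static final char[] HEX = "0123456789abcdef".toCharArray();
    private final Random random;
    private final int length;

    public HexStringSource(Random random, int length) {
        this.random = random;
        this.length = length;
    }

    @Override
    public boolean hasNext() {
        // Unbounded, like the DataGenerator in the Flink sample
        return true;
    }

    @Override
    public String next() {
        // Pick one random hex digit per position
        char[] chars = new char[length];
        for (int i = 0; i < length; i++) {
            chars[i] = HEX[random.nextInt(HEX.length)];
        }
        return new String(chars);
    }

    public static void main(String[] args) {
        HexStringSource source = new HexStringSource(new Random(), 4);
        // Print a few sample values; the actual strings differ on every run
        for (int i = 0; i < 5; i++) {
            System.out.println(source.next());
        }
    }
}
```

In the Flink sample, `DataGeneratorSource` plays the same role by calling `DataGenerator.next()` repeatedly. This iterator makes that loop visible.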
Sample Code
The following code snippet is for demonstration only. For the complete code, see com.huawei.bigdata.flink.examples.
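import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.math3.random.RandomDataGenerator;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
// Assumes the Elasticsearch 7 connector (flink-connector-elasticsearch7);
// adjust this package if your project uses a different connector version
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

public class FlinkEsSinkExample {
    public static void main(String[] args) throws Exception {
        System.out.println("Use the following command:");
        System.out.println(
            "flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.FlinkEsSinkExample /<JAR_PATH>/FlinkEsSinkExample-1.0.jar");
        System.out.println(
            "******************************************************************************************");
        System.out.println("<JAR_PATH> is the path of the JAR file");
        System.out.println(
            "******************************************************************************************");

        // Initialize the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a data source backed by RandomDataGenerator.
        // Any other source (Kafka, MySQL, text files, and so on) can be used instead.
        DataGeneratorSource<String> dataGeneratorSource =
            new DataGeneratorSource<>(new DataGenerator<String>() {
                // Created in open() on each parallel instance, so it does not need to be serialized
                private transient RandomDataGenerator generator;

                @Override
                public void open(String name, FunctionInitializationContext context,
                    RuntimeContext runtimeContext) throws Exception {
                    generator = new RandomDataGenerator();
                }

                @Override
                public boolean hasNext() {
                    // Unbounded source: there is always another element
                    return true;
                }

                @Override
                public String next() {
                    // Random 4-character hexadecimal string
                    return generator.nextHexString(4);
                }
            });

        // Build the list of HttpHost entries required by the Elasticsearch sink.
        // Replace the address with that of your CSS cluster node(s).
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("172.16.0.117", 9200, "http"));
        // httpHosts.add(new HttpHost("172.16.0.xxx", 9200, "http"));

        ElasticsearchSink.Builder<String> esSinkBuilder =
            new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<String>() {
                public IndexRequest createIndexRequest(String element) {
                    Map<String, String> json = new HashMap<>();
                    json.put("data", element);
                    // Build an index request for "my-index" that uses the element as both
                    // the document ID and the value of the "data" field
                    return Requests.indexRequest().index("my-index").id(element).source(json);
                }

                @Override
                public void process(String s, RuntimeContext runtimeContext,
                    RequestIndexer requestIndexer) {
                    requestIndexer.add(createIndexRequest(s));
                }
            });

        // Bulk request configuration: flush after every element; otherwise requests are buffered
        esSinkBuilder.setBulkFlushMaxActions(1);

        // Add the data source
        DataStream<String> stringDataStream =
            env.addSource(dataGeneratorSource, "DataGeneratorSource").returns(Types.STRING);

        // Add the Elasticsearch sink
        stringDataStream.addSink(esSinkBuilder.build());
        env.execute();
    }
}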
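The sketch below shows what each stream element becomes on the Elasticsearch side. It builds the single-document REST call that corresponds to one `IndexRequest` created by the sample's sink function: index `my-index`, document ID equal to the element, and source `{"data": element}`. It is an illustration under assumptions, not the connector's code path, because the Flink connector submits such actions through the bulk API. The class name `IndexRequestSketch` and the value `"3fa9"` are hypothetical. The sketch uses only the JDK 11+ `java.net.http` classes, and it builds the request without sending it.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical illustration (not part of the MRS sample): the REST call equivalent to one
 * IndexRequest produced by the sample's ElasticsearchSinkFunction.
 */
public class IndexRequestSketch {

    /** JSON source {"data": "<element>"}; elements are assumed to be hex strings, so no escaping is done. */
    static String toJson(String element) {
        return "{\"data\":\"" + element + "\"}";
    }

    /** PUT /<index>/_doc/<id>: index (create or overwrite) the document with this ID. */
    static HttpRequest buildIndexRequest(String baseUrl, String index, String element) {
        String id = URLEncoder.encode(element, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index + "/_doc/" + id))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(toJson(element)))
                .build();
    }

    public static void main(String[] args) {
        // Assumed address taken from the sample; replace it with a node of your CSS cluster
        HttpRequest request = buildIndexRequest("http://172.16.0.117:9200", "my-index", "3fa9");
        System.out.println(request.method() + " " + request.uri());
        // prints "PUT http://172.16.0.117:9200/my-index/_doc/3fa9"
        // Sending it requires network access to the cluster, e.g. with
        // java.net.http.HttpClient.newHttpClient().send(request, java.net.http.HttpResponse.BodyHandlers.ofString())
    }
}
```

Because `setBulkFlushMaxActions(1)` flushes after every element, each element reaches the cluster in its own bulk request. The effect per element is therefore comparable to this single call. Note also that the element doubles as the document ID, and a 4-character hex string has only 16^4 = 65,536 possible values. Once a value repeats, the index operation overwrites the existing document instead of adding a new one.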