Java Sample Code
Description
This sample implements a data source that uses a random number generator to continuously produce 4-character hexadecimal strings, and writes each generated string to Elasticsearch.
Sample Code
The following code snippet is used as an example. For the complete code, see the com.huawei.bigdata.flink.examples.FlinkEsSinkExample class.
public class FlinkEsSinkExample { public static void main(String[] args) throws Exception { System.out.println("use command as:"); System.out.println( "flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.FlinkEsSinkExample /<JAR_PATH>/FlinkEsSinkExample-1.0.jar"); System.out.println( "******************************************************************************************"); System.out.println("<JAR_PATH> is the path of jar file"); System.out.println( "******************************************************************************************"); //init flink execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //create a dataSource according to RandomDataGenerator //of course, you can create any datasource from kafka, mysql, text file and so on DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new DataGenerator<String>() { RandomDataGenerator generator; @Override public void open(String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception { generator = new RandomDataGenerator(); } @Override public boolean hasNext() { return true; } @Override public String next() { return generator.nextHexString(4); } }); //create HtppHost list which are needed by elasticsearch sink List<HttpHost> httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("172.16.0.117", 9200, "http")); //httpHosts.add(new HttpHost("172.16.0.xxx", 9200, "http")); ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<String>() { public IndexRequest createIndexRequest(String element) { Map<String, String> json = new HashMap<>(); json.put("data", element); //init index request,put add real data as index request source return Requests.indexRequest().index("my-index").id(element).source(json); } @Override public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { 
requestIndexer.add(createIndexRequest(s)); } }); // configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered esSinkBuilder.setBulkFlushMaxActions(1); //add data source DataStream<String> stringDataStream = env.addSource(dataGeneratorSource, "DataGeneratorSource").returns(Types.STRING); //add elstic search sink stringDataStream.addSink(esSinkBuilder.build()); env.execute(); } }
Feedback
Was this page helpful?
Provide feedback
Thank you very much for your feedback. We will continue working to improve the documentation.