CSS Elasticsearch Sink Stream
Function
DLI exports the output data of Flink jobs to Elasticsearch in Cloud Search Service (CSS). Elasticsearch is a popular enterprise-class search server built on Lucene that provides distributed, multi-user capabilities. It supports full-text retrieval, structured search, analytics, aggregation, and highlighting, and it enables stable, reliable, real-time search in scenarios such as log analysis and site search.
CSS is a fully managed, distributed search service. It is fully compatible with open-source Elasticsearch and provides DLI with structured and unstructured data search, statistics, and reporting capabilities.
For more information about CSS, see the Cloud Search Service User Guide.
If you enable security mode when creating a CSS cluster, it cannot be disabled afterwards.
Prerequisites
- Ensure that you have created a cluster on CSS using your account. For details about how to create a cluster on CSS, see Creating a Cluster in the Cloud Search Service User Guide.
- In this scenario, jobs must run on a dedicated DLI queue. DLI must therefore use an enhanced datasource connection that is connected to CSS. You can also set security group rules as required.
For details about how to create an enhanced datasource connection, see Enhanced Datasource Connections in the Data Lake Insight User Guide.
For details about how to configure security group rules, see Security Group in the Virtual Private Cloud User Guide.
Syntax
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
WITH (
  type = "es",
  region = "",
  cluster_address = "",
  es_index = "",
  es_type = "",
  es_fields = "",
  batch_insert_data_num = ""
);
Keyword
| Parameter | Mandatory | Description |
|---|---|---|
| type | Yes | Output channel type. Value es indicates that data is stored to CSS. |
| region | Yes | Region where CSS is located, for example, "cn-north-1". |
| cluster_address | Yes | Private access address of the CSS cluster, for example: x.x.x.x:x. Use commas (,) to separate multiple addresses. |
| es_index | Yes | Index of the data to be inserted. This parameter corresponds to the index in CSS. For details, see the Cloud Search Service Overview. |
| es_type | Yes | Type of the document to which data is to be inserted. This parameter corresponds to the type in CSS. For details, see the Cloud Search Service Overview. |
| es_fields | Yes | Keys of the data fields to be inserted, in the format "id, f1, f2, f3, f4". The keys must map one-to-one to the data columns in the sink stream. If the key is not used, remove the id keyword so that the format becomes "f1, f2, f3, f4, f5". This parameter corresponds to the field in CSS. For details, see the Cloud Search Service Overview. |
| batch_insert_data_num | Yes | Number of records written in each batch. The value must be a positive integer no greater than 100. The default value is 10. |
| action | No | If the value is add, data is forcibly overwritten when the same ID is encountered. If the value is upsert, data is updated when the same ID is encountered. (If upsert is selected, id in the es_fields field must be specified.) The default value is add. |
| enable_output_null | No | Whether to generate empty fields. If this parameter is set to true, an empty field (with the value null) is generated. If it is set to false, no empty field is generated. The default value is false. |
| max_record_num_cache | No | Maximum number of records that can be cached. |
| es_certificate_name | No | Name of the datasource authentication information. The type must be CSS. |
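The optional parameters are not shown in the syntax above, so the following is a minimal sketch of how they might be combined with the mandatory ones. The stream name qualified_cars_upsert, the address, and the index and type names are illustrative. Writing the optional values as quoted strings is an assumption that follows the quoting of batch_insert_data_num in this document.

```sql
-- Sketch only: stream name, address, index, and type are placeholder values.
CREATE SINK STREAM qualified_cars_upsert (
  car_id STRING,
  car_owner STRING,
  car_age INT
)
WITH (
  type = "es",
  region = "cn-north-1",
  cluster_address = "192.168.0.212:9200",
  es_index = "car",
  es_type = "information",
  -- id is listed first because upsert requires id in es_fields
  es_fields = "id,owner,age",
  batch_insert_data_num = "10",
  -- Assumption: optional values are quoted like the mandatory ones
  action = "upsert",
  enable_output_null = "true"
);
```

With action set to upsert, a record whose ID already exists updates the stored document instead of overwriting it. With enable_output_null set to true, columns whose value is null are written as empty fields.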
Precautions
If a configuration item supports parameterized values, one or more columns in the record can be used as part of the configuration item. For example, if the configuration item is set to car_${car_brand} and the value of car_brand in a record is BMW, the value of this configuration item is car_BMW for that record.
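As an illustration of this substitution, the sketch below uses a column value as part of the index name. It assumes that es_index is one of the configuration items that accepts a ${column} placeholder, which this document does not state explicitly. The stream name cars_by_brand and the column names are hypothetical. Because Elasticsearch index names must be lowercase, the car_brand values would need to be lowercase (for example, bmw rather than BMW) for the resulting index name to be valid.

```sql
-- Sketch only: assumes es_index supports ${column} substitution.
CREATE SINK STREAM cars_by_brand (
  car_id STRING,
  car_brand STRING,
  car_owner STRING
)
WITH (
  type = "es",
  region = "cn-north-1",
  cluster_address = "192.168.0.212:9200",
  -- A record with car_brand = 'bmw' would be written to index car_bmw
  es_index = "car_${car_brand}",
  es_type = "information",
  es_fields = "id,brand,owner",
  batch_insert_data_num = "10"
);
```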
Example
CREATE SINK STREAM qualified_cars (
  car_id STRING,
  car_owner STRING,
  car_age INT,
  average_speed INT,
  total_miles INT
)
WITH (
  type = "es",
  region = "cn-north-1",
  cluster_address = "192.168.0.212:9200",
  es_index = "car",
  es_type = "information",
  es_fields = "id,owner,age,speed,miles",
  batch_insert_data_num = "10"
);
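Defining the sink stream does not write any data by itself. The following sketch shows one way the qualified_cars sink might be populated from another stream in the same job. The source stream car_infos and the filter condition are hypothetical and are not part of the original example.

```sql
-- car_infos is a hypothetical source stream assumed to be defined elsewhere in the job.
-- The selected columns follow the column order of qualified_cars, and therefore
-- the order of es_fields (id, owner, age, speed, miles).
INSERT INTO qualified_cars
SELECT car_id, car_owner, car_age, average_speed, total_miles
FROM car_infos
WHERE average_speed <= 90;
```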