CSS Elasticsearch输出流
功能描述
DLI将Flink作业的输出数据输出到云搜索服务CSS的Elasticsearch中。Elasticsearch是基于Lucene的当前流行的企业级搜索服务器,具备分布式多用户的能力。其主要功能包括全文检索、结构化搜索、分析、聚合、高亮显示等。能为用户提供实时搜索、稳定可靠的服务。适用于日志分析、站内搜索等场景。
云搜索服务(Cloud Search Service,简称CSS)为DLI提供托管的分布式搜索引擎服务,完全兼容开源Elasticsearch搜索引擎,支持结构化、非结构化文本的多条件检索、统计、报表。
云搜索服务的更多信息,请参见《云搜索服务用户指南》。
创建CSS集群时如果开启了安全模式,后续将无法关闭。
前提条件
语法格式
1 2 3 4 5 6 7 8 9 10 |
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
WITH (
type = "es",
region = "",
cluster_address = "",
es_index = "",
es_type= "",
es_fields= "",
batch_insert_data_num= ""
);
|
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
输出通道类型,es表示输出到云搜索服务中。 |
region |
是 |
数据所在的云搜索服务所在区域。 |
cluster_address |
是 |
云搜索服务集群的内网访问地址,例如:x.x.x.x:x,多个地址时以逗号分隔。 |
es_index |
是 |
待插入数据的索引,支持参数化。对应CSS服务中的index。 |
es_type |
是 |
待插入数据的文档类型,支持参数化。对应CSS服务中的type。 若使用的es版本为6.x,则该值不能以"_"开头。 若使用的es版本为7.x,如果提前预置CSS服务中的type,则es_type需为"_doc",否则可为符合CSS规范的值。 |
es_fields |
是 |
待插入数据字段的key,具体形式如:"id,f1,f2,f3,f4",并且保证与sink中数据列一一对应;如果不使用key,而是采用随机的属性字段,则无需使用id关键字,具体形式如:"f1,f2,f3,f4,f5"。对应CSS服务中的filed。 |
batch_insert_data_num |
是 |
表示一次性批量写入的数据量,值必须为正整数,单位为:条。上限为65536,默认值为10。 |
action |
否 |
当值为add时,表示遇到相同id时,数据被强制覆盖,当值为upsert时,表示遇到相同id时,更新数据(选择upsert时,es_fileds字段中必须指定id),默认值为add。 |
enable_output_null |
否 |
使用该参数来配置是否输出空字段。当该参数为true表示输出空字段(值为null),若为false表示不输出空字段。默认为false。 |
max_record_num_cache |
否 |
记录最大缓存数。 |
es_certificate_name |
否 |
跨源认证信息名称。 若es集群开启安全模式且开启https,则使用证书进行访问,创建的跨源认证类型需要为“CSS”。 若es集群开启安全模式,但关闭http,则使用账号密码进行访问,创建的跨源认证类型需要为"Password"。 |
注意事项
当配置项支持参数化时,表示将记录中的一列或者多列作为该配置项的一部分。例如当配置项设置为car_${car_brand}时,如果一条记录的car_brand列值为BMW,则该配置项在该条记录下为car_BMW。
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
CREATE SINK STREAM qualified_cars (
car_id STRING,
car_owner STRING,
car_age INT,
average_speed INT,
total_miles INT
)
WITH (
type = "es",
region = "xxx",
cluster_address = "192.168.0.212:9200",
es_index = "car",
es_type = "information",
es_fields = "id,owner,age,speed,miles",
batch_insert_data_num = "10"
);
|