CSS Elasticsearch输出流
功能描述
DLI将Flink作业的输出数据输出到云搜索服务CSS的Elasticsearch中。Elasticsearch是基于Lucene的当前流行的企业级搜索服务器,具备分布式多用户的能力。其主要功能包括全文检索、结构化搜索、分析、聚合、高亮显示等。能为用户提供实时搜索、稳定可靠的服务。适用于日志分析、站内搜索等场景。
云搜索服务(Cloud Search Service,简称CSS)为DLI提供托管的分布式搜索引擎服务,完全兼容开源Elasticsearch搜索引擎,支持结构化、非结构化文本的多条件检索、统计、报表。
云搜索服务的更多信息,请参见《云搜索服务用户指南》。
创建CSS集群时如果开启了安全模式,后续将无法关闭。
前提条件
- 请务必确保您的账户下已在云搜索服务里创建了集群。如何创建集群请参考《云搜索服务用户指南》中创建集群章节。
- 该场景作业需要运行在DLI的独享队列上,因此要与云搜索服务建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
语法格式
1 2 3 4 5 6 7 8 9 10 |
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "es", region = "", cluster_address = "", es_index = "", es_type= "", es_fields= "", batch_insert_data_num= "" ); |
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
输出通道类型,es表示输出到云搜索服务中。 |
region |
是 |
数据所在的云搜索服务所在区域。 |
cluster_address |
是 |
云搜索服务集群的内网访问地址,例如:x.x.x.x:x,多个地址时以逗号分隔。 |
es_index |
是 |
待插入数据的索引,支持参数化。对应CSS服务中的index。 具体请参考《云搜索服务产品介绍》。 |
es_type |
是 |
待插入数据的文档类型,支持参数化。对应CSS服务中的type。 具体请参考《云搜索服务产品介绍》。 若使用的es版本为6.x,则该值不能以"_"开头。 若使用的es版本为7.x,如果提前预置CSS服务中的type,则es_type需为"_doc",否则可为符合CSS规范的值。 |
es_fields |
是 |
待插入数据字段的key,具体形式如:"id,f1,f2,f3,f4",并且保证与sink中数据列一一对应;如果不使用key,而是采用随机的属性字段,则无需使用id关键字,具体形式如:"f1,f2,f3,f4,f5"。对应CSS服务中的filed。 具体请参考《云搜索服务产品介绍》。 |
batch_insert_data_num |
是 |
表示一次性批量写入的数据量,值必须为正整数,单位为:条。上限为65536,默认值为10。 |
action |
否 |
当值为add时,表示遇到相同id时,数据被强制覆盖,当值为upsert时,表示遇到相同id时,更新数据(选择upsert时,es_fields字段中必须指定id),默认值为add。 |
enable_output_null |
否 |
使用该参数来配置是否输出空字段。当该参数为true表示输出空字段(值为null),若为false表示不输出空字段。默认为false。 |
max_record_num_cache |
否 |
记录最大缓存数。 |
es_certificate_name |
否 |
跨源认证信息名称。 创建跨源认证请参考跨源认证。 若es集群开启安全模式且开启https,则使用证书进行访问,创建的跨源认证类型需要为“CSS”。 若es集群开启安全模式,但关闭https,则使用证书和账号密码进行访问,创建的跨源认证类型需要为"Password"。 |
注意事项
当配置项支持参数化时,表示将记录中的一列或者多列作为该配置项的一部分。例如当配置项设置为car_${car_brand}时,如果一条记录的car_brand列值为BMW,则该配置项在该条记录下为car_BMW。
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT ) WITH ( type = "es", region = "xxx", cluster_address = "192.168.0.212:9200", es_index = "car", es_type = "information", es_fields = "id,owner,age,speed,miles", batch_insert_data_num = "10" ); |