更新时间:2024-07-27 GMT+08:00

CSS Elasticsearch输出流

功能描述

DLI将Flink作业的输出数据输出到云搜索服务CSS的Elasticsearch中。Elasticsearch是基于Lucene的当前流行的企业级搜索服务器,具备分布式多用户的能力。其主要功能包括全文检索、结构化搜索、分析、聚合、高亮显示等。能为用户提供实时搜索、稳定可靠的服务。适用于日志分析、站内搜索等场景。

云搜索服务(Cloud Search Service,简称CSS)为DLI提供托管的分布式搜索引擎服务,完全兼容开源Elasticsearch搜索引擎,支持结构化、非结构化文本的多条件检索、统计、报表。

云搜索服务的更多信息,请参见《云搜索服务用户指南》。

创建CSS集群时如果开启了安全模式,后续将无法关闭。

前提条件

  • 请务必确保您的账户下已在云搜索服务里创建了集群。如何创建集群请参考《云搜索服务用户指南》创建集群章节。
  • 该场景作业需要运行在DLI的独享队列上,因此要与云搜索服务建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。

    如何建立增强型跨源连接,请参见《数据湖探索用户指南》增强型跨源连接章节。

    如何设置安全组规则,请参见《虚拟私有云用户指南》“安全组”章节。

语法格式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "es",
    region = "",
    cluster_address = "",
    es_index = "",
    es_type= "",
    es_fields= "",
    batch_insert_data_num= ""
  );

关键字

表1 关键字说明

参数

是否必选

说明

type

输出通道类型,es表示输出到云搜索服务中。

region

数据所在的云搜索服务所在区域。

cluster_address

云搜索服务集群的内网访问地址,例如:x.x.x.x:x,多个地址时以逗号分隔。

es_index

待插入数据的索引,支持参数化。对应CSS服务中的index。

具体请参考《云搜索服务产品介绍》。

es_type

待插入数据的文档类型,支持参数化。对应CSS服务中的type。

具体请参考《云搜索服务产品介绍》。

若使用的es版本为6.x,则该值不能以"_"开头。

若使用的es版本为7.x,如果提前预置CSS服务中的type,则es_type需为"_doc",否则可为符合CSS规范的值。

es_fields

待插入数据字段的key,具体形式如:"id,f1,f2,f3,f4",并且保证与sink中数据列一一对应;如果不使用key,而是采用随机的属性字段,则无需使用id关键字,具体形式如:"f1,f2,f3,f4,f5"。对应CSS服务中的filed。

具体请参考《云搜索服务产品介绍》。

batch_insert_data_num

表示一次性批量写入的数据量,值必须为正整数,单位为:条。上限为65536,默认值为10。

action

当值为add时,表示遇到相同id时,数据被强制覆盖,当值为upsert时,表示遇到相同id时,更新数据(选择upsert时,es_fields字段中必须指定id),默认值为add。

enable_output_null

使用该参数来配置是否输出空字段。当该参数为true表示输出空字段(值为null),若为false表示不输出空字段。默认为false。

max_record_num_cache

记录最大缓存数。

es_certificate_name

跨源认证信息名称。

创建跨源认证请参考跨源认证

若es集群开启安全模式且开启https,则使用证书进行访问,创建的跨源认证类型需要为“CSS”。

若es集群开启安全模式,但关闭https,则使用证书和账号密码进行访问,创建的跨源认证类型需要为"Password"。

注意事项

当配置项支持参数化时,表示将记录中的一列或者多列作为该配置项的一部分。例如当配置项设置为car_${car_brand}时,如果一条记录的car_brand列值为BMW,则该配置项在该条记录下为car_BMW。

示例

将流qualified_cars的数据输出到云搜索服务的集群。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
CREATE SINK STREAM qualified_cars (
  car_id STRING,
  car_owner STRING,
  car_age INT,
  average_speed INT,
  total_miles INT
)
  WITH (
    type = "es",
    region = "xxx",
    cluster_address = "192.168.0.212:9200",
    es_index = "car",
    es_type = "information",
    es_fields = "id,owner,age,speed,miles",
    batch_insert_data_num = "10"
);