更新时间:2022-07-12 GMT+08:00

Elasticsearch结果表

功能描述

DLI将Flink作业的输出数据输出到云搜索服务CSS的Elasticsearch中。Elasticsearch是基于Lucene的当前流行的企业级搜索服务器,具备分布式多用户的能力。其主要功能包括全文检索、结构化搜索、分析、聚合、高亮显示等。能为用户提供实时搜索、稳定可靠的服务。适用于日志分析、站内搜索等场景。

云搜索服务(Cloud Search Service,简称CSS)为DLI提供托管的分布式搜索引擎服务,完全兼容开源Elasticsearch搜索引擎,支持结构化、非结构化文本的多条件检索、统计、报表。云搜索服务的更多信息,请参见《云搜索服务用户指南》

前提条件

  • 请务必确保您的账户下已在云搜索服务里创建了集群。

    如果需要通过集群账号和密码访问Elasticsearch,则创建的云搜索服务集群必须开启安全模式并且关闭https

  • 该场景作业需要运行在DLI的独享队列上,因此要与云搜索服务建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。

注意事项

  • 当前只支持CSS集群7.X及以上版本,推荐使用7.6.2版本。
  • 如果不使用“connector.username”和“connector.password”参数时CSS集群请勿开启安全模式。
  • CSS集群安全组入向规则必须开启ICMP。

语法格式

create table esSink (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector.type' = 'elasticsearch',
  'connector.version' = '7',
  'connector.hosts' = 'http://xxxx:9200',
  'connector.index' = '',
  'connector.document-type' = '',
  'update-mode' = '',
  'format.type' = 'json'
);

参数说明

表1 参数说明

参数

是否必选

说明

connector.type

connector的类型,对于elasticsearch需配置为elasticsearch

connector.version

使用的elasticsearch的版本。

当前只能使用版本7,即该值只能为7

connector.hosts

Elasticsearch所在集群的主机名,多个以’;’间隔,注意请以http开头,如http://x.x.x.x:9200

connector.index

Elasticsearch的索引名

connector.document-type

Elasticsearch的type名称

当版本为7时,由于elasticsearch使用默认的_doc类型,因此该属性无效

update-mode

sink的写入类型,支持append和upsert

connector.key-delimiter

连接复合主键的拼接符,默认为_

connector.key-null-literal

当key中含有null时,使用该字符代替

connector.failure-handler

elasticsearch请求失败时的策略,默认为fail

fail:当请求失败且作业失败时抛出异常

ignore:忽略

retry-rejected:对于由于es节点的队列满时,会重新请求而不抛出失败。

custom:使用定制策略

connector.failure-handler-class

使用失败时的定制策略时所使用的自定义处理方式

connector.flush-on-checkpoint

checkpoint时是否会等待所有阻塞请求完成。

默认为true,表示会等待阻塞请求完成,如果配置为false,则表示不会等待阻塞请求完成。

connector.bulk-flush.max-actions

批量写入时的每次最大写入记录数

connector.bulk-flush.max-size

批量写入时的最大数据量,当前只支持MB,请带上单位 mb

connector.bulk-flush.interval

批量写入时的刷新的时间间隔,单位为milliseconds,无需带上单位

format.type

当前只支持json

connector.username

Elasticsearch所在集群的账号。该账号参数需和密码“connector.password”参数同时配置。

使用账号密码参数时,创建的云搜索服务集群必须开启安全模式并且关闭https

connector.password

Elasticsearch所在集群的密码。该密码参数需和“connector.username”参数同时配置。

示例

create table sink1(
  attr1 string,
  attr2 int
) with (
  'connector.type' = 'elasticsearch',
  'connector.version' = '7', 
  'connector.hosts' = 'http://xxxx:9200',
  'connector.index' = 'es',
  'connector.document-type' = 'one',
  'update-mode' = 'append',
  'format.type' = 'json'
);