Elasticsearch结果表
功能描述
DLI将Flink作业的输出数据输出到云搜索服务CSS的Elasticsearch中。Elasticsearch是基于Lucene的当前流行的企业级搜索服务器,具备分布式多用户的能力。其主要功能包括全文检索、结构化搜索、分析、聚合、高亮显示等。能为用户提供实时搜索、稳定可靠的服务。适用于日志分析、站内搜索等场景。
云搜索服务(Cloud Search Service,简称CSS)为DLI提供托管的分布式搜索引擎服务,完全兼容开源Elasticsearch搜索引擎,支持结构化、非结构化文本的多条件检索、统计、报表。云搜索服务的更多信息,请参见《云搜索服务用户指南》。
前提条件
- 请务必确保您的账户下已在云搜索服务里创建了集群。如何创建集群请参考《云搜索服务用户指南》中创建集群章节。
如果需要通过集群账号和密码访问Elasticsearch,则创建的云搜索服务集群必须开启安全模式并且关闭https。
- 该场景作业需要运行在DLI的独享队列上,因此要与云搜索服务建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
- 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
- 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
注意事项
- 当前只支持CSS集群7.X及以上版本,推荐使用7.6.2版本。
- 如果不使用“connector.username”和“connector.password”参数时CSS集群请勿开启安全模式。
- CSS集群安全组入向规则必须开启ICMP。
语法格式
create table esSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://xxxx:9200', 'connector.index' = '', 'connector.document-type' = '', 'update-mode' = '', 'format.type' = 'json' );
参数说明
参数 |
是否必选 |
说明 |
---|---|---|
connector.type |
是 |
connector的类型,对于elasticsearch需配置为elasticsearch |
connector.version |
是 |
使用的elasticsearch的版本。 当前只能使用版本7,即该值只能为7 |
connector.hosts |
是 |
Elasticsearch所在集群的主机名,多个以’;’间隔,注意请以http开头,如http://x.x.x.x:9200 |
connector.index |
是 |
Elasticsearch的索引名 |
connector.document-type |
是 |
Elasticsearch的type名称 当版本为7时,由于elasticsearch使用默认的_doc类型,因此该属性无效 |
update-mode |
是 |
sink的写入类型,支持append和upsert |
connector.key-delimiter |
否 |
连接复合主键的拼接符,默认为_ |
connector.key-null-literal |
否 |
当key中含有null时,使用该字符代替 |
connector.failure-handler |
否 |
elasticsearch请求失败时的策略,默认为fail fail:当请求失败且作业失败时抛出异常 ignore:忽略 retry-rejected:对于由于es节点的队列满时,会重新请求而不抛出失败。 custom:使用定制策略 |
connector.failure-handler-class |
否 |
使用失败时的定制策略时所使用的自定义处理方式 |
connector.flush-on-checkpoint |
否 |
checkpoint时是否会等待所有阻塞请求完成。 默认为true,表示会等待阻塞请求完成,如果配置为false,则表示不会等待阻塞请求完成。 |
connector.bulk-flush.max-actions |
否 |
批量写入时的每次最大写入记录数 |
connector.bulk-flush.max-size |
否 |
批量写入时的最大数据量,当前只支持MB,请带上单位 mb |
connector.bulk-flush.interval |
否 |
批量写入时的刷新的时间间隔,单位为milliseconds,无需带上单位 |
format.type |
是 |
当前只支持json |
connector.username |
否 |
Elasticsearch所在集群的账号。该账号参数需和密码“connector.password”参数同时配置。 使用账号密码参数时,创建的云搜索服务集群必须开启安全模式并且关闭https。 |
connector.password |
否 |
Elasticsearch所在集群的密码。该密码参数需和“connector.username”参数同时配置。 |
示例
create table sink1( attr1 string, attr2 int ) with ( 'connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://xxxx:9200', 'connector.index' = 'es', 'connector.document-type' = 'one', 'update-mode' = 'append', 'format.type' = 'json' );