更新时间:2024-04-19 GMT+08:00

Elasticsearch

功能描述

DLI将Flink作业的输出数据输出到云搜索服务CSS的Elasticsearch 引擎的索引中。

Elasticsearch是基于Lucene的当前流行的企业级搜索服务器,具备分布式多用户的能力。其主要功能包括全文检索、结构化搜索、分析、聚合、高亮显示等。能为用户提供实时搜索、稳定可靠的服务。适用于日志分析、站内搜索等场景。

云搜索服务(Cloud Search Service,简称CSS)为DLI提供托管的分布式搜索引擎服务,完全兼容开源Elasticsearch搜索引擎,支持结构化、非结构化文本的多条件检索、统计、报表。

云搜索服务的更多信息,请参见《云搜索服务用户指南》

更多具体使用可参考开源社区文档:Elasticsearch SQL 连接器

表1 支持类别

类别

详情

支持表类型

结果表

支持数据格式

JSON

前提条件

  • 请务必确保您的账户下已在云搜索服务里创建了集群。如何创建集群请参考《云搜索服务用户指南》中创建集群章节。

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • with参数中字段只能使用单引号,不能使用双引号。
  • 当前只支持CSS集群7.X及以上版本。
  • 如果开启安全模式,开启https,需要配置用户名username、密码password、证书位置certificate。请注意该场景hosts字段值以https开头。
  • CSS集群安全组入向规则必须开启ICMP。
  • with参数中字段只能使用单引号,不能使用双引号。
  • 数据类型的使用,请参考Format章节。

语法格式

create table esSink (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector' = 'elasticsearch-7',
  'hosts' = '',
  'index' = ''
);

参数说明

表2 Elasticsearch结果表参数说明

参数

是否必选

默认值

类型

说明

connector

String

指定要使用的连接器,固定为:elasticsearch-7。表示连接到 Elasticsearch 7.x 及更高版本集群。

hosts

String

Elasticsearch所在集群的主机名,多个以';'间隔。

index

String

每条记录的 Elasticsearch 索引。可以是静态索引(例如'myIndex')或动态索引(例如'index-{log_ts|yyyy-MM-dd}')。更多详细信息,请参见下面的动态索引

username

String

Elasticsearch所在集群的账号。该账号参数需和密码“password”参数同时配置。

password

String

Elasticsearch所在集群的密码。该密码参数需和“username”参数同时配置。

document-id.key-delimiter

_

String

复合键的分隔符(默认为"_"),例如,指定为"$"将导致文档 ID 为"KEY1$KEY2$KEY3"。

failure-handler

fail

String

对 Elasticsearch 请求失败情况下的失败处理策略。有效策略为:

  • fail:如果请求失败并因此导致作业失败,则抛出异常。
  • ignore:忽略失败并放弃请求。
  • retry-rejected:重新添加由于队列容量饱和而失败的请求。
  • 自定义类名称:使用 ActionRequestFailureHandler 的子类进行失败处理。

sink.flush-on-checkpoint

true

Boolean

在进行 checkpoint 时是否保证刷出缓冲区中的数据。

如果关闭这一选项,在进行checkpoint时 sink 将不再为所有进行 中的请求等待 Elasticsearch 的执行完成确认。因此,在这种情况下 sink 将不对至少一次的请求的一致性提供任何保证。

sink.bulk-flush.max-actions

1000

Interger

每个批量请求的最大缓冲操作数。可以设置'0'为禁用它。

sink.bulk-flush.max-size

2mb

MemorySize

每个批量请求的缓冲操作在内存中的最大值。单位必须为 MB。 可以设置为'0'来禁用它。

sink.bulk-flush.interval

1s

Duration

flush 缓冲操作的间隔。 可以设置为'0'来禁用它。

注意,'sink.bulk-flush.max-size'和'sink.bulk-flush.max-actions'都设置为'0'的这种 flush 间隔设置允许对缓冲操作进行完全异步处理。

sink.bulk-flush.backoff.strategy

DISABLED

String

指定在由于临时请求错误导致任何 flush 操作失败时如何执行重试。有效策略为:

  • DISABLED:不执行重试,即第一次请求错误后失败。
  • CONSTANT:等待重试之间的回退延迟。
  • EXPONENTIAL:先等待回退延迟,然后在重试之间指数递增。

sink.bulk-flush.backoff.max-retries

Integer

最大回退重试次数。

sink.bulk-flush.backoff.delay

Duration

每次退避尝试之间的延迟。

对于 CONSTANT 退避策略,该值是每次重试之间的延迟。对于 EXPONENTIAL 退避策略,该值是初始的延迟。

connection.path-prefix

String

添加到每个REST通信中的前缀字符串,例如, '/v1'。

connection.request-timeout

Duration

从连接管理器请求连接的超时时间。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。

connection.timeout

Duration

建立请求的超时时间 。

超时时间必须大于或者等于 0 ,如果设置为 0 则是无限超时。

socket.timeout

Duration

等待数据的 socket 的超时时间 (SO_TIMEOUT)。超时时间必须大于或者等于 0,如果设置为 0 则是无限超时。

format

json

String

Elasticsearch连接器支持指定格式。该格式必须生成有效的 json 文档。默认情况下使用内置'json'格式。

请参考Format页面以获取更多详细信息和格式参数。

certificate

String

Elasticsearch集群的证书在OBS中的位置。

仅在开启安全模式,且开启https下需要配置该参数。

请先在CSS管理控制台下载证书后将证书上传至OBS,该参数配置的是OBS地址。

例如:obs://bucket/path/CloudSearchService.cer

主键处理

Elasticsearch sink 可以根据是否定义了一个主键来确定是在 upsert 模式还是 append 模式下工作。

  • 如果定义了主键,Elasticsearch sink 将以upsert模式工作,该模式可以消费包含UPDATE/DELETE消息的查询。
  • 如果未定义主键,Elasticsearch sink 将以append模式工作,该模式只能消费包含INSERT消息的查询。

在Elasticsearch连接器中,主键用于计算Elasticsearch 的文档ID,文档ID为最多512字节且不包含空格的字符串。

Elasticsearch连接器通过使用 document-id.key-delimiter 指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段,为每一行记录生成一个文档ID字符串。 某些类型不允许作为主键字段,因为它们没有对应的字符串表示形式,例如,BYTES,ROW,ARRAY,MAP 等。

如果未指定主键,Elasticsearch 将自动生成文档ID。

动态索引

Elasticsearch sink同时支持静态索引和动态索引。

  • 如果你想使用静态索引,则index选项值应为纯字符串,例如 'myusers',所有记录都将被写入到“myusers”索引中。
  • 如果你想使用动态索引,你可以使用 {field_name} 来引用记录中的字段值来动态生成目标索引。
    • 可以使用 '{field_name|date_format_string}' 将 TIMESTAMP/DATE/TIME 类型的字段值转换为 date_format_string 指定的格式。 date_format_string 与 Java 的 DateTimeFormatter 兼容。 例如,如果选项值设置为 'myusers-{log_ts|yyyy-MM-dd}',则 log_ts 字段值为 2020-03-27 12:25:55 的记录将被写入到 “myusers-2020-03-27” 索引中。
    • 可以使用 '{now()|date_format_string}' 将当前的系统时间转换为 date_format_string 指定的格式。now() 对应的时间类型是 TIMESTAMP_WITH_LTZ 。 在将系统时间格式化为字符串时会使用 session 中通过 table.local-time-zone 中配置的时区。 使用 NOW(), now(), CURRENT_TIMESTAMP, current_timestamp 均可以。

      使用当前系统时间生成的动态索引时, 对于changelog的流,无法保证同一主键对应的记录能产生相同的索引名, 因此使用基于系统时间的动态索引,只能支持 append only 的流。

示例

该示例是从Kafka数据源中读取数据,并写入到Elasticsearch结果表中(本次所使用Elasticsearch版本为7.10.2),其具体步骤如下:

  1. 参考,在DLI上根据Elasticsearch和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。
  2. 设置Elasticsearch和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考分别根据Elasticsearch和Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 登录Elasticsearch集群的Kibana,并选择Dev Tools,输入下列语句并执行,以创建值为orders的index:
    PUT /orders
    {
      "settings": {
        "number_of_shards": 1
      },
    	"mappings": {
    	  "properties": {
    	    "order_id": {
    	      "type": "text"
    	    },
    	    "order_channel": {
    	      "type": "text"
    	    },
    	    "order_time": {
    	      "type": "text"
    	    },
    	    "pay_amount": {
    	      "type": "double"
    	    },
    	    "real_pay": {
    	      "type": "double"
    	    },
    	    "pay_time": {
    	      "type": "text"
    	    },
    	    "user_id": {
    	      "type": "text"
    	    },
    	    "user_name": {
    	      "type": "text"
    	    },
    	    "area_id": {
    	      "type": "text"
    	    }
    	  }
    	}
    }
  4. 创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。
    如下脚本中的加粗参数请根据实际环境修改。
    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'KafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    CREATE TABLE elasticsearchSink (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'elasticsearch-7',
      'hosts' = 'ElasticsearchAddress:ElasticsearchPort',
      'index' = 'orders'
    );
    insert into elasticsearchSink select * from kafkaSource;
  5. 连接Kafka集群,向kafka中插入如下测试数据:
    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  6. 在Elasticsearch集群的Kibana中输入下述语句并查看相应结果:
    GET orders/_search

    {
      "took" : 201,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 2,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "orders",
            "_type" : "_doc",
            "_id" : "fopyx4sBUuT2wThgYGcp",
            "_score" : 1.0,
            "_source" : {
              "order_id" : "202103241606060001",
              "order_channel" : "appShop",
              "order_time" : "2021-03-24 16:06:06",
              "pay_amount" : 200.0,
              "real_pay" : 180.0,
              "pay_time" : "2021-03-24 16:10:06",
              "user_id" : "0001",
              "user_name" : "Alice",
              "area_id" : "330106"
            }
          },
          {
            "_index" : "orders",
            "_type" : "_doc",
            "_id" : "f4pyx4sBUuT2wThgYGcr",
            "_score" : 1.0,
            "_source" : {
              "order_id" : "202103241000000001",
              "order_channel" : "webShop",
              "order_time" : "2021-03-24 10:00:00",
              "pay_amount" : 100.0,
              "real_pay" : 100.0,
              "pay_time" : "2021-03-24 10:02:03",
              "user_id" : "0001",
              "user_name" : "Alice",
              "area_id" : "330106"
            }
          }
        ]
      }
    }