更新时间:2025-02-12 GMT+08:00

FlinkSQL Elasticsearch表开发建议

FlinkSQL Elasticsearch作业,参数配置如下:

表1 Flink作业With属性

参数

是否必选

数据类型

描述

connector

必选

String

指定要使用的连接器,如elasticsearch-7,即连接到Elasticsearch 7.x或更高版本的集群。

hosts

必选

String

要连接的一台或多台Elasticsearch主机地址。

例如:'http://10.10.10.10:24100;http://10.10.10.10:24100'

index

必选

String

Elasticsearch中每条记录的索引。可以是一个静态索引(如 'myIndex' )或一个动态索引(如 'index-{log_ts|yyyy-MM-dd}' )。

document-id.key-delimiter

可选

String

复合键的分隔符,默认为“_”。若指定为“$”,则文档ID为“KEY1$KEY2$KEY3”。

username

可选

String

用于连接Elasticsearch实例的用户名。

password

可选

String

用于连接Elasticsearch实例的用户名密码。若配置了username,则必须配置为非空字符串。

failure-handler

可选

String

对Elasticsearch请求失败情况的失败处理策略,有效策略如下:

  • fail(默认值):如果请求失败并因此导致作业失败,则发生异常。
  • ignore:忽略失败并放弃请求。
  • retry-rejected:重新添加由于队列容量饱和而失败的请求。
  • 自定义类名称:使用ActionRequestFailureHandler的子类进行失败处理。

sink.flush-on-checkpoint

可选

Boolean

  • true:确保在进行CheckPoint时读出缓冲区中的数据,默认值为“true”。
  • false:Sink将不对请求的一致性提供保证。在进行CheckPoint时,对于进行中的请求,Sink将不再等待Elasticsearch的执行完成确认。

sink.bulk-flush.max-actions

可选

Integer

每个批量请求的最大缓冲操作数,默认值为“1000”,可设置为“0”禁用该功能。

sink.bulk-flush.max-size

可选

MemorySize

每个批量请求的缓冲操作在内存中的最大值,默认值为“2MB”,单位必须为MB,可设置为“0”禁用该功能。

sink.bulk-flush.interval

可选

Duration

缓冲操作的间隔时间,默认值为“1s”,可设置为“0”禁用该功能。

sink.bulk-flush.backoff.strategy

可选

String

指定在由于临时请求错误导致任何flush操作失败时如何执行重试。有效策略为:

  • DISABLED(默认值):不执行重试,即第一次请求错误后失败。
  • CONSTANT :常量回退,即每次回退等待时间相同。
  • EXPONENTIAL :指数回退,即每次回退等待时间指数递增。

sink.bulk-flush.backoff.max-retries

可选

Integer

最大回退重试次数。

sink.bulk-flush.backoff.delay

可选

Duration

每次退避重试之间的延迟,退避策略如下:

  • CONSTANT:每次重试之间的延迟。
  • EXPONENTIAL:初始的延迟。

connection.path-prefix

可选

String

添加到每个REST通信中的前缀字符串,例如: '/v1' 。

format

可选

String

Elasticsearch连接器支持的指定格式,默认值为“json”。