Updated on 2025-04-15 GMT+08:00

Flink SQL Elasticsearch Development Suggestions

Job Parameters

Table 1 WITH clause properties of Flink SQL jobs

Parameter

Mandatory (Yes/No)

Data Type

Description

connector

Yes

String

Connector to use. For example, elasticsearch-7 connects to an Elasticsearch 7.x or later cluster.

hosts

Yes

String

Addresses of one or more Elasticsearch hosts to connect to, each in the format http://IP address:Port. Separate multiple addresses with semicolons (;).

Example: 'http://10.10.10.10:24100;http://10.10.10.11:24100'

index

Yes

String

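Elasticsearch index to which each record is written. The index can be static (for example, 'myindex') or dynamic (for example, 'index-{log_ts|yyyy-MM-dd}'). A dynamic index name is generated from a field of each record; in this example, the log_ts value formatted as yyyy-MM-dd. Elasticsearch index names must be lowercase.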

document-id.key-delimiter

No

String

Delimiter used to join the fields of a composite primary key into the document ID. The default value is _. For example, if this parameter is set to $, the document ID is KEY1$KEY2$KEY3.

username

No

String

Username for connecting to the Elasticsearch instance.

password

No

String

Password for connecting to the Elasticsearch instance. If username is set, this parameter must also be set to a non-empty string.

failure-handler

No

String

Policy for handling failed Elasticsearch requests. Value options:

  • fail (default value): throws an exception when a request fails, which causes the job to fail.
  • ignore: ignores the failure and drops the request.
  • retry-rejected: re-adds requests that failed because the Elasticsearch queue capacity was saturated.
  • custom class name: handles failures with the specified ActionRequestFailureHandler subclass.

sink.flush-on-checkpoint

No

Boolean

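Whether to flush buffered requests on checkpoints. Value options:

  • true (default value): flushes the buffer during checkpointing, so the sink waits for all pending requests to be acknowledged by Elasticsearch on each checkpoint.
  • false: the sink does not wait for pending requests to be acknowledged by Elasticsearch during checkpointing, so it does not guarantee the consistency of requests (no at-least-once delivery guarantee).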

sink.bulk-flush.max-actions

No

Integer

Maximum number of buffered actions per bulk request. The default value is 1000. Set this parameter to 0 to disable this limit.

sink.bulk-flush.max-size

No

MemorySize

Maximum in-memory size of buffered actions per bulk request. The default value is 2 MB. The value must be specified in MB, for example, '2mb'. Set this parameter to 0 to disable this limit.

sink.bulk-flush.interval

No

Duration

Interval at which buffered actions are flushed. The default value is 1s. Set this parameter to 0 to disable periodic flushing.

sink.bulk-flush.backoff.strategy

No

String

Backoff strategy for retrying flush operations that fail because of a temporary request error. Value options:

  • DISABLED (default value): no retry is performed; the request fails after the first error.
  • CONSTANT: constant backoff; the delay before each retry is the same.
  • EXPONENTIAL: exponential backoff; the delay increases exponentially between retries.

sink.bulk-flush.backoff.max-retries

No

Integer

Maximum number of backoff retries.

sink.bulk-flush.backoff.delay

No

Duration

Delay between backoff retries. Its meaning depends on sink.bulk-flush.backoff.strategy:

  • CONSTANT: the fixed delay between retries.
  • EXPONENTIAL: the initial base delay.

connection.path-prefix

No

String

Prefix string added to the path of every REST request, for example, '/v1'.

format

No

String

Format used by the Elasticsearch connector to serialize records. The default value is json.
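The hosts parameter in Table 1 takes one or more URLs separated by semicolons. The following Python sketch shows one way to split and sanity-check such a value before putting it in a job. parse_hosts is a hypothetical helper, not part of Flink or the connector, and requiring a scheme, host, and port for every entry is an assumption based on the example format 'http://10.10.10.10:24100'.

```python
from urllib.parse import urlsplit


def parse_hosts(value: str) -> list:
    """Split a semicolon-separated hosts value into (scheme, host, port) tuples.

    Hypothetical helper for illustration only; it is not the connector's parser.
    """
    hosts = []
    for entry in value.split(";"):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing semicolon
        parts = urlsplit(entry)
        # Assumption: every address needs a scheme, a host, and an explicit port.
        if parts.scheme not in ("http", "https") or not parts.hostname or parts.port is None:
            raise ValueError(f"expected 'http://host:port', got {entry!r}")
        hosts.append((parts.scheme, parts.hostname, parts.port))
    return hosts


print(parse_hosts("http://10.10.10.10:24100;http://10.10.10.11:24100"))
# [('http', '10.10.10.10', 24100), ('http', '10.10.10.11', 24100)]
```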
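For a dynamic index such as 'index-{log_ts|yyyy-MM-dd}', the index name is computed per record from a field value. The Python sketch below imitates that substitution for illustration only. resolve_index is hypothetical, and it translates just a handful of date tokens (yyyy, MM, dd, HH, mm, ss), whereas the connector interprets the part after | as a Java date-time pattern.

```python
import re
from datetime import datetime

# Only a few Java-style date tokens are translated (an assumption for brevity);
# real index patterns follow Java date-time formatting.
_DATE_TOKENS = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H", "mm": "%M", "ss": "%S"}
_TOKEN_RE = re.compile("|".join(_DATE_TOKENS))
# Matches '{field}' or '{field|format}' placeholders inside an index pattern.
_PLACEHOLDER_RE = re.compile(r"\{(\w+)(?:\|([^}]*))?\}")


def resolve_index(pattern: str, row: dict) -> str:
    """Return the concrete index name for one record (hypothetical helper)."""

    def substitute(match):
        value = row[match.group(1)]
        java_format = match.group(2)
        if java_format is None:
            return str(value)  # '{field}': use the field value as-is
        strftime_format = _TOKEN_RE.sub(lambda t: _DATE_TOKENS[t.group()], java_format)
        return value.strftime(strftime_format)

    # A static index has no placeholders and is returned unchanged.
    return _PLACEHOLDER_RE.sub(substitute, pattern)


record = {"log_ts": datetime(2025, 4, 15, 8, 30)}
print(resolve_index("index-{log_ts|yyyy-MM-dd}", record))  # index-2025-04-15
print(resolve_index("myindex", record))                    # myindex
```

With a daily pattern like this, records are spread across one index per day, which keeps each index small and makes it easy to drop old data by deleting whole indexes.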
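The document-id.key-delimiter parameter controls how the parts of a composite key are joined. The sketch below assumes, as in upstream Apache Flink, that the document ID is built from the primary key fields in the order they are declared in the DDL. build_document_id and the sample record are hypothetical.

```python
def build_document_id(row: dict, key_fields: list, delimiter: str = "_") -> str:
    """Join primary-key values, in declaration order, into one document ID.

    Hypothetical helper for illustration only.
    """
    return delimiter.join(str(row[field]) for field in key_fields)


order = {"user_id": "u1001", "region": "cn", "day": "2025-04-15", "amount": 42}
key_fields = ["user_id", "region", "day"]  # assumed PRIMARY KEY (user_id, region, day)
print(build_document_id(order, key_fields))       # u1001_cn_2025-04-15
print(build_document_id(order, key_fields, "$"))  # u1001$cn$2025-04-15
```

Because identical keys produce identical IDs, a later record with the same key updates the existing document instead of creating a new one. Choose a delimiter that cannot occur in the key values: with the default _, the keys ('a_b', 'c') and ('a', 'b_c') both map to a_b_c.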
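The three sink.bulk-flush.* limits act as independent triggers: a bulk request is sent as soon as any enabled limit is reached, and 0 disables a limit. The toy Python model below illustrates this together with sink.flush-on-checkpoint. BulkBuffer and its methods are hypothetical, and in the real sink the interval trigger is driven by a timer rather than an explicit tick() call.

```python
import time


class BulkBuffer:
    """Toy model of the sink.bulk-flush.* triggers (not the connector's code).

    A limit of 0 disables the corresponding trigger.
    """

    def __init__(self, max_actions=1000, max_size_bytes=2 * 1024 * 1024,
                 interval_s=1.0, flush_on_checkpoint=True, clock=time.monotonic):
        self.max_actions = max_actions
        self.max_size_bytes = max_size_bytes
        self.interval_s = interval_s
        self.flush_on_checkpoint = flush_on_checkpoint
        self.clock = clock
        self.pending = []
        self.pending_bytes = 0
        self.last_flush = clock()
        self.sent_batches = []  # stands in for bulk requests sent to Elasticsearch

    def add(self, action: bytes) -> None:
        """Buffer one action and flush if a count or size limit is reached."""
        self.pending.append(action)
        self.pending_bytes += len(action)
        if self.max_actions > 0 and len(self.pending) >= self.max_actions:
            self.flush()
        elif self.max_size_bytes > 0 and self.pending_bytes >= self.max_size_bytes:
            self.flush()

    def tick(self) -> None:
        """Called periodically; models the time-based (interval) trigger."""
        if self.interval_s > 0 and self.clock() - self.last_flush >= self.interval_s:
            self.flush()

    def on_checkpoint(self) -> None:
        """Models sink.flush-on-checkpoint: flush buffered actions on a checkpoint."""
        if self.flush_on_checkpoint:
            self.flush()

    def flush(self) -> None:
        if self.pending:
            self.sent_batches.append(list(self.pending))
            self.pending.clear()
            self.pending_bytes = 0
        self.last_flush = self.clock()


buffer = BulkBuffer(max_actions=2, max_size_bytes=0, interval_s=0)
for doc in (b'{"id": 1}', b'{"id": 2}', b'{"id": 3}'):
    buffer.add(doc)
print(len(buffer.sent_batches), len(buffer.pending))  # 1 1
buffer.on_checkpoint()
print(len(buffer.sent_batches), len(buffer.pending))  # 2 0
```

With flush_on_checkpoint=False, the third action would stay in the buffer across the checkpoint, which is why the false setting gives no delivery guarantee for buffered requests.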
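To see how sink.bulk-flush.backoff.strategy, sink.bulk-flush.backoff.delay, and sink.bulk-flush.backoff.max-retries interact, the Python sketch below lists the wait before each retry. backoff_delays is hypothetical, and doubling the delay for EXPONENTIAL is an assumption made for illustration; the actual growth curve is defined by the underlying Elasticsearch client and may differ.

```python
def backoff_delays(strategy: str, delay_ms: int, max_retries: int) -> list:
    """Return the wait (ms) before each retry under a given backoff strategy.

    Illustrative only: EXPONENTIAL doubles the delay here, which is an
    assumption; the real client may grow the delay differently.
    """
    if strategy == "DISABLED":
        return []  # no retries: the request fails on the first error
    if strategy == "CONSTANT":
        return [delay_ms] * max_retries  # same delay before every retry
    if strategy == "EXPONENTIAL":
        # delay_ms is the initial base delay and grows on each attempt
        return [delay_ms * 2 ** attempt for attempt in range(max_retries)]
    raise ValueError(f"unknown backoff strategy: {strategy}")


print(backoff_delays("CONSTANT", 100, 3))     # [100, 100, 100]
print(backoff_delays("EXPONENTIAL", 100, 3))  # [100, 200, 400]
```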