Updated on 2026-03-10 GMT+08:00

Reading Data from MySQL CDC and Writing Data to Elasticsearch

Scenario

Change Data Capture (CDC) synchronizes incremental changes from a source database to one or more destinations. During synchronization, the data can also be processed, for example, grouped (GROUP BY) or joined across multiple tables (JOIN).

This example creates a MySQL CDC source table to monitor MySQL data changes and insert the changed data into Elasticsearch.
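As a rough illustration of what a change stream carries: each Debezium-style change event has an op field (r for snapshot read, c for insert, u for update, d for delete) together with before and after row images. The sketch below shows one way such events could be mapped to sink actions. The class and enum names are hypothetical, and this mapping is not what the sample code in this document does (the sample only forwards the after image as index requests).

```java
// Hypothetical helper: maps the "op" field of a Debezium-style change event to a sink action.
class ChangeEventAction {

    enum SinkAction { UPSERT, DELETE }

    // Only the four common op codes are handled; anything else is rejected.
    static SinkAction actionFor(String op) {
        switch (op) {
            case "r": // row read during the initial snapshot
            case "c": // row inserted
            case "u": // row updated
                return SinkAction.UPSERT;
            case "d": // row deleted
                return SinkAction.DELETE;
            default:
                throw new IllegalArgumentException("Unsupported op: " + op);
        }
    }

    public static void main(String[] args) {
        for (String op : new String[] {"r", "c", "u", "d"}) {
            System.out.println(op + " -> " + actionFor(op));
        }
    }
}
```

If updates and deletes must be reflected in the destination, a stable document ID (for example, derived from the primary key) would also be needed so that an UPSERT overwrites the earlier version of the same row.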

Preparations

  • Create an RDS for MySQL instance. In this example, the RDS for MySQL database version is 8.0.
  • Create a CSS instance.
  • Before using DLI, you need to configure a DLI job bucket. The bucket is used to store temporary data generated during DLI job running, such as job logs and results.

    For details, see Configuring a DLI Job Bucket.

  • When using Flink 1.15 or later, you need to configure an agency yourself. Otherwise, job execution may be affected.

    For details, see Creating a DLI Custom Agency.

Development Process

Table 1 Process of reading data from MySQL CDC and writing the data to Elasticsearch

No. | Step | Reference
1 | Create an elastic resource pool and create a queue within the pool to run DLI jobs. | Step 1: Create an Elastic Resource Pool and Create Queues Within It
2 | Create an RDS for MySQL database and table. | Step 2: Create an RDS for MySQL Database and Table
3 | Create an Elasticsearch search index to receive result data. | Step 3: Create an Elasticsearch Index
4 | Create an enhanced datasource connection to connect DLI and RDS. | Step 4: Create an Enhanced Datasource Connection Between DLI and RDS
5 | Create an enhanced datasource connection to connect DLI and CSS. | Step 5: Create an Enhanced Datasource Connection Between DLI and CSS
6 | Edit the Java sample code on a local host and upload it to an OBS bucket. | Step 6: Edit Java Sample Code on a Local Host
7 | Create and run a Flink Jar job on DLI. | Step 7: Submit a Flink Jar Job on DLI
8 | Send data and query results: Insert data into the RDS for MySQL table and view the execution result on CSS. | Step 8: Verify the Result

Step 1: Create an Elastic Resource Pool and Create Queues Within It

When creating a queue in the resource pool, ensure that the CIDR block of the queue does not overlap with that of the RDS instance. Otherwise, the datasource connection will fail to be created.
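The overlap requirement can be checked before any resources are purchased. The following is a minimal sketch, not part of DLI, that compares two IPv4 CIDR blocks by converting each to its first and last address. The class name is hypothetical; 172.16.0.0/19 is the example value used in this document, while the other blocks are made-up values for illustration.

```java
// Hypothetical helper: checks whether two IPv4 CIDR blocks overlap.
class CidrOverlapCheck {

    // Returns the inclusive [first, last] address range of a CIDR block,
    // with each address held as an unsigned 32-bit value in a long.
    static long[] range(String cidr) {
        String[] parts = cidr.trim().split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected a.b.c.d/prefix: " + cidr);
        }
        int prefix = Integer.parseInt(parts[1]);
        if (prefix < 0 || prefix > 32) {
            throw new IllegalArgumentException("Invalid prefix length: " + cidr);
        }
        String[] octets = parts[0].split("\\.");
        if (octets.length != 4) {
            throw new IllegalArgumentException("Invalid IPv4 address: " + cidr);
        }
        long address = 0;
        for (String octet : octets) {
            int value = Integer.parseInt(octet);
            if (value < 0 || value > 255) {
                throw new IllegalArgumentException("Invalid octet in: " + cidr);
            }
            address = (address << 8) | value;
        }
        long mask = prefix == 0 ? 0L : (0xFFFFFFFFL << (32 - prefix)) & 0xFFFFFFFFL;
        long first = address & mask;
        long last = first | (~mask & 0xFFFFFFFFL);
        return new long[] {first, last};
    }

    // Two ranges overlap when each one starts before the other one ends.
    static boolean overlaps(String cidrA, String cidrB) {
        long[] a = range(cidrA);
        long[] b = range(cidrB);
        return a[0] <= b[1] && b[0] <= a[1];
    }

    public static void main(String[] args) {
        // Pool CIDR block from this example against two made-up data source blocks.
        System.out.println(overlaps("172.16.0.0/19", "192.168.0.0/24")); // prints false
        System.out.println(overlaps("172.16.0.0/19", "172.16.8.0/24"));  // prints true
    }
}
```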

  1. Log in to the DLI management console.
  2. In the navigation pane on the left, choose Resources > Resource Pool.
  3. On the displayed page, click Buy Resource Pool in the upper right corner.
  4. On the displayed page, set the parameters.
    In this example, we will buy the resource pool in the CN East-Shanghai2 region. Table 2 describes the parameters.
    Table 2 Parameter descriptions

    Parameter | Description | Example Value
    Region | Select a region where you want to buy the elastic resource pool. | CN East-Shanghai2
    Project | Project uniquely preset by the system for each region | Default
    Name | Name of the elastic resource pool | dli_resource_pool
    Specifications | Specifications of the elastic resource pool | Standard
    CU Range | The maximum and minimum CUs allowed for the elastic resource pool | 64-64
    CIDR Block | CIDR block the elastic resource pool belongs to. If you use an enhanced datasource connection, this CIDR block cannot overlap that of the data source. Once set, this CIDR block cannot be changed. | 172.16.0.0/19
    Enterprise Project | Select an enterprise project for the elastic resource pool. | default

  5. Click Buy.
  6. Click Submit.
  7. In the elastic resource pool list, locate the pool you just created and click Add Queue in the Operation column.
  8. Set the basic parameters listed below.
    Table 3 Basic parameters for adding a queue

    Parameter | Description | Example Value
    Name | Name of the queue to add | dli_queue_01
    Type | Type of the queue. To execute SQL jobs, select For SQL. To execute Flink or Spark jobs, select For general purpose. | For general purpose (this example runs a Flink Jar job)
    Engine | SQL queue engine. The value can only be Spark. | Spark
    Enterprise Project | Select an enterprise project. | default

  9. Click Next and configure scaling policies for the queue.

    Click Create to add a scaling policy with varying priority, period, minimum CUs, and maximum CUs.

    Figure 1 shows the scaling policy configured in this example.
    Figure 1 Configuring a scaling policy when adding a queue
    Table 4 Scaling policy parameters

    Parameter | Description | Example Value
    Priority | Priority of the scaling policy in the current elastic resource pool. A larger value indicates a higher priority. In this example, only one scaling policy is configured, so its priority is set to 1 by default. | 1
    Period | The first scaling policy is the default policy, and its Period parameter configuration cannot be deleted or modified. The period for the scaling policy is from 00 to 24. | 00–24
    Min CU | Minimum number of CUs allowed by the scaling policy | 16
    Max CU | Maximum number of CUs allowed by the scaling policy | 64

  10. Click OK.

Step 2: Create an RDS for MySQL Database and Table

  1. Log in to the RDS console. On the displayed page, locate the desired RDS for MySQL instance, click More in its Operation column, and select Log In.
  2. Enter the username and password for logging in to the instance. Click Log In.
  3. On the Databases page, click Create Database. In the dialog box that appears, enter test as the database name and retain the default values for the remaining parameters. Then, click OK.
  4. In the Operation column of the row that contains the created database, click SQL Window and enter the following statement to create a table:
    CREATE TABLE cdc_order (
    	`order_id` VARCHAR(64) NOT NULL,
    	`order_channel` VARCHAR(32) NOT NULL,
    	`order_time` VARCHAR(32),
    	`pay_amount` DOUBLE,
    	`real_pay` DOUBLE,
    	`pay_time` VARCHAR(32),
    	`user_id` VARCHAR(32),
    	`user_name` VARCHAR(32),
    	`area_id` VARCHAR(32)
    )	ENGINE = InnoDB
    	DEFAULT CHARACTER SET = utf8mb4;
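
Step 8 later inserts test rows into this table. As an alternative to typing INSERT statements in the SQL window, the rows could be inserted over JDBC. The following is a minimal sketch under several assumptions: a MySQL JDBC driver is on the classpath, the host running it can reach the RDS instance, and the class name, command-line arguments, and row values are made up for illustration.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical helper: inserts one test row into test.cdc_order over JDBC.
class InsertTestOrder {

    // One placeholder per column of the cdc_order table created above.
    static final String INSERT_SQL =
            "INSERT INTO cdc_order (order_id, order_channel, order_time, pay_amount, real_pay, "
            + "pay_time, user_id, user_name, area_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";

    static int insertOrder(Connection conn, String orderId, String channel, String orderTime,
                           double payAmount, double realPay, String payTime,
                           String userId, String userName, String areaId) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(INSERT_SQL)) {
            ps.setString(1, orderId);
            ps.setString(2, channel);
            ps.setString(3, orderTime);
            ps.setDouble(4, payAmount);
            ps.setDouble(5, realPay);
            ps.setString(6, payTime);
            ps.setString(7, userId);
            ps.setString(8, userName);
            ps.setString(9, areaId);
            return ps.executeUpdate(); // number of rows inserted
        }
    }

    public static void main(String[] args) throws SQLException {
        if (args.length != 3) {
            System.err.println("Usage: InsertTestOrder <host:port> <username> <password>");
            return;
        }
        // The database name "test" matches the database created in this step.
        String url = "jdbc:mysql://" + args[0] + "/test";
        try (Connection conn = DriverManager.getConnection(url, args[1], args[2])) {
            // Made-up sample values.
            int rows = insertOrder(conn, "202103241000000001", "webShop", "2021-03-24 10:00:00",
                    100.0, 100.0, "2021-03-24 10:02:03", "0001", "Alice", "330106");
            System.out.println("Inserted rows: " + rows);
        }
    }
}
```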

Step 3: Create an Elasticsearch Index

  1. Log in to the CSS management console. In the navigation pane on the left, choose Clusters > Elasticsearch.
  2. On the Clusters page, locate the created CSS cluster and click Access Kibana in the Operation column.
  3. In the navigation pane of Kibana, choose Dev Tools.
  4. On the displayed Console page, run the following command to create index cdc-order:
    PUT /cdc-order
    {
      "settings": {
        "number_of_shards": 1
      },
      "mappings": {
        "properties": {
          "order_id": {
            "type": "text"
          },
          "order_channel": {
            "type": "text"
          },
          "order_time": {
            "type": "text"
          },
          "pay_amount": {
            "type": "double"
          },
          "real_pay": {
            "type": "double"
          },
          "pay_time": {
            "type": "text"
          },
          "user_id": {
            "type": "text"
          },
          "user_name": {
            "type": "text"
          },
          "area_id": {
            "type": "text"
          }
        }
      }
    }

Step 4: Create an Enhanced Datasource Connection Between DLI and RDS

  1. Go to the RDS console. In the navigation pane on the left, choose Instances. On the displayed page, click the name of the desired RDS instance. Basic information of the instance is displayed.
  2. In the Connection Information pane, obtain the floating IP address, database port, VPC, and subnet.
  3. Click the security group name. On the displayed page, click the Inbound Rules tab and add a rule to allow access from DLI queues.

    For example, if the CIDR block of the queue is 10.0.0.0/16, add the rule as follows:

    • Priority: Select 1.
    • Action: Select Allow.
    • Protocol: Select TCP.
    • Port: Leave it blank.
    • Type: Select IPv4.
    • Source: Enter 10.0.0.0/16.

    Click OK. The security group rule is added.

  4. Check whether the CSS instance and RDS instance are in the same VPC and subnet.
    1. If yes and an enhanced datasource connection to that VPC and subnet already exists, go to 7. You do not need to create an enhanced datasource connection again.
    2. If no, go to 5. Create an enhanced datasource connection to connect DLI to the VPC and subnet where the RDS instance is.
  5. Log in to the DLI management console. In the navigation pane on the left, choose Datasource Connections. On the displayed Enhanced tab, click Create.
  6. In the dialog box that appears, set the parameters. For the VPC and subnet, select the ones obtained in 2.

    Set other parameters as needed.

    Click OK. Click the name of the created datasource connection to view its status. You can perform subsequent steps only after the connection status changes to Active.

  7. In the navigation pane on the left, choose Resources > Queue Management. On the displayed page, locate the queue you created in Step 1: Create an Elastic Resource Pool and Create Queues Within It, click More in its Operation column, and select Test Address Connectivity.
  8. In the displayed dialog box, enter the floating IP address and database port of the RDS instance obtained in 2, in the format <floating-IP-address>:<database-port>, in the Address box and click Test to check whether the database is reachable.
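
Conceptually, an address connectivity test amounts to attempting a TCP connection to host:port within a timeout. The following minimal sketch does the same from any Java host. It is an assumption-laden illustration rather than the console's implementation: the class name is hypothetical, and the result reflects reachability from the machine that runs it, not from the DLI queue.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: attempts a TCP connection to an address given as host:port.
class AddressConnectivityCheck {

    // Splits "host:port" without resolving the host name.
    static InetSocketAddress parse(String address) {
        int idx = address.lastIndexOf(':');
        if (idx <= 0 || idx == address.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + address);
        }
        String host = address.substring(0, idx);
        int port = Integer.parseInt(address.substring(idx + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    // Returns true if a TCP connection can be opened within the timeout.
    static boolean isReachable(String address, int timeoutMillis) {
        InetSocketAddress target = parse(address);
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(target.getHostString(), target.getPort()),
                    timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("Usage: AddressConnectivityCheck <host:port>");
            return;
        }
        System.out.println(args[0] + " reachable: " + isReachable(args[0], 3000));
    }
}
```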

Step 5: Create an Enhanced Datasource Connection Between DLI and CSS

  1. On the CSS management console, choose Clusters > Elasticsearch. On the displayed page, click the name of the created CSS cluster to view basic information.
  2. On the Cluster Information page, obtain the Private Network Address, VPC, and Subnet.
  3. Click the security group name. On the displayed page, click the Inbound Rules tab and add a rule to allow access from DLI queues. For example, if the CIDR block of the queue is 10.0.0.0/16, set Priority to 1, Action to Allow, Protocol to TCP, Type to IPv4, Source to 10.0.0.0/16, and click OK.
  4. Check whether the RDS instance and CSS instance are in the same VPC and subnet.
    1. If yes, go to 7. The enhanced datasource connection created in Step 4 also covers the CSS instance, so you do not need to create another one.
    2. If no, go to 5. Create an enhanced datasource connection to connect DLI to the VPC and subnet where the CSS instance is.
  5. Log in to the DLI management console. In the navigation pane on the left, choose Datasource Connections. On the displayed Enhanced tab, click Create.
  6. In the dialog box that appears, set the parameters. For the VPC and subnet, select the ones obtained in 2.

    Set other parameters as needed.

    Click OK. Click the name of the created datasource connection to view its status. You can perform subsequent steps only after the connection status changes to Active.

  7. In the navigation pane on the left, choose Resources > Queue Management. On the displayed page, locate the queue you created in Step 1: Create an Elastic Resource Pool and Create Queues Within It, click More in its Operation column, and select Test Address Connectivity.
  8. In the dialog box that appears, enter the private network address of the CSS cluster obtained in 2, in the format <private-IP-address>:<port>, in the Address box and click Test to check whether the cluster is reachable.

Step 6: Edit Java Sample Code on a Local Host

Edit the Java sample code on a local host. After developing the application, compile and package it into a JAR file and upload the JAR file to an OBS bucket for subsequent execution on DLI. For the upload procedure, see the basic example of Flink Jar job development.

For the sample code used in this example, see Example Code.

Step 7: Submit a Flink Jar Job on DLI

For details about how to submit a Flink Jar job on DLI, see Creating a Flink Jar Job.

For Application, select the Flink JAR file created in Step 6: Edit Java Sample Code on a Local Host and specify the main class.
Table 5 Parameters for creating a Flink Jar job

Parameter

Description

Example Value

Queue

Resource queue used to execute Flink jobs.

A queue determines the compute resources accessible to a job during its operation within an elastic resource pool. Every queue is allocated with specific resources, known as CUs, whose configuration significantly impacts the job's performance and execution efficiency.

Before submitting a job, assess its resource needs and select an appropriate queue.

Flink Jar jobs support selecting For general purpose queues.

Select the queue where the job will run.

Application

User-defined package.

Custom program package

Main Class (--class)

Name of the main class to be loaded, for example, com.huawei.dli.MySQLCDCToElasticsearchDemo in this example. Options:

  • Default: The value is specified based on the Manifest file in the JAR file.
  • Manually assign: You must enter the class name and determine the list of class arguments (separated by spaces).
    NOTE:

    When a class belongs to a package, the main class path must contain the complete package path, for example, packagePath.KafkaMessageStreaming.

Manually assign

Other Dependencies

User-defined dependency files. Other dependency files need to be referenced in the code.

Before selecting a dependency file, upload the file to the OBS bucket and choose Data Management > Package Management to create a package. The package type is not limited. For details, see Creating a Package.

You can add the following command to the application to access the corresponding dependency file. fileName indicates the name of the file to be accessed, and ClassName indicates the name of the class that needs to access the file.

ClassName.class.getClassLoader().getResource("userData/fileName")

-

Flink Version

Flink version used for job running. Different versions of Flink may support different features, performance optimizations, and APIs.

Before selecting a Flink version, ensure that the Flink version is compatible with your job programs and dependencies.

For more information about Flink versions, refer to Flink Version Description.

If you choose to use Flink 1.15 (Flink general queue scenario), make sure to configure the agency information for the cloud service that DLI is allowed to access in the job.

For details about how to create a custom agency, see Customizing DLI Agency Permissions.

1.15
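
The Other Dependencies row in Table 5 mentions accessing a dependency file through the class loader. The following minimal sketch wraps that call so that a missing file is reported as null instead of causing a NullPointerException. The class and method names are hypothetical; only the userData/ prefix and the getClassLoader().getResource(...) call come from the description above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: reads a dependency file from the classpath as UTF-8 text.
class DependencyFileReader {

    // Dependency files are looked up under the userData/ prefix.
    static String resourcePath(String fileName) {
        return "userData/" + fileName;
    }

    // Returns the file content, or null if the file is not on the classpath.
    static String readOrNull(String fileName) {
        URL url = DependencyFileReader.class.getClassLoader().getResource(resourcePath(fileName));
        if (url == null) {
            return null;
        }
        try (InputStream in = url.openStream()) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to read " + resourcePath(fileName), e);
        }
    }

    public static void main(String[] args) {
        String content = readOrNull(args.length > 0 ? args[0] : "fileName");
        System.out.println(content == null ? "Dependency file not found" : content);
    }
}
```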

Step 8: Verify the Result

After the job starts running, check whether the corresponding index in the CSS cluster contains the synchronized data.

  1. Insert two data records into RDS. In this example, the database table is test.cdc_order.
    Figure 2 Inserting data records
  2. Check whether the preceding two data records have been synchronized to CSS.
    Figure 3 Inserting data records
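
Besides checking in Kibana, the index content could also be fetched through the Elasticsearch search API. The following is a minimal sketch that assumes a cluster reachable over plain HTTP without security mode. The class name and endpoint are hypothetical, and a security-mode cluster would additionally need credentials and the HTTPS certificate.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: fetches up to 10 documents from an index via GET /<index>/_search.
class IndexSearchCheck {

    // Builds the search URL, tolerating a trailing slash on the endpoint.
    static String searchUrl(String endpoint, String index) {
        String base = endpoint.endsWith("/")
                ? endpoint.substring(0, endpoint.length() - 1)
                : endpoint;
        return base + "/" + index + "/_search?size=10";
    }

    // Sends the request and returns the raw JSON response body.
    static String search(String endpoint, String index) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) URI.create(searchUrl(endpoint, index)).toURL().openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: IndexSearchCheck <http://host:port>");
            return;
        }
        // cdc-order is the index created in Step 3.
        System.out.println(search(args[0], "cdc-order"));
    }
}
```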

Example Code

This example applies to the Flink 1.15 compute engine.

  • POM file configurations
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>dli-flink-demo</artifactId>
            <groupId>com.huaweicloud</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>MySQLCDCToElasticsearchDemo</artifactId>
    
        <properties>
            <flink.version>1.15.0</flink.version>
            <flink_cdc.version>2.4.0</flink_cdc.version>
            <scala.binary.version>2.11</scala.binary.version>
            <elasticsearch.version>7.10.2</elasticsearch.version> <!-- Ensure this matches your ES version -->
        </properties>
    
        <dependencies>
            <!-- Flink Core Dependencies -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-hive_2.12</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-exec</artifactId>
                <version>3.1.0-h0.cbu.mrs.330.r24</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>3.3.1-h0.cbu.mrs.330.r9</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_2.12</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-csv</artifactId>
                <version>1.15.0</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- MySQL CDC -->
            <!-- This dependency is not included in the built-in dependency package of Flink 1.17. When using Flink 1.17, package it into your JAR file (remove the provided scope). -->
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-sql-connector-mysql-cdc</artifactId>
                <version>${flink_cdc.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- Elasticsearch Connector -->
            <!-- This dependency is not included in the built-in dependency package of Flink 1.17, so it is packaged into the JAR file (no provided scope). -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
                <version>1.14.6</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                    <configuration>
                        <compilerArgument>-Xlint:unchecked</compilerArgument>
                    </configuration>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.3.0</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.huawei.dli.MySQLCDCToElasticsearchDemo</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
            <resources>
                <resource>
                    <directory>src/main/resources</directory>
                    <filtering>true</filtering>
                    <includes>
                        <include>**/*.*</include>
                    </includes>
                </resource>
            </resources>
        </build>
    </project>
  • Example code
    package com.huawei.dli;
    
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.DeserializationFeature;
    import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.JsonNode;
    import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.MapperFeature;
    import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.ObjectMapper;
    
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.http.HttpHost;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.ssl.SSLContextBuilder;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;
    import org.apache.hadoop.conf.Configuration;
    
    import javax.net.ssl.SSLContext;
    import java.io.InputStream;
    import org.apache.hadoop.fs.Path;
    
    import java.net.URI;
    import java.security.KeyStore;
    import java.security.cert.Certificate;
    import java.security.cert.CertificateFactory;
    import java.util.*;
    
    /*
    This example reads data from MySQL CDC and writes it to Elasticsearch in real time. Order data is read from the test.cdc_order table and written to the Elasticsearch index specified by the elasticsearch.index argument (cdc-order in this example).
    */
    
    public class MySQLCDCToElasticsearchDemo {
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            streamEnv.setParallelism(1);
            streamEnv.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
    
            // Checkpoint configuration
            CheckpointConfig checkpointConfig = streamEnv.getCheckpointConfig();
            checkpointConfig.setMinPauseBetweenCheckpoints(1 * 60 * 1000);
            checkpointConfig.setMaxConcurrentCheckpoints(1);
            checkpointConfig.setTolerableCheckpointFailureNumber(3);
            checkpointConfig.setCheckpointTimeout(10 * 60 * 1000);
    
            // Debezium configuration
            Properties debeziumProps = new Properties();
            debeziumProps.setProperty("decimal.handling.mode", "string");
            debeziumProps.setProperty("time.precision.mode", "connect");
            debeziumProps.setProperty("database.ssl.mode", "disabled");
    
            ParameterTool dbProps = ParameterTool.fromArgs(args);
    
            // Create a MySQL CDC builder.
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname(dbProps.get("host", "xxx.xxx.xxx.xxx"))
                    .port(Integer.parseInt(dbProps.get("port", "3306")))
                    .databaseList(dbProps.get("database_list", "test"))
                    .tableList("test.cdc_order")
                    .username(dbProps.get("username", "USERNAME"))
                    .password(dbProps.get("password", "PASSWORD"))
                    .serverTimeZone("Asia/Shanghai")
                    .debeziumProperties(debeziumProps)
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .serverId(dbProps.get("server-id"))
                    .startupOptions(StartupOptions.initial())
                    .build();
            DataStreamSource<String> cdcSource = streamEnv.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source");
    
            // Parse the after image of each change event and write it to Elasticsearch.
            ObjectMapper mapper = new ObjectMapper();
            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
            mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
    
            // Convert the JSON string to JsonNode.
            SingleOutputStreamOperator<JsonNode> dataStreamJsonNode = cdcSource
                    .map(line -> mapper.readTree(line));
    
            // Obtain the after data of the test.cdc_order table and write the data to Elasticsearch.
            SingleOutputStreamOperator<Map<String, Object>> mapDataStream = dataStreamJsonNode
                    .filter(line -> "test".equals(line.get("source").get("db").asText()) &&
                            "cdc_order".equals(line.get("source").get("table").asText()))
                    .map(line -> {
                        Map<String, Object> data = new HashMap<>();
                        JsonNode afterNode = line.get("after");
                        // For delete events, "after" is a JSON null (NullNode), not a Java null.
                        if (afterNode != null && !afterNode.isNull()) {
                            // Map fields in the order table.
                            data.put("order_id", afterNode.get("order_id").asText());
                            data.put("order_channel", afterNode.get("order_channel").asText());
                            data.put("order_time", afterNode.get("order_time").asText());
                            data.put("pay_amount", afterNode.get("pay_amount").asDouble());
                            data.put("real_pay", afterNode.get("real_pay").asDouble());
                            data.put("pay_time", afterNode.get("pay_time").asText());
                            data.put("user_id", afterNode.get("user_id").asText());
                            data.put("user_name", afterNode.get("user_name").asText());
                            data.put("area_id", afterNode.get("area_id").asText());
                        }
                        return data;
                    })
                    .returns(Types.MAP(Types.STRING, Types.GENERIC(Object.class))); // Specify the return type.
    
            // Read the Elasticsearch configuration.
            ParameterTool elasticsearchPros = ParameterTool.fromArgs(args);
    
            // Configure the Elasticsearch sink.
            String[] elasticsearchNodes = elasticsearchPros.get("elasticsearch.nodes", "xxx.xxx.xxx.xxx:xxxx").split(",");
            String elasticsearchIndex = elasticsearchPros.get("elasticsearch.index", "indexName");
    
            // Create an HttpHost list.
            List<HttpHost> httpHosts = new ArrayList<>();
            for (String node : elasticsearchNodes) {
                String[] hostPort = node.split(":");
                // If HTTPS is not enabled, change the last parameter to http.
                httpHosts.add(new HttpHost(hostPort[0], Integer.parseInt(hostPort[1]), "https"));
            }
    
            // Create an Elasticsearch sink builder.
            ElasticsearchSink.Builder<Map<String, Object>> esSinkBuilder = new ElasticsearchSink.Builder<>(
                    httpHosts,
                    new ElasticsearchSinkFunction<Map<String, Object>>() {
                        public IndexRequest createIndexRequest(Map<String, Object> element) {
                            return Requests.indexRequest()
                                    .index(elasticsearchIndex)
                                    .source(element);
                        }
    
                        @Override
                        public void process(Map<String, Object> element, RuntimeContext ctx, RequestIndexer indexer) {
                            indexer.add(createIndexRequest(element));
                        }
                    }
            );
    
            // If security mode is enabled for the CSS cluster or HTTPS is used, add the following configuration:
            esSinkBuilder.setRestClientFactory(restClientBuilder -> {
                // If the security mode is enabled, set Basic Auth.
                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(
                        AuthScope.ANY,
                        new UsernamePasswordCredentials("username", "password")
                );
    
                // If HTTPS is enabled, configure the SSL certificate.
                try {
                    FileSystem fs = FileSystem.get(new URI("obs://BucketName/"), new Configuration());
                    // Read the certificate stream from OBS. (The certificate path needs to be configured.)
                    try (InputStream certStream = fs.open(new Path("obs://BucketName/tmp/CloudSearchService.cer"))) {
    
                        // Convert the PEM certificate to KeyStore.
                        CertificateFactory cf = CertificateFactory.getInstance("X.509");
                        Certificate cert = cf.generateCertificate(certStream);
    
                        KeyStore trustStore = KeyStore.getInstance("PKCS12");
                        trustStore.load(null, null);
                        trustStore.setCertificateEntry("obs-cert", cert);
    
                        // Load the certificate to SSLContext.
                        SSLContext sslContext = SSLContextBuilder.create()
                                .loadTrustMaterial(trustStore, null)
                                .build();
    
                        // Apply the certificate to the HTTPS client.
                        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                                .setDefaultCredentialsProvider(credentialsProvider)
                                .setSSLContext(sslContext));
                    }
                } catch (Exception e) {
                    throw new RuntimeException("Failed to load SSL certificate", e);
                }
            });
    
            // (Optional) Configure batch parameters.
            esSinkBuilder.setBulkFlushMaxActions(50); // Flush after every 50 buffered records.
            esSinkBuilder.setBulkFlushInterval(1000L); // Flush at least once per second.
    
            mapDataStream.addSink(esSinkBuilder.build());
    
            streamEnv.execute("test cdc to elasticsearch");
        }
    }
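
The sample code reads its connection settings from the job arguments through ParameterTool.fromArgs, which accepts --key value pairs. The sketch below lists the keys that the sample code looks up and parses them with a simplified stand-in parser, so that the expected argument shape is visible without a Flink dependency. The class name and all values are placeholders, and the stand-in is stricter than ParameterTool (it rejects keys without values).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for ParameterTool.fromArgs, limited to "--key value" pairs.
class JobArgumentsSketch {

    static Map<String, String> parse(String[] args) {
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i++) {
            String token = args[i];
            if (!token.startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but found: " + token);
            }
            String key = token.substring(2);
            if (i + 1 >= args.length || args[i + 1].startsWith("--")) {
                throw new IllegalArgumentException("Missing value for --" + key);
            }
            result.put(key, args[++i]); // consume the value that follows the key
        }
        return result;
    }

    public static void main(String[] ignored) {
        // Keys match those read by the sample code; every value is a placeholder.
        String[] jobArgs = {
            "--host", "192.168.0.10",
            "--port", "3306",
            "--database_list", "test",
            "--username", "USERNAME",
            "--password", "PASSWORD",
            "--server-id", "5400",
            "--elasticsearch.nodes", "192.168.0.20:9200",
            "--elasticsearch.index", "cdc-order"
        };
        parse(jobArgs).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Note that the sample code defines no default for server-id, so a null value is passed to the source builder when that argument is omitted.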