文档首页/ 数据湖探索 DLI/ 开发指南/ Flink作业开发指南/ Flink Jar作业开发/ 从MySQL CDC读取数据写入到Elasticsearch
更新时间:2025-11-12 GMT+08:00
分享

从MySQL CDC读取数据写入到Elasticsearch

场景描述

CDC是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库的增量变动记录,同步到一个或多个数据目的中。CDC在数据同步过程中,还可以对数据进行一定的处理,例如分组(GROUP BY)、多表的关联(JOIN)等。

本示例通过创建MySQL CDC源表来监控MySQL的数据变化,并将变化的数据信息插入到Elasticsearch中。

准备工作

  • 创建RDS MySQL实例。本示例创建的RDS MySQL数据库版本选择为:8.0。

    具体步骤可参考购买RDS for MySQL实例

  • 创建CSS实例。

    具体创建CSS集群的操作可以参考创建CSS集群

  • 使用DLI服务前需配置DLI作业桶,该桶用于存储DLI作业运行过程中产生的临时数据,例如:作业日志、作业结果。

    详细配置说明请参考配置DLI作业桶

  • 使用Flink1.15及以上版本的计算引擎时,需要用户自行配置委托,否则可能影响作业运行。

    详细操作请参考创建DLI自定义委托权限

开发流程

表1 从MySQL CDC读取数据写入到Elasticsearch的操作流程

序号

操作步骤

操作指导

1

创建弹性资源池并添加队列:创建DLI作业运行的队列。

步骤1:创建弹性资源池并添加队列

2

创建RDS MySQL数据库和表:创建RDS MySQL的数据库和表。

步骤2:创建RDS MySQL数据库和表

3

创建ElasticSearch搜索索引用于接收结果数据。

步骤3:创建Elasticsearch搜索索引

4

创建增强型跨源连接,连通DLI和RDS的网络。

步骤4:创建DLI连接RDS的增强型跨源连接

5

创建增强型跨源连接,连通DLI和CSS的网络。

步骤5:创建DLI连接CSS的增强型跨源连接

6

在本地编辑Java样例代码并上传OBS桶。

步骤6:在本地编辑Java样例代码

7

在DLI上创建和运行Flink Jar作业。

步骤7:在DLI提交Flink Jar作业

8

发送数据和查询结果:RDS MySQL的表上插入数据,在CSS上查看运行结果。

步骤8:结果验证

步骤1:创建弹性资源池并添加队列

请注意在资源池中新建队列时,队列的网段不能和RDS实例的子网网段有重合,否则后续创建跨源连接会失败。

  1. 登录DLI管理控制台。
  2. 在左侧导航栏单击“资源管理 > 弹性资源池”,可进入弹性资源池管理页面。
  3. 在弹性资源池管理界面,单击界面右上角的“购买弹性资源池”。
  4. 在“购买弹性资源池”界面,填写具体的弹性资源池参数。
    本例在华东-上海二区域购买按需计费的弹性资源池。相关参数说明如表2所示。
    表2 参数说明

    参数名称

    参数说明

    配置样例

    计费模式

    选择弹性资源池计费模式。

    按需计费

    区域

    选择弹性资源池所在区域。

    华东-上海二

    项目

    每个区域默认对应一个项目,由系统预置。

    系统默认项目

    名称

    弹性资源池名称。

    dli_resource_pool

    规格

    选择弹性资源池规格。

    标准版

    CU范围

    弹性资源池最大最小CU范围。

    64-64

    网段

    规划弹性资源池所属的网段。如需使用DLI增强型跨源,弹性资源池网段与数据源网段不能重合。弹性资源池网段设置后不支持更改

    172.16.0.0/19

    企业项目

    选择对应的企业项目。

    default

  5. 参数填写完成后,单击“立即购买”,在界面上确认当前配置是否正确。
  6. 单击“提交”完成弹性资源池的创建。
  7. 在弹性资源池的列表页,选择要操作的弹性资源池,单击操作列的“添加队列”。
  8. 配置队列的基础配置,具体参数信息如下。
    表3 弹性资源池添加队列基础配置

    参数名称

    参数说明

    配置样例

    名称

    弹性资源池添加的队列名称。

    dli_queue_01

    类型

    选择创建的队列类型。

    • 执行SQL作业请选择SQL队列。
    • 执行Flink或Spark作业请选择通用队列。

    SQL作业场景请选择“SQL队列”。

    其他场景请选择“通用队列”。

    执行引擎

    SQL队列可以选择队列引擎为Spark或者HetuEngine。

    Spark

    企业项目

    选择对应的企业项目。

    default

  9. 单击“下一步”,配置队列的扩缩容策略。

    单击“新增”,可以添加不同优先级、时间段、“最小CU”和“最大CU”扩缩容策略。

    本例配置的扩缩容策略如图1所示。
    图1 添加队列时配置扩缩容策略
    表4 扩缩容策略参数说明

    参数名称

    参数说明

    配置样例

    优先级

    当前弹性资源池中的优先级数字越大表示优先级越高。本例设置一条扩缩容策略,默认优先级为1。

    1

    时间段

    首条扩缩容策略是默认策略,不能删除和修改时间段配置。

    即设置00-24点的扩缩容策略。

    00-24

    最小CU

    设置扩缩容策略支持的最小CU数。

    16

    最大CU

    当前扩缩容策略支持的最大CU数。

    64

  10. 单击“确定”完成添加队列配置。

步骤2:创建RDS MySQL数据库和表

  1. 登录RDS管理控制台,在“实例管理”界面,选择已创建的RDS for MySQL实例,选择操作列的“更多 > 登录”,进入数据管理服务实例登录界面。
  2. 输入实例登录的用户名和密码。单击“登录”,即可进入RDS for MySQL数据库并进行管理。
  3. 在数据库实例界面,单击“新建数据库”,数据库名定义为:test,字符集保持默认即可。
  4. 在已创建的数据库的操作列,单击“SQL查询”,输入以下创建表语句,创建RDS for MySQL表。
    CREATE TABLE cdc-order (
    	`order_id` VARCHAR(64) NOT NULL,
    	`order_channel` VARCHAR(32) NOT NULL,
    	`order_time` VARCHAR(32),
    	`pay_amount` DOUBLE,
    	`real_pay` DOUBLE,
    	`pay_time` VARCHAR(32),
    	`user_id` VARCHAR(32),
    	`user_name` VARCHAR(32),
    	`area_id` VARCHAR(32)
    
    )	ENGINE = InnoDB
    	DEFAULT CHARACTER SET = utf8mb4;

步骤3:创建Elasticsearch搜索索引

  1. 登录CSS管理控制台,选择“集群管理 > Elasticsearch”。
  2. 在集群管理界面,在已创建的CSS集群的“操作”列,单击“Kibana”访问集群。
  3. 在Kibana的左侧导航中选择“Dev Tools”,进入到Console界面。
  4. 在Console界面,执行如下命令创建索引“cdc-order”。
PUT /cdc-order
{
  "settings": {
    "number_of_shards": 1
  },
	"mappings": {
	  "properties": {
	    "order_id": {
	      "type": "text"
	    },
	    "order_channel": {
	      "type": "text"
	    },
	    "order_time": {
	      "type": "text"
	    },
	    "pay_amount": {
	      "type": "double"
	    },
	    "real_pay": {
	      "type": "double"
	    },
	    "pay_time": {
	      "type": "text"
	    },
	    "user_id": {
	      "type": "text"
	    },
	    "user_name": {
	      "type": "text"
	    },
	    "area_id": {
	      "type": "text"
	    }
	  }
	}
}

步骤4:创建DLI连接RDS的增强型跨源连接

  1. 在RDS管理控制台,选择“实例管理”,单击对应的RDS实例名称,进入到RDS的基本信息页面。
  2. 在“基本信息”的“连接信息”中获取该实例的“内网地址”、“数据库端口”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。
  3. 单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。

    例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:

    • 优先级选择:1,
    • 策略选择:允许,
    • 协议选择:TCP,
    • 端口值:不填,
    • 类型:IPv4,
    • 源地址为:10.0.0.0/16,

    单击“确定”完成安全组规则添加。

  4. CSS和RDS实例属于同一VPC和子网下?
    1. 是,执行7。CSS和RDS实例在同一VPC和子网,不用再重复创建增强型跨源连接。
    2. 否,执行5。CSS和RDS实例分别在两个VPC和子网下,则要分别创建增强型跨源连接打通网络。
  5. 登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。
  6. 在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。
    • 连接名称:设置具体的增强型跨源名称。本示例输入为:dli_rds。
    • 弹性资源池:选择步骤1:创建弹性资源池并添加队列中已经创建的队列名称。
    • 虚拟私有云:选择RDS的虚拟私有云。
    • 子网:选择RDS的子网。

    其他参数可以根据需要选择配置。

    参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。

  7. 单击“队列管理”,选择操作的队列,本示例为步骤1:创建弹性资源池并添加队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。
  8. 在“测试连通性”界面,根据2中获取的RDS连接信息,地址栏输入“RDS内网地址:RDS数据库端口”,单击“测试”测试DLI到RDS网络是否可达。

步骤5:创建DLI连接CSS的增强型跨源连接

  1. 在CSS管理控制台,选择“集群管理”,单击已创建的CSS集群名称,进入到CSS的基本信息页面。
  2. 在“基本信息”中获取CSS的“内网访问地址”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。
  3. 单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选择:1,策略选择:允许,协议选择:TCP,端口值不填,类型:IPv4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。
  4. RDS和CSS实例属于同一VPC和子网下?
    1. 是,执行7。RDS和CSS实例在同一VPC和子网,不用再重复创建增强型跨源连接。
    2. 否,执行5。RDS和CSS实例分别在两个VPC和子网下,则要分别创建增强型跨源连接打通网络。
  5. 登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。
  6. 在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。
    • 连接名称:设置具体的增强型跨源名称。本示例输入为:dli_css。
    • 弹性资源池:选择步骤1:创建弹性资源池并添加队列中已经创建的弹性资源池。
    • 虚拟私有云:选择CSS的虚拟私有云。
    • 子网:选择CSS的子网。

    其他参数可以根据需要选择配置。

    参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。

  7. 单击“队列管理”,选择操作的队列,本示例为步骤1:创建弹性资源池并添加队列中添加的队列,在操作列,单击“更多 > 测试地址连通性”。
  8. 在“测试连通性”界面,根据2获取的CSS连接信息,地址栏输入“CSS内网地址:CSS内网端口”,单击“测试”测试DLI到CSS网络是否可达。

步骤6:在本地编辑Java样例代码

在本地编辑Java样例代码, 应用程序开发完成后,参考Flink Jar作业开发基础样例将编译打包的JAR包上传到OBS桶,为后续在DLI运行做准备。

本示例场景编辑代码的样例请参考示例代码

步骤7:在DLI提交Flink Jar作业

在DLI提交Flink Jar作业的详细操作步骤详情参考创建Flink Jar作业

在应用程序中选择步骤步骤6:在本地编辑Java样例代码中创建的Flink Jar文件,并指定主类。
表5 创建Flink Jar作业参数说明

参数

说明

示例

所属队列

队列用于指定Flink作业执行的资源队列。

队列决定了作业在弹性资源池中运行时能够使用的计算资源。每个队列都分配了指定的资源,即队列的CU,队列的CU配置直接影响作业的性能和执行效率。

在提交作业前,评估作业的资源需求,选择一个能够满足需求的队列。

Flink Jar作业即支持选择通用类型的队列。

选择Flink Jar作业运行的队列

应用程序

用户自定义的程序包

自定义的程序包

主类

指定加载的Jar包类名,如FlinkRdsToCSSExample。

  • 默认:根据Jar包文件的Manifest文件指定。
  • 指定:必须输入“类名”并确定类参数列表(参数间用空格分隔)
    说明:

    当类属于某个包时,主类路径需要包含完整包路径,例如:packagePath.KafkaMessageStreaming

指定

其他依赖文件

用户自定义的依赖文件。其他依赖文件需要自行在代码中引用。

在选择依赖文件之前需要将对应的文件上传至OBS桶中,并在“数据管理>程序包管理”中创建程序包,包类型没有限制。具体操作请参考创建程序包

通过在应用程序中添加以下内容可访问对应的依赖文件。其中,“fileName”为需要访问的文件名,“ClassName”为需要访问该文件的类名。

ClassName.class.getClassLoader().getResource("userData/fileName")

-

Flink版本

选择作业运行时所使用的Flink的版本。不同版本的Flink可能支持不同的特性、性能优化和API。

在选择Flink版本之前,确保所选的Flink版本与您的作业程序和依赖包互相兼容。

了解更多Flink版本的信息请参考DLI Flink版本说明

选择使用Flink1.15版本(Flink通用队列场景)时请在作业中配置允许DLI访问的云服务的委托信息。

自定义委托请参考自定义DLI委托权限

1.15

步骤8:结果验证

作业运行完成后,查看对应CSS集群对应索引下是否有相关的数据信息。

  1. 向RDS插入两条数据,本示例库表为flink.cdc_order。
    图2 插入数据
  2. 查看CSS中已同步以上两条数据。
    图3 插入数据

示例代码

本例适用Flink 1.15版本的计算引擎。

  • pom文件配置
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>dli-flink-demo</artifactId>
            <groupId>com.huaweicloud</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>MySQLCDCToElasticsearchDemo</artifactId>
    
        <properties>
            <flink.version>1.15.0</flink.version>
            <flink_cdc.version>2.4.0</flink_cdc.version>
            <scala.binary.version>2.11</scala.binary.version>
            <elasticsearch.version>7.10.2</elasticsearch.version> <!-- Ensure this matches your ES version -->
        </properties>
    
        <dependencies>
            <!-- Flink Core Dependencies -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-hive_2.12</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-exec</artifactId>
                <version>3.1.0-h0.cbu.mrs.330.r24</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>3.3.1-h0.cbu.mrs.330.r9</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_2.12</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-csv</artifactId>
                <version>1.15.0</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- MySQL CDC -->
            <!-- 1.17内置依赖包中不存在,需要用户自行打包(去掉provided) -->
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-sql-connector-mysql-cdc</artifactId>
                <version>${flink_cdc.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- Elasticsearch Connector -->
            <!-- 1.17内置依赖包中不存在,需要用户自行打包(去掉provided) -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
                <version>1.14.6</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                    <configuration>
                        <compilerArgument>-Xlint:unchecked</compilerArgument>
                    </configuration>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.3.0</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.huawei.dli.MySQLCDCToElasticsearchDemo</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
            <resources>
                <resource>
                    <directory>src/main/resources</directory>
                    <filtering>true</filtering>
                    <includes>
                        <include>**/*.*</include>
                    </includes>
                </resource>
            </resources>
        </build>
    </project>
  • 示例代码
    package com.huawei.dli;
    
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.DeserializationFeature;
    import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.JsonNode;
    import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.MapperFeature;
    import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.ObjectMapper;
    
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.http.HttpHost;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.ssl.SSLContextBuilder;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;
    import org.apache.hadoop.conf.Configuration;
    
    import javax.net.ssl.SSLContext;
    import java.io.InputStream;
    import org.apache.hadoop.fs.Path;
    
    import java.net.URI;
    import java.security.KeyStore;
    import java.security.cert.Certificate;
    import java.security.cert.CertificateFactory;
    import java.util.*;
    
    /*
        本例实现了从 MySQL CDC 读取数据并将其写入 Elasticsearch 的实时数据同步功能:从 test.cdc_order 数据库表中读取订单数据,并将其写入 Elasticsearch 的 cdc-test 索引。
    */
    
    public class MySQLCDCToElasticsearchDemo {
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            streamEnv.setParallelism(1);
            streamEnv.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
    
            // checkpoint配置
            CheckpointConfig checkpointConfig = streamEnv.getCheckpointConfig();
            checkpointConfig.setMinPauseBetweenCheckpoints(1 * 60 * 1000);
            checkpointConfig.setMaxConcurrentCheckpoints(1);
            checkpointConfig.setTolerableCheckpointFailureNumber(3);
            checkpointConfig.setCheckpointTimeout(10 * 60 * 1000);
    
            // Debezium配置
            Properties debeziumProps = new Properties();
            debeziumProps.setProperty("decimal.handling.mode", "string");
            debeziumProps.setProperty("time.precision.mode", "connect");
            debeziumProps.setProperty("database.ssl.mode", "disabled");
    
            ParameterTool dbProps = ParameterTool.fromArgs(args);
    
            // 创建 MySQL CDC Builder
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname(dbProps.get("host", "xxx.xxx.xxx.xxx"))
                    .port(Integer.parseInt(dbProps.get("port", "3306")))
                    .databaseList(dbProps.get("database_list", "test"))
                    .tableList("test.cdc_order")
                    .username(dbProps.get("username", "USERNAME"))
                    .password(dbProps.get("password", "PASSWORD"))
                    .serverTimeZone("Asia/Shanghai")
                    .debeziumProperties(debeziumProps)
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .serverId(dbProps.get("server-id"))
                    .startupOptions(StartupOptions.initial())
                    .build();
            DataStreamSource<String> cdcSource = streamEnv.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source");
    
            // 解析after数据写入Elasticsearch
            ObjectMapper mapper = new ObjectMapper();
            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
            mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
    
            // JSON字符串转JsonNode
            SingleOutputStreamOperator<JsonNode> dataStreamJsonNode = cdcSource
                    .map(line -> mapper.readTree(line));
    
            // 获取test.cdc_test表after数据,然后写入Elasticsearch
            SingleOutputStreamOperator<Map<String, Object>> mapDataStream = dataStreamJsonNode
                    .filter(line -> "test".equals(line.get("source").get("db").asText()) &&
                            "cdc_order".equals(line.get("source").get("table").asText()))
                    .map(line -> {
                        Map<String, Object> data = new HashMap<>();
                        JsonNode afterNode = line.get("after");
                        if (afterNode != null) {
                            // 映射订单表字段
                            data.put("order_id", afterNode.get("order_id").asText());
                            data.put("order_channel", afterNode.get("order_channel").asText());
                            data.put("order_time", afterNode.get("order_time").asText());
                            data.put("pay_amount", afterNode.get("pay_amount").asDouble());
                            data.put("real_pay", afterNode.get("real_pay").asDouble());
                            data.put("pay_time", afterNode.get("pay_time").asText());
                            data.put("user_id", afterNode.get("user_id").asText());
                            data.put("area_id", afterNode.get("area_id").asText());
                        }
                        return data;
                    })
                    .returns(Types.MAP(Types.STRING, Types.GENERIC(Object.class)));  // 明确指定返回类型
    
            // 读取Elasticsearch配置
            ParameterTool elasticsearchPros = ParameterTool.fromArgs(args);
    
            // 配置 Elasticsearch Sink
            String[] elasticsearchNodes = elasticsearchPros.get("elasticsearch.nodes", "xxx.xxx.xxx.xxx:xxxx").split(",");
            String elasticsearchIndex = elasticsearchPros.get("elasticsearch.index", "indexName");
    
            // 创建 HttpHost 列表
            List<HttpHost> httpHosts = new ArrayList<>();
            for (String node : elasticsearchNodes) {
                String[] hostPort = node.split(":");
                // 若未开启https,则最后一个参数改为http
                httpHosts.add(new HttpHost(hostPort[0], Integer.parseInt(hostPort[1]), "https"));
            }
    
            // 创建 Elasticsearch Sink Builder
            ElasticsearchSink.Builder<Map<String, Object>> esSinkBuilder = new ElasticsearchSink.Builder<>(
                    httpHosts,
                    new ElasticsearchSinkFunction<Map<String, Object>>() {
                        public IndexRequest createIndexRequest(Map<String, Object> element) {
                            return Requests.indexRequest()
                                    .index(elasticsearchIndex)
                                    .source(element);
                        }
    
                        @Override
                        public void process(Map<String, Object> element, RuntimeContext ctx, RequestIndexer indexer) {
                            indexer.add(createIndexRequest(element));
                        }
                    }
            );
    
            // 若CSS开启安全模式或者https链接,需要增加以下配置
            esSinkBuilder.setRestClientFactory(restClientBuilder -> {
                // 若开启安全模式需要设置 Basic Auth
                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(
                        AuthScope.ANY,
                        new UsernamePasswordCredentials("username", "password")
                );
    
                // 若开启https连接,还需配置 SSL 证书
                try {
                    FileSystem fs = FileSystem.get(new URI("obs://BucketName/"), new Configuration());
                    // 从 OBS 读取证书流(证书路径需自行配置)
                    try (InputStream certStream = fs.open(new Path("obs://BucketName/tmp/CloudSearchService.cer"))) {
    
                        // 将 PEM 证书转换为 KeyStore
                        CertificateFactory cf = CertificateFactory.getInstance("X.509");
                        Certificate cert = cf.generateCertificate(certStream);
    
                        KeyStore trustStore = KeyStore.getInstance("PKCS12");
                        trustStore.load(null, null);
                        trustStore.setCertificateEntry("obs-cert", cert);
    
                        // 加载到 SSLContext
                        SSLContext sslContext = SSLContextBuilder.create()
                                .loadTrustMaterial(trustStore, null)
                                .build();
    
                        // 应用到 HTTPS 客户端
                        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                                .setDefaultCredentialsProvider(credentialsProvider)
                                .setSSLContext(sslContext));
                    }
                } catch (Exception e) {
                    throw new RuntimeException("Failed to load SSL certificate", e);
                }
            });
    
            // 配置批量参数(可选)
            esSinkBuilder.setBulkFlushMaxActions(50); // 每50条刷新一次
            esSinkBuilder.setBulkFlushInterval(1000L); // 每秒刷新一次
    
            mapDataStream.addSink(esSinkBuilder.build());
    
            streamEnv.execute("test cdc to elasticsearch");
        }
    }

相关文档