从MySQL CDC读取数据写入到Elasticsearch
场景描述
CDC是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库的增量变动记录,同步到一个或多个数据目的中。CDC在数据同步过程中,还可以对数据进行一定的处理,例如分组(GROUP BY)、多表的关联(JOIN)等。
本示例通过创建MySQL CDC源表来监控MySQL的数据变化,并将变化的数据信息插入到Elasticsearch中。
准备工作
- 创建RDS MySQL实例。本示例创建的RDS MySQL数据库版本选择为:8.0。
具体步骤可参考购买RDS for MySQL实例。
- 创建CSS实例。
具体创建CSS集群的操作可以参考创建CSS集群。
- 使用DLI服务前需配置DLI作业桶,该桶用于存储DLI作业运行过程中产生的临时数据,例如:作业日志、作业结果。
详细配置说明请参考配置DLI作业桶。
- 使用Flink1.15及以上版本的计算引擎时,需要用户自行配置委托,否则可能影响作业运行。
详细操作请参考创建DLI自定义委托权限。
开发流程
|
序号 |
操作步骤 |
操作指导 |
|---|---|---|
|
1 |
创建弹性资源池并添加队列:创建DLI作业运行的队列。 |
|
|
2 |
创建RDS MySQL数据库和表:创建RDS MySQL的数据库和表。 |
|
|
3 |
创建ElasticSearch搜索索引用于接收结果数据。 |
|
|
4 |
创建增强型跨源连接,连通DLI和RDS的网络。 |
|
|
5 |
创建增强型跨源连接,连通DLI和CSS的网络。 |
|
|
6 |
在本地编辑Java样例代码并上传OBS桶。 |
|
|
7 |
在DLI上创建和运行Flink Jar作业。 |
|
|
8 |
发送数据和查询结果:RDS MySQL的表上插入数据,在CSS上查看运行结果。 |
步骤1:创建弹性资源池并添加队列
请注意在资源池中新建队列时,队列的网段不能和RDS实例的子网网段有重合,否则后续创建跨源连接会失败。
- 登录DLI管理控制台。
- 在左侧导航栏单击“资源管理 > 弹性资源池”,可进入弹性资源池管理页面。
- 在弹性资源池管理界面,单击界面右上角的“购买弹性资源池”。
- 在“购买弹性资源池”界面,填写具体的弹性资源池参数。
- 参数填写完成后,单击“立即购买”,在界面上确认当前配置是否正确。
- 单击“提交”完成弹性资源池的创建。
- 在弹性资源池的列表页,选择要操作的弹性资源池,单击操作列的“添加队列”。
- 配置队列的基础配置,具体参数信息如下。
表3 弹性资源池添加队列基础配置 参数名称
参数说明
配置样例
名称
弹性资源池添加的队列名称。
dli_queue_01
类型
选择创建的队列类型。
- 执行SQL作业请选择SQL队列。
- 执行Flink或Spark作业请选择通用队列。
SQL作业场景请选择“SQL队列”。
其他场景请选择“通用队列”。
执行引擎
SQL队列可以选择队列引擎为Spark或者HetuEngine。
Spark
企业项目
选择对应的企业项目。
default
- 单击“下一步”,配置队列的扩缩容策略。
单击“新增”,可以添加不同优先级、时间段、“最小CU”和“最大CU”扩缩容策略。
本例配置的扩缩容策略如图1所示。表4 扩缩容策略参数说明 参数名称
参数说明
配置样例
优先级
当前弹性资源池中的优先级数字越大表示优先级越高。本例设置一条扩缩容策略,默认优先级为1。
1
时间段
首条扩缩容策略是默认策略,不能删除和修改时间段配置。
即设置00-24点的扩缩容策略。
00-24
最小CU
设置扩缩容策略支持的最小CU数。
16
最大CU
当前扩缩容策略支持的最大CU数。
64
- 单击“确定”完成添加队列配置。
步骤2:创建RDS MySQL数据库和表
- 登录RDS管理控制台,在“实例管理”界面,选择已创建的RDS for MySQL实例,选择操作列的“更多 > 登录”,进入数据管理服务实例登录界面。
- 输入实例登录的用户名和密码。单击“登录”,即可进入RDS for MySQL数据库并进行管理。
- 在数据库实例界面,单击“新建数据库”,数据库名定义为:test,字符集保持默认即可。
- 在已创建的数据库的操作列,单击“SQL查询”,输入以下创建表语句,创建RDS for MySQL表。
CREATE TABLE cdc-order ( `order_id` VARCHAR(64) NOT NULL, `order_channel` VARCHAR(32) NOT NULL, `order_time` VARCHAR(32), `pay_amount` DOUBLE, `real_pay` DOUBLE, `pay_time` VARCHAR(32), `user_id` VARCHAR(32), `user_name` VARCHAR(32), `area_id` VARCHAR(32) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4;
步骤3:创建Elasticsearch搜索索引
- 登录CSS管理控制台,选择“集群管理 > Elasticsearch”。
- 在集群管理界面,在已创建的CSS集群的“操作”列,单击“Kibana”访问集群。
- 在Kibana的左侧导航中选择“Dev Tools”,进入到Console界面。
- 在Console界面,执行如下命令创建索引“cdc-order”。
PUT /cdc-order
{
"settings": {
"number_of_shards": 1
},
"mappings": {
"properties": {
"order_id": {
"type": "text"
},
"order_channel": {
"type": "text"
},
"order_time": {
"type": "text"
},
"pay_amount": {
"type": "double"
},
"real_pay": {
"type": "double"
},
"pay_time": {
"type": "text"
},
"user_id": {
"type": "text"
},
"user_name": {
"type": "text"
},
"area_id": {
"type": "text"
}
}
}
}
步骤4:创建DLI连接RDS的增强型跨源连接
- 在RDS管理控制台,选择“实例管理”,单击对应的RDS实例名称,进入到RDS的基本信息页面。
- 在“基本信息”的“连接信息”中获取该实例的“内网地址”、“数据库端口”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。
- 单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。
例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:
- 优先级选择:1,
- 策略选择:允许,
- 协议选择:TCP,
- 端口值:不填,
- 类型:IPv4,
- 源地址为:10.0.0.0/16,
单击“确定”完成安全组规则添加。
- CSS和RDS实例属于同一VPC和子网下?
- 登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。
- 在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。
- 连接名称:设置具体的增强型跨源名称。本示例输入为:dli_rds。
- 弹性资源池:选择步骤1:创建弹性资源池并添加队列中已经创建的队列名称。
- 虚拟私有云:选择RDS的虚拟私有云。
- 子网:选择RDS的子网。
其他参数可以根据需要选择配置。
参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。
- 单击“队列管理”,选择操作的队列,本示例为步骤1:创建弹性资源池并添加队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。
- 在“测试连通性”界面,根据2中获取的RDS连接信息,地址栏输入“RDS内网地址:RDS数据库端口”,单击“测试”测试DLI到RDS网络是否可达。
步骤5:创建DLI连接CSS的增强型跨源连接
- 在CSS管理控制台,选择“集群管理”,单击已创建的CSS集群名称,进入到CSS的基本信息页面。
- 在“基本信息”中获取CSS的“内网访问地址”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。
- 单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选择:1,策略选择:允许,协议选择:TCP,端口值不填,类型:IPv4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。
- RDS和CSS实例属于同一VPC和子网下?
- 登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。
- 在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。
- 连接名称:设置具体的增强型跨源名称。本示例输入为:dli_css。
- 弹性资源池:选择步骤1:创建弹性资源池并添加队列中已经创建的弹性资源池。
- 虚拟私有云:选择CSS的虚拟私有云。
- 子网:选择CSS的子网。
其他参数可以根据需要选择配置。
参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。
- 单击“队列管理”,选择操作的队列,本示例为步骤1:创建弹性资源池并添加队列中添加的队列,在操作列,单击“更多 > 测试地址连通性”。
- 在“测试连通性”界面,根据2获取的CSS连接信息,地址栏输入“CSS内网地址:CSS内网端口”,单击“测试”测试DLI到CSS网络是否可达。
步骤6:在本地编辑Java样例代码
在本地编辑Java样例代码, 应用程序开发完成后,参考Flink Jar作业开发基础样例将编译打包的JAR包上传到OBS桶,为后续在DLI运行做准备。
本示例场景编辑代码的样例请参考示例代码。
步骤7:在DLI提交Flink Jar作业
在DLI提交Flink Jar作业的详细操作步骤详情参考创建Flink Jar作业。
|
参数 |
说明 |
示例 |
|---|---|---|
|
所属队列 |
队列用于指定Flink作业执行的资源队列。 队列决定了作业在弹性资源池中运行时能够使用的计算资源。每个队列都分配了指定的资源,即队列的CU,队列的CU配置直接影响作业的性能和执行效率。 在提交作业前,评估作业的资源需求,选择一个能够满足需求的队列。 Flink Jar作业即支持选择通用类型的队列。 |
选择Flink Jar作业运行的队列 |
|
应用程序 |
用户自定义的程序包 |
自定义的程序包 |
|
主类 |
指定加载的Jar包类名,如FlinkRdsToCSSExample。
|
指定 |
|
其他依赖文件 |
用户自定义的依赖文件。其他依赖文件需要自行在代码中引用。 在选择依赖文件之前需要将对应的文件上传至OBS桶中,并在“数据管理>程序包管理”中创建程序包,包类型没有限制。具体操作请参考创建程序包。 通过在应用程序中添加以下内容可访问对应的依赖文件。其中,“fileName”为需要访问的文件名,“ClassName”为需要访问该文件的类名。 ClassName.class.getClassLoader().getResource("userData/fileName") |
- |
|
Flink版本 |
选择作业运行时所使用的Flink的版本。不同版本的Flink可能支持不同的特性、性能优化和API。 在选择Flink版本之前,确保所选的Flink版本与您的作业程序和依赖包互相兼容。 了解更多Flink版本的信息请参考DLI Flink版本说明。 选择使用Flink1.15版本(Flink通用队列场景)时请在作业中配置允许DLI访问的云服务的委托信息。 自定义委托请参考自定义DLI委托权限。 |
1.15 |
步骤8:结果验证
作业运行完成后,查看对应CSS集群对应索引下是否有相关的数据信息。
- 向RDS插入两条数据,本示例库表为flink.cdc_order。
图2 插入数据
- 查看CSS中已同步以上两条数据。
图3 插入数据
示例代码
本例适用Flink 1.15版本的计算引擎。
- pom文件配置
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>dli-flink-demo</artifactId> <groupId>com.huaweicloud</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>MySQLCDCToElasticsearchDemo</artifactId> <properties> <flink.version>1.15.0</flink.version> <flink_cdc.version>2.4.0</flink_cdc.version> <scala.binary.version>2.11</scala.binary.version> <elasticsearch.version>7.10.2</elasticsearch.version> <!-- Ensure this matches your ES version --> </properties> <dependencies> <!-- Flink Core Dependencies --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_2.12</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>3.1.0-h0.cbu.mrs.330.r24</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>3.3.1-h0.cbu.mrs.330.r9</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>1.15.0</version> <scope>provided</scope> </dependency> <!-- MySQL CDC --> <!-- 1.17内置依赖包中不存在,需要用户自行打包(去掉provided) --> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-mysql-cdc</artifactId> <version>${flink_cdc.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- Elasticsearch Connector --> <!-- 1.17内置依赖包中不存在,需要用户自行打包(去掉provided) --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId> <version>1.14.6</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <compilerArgument>-Xlint:unchecked</compilerArgument> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> <configuration> <archive> <manifest> <mainClass>com.huawei.dli.MySQLCDCToElasticsearchDemo</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> </plugins> <resources> <resource> <directory>src/main/resources</directory> <filtering>true</filtering> <includes> <include>**/*.*</include> </includes> </resource> </resources> </build> </project> - 示例代码
package com.huawei.dli; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.DeserializationFeature; import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.JsonNode; import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.MapperFeature; import com.ververica.cdc.connectors.shaded.com.fasterxml.jackson.databind.ObjectMapper; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.hadoop.fs.FileSystem; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.ssl.SSLContextBuilder; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Requests; import org.apache.hadoop.conf.Configuration; import javax.net.ssl.SSLContext; import java.io.InputStream; import org.apache.hadoop.fs.Path; import java.net.URI; import java.security.KeyStore; import java.security.cert.Certificate; import java.security.cert.CertificateFactory; import java.util.*; /* 本例实现了从 MySQL CDC 读取数据并将其写入 Elasticsearch 的实时数据同步功能:从 test.cdc_order 数据库表中读取订单数据,并将其写入 Elasticsearch 的 cdc-test 索引。 */ public class MySQLCDCToElasticsearchDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); streamEnv.setParallelism(1); streamEnv.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE); // checkpoint配置 CheckpointConfig checkpointConfig = streamEnv.getCheckpointConfig(); checkpointConfig.setMinPauseBetweenCheckpoints(1 * 60 * 1000); checkpointConfig.setMaxConcurrentCheckpoints(1); checkpointConfig.setTolerableCheckpointFailureNumber(3); checkpointConfig.setCheckpointTimeout(10 * 60 * 1000); // Debezium配置 Properties debeziumProps = new Properties(); debeziumProps.setProperty("decimal.handling.mode", "string"); debeziumProps.setProperty("time.precision.mode", "connect"); debeziumProps.setProperty("database.ssl.mode", "disabled"); ParameterTool dbProps = ParameterTool.fromArgs(args); // 创建 MySQL CDC Builder MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname(dbProps.get("host", "xxx.xxx.xxx.xxx")) .port(Integer.parseInt(dbProps.get("port", "3306"))) .databaseList(dbProps.get("database_list", "test")) .tableList("test.cdc_order") .username(dbProps.get("username", "USERNAME")) .password(dbProps.get("password", "PASSWORD")) .serverTimeZone("Asia/Shanghai") .debeziumProperties(debeziumProps) .deserializer(new JsonDebeziumDeserializationSchema()) .serverId(dbProps.get("server-id")) .startupOptions(StartupOptions.initial()) .build(); DataStreamSource<String> cdcSource = streamEnv.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source"); // 解析after数据写入Elasticsearch ObjectMapper mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true); // JSON字符串转JsonNode SingleOutputStreamOperator<JsonNode> dataStreamJsonNode = cdcSource .map(line -> mapper.readTree(line)); // 获取test.cdc_test表after数据,然后写入Elasticsearch SingleOutputStreamOperator<Map<String, Object>> mapDataStream = dataStreamJsonNode .filter(line -> "test".equals(line.get("source").get("db").asText()) && "cdc_order".equals(line.get("source").get("table").asText())) .map(line -> { Map<String, Object> data = new HashMap<>(); JsonNode afterNode = line.get("after"); if (afterNode != null) { // 映射订单表字段 data.put("order_id", afterNode.get("order_id").asText()); data.put("order_channel", afterNode.get("order_channel").asText()); data.put("order_time", afterNode.get("order_time").asText()); data.put("pay_amount", afterNode.get("pay_amount").asDouble()); data.put("real_pay", afterNode.get("real_pay").asDouble()); data.put("pay_time", afterNode.get("pay_time").asText()); data.put("user_id", afterNode.get("user_id").asText()); data.put("area_id", afterNode.get("area_id").asText()); } return data; }) .returns(Types.MAP(Types.STRING, Types.GENERIC(Object.class))); // 明确指定返回类型 // 读取Elasticsearch配置 ParameterTool elasticsearchPros = ParameterTool.fromArgs(args); // 配置 Elasticsearch Sink String[] elasticsearchNodes = elasticsearchPros.get("elasticsearch.nodes", "xxx.xxx.xxx.xxx:xxxx").split(","); String elasticsearchIndex = elasticsearchPros.get("elasticsearch.index", "indexName"); // 创建 HttpHost 列表 List<HttpHost> httpHosts = new ArrayList<>(); for (String node : elasticsearchNodes) { String[] hostPort = node.split(":"); // 若未开启https,则最后一个参数改为http httpHosts.add(new HttpHost(hostPort[0], Integer.parseInt(hostPort[1]), "https")); } // 创建 Elasticsearch Sink Builder ElasticsearchSink.Builder<Map<String, Object>> esSinkBuilder = new ElasticsearchSink.Builder<>( httpHosts, new ElasticsearchSinkFunction<Map<String, Object>>() { public IndexRequest createIndexRequest(Map<String, Object> element) { return Requests.indexRequest() .index(elasticsearchIndex) .source(element); } @Override public void process(Map<String, Object> element, RuntimeContext ctx, RequestIndexer indexer) { indexer.add(createIndexRequest(element)); } } ); // 若CSS开启安全模式或者https链接,需要增加以下配置 esSinkBuilder.setRestClientFactory(restClientBuilder -> { // 若开启安全模式需要设置 Basic Auth final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials( AuthScope.ANY, new UsernamePasswordCredentials("username", "password") ); // 若开启https连接,还需配置 SSL 证书 try { FileSystem fs = FileSystem.get(new URI("obs://BucketName/"), new Configuration()); // 从 OBS 读取证书流(证书路径需自行配置) try (InputStream certStream = fs.open(new Path("obs://BucketName/tmp/CloudSearchService.cer"))) { // 将 PEM 证书转换为 KeyStore CertificateFactory cf = CertificateFactory.getInstance("X.509"); Certificate cert = cf.generateCertificate(certStream); KeyStore trustStore = KeyStore.getInstance("PKCS12"); trustStore.load(null, null); trustStore.setCertificateEntry("obs-cert", cert); // 加载到 SSLContext SSLContext sslContext = SSLContextBuilder.create() .loadTrustMaterial(trustStore, null) .build(); // 应用到 HTTPS 客户端 restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder .setDefaultCredentialsProvider(credentialsProvider) .setSSLContext(sslContext)); } } catch (Exception e) { throw new RuntimeException("Failed to load SSL certificate", e); } }); // 配置批量参数(可选) esSinkBuilder.setBulkFlushMaxActions(50); // 每50条刷新一次 esSinkBuilder.setBulkFlushInterval(1000L); // 每秒刷新一次 mapDataStream.addSink(esSinkBuilder.build()); streamEnv.execute("test cdc to elasticsearch"); } }
