使用DLI Flink作业实时同步MRS Kafka数据至CloudTable Doris集群
此章节为您介绍数据实时同步的最佳实践,通过数据湖探索服务DLI Flink作业将MRS kafka数据实时同步给Doris,实现Kafka实时入库到Doris的过程。

使用限制
准备工作
- 已注册华为账号并开通华为云,具体请参见注册华为账号并开通华为云,且在使用CloudTable前检查账号状态,账号不能处于欠费或冻结状态。
- 已创建虚拟私有云和子网,参见创建虚拟私有云和子网。
步骤一:创建CloudTable Doris集群
- 登录表格存储服务控制台,创建Doris集群。
- 准备弹性云服务。
- 安装客户端并连接集群。
- 创建数据库。
create database flink;
- 使用数据库。
use flink;
- 创建表。
create table flink.order_doris( order_id varchar, order_channel varchar, order_time varchar, pay_amount decimal, real_pay decimal, pay_time varchar, user_id varchar, user_name varchar, area_id varchar ) ENGINE =OLAP COMMENT 'OLAP' DISTRIBUTED BY HASH(`order_id`) BUCKETS AUTO PROPERTIES ( 'replication_num'='3' );
- 查看FE节点状态。
show frontends;
步骤二:MRS集群中创建Flink作业制造数据
- 创建MRS集群。
- 登录Manager,选择“集群 > Flink > 概览”,进入概览页面。
- 单击“Flink WebUI”右侧的链接,访问Flink WebUI。
- 在MRS Flink WebUI中创建Flink任务产生数据。
- 单击作业管理中的“新建作业”,弹出新建作业页面。
- 填写参数,单击“确定”,建立Flink SQL作业。如果修改SQL,单击操作列的“开发”,进入SQL页面修改命令。
获取IP地址和端口:进入集群的Manager页面,单击“集群 > Kafka > 配置 > 全部配置”进入全部配置页面,在页面右上角搜索框搜索“default.bootstrap.severs”参数可获取IP和端口。
SQL语句示例:
CREATE TABLE IF NOT EXISTS `lineorder_doris` ( `order_id` string, `order_channel` string, `order_time` string, `intercept_flag` int, `pay_amount` double, `real_pay` double, `pay_time` string, `user_id` string, `user_name` string, `area_id` string ) WITH ( 'connector' = 'kafka', 'topic' = 'test_flink', 'properties.bootstrap.servers' = 'ip:port', 'value.format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka' ); CREATE TABLE lineorder_datagen ( `order_id` string, `order_channel` string, `order_time` string, `intercept_flag` int, `pay_amount` double, `real_pay` double, `pay_time` string, `user_id` string, `user_name` string, `area_id` string ) WITH ( 'connector' = 'datagen', 'fields.intercept_flag.kind' = 'sequence', 'fields.intercept_flag.start' = '1', 'fields.intercept_flag.end' = '2000000', 'rows-per-second' = '1000' ); INSERT INTO lineorder_doris SELECT * from lineorder_datagen;
- 回到作业管理界面,单击操作列的“启动”。作业状态为“运行中”表示作业运行成功。
步骤三:创建DLI Flink作业进行数据同步
- 创建弹性资源池和队列,请参见“创建弹性资源池并添加队列”章节。
- 创建跨源连接,请参见创建增强型跨源连接。
- 分别测试DLI与上游MRS Kafka和下游CloudTable Doris的连通性。
- 弹性资源和队列创建后,单击“资源管理 > 队列管理”,进入队列管理界面测试地址连通性,请参见测试地址连通性。
- 获取上游IP地址和端口:进入集群的Manager页面,单击“集群 > Kafka > 配置 > 全部配置”进入全部配置页面,在页面右上角搜索框搜索“default.bootstrap.severs”参数可获取IP和端口(该port是Broker服务监听的PLAINTEXT协议端口号)。
- 获取下游IP地址和端口:进入目标集群详情页可查看节点IP和端口。
- 创建Flink作业,请参见使用DLI提交作业Flink作业。
- 选择4中创建的Flink作业,单击操作列的“编辑”,添加SQL进行数据同步。
create table orders ( order_id string, order_channel string, order_time string, pay_amount double, intercept_flag int, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'test_flink', 'properties.bootstrap.servers' = 'ip:port', 'properties.group.id' = 'testGroup', 'format' = 'json', 'json.fail-on-missing-field' = 'false', --这个参数表示在json数据中如果某个字段不存在时不会报错,而是会返回null值。 'json.ignore-parse-errors' = 'true', --这个参数表示在解析json数据时如果出现错误(比如格式不正确)时会忽略这条数据,而不会报错。 'scan.topic-partition-discovery.interval' = '300000', --这个参数表示flink会每隔300秒(5分钟)自动检查一次Kafka中的topic和partition信息,以便及时发现新增或删除的topic和partition 'scan.startup.mode' = 'latest-offset' ---这个参数表示flink会从Kafka中最早的offset开始读取数据,即从最开始的数据开始处理。 ); create table dorisSink( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) with ( 'connector' = 'doris', 'fenodes' = 'ip:port', 'password' = '***', 'username' = '***', 'table.identifier' = 'flink.order_doris', --Doris 数据库中的哪个表 --'sink.properties.strip_outer_array' = 'true', --这个配置项表示将输出的 JSON 数据中的外层数组去掉,只输出 JSON 对象 --'sink.properties.format' = 'json', --'doris.enable.https' = 'true', --'doris.ignore.https.ca' = 'true', --'sink.batch.interval' = '1s', ---这个配置项表示每隔150秒批量输出一次数据flink1.12有这个参数 'sink.label-prefix' = 'doris_label_118_061', 'doris.batch.size' = '10', --flink1.12 为sink.batch.size 'sink.max-retries'='10' --这个配置项表示在数据输出失败时最多重试10次 ); insert into dorisSink select order_id, order_channel, order_time, pay_amount, real_pay, pay_time, user_id, user_name, area_id from orders;
- 单击“格式化”,再单击“保存”。
请务必先单击“格式化”将SQL代码进行格式化处理,否则可能会因为代码复制和粘贴操作过程中引入新的空字符,而导致作业执行失败。
- 回到DLI控制台首页,单击左侧“作业管理 > Flink作业”。
- 启动4中创建的作业,单击操作列的“启动 > 立即启动”。作业状态为“运行中”表示作业运行成功。