通过FlinkSQL将订单表Kafka数据实时同步到Doris
应用场景
创建MRS FlinkServer作业,将订单表Kafka数据通过FlinkSQL实时同步到Doris。
方案架构
对实时性要求较高的场景,可直接将实时流式数据通过FlinkSQL传入Doris中,在Doris中进行实时查询。
如果数据在加载到Doris之前需要处理,可直接在FlinkSQL中进行数据加工,将加工后的数据实时写入Doris中。

- 场景特点:
- 业务特点为实时流式数据,需要针对实时流式数据进行快速的统计查询。
- 前端业务库系统实时同步到后端分析系统,在后端分析系统进行实时OLAP分析。
- 数据加载方式:
- 前端数据统一接入Kafka中,Kafka作为实时数据加工的唯一数据源。
- 针对仅追加的数据,按正常的数据插入逻辑,进行数据插入。
- 针对实时数据同步方式,数据格式指定为Debezium方式,利用Flink的数据同步能力将数据同步到Flink。
- 注意事项:
- 实时流式数据写入Doris时,需要使用微批模式,一批数据写入一次,提高写入效率。
- 使用DataArts Studio DLF进行流任务调度开发时,需要使用2.10.1及以上的版本。
操作流程
步骤1:创建MRS Flink集群并配置
- 创建MRS集群。
详细操作请参考自定义购买MRS集群,例如相关参数如下:
- 集群名称:自定义,例如“mrs_test”
- 版本类型:LTS版
- 集群版本:MRS 3.3.0-LTS
- 集群组件包含:Kafka、Flink等
- Kerberos认证:开启
其他参数根据实际需要进行配置。
- 为集群添加拥有OBS权限的委托。
- 创建具有访问OBS权限的ECS委托,如果已存在可用的OBS权限委托,则跳过该步骤。
详细操作请参考配置MRS集群通过IAM委托对接OBS中“创建具有访问OBS权限的ECS委托”部分,例如委托名称为“mrs_ecs_obs”。
- 在MRS管理控制台“现有集群”中单击已创建完成的MRS集群名称。
在集群“概览”页签单击“委托”后的“选择委托”,选择已创建的委托名称(例如“mrs_ecs_obs”),单击“确定”。
- 创建具有访问OBS权限的ECS委托,如果已存在可用的OBS权限委托,则跳过该步骤。
- 创建具有FlinkServer管理权限的用户。
- 登录集群Manager页面,详细操作请参考访问MRS集群Manager。
- 选择“系统 > 权限 > 角色 > 添加角色”。
填写角色名称,如“flinkrole”,在“配置资源权限”的表格中选择“待操作集群的名称 > Flink”,勾选“FlinkServer管理操作权限”,单击“确定”。
- 选择“用户 > 添加用户”,填写用户名称,如“flinkuser”,“用户类型”选择“人机”,“密码策略”默认,输入用户密码并确认密码,关联“flinkrole”、“System_administrator”角色,单击“确定”。
- 使用新建的flinkuser用户重新登录FusionInsight Manager,修改该用户初始密码。
- 集群购买完成后安装集群全量客户端,详细操作请参考安装客户端(3.x版本)。
例如客户端安装路径为“/opt/client”。
步骤2:创建MRS Doris集群并配置
- 创建MRS Doris集群。
详细操作请参考自定义购买MRS集群,例如相关参数如下:
- 集群名称:自定义,例如“mrs_doris”
- 集群版本:MRS 3.3.0-LTS
- 组件选择:Doris集群。
- Kerberos认证:开启
其他参数根据实际需要进行配置。
- 为集群添加拥有OBS权限的委托。
- 创建具有访问OBS权限的ECS委托,如果已存在可用的OBS权限委托,则跳过该步骤。
详细操作请参考配置MRS集群通过IAM委托对接OBS中“创建具有访问OBS权限的ECS委托”部分,例如委托名称为“mrs_ecs_obs”。
- 在MRS管理控制台“现有集群”中单击已创建完成的MRS集群名称。
在集群“概览”页签单击“委托”后的“选择委托”,选择已创建的委托名称(例如“mrs_ecs_obs”),单击“确定”。
- 创建具有访问OBS权限的ECS委托,如果已存在可用的OBS权限委托,则跳过该步骤。
- 创建拥有Doris管理权限的人机用户(例如用户名为dorisuser),并修改初始密码。
- 登录Doris集群Manager页面。
- 选择“系统 > 权限 > 角色 > 添加角色”,填写角色名称,如“dorisrole”,在“配置资源权限”选择“待操作的集群 > Doris”,勾选“Doris管理员权限”,单击“确定”。
- 选择“用户 > 添加用户”,填写用户名称,如“dorisuser”,“用户类型”选择“人机”,“密码策略”默认,输入用户密码并确认密码,关联“dorisrole”角色,单击“确定”。
- 使用新建的dorisuser用户重新登录FusionInsight Manager,修改该用户初始密码。
- 集群购买完成后安装集群客户端。
详细操作请参考安装客户端(3.x版本)。
例如客户端安装路径为“/opt/dorisclient”。
- 在Doris集群安装MySQL客户端。
详细操作请参考使用MySQL客户端连接Doris。
步骤3:准备数据
- 创建Doris的Unique表。
- 登录Doris集群中已安装好MySQL客户端的节点,执行如下命令进入Doris数据库。
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1 //若集群已启用Kerberos认证(安全模式)需执行该命令。
mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址
- 数据库连接端口为Doris FE的查询连接端口,可通过登录MRS Doris集群的Manager界面,选择“集群 > 服务 > Doris > 实例”,查看任一FE实例的业务IP地址获取。
- Doris FE实例IP地址可通过登录MRS Doris集群的Manager界面,选择“集群 > 服务 > Doris > 配置 > 全部配置”,搜索并查看“query_port”参数值获取。
- 执行以下命令创建Doris的Unique表。
create database test;
use test;
CREATE TABLE `lineorder_u` ( `LO_ORDERKEY` bigint NOT NULL COMMENT "订单id", `LO_LINENUMBER` int NOT NULL COMMENT "订单码", `LO_CUSTKEY` bigint NOT NULL COMMENT "客户id", `LO_PARTKEY` bigint NOT NULL COMMENT "商品id", `LO_SUPPKEY` bigint NOT NULL COMMENT "供应商id", `LO_ORDERDATE` date DEFAULT NULL COMMENT "订单日期", `LO_ORDERPRIORITY` varchar(100) DEFAULT NULL COMMENT "订单优先级", `LO_SHIPPRIORITY` int DEFAULT NULL COMMENT "物流优先级", `LO_QUANTITY` int DEFAULT NULL COMMENT "数量", `LO_EXTENDEDPRICE` bigint DEFAULT NULL COMMENT "扩展价格", `LO_ORDTOTALPRICE` bigint DEFAULT NULL COMMENT "订单总价", `LO_DISCOUNT` int DEFAULT NULL COMMENT "折扣", `LO_REVENUE` bigint DEFAULT NULL COMMENT "收入", `LO_SUPPLYCOST` int DEFAULT NULL COMMENT "成本", `LO_TAX` int DEFAULT NULL COMMENT "税款", `LO_COMMITDATE` date DEFAULT NULL COMMENT "承诺日期", `LO_SHIPMODE` varchar(100) NOT NULL COMMENT "运输方式" ) UNIQUE KEY ( `LO_ORDERKEY`, `LO_LINENUMBER`, `LO_CUSTKEY`, `LO_PARTKEY`, `LO_SUPPKEY`, `LO_ORDERDATE` ) PARTITION BY RANGE(`LO_ORDERDATE`) ( PARTITION `p1992` VALUES LESS THAN ("1993-01-01"), PARTITION `p1993` VALUES LESS THAN ("1994-01-01"), PARTITION `p1994` VALUES LESS THAN ("1995-01-01"), PARTITION `p1995` VALUES LESS THAN ("1996-01-01"), PARTITION `p1996` VALUES LESS THAN ("1997-01-01"), PARTITION `p1997` VALUES LESS THAN ("1998-01-01"), PARTITION `p1998` VALUES LESS THAN ("1999-01-01"), PARTITION `p1999` VALUES LESS THAN ("2000-01-01"), PARTITION `p2000` VALUES LESS THAN ("2001-01-01"), PARTITION `p2001` VALUES LESS THAN ("2002-01-01") ) DISTRIBUTED BY HASH(`LO_ORDERDATE`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 3" );
- 登录Doris集群中已安装好MySQL客户端的节点,执行如下命令进入Doris数据库。
- 创建Kafka多分区的Topic。
- 以客户端安装用户,登录MRS Flink集群中安装了客户端的节点。
- 执行以下命令登录客户端。
source /opt/client/bigdata_env
kinit 业务用户
- 执行以下命令创建Kafka多分区的Topic
cd /opt/client/Kafka/kafka/bin
sh kafka-topics.sh --create --topic 主题名称 --partitions 主题占用的分区数 --replication-factor 主题的备份个数 --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka
- ZooKeeper角色实例所在节点IP地址:登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper > 实例”,查看并记录ZooKeeper角色实例业务IP地址获取。
- clientPort:可在ZooKeeper的全部配置参数中搜索“clientPort”查看。
例如创建3分区的Topic,则执行以下命令:
sh kafka-topics.sh --create --topic topic_lineorder --partitions 3 --replication-factor 3 --zookeeper 10.10.10.x:2181/kafka
步骤4:创建Flink SQL作业
- 使用步骤 3创建的具有FlinkServer管理员权限的用户(例如flinkuser),登录MRS Flink集群Manager页面。
- 选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 选择“作业管理 > 新建作业”,在新建作业页面配置相关信息后,单击“确定”。
- 类型:Flink SQL
- 名称:自定义
- 作业类型:流作业
- 描述:自定义
- 配置Flink SQL作业信息。
图2 配置Flink SQL作业
- 在作业开发界面配置作业信息,例如,配置如下:
CREATE TABLE IF NOT EXISTS flink_kafka_source ( `LO_ORDERKEY` BIGINT, `LO_LINENUMBER` INT, `LO_CUSTKEY` BIGINT, `LO_PARTKEY` BIGINT, `LO_SUPPKEY` BIGINT, `LO_ORDERDATE` DATE, `LO_ORDERPRIORITY` STRING, `LO_SHIPPRIORITY` INT, `LO_QUANTITY` INT, `LO_EXTENDEDPRICE` BIGINT, `LO_ORDTOTALPRICE` BIGINT, `LO_DISCOUNT` INT, `LO_REVENUE` BIGINT, `LO_SUPPLYCOST` INT, `LO_TAX` INT, `LO_COMMITDATE` DATE, `LO_SHIPMODE` STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'topic名称', 'properties.group.id' = 'group1', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP1:Kafka端口号,Kafka的Broker实例业务IP2:Kafka端口号', 'format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); CREATE TABLE IF NOT EXISTS flink_doris_sink ( `LO_ORDERKEY` BIGINT, `LO_LINENUMBER` INT, `LO_CUSTKEY` BIGINT, `LO_PARTKEY` BIGINT, `LO_SUPPKEY` BIGINT, `LO_ORDERDATE` DATE, `LO_ORDERPRIORITY` STRING, `LO_SHIPPRIORITY` INT, `LO_QUANTITY` INT, `LO_EXTENDEDPRICE` BIGINT, `LO_ORDTOTALPRICE` BIGINT, `LO_DISCOUNT` INT, `LO_REVENUE` BIGINT, `LO_SUPPLYCOST` INT, `LO_TAX` INT, `LO_COMMITDATE` DATE, `LO_SHIPMODE` STRING ) WITH ( 'connector' = 'doris', 'fenodes' = 'Doris的FE实例IP地址1:Doris端口号,Doris的FE实例IP地址2:Doris端口号,Doris的FE实例IP地址3:Doris端口号’, ' table.identifier ' = ' database_name.table_name ', ' username ' = ' user ', ' password ' = ' password ', ' sink.label - prefix ' = ' doris_label_xxx ', ' doris.enable.https ' = ' true ', ' doris.ignore.https.ca ' = ' true ' ); insert into flink_doris_sink select * from flink_kafka_source;
Kafka相关配置参数说明:
- 'topic' = 'topic名称':已创建kafka多分区的Topic。
- 'properties.group.id' = 'group1':Kafka的group
- Kafka的Broker实例业务IP:登录Manager,选择“集群 > 服务 > Kafka> 实例”,查看Broker实例业务IP。
- Kafka端口号:登录Manager,选择“集群 > 服务 > Kafka > 配置”,搜索Kafka服务的“sasl.port”参数获取。
- hadoop.系统域名:登录Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
Doris相关配置参数说明:
- Doris的FE实例IP地址:登录Manager,选择“集群 > 服务 > Doris > 实例”,查询FE实例的业务IP地址获取。
- Doris端口号:登录Manager,选择“集群 > 服务 > Doris > 配置”,查询Doris服务的“https_port”参数获取。
- database_name.table_name:数据库名,表名。
- 'username' = 'user':为在两集群上分别创建的具有FlinkServer、Doris管理员权限的同名用户。
- 'password' = 'password':创建的具有FlinkServer、Doris管理员权限的用户密码。
- doris_label_xxx:sink.label-prefix的值,必须是唯一的,每次启任务必须保持其值的唯一性。
- 在界面左侧配置作业“基础参数”。
- 并行度:3
- JobManager内存(MB):4096
- 提交队列:自定义,默认default
- 勾选“taskManager”
- slot数量:1
- 内存(MB):4096
- 勾选“开启CheckPoint”
- 时间间隔(ms):20000(默认最小时间间隔20s)
- 模式:EXACTLY_ONCE,其他默认参数即可
- 故障恢复策略:fixed-delay
- 重试次数:3
- 失败重试间隔(s):60
- 其他配置保持默认即可
- 配置作业完成后,可以单击上方“语义校验”对输入内容校验,单击“SQL格式化”对SQL语句进行格式化。确认无问题后,单击“保存”,单击“提交”提交作业。
- 在“作业管理”页面单击Flink SQL作业“操作”列的“启动”,运行作业。
- 在作业开发界面配置作业信息,例如,配置如下:
步骤5:Kafka数据通过FlinkSQL实时同步到Doris
- 以客户端安装用户,登录MRS Flink集群中安装了客户端的节点。
- 执行以下命令登录客户端。
source /opt/client/bigdata_env
kinit 业务用户
- 执行以下命令向Kafka写入数据。
cd /opt/client/Kafka/kafka/bin
sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic topic名称 --producer.config /opt/newclient/Kafka/kafka/config/producer.properties
- Kafka角色实例所在节点的IP地址:登录Manager后,选择“集群 > 服务 > Kafka > 实例”,记录Broker角色实例其中任意一个的IP地址。
- Kafka端口号:登录Manager后,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,集群已启用Kerberos认证(安全模式)搜索并获取“sasl.port”参数的值,集群未启用Kerberos认证(普通模式)搜索并获取“port”的值。
例如,写入数据内容如下:
{"LO_ORDERKEY":"1" ,"LO_LINENUMBER":"1" ,"LO_CUSTKEY":"1" ,"LO_PARTKEY":"1" ,"LO_SUPPKEY":"1" ,"LO_ORDERDATE": "1996-01-02","LO_ORDERPRIORITY":"5-LOW" ,"LO_SHIPPRIORITY":"0" ,"LO_QUANTITY":"17" ,"LO_EXTENDEDPRICE":"6511" ,"LO_ORDTOTALPRICE":"9991" ,"LO_DISCOUNT":"4" ,"LO_REVENUE":"99999" ,"LO_SUPPLYCOST":"95999" ,"LO_TAX":"1001" ,"LO_COMMITDATE":"1996-01-09" ,"LO_SHIPMODE": "AIR"} {"LO_ORDERKEY":"2" ,"LO_LINENUMBER":"1" ,"LO_CUSTKEY":"1" ,"LO_PARTKEY":"1" ,"LO_SUPPKEY":"1" ,"LO_ORDERDATE": "1996-01-02","LO_ORDERPRIORITY":"5-LOW" ,"LO_SHIPPRIORITY":"0" ,"LO_QUANTITY":"17" ,"LO_EXTENDEDPRICE":"6511" ,"LO_ORDTOTALPRICE":"9991" ,"LO_DISCOUNT":"4" ,"LO_REVENUE":"99999" ,"LO_SUPPLYCOST":"95999" ,"LO_TAX":"1001" ,"LO_COMMITDATE":"1996-01-09" ,"LO_SHIPMODE": "AIR"} {"LO_ORDERKEY":"3" ,"LO_LINENUMBER":"1" ,"LO_CUSTKEY":"1" ,"LO_PARTKEY":"1" ,"LO_SUPPKEY":"1" ,"LO_ORDERDATE": "1996-01-02","LO_ORDERPRIORITY":"5-LOW" ,"LO_SHIPPRIORITY":"0" ,"LO_QUANTITY":"17" ,"LO_EXTENDEDPRICE":"6511" ,"LO_ORDTOTALPRICE":"9991" ,"LO_DISCOUNT":"4" ,"LO_REVENUE":"99999" ,"LO_SUPPLYCOST":"95999" ,"LO_TAX":"1001" ,"LO_COMMITDATE":"1996-01-09" ,"LO_SHIPMODE": "AIR"}
- 登录Doris集群中已安装好MySQL客户端的节点,执行如下命令进入Doris数据库。
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1 //若集群已启用Kerberos认证(安全模式)需执行该命令。
mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址
- 数据库连接端口为Doris FE的查询连接端口,可通过登录MRS Doris集群的Manager界面,选择“集群 > 服务 > Doris > 实例”,查看任一FE实例的业务IP地址获取。
- Doris FE实例IP地址可通过登录MRS Doris集群的Manager界面,选择“集群 > 服务 > Doris > 配置 > 全部配置”,搜索并查看“query_port”参数值获取。
- 执行以下命令查询新增的数据。
use 数据库名称;
select * from lineorder_u;
例如查看到如下信息,表示查询新增的数据成功。