通过Doris Catalog读取RDS-MySQL数据并写入Doris
应用场景
通过Doris创建Catalog成功读取RDS-MySQL数据并写入Doris,并按照Unique指定字段自动去重。
方案架构
离线数据可以从数据湖加载,也可以直接加载本地文件。从数据湖加载可以使用工具CDM,在没有CDM工具时,可以直接使用外表加载、BrokerLoad的方式。
本地文件加载则使用Doris自带的StreamLoad工具。

- 场景特点:
- 数据加载方式:
- 在存在DataArts Studio工具的场景下,数据加载统一使用CDM进行(DataArts Studio需要2.10及以上版本)。
- 在不存在DataArts Studio工具的场景下,可以使用Spark或者外表方式加载。由数据加工方进行数据推送加载时,推荐使用BorkerLoad加载方式,方便数据加工方统一调度;如果有数据应用方负责数据加载,则使用外表方式进行数据加载较为方便应用层另外配置加载数据。
- 在没有数据湖的场景下,轻量化数仓场景中,数据直接由源端文件加载到Doris中,可以使用Streamload方式加载本地文件。
- 注意事项:
步骤1:创建RDS for MySQL实例、创建数据表
- 创建RDS for MySQL实例。
详细操作请参考购买RDS for MySQL实例。
- 已创建相应数据表,如lineorder_noid,且存在一定量数据。
建表语句参考:
CREATE TABLE `lineorder_noid`( `lo_orderkey` bigint, `lo_linenumber` int, `lo_custkey` bigint, `lo_partkey` bigint, `lo_suppkey` bigint, `lo_orderdate` date, `lo_orderpriority` varchar(100), `lo_shippriority` int, `lo_quantity` int, `lo_extendedprice` bigint, `lo_ordtotalprice` bigint, `lo_discount` int, `lo_revenue` bigint, `lo_supplycost` int, `lo_tax` int, `lo_commitdate` date, `lo_shipmode` varchar(100) ) ;
数据准备如下:
步骤2:创建MRS Doris集群并配置
- 创建MRS Doris集群。
详细操作请参考自定义购买MRS集群,例如相关参数如下:
- 集群名称:自定义,例如“mrs_doris”
- 集群版本:MRS 3.3.0-LTS
- 组件选择:Doris集群。
- Kerberos认证:开启
其他参数根据实际需要进行配置。
- 集群购买完成后安装集群客户端。
详细操作请参考安装客户端(3.x版本)。
例如客户端安装路径为“/opt/dorisclient”。
- 在Doris集群安装MySQL客户端。
详细操作请参考使用MySQL客户端连接Doris。
- 创建拥有Doris管理权限的人机用户(例如用户名为dorisuser),并修改初始密码。
- 登录Doris集群Manager页面。
- 选择“系统 > 权限 > 角色 > 添加角色”,填写角色名称,如“dorisrole”,在“配置资源权限”选择“待操作的集群 > Doris”,勾选“Doris管理员权限”,单击“确定”。
- 选择“用户 > 添加用户”,填写用户名称,如“dorisuser”,“用户类型”选择“人机”,“密码策略”默认,输入用户密码并确认密码,关联“dorisrole”角色,单击“确定”。
- 使用新建的dorisuser用户重新登录FusionInsight Manager,修改该用户初始密码。
步骤3:创建Doris Catalog
- 登录Doris集群中已安装好MySQL客户端的节点,执行如下命令进入Doris数据库。
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1 //若集群已启用Kerberos认证(安全模式)需执行该命令。
mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址
- 数据库连接端口为Doris FE的查询连接端口,可通过登录MRS Doris集群的Manager界面,选择“集群 > 服务 > Doris > 实例”,查看任一FE实例的业务IP地址获取。
- Doris FE实例IP地址可通过登录MRS Doris集群的Manager界面,选择“集群 > 服务 > Doris > 配置 > 全部配置”,搜索并查看“query_port”参数值获取。
- 创建Catalog。
例如执行以下命令,相关参数说明见表1,参数取值请以时间为准。
CREATE CATALOG jdbc_mysql PROPERTIES ( 'type' = 'jdbc', 'user' = '用户名', 'password' = '用户密码', 'jdbc_url' = 'jdbc:mysql://MySQL实例IP:3306/数据库名', 'driver_url' = 'file://预置的MySQL驱动包绝对路径', 'driver_class' = 'com.mysql.cj.jdbe.Driver' );
表1 参数说明 参数
取值样例
参数描述
user
user
MySQL数据库用户名。
password
password
MySQL数据库用户密码。
jdbc_url
jdbc:mysql://MySQL实例IP:3306/数据库名
jdbc:mysql://MySQL实例IP:3306/数据库名
driver_url
file://预置的MySQL驱动包绝对路径
driver_url可以通过以下三种方式指定:
- 文件名。如“mysql-connector-java-*.jar”。需将Jar包预先存放在FE和BE部署目录的“jdbc_drivers/”目录下。系统会自动在这个该目录下寻找。该目录的位置,也可以由“fe.conf”和“be.conf”中的“jdbc_drivers_dir”配置修改。
- 本地绝对路径。如“file:///path/to/mysql-connector-java-*.jar”。需将Jar包预先存放在所有FE/BE节点指定的路径下并修改所属用户/组(chown omm:wheel mysql-connector-java-*.jar)。
- HTTP地址。如“https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/mysql-connector-java-8.0.25.jar”。系统会从这个HTTP地址下载Driver文件。仅支持无认证的HTTP服务。
driver_class
com.mysql.cj.jdbe.Driver
-
- 执行以下命令查询MySQL表。
- 执行以下命令查询Catalogs:
show catalogs;
- 执行以下命令查询catalog下面的库:
show databases from jdbc_mysql;
- 执行以下命令切换到Catalog下,再进入到数据库中:
switch jdbc_mysql;
use default;
- 查询Catalog中某个库的所有表:
show tables from `jdbc_mysql`.`default`;
- 查询指定表:
select * from `jdbc_mysql `.`default`.`test_table`;
- 执行以下命令查看表的Schema:
DESC test_table;
- 新建或操作MySQL表后,需要在Doris中执行刷新:
refresh catalog jdbc_mysql;
例如执行后结果如下,表示查询成功:
- 执行以下命令查询Catalogs:
- 执行以下命令创建Doris表(使用引擎为 UNIQUE )。
CREATE TABLE wxk.`lineorder_mysql_u` ( `LO_ORDERKEY` bigint NOT NULL COMMENT "订单id", `LO_LINENUMBER` int NOT NULL COMMENT "订单码", `LO_CUSTKEY` bigint NOT NULL COMMENT "客户id", `LO_PARTKEY` bigint NOT NULL COMMENT "商品id", `LO_SUPPKEY` bigint NOT NULL COMMENT "供应商id", `LO_ORDERDATE` date DEFAULT NULL COMMENT "订单日期", `LO_ORDERPRIORITY` varchar(100) DEFAULT NULL COMMENT "订单优先级", `LO_SHIPPRIORITY` int DEFAULT NULL COMMENT "物流优先级", `LO_QUANTITY` int DEFAULT NULL COMMENT "数量", `LO_EXTENDEDPRICE` bigint DEFAULT NULL COMMENT "扩展价格", `LO_ORDTOTALPRICE` bigint DEFAULT NULL COMMENT "订单总价", `LO_DISCOUNT` int DEFAULT NULL COMMENT "折扣", `LO_REVENUE` bigint DEFAULT NULL COMMENT "收入", `LO_SUPPLYCOST` int DEFAULT NULL COMMENT "成本", `LO_TAX` int DEFAULT NULL COMMENT "税款", `LO_COMMITDATE` date DEFAULT NULL COMMENT "承诺日期", `LO_SHIPMODE` varchar(100) NOT NULL COMMENT "运输方式" ) UNIQUE KEY ( `LO_ORDERKEY`, `LO_LINENUMBER`, `LO_CUSTKEY`, `LO_PARTKEY`, `LO_SUPPKEY`, `LO_ORDERDATE` ) PARTITION BY RANGE(`LO_ORDERDATE`) ( PARTITION `p1992` VALUES LESS THAN ("1993-01-01"), PARTITION `p1993` VALUES LESS THAN ("1994-01-01"), PARTITION `p1994` VALUES LESS THAN ("1995-01-01"), PARTITION `p1995` VALUES LESS THAN ("1996-01-01"), PARTITION `p1996` VALUES LESS THAN ("1997-01-01"), PARTITION `p1997` VALUES LESS THAN ("1998-01-01"), PARTITION `p1998` VALUES LESS THAN ("1999-01-01"), PARTITION `p1999` VALUES LESS THAN ("2000-01-01"), PARTITION `p2000` VALUES LESS THAN ("2001-01-01"), PARTITION `p2001` VALUES LESS THAN ("2002-01-01") ) DISTRIBUTED BY HASH(`LO_ORDERDATE`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 3" );
查询表名后结果如下,表示创建Doris表成功:
show tables;
- 执行以下命令读取MySQL数据写入Doris。
insert into internal.wxk.lineorder_mysql_u select * from jdbc_mysql.cdl_dy.lineorder_noid;
例如执行后结果如下,MySQL数据写入Doris成功,且按照UNIQUE KEY指定字段自动去除重复数据。