配置MRS Flink读写Doris数据
Flink Doris Connector支持通过Flink对Doris中存储的数据进行读取、插入、修改和删除操作。
建议Doris 3.0.6选取MRS 3.6.0版本进行匹配,创建包含Flink服务的MRS集群后Flink-Doris-Connector将自动集成,无需手动更新。Doris和MRS以及Flink版本配套关系如下表所示:
|
MRS版本 |
Flink版本 |
Flink-Doris-Connector版本 |
CloudTable Doris版本 |
|---|---|---|---|
|
MRS 3.6.0 |
1.20.0 |
flink-doris-connector-1.20-1.5.2-h0.cbu.mrs.360.xxx.jar |
3.0.6 |
|
MRS 3.5.1 |
1.20.0 |
flink-doris-connector-1.20-1.5.2-h0.cbu.mrs.351.xxx.jar |
3.0.6 |
|
MRS 3.5.0 |
1.17.1 |
flink-doris-connector-1.17-1.5.2-h0.cbu.mrs.350.xxx.jar |
3.0.6 |
使用限制
- 只能对Unique Key模型的表进行修改和删除操作。
- 仅支持Doris 3.0.6及以上版本。
- 确保网络连通,MRS集群必须与CloudTable Doris集群的安全组、区域、VPC、子网保持一致。
前提条件
- 已创建CloudTable Doris集群,集群运行正常。
- 已安装MySQL客户端。
- 已创建包含Flink服务的MRS集群,集群内各服务运行正常。
- 已安装Flink客户端。
Flink读写Doris数据操作步骤
Doris侧操作
- 使用SSH登录工具,访问Doris集群。
- 连接Doris普通集群命令,详细请参见使用MySQL客户端连接Doris普通集群。
./mysql -uadmin -ppassword -h集群内网地址 -P9030
- 连接Doris安全集群命令,详细请参见使用MySQL客户端连接Doris安全集群。
./mysql -uadmin -h集群内网地址 -P端口 --ssl-ca={path}/certificate.crt --ssl-mode=VERIFY_CA -ppassword表2 参数说明 参数
说明
admin
连接Doris集群的账户。
password
连接Doris集群的账户密码。
集群内网地址
获取集群内网地址:进入集群“详情”页,在“集群信息”模块处获取“集群访问地址”。
path
如果集群是开启HTTPS,需要配置证书的路径。
- 连接Doris普通集群命令,详细请参见使用MySQL客户端连接Doris普通集群。
- 创建并使用数据库。
- 创建数据库testdb。
create database if not exists testdb;
- 使用新创建的数据库testdb。
use testdb;
- 创建数据库testdb。
- 创建数据表demo。
create table demo(id int, name string) distributed by hash(id) buckets 10;
- 向表demo中插入数据。
insert into demo values(123, 'aaa'), (234, 'bbb'), (345, 'ccc');
- 创建表test_demo。
create table test_demo(id int, name string) distributed by hash(id) buckets 10;
Flink侧操作
- 以root用户登录MRS客户端所在节点(Master1节点),进入安装了Flink客户端的节点。
cd 客户端安装目录
- 加载环境变量。
source bigdata_env
认证用户,如果MRS集群未启用Kerberos认证(普通模式)请跳过该操作。
kinit 组件业务用户
- 连接Flink SQL客户端。
- 进入bin目录。
cd Flink/flink/bin/
- 登录Flink SQL客户端。
sql-client.sh
- 进入bin目录。
- 在Flink客户端创建Flink SQL作业。
创建表“Flink_doris_source”。
CREATE TABLE flink_doris_source (id INT, name STRING) WITH ( 'connector' = 'doris', 'fenodes' = 'FE实例IP地址:端口号', 'table.identifier' = 'testdb.demo', 'username' = 'user', 'password' = 'password', 'doris.enable.https' = 'true', 'doris.ignore.https.ca' = 'true' );
创建表“Flink_doris_sink”。
CREATE TABLE flink_doris_sink (id INT, name STRING) WITH ( 'connector' = 'doris', 'fenodes' = 'FE实例IP地址:端口号', 'table.identifier' = 'testdb.test_demo', 'username' = 'user', 'password' = 'password', 'sink.label-prefix' = 'doris_label_6', 'doris.enable.https' = 'true', 'doris.ignore.https.ca' = 'true' );
- 如果集群开启HTTPS,需要在创建表的with子句中添加以下参数;如果集群未开启HTTPS,则不需要在创建表的with子句中添加以下参数。
- 'doris.enable.https' = 'true'
- 'doris.ignore.https.ca' = 'true'
- Flink_doris_source、Flink_doris_sink表参数对应Doris表中字段。
- 端口号为MySQL端口9030。
- 创建Flink作业时,username配置项为Doris用户,password配置项为Doris用户密码。
- 如果集群开启HTTPS,需要在创建表的with子句中添加以下参数;如果集群未开启HTTPS,则不需要在创建表的with子句中添加以下参数。
- 将表Flink_doris_source中的数据插入Flink_doris_sink表中。
INSERT INTO flink_doris_sink select id,name from flink_doris_source;
验证
- 根据1访问Doris集群,验证test_demo表中是否插入新数据。
- 使用testdb数据库。
use testdb;
- 查看test_demo表中插入数据。
select * from test_demo;
- 使用testdb数据库。