配置Flink读写Doris数据
Flink Doris Connector支持通过Flink操作(读取、插入、修改、删除)Doris中存储的数据。
- 只能对Unique Key模型的表进行修改和删除操作。
- 该章节仅适用于MRS 3.5.0及之后版本。
前提条件
- 已创建包含Doris服务的集群,集群内各服务运行正常。
- 待连接Doris数据库的节点与MRS集群网络互通。
- 创建具有Doris管理权限的用户。
- 集群已启用Kerberos认证(安全模式)
在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。
使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。
- 集群未启用Kerberos认证(普通模式)
使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。
- 集群已启用Kerberos认证(安全模式)
- 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris。
- 已安装Flink客户端。
操作步骤
Doris侧操作。
- 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。
如果集群已启用Kerberos认证(安全模式),需先执行以下命令再连接Doris数据库:
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址
- 数据库连接端口为Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
- Doris FE实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
- 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。
- 执行以下命令创建数据库并切换:
create database if not exists testdb;
use testdb;
- 执行以下命令创建表z_test并插入数据:
create table z_test(id int, name string) distributed by hash(id) buckets 10;
insert into z_test values(123, 'aaa'), (234, 'bbb'), (345, 'ccc');
- 执行以下命令创建z_test_sink_3表:
create table z_test_sink_3(id int, name string) distributed by hash(id) buckets 10;
Flink侧操作。
- 以客户端安装用户登录安装了Flink客户端的节点,执行以下命令:
cd 客户端安装目录
source bigdata_env
kinit 组件业务用户(如果集群未启用Kerberos认证(普通模式),请跳过该操作)
- 执行以下命令登录Flink SQL客户端:
cd Flink/flink/bin/
sql-client.sh
- 在Flink客户端创建流或批Flink SQL作业。例如:
CREATE TABLE flink_doris_source (id INT, name STRING) WITH (
'connector' = 'doris',
'fenodes' = 'FE实例IP地址:端口号',
'table.identifier' = 'testdb.z_test',
'username' = 'user',
'password' = 'password',
'doris.enable.https' = 'true',
'doris.ignore.https.ca' = 'true'
);
CREATE TABLE flink_doris_sink (id INT, name STRING) WITH (
'connector' = 'doris',
'fenodes' = 'FE实例IP地址:端口号',
'table.identifier' = 'testdb.z_test_sink_3',
'username' = 'user',
'password' = 'password',
'sink.label-prefix' = 'doris_label_6',
'doris.enable.https' = 'true',
'doris.ignore.https.ca' = 'true'
);
执行以下命令插入数据:
INSERT INTO
flink_doris_sink
select
id,
name
from
flink_doris_source;
- 开启HTTPS后,需要在创建表的with子句中添加如下配置参数:
- 'doris.enable.https' = 'true'
- 'doris.ignore.https.ca' = 'true'
- Source和Sink表对应的字段需和Doris中表的字段名保持一致。
- 端口号为FE服务的HTTPS端口(开启Kerberos认证的集群)或HTTP端口(未开启Kerberos认证的集群)。可登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置”,在搜索框中搜索“https_port”或“http_port”查看。
- 创建Flink作业时,username配置项为Doris用户,password配置项为Doris用户密码。
- 集群已启用Kerberos认证(安全模式)只能配置HTTPS模式。
- 开启HTTPS后,需要在创建表的with子句中添加如下配置参数: