更新时间:2026-01-06 GMT+08:00
分享

配置MRS Flink读写Doris数据

Flink Doris Connector支持通过Flink对Doris中存储的数据进行读取、插入、修改和删除操作。

建议Doris 3.0.6选取MRS 3.6.0版本进行匹配,创建包含Flink服务的MRS集群后Flink-Doris-Connector将自动集成,无需手动更新。Doris和MRS以及Flink版本配套关系如下所示:

表1 版本关系配套表

MRS版本

Flink版本

Flink-Doris-Connector版本

CloudTable Doris版本

MRS 3.6.0

1.20.0

flink-doris-connector-1.20-1.5.2-h0.cbu.mrs.360.xxx.jar

3.0.6

MRS 3.5.1

1.20.0

flink-doris-connector-1.20-1.5.2-h0.cbu.mrs.351.xxx.jar

3.0.6

MRS 3.5.0

1.17.1

flink-doris-connector-1.17-1.5.2-h0.cbu.mrs.350.xxx.jar

3.0.6

使用限制

  • 只能对Unique Key模型的表进行修改和删除操作。
  • 仅支持Doris 3.0.6及以上版本。
  • 确保网络连通,MRS集群必须与CloudTable Doris集群的安全组、区域、VPC、子网保持一致。

前提条件

  • 已创建CloudTable Doris集群,集群运行正常。
  • 已安装MySQL客户端。
  • 已创建包含Flink服务的MRS集群,集群内各服务运行正常。
  • 已安装Flink客户端。

Flink读写Doris数据操作步骤

Doris侧操作

  1. 使用SSH登录工具,访问Doris集群。
    • 连接Doris普通集群命令,详细请参见使用MySQL客户端连接Doris普通集群
      ./mysql -uadmin -ppassword -h集群内网地址 -P9030
    • 连接Doris安全集群命令,详细请参见使用MySQL客户端连接Doris安全集群
      ./mysql -uadmin -h集群内网地址 -P端口 --ssl-ca={path}/certificate.crt --ssl-mode=VERIFY_CA -ppassword
      表2 参数说明

      参数

      说明

      admin

      连接Doris集群的账户。

      password

      连接Doris集群的账户密码。

      集群内网地址

      获取集群内网地址:进入集群“详情”页,在“集群信息”模块处获取“集群访问地址”。

      path

      如果集群是开启HTTPS,需要配置证书的路径。

  2. 创建并使用数据库。
    • 创建数据库testdb。
      create database if not exists testdb;
    • 使用新创建的数据库testdb。
      use testdb;
  3. 创建数据表demo。
    create table demo(id int, name string) distributed by hash(id) buckets 10;
  4. 向表demo中插入数据。
    insert into demo values(123, 'aaa'), (234, 'bbb'), (345, 'ccc');
  5. 创建表test_demo。
    create table test_demo(id int, name string) distributed by hash(id) buckets 10;

Flink侧操作

  1. 以root用户登录MRS客户端所在节点(Master1节点),进入安装了Flink客户端的节点。
    cd 客户端安装目录
  2. 加载环境变量。
    source bigdata_env

    认证用户,如果MRS集群未启用Kerberos认证(普通模式)请跳过该操作。

    kinit 组件业务用户
  3. 连接Flink SQL客户端。
    • 进入bin目录。
      cd Flink/flink/bin/
    • 登录Flink SQL客户端。
      sql-client.sh
  4. 在Flink客户端创建Flink SQL作业。
    创建表“Flink_doris_source”。
    CREATE TABLE flink_doris_source (id INT, name STRING) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE实例IP地址:端口号',
    'table.identifier' = 'testdb.demo',
    'username' = 'user',
    'password' = 'password',
    'doris.enable.https' = 'true',
    'doris.ignore.https.ca' = 'true'
    );

    创建表“Flink_doris_sink”。

    CREATE TABLE flink_doris_sink (id INT, name STRING) WITH (
    'connector' = 'doris',
    'fenodes' = 'FE实例IP地址:端口号',
    'table.identifier' = 'testdb.test_demo',
    'username' = 'user',
    'password' = 'password',
    'sink.label-prefix' = 'doris_label_6',
    'doris.enable.https' = 'true',
    'doris.ignore.https.ca' = 'true'
    );
    • 如果集群开启HTTPS,需要在创建表的with子句中添加以下参数;如果集群未开启HTTPS,则不需要在创建表的with子句中添加以下参数。
      • 'doris.enable.https' = 'true'
      • 'doris.ignore.https.ca' = 'true'
    • Flink_doris_source、Flink_doris_sink表参数对应Doris表中字段。
    • 端口号为MySQL端口9030。
    • 创建Flink作业时,username配置项为Doris用户,password配置项为Doris用户密码。
  5. 将表Flink_doris_source中的数据插入Flink_doris_sink表中。
    INSERT INTO flink_doris_sink select id,name from flink_doris_source;

验证

  1. 根据1访问Doris集群,验证test_demo表中是否插入新数据。
    • 使用testdb数据库。
      use testdb;
    • 查看test_demo表中插入数据。
      select * from test_demo;

相关文档