更新时间:2024-11-26 GMT+08:00
分享

配置Flink读写Doris数据

Flink Doris Connector支持通过Flink操作(读取、插入、修改、删除)Doris中存储的数据。

  • 只能对Unique Key模型的表进行修改和删除操作。
  • 该章节仅适用于MRS 3.5.0及之后版本。

前提条件

  • 已创建包含Doris服务的集群,集群内各服务运行正常。
  • 待连接Doris数据库的节点与MRS集群网络互通。
  • 创建具有Doris管理权限的用户。
    • 集群已启用Kerberos认证(安全模式)

      在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。

      使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。

    • 集群未启用Kerberos认证(普通模式)

      使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。

  • 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris
  • 已安装Flink客户端。

操作步骤

Doris侧操作。

  1. 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。

    如果集群已启用Kerberos认证(安全模式),需先执行以下命令再连接Doris数据库:

    export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

    mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址

    • 数据库连接端口为Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
    • Doris FE实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE实例的IP地址。
    • 用户也可以使用MySQL连接软件或者Doris WebUI界面连接数据库。

  2. 执行以下命令创建数据库并切换:

    create database if not exists testdb;

    use testdb;

  3. 执行以下命令创建表z_test并插入数据:

    create table z_test(id int, name string) distributed by hash(id) buckets 10;

    insert into z_test values(123, 'aaa'), (234, 'bbb'), (345, 'ccc');

  4. 执行以下命令创建z_test_sink_3表:

    create table z_test_sink_3(id int, name string) distributed by hash(id) buckets 10;

Flink侧操作。

  1. 以客户端安装用户登录安装了Flink客户端的节点,执行以下命令:

    cd 客户端安装目录

    source bigdata_env

    kinit 组件业务用户(如果集群未启用Kerberos认证(普通模式),请跳过该操作)

  2. 执行以下命令登录Flink SQL客户端:

    cd Flink/flink/bin/

    sql-client.sh

  3. 在Flink客户端创建流或批Flink SQL作业。例如:

    CREATE TABLE flink_doris_source (id INT, name STRING) WITH (

    'connector' = 'doris',

    'fenodes' = 'FE实例IP地址:端口号',

    'table.identifier' = 'testdb.z_test',

    'username' = 'user',

    'password' = 'password',

    'doris.enable.https' = 'true',

    'doris.ignore.https.ca' = 'true'

    );

    CREATE TABLE flink_doris_sink (id INT, name STRING) WITH (

    'connector' = 'doris',

    'fenodes' = 'FE实例IP地址:端口号',

    'table.identifier' = 'testdb.z_test_sink_3',

    'username' = 'user',

    'password' = 'password',

    'sink.label-prefix' = 'doris_label_6',

    'doris.enable.https' = 'true',

    'doris.ignore.https.ca' = 'true'

    );

    执行以下命令插入数据:

    INSERT INTO

    flink_doris_sink

    select

    id,

    name

    from

    flink_doris_source;

    • 开启HTTPS后,需要在创建表的with子句中添加如下配置参数:
      • 'doris.enable.https' = 'true'
      • 'doris.ignore.https.ca' = 'true'
    • Source和Sink表对应的字段需和Doris中表的字段名保持一致。
    • 端口号为FE服务的HTTPS端口(开启Kerberos认证的集群)或HTTP端口(未开启Kerberos认证的集群)。可登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置”,在搜索框中搜索“https_port”或“http_port”查看。
    • 创建Flink作业时,username配置项为Doris用户,password配置项为Doris用户密码。
    • 集群已启用Kerberos认证(安全模式)只能配置HTTPS模式。

相关文档