更新时间:2024-09-27 GMT+08:00

Doris结果表

功能描述

Flink SQL作业写Doris结果表。

前提条件

  • 该场景作业需要运行在DLI的独享队列上,因此要与Doris建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
  • 如果使用MRS Doris,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。

    详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。

  • 集群未启用Kerberos认证(普通模式)

    使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • 集群未启用Kerberos认证(普通模式)
  • Doris的表名是区分大小写。
  • 使用cloudTable的doris时,'fenodes'字段值的端口请用8030,如'xx:8030'。同时安全组请放开端口8030,8040,9030。
  • 开启HTTPS后,需要在创建表的with子句中添加如下配置参数:
    • 'doris.enable.https' = 'true'
    • 'doris.ignore.https.ca' = 'true'
  • 请在Flink“作业编辑”页面选择“运行参数配置”,选择“开启Checkpoint”,否则会导致Doris结果表无法写入数据,且写入Doris的延时取决于设置的Checkpoint的间隔时间。

语法格式

create table dorisSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
 )
with (
  'connector' = 'doris',
  'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT',
  'table.identifier' = 'database.table',
  'username' = 'dorisUsername',
  'password' = 'dorisPassword'
);

参数说明

通用配置项​

参数

默认值

是否必选

参数类型说明

fenodes

--

Doris FE ip地址和port, 多实例之间使用逗号分隔。其中port可登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置”,在搜索框中搜索“http”查看。如果开启https,则搜索“https”。

table.identifier

--

Doris 表名,如:db.tbl。

username

--

访问 Doris的用户名。

password

--

访问 Doris的密码。

sink.label-prefix

""

Stream load导入使用的label前缀。2pc场景下要求全局唯一 ,用来保证Flink的EOS语义。

sink.enable-2pc

TRUE

是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。关于两阶段提交可参考这里

sink.check-interval

10000

加载时检查间隔异常。

sink.max-retries

3

将记录写入数据库失败时的最大重试次数。

sink.buffer-size

256 * 1024

缓存流加载数据的缓冲区大小。

sink.buffer-count

3

缓存流加载数据的缓冲区计数。

sink.enable-delete

TRUE

是否启用删除。此选项需要 Doris 表开启批量删除功能(Doris0.15+版本默认开启),只支持 Unique 模型。

sink.properties.*

--

Stream Load 的导入参数。

例如: 'sink.properties.column_separator' = ', ' 定义列分隔符, 'sink.properties.escape_delimiters' = 'true' 特殊字符作为分隔符,'\x01'会被转换为二进制的0x01

JSON格式导入

'sink.properties.format' = 'json' 'sink.properties.read_json_by_line' = 'true'

示例

该示例是从Datagen数据源中生成数据,并将结果写入到Doris结果表中。

  1. 参考增强型跨源连接,在DLI上根据Doris所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。“修改主机信息”章节描述,在增强型跨源中增加MRS的主机信息。
  2. 设置Doris的安全组,添加入向规则使其对Flink的队列网段放通。分别根据Doris的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。

    参考测试地址连通性

  3. 参考MRS Doris使用指南,创建doris表,创建语句如下:
    CREATE TABLE IF NOT EXISTS dorisdemo
    (
      `user_id` varchar(10) NOT NULL,
      `city` varchar(10),
      `age` int,
      `gender` int
    )
    DISTRIBUTED BY HASH(`user_id`) BUCKETS 10
    
  4. 创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将Datagen作为数据源,将数据写入到Doris作为结果表中。
    create table student_datagen_source(
      `user_id` String NOT NULL,
      `city` String,
      `age` int,
      `gender` int
    ) with (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.user_id.kind' = 'random',
      'fields.user_id.length' = '7', 
      'fields.city.kind' = 'random',
      'fields.city.length' = '7'
    );
    
    
    CREATE TABLE dorisDemo (
      `user_id` String NOT NULL,
      `city` String,
      `age` int,
      `gender` int
    ) with (
      'connector' = 'doris',
      'fenodes' = 'FE_IP:PORT',
      'table.identifier' = 'demo.dorisdemo',
      'username' = 'dorisUser',
      'password' = 'dorisPassword',
      'sink.label-prefix' = 'demo',
      'sink.enable-2pc' = 'true',
      'sink.buffer-count' = '10'
    );
    
    insert into dorisDemo select * from student_datagen_source
  1. 查看doris结果表是否已成功写入数据。

    user_id

    city

    age

    gender

    50aff04

    93406c5

    12

    1

    681a230

    1f27d06

    16

    1

    006eff4

    3521ded

    18

    0