更新时间:2024-04-23 GMT+08:00

Doris维表

功能描述

创建Doris维表用于与输入流连接生成宽表。

前提条件

  • 该场景作业需要运行在DLI的独享队列上,因此要与HBase建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。
  • 如果使用MRS Doris,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。

    详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。

  • 集群未启用Kerberos认证(普通模式)。

    使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。

注意事项

  • 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。
  • 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • 集群未启用Kerberos认证(普通模式)。
  • Doris的表名是区分大小写。
  • 使用cloudTable的doris时,'fenodes'字段值的端口请用8030,如'xx:8030'。同时安全组请放开端口8030, 8040,9030。
  • 开启HTTPS后,需要在创建表的with子句中添加如下配置参数:
    • 'doris.enable.https' = 'true'
    • 'doris.ignore.https.ca' = 'true'

语法格式

create table hbaseSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
 )
with (
  'connector' = 'doris',
  'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT',
  'table.identifier' = 'database.table',
  'username' = 'dorisUsername',
  'password' = 'dorisPassword'
);

参数说明

通用配置项​

参数

默认值

是否必选

参数类型说明

fenodes

--

Y

Doris FE ip地址和port, 多实例之间使用逗号分隔。其中port可登录FusionInsight Manager,选择“集群 > 服务 > Doris > 配置”,在搜索框中搜索“http”查看。如果开启https,则搜索“https”。

table.identifier

--

Y

Doris 表名,如:db.tbl

username

--

Y

访问 Doris 的用户名。

password

--

Y

访问 Doris 的密码。

lookup.cache.max-rows

-1L

N

查找缓存的最大行数,超过此值,最旧的行将被删除。

如需启用缓存配置则“cache.max-rows”和“cache.ttl”选项都必须指定。

lookup.cache.ttl

10 s

N

缓存生存时间。

lookup.max-retries

3

N

查找数据库失败时的最大重试次数。

示例

该示例是从Doris源表读取数据,并输入到 print connector。

  1. 参考增强型跨源连接,在DLI上根据Doris所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。“修改主机信息”章节描述,在增强型跨源中增加MRS的主机信息。
  2. 设置Doris和kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据Doris和Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
  3. 参考MRS Doris使用指南,创建doris表,并插入10条数据。创建语句如下:
    CREATE TABLE IF NOT EXISTS dorisdemo
    (
      `user_id` varchar(10) NOT NULL,
      `city` varchar(10),
      `age` int,
      `gender` int
    )
    DISTRIBUTED BY HASH(`user_id`) BUCKETS 10;
    
    INSERT INTO dorisdemo VALUES ('user1', 'city1', 20, 1);
    INSERT INTO dorisdemo VALUES ('user2', 'city2', 21, 0);
    INSERT INTO dorisdemo VALUES ('user3', 'city3', 22, 1);
    INSERT INTO dorisdemo VALUES ('user4', 'city4', 23, 0);
    INSERT INTO dorisdemo VALUES ('user5', 'city5', 24, 1);
    INSERT INTO dorisdemo VALUES ('user6', 'city6', 25, 0);
    INSERT INTO dorisdemo VALUES ('user7', 'city7', 26, 1);
    INSERT INTO dorisdemo VALUES ('user8', 'city8', 27, 0);
    INSERT INTO dorisdemo VALUES ('user9', 'city9', 28, 1);
    INSERT INTO dorisdemo VALUES ('user10', 'city10', 29, 0);
  4. 创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业模拟从kafka读取数据,并关联doris维表对数据进行打宽,并输出到print。
    CREATE TABLE ordersSource (
      user_id string,
      user_name string,
      proctime as Proctime()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'kafka-topic',
      'properties.bootstrap.servers' = 'kafkaIp:port,kafkaIp:port,kafkaIp:port',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    CREATE TABLE dorisDemo (
      `user_id` String NOT NULL,
      `city` String,
      `age` int,
      `gender` int
    ) with (
      'connector' = 'doris',
      'fenodes' = 'FE实例IP地址:端口号',
      'table.identifier' = 'demo.dorisdemo',
      'username' = 'dorisUsername',
      'password' = 'dorisPassword',
      'lookup.cache.ttl'='10 m',
      'lookup.cache.max-rows' = '100'
    );
    
    CREATE TABLE print (
      user_id string,
      user_name string,
      `city` String,
      `age` int,
      `gender` int
    ) WITH (
      'connector' = 'print'
    );
    
    insert into print 
    select 
      orders.user_id,
      orders.user_name,
      dim.city,
      dim.age,
      dim.sex
    from ordersSource orders
    left join dorisDemo for system_time as of orders.proctime as dim on orders.user_id = dim.user_id;
  5. 往kafka数据源写入2条数据。
    {"user_id": "user1", "user_name": "name1"}
    {"user_id": "user2", "user_name": "name2"}
  6. 查看print结果表数据。
    +I[user1, name1, city1, 20, 1]
    +I[user2, name2, city2, 21, 0]