更新时间:2024-12-25 GMT+08:00
分享

在DLI使用Hudi提交Flink SQL作业

本节操作介绍在DLI使用Hudi提交Flink SQL作业的操作步骤。

具体语法说明请参考Flink OpenSource SQL1.15语法概览

进入DLI控制台,随后单击左侧菜单的“作业管理 > Flink作业”,进入Flink作业的界面。

  1. 创建Flink作业:点击界面右上角的”创建作业”按钮,在弹出窗口中配置作业名称,类型选择”Flink OpenSource SQL
  2. 写入Flink SQL (不使用Catalog的场景):

    这里的sink表通过创建临时表指向Hudi表路径来写入数据,同时在表参数中配置hive_sync相关参数,实时同步元数据至由DLI提供的元数据服务。(具体参数详见Flink参数一节)

    请将作业中sink表的path参数修改为希望保存hudi表的obs路径。
    -- 临时表作为source,通过datagen mock数据
    create table
      orderSource (
        order_id STRING,
        order_name STRING,
        order_time TIMESTAMP(3)
      )
    with
      ('connector' = 'datagen', 'rows-per-second' = '1');
    
    -- 创建Hudi临时表作为sink,通过配置hms同步,将表同步至DLI元数据服务
    CREATE TABLE
      hudi_table (
        order_id STRING PRIMARY KEY NOT ENFORCED,
        order_name STRING,
        order_time TIMESTAMP(3),
        order_date String
      ) PARTITIONED BY (order_date)
    WITH
      (
        'connector' = 'hudi',
        'path' = 'obs://bucket/path/hudi_table',
        'table.type' = 'MERGE_ON_READ',
        'hoodie.datasource.write.recordkey.field' = 'order_id',
        'write.precombine.field' = 'order_time',
        'hive_sync.enable' = 'true',
        'hive_sync.mode' = 'hms',
        'hive_sync.table' = 'hudi_table',
        'hive_sync.db' = 'default'
      );
    
    -- 执行insert,读取source表,写入sink
    insert into
      hudi_table
    select
      order_id,
      order_name,
      order_time,
      DATE_FORMAT (order_time, 'yyyyMMdd')
    from
      orderSource;

  3. 配置作业运行参数:

    • 选择队列,并配置Flink版本至少为1.15。
    • 配置权限足够的委托。
    • 配置OBS桶。
    • 开启Checkpoint,使用Hudi时必须开启Checkpoint。

  4. 提交作业并检查Flink UI和日志:

    直接点击界面右上角的”提交”,在跳转界面再次确认参数无误后,点击底部”立即启动”。完成提交后自动跳转至Flink作业界面,此处可以筛选刚才提交的Flink作业并检查执行状态。

    点击作业的名称,可以跳转至作业界面,此处可以点击”提交日志”或”运行日志”,检查聚合的日志。也可以直接点击日志列表,选择JobManager或者TaskManager,并下载对应日志。

    点击作业界面右上角的”更多”->”Flink UI”,即可跳转至该任务的Flink UI界面。

相关文档