在DLI使用Hudi提交Flink SQL作业
本节操作介绍在DLI使用Hudi提交Flink SQL作业的操作步骤。
具体语法说明请参考Flink OpenSource SQL1.15语法概览。
进入DLI控制台,随后单击左侧菜单的“作业管理 > Flink作业”,进入Flink作业的界面。
- 创建Flink作业:点击界面右上角的”创建作业”按钮,在弹出窗口中配置作业名称,类型选择”Flink OpenSource SQL”
- 写入Flink SQL (不使用Catalog的场景):
这里的sink表通过创建临时表指向Hudi表路径来写入数据,同时在表参数中配置hive_sync相关参数,实时同步元数据至由DLI提供的元数据服务。(具体参数详见Flink参数一节)
请将作业中sink表的path参数修改为希望保存hudi表的obs路径。-- 临时表作为source,通过datagen mock数据 create table orderSource ( order_id STRING, order_name STRING, order_time TIMESTAMP(3) ) with ('connector' = 'datagen', 'rows-per-second' = '1'); -- 创建Hudi临时表作为sink,通过配置hms同步,将表同步至DLI元数据服务 CREATE TABLE hudi_table ( order_id STRING PRIMARY KEY NOT ENFORCED, order_name STRING, order_time TIMESTAMP(3), order_date String ) PARTITIONED BY (order_date) WITH ( 'connector' = 'hudi', 'path' = 'obs://bucket/path/hudi_table', 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'order_id', 'write.precombine.field' = 'order_time', 'hive_sync.enable' = 'true', 'hive_sync.mode' = 'hms', 'hive_sync.table' = 'hudi_table', 'hive_sync.db' = 'default' ); -- 执行insert,读取source表,写入sink insert into hudi_table select order_id, order_name, order_time, DATE_FORMAT (order_time, 'yyyyMMdd') from orderSource;
- 配置作业运行参数:
- 选择队列,并配置Flink版本至少为1.15。
- 配置权限足够的委托。
- 配置OBS桶。
- 开启Checkpoint,使用Hudi时必须开启Checkpoint。
- 提交作业并检查Flink UI和日志:
直接点击界面右上角的”提交”,在跳转界面再次确认参数无误后,点击底部”立即启动”。完成提交后自动跳转至Flink作业界面,此处可以筛选刚才提交的Flink作业并检查执行状态。
点击作业的名称,可以跳转至作业界面,此处可以点击”提交日志”或”运行日志”,检查聚合的日志。也可以直接点击日志列表,选择JobManager或者TaskManager,并下载对应日志。
点击作业界面右上角的”更多”->”Flink UI”,即可跳转至该任务的Flink UI界面。