网络
虚拟私有云 VPC
弹性公网IP EIP
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
虚拟专用网络 VPN
云连接 CC
VPC终端节点 VPCEP
企业路由器 ER
企业交换机 ESW
全球加速 GA
企业连接 EC
云原生应用网络 ANC
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
威胁检测服务 MTD
态势感知 SA
认证测试中心 CTC
边缘安全 EdgeSec
应用中间件
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
API网关 APIG
分布式缓存服务 DCS
多活高可用服务 MAS
事件网格 EG
管理与监管
统一身份认证服务 IAM
消息通知服务 SMN
云监控服务 CES
应用运维管理 AOM
应用性能管理 APM
云日志服务 LTS
云审计服务 CTS
标签管理服务 TMS
配置审计 Config
应用身份管理服务 OneAccess
资源访问管理 RAM
组织 Organizations
资源编排服务 RFS
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
解决方案
高性能计算 HPC
SAP
混合云灾备
开天工业工作台 MIW
Haydn解决方案工厂
数字化诊断治理专家服务
云生态
云商店
合作伙伴中心
华为云开发者学堂
华为云慧通差旅
开发与运维
软件开发生产线 CodeArts
需求管理 CodeArts Req
流水线 CodeArts Pipeline
代码检查 CodeArts Check
编译构建 CodeArts Build
部署 CodeArts Deploy
测试计划 CodeArts TestPlan
制品仓库 CodeArts Artifact
移动应用测试 MobileAPPTest
CodeArts IDE Online
开源镜像站 Mirrors
性能测试 CodeArts PerfTest
应用管理与运维平台 ServiceStage
云应用引擎 CAE
开源治理服务 CodeArts Governance
华为云Astro轻应用
CodeArts IDE
Astro工作流 AstroFlow
代码托管 CodeArts Repo
漏洞管理服务 CodeArts Inspector
联接 CodeArtsLink
软件建模 CodeArts Modeling
Astro企业应用 AstroPro
CodeArts盘古助手
华为云Astro大屏应用
计算
弹性云服务器 ECS
Flexus云服务
裸金属服务器 BMS
云手机服务器 CPH
专属主机 DeH
弹性伸缩 AS
镜像服务 IMS
函数工作流 FunctionGraph
云耀云服务器(旧版)
VR云渲游平台 CVR
Huawei Cloud EulerOS
云化数据中心 CloudDC
网络
虚拟私有云 VPC
弹性公网IP EIP
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
虚拟专用网络 VPN
云连接 CC
VPC终端节点 VPCEP
企业路由器 ER
企业交换机 ESW
全球加速 GA
企业连接 EC
云原生应用网络 ANC
CDN与智能边缘
内容分发网络 CDN
智能边缘云 IEC
智能边缘平台 IEF
CloudPond云服务
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
威胁检测服务 MTD
态势感知 SA
认证测试中心 CTC
边缘安全 EdgeSec
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
可信智能计算服务 TICS
推荐系统 RES
云搜索服务 CSS
数据可视化 DLV
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
数据治理中心 DataArts Studio
湖仓构建 LakeFormation
智能数据洞察 DataArts Insight
应用中间件
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
API网关 APIG
分布式缓存服务 DCS
多活高可用服务 MAS
事件网格 EG
开天aPaaS
应用平台 AppStage
开天企业工作台 MSSE
开天集成工作台 MSSI
API中心 API Hub
云消息服务 KooMessage
交换数据空间 EDS
云地图服务 KooMap
云手机服务 KooPhone
组织成员账号 OrgID
云空间服务 KooDrive
管理与监管
统一身份认证服务 IAM
消息通知服务 SMN
云监控服务 CES
应用运维管理 AOM
应用性能管理 APM
云日志服务 LTS
云审计服务 CTS
标签管理服务 TMS
配置审计 Config
应用身份管理服务 OneAccess
资源访问管理 RAM
组织 Organizations
资源编排服务 RFS
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
区块链
区块链服务 BCS
数字资产链 DAC
华为云区块链引擎服务 HBS
解决方案
高性能计算 HPC
SAP
混合云灾备
开天工业工作台 MIW
Haydn解决方案工厂
数字化诊断治理专家服务
价格
成本优化最佳实践
专属云商业逻辑
云生态
云商店
合作伙伴中心
华为云开发者学堂
华为云慧通差旅
其他
管理控制台
消息中心
产品价格详情
系统权限
客户关联华为云合作伙伴须知
公共问题
宽限期保留期
奖励推广计划
活动
云服务信任体系能力说明
开发与运维
软件开发生产线 CodeArts
需求管理 CodeArts Req
流水线 CodeArts Pipeline
代码检查 CodeArts Check
编译构建 CodeArts Build
部署 CodeArts Deploy
测试计划 CodeArts TestPlan
制品仓库 CodeArts Artifact
移动应用测试 MobileAPPTest
CodeArts IDE Online
开源镜像站 Mirrors
性能测试 CodeArts PerfTest
应用管理与运维平台 ServiceStage
云应用引擎 CAE
开源治理服务 CodeArts Governance
华为云Astro轻应用
CodeArts IDE
Astro工作流 AstroFlow
代码托管 CodeArts Repo
漏洞管理服务 CodeArts Inspector
联接 CodeArtsLink
软件建模 CodeArts Modeling
Astro企业应用 AstroPro
CodeArts盘古助手
华为云Astro大屏应用
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
高性能弹性文件服务 SFS Turbo
弹性文件服务 SFS
存储容灾服务 SDRS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
云存储网关 CSG
专属分布式存储服务 DSS
数据工坊 DWR
地图数据 MapDS
键值存储服务 KVS
容器
云容器引擎 CCE
云容器实例 CCI
容器镜像服务 SWR
云原生服务中心 OSC
应用服务网格 ASM
华为云UCS
数据库
云数据库 RDS
数据复制服务 DRS
文档数据库服务 DDS
分布式数据库中间件 DDM
云数据库 GaussDB
云数据库 GeminiDB
数据管理服务 DAS
数据库和应用迁移 UGO
云数据库 TaurusDB
人工智能
AI开发平台ModelArts
华为HiLens
图引擎服务 GES
图像识别 Image
文字识别 OCR
自然语言处理 NLP
内容审核 Moderation
图像搜索 ImageSearch
医疗智能体 EIHealth
企业级AI应用开发专业套件 ModelArts Pro
人脸识别服务 FRS
对话机器人服务 CBS
语音交互服务 SIS
人证核身服务 IVS
视频智能分析服务 VIAS
城市智能体
自动驾驶云服务 Octopus
盘古大模型 PanguLargeModels
IoT物联网
设备接入 IoTDA
全球SIM联接 GSL
IoT数据分析 IoTA
路网数字化服务 DRIS
IoT边缘 IoTEdge
设备发放 IoTDP
企业应用
域名注册服务 Domains
云解析服务 DNS
企业门户 EWP
ICP备案
商标注册
华为云WeLink
华为云会议 Meeting
隐私保护通话 PrivateNumber
语音通话 VoiceCall
消息&短信 MSGSMS
云管理网络
SD-WAN 云服务
边缘数据中心管理 EDCM
云桌面 Workspace
应用与数据集成平台 ROMA Connect
ROMA资产中心 ROMA Exchange
API全生命周期管理 ROMA API
政企自服务管理 ESM
视频
实时音视频 SparkRTC
视频直播 Live
视频点播 VOD
媒体处理 MPC
视频接入服务 VIS
数字内容生产线 MetaStudio
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
迁移中心 MGC
专属云
专属计算集群 DCC
开发者工具
SDK开发指南
API签名指南
DevStar
华为云命令行工具服务 KooCLI
Huawei Cloud Toolkit
CodeArts API
云化转型
云架构中心
云采用框架
用户服务
账号中心
费用中心
成本中心
资源中心
企业管理
工单管理
客户运营能力
国际站常见问题
支持计划
专业服务
合作伙伴支持计划
我的凭证
华为云公共事业服务云平台
工业软件
工业数字模型驱动引擎
硬件开发工具链平台云服务
工业数据转换引擎云服务

DLI对接LakeFormation

更新时间:2025-01-16 GMT+08:00
分享

操作场景

LakeFormation是企业级一站式湖仓构建服务,提供元数据统一管理能力,支持无缝对接多种计算引擎及大数据云服务,便捷高效地构建数据湖和运营相关业务,加速释放业务数据价值。

在DLI的Spark作业和SQL作业场景,支持对接LakeFormation实现元数据的统一管理,本节操作介绍配置DLI与LakeFormation的数据连接的操作步骤。

LakeFormation Spark语法请参考Spark语法参考

LakeFormation Flink语法请参考Flink语法参考

HetuEngine SQL语法请参考HetuEngine SQL语法参考

使用须知

该功能为白名单功能,如需使用,请在管理控制台右上角,选择“工单 > 新建工单”,提交申请。

DLI对接LakeFormation功能的使用依赖于“湖仓构建”服务的上线状态,如需了解“湖仓构建”服务的上线范围请参考全球产品和服务

操作流程

图1 操作流程

约束限制

  • 表1中提供了支持对接LakeFormation获取元数据的队列和引擎类型。
    查看队列的引擎类型和版本请参考查看队列的基本信息
    表1 LakeFormation获取元数据的队列和引擎类型

    队列类型

    引擎类型和支持的版本

    default队列

    • Spark 3.3.x:支持对接LakeFormation获取元数据的队列和引擎。
    • HetuEngine 2.1.0:支持对接LakeFormation获取元数据的队列和引擎。

    SQL队列

    • Spark 3.3.x:支持对接LakeFormation获取元数据的队列和引擎。
    • HetuEngine 2.1.0:支持对接LakeFormation获取元数据的队列和引擎。

    通用队列

    Flink作业场景:Flink 1.15及以上版本且使用弹性资源池队列时支持对接LakeFormation获取元数据。

  • DLI仅支持对接LakeFormation默认实例,请在LakeFormation设置实例为默认实例。
  • DLI支持读取Lakeformation的中Avro、Json、Parquet、Csv、Orc、Text、Hudi格式的数据。
  • LakeFormation数据目录中的库、表权限统一由LakeFormation管理。
  • DLI支持对接LakeFormation后,DLI原始库表下移至dli的数据目录下。

步骤1:创建LakeFormation实例用于元数据存储

LakeFormation实例为元数据的管理提供基础资源,DLI仅支持对接LakeFormation的默认实例。
  1. 创建实例
    1. 登录LakeFormation管理控制台。
    2. 单击页面右上角“立即购买”或“购买实例”,进入实例购买页面。

      首次创建实例时界面显示“立即购买”,如果界面已有LakeFormation实例则显示为“购买实例”。

    3. 按需配置LakeFormation实例参数,完成实例创建。

      本例创建按需计费的共享型实例。

      更多参数配置及说明,请参考创建LakeFormation实例

  2. 设置实例为默认实例
    1. 查看实例“基本信息”中“是否为默认实例”的参数值。
      • “true”表示当前实例为默认实例。
      • “false”表示当前实例不为默认实例。
    2. 如果需要设置当前实例为默认实例,请单击页面右上角“设为默认实例”。
    3. 勾选操作影响后单击“确定”,将当前实例设置为默认实例。
      说明:

      当前DLI仅对接LakeFormation默认实例,变更默认实例后,可能对使用LakeFormation的周边服务产生影响,请谨慎操作。

步骤2:在LakeFormation管理控制台创建Catalog

数据目录(Catalog)是元数据管理对象,它可以包含多个数据库。您可以在LakeFormation中创建并管理多个Catalog,用于不同外部集群的元数据隔离。

  1. 登录LakeFormation管理控制台。
  2. 选择“元数据 > Catalog”。
  3. 单击“创建Catalog”。

    按需配置Catalog实例参数。

    更多参数配置及说明,请参考创建Catalog

  4. 创建完成后,即可在“Catalog”页面查看Catalog相关信息。

步骤3:在DLI管理控制台创建数据目录

在DLI管理控制台需要创建到Catalog的连接,才可以访问LakeFormation实例中存储的Catalog。

说明:
  • DLI仅支持对接LakeFormation默认实例,请在LakeFormation设置实例为默认实例。
  • LakeFormation中每一个数据目录只能创建一个映射,不能创建多个。

    例如用户在DLI创建了映射名catalogMapping1对应LakeFormation数据目录:catalogA。创建成功后,在同一个项目空间下,不能再创建到catalogA的映射。

  1. 登录DLI管理控制台。
  2. 选择“SQL编辑器 ”。
  3. 在SQL编辑器页面,选择“数据目录”。
  4. 单击创建数据目录。
  5. 配置数据目录相关信息。
    表2 数据目录配置信息

    参数名称

    是否必填

    说明

    外部数据目录名称

    LakeFormation默认实例下的Catalog名称。

    类型

    当前只支持LakeFormation。

    该选项已固定,无需填写。

    数据目录映射名称

    在DLI使用的Catalog映射名,用户在执行SQL语句的时候需要指定Catalog映射,以此来标识访问的外部的元数据。建议与外部数据目录名称保持一致。

    当前仅支持连接LakeFormation默认实例的数据目录。

    描述

    自定义数据目录的描述信息。

  6. 单击“确定”创建数据目录。

步骤4:授权使用LakeFormation资源

  • SQL作业场景

    在进行SQL作业提交之前,需完成LakeFormation元数据、数据库、表、列和函数等资源授权,确保作业在执行过程中能够顺利访问所需的数据和资源。LakeFormation SQL资源权限支持列表提供了LakeFormation权限支持列表。

    使用LakeFormation资源需要分别完成LakeFormation的IAM细粒度授权和LakeFormation SQL资源授权。

    • LakeFormation的IAM细粒度授权:授权使用LakeFormation API。

      IAM服务通常提供了管理用户、组和角色的访问权限的方式。您可以在IAM控制台中创建策略(Policy),定义哪些用户或角色可以调用LakeFormation的API。然后,将这些策略附加到相应的用户或角色上。

      • 方法1:基于角色授权:

        即IAM最初提供的一种根据用户的工作职能定义权限的粗粒度授权机制。该机制以服务为粒度,提供有限的服务相关角色用于授权。

        例如参考LakeFormation权限管理授予用户只读权限,允许查询LakeFormation相关元数据资源的权限。

        或如下示例授予LakeFormation相关元数据资源的所有操作权限。

        示例:

        {
            "Version": "1.1",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "lakeformation:table:*",
                        "lakeformation:database:*",
                        "lakeformation:catalog:*",
                        "lakeformation:function:*",         
                        "lakeformation:transaction:*",
                        "lakeformation:policy:describe",
                        "lakeformation:credential:describe"
                    ]
                }
            ]
        }
      • 方法2:基于策略的精细化授权:

        IAM提供的细粒度授权的能力,可以精确到具体服务的操作、资源以及请求条件等。

        LakeFormation权限策略请参考LakeFormation权限和授权项

        IAM授权的具体操作请参考创建用户并授权使用LakeFormation

    • LakeFormation SQL资源授权:授权使用LakeFormation具体资源(元数据、数据库、表、列和函数等)。

      LakeFormation资源授权是指允许用户对特定资源的访问的权限,以此来控制对LakeFormation的数据和元数据的访问。

      LakeFormation资源授权有两种方式:

      • 方式一:在LakeFormation管理控制台对资源授权。

        具体操作请参考LakeFormation用户指南中的新增授权

        了解LakeFormation SQL资源权限请参考数据权限概述

      • 方式二:在DLI管理控制台使用GRANT SQL语句授权

        GRANT语句是SQL语言中用于授权的一种方式。

        您可以使用GRANT语句来授予用户或角色对数据库、表、列、函数等的访问权限。

        LakeFormation SQL资源权限支持列表提供了LakeFormation资源授权的策略。

        说明:

        Catalog资源暂时不支持在DLI SQL授权,请参考▪方式一:在LakeFormation管理控制台...在LakeFormation 管理控制台完成授权。

  • Spark Jar、Flink OpenSource SQL、Flink Jar作业场景:
    • 方式1:使用委托授权:使用Spark 3.3.1及以上版本、Flink 1.15版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在配置作业时添加新建的委托信息。

      委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

    • 方式2:使用DEW授权:
      • 已为授予IAM用户所需的IAM和Lakeformation权限,具体请参考•SQL作业场景的IAM授权的操作步骤。
      • 已在DEW服务创建通用凭证,并存入凭据值。具体操作请参考创建通用凭据
      • 已创建DLI访问DEW的委托并完成委托授权。该委托需具备以下权限:
        • DEW中的查询凭据的版本与凭据值ShowSecretVersion接口权限,csms:secretVersion:get。
        • DEW中的查询凭据的版本列表ListSecretVersions接口权限,csms:secretVersion:list。
        • DEW解密凭据的权限,kms:dek:decrypt。

        委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

步骤5:在DLI作业开发时使用LakeFormation元数据

DLI对接LakeFormation默认实例且完成LakeFormation的资源授权后,即可以在作业开发时使用LakeFormation元数据。

  • DLI SQL

    LakeFormation SQL语法说明请参考DLI Spark SQL语法参考

    在执行SQL作业时,您可以在控制台选择执行SQL所在的catalog,如图2所示,或在SQL命令中指定catalogName。catalogName是DLI控制台的数据目录映射名。

    图2 在SQL编辑器页面选择数据目录
    说明:
    • 对接LakeFormation实例场景,在创建数据库时需要指定数据库存储的OBS路径。
    • 对接LakeFormation实例场景,在创建表时不支持设置表生命周期和多版本。
    • 对接LakeFormation实例场景,LOAD DATA语句不支持datasource表,且LOAD DATA分区表必须指定分区。
    • 在LakeFormation控制台创建的数据库和表中包含中文字符时,不支持在DLI执行相关数据库和表的操作。
    • 对接LakeFormation实例场景,不支持指定筛选条件删除分区。
    • 对接LakeFormation实例场景,不支持创建Truncate Datasource/Hive外表。
    • DLI暂不支持使用LakeFormation行过滤条件功能。
    • DLI读取binary类型的数据进行console展示时,会对binary数据进行Base64转换。
    • 在DLI暂不支持LakeFormation的路径授权。
  • DLI Spark Jar:

    本节介绍在DLI管理控制台提交Spark Jar作业时使用LakeFormation元数据的配置操作。

    • Spark Jar 示例
      SparkSession spark = SparkSession.builder()
          .enableHiveSupport()
          .appName("java_spark_demo")
          .getOrCreate();
      
      spark.sql("show databases").show();
    • DLI管理控制台Spark Jar作业配置说明
      • (推荐)方式一:使用控制台提供的参数项(委托、元数据来源等)配置Spark Jar作业访问LakeFormation元数据
        新建或编辑Spark Jar作业时,请参考表3Spark Jar作业访问LakeFormation元数据。
        表3 配置Spark Jar作业访问LakeFormation元数据

        参数

        说明

        配置示例

        Spark版本

        Spark 3.3.x及以上版本支持对接LakeFormation。

        3.3.1

        委托

        使用Spark 3.3.1及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:

        spark.dli.job.agency.name=agency

        委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

        -

        访问元数据

        配置开启Spark作业访问元数据功能。

        元数据来源

        配置Spark作业访问的元数据类型。本场景下请选择Lakeformation。

        选择该参数后系统将自动为您的作业添加以下配置项用于加载lakeformation相关依赖。

        spark.sql.catalogImplementation=hive
        spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true
        spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient
        og
        // lakeformation相关依赖加载
        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*

        “元数据来源”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。

        优先推荐您使用控制台提供的“元数据来源”参数项进行配置。

        Lakeformation

        数据目录名称

        配置Spark作业访问的数据目录名称。

        此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。如需指定LakeFormation其他实例请参考◦方式二:使用Spark(--conf)参数配置...在Spark(--conf)中配置连接的Lakeformation实例和数据目录。

        选择该参数后系统将自动为您的作业添加以下配置项用于连接Lakeformation默认实例下的数据目录。

        spark.hadoop.lakecat.catalogname.default=lfcatalog

        “数据目录名称”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。

        优先推荐您使用控制台提供的“数据目录名称”参数项进行配置。

        -

        Spark参数(--conf)

        “元数据来源”和“数据目录名称”均支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。

        • 如果您需要配置访问Hudi数据表,可在Spark(--conf)参数中填加以下配置项。
          spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
          spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider
        • 如果您需要配置访问Delta数据表,可在Spark(--conf)参数中填加以下配置项。
          spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension

        -

      • 方式二:使用Spark(--conf)参数配置Spark Jar作业访问LakeFormation元数据
        新建或编辑Spark Jar作业时,请在作业配置页面的Spark(--conf)参数中按需配置以下信息以访问LakeFormation元数据。
        spark.sql.catalogImplementation=hive
        spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true
        spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient
        spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension //支持hudi,可选
        spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider //支持hudi,可选
        // 使用有OBS和lakeformation权限的委托访问,建议用户设置最小权限集
        spark.dli.job.agency.name=agencyForLakeformation
        //需要访问的lakeformation实例ID,在lakeformation console查看。可选,如不填写访问Lakeformation的默认实例
        spark.hadoop.lakeformation.instance.id=xxx
        //需要访问的lakeformation侧的CATALOG名称,在lakeformation console查看。可选,如不填写则默认值为hive
        spark.hadoop.lakecat.catalogname.default=lfcatalog
        // lakeformation相关依赖加载
        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
  • DLI Flink OpenSource SQL
    • 示例1:委托的方式对接Lakeformation

      创建Flink OpenSource SQL作业并配置如下参数:

      参数

      说明

      配置示例

      Flink版本

      Flink 1.15及以上版本支持对接LakeFormation。

      1.15

      委托

      使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:

      flink.dli.job.agency.name=agency

      委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

      -

      开启checkpoint

      勾选开启checkpoint。

      开启

      自定义参数

      • 配置Flink作业访问的元数据类型。

        本场景下请选择Lakeformation。

        flink.dli.job.catalog.type=lakeformation

      • 配置Flink作业访问的数据目录名称。

        flink.dli.job.catalog.name=[lakeformation中的catalog名称]

        此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。

      -

      示例中关于Catalog的参数说明请参考表4

      表4 Flink OpenSource SQL示例中关于Catalog的参数说明

      参数

      说明

      是否必填

      参数值

      type

      catalog类型

      固定值hive

      hive-conf-dir

      hive-conf路径,固定值/opt/flink/conf

      固定值/opt/flink/conf

      default-database

      默认数据库名称

      默认default库

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      CREATE CATALOG hive
      WITH
        (
          'type' = 'hive',
          'hive-conf-dir' = '/opt/flink/conf',  -- 固定配置/opt/flink/conf
          'default-database'='default'
        );
      
      USE CATALOG hive;
      
      CREATE TABLE IF NOT EXISTS
        dataGenSource612 (user_id string, amount int)
      WITH
        (
          'connector' = 'datagen',
          'rows-per-second' = '1',
          'fields.user_id.kind' = 'random',
          'fields.user_id.length' = '3'
        );
      
      CREATE table IF NOT EXISTS
        printSink612 (user_id string, amount int)
      WITH
        ('connector' = 'print');
      
      INSERT INTO
        printSink612
      SELECT
        *
      FROM
        dataGenSource612;
      
    • 示例2:DEW的方式对接Lakeformation

      创建Flink OpenSource SQL作业并配置如下参数:

      参数

      说明

      配置示例

      Flink版本

      Flink 1.15及以上版本支持对接LakeFormation。

      1.15

      委托

      使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:

      flink.dli.job.agency.name=agency

      委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

      -

      开启checkpoint

      勾选开启checkpoint。

      开启

      自定义参数

      • 配置Flink作业访问的元数据类型。

        本场景下请选择Lakeformation。

        flink.dli.job.catalog.type=lakeformation

      • 配置Flink作业访问的数据目录名称。

        flink.dli.job.catalog.name=[lakeformation中的catalog名称]

        此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。

      -

      示例中关于Catalog的参数说明请参考表5

      需要指定properties.catalog.lakeformation.auth.identity.util.class参数值为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator,并且配置dew相关配置。

      表5 Flink OpenSource SQL示例中关于Catalog的参数说明(DEW方式)

      参数

      说明

      是否必填

      参数值

      type

      catalog类型

      固定值hive

      hive-conf-dir

      hive-conf路径,固定值/opt/flink/conf

      固定值/opt/flink/conf

      default-database

      默认数据库名称

      不填默认default库

      properties.catalog.lakecat.auth.identity.util.class

      认证信息获取类

      dew方式必填,固定配置为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator

      properties.catalog.dew.projectId

      DEW所在的项目ID, 默认是Flink作业所在的项目ID。

      使用dew方式必填

      properties.catalog.dew.endpoint

      指定要使用的DEW服务所在的endpoint信息。

      使用dew方式必填。

      配置示例:kms.xxx.com

      properties.catalog.dew.csms.secretName

      在DEW服务的凭据管理中新建的通用凭据的名称。

      使用dew方式必填

      properties.catalog.dew.csms.version

      在DEW服务的凭据管理中新建的通用凭据的版本号。

      使用dew方式必填

      properties.catalog.dew.access.key

      在DEW服务的凭据中配置access.key值对应的key

      使用dew方式必填

      properties.catalog.dew.secret.key

      在DEW服务的凭据中配置secret.key值对应的key

      使用dew方式必填

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      CREATE CATALOG myhive
      WITH
        (
          'type' = 'hive',
          'hive-conf-dir' = '/opt/flink/conf',
          'default-database'='default',
          --下边是dew相关配置,请根据实际情况修改参数值
          'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator',
          'properties.catalog.dew.endpoint'='kms.xxx.com',
          'properties.catalog.dew.csms.secretName'='obsAksK',
          'properties.catalog.dew.access.key' = 'myak',
          'properties.catalog.dew.secret.key' = 'mysk',
          'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxx',
          'properties.catalog.dew.csms.version'='v9'
      );
      
      USE CATALOG myhive;
      
      create table IF NOT EXISTS dataGenSource_dew612(
        user_id string,
        amount int
      ) with (
        'connector' = 'datagen',
        'rows-per-second' = '1',
        'fields.user_id.kind' = 'random',
        'fields.user_id.length' = '3'
      );
      
      create table IF NOT EXISTS printSink_dew612(
        user_id string,
        amount int
      ) with (
        'connector' = 'print'
      );
      
      insert into printSink_dew612 select * from dataGenSource_dew612;
      
    • 示例3:委托的方式对接Lakeformation写hudi表

      创建Flink OpenSource SQL作业并配置如下参数:

      参数

      说明

      配置示例

      Flink版本

      Flink 1.15及以上版本支持对接LakeFormation。

      1.15

      委托

      使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:

      flink.dli.job.agency.name=agency

      委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

      -

      开启checkpoint

      勾选开启checkpoint。

      开启

      自定义参数

      • 配置Flink作业访问的元数据类型。

        本场景下请选择Lakeformation。

        flink.dli.job.catalog.type=lakeformation

      • 配置Flink作业访问的数据目录名称。

        flink.dli.job.catalog.name=[lakeformation中的catalog名称]

        此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。

      -

      示例中关于Catalog的参数说明请参考表6

      表6 hudi类型Catalog参数说明

      参数

      说明

      是否必填

      参数值

      type

      catalog类型

      hudi表配置为hudi。

      hive-conf-dir

      hive-conf路径,固定值/opt/flink/conf

      固定值/opt/flink/conf。

      default-database

      默认数据库名称

      默认default库。

      mode

      取值'hms' 或 'non-hms'。

      • 'hms' 表示创建的 Hudi Catalog 会使用 Hive Metastore 存储元数据信息。
      • 'non-hms'表示不使用Hive Metastore存储元数据信息。

      固定值hms。

      表7 hudi类型sink表的connector参数

      参数

      说明

      是否必填

      参数值

      connector

      flink connector类型。

      配置为hudi表示sink表是hudi表。

      hudi

      path

      表的基本路径。如果该路径不存在,则会创建它。

      请参考示例代码中的配置值。

      hoodie.datasource.write.recordkey.field

      hoodie表的唯一键字段名

      这里配置order_id为唯一键。

      EXTERNAL

      是否外表

      hudi表必填,且设置为true

      true

      CREATE CATALOG hive_catalog
        WITH (
          'type'='hive',
          'hive-conf-dir' = '/opt/flink/conf',
          'default-database'='test'
        );
      USE CATALOG hive_catalog;
      create table  if not exists genSource618 (
        order_id STRING, 
        order_name STRING, 
        price INT, 
        weight INT
      ) with (
        'connector' = 'datagen',
        'rows-per-second' = '1', 
        'fields.order_id.kind' = 'random',
        'fields.order_id.length' = '8',
        'fields.order_name.kind' = 'random',
        'fields.order_name.length' = '5'
      );
      
      CREATE CATALOG hoodie_catalog
        WITH (
          'type'='hudi',
          'hive.conf.dir' = '/opt/flink/conf',
          'mode'='hms' -- supports 'dfs' mode that uses the DFS backend for table DDLs persistence
        );
      CREATE TABLE  if not exists  hoodie_catalog.`test`.`hudiSink618` (
        `order_id` STRING PRIMARY KEY NOT ENFORCED,
        `order_name` STRING, 
        `price` INT, 
        `weight` INT,
        `create_time` BIGINT,
        `create_date` String
      ) PARTITIONED BY (create_date) WITH (
        'connector' = 'hudi',
        'path' = 'obs://xxx/catalog/dbtest3/hudiSink618',
        'hoodie.datasource.write.recordkey.field' = 'order_id',
        'write.precombine.field' = 'create_time',
        'EXTERNAL' = 'true' -- must be set
      );
      
      insert into hoodie_catalog.`test`.`hudiSink618`
      select 
        order_id, 
        order_name, 
        price, 
        weight,
        UNIX_TIMESTAMP() as create_time,
        FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date
      from genSource618;
  • DLI Flink Jar
    • 示例1:委托方式对接Lakeformation
      1. 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录

        示例代码如下:

        本例通过DataGen表产生随机数据并输出到Print结果表中。

        其他connector类型可参考Flink 1.15支持的connector列表

         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        package com.huawei.test;
        
        import org.apache.flink.api.java.utils.ParameterTool;
        import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
        import org.apache.flink.runtime.state.filesystem.FsStateBackend;
        import org.apache.flink.streaming.api.CheckpointingMode;
        import org.apache.flink.streaming.api.environment.CheckpointConfig;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        
        import java.text.SimpleDateFormat;
        
        @SuppressWarnings({"deprecation", "rawtypes", "unchecked"})
        public class GenToPrintTaskAgency {
            private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class);
            private static final String datePattern = "yyyy-MM-dd_HH-mm-ss";
        
            public static void main(String[] args) {
                LOGGER.info("Start task.");
                ParameterTool paraTool = ParameterTool.fromArgs(args);
                String checkpointInterval = "180000000";
        
                // set up execution environment
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                EnvironmentSettings settings = EnvironmentSettings.newInstance()
                        .inStreamingMode().build();
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
                env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval));
                env.getCheckpointConfig().enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        
                SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern);
                String time = dateTimeFormat.format(System.currentTimeMillis());
                RocksDBStateBackend rocksDbBackend =
                        new RocksDBStateBackend(
                                new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true);
                env.setStateBackend(rocksDbBackend);
        
                String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" +
                        "    'type' = 'hive',\n" +
                        "    'hive-conf-dir' = '/opt/hadoop/conf'\n" +
                        "  );";
                tEnv.executeSql(createCatalog);
        
                String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJar618_1` (\n" +
                        "  user_id string,\n" +
                        "  amount int\n" +
                        ") WITH (\n" +
                        "  'connector' = 'datagen',\n" +
                        "  'rows-per-second' = '1',\n" +
                        "  'fields.user_id.kind' = 'random',\n" +
                        "  'fields.user_id.length' = '3'\n" +
                        ")";
        /*testdb是用户自定义的数数据库*/
        
                tEnv.executeSql(dataSource);
        
                String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJar618_1` (\n" +
                        "  user_id string,\n" +
                        "  amount int\n" +
                        ") WITH ('connector' = 'print')";
                tEnv.executeSql(printSink);
        /*testdb是用户自定义的数数据库*/
        
                String query = "insert into lf_catalog.`test`.`printSinkJar618_1` " +
                        "select * from lf_catalog.`test`.`dataGenSourceJar618_1`";
                tEnv.executeSql(query);
            }
        }
        
      2. 创建Flink jar作业并配置如下参数。

        参数

        说明

        配置示例

        Flink版本

        Flink 1.15及以上版本支持对接LakeFormation。

        1.15

        委托

        使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:

        flink.dli.job.agency.name=agency

        委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

        -

        优化参数

        • 配置Flink作业访问的元数据类型。

          本场景下请选择Lakeformation。

          flink.dli.job.catalog.type=lakeformation

        • 配置Flink作业访问的数据目录名称。

          flink.dli.job.catalog.name=[lakeformation中的catalog名称]

          此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。

        -

    • 示例2:DEW方式对接Lakeformation
      1. 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录

        示例代码如下:

        本例通过DataGen表产生随机数据并输出到Print结果表中。

        其他connector类型可参考Flink 1.15支持的connector列表

        package com.huawei.test;
        
        import org.apache.flink.api.java.utils.ParameterTool;
        import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
        import org.apache.flink.runtime.state.filesystem.FsStateBackend;
        import org.apache.flink.streaming.api.CheckpointingMode;
        import org.apache.flink.streaming.api.environment.CheckpointConfig;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        
        import java.text.SimpleDateFormat;
        
        @SuppressWarnings({"deprecation", "rawtypes", "unchecked"})
        public class GenToPrintTaskDew {
            private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class);
            private static final String datePattern = "yyyy-MM-dd_HH-mm-ss";
        
            public static void main(String[] args) {
                LOGGER.info("Start task.");
                ParameterTool paraTool = ParameterTool.fromArgs(args);
                String checkpointInterval = "180000000";
        
                // set up execution environment
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                EnvironmentSettings settings = EnvironmentSettings.newInstance()
                        .inStreamingMode().build();
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
                env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval));
                env.getCheckpointConfig().enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        
                SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern);
                String time = dateTimeFormat.format(System.currentTimeMillis());
                RocksDBStateBackend rocksDbBackend =
                        new RocksDBStateBackend(
                                new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true);
                env.setStateBackend(rocksDbBackend);
        
                String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" +
                        "    'type' = 'hive',\n" +
                        "    'hive-conf-dir' = '/opt/hadoop/conf',\n" +
                        "    'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator',\n" +
                        "    'properties.catalog.dew.endpoint'='kms.xxx.xxx.com',\n" +
                        "    'properties.catalog.dew.csms.secretName'='obsAksK',\n" +
                        "    'properties.catalog.dew.access.key' = 'ak',\n" +
                        "    'properties.catalog.dew.secret.key' = 'sk',\n" +
                        "    'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxxx',\n" +
                        "    'properties.catalog.dew.csms.version'='v9'\n" +
                        "  );";
                tEnv.executeSql(createCatalog);
        
                String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJarDew618_1` (\n" +
                        "  user_id string,\n" +
                        "  amount int\n" +
                        ") WITH (\n" +
                        "  'connector' = 'datagen',\n" +
                        "  'rows-per-second' = '1',\n" +
                        "  'fields.user_id.kind' = 'random',\n" +
                        "  'fields.user_id.length' = '3'\n" +
                        ")";
                tEnv.executeSql(dataSource);
        /*testdb是用户自定义的数数据库*/
        
                String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJarDew618_1` (\n" +
                        "  user_id string,\n" +
                        "  amount int\n" +
                        ") WITH ('connector' = 'print')";
                tEnv.executeSql(printSink);
        /*testdb是用户自定义的数数据库*/
        
                String query = "insert into lf_catalog.`test`.`printSinkJarDew618_1` " +
                        "select * from lf_catalog.`test`.`dataGenSourceJarDew618_1`";
                tEnv.executeSql(query);
            }
        }
      2. 创建Flink jar作业并配置如下参数。

        参数

        说明

        配置示例

        Flink版本

        Flink 1.15及以上版本支持对接LakeFormation。

        1.15

        委托

        使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:

        flink.dli.job.agency.name=agency

        委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

        -

        优化参数

        • 配置Flink作业访问的元数据类型。

          本场景下请选择Lakeformation。

          flink.dli.job.catalog.type=lakeformation

        • 配置Flink作业访问的数据目录名称。

          flink.dli.job.catalog.name=[lakeformation中的catalog名称]

          此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。

        -

    • 示例3:Flink jar支持Hudi表
      1. 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录

        示例代码如下:

        本例通过DataGen表产生随机数据并输出到Hudi结果表中。

        其他connector类型可参考Flink 1.15支持的connector列表

         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        78
        79
        80
        81
        82
        83
        84
        85
        86
        87
        88
        89
        90
        91
        package com.huawei.test;
        
        import org.apache.flink.api.java.utils.ParameterTool;
        import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
        import org.apache.flink.runtime.state.filesystem.FsStateBackend;
        import org.apache.flink.streaming.api.CheckpointingMode;
        import org.apache.flink.streaming.api.environment.CheckpointConfig;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        
        import java.io.IOException;
        import java.text.SimpleDateFormat;
        
        public class GenToHudiTask4 {
            private static final Logger LOGGER = LoggerFactory.getLogger(GenToHudiTask4.class);
            private static final String datePattern = "yyyy-MM-dd_HH-mm-ss";
        
            public static void main(String[] args) throws IOException {
                LOGGER.info("Start task.");
                ParameterTool paraTool = ParameterTool.fromArgs(args);
                String checkpointInterval = "30000";
        
                // set up execution environment
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                EnvironmentSettings settings = EnvironmentSettings.newInstance()
                        .inStreamingMode().build();
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
                env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval));
                env.getCheckpointConfig().enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        
                SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern);
                String time = dateTimeFormat.format(System.currentTimeMillis());
                RocksDBStateBackend rocksDbBackend =
                        new RocksDBStateBackend(
                                new FsStateBackend("obs://xxx/jobs/testcheckpoint/" + time), true);
                env.setStateBackend(rocksDbBackend);
        
                String catalog = "CREATE CATALOG hoodie_catalog\n" +
                        "  WITH (\n" +
                        "    'type'='hudi',\n" +
                        "    'hive.conf.dir' = '/opt/hadoop/conf',\n" +
                        "    'mode'='hms'\n" +
                        "  )";
                tEnv.executeSql(catalog);
                String dwsSource = "CREATE TABLE if not exists genSourceJarForHudi618_1 (\n" +
                        "  order_id STRING,\n" +
                        "  order_name STRING,\n" +
                        "  price INT,\n" +
                        "  weight INT\n" +
                        ") WITH (\n" +
                        "  'connector' = 'datagen',\n" +
                        "  'rows-per-second' = '1',\n" +
                        "  'fields.order_id.kind' = 'random',\n" +
                        "  'fields.order_id.length' = '8',\n" +
                        "  'fields.order_name.kind' = 'random',\n" +
                        "  'fields.order_name.length' = '8'\n" +
                        ")";
                tEnv.executeSql(dwsSource);
        /*testdb是用户自定义的数数据库*/
                String printSinkdws =
                        "CREATE TABLE if not exists hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` (\n" +
                        "  order_id STRING PRIMARY KEY NOT ENFORCED,\n" +
                        "  order_name STRING,\n" +
                        "  price INT,\n" +
                        "  weight INT,\n" +
                        "  create_time BIGINT,\n" +
                        "  create_date String\n" +
                        ") WITH (" +
                        "'connector' = 'hudi',\n" +
                        "'path' = 'obs://xxx/catalog/dbtest3/hudiSinkJarHudi618_1',\n" +
                        "'hoodie.datasource.write.recordkey.field' = 'order_id',\n" +
                        "'EXTERNAL' = 'true'\n" +
                        ")";
                tEnv.executeSql(printSinkdws);
        /*testdb是用户自定义的数数据库*/
                String query = "insert into hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` select\n" +
                " order_id,\n" +
                " order_name,\n" +
                " price,\n" +
                " weight,\n" +
                " UNIX_TIMESTAMP() as create_time,\n" +
                " FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date\n" +
                " from genSourceJarForHudi618_1";
                tEnv.executeSql(query);
            }
        }
        

        表8 hudi类型sink表的connector参数

        参数

        说明

        是否必填

        参数值

        connector

        flink connector类型。

        配置为hudi表示sink表是hudi表。

        hudi

        path

        表的基本路径。如果该路径不存在,则会创建它。

        请参考示例代码中的配置值。

        hoodie.datasource.write.recordkey.field

        hoodie表的唯一键字段名

        这里配置order_id为唯一键。

        EXTERNAL

        是否外表

        hudi表必填,且设置为true

        true

      2. 创建Flink jar作业并配置如下参数。

        参数

        说明

        配置示例

        Flink版本

        Flink 1.15及以上版本支持对接LakeFormation。

        1.15

        委托

        使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:

        flink.dli.job.agency.name=agency

        委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

        -

        优化参数

        • 配置Flink作业访问的元数据类型。

          本场景下请选择Lakeformation。

          flink.dli.job.catalog.type=lakeformation

        • 配置Flink作业访问的数据目录名称。

          flink.dli.job.catalog.name=[lakeformation中的catalog名称]

          此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。

        -

提示

您即将访问非华为云网站,请注意账号财产安全

文档反馈

文档反馈

意见反馈

0/500

标记内容

同时提交标记内容