更新时间:2024-12-28 GMT+08:00

DLI对接LakeFormation

操作场景

LakeFormation是企业级一站式湖仓构建服务,提供元数据统一管理能力,支持无缝对接多种计算引擎及大数据云服务,使客户便捷高效地构建数据湖和运营相关业务,加速释放业务数据价值。

在Spark作业和SQL作业场景,支持对接LakeFormation实现元数据的统一管理,本节操作介绍配置DLI与LakeFormation的数据连接的操作步骤。

LakeFormation Spark语法请参考Spark语法参考

LakeFormation Flink语法请参考Flink语法参考

使用须知

该功能为白名单功能,如需使用,请在管理控制台右上角,选择“工单 > 新建工单”,提交申请。

DLI对接LakeFormation功能的使用依赖于“湖仓构建”服务的上线状态,如需了解“湖仓构建”服务的上线范围请参考全球产品和服务

操作流程

图1 操作流程

约束限制

  • 表1中提供了支持对接LakeFormation获取元数据的队列和引擎类型。
    查看队列的引擎类型和版本请参考查看队列的基本信息
    表1 LakeFormation获取元数据的队列和引擎类型

    队列类型

    引擎类型和支持的版本

    default队列

    • Spark 3.3.x:支持对接LakeFormation获取元数据的队列和引擎。
    • HetuEngine 2.1.0:支持对接LakeFormation获取元数据的队列和引擎。

    SQL队列

    • Spark 3.3.x:支持对接LakeFormation获取元数据的队列和引擎。
    • HetuEngine 2.1.0:支持对接LakeFormation获取元数据的队列和引擎。

    通用队列

    Flink作业场景:Flink 1.15及以上版本且使用弹性资源池队列时支持对接LakeFormation获取元数据。

  • DLI仅支持对接LakeFormation默认实例,请在LakeFormation设置实例为默认实例。
  • DLI支持读取Lakeformation的中Avro、Json、Parquet、Csv、Orc、Text、Hudi格式的数据。
  • LakeFormation数据目录中的库、表权限统一由LakeFormation管理。
  • DLI支持对接LakeFormation后,DLI原始库表下移至dli的数据目录下。

步骤1:创建LakeFormation实例用于元数据存储

LakeFormation实例为元数据的管理提供基础资源,DLI仅支持对接LakeFormation的默认实例。
  1. 创建实例
    1. 登录LakeFormation管理控制台。
    2. 单击页面右上角“立即购买”或“购买实例”,进入实例购买页面。

      首次创建实例时界面显示“立即购买”,如果界面已有LakeFormation实例则显示为“购买实例”。

    3. 按需配置LakeFormation实例参数,完成实例创建。

      本例创建按需计费的共享型实例。

      更多参数配置及说明,请参考创建LakeFormation实例

  2. 设置实例为默认实例
    1. 查看实例“基本信息”中“是否为默认实例”的参数值。
      • “true”表示当前实例为默认实例。
      • “false”表示当前实例不为默认实例。
    2. 如果需要设置当前实例为默认实例,请单击页面右上角“设为默认实例”。
    3. 勾选操作影响后单击“确定”,将当前实例设置为默认实例。

      当前DLI仅对接LakeFormation默认实例,变更默认实例后,可能对使用LakeFormation的周边服务产生影响,请谨慎操作。

步骤2:在LakeFormation管理控制台创建Catalog

数据目录(Catalog)是元数据管理对象,它可以包含多个数据库。您可以在LakeFormation中创建并管理多个Catalog,用于不同外部集群的元数据隔离。

  1. 登录LakeFormation管理控制台。
  2. 选择“元数据 > Catalog”。
  3. 单击“创建Catalog”。

    按需配置Catalog实例参数。

    更多参数配置及说明,请参考创建Catalog

  4. 创建完成后,即可在“Catalog”页面查看Catalog相关信息。

步骤3:在DLI管理控制台创建数据目录

在DLI管理控制台需要创建到Catalog的连接,才可以访问LakeFormation实例中存储的Catalog。

  • DLI仅支持对接LakeFormation默认实例,请在LakeFormation设置实例为默认实例。
  • LakeFormation中每一个数据目录只能创建一个映射,不能创建多个。

    例如用户在DLI创建了映射名catalogMapping1对应LakeFormation数据目录:catalogA。创建成功后,在同一个项目空间下,不能再创建到catalogA的映射。

  1. 登录DLI管理控制台。
  2. 选择“SQL编辑器 ”。
  3. 在SQL编辑器页面,选择“数据目录”。
  4. 单击创建数据目录。
  5. 配置数据目录相关信息。
    表2 数据目录配置信息

    参数名称

    是否必填

    说明

    外部数据目录名称

    LakeFormation默认实例下的Catalog名称。

    类型

    当前只支持LakeFormation。

    该选项已固定,无需填写。

    数据目录映射名称

    在DLI使用的Catalog映射名,用户在执行SQL语句的时候需要指定Catalog映射,以此来标识访问的外部的元数据。建议与外部数据目录名称保持一致。

    当前仅支持连接LakeFormation默认实例的数据目录。

    描述

    自定义数据目录的描述信息。

  6. 单击“确定”创建数据目录。

步骤4:授权使用LakeFormation资源

  • SQL作业场景

    在进行SQL作业提交之前,需完成LakeFormation元数据、数据库、表、列和函数等资源授权,确保作业在执行过程中能够顺利访问所需的数据和资源。LakeFormation SQL资源权限支持列表提供了LakeFormation权限支持列表。

    使用LakeFormation资源需要分别完成LakeFormation的IAM细粒度授权和LakeFormation SQL资源授权。

    • LakeFormation的IAM细粒度授权:授权使用LakeFormation API。

      IAM服务通常提供了管理用户、组和角色的访问权限的方式。您可以在IAM控制台中创建策略(Policy),定义哪些用户或角色可以调用LakeFormation的API。然后,将这些策略附加到相应的用户或角色上。

      • 方法1:基于角色授权:

        即IAM最初提供的一种根据用户的工作职能定义权限的粗粒度授权机制。该机制以服务为粒度,提供有限的服务相关角色用于授权。

        例如参考LakeFormation权限管理授予用户只读权限,允许查询LakeFormation相关元数据资源的权限。

        或如下示例授予LakeFormation相关元数据资源的所有操作权限。

        示例:

        {
            "Version": "1.1",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "lakeformation:table:*",
                        "lakeformation:database:*",
                        "lakeformation:catalog:*",
                        "lakeformation:function:*",         
                        "lakeformation:transaction:*",
                        "lakeformation:policy:describe",
                        "lakeformation:credential:describe"
                    ]
                }
            ]
        }
      • 方法2:基于策略的精细化授权:

        IAM提供的细粒度授权的能力,可以精确到具体服务的操作、资源以及请求条件等。

        LakeFormation权限策略请参考LakeFormation权限和授权项

        IAM授权的具体操作请参考创建用户并授权使用LakeFormation

    • LakeFormation SQL资源授权:授权使用LakeFormation具体资源(元数据、数据库、表、列和函数等)。

      LakeFormation资源授权是指允许用户对特定资源的访问的权限,以此来控制对LakeFormation的数据和元数据的访问。

      LakeFormation资源授权有两种方式:

      • 方式一:在LakeFormation管理控制台对资源授权。

        具体操作请参考LakeFormation用户指南中的新增授权

        了解LakeFormation SQL资源权限请参考数据权限概述

      • 方式二:在DLI管理控制台使用GRANT SQL语句授权

        GRANT语句是SQL语言中用于授权的一种方式。

        您可以使用GRANT语句来授予用户或角色对数据库、表、列、函数等的访问权限。

        LakeFormation SQL资源权限支持列表提供了LakeFormation资源授权的策略。

        Catalog资源暂时不支持在DLI SQL授权,请参考▪方式一:在LakeFormation管理控制台...在LakeFormation 管理控制台完成授权。

  • Spark Jar、Flink OpenSource SQL、Flink Jar作业场景:
    • 方式1:使用委托授权:使用Spark 3.3.1及以上版本、Flink 1.15版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在配置作业时添加新建的委托信息。

      委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

    • 方式2:使用DEW授权:
      • 已为授予IAM用户所需的IAM和Lakeformation权限,具体请参考•SQL作业场景的IAM授权的操作步骤。
      • 已在DEW服务创建通用凭证,并存入凭据值。具体操作请参考创建通用凭据
      • 已创建DLI访问DEW的委托并完成委托授权。该委托需具备以下权限:
        • DEW中的查询凭据的版本与凭据值ShowSecretVersion接口权限,csms:secretVersion:get。
        • DEW中的查询凭据的版本列表ListSecretVersions接口权限,csms:secretVersion:list。
        • DEW解密凭据的权限,kms:dek:decrypt。

        委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

步骤5:在DLI作业开发时使用LakeFormation元数据

DLI对接LakeFormation默认实例且完成LakeFormation的资源授权后,即可以在作业开发时使用LakeFormation元数据。

  • DLI SQL

    LakeFormation SQL语法说明请参考DLI Spark SQL语法参考

    在执行SQL作业时,您可以在控制台选择执行SQL所在的catalog,如图2所示,或在SQL命令中指定catalogName。catalogName是DLI控制台的数据目录映射名。

    图2 在SQL编辑器页面选择数据目录
    • 对接LakeFormation实例场景,在创建数据库时需要指定数据库存储的OBS路径。
    • 对接LakeFormation实例场景,在创建表时不支持设置表生命周期和多版本。
    • 对接LakeFormation实例场景,LOAD DATA语句不支持datasource表,且LOAD DATA分区表必须指定分区。
    • 在LakeFormation控制台创建的数据库和表中包含中文字符时,不支持在DLI执行相关数据库和表的操作。
    • 对接LakeFormation实例场景,不支持指定筛选条件删除分区。
    • 对接LakeFormation实例场景,不支持创建Truncate Datasource/Hive外表。
    • DLI暂不支持使用LakeFormation行过滤条件功能。
    • DLI读取binary类型的数据进行console展示时,会对binary数据进行Base64转换。
    • 在DLI暂不支持LakeFormation的路径授权。
  • DLI Spark Jar:

    本节介绍在DLI管理控制台提交Spark Jar作业时使用LakeFormation元数据的配置操作。

    • Spark Jar 示例
      SparkSession spark = SparkSession.builder()
          .enableHiveSupport()
          .appName("java_spark_demo")
          .getOrCreate();
      
      spark.sql("show databases").show();
    • DLI管理控制台Spark Jar作业配置说明
      • (推荐)方式一:使用控制台提供的参数项(委托、元数据来源等)配置Spark Jar作业访问LakeFormation元数据
        新建或编辑Spark Jar作业时,请参考表3Spark Jar作业访问LakeFormation元数据。
        表3 配置Spark Jar作业访问LakeFormation元数据

        参数

        说明

        配置示例

        Spark版本

        Spark 3.3.x及以上版本支持对接LakeFormation。

        3.3.1

        委托

        使用Spark 3.3.1及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:

        spark.dli.job.agency.name=agency

        委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

        -

        访问元数据

        配置开启Spark作业访问元数据功能。

        元数据来源

        配置Spark作业访问的元数据类型。本场景下请选择Lakeformation。

        选择该参数后系统将自动为您的作业添加以下配置项用于加载lakeformation相关依赖。

        spark.sql.catalogImplementation=hive
        spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true
        spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient
        og
        // lakeformation相关依赖加载
        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*

        “元数据来源”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。

        优先推荐您使用控制台提供的“元数据来源”参数项进行配置。

        Lakeformation

        数据目录名称

        配置Spark作业访问的数据目录名称。

        此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。如需指定LakeFormation其他实例请参考◦方式二:使用Spark(--conf)参数配置...在Spark(--conf)中配置连接的Lakeformation实例和数据目录。

        选择该参数后系统将自动为您的作业添加以下配置项用于连接Lakeformation默认实例下的数据目录。

        spark.hadoop.lakecat.catalogname.default=lfcatalog

        “数据目录名称”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。

        优先推荐您使用控制台提供的“数据目录名称”参数项进行配置。

        -

        Spark参数(--conf)

        “元数据来源”和“数据目录名称”均支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。

        • 如果您需要配置访问Hudi数据表,可在Spark(--conf)参数中填加以下配置项。
          spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
          spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider
        • 如果您需要配置访问Delta数据表,可在Spark(--conf)参数中填加以下配置项。
          spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension

        -

      • 方式二:使用Spark(--conf)参数配置Spark Jar作业访问LakeFormation元数据
        新建或编辑Spark Jar作业时,请在作业配置页面的Spark(--conf)参数中按需配置以下信息以访问LakeFormation元数据。
        spark.sql.catalogImplementation=hive
        spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true
        spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient
        spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension //支持hudi,可选
        spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider //支持hudi,可选
        // 使用有OBS和lakeformation权限的委托访问,建议用户设置最小权限集
        spark.dli.job.agency.name=agencyForLakeformation
        //需要访问的lakeformation实例ID,在lakeformation console查看。可选,如不填写访问Lakeformation的默认实例
        spark.hadoop.lakeformation.instance.id=xxx
        //需要访问的lakeformation侧的CATALOG名称,在lakeformation console查看。可选,如不填写则默认值为hive
        spark.hadoop.lakecat.catalogname.default=lfcatalog
        // lakeformation相关依赖加载
        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
  • DLI Flink OpenSource SQL
    • 示例1:委托的方式对接Lakeformation

      创建Flink OpenSource SQL作业并配置如下参数:

      参数

      说明

      配置示例

      Flink版本

      Flink 1.15及以上版本支持对接LakeFormation。

      1.15

      委托

      使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:

      flink.dli.job.agency.name=agency

      委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

      -

      开启checkpoint

      勾选开启checkpoint。

      开启

      自定义参数

      • 配置Flink作业访问的元数据类型。

        本场景下请选择Lakeformation。

        flink.dli.job.catalog.type=lakeformation

      • 配置Flink作业访问的数据目录名称。

        flink.dli.job.catalog.name=[lakeformation中的catalog名称]

        此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。

      -

      示例中关于Catalog的参数说明请参考表4

      表4 Flink OpenSource SQL示例中关于Catalog的参数说明

      参数

      说明

      是否必填

      参数值

      type

      catalog类型

      固定值hive

      hive-conf-dir

      hive-conf路径,固定值/opt/flink/conf

      固定值/opt/flink/conf

      default-database

      默认数据库名称

      默认default库

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      CREATE CATALOG hive
      WITH
        (
          'type' = 'hive',
          'hive-conf-dir' = '/opt/flink/conf',  -- 固定配置/opt/flink/conf
          'default-database'='default'
        );
      
      USE CATALOG hive;
      
      CREATE TABLE IF NOT EXISTS
        dataGenSource612 (user_id string, amount int)
      WITH
        (
          'connector' = 'datagen',
          'rows-per-second' = '1',
          'fields.user_id.kind' = 'random',
          'fields.user_id.length' = '3'
        );
      
      CREATE table IF NOT EXISTS
        printSink612 (user_id string, amount int)
      WITH
        ('connector' = 'print');
      
      INSERT INTO
        printSink612
      SELECT
        *
      FROM
        dataGenSource612;
      
    • 示例2:DEW的方式对接Lakeformation

      创建Flink OpenSource SQL作业并配置如下参数:

      参数

      说明

      配置示例

      Flink版本

      Flink 1.15及以上版本支持对接LakeFormation。

      1.15

      委托

      使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:

      flink.dli.job.agency.name=agency

      委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

      -

      开启checkpoint

      勾选开启checkpoint。

      开启

      自定义参数

      • 配置Flink作业访问的元数据类型。

        本场景下请选择Lakeformation。

        flink.dli.job.catalog.type=lakeformation

      • 配置Flink作业访问的数据目录名称。

        flink.dli.job.catalog.name=[lakeformation中的catalog名称]

        此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。

      -

      示例中关于Catalog的参数说明请参考表5

      需要指定properties.catalog.lakeformation.auth.identity.util.class参数值为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator,并且配置dew相关配置。

      表5 Flink OpenSource SQL示例中关于Catalog的参数说明(DEW方式)

      参数

      说明

      是否必填

      参数值

      type

      catalog类型

      固定值hive

      hive-conf-dir

      hive-conf路径,固定值/opt/flink/conf

      固定值/opt/flink/conf

      default-database

      默认数据库名称

      不填默认default库

      properties.catalog.lakecat.auth.identity.util.class

      认证信息获取类

      dew方式必填,固定配置为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator

      properties.catalog.dew.projectId

      DEW所在的项目ID, 默认是Flink作业所在的项目ID。

      使用dew方式必填

      properties.catalog.dew.endpoint

      指定要使用的DEW服务所在的endpoint信息。

      使用dew方式必填。

      配置示例:kms.xxx.com

      properties.catalog.dew.csms.secretName

      在DEW服务的凭据管理中新建的通用凭据的名称。

      使用dew方式必填

      properties.catalog.dew.csms.version

      在DEW服务的凭据管理中新建的通用凭据的版本号。

      使用dew方式必填

      properties.catalog.dew.access.key

      在DEW服务的凭据中配置access.key值对应的key

      使用dew方式必填

      properties.catalog.dew.secret.key

      在DEW服务的凭据中配置secret.key值对应的key

      使用dew方式必填

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      CREATE CATALOG myhive
      WITH
        (
          'type' = 'hive',
          'hive-conf-dir' = '/opt/flink/conf',
          'default-database'='default',
          --下边是dew相关配置,请根据实际情况修改参数值
          'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator',
          'properties.catalog.dew.endpoint'='kms.xxx.com',
          'properties.catalog.dew.csms.secretName'='obsAksK',
          'properties.catalog.dew.access.key' = 'myak',
          'properties.catalog.dew.secret.key' = 'mysk',
          'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxx',
          'properties.catalog.dew.csms.version'='v9'
      );
      
      USE CATALOG myhive;
      
      create table IF NOT EXISTS dataGenSource_dew612(
        user_id string,
        amount int
      ) with (
        'connector' = 'datagen',
        'rows-per-second' = '1',
        'fields.user_id.kind' = 'random',
        'fields.user_id.length' = '3'
      );
      
      create table IF NOT EXISTS printSink_dew612(
        user_id string,
        amount int
      ) with (
        'connector' = 'print'
      );
      
      insert into printSink_dew612 select * from dataGenSource_dew612;
      
    • 示例3:委托的方式对接Lakeformation写hudi表

      创建Flink OpenSource SQL作业并配置如下参数:

      参数

      说明

      配置示例

      Flink版本

      Flink 1.15及以上版本支持对接LakeFormation。

      1.15

      委托

      使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:

      flink.dli.job.agency.name=agency

      委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

      -

      开启checkpoint

      勾选开启checkpoint。

      开启

      自定义参数

      • 配置Flink作业访问的元数据类型。

        本场景下请选择Lakeformation。

        flink.dli.job.catalog.type=lakeformation

      • 配置Flink作业访问的数据目录名称。

        flink.dli.job.catalog.name=[lakeformation中的catalog名称]

        此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。

      -

      示例中关于Catalog的参数说明请参考表6

      表6 hudi类型Catalog参数说明

      参数

      说明

      是否必填

      参数值

      type

      catalog类型

      hudi表配置为hudi。

      hive-conf-dir

      hive-conf路径,固定值/opt/flink/conf

      固定值/opt/flink/conf。

      default-database

      默认数据库名称

      默认default库。

      mode

      取值'hms' 或 'non-hms'。

      • 'hms' 表示创建的 Hudi Catalog 会使用 Hive Metastore 存储元数据信息。
      • 'non-hms'表示不使用Hive Metastore存储元数据信息。

      固定值hms。

      表7 hudi类型sink表的connector参数

      参数

      说明

      是否必填

      参数值

      connector

      flink connector类型。

      配置为hudi表示sink表是hudi表。

      hudi

      path

      表的基本路径。如果该路径不存在,则会创建它。

      请参考示例代码中的配置值。

      hoodie.datasource.write.recordkey.field

      hoodie表的唯一键字段名

      这里配置order_id为唯一键。

      EXTERNAL

      是否外表

      hudi表必填,且设置为true

      true

      CREATE CATALOG hive_catalog
        WITH (
          'type'='hive',
          'hive-conf-dir' = '/opt/flink/conf',
          'default-database'='test'
        );
      USE CATALOG hive_catalog;
      create table  if not exists genSource618 (
        order_id STRING, 
        order_name STRING, 
        price INT, 
        weight INT
      ) with (
        'connector' = 'datagen',
        'rows-per-second' = '1', 
        'fields.order_id.kind' = 'random',
        'fields.order_id.length' = '8',
        'fields.order_name.kind' = 'random',
        'fields.order_name.length' = '5'
      );
      
      CREATE CATALOG hoodie_catalog
        WITH (
          'type'='hudi',
          'hive.conf.dir' = '/opt/flink/conf',
          'mode'='hms' -- supports 'dfs' mode that uses the DFS backend for table DDLs persistence
        );
      CREATE TABLE  if not exists  hoodie_catalog.`test`.`hudiSink618` (
        `order_id` STRING PRIMARY KEY NOT ENFORCED,
        `order_name` STRING, 
        `price` INT, 
        `weight` INT,
        `create_time` BIGINT,
        `create_date` String
      ) PARTITIONED BY (create_date) WITH (
        'connector' = 'hudi',
        'path' = 'obs://xxx/catalog/dbtest3/hudiSink618',
        'hoodie.datasource.write.recordkey.field' = 'order_id',
        'write.precombine.field' = 'create_time',
        'EXTERNAL' = 'true' -- must be set
      );
      
      insert into hoodie_catalog.`test`.`hudiSink618`
      select 
        order_id, 
        order_name, 
        price, 
        weight,
        UNIX_TIMESTAMP() as create_time,
        FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date
      from genSource618;
  • DLI Flink Jar
    • 示例1:委托方式对接Lakeformation
      1. 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录

        示例代码如下:

        本例通过DataGen表产生随机数据并输出到Print结果表中。

        其他connector类型可参考Flink 1.15支持的connector列表

         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        package com.huawei.test;
        
        import org.apache.flink.api.java.utils.ParameterTool;
        import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
        import org.apache.flink.runtime.state.filesystem.FsStateBackend;
        import org.apache.flink.streaming.api.CheckpointingMode;
        import org.apache.flink.streaming.api.environment.CheckpointConfig;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        
        import java.text.SimpleDateFormat;
        
        @SuppressWarnings({"deprecation", "rawtypes", "unchecked"})
        public class GenToPrintTaskAgency {
            private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class);
            private static final String datePattern = "yyyy-MM-dd_HH-mm-ss";
        
            public static void main(String[] args) {
                LOGGER.info("Start task.");
                ParameterTool paraTool = ParameterTool.fromArgs(args);
                String checkpointInterval = "180000000";
        
                // set up execution environment
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                EnvironmentSettings settings = EnvironmentSettings.newInstance()
                        .inStreamingMode().build();
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
                env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval));
                env.getCheckpointConfig().enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        
                SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern);
                String time = dateTimeFormat.format(System.currentTimeMillis());
                RocksDBStateBackend rocksDbBackend =
                        new RocksDBStateBackend(
                                new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true);
                env.setStateBackend(rocksDbBackend);
        
                String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" +
                        "    'type' = 'hive',\n" +
                        "    'hive-conf-dir' = '/opt/hadoop/conf'\n" +
                        "  );";
                tEnv.executeSql(createCatalog);
        
                String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJar618_1` (\n" +
                        "  user_id string,\n" +
                        "  amount int\n" +
                        ") WITH (\n" +
                        "  'connector' = 'datagen',\n" +
                        "  'rows-per-second' = '1',\n" +
                        "  'fields.user_id.kind' = 'random',\n" +
                        "  'fields.user_id.length' = '3'\n" +
                        ")";
        /*testdb是用户自定义的数数据库*/
        
                tEnv.executeSql(dataSource);
        
                String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJar618_1` (\n" +
                        "  user_id string,\n" +
                        "  amount int\n" +
                        ") WITH ('connector' = 'print')";
                tEnv.executeSql(printSink);
        /*testdb是用户自定义的数数据库*/
        
                String query = "insert into lf_catalog.`test`.`printSinkJar618_1` " +
                        "select * from lf_catalog.`test`.`dataGenSourceJar618_1`";
                tEnv.executeSql(query);
            }
        }
        
      2. 创建Flink jar作业并配置如下参数。

        参数

        说明

        配置示例

        Flink版本

        Flink 1.15及以上版本支持对接LakeFormation。

        1.15

        委托

        使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:

        flink.dli.job.agency.name=agency

        委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

        -

        优化参数

        • 配置Flink作业访问的元数据类型。

          本场景下请选择Lakeformation。

          flink.dli.job.catalog.type=lakeformation

        • 配置Flink作业访问的数据目录名称。

          flink.dli.job.catalog.name=[lakeformation中的catalog名称]

          此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。

        -

    • 示例2:DEW方式对接Lakeformation
      1. 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录

        示例代码如下:

        本例通过DataGen表产生随机数据并输出到Print结果表中。

        其他connector类型可参考Flink 1.15支持的connector列表

        package com.huawei.test;
        
        import org.apache.flink.api.java.utils.ParameterTool;
        import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
        import org.apache.flink.runtime.state.filesystem.FsStateBackend;
        import org.apache.flink.streaming.api.CheckpointingMode;
        import org.apache.flink.streaming.api.environment.CheckpointConfig;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        
        import java.text.SimpleDateFormat;
        
        @SuppressWarnings({"deprecation", "rawtypes", "unchecked"})
        public class GenToPrintTaskDew {
            private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class);
            private static final String datePattern = "yyyy-MM-dd_HH-mm-ss";
        
            public static void main(String[] args) {
                LOGGER.info("Start task.");
                ParameterTool paraTool = ParameterTool.fromArgs(args);
                String checkpointInterval = "180000000";
        
                // set up execution environment
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                EnvironmentSettings settings = EnvironmentSettings.newInstance()
                        .inStreamingMode().build();
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
                env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval));
                env.getCheckpointConfig().enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        
                SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern);
                String time = dateTimeFormat.format(System.currentTimeMillis());
                RocksDBStateBackend rocksDbBackend =
                        new RocksDBStateBackend(
                                new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true);
                env.setStateBackend(rocksDbBackend);
        
                String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" +
                        "    'type' = 'hive',\n" +
                        "    'hive-conf-dir' = '/opt/hadoop/conf',\n" +
                        "    'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator',\n" +
                        "    'properties.catalog.dew.endpoint'='kms.xxx.xxx.com',\n" +
                        "    'properties.catalog.dew.csms.secretName'='obsAksK',\n" +
                        "    'properties.catalog.dew.access.key' = 'ak',\n" +
                        "    'properties.catalog.dew.secret.key' = 'sk',\n" +
                        "    'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxxx',\n" +
                        "    'properties.catalog.dew.csms.version'='v9'\n" +
                        "  );";
                tEnv.executeSql(createCatalog);
        
                String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJarDew618_1` (\n" +
                        "  user_id string,\n" +
                        "  amount int\n" +
                        ") WITH (\n" +
                        "  'connector' = 'datagen',\n" +
                        "  'rows-per-second' = '1',\n" +
                        "  'fields.user_id.kind' = 'random',\n" +
                        "  'fields.user_id.length' = '3'\n" +
                        ")";
                tEnv.executeSql(dataSource);
        /*testdb是用户自定义的数数据库*/
        
                String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJarDew618_1` (\n" +
                        "  user_id string,\n" +
                        "  amount int\n" +
                        ") WITH ('connector' = 'print')";
                tEnv.executeSql(printSink);
        /*testdb是用户自定义的数数据库*/
        
                String query = "insert into lf_catalog.`test`.`printSinkJarDew618_1` " +
                        "select * from lf_catalog.`test`.`dataGenSourceJarDew618_1`";
                tEnv.executeSql(query);
            }
        }
      2. 创建Flink jar作业并配置如下参数。

        参数

        说明

        配置示例

        Flink版本

        Flink 1.15及以上版本支持对接LakeFormation。

        1.15

        委托

        使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:

        flink.dli.job.agency.name=agency

        委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

        -

        优化参数

        • 配置Flink作业访问的元数据类型。

          本场景下请选择Lakeformation。

          flink.dli.job.catalog.type=lakeformation

        • 配置Flink作业访问的数据目录名称。

          flink.dli.job.catalog.name=[lakeformation中的catalog名称]

          此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。

        -

    • 示例3:Flink jar支持Hudi表
      1. 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录

        示例代码如下:

        本例通过DataGen表产生随机数据并输出到Hudi结果表中。

        其他connector类型可参考Flink 1.15支持的connector列表

         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        78
        79
        80
        81
        82
        83
        84
        85
        86
        87
        88
        89
        90
        91
        package com.huawei.test;
        
        import org.apache.flink.api.java.utils.ParameterTool;
        import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
        import org.apache.flink.runtime.state.filesystem.FsStateBackend;
        import org.apache.flink.streaming.api.CheckpointingMode;
        import org.apache.flink.streaming.api.environment.CheckpointConfig;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        
        import java.io.IOException;
        import java.text.SimpleDateFormat;
        
        public class GenToHudiTask4 {
            private static final Logger LOGGER = LoggerFactory.getLogger(GenToHudiTask4.class);
            private static final String datePattern = "yyyy-MM-dd_HH-mm-ss";
        
            public static void main(String[] args) throws IOException {
                LOGGER.info("Start task.");
                ParameterTool paraTool = ParameterTool.fromArgs(args);
                String checkpointInterval = "30000";
        
                // set up execution environment
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                EnvironmentSettings settings = EnvironmentSettings.newInstance()
                        .inStreamingMode().build();
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
                env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval));
                env.getCheckpointConfig().enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        
                SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern);
                String time = dateTimeFormat.format(System.currentTimeMillis());
                RocksDBStateBackend rocksDbBackend =
                        new RocksDBStateBackend(
                                new FsStateBackend("obs://xxx/jobs/testcheckpoint/" + time), true);
                env.setStateBackend(rocksDbBackend);
        
                String catalog = "CREATE CATALOG hoodie_catalog\n" +
                        "  WITH (\n" +
                        "    'type'='hudi',\n" +
                        "    'hive.conf.dir' = '/opt/hadoop/conf',\n" +
                        "    'mode'='hms'\n" +
                        "  )";
                tEnv.executeSql(catalog);
                String dwsSource = "CREATE TABLE if not exists genSourceJarForHudi618_1 (\n" +
                        "  order_id STRING,\n" +
                        "  order_name STRING,\n" +
                        "  price INT,\n" +
                        "  weight INT\n" +
                        ") WITH (\n" +
                        "  'connector' = 'datagen',\n" +
                        "  'rows-per-second' = '1',\n" +
                        "  'fields.order_id.kind' = 'random',\n" +
                        "  'fields.order_id.length' = '8',\n" +
                        "  'fields.order_name.kind' = 'random',\n" +
                        "  'fields.order_name.length' = '8'\n" +
                        ")";
                tEnv.executeSql(dwsSource);
        /*testdb是用户自定义的数数据库*/
                String printSinkdws =
                        "CREATE TABLE if not exists hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` (\n" +
                        "  order_id STRING PRIMARY KEY NOT ENFORCED,\n" +
                        "  order_name STRING,\n" +
                        "  price INT,\n" +
                        "  weight INT,\n" +
                        "  create_time BIGINT,\n" +
                        "  create_date String\n" +
                        ") WITH (" +
                        "'connector' = 'hudi',\n" +
                        "'path' = 'obs://xxx/catalog/dbtest3/hudiSinkJarHudi618_1',\n" +
                        "'hoodie.datasource.write.recordkey.field' = 'order_id',\n" +
                        "'EXTERNAL' = 'true'\n" +
                        ")";
                tEnv.executeSql(printSinkdws);
        /*testdb是用户自定义的数数据库*/
                String query = "insert into hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` select\n" +
                " order_id,\n" +
                " order_name,\n" +
                " price,\n" +
                " weight,\n" +
                " UNIX_TIMESTAMP() as create_time,\n" +
                " FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date\n" +
                " from genSourceJarForHudi618_1";
                tEnv.executeSql(query);
            }
        }
        

        表8 hudi类型sink表的connector参数

        参数

        说明

        是否必填

        参数值

        connector

        flink connector类型。

        配置为hudi表示sink表是hudi表。

        hudi

        path

        表的基本路径。如果该路径不存在,则会创建它。

        请参考示例代码中的配置值。

        hoodie.datasource.write.recordkey.field

        hoodie表的唯一键字段名

        这里配置order_id为唯一键。

        EXTERNAL

        是否外表

        hudi表必填,且设置为true

        true

      2. 创建Flink jar作业并配置如下参数。

        参数

        说明

        配置示例

        Flink版本

        Flink 1.15及以上版本支持对接LakeFormation。

        1.15

        委托

        使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置:

        flink.dli.job.agency.name=agency

        委托权限示例请参考创建DLI自定义委托权限常见场景的委托权限策略

        -

        优化参数

        • 配置Flink作业访问的元数据类型。

          本场景下请选择Lakeformation。

          flink.dli.job.catalog.type=lakeformation

        • 配置Flink作业访问的数据目录名称。

          flink.dli.job.catalog.name=[lakeformation中的catalog名称]

          此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。

        -