Updated on 2024-12-28 GMT+08:00

Connecting DLI to LakeFormation

Scenario

LakeFormation is an enterprise-level, all-in-one lakehouse construction service that provides unified metadata management capabilities. It supports seamless integration with various compute engines and big data cloud services, enabling you to efficiently build data lakes and operate related businesses, thereby accelerating the extraction of business data value.

In Spark job and SQL job scenarios, you can connect to LakeFormation to achieve unified metadata management. This section describes how to configure the data connection between DLI and LakeFormation.

For the Spark syntax of LakeFormation, see Spark Syntax Reference.

For the Flink syntax of LakeFormation, see Flink Syntax Reference.

Notes

This function is currently available only to whitelisted users. To use it, submit a request by choosing Service Tickets > Create Service Ticket in the upper right corner of the management console.

Connecting DLI to LakeFormation depends on the availability of the LakeFormation service. To understand the availability scope of LakeFormation, refer to Global Products and Services.

Procedure

Figure 1 Procedure

Notes and Constraints

  • Table 1 lists the DLI queue and engine types that allow you to connect DLI to LakeFormation to obtain metadata.
    For the engine type and version of a queue, see Viewing Basic Information About a Queue.
    Table 1 Queue and engine types that allow for the connection of DLI to LakeFormation for metadata retrieval

    • default queue
      • Spark 3.3.x: can connect to LakeFormation to obtain metadata.
      • HetuEngine 2.1.0: can connect to LakeFormation to obtain metadata.
    • For SQL
      • Spark 3.3.x: can connect to LakeFormation to obtain metadata.
      • HetuEngine 2.1.0: can connect to LakeFormation to obtain metadata.
    • For general purpose
      Flink job scenario: Flink 1.15 or later can connect to LakeFormation to obtain metadata when using queues within an elastic resource pool.

  • DLI can only connect to the default LakeFormation instance. Set the instance in LakeFormation as the default to ensure successful connection.
  • DLI can read data in Avro, JSON, Parquet, CSV, ORC, Text, and Hudi formats from LakeFormation.
  • LakeFormation manages the permissions for databases and tables in the data catalog of LakeFormation.
  • After DLI is connected to LakeFormation, the databases and tables originally created in DLI are placed under the DLI data catalog.

Step 1: Create a LakeFormation Instance for Metadata Storage

LakeFormation instances provide basic resources for metadata management. DLI can only connect to the default LakeFormation instance.
  1. Creating an instance
    1. Log in to the LakeFormation management console.
    2. Click Buy Now or Buy Instance in the upper right corner of the page.

      If there are no instances available on the page, Buy Now is displayed. If there are any LakeFormation instances on the page, Buy Instance is displayed.

    3. Set LakeFormation instance parameters as needed to complete instance creation.

      In this example, we create a pay-per-use shared instance.

  2. Setting a LakeFormation instance as the default
    1. View the value of Default Instance in the Basic Information area.
      • true: The instance is the default.
      • false: The instance is not the default.
    2. To set an instance as the default, click Set as Default in the upper right corner of the page.
    3. In the dialog box that appears, select I understand the consequences of changing the default instance and have still decided to perform this operation. and click OK.

      DLI can currently only connect to the default LakeFormation instance. Changing an instance to the default may impact the services that use LakeFormation. Exercise caution when performing this operation.

Step 2: Create a Catalog on the LakeFormation Management Console

A data catalog is a metadata management object that can contain multiple databases. You can create and manage multiple catalogs in LakeFormation to isolate metadata of different external clusters.

  1. Log in to the LakeFormation management console.
  2. In the navigation pane on the left, choose Metadata > Catalog.
  3. On the displayed page, click Create.

    Set catalog instance parameters as needed.

    For parameter settings and descriptions, see Creating a Catalog.

  4. Once created, you can view information about the created catalog on the Catalog page.

Step 3: Create a Data Catalog on the DLI Management Console

On the DLI management console, you need to create a catalog mapping so that DLI can access the catalog stored in the LakeFormation instance.

  • DLI can only connect to the default LakeFormation instance. Set the instance in LakeFormation as the default to ensure successful connection.
  • You can only create one mapping for each data catalog in LakeFormation.

    For example, a user creates a mapping named catalogMapping1 in DLI, which corresponds to the data catalog catalogA in LakeFormation. Once it is created, you cannot create another mapping to catalogA in the same project space.

  1. Log in to the DLI management console.
  2. In the navigation pane on the left, choose SQL Editor.
  3. Click the Catalog tab of the SQL editor.
  4. Click the create icon to create a data catalog.
  5. In the Create Catalog dialog box, set data catalog parameters.
    Table 2 Data catalog parameters

    • External Catalog Name (mandatory)
      Catalog name of the default LakeFormation instance.
    • Type (mandatory)
      Currently, the only available option is LakeFormation. This option is fixed and does not need to be selected.
    • Catalog Name (mandatory)
      Catalog mapping name used in DLI. When running SQL statements, you need to specify the catalog mapping to identify the external metadata to be accessed (see the example at the end of this step). You are advised to set this parameter to the same value as External Catalog Name.
      Currently, DLI can only connect to the data catalog of the default LakeFormation instance.
    • Description (optional)
      Description of the data catalog.

  6. Click OK.
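
After the catalog mapping is created, you can reference it in SQL statements to access metadata in the LakeFormation catalog. The following is a minimal sketch; catalogMapping1, db1, and tbl1 are placeholder names, not objects created in this procedure:

  -- Query a table through the catalog mapping created on the DLI console.
  -- Replace catalogMapping1, db1, and tbl1 with your own catalog mapping, database, and table names.
  SELECT * FROM catalogMapping1.db1.tbl1 LIMIT 10;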

Step 4: Authorize to Use LakeFormation Resources

  • SQL job scenarios

    Before submitting a SQL job, you need to authorize DLI to access LakeFormation resources such as metadata, databases, tables, columns, and functions, to ensure that the job can access required data and resources during execution. Supported Actions for LakeFormation SQL Resources describes LakeFormation resources and corresponding permissions.

    To use LakeFormation resources, you need to separately complete IAM fine-grained authorization and LakeFormation SQL resource authorization.

    • IAM fine-grained authorization for LakeFormation: Authorize DLI to use LakeFormation APIs.

      IAM offers multiple methods to manage the permissions of users, groups, and roles to access resources. You can create policies on the IAM console to define which users or roles can call LakeFormation APIs. Then, attach these policies to the specified users or roles.

      • Method 1: Role-based authorization

        A coarse-grained authorization strategy that defines permissions by job responsibility. Only a limited number of service-level roles are available for authorization.

        For example, grant users the read-only permission to query LakeFormation metadata resources by referring to LakeFormation Permissions Management.

        Alternatively, grant all operation permissions on LakeFormation-related metadata resources.

        Example:

        {
            "Version": "1.1",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "lakeformation:table:*",
                        "lakeformation:database:*",
                        "lakeformation:catalog:*",
                        "lakeformation:function:*",         
                        "lakeformation:transaction:*",
                        "lakeformation:policy:describe",
                        "lakeformation:credential:describe"
                    ]
                }
            ]
        }
      • Method 2: Policy-based fine-grained authorization

        A policy defines the permissions required to perform actions on a specific cloud resource under certain conditions.

        For LakeFormation permissions policies, see LakeFormation Permissions and Supported Actions.

        For how to grant permissions, see Creating a User and Granting LakeFormation Permissions.

    • LakeFormation SQL resource authorization: authorizes users to use specific LakeFormation resources (such as metadata, databases, tables, columns, and functions).

      This restricts which LakeFormation data and metadata a user can access.

      There are two methods:

      • Method 1: Authorize access to resources on the LakeFormation management console.

        For details, see Granting Permissions.

        For LakeFormation SQL resource permissions, see Data Permissions.

      • Method 2: Run the GRANT SQL statement on the DLI management console.

        The GRANT statement is the SQL authorization mechanism: you run it to grant users or roles the permission to access databases, tables, columns, and functions, as shown in the sketch below.

        Supported Actions for LakeFormation SQL Resources describes authorization policies for LakeFormation resources.

        Currently, you cannot authorize access to catalogs by running the GRANT statement on the DLI console. To authorize access, use method 1.
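
        The following is only an illustrative sketch using Hive-style GRANT syntax and placeholder names (db1, tbl1, and user1); for the privileges and resource formats actually supported, see Supported Actions for LakeFormation SQL Resources.

          -- Grant user1 the SELECT permission on a table in the connected LakeFormation catalog.
          -- db1, tbl1, and user1 are placeholder names for this example.
          GRANT SELECT ON TABLE db1.tbl1 TO USER user1;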

  • Spark Jar, Flink OpenSource SQL, and Flink Jar job scenarios:
    • Method 1: Use agency authorization. Before using Spark 3.3.1 or later or Flink 1.15 or later to run jobs, you need to create an agency on the IAM console and add the new agency information when configuring the job.

      For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.

    • Method 2: Use DEW for authorization.
      • You have granted the IAM user the IAM and LakeFormation permissions. For details, see IAM authorization in SQL job scenarios.
      • A shared secret has been created on the DEW console and the secret value has been stored. For details, see Creating a Shared Secret.
      • An agency has been created and authorized for DLI to access DEW. The agency must have been granted the following permissions:
        • Permission of the ShowSecretVersion interface for querying secret versions and secret values in DEW: csms:secretVersion:get.
        • Permission of the ListSecretVersions interface for listing secret versions in DEW: csms:secretVersion:list.
        • Permission to decrypt DEW secrets: kms:dek:decrypt

        For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.

Step 5: Use LakeFormation Metadata During DLI Job Development

After connecting DLI to the default LakeFormation instance and authorizing access to LakeFormation resources, you can use LakeFormation metadata during DLI job development.

  • DLI SQL:

    For the SQL syntax of LakeFormation, see Data Lake Insight Spark SQL Syntax Reference.

    When running a SQL job, you can select the catalog where the SQL statement is located on the console, as shown in Figure 2, or specify catalogName in the SQL statement (see the example after the notes below). catalogName is the mapping name of the data catalog on the DLI console.

    Figure 2 Selecting a data catalog on the SQL editor page
    • When connecting DLI to a LakeFormation instance, you need to specify the OBS path for storing the database when creating it.
    • When connecting DLI to a LakeFormation instance, you cannot set table lifecycle and versioning when creating a table.
    • When connecting DLI to a LakeFormation instance, the LOAD DATA statement does not support datasource tables, and partitions must be specified if the statement is used to import data into a partitioned table.
    • If the names of databases and tables created on the LakeFormation console contain Chinese characters, you cannot perform operations on them in DLI.
    • When connecting DLI to a LakeFormation instance, you cannot specify filter criteria to delete partitions.
    • When connecting DLI to a LakeFormation instance, you cannot truncate datasource tables or Hive foreign tables.
    • DLI does not currently support the use of LakeFormation row filter criteria.
    • When DLI reads binary data for console display, it converts the binary data to Base64.
    • DLI does not currently support authorizing access to LakeFormation paths.
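
    The following is a minimal sketch of specifying the catalog mapping in SQL statements; catalogMapping1, db1, tbl1, and the OBS path are placeholder values for this example:

      -- Create a database under the LakeFormation catalog. The OBS storage path must be specified.
      CREATE DATABASE IF NOT EXISTS catalogMapping1.db1 LOCATION 'obs://bucket-example/db1/';

      -- Create a table in that database and query it through the catalog mapping.
      CREATE TABLE IF NOT EXISTS catalogMapping1.db1.tbl1 (user_id STRING, amount INT) USING parquet;
      SELECT * FROM catalogMapping1.db1.tbl1;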
  • DLI Spark Jar:

    This part explains how to configure LakeFormation metadata when submitting a Spark Jar job on the DLI management console.

    • Example Spark Jar
      // Minimal Spark Jar example: enabling Hive support lets the job read metadata
      // through the Hive metastore client, which is backed by LakeFormation in this setup.
      import org.apache.spark.sql.SparkSession;

      SparkSession spark = SparkSession.builder()
          .enableHiveSupport()
          .appName("java_spark_demo")
          .getOrCreate();

      spark.sql("show databases").show();
    • Configuring a Spark Jar job on the DLI management console
      • (Recommended) Method 1: Configure a Spark Jar job's access to LakeFormation metadata using the parameters (such as agency and metadata source) provided on the console
        When creating or editing a Spark Jar job, refer to Table 3 to configure the job's access to LakeFormation metadata.
        Table 3 Configuring a Spark Jar job's access to LakeFormation metadata

        • Spark Version
          Spark 3.3.x or later can connect to LakeFormation. Example value: 3.3.1
        • Agency
          Before using Spark 3.3.1 or later to run jobs, you need to create an agency on the IAM console and add the new agency information. Once set, the system automatically adds the following agency configuration to your job:
          spark.dli.job.agency.name=agency
          For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
        • Access Metadata
          Whether to allow the Spark job to access metadata. Example value: Yes
        • Metadata Source
          Type of metadata the Spark job accesses. In this scenario, select lakeformation.
          Once set to lakeformation, the system automatically adds the following configurations to your job to load LakeFormation dependencies:
          spark.sql.catalogImplementation=hive
          spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true
          spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient
          // Load LakeFormation dependencies.
          spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
          spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
          You can also set the metadata source in the Spark Arguments(--conf) parameter. If the metadata source is set in both Metadata Source and Spark Arguments(--conf), the system preferentially uses the value configured in Spark Arguments(--conf). You are advised to set the metadata source through Metadata Source.
        • Catalog Name
          Name of the data catalog the Spark job accesses.
          Select the data catalog created on the DLI management console, that is, the mapping between DLI and the data catalog of the default LakeFormation instance. To connect to other LakeFormation instances, configure the LakeFormation instances and data catalogs in Spark Arguments(--conf). For details, see Method 2: Configure a Spark Jar job's access to LakeFormation metadata using the Spark Arguments(--conf) parameter.
          Once set, the system automatically adds the following configuration to your job to connect to the data catalog of the default LakeFormation instance:
          spark.hadoop.lakecat.catalogname.default=lfcatalog
          You can also set the data catalog name in the Spark Arguments(--conf) parameter. If the data catalog name is set in both Catalog Name and Spark Arguments(--conf), the system preferentially uses the value configured in Spark Arguments(--conf). You are advised to set the data catalog name through Catalog Name.
        • Spark Arguments(--conf)
          You can also set the metadata source and data catalog name here. If they are set in both Metadata Source and Catalog Name, as well as Spark Arguments(--conf), the system preferentially uses the values configured in Spark Arguments(--conf).
          • To access a Hudi data table, add the following configurations to the Spark Arguments(--conf) parameter:
            spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
            spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider
          • To access a Delta data table, add the following configurations to the Spark Arguments(--conf) parameter:
            spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
            spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension

      • Method 2: Configure a Spark Jar job's access to LakeFormation metadata using the Spark Arguments(--conf) parameter
        When creating or editing a Spark Jar job, configure the following information in the Spark Arguments(--conf) parameter on the job configuration page to access LakeFormation metadata:
        spark.sql.catalogImplementation=hive
        spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true
        spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient
        spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension // Optional. Required only when accessing Hudi tables.
        spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider // Optional. Required only when accessing Hudi tables.
        // Access using an agency with the OBS and LakeFormation permissions. You are advised to configure the minimum permission set.
        spark.dli.job.agency.name=agencyForLakeformation
        // ID of the LakeFormation instance to be accessed, which can be viewed on the LakeFormation console. This parameter is optional. If unset, the default LakeFormation instance is accessed.
        spark.hadoop.lakeformation.instance.id=xxx
        // Name of the catalog to be accessed on the LakeFormation side, which can be viewed on the LakeFormation console. This parameter is optional. If unset, the default value is hive.
        spark.hadoop.lakecat.catalogname.default=lfcatalog
        // Load LakeFormation dependencies.
        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
  • DLI Flink OpenSource SQL
    • Example 1: Connecting DLI to LakeFormation using an agency

      Create a Flink OpenSource SQL job and set the following parameters:

      • Flink Version
        Flink 1.15 or later can connect to LakeFormation. Example value: 1.15
      • Agency
        Before using Flink 1.15 or later to run jobs, you need to create an agency on the IAM console and add the new agency information. Once set, the system automatically adds the following agency configuration to your job:
        flink.dli.job.agency.name=agency
        For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
      • Enable Checkpointing
        Select it.
      • Runtime Configuration
        • Type of metadata the Flink job accesses. In this scenario, select lakeformation:
          flink.dli.job.catalog.type=lakeformation
        • Name of the data catalog the Flink job accesses:
          flink.dli.job.catalog.name=[Catalog name in LakeFormation]
          This is the data catalog created on the DLI management console, that is, the mapping to the data catalog of the default LakeFormation instance.

      For the catalog parameters in the example, see Table 4.

      Table 4 Catalog parameters in the example Flink OpenSource SQL

      • type (mandatory)
        Catalog type. Fixed at hive.
      • hive-conf-dir (mandatory)
        hive-conf path. Fixed at /opt/flink/conf.
      • default-database (optional)
        Default database name.

      CREATE CATALOG hive
      WITH
        (
          'type' = 'hive',
          'hive-conf-dir' = '/opt/flink/conf',  -- Fixed at /opt/flink/conf
          'default-database'='default'
        );
      
      USE CATALOG hive;
      
      CREATE TABLE IF NOT EXISTS
        dataGenSource612 (user_id string, amount int)
      WITH
        (
          'connector' = 'datagen',
          'rows-per-second' = '1',
          'fields.user_id.kind' = 'random',
          'fields.user_id.length' = '3'
        );
      
      CREATE table IF NOT EXISTS
        printSink612 (user_id string, amount int)
      WITH
        ('connector' = 'print');
      
      INSERT INTO
        printSink612
      SELECT
        *
      FROM
        dataGenSource612;
      
    • Example 2: Connecting DLI to LakeFormation using DEW

      Create a Flink OpenSource SQL job and set the following parameters:

      • Flink Version
        Flink 1.15 or later can connect to LakeFormation. Example value: 1.15
      • Agency
        Before using Flink 1.15 or later to run jobs, you need to create an agency on the IAM console and add the new agency information. Once set, the system automatically adds the following agency configuration to your job:
        flink.dli.job.agency.name=agency
        For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
      • Enable Checkpointing
        Select it.
      • Runtime Configuration
        • Type of metadata the Flink job accesses. In this scenario, select lakeformation:
          flink.dli.job.catalog.type=lakeformation
        • Name of the data catalog the Flink job accesses:
          flink.dli.job.catalog.name=[Catalog name in LakeFormation]
          This is the data catalog created on the DLI management console, that is, the mapping to the data catalog of the default LakeFormation instance.

      For the catalog parameters in the example, see Table 5.

      Set properties.catalog.lakeformation.auth.identity.util.class to com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator and configure DEW.

      Table 5 Catalog parameters in the example Flink OpenSource SQL (using DEW)

      • type (mandatory)
        Catalog type. Fixed at hive.
      • hive-conf-dir (mandatory)
        hive-conf path. Fixed at /opt/flink/conf.
      • default-database (optional)
        Default database name. If unset, the default database is used.
      • properties.catalog.lakecat.auth.identity.util.class (mandatory when DEW is used)
        Authentication information acquisition class. Fixed at com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator.
      • properties.catalog.dew.projectId (mandatory when DEW is used)
        ID of the project DEW belongs to. The default value is the ID of the project where the Flink job is.
      • properties.catalog.dew.endpoint (mandatory when DEW is used)
        Endpoint of the DEW service to be used. Example: kms.xxx.com
      • properties.catalog.dew.csms.secretName (mandatory when DEW is used)
        Name of the shared secret in DEW's secret management.
      • properties.catalog.dew.csms.version (mandatory when DEW is used)
        Version number of the shared secret in DEW's secret management.
      • properties.catalog.dew.access.key (mandatory when DEW is used)
        Key corresponding to the access.key value in DEW's secret.
      • properties.catalog.dew.secret.key (mandatory when DEW is used)
        Key corresponding to the secret.key value in DEW's secret.

      CREATE CATALOG myhive
      WITH
        (
          'type' = 'hive',
          'hive-conf-dir' = '/opt/flink/conf',
          'default-database'='default',
          -- The following is the DEW configuration. Change the parameter values based on site requirements.
          'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator',
          'properties.catalog.dew.endpoint'='kms.xxx.com',
          'properties.catalog.dew.csms.secretName'='obsAksK',
          'properties.catalog.dew.access.key' = 'myak',
          'properties.catalog.dew.secret.key' = 'mysk',
          'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxx',
          'properties.catalog.dew.csms.version'='v9'
      );
      
      USE CATALOG myhive;
      
      create table IF NOT EXISTS dataGenSource_dew612(
        user_id string,
        amount int
      ) with (
        'connector' = 'datagen',
        'rows-per-second' = '1',
        'fields.user_id.kind' = 'random',
        'fields.user_id.length' = '3'
      );
      
      create table IF NOT EXISTS printSink_dew612(
        user_id string,
        amount int
      ) with (
        'connector' = 'print'
      );
      
      insert into printSink_dew612 select * from dataGenSource_dew612;
      
    • Example 3: Connecting DLI to LakeFormation using an agency to write data to a Hudi table

      Create a Flink OpenSource SQL job and set the following parameters:

      • Flink Version
        Flink 1.15 or later can connect to LakeFormation. Example value: 1.15
      • Agency
        Before using Flink 1.15 or later to run jobs, you need to create an agency on the IAM console and add the new agency information. Once set, the system automatically adds the following agency configuration to your job:
        flink.dli.job.agency.name=agency
        For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
      • Enable Checkpointing
        Select it.
      • Runtime Configuration
        • Type of metadata the Flink job accesses. In this scenario, select lakeformation:
          flink.dli.job.catalog.type=lakeformation
        • Name of the data catalog the Flink job accesses:
          flink.dli.job.catalog.name=[Catalog name in LakeFormation]
          This is the data catalog created on the DLI management console, that is, the mapping to the data catalog of the default LakeFormation instance.

      For the catalog parameters in the example, see Table 6.

      Table 6 Parameters for configuring a catalog of the Hudi type

      • type (mandatory)
        Catalog type. Set it to hudi for a Hudi table.
      • hive-conf-dir (mandatory)
        hive-conf path. The value is fixed at /opt/flink/conf.
      • default-database (optional)
        Default database name.
      • mode (mandatory)
        The value can be hms or non-hms. hms indicates that the created Hudi catalog uses Hive Metastore to store metadata; non-hms indicates that Hive Metastore is not used. In this scenario, the value is fixed at hms.

      Table 7 Connector parameters for a Hudi sink table

      • connector (mandatory)
        Flink connector type. If set to hudi, the sink table is a Hudi table.
      • path (mandatory)
        Base path of the table. If the path does not exist, the system creates it. Refer to the value configured in the sample code.
      • hoodie.datasource.write.recordkey.field (optional)
        Unique key field of the Hudi table. In the example, order_id is set as the unique key.
      • EXTERNAL (mandatory)
        Whether the table is an external table. This parameter is mandatory for the Hudi table and must be set to true.

      CREATE CATALOG hive_catalog
        WITH (
          'type'='hive',
          'hive-conf-dir' = '/opt/flink/conf',
          'default-database'='test'
        );
      USE CATALOG hive_catalog;
      create table  if not exists genSource618 (
        order_id STRING, 
        order_name STRING, 
        price INT, 
        weight INT
      ) with (
        'connector' = 'datagen',
        'rows-per-second' = '1', 
        'fields.order_id.kind' = 'random',
        'fields.order_id.length' = '8',
        'fields.order_name.kind' = 'random',
        'fields.order_name.length' = '5'
      );
      
      CREATE CATALOG hoodie_catalog
        WITH (
          'type'='hudi',
          'hive.conf.dir' = '/opt/flink/conf',
          'mode'='hms' -- supports 'dfs' mode that uses the DFS backend for table DDLs persistence
        );
      CREATE TABLE  if not exists  hoodie_catalog.`test`.`hudiSink618` (
        `order_id` STRING PRIMARY KEY NOT ENFORCED,
        `order_name` STRING, 
        `price` INT, 
        `weight` INT,
        `create_time` BIGINT,
        `create_date` String
      ) PARTITIONED BY (create_date) WITH (
        'connector' = 'hudi',
        'path' = 'obs://xxx/catalog/dbtest3/hudiSink618',
        'hoodie.datasource.write.recordkey.field' = 'order_id',
        'write.precombine.field' = 'create_time',
        'EXTERNAL' = 'true' -- must be set
      );
      
      insert into hoodie_catalog.`test`.`hudiSink618`
      select 
        order_id, 
        order_name, 
        price, 
        weight,
        UNIX_TIMESTAMP() as create_time,
        FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date
      from genSource618;
  • DLI Flink Jar
    • Example 1: Connecting DLI to LakeFormation using an agency
      1. Develop a Flink Jar program, compile and upload the JAR file to OBS. In this example, the file is uploaded to the obs://obs-test/dlitest/ directory.

        The sample code is as follows:

        In this example, random data is generated using the DataGen table and then output to the Print result table.

        For other connector types, see List of Connectors Supported by Flink 1.15.

        package com.huawei.test;
        
        import org.apache.flink.api.java.utils.ParameterTool;
        import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
        import org.apache.flink.runtime.state.filesystem.FsStateBackend;
        import org.apache.flink.streaming.api.CheckpointingMode;
        import org.apache.flink.streaming.api.environment.CheckpointConfig;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        
        import java.text.SimpleDateFormat;
        
        @SuppressWarnings({"deprecation", "rawtypes", "unchecked"})
        public class GenToPrintTaskAgency {
            private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class);
            private static final String datePattern = "yyyy-MM-dd_HH-mm-ss";
        
            public static void main(String[] args) {
                LOGGER.info("Start task.");
                ParameterTool paraTool = ParameterTool.fromArgs(args);
                String checkpointInterval = "180000000";
        
                // set up execution environment
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                EnvironmentSettings settings = EnvironmentSettings.newInstance()
                        .inStreamingMode().build();
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
                env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval));
                env.getCheckpointConfig().enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        
                SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern);
                String time = dateTimeFormat.format(System.currentTimeMillis());
                RocksDBStateBackend rocksDbBackend =
                        new RocksDBStateBackend(
                                new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true);
                env.setStateBackend(rocksDbBackend);
        
                String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" +
                        "    'type' = 'hive',\n" +
                        "    'hive-conf-dir' = '/opt/hadoop/conf'\n" +
                        "  );";
                tEnv.executeSql(createCatalog);
        
                String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJar618_1` (\n" +
                        "  user_id string,\n" +
                        "  amount int\n" +
                        ") WITH (\n" +
                        "  'connector' = 'datagen',\n" +
                        "  'rows-per-second' = '1',\n" +
                        "  'fields.user_id.kind' = 'random',\n" +
                        "  'fields.user_id.length' = '3'\n" +
                        ")";
        /*testdb is a custom database.*/
        
                tEnv.executeSql(dataSource);
        
                String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJar618_1` (\n" +
                        "  user_id string,\n" +
                        "  amount int\n" +
                        ") WITH ('connector' = 'print')";
                tEnv.executeSql(printSink);
        /*testdb is a custom database.*/
        
                String query = "insert into lf_catalog.`test`.`printSinkJar618_1` " +
                        "select * from lf_catalog.`test`.`dataGenSourceJar618_1`";
                tEnv.executeSql(query);
            }
        }
        
      2. Create a Flink Jar job and set the following parameters:

        • Flink Version
          Flink 1.15 or later can connect to LakeFormation. Example value: 1.15
        • Agency
          Before using Flink 1.15 or later to run jobs, you need to create an agency on the IAM console and add the new agency information. Once set, the system automatically adds the following agency configuration to your job:
          flink.dli.job.agency.name=agency
          For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
        • Runtime Configuration
          • Type of metadata the Flink job accesses. In this scenario, select lakeformation:
            flink.dli.job.catalog.type=lakeformation
          • Name of the data catalog the Flink job accesses:
            flink.dli.job.catalog.name=[Catalog name in LakeFormation]
            This is the data catalog created on the DLI management console, that is, the mapping to the data catalog of the default LakeFormation instance.

    • Example 2: Connecting DLI to LakeFormation using DEW
      1. Develop a Flink Jar program, compile and upload the JAR file to OBS. In this example, the file is uploaded to the obs://obs-test/dlitest/ directory.

        The sample code is as follows:

        In this example, random data is generated using the DataGen table and then output to the Print result table.

        For other connector types, see List of Connectors Supported by Flink 1.15.

        package com.huawei.test;
        
        import org.apache.flink.api.java.utils.ParameterTool;
        import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
        import org.apache.flink.runtime.state.filesystem.FsStateBackend;
        import org.apache.flink.streaming.api.CheckpointingMode;
        import org.apache.flink.streaming.api.environment.CheckpointConfig;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        
        import java.text.SimpleDateFormat;
        
        @SuppressWarnings({"deprecation", "rawtypes", "unchecked"})
        public class GenToPrintTaskDew {
            private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskDew.class);
            private static final String datePattern = "yyyy-MM-dd_HH-mm-ss";
        
            public static void main(String[] args) {
                LOGGER.info("Start task.");
                ParameterTool paraTool = ParameterTool.fromArgs(args);
                String checkpointInterval = "180000000";
        
                // set up execution environment
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                EnvironmentSettings settings = EnvironmentSettings.newInstance()
                        .inStreamingMode().build();
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
                env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval));
                env.getCheckpointConfig().enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        
                SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern);
                String time = dateTimeFormat.format(System.currentTimeMillis());
                RocksDBStateBackend rocksDbBackend =
                        new RocksDBStateBackend(
                                new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true);
                env.setStateBackend(rocksDbBackend);
        
                String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" +
                        "    'type' = 'hive',\n" +
                        "    'hive-conf-dir' = '/opt/hadoop/conf',\n" +
                        "    'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator',\n" +
                        "    'properties.catalog.dew.endpoint'='kms.xxx.xxx.com',\n" +
                        "    'properties.catalog.dew.csms.secretName'='obsAksK',\n" +
                        "    'properties.catalog.dew.access.key' = 'ak',\n" +
                        "    'properties.catalog.dew.secret.key' = 'sk',\n" +
                        "    'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxxx',\n" +
                        "    'properties.catalog.dew.csms.version'='v9'\n" +
                        "  );";
                tEnv.executeSql(createCatalog);
        
                String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJarDew618_1` (\n" +
                        "  user_id string,\n" +
                        "  amount int\n" +
                        ") WITH (\n" +
                        "  'connector' = 'datagen',\n" +
                        "  'rows-per-second' = '1',\n" +
                        "  'fields.user_id.kind' = 'random',\n" +
                        "  'fields.user_id.length' = '3'\n" +
                        ")";
                tEnv.executeSql(dataSource);
        /*testdb is a custom database.*/
        
                String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJarDew618_1` (\n" +
                        "  user_id string,\n" +
                        "  amount int\n" +
                        ") WITH ('connector' = 'print')";
                tEnv.executeSql(printSink);
        /*testdb is a custom database.*/
        
                String query = "insert into lf_catalog.`test`.`printSinkJarDew618_1` " +
                        "select * from lf_catalog.`test`.`dataGenSourceJarDew618_1`";
                tEnv.executeSql(query);
            }
        }
      2. Create a Flink Jar job and set the following parameters:

        • Flink Version
          Flink 1.15 or later can connect to LakeFormation. Example value: 1.15
        • Agency
          Before using Flink 1.15 or later to run jobs, you need to create an agency on the IAM console and add the new agency information. Once set, the system automatically adds the following agency configuration to your job:
          flink.dli.job.agency.name=agency
          For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
        • Runtime Configuration
          • Type of metadata the Flink job accesses. In this scenario, select lakeformation:
            flink.dli.job.catalog.type=lakeformation
          • Name of the data catalog the Flink job accesses:
            flink.dli.job.catalog.name=[Catalog name in LakeFormation]
            This is the data catalog created on the DLI management console, that is, the mapping to the data catalog of the default LakeFormation instance.

    • Example 3: Flink Jar jobs supporting Hudi tables
      1. Develop a Flink Jar program, compile and upload the JAR file to OBS. In this example, the file is uploaded to the obs://obs-test/dlitest/ directory.

        The sample code is as follows:

        In this example, random data is generated using the DataGen table and then output to the Hudi result table.

        For other connector types, see List of Connectors Supported by Flink 1.15.

        package com.huawei.test;
        
        import org.apache.flink.api.java.utils.ParameterTool;
        import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
        import org.apache.flink.runtime.state.filesystem.FsStateBackend;
        import org.apache.flink.streaming.api.CheckpointingMode;
        import org.apache.flink.streaming.api.environment.CheckpointConfig;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        
        import java.io.IOException;
        import java.text.SimpleDateFormat;
        
        public class GenToHudiTask4 {
            private static final Logger LOGGER = LoggerFactory.getLogger(GenToHudiTask4.class);
            private static final String datePattern = "yyyy-MM-dd_HH-mm-ss";
        
            public static void main(String[] args) throws IOException {
                LOGGER.info("Start task.");
                ParameterTool paraTool = ParameterTool.fromArgs(args);
                String checkpointInterval = "30000";
        
                // set up execution environment
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                EnvironmentSettings settings = EnvironmentSettings.newInstance()
                        .inStreamingMode().build();
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
                env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval));
                env.getCheckpointConfig().enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        
                SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern);
                String time = dateTimeFormat.format(System.currentTimeMillis());
                RocksDBStateBackend rocksDbBackend =
                        new RocksDBStateBackend(
                                new FsStateBackend("obs://xxx/jobs/testcheckpoint/" + time), true);
                env.setStateBackend(rocksDbBackend);
        
                String catalog = "CREATE CATALOG hoodie_catalog\n" +
                        "  WITH (\n" +
                        "    'type'='hudi',\n" +
                        "    'hive.conf.dir' = '/opt/hadoop/conf',\n" +
                        "    'mode'='hms'\n" +
                        "  )";
                tEnv.executeSql(catalog);
                String dwsSource = "CREATE TABLE if not exists genSourceJarForHudi618_1 (\n" +
                        "  order_id STRING,\n" +
                        "  order_name STRING,\n" +
                        "  price INT,\n" +
                        "  weight INT\n" +
                        ") WITH (\n" +
                        "  'connector' = 'datagen',\n" +
                        "  'rows-per-second' = '1',\n" +
                        "  'fields.order_id.kind' = 'random',\n" +
                        "  'fields.order_id.length' = '8',\n" +
                        "  'fields.order_name.kind' = 'random',\n" +
                        "  'fields.order_name.length' = '8'\n" +
                        ")";
                tEnv.executeSql(dwsSource);
        /*testdb is a custom database.*/
                String printSinkdws =
                        "CREATE TABLE if not exists hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` (\n" +
                        "  order_id STRING PRIMARY KEY NOT ENFORCED,\n" +
                        "  order_name STRING,\n" +
                        "  price INT,\n" +
                        "  weight INT,\n" +
                        "  create_time BIGINT,\n" +
                        "  create_date String\n" +
                        ") WITH (" +
                        "'connector' = 'hudi',\n" +
                        "'path' = 'obs://xxx/catalog/dbtest3/hudiSinkJarHudi618_1',\n" +
                        "'hoodie.datasource.write.recordkey.field' = 'order_id',\n" +
                        "'EXTERNAL' = 'true'\n" +
                        ")";
                tEnv.executeSql(printSinkdws);
        /*testdb is a custom database.*/
                String query = "insert into hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` select\n" +
                " order_id,\n" +
                " order_name,\n" +
                " price,\n" +
                " weight,\n" +
                " UNIX_TIMESTAMP() as create_time,\n" +
                " FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date\n" +
                " from genSourceJarForHudi618_1";
                tEnv.executeSql(query);
            }
        }
        

        Table 8 Connector parameters for a Hudi sink table

        • connector (mandatory)
          Flink connector type. If set to hudi, the sink table is a Hudi table.
        • path (mandatory)
          Base path of the table. If the path does not exist, the system creates it. Refer to the value configured in the sample code.
        • hoodie.datasource.write.recordkey.field (optional)
          Unique key field of the Hudi table. In the example, order_id is set as the unique key.
        • EXTERNAL (mandatory)
          Whether the table is an external table. This parameter is mandatory for the Hudi table and must be set to true.

      2. Create a Flink Jar job and set the following parameters:

        • Flink Version
          Flink 1.15 or later can connect to LakeFormation. Example value: 1.15
        • Agency
          Before using Flink 1.15 or later to run jobs, you need to create an agency on the IAM console and add the new agency information. Once set, the system automatically adds the following agency configuration to your job:
          flink.dli.job.agency.name=agency
          For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
        • Runtime Configuration
          • Type of metadata the Flink job accesses. In this scenario, select lakeformation:
            flink.dli.job.catalog.type=lakeformation
          • Name of the data catalog the Flink job accesses:
            flink.dli.job.catalog.name=[Catalog name in LakeFormation]
            This is the data catalog created on the DLI management console, that is, the mapping to the data catalog of the default LakeFormation instance.