Connecting DLI to LakeFormation
Scenario
LakeFormation is an enterprise-level, all-in-one lakehouse construction service that provides unified metadata management capabilities. It supports seamless integration with various compute engines and big data cloud services, enabling you to efficiently build data lakes and operate related businesses, thereby accelerating the extraction of business data value.
In Spark and SQL job scenarios, LakeFormation can be connected to achieve unified metadata management. This section will guide you through the steps to configure the data connection between DLI and LakeFormation.
For the Spark syntax of LakeFormation, see Spark Syntax Reference.
For the Flink syntax of LakeFormation, see Flink Syntax Reference.
Notes
This function is currently available only to users on the whitelist. To use it, submit a request by choosing Service Tickets > Create Service Ticket in the upper right corner of the management console.
Connecting DLI to LakeFormation depends on the availability of the LakeFormation service. To understand the availability scope of LakeFormation, refer to Global Products and Services.
Procedure
Notes and Constraints
- Table 1 lists the DLI queue and engine types that allow you to connect DLI to LakeFormation to obtain metadata.
For the engine type and version of a queue, see Viewing Basic Information About a Queue.
Table 1 Queue and engine types that allow for the connection of DLI to LakeFormation for metadata retrieval
- Default queue
  - Spark 3.3.x: can connect to LakeFormation to obtain metadata.
  - HetuEngine 2.1.0: can connect to LakeFormation to obtain metadata.
- For SQL queue
  - Spark 3.3.x: can connect to LakeFormation to obtain metadata.
  - HetuEngine 2.1.0: can connect to LakeFormation to obtain metadata.
- For general purpose queue
  - Flink job scenario: Flink 1.15 or later supports integration with LakeFormation to obtain metadata when using queues within an elastic resource pool.
- DLI can only connect to the default LakeFormation instance. Set the instance in LakeFormation as the default to ensure successful connection.
- DLI can read data in Avro, Json, Parquet, CSV, ORC, Text, and Hudi formats from LakeFormation.
- LakeFormation manages the permissions for databases and tables in the data catalog of LakeFormation.
- After connecting DLI to LakeFormation, the original databases and tables in DLI will be moved to the data catalog of DLI.
Step 1: Create a LakeFormation Instance for Metadata Storage
- Creating an instance
- Log in to the LakeFormation management console.
- Click Buy Now or Buy Instance in the upper right corner of the page.
If there are no instances available on the page, Buy Now is displayed. If there are any LakeFormation instances on the page, Buy Instance is displayed.
- Set LakeFormation instance parameters as needed to complete instance creation.
- Setting a LakeFormation instance as the default
- View the value of Default Instance in the Basic Information area.
- true: The instance is the default.
- false: The instance is not the default.
- To set an instance as the default, click Set as Default in the upper right corner of the page.
- In the dialog box that appears, select I understand the consequences of changing the default instance and have still decided to perform this operation. and click OK.
DLI can currently only connect to the default LakeFormation instance. Changing an instance to the default may impact the services that use LakeFormation. Exercise caution when performing this operation.
Step 2: Create a Catalog on the LakeFormation Management Console
A data catalog is a metadata management object that can contain multiple databases. You can create and manage multiple catalogs in LakeFormation to isolate metadata of different external clusters.
- Log in to the LakeFormation management console.
- In the navigation pane on the left, choose Metadata > Catalog.
- On the displayed page, click Create.
Set the catalog parameters as needed.
For parameter settings and descriptions, see Creating a Catalog.
- Once created, you can view information about the created catalog on the Catalog page.
Step 3: Create a Data Catalog on the DLI Management Console
On the DLI management console, you need to create a catalog mapping to access the catalog stored in the LakeFormation instance.
- DLI can only connect to the default LakeFormation instance. Set the instance in LakeFormation as the default to ensure successful connection.
- You can only create one mapping for each data catalog in LakeFormation.
For example, a user creates a mapping named catalogMapping1 in DLI, which corresponds to the data catalog catalogA in LakeFormation. Once created, you cannot create a mapping to catalogA in the same project space.
- Log in to the DLI management console.
- In the navigation pane on the left, choose SQL Editor.
- The Catalog tab of the SQL editor is displayed.
- Click the creation icon to create a data catalog.
- In the Create Catalog dialog box, set data catalog parameters.
Table 2 Data catalog parameters
- External Catalog Name (mandatory): Catalog name of the default LakeFormation instance.
- Type (mandatory): Currently, the only available option is LakeFormation. This option is fixed and does not need to be selected.
- Catalog Name (mandatory): Catalog mapping name used in DLI. When running SQL statements, you need to specify the catalog mapping to identify the external metadata to be accessed. You are advised to set this parameter to the same value as External Catalog Name.
  Currently, DLI can only connect to the data catalog of the default LakeFormation instance.
- Description (optional): Description of the data catalog.
- Click OK.
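After the catalog mapping is created, you can reference it by its Catalog Name in SQL jobs to reach the metadata stored in the LakeFormation instance. The following is a minimal sanity check; the mapping, database, and table names (catalogMapping1, testdb, tableA) are placeholders and must already exist in the mapped LakeFormation catalog.
-- Query a table through the catalog mapping created in DLI.
SELECT * FROM catalogMapping1.testdb.tableA LIMIT 10;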
Step 4: Authorize to Use LakeFormation Resources
- SQL job scenarios
Before submitting a SQL job, you need to authorize DLI to access LakeFormation resources such as metadata, databases, tables, columns, and functions, to ensure that the job can access required data and resources during execution. Supported Actions for LakeFormation SQL Resources describes LakeFormation resources and corresponding permissions.
To use LakeFormation resources, you need to separately complete IAM fine-grained authorization and LakeFormation SQL resource authorization.
- IAM fine-grained authorization for LakeFormation: Authorize DLI to use LakeFormation APIs.
IAM offers multiple methods to manage the permissions of users, groups, and roles to access resources. You can create policies on the IAM console to define which users or roles can call LakeFormation APIs. Then, attach these policies to the specified users or roles.
- Method 1: Role-based authorization
A coarse-grained authorization strategy that defines permissions by job responsibility. Only a limited number of service-level roles are available for authorization.
For example, grant users the read-only permission to query LakeFormation metadata resources by referring to LakeFormation Permissions Management.
Alternatively, grant all operation permissions on LakeFormation-related metadata resources.
Example:
{ "Version": "1.1", "Statement": [ { "Effect": "Allow", "Action": [ "lakeformation:table:*", "lakeformation:database:*", "lakeformation:catalog:*", "lakeformation:function:*", "lakeformation:transaction:*", "lakeformation:policy:describe", "lakeformation:credential:describe" ] } ] }
- Method 2: Policy-based fine-grained authorization
A policy defines the permissions required to perform actions on a specific cloud resource under certain conditions.
For LakeFormation permissions policies, see LakeFormation Permissions and Supported Actions.
For how to grant permissions, see Creating a User and Granting LakeFormation Permissions.
- LakeFormation SQL resource authorization: authorizes users to use specific LakeFormation resources (such as metadata, databases, tables, columns, and functions).
This allows users to access only specific resources, controlling access to LakeFormation data and metadata.
Two methods are available:
- Method 1: Authorize access to resources on the LakeFormation management console.
For details, see Granting Permissions.
For LakeFormation SQL resource permissions, see Data Permissions.
- Method 2: Run the GRANT SQL statement on the DLI management console.
The GRANT statement is used for authorization in SQL.
You can run the GRANT statement to grant users or roles the permission to access databases, tables, columns, and functions.
Supported Actions for LakeFormation SQL Resources describes authorization policies for LakeFormation resources.
Currently, you cannot authorize access to catalogs by running the GRANT statement on the DLI console. To authorize access, use method 1.
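For illustration only, such a GRANT statement might look as follows; the database, table, and user names are placeholders, and the exact privilege keywords supported by DLI are listed in Supported Actions for LakeFormation SQL Resources and the Spark SQL syntax reference.
-- Run with the target catalog mapping selected in the SQL editor; names are placeholders.
GRANT SELECT ON TABLE testdb.tableA TO USER user_test;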
- Spark Jar, Flink OpenSource SQL, and Flink Jar job scenarios:
- Method 1: Use agency authorization. Before using Spark 3.3.1 or later and Flink 1.15 to run jobs, you need to create an agency on the IAM console and add the new agency information when configuring the job.
For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
- Method 2: Use DEW for authorization.
- You have granted the IAM user the IAM and LakeFormation permissions. For details, see IAM authorization in SQL job scenarios.
- A shared secret has been created on the DEW console and the secret value has been stored. For details, see Creating a Shared Secret.
- An agency has been created and authorized for DLI to access DEW. The agency must have been granted the following permissions:
- Permission of the ShowSecretVersion interface for querying secret versions and secret values in DEW: csms:secretVersion:get.
- Permission of the ListSecretVersions interface for listing secret versions in DEW: csms:secretVersion:list.
- Permission to decrypt DEW secrets: kms:dek:decrypt
For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
Step 5: Use LakeFormation Metadata During DLI Job Development
After connecting DLI to the default LakeFormation instance and authorizing access to LakeFormation resources, you can use LakeFormation metadata during DLI job development.
- DLI SQL:
For the SQL syntax of LakeFormation, see Data Lake Insight Spark SQL Syntax Reference.
When running a SQL job, you can select the catalog for the SQL statement on the console or specify catalogName in the SQL statement. catalogName is the mapping name of the data catalog on the DLI console. A usage sketch follows the notes below.
- When connecting DLI to a LakeFormation instance, you need to specify the OBS path for storing the database when creating it.
- When connecting DLI to a LakeFormation instance, you cannot set table lifecycle and versioning when creating a table.
- When connecting DLI to a LakeFormation instance, the LOAD DATA statement does not support datasource tables, and partitions must be specified if the statement is used to import data into a partitioned table.
- If the databases and tables created on the LakeFormation console contain Chinese characters, operations on them cannot be performed on DLI.
- When connecting DLI to a LakeFormation instance, you cannot specify filter criteria to delete partitions.
- When connecting DLI to a LakeFormation instance, you cannot truncate datasource or Hive foreign tables.
- DLI does not currently support the use of LakeFormation row filter criteria.
- When DLI reads binary data for console display, it converts the binary data to Base64.
- DLI does not currently support authorizing access to LakeFormation paths.
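The sketch below illustrates the notes above. It assumes a catalog mapping is selected in the SQL editor; the database, table, and OBS paths are placeholders.
-- Create a database with an explicit OBS location, as required when DLI is connected to LakeFormation.
CREATE DATABASE IF NOT EXISTS testdb LOCATION 'obs://your-bucket/lakeformation/testdb';

-- LOAD DATA does not support datasource tables, and the target partition must be specified for a partitioned table.
LOAD DATA INPATH 'obs://your-bucket/input/orders/' INTO TABLE testdb.orders PARTITION (dt = '20240101');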
- DLI Spark Jar:
This part explains how to configure LakeFormation metadata when submitting a Spark Jar job on the DLI management console.
- Example Spark Jar
SparkSession spark = SparkSession.builder()
        .enableHiveSupport()
        .appName("java_spark_demo")
        .getOrCreate();
spark.sql("show databases").show();
- Configuring a Spark Jar job on the DLI management console
- (Recommended) Method 1: Configure a Spark Jar job's access to LakeFormation metadata using the parameters (such as agency and metadata source) provided on the console
When creating or editing a Spark Jar job, refer to Table 3 to configure the job's access to LakeFormation metadata.
Table 3 Configuring a Spark Jar job's access to LakeFormation metadata
- Spark Version: Spark 3.3.x or later can connect to LakeFormation. Example value: 3.3.1
- Agency: Before using Spark 3.3.1 or later to run jobs, you need to create an agency on the IAM console and add the new agency information. Once set, the system automatically adds the following agency configuration to your job:
  spark.dli.job.agency.name=agency
  For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
- Access Metadata: Whether to allow the Spark job to access metadata. Example value: Yes
- Metadata Source: Type of metadata the Spark job accesses. In this scenario, select lakeformation. Example value: lakeformation
  Once set to lakeformation, the system automatically adds the following configurations to your job to load LakeFormation dependencies:
  spark.sql.catalogImplementation=hive
  spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true
  spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient
  // Load LakeFormation dependencies.
  spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
  spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
  You can also set the metadata source in the Spark Arguments(--conf) parameter. If the metadata source is set in both Metadata Source and Spark Arguments(--conf), the system preferentially uses the value configured in Spark Arguments(--conf). You are advised to set the metadata source through Metadata Source.
- Catalog Name: Name of the data catalog the Spark job accesses.
  Select the data catalog created on the DLI management console, that is, the mapping to the data catalog of the default LakeFormation instance. To specify other LakeFormation instances, configure the LakeFormation instances and data catalogs to be connected in Spark Arguments(--conf). For details, see Method 2: Configure a Spark Jar job's access to LakeFormation metadata using the Spark Arguments(--conf) parameter.
  Once set, the system automatically adds the following configuration to your job to connect to the data catalog of the default LakeFormation instance:
  spark.hadoop.lakecat.catalogname.default=lfcatalog
  You can also set the data catalog name in the Spark Arguments(--conf) parameter. If the data catalog name is set in both Catalog Name and Spark Arguments(--conf), the system preferentially uses the value configured in Spark Arguments(--conf). You are advised to set the data catalog name through Catalog Name.
- Spark Arguments(--conf): You can also set the metadata source and data catalog name in the Spark Arguments(--conf) parameter. If they are set in both Metadata Source and Catalog Name, as well as Spark Arguments(--conf), the system preferentially uses the values configured in Spark Arguments(--conf).
  - To access a Hudi data table, add the following configurations to the Spark Arguments(--conf) parameter:
    spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
    spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider
  - To access a Delta data table, add the following configurations to the Spark Arguments(--conf) parameter:
    spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
    spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
- Method 2: Configure a Spark Jar job's access to LakeFormation metadata using the Spark Arguments(--conf) parameter
When creating or editing a Spark Jar job, configure the following information in the Spark Arguments(--conf) parameter on the job configuration page to access LakeFormation metadata:
spark.sql.catalogImplementation=hive
spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true
spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient
// Hudi support (optional).
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
// Hudi support (optional).
spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider
// Access using an agency with the OBS and LakeFormation permissions. You are advised to configure the minimum permission set.
spark.dli.job.agency.name=agencyForLakeformation
// ID of the LakeFormation instance to be accessed, which can be viewed on the LakeFormation console. This parameter is optional. If unset, the default LakeFormation instance is accessed.
spark.hadoop.lakeformation.instance.id=xxx
// Name of the catalog to be accessed on the LakeFormation side, which can be viewed on the LakeFormation console. This parameter is optional. If unset, the default value is hive.
spark.hadoop.lakecat.catalogname.default=lfcatalog
// Load LakeFormation dependencies.
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
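With the Hudi extension and lock provider configured as above, the job can manage Hudi tables in the LakeFormation catalog through Spark SQL. The following is only an illustrative sketch with placeholder database and table names; the exact table options depend on the Hudi version bundled with DLI.
-- Create and query a Hudi table through the configured catalog (names are placeholders).
CREATE TABLE IF NOT EXISTS testdb.hudi_orders (
  order_id STRING,
  order_name STRING,
  price INT,
  dt STRING
) USING hudi
PARTITIONED BY (dt)
TBLPROPERTIES (primaryKey = 'order_id', type = 'cow');

INSERT INTO testdb.hudi_orders VALUES ('o0001', 'item-a', 10, '20240101');
SELECT * FROM testdb.hudi_orders WHERE dt = '20240101';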
- DLI Flink OpenSource SQL
- Example 1: Connecting DLI to LakeFormation using an agency
Create a Flink OpenSource SQL job and set the following parameters:
- Flink Version: Flink 1.15 or later can connect to LakeFormation. Example value: 1.15
- Agency: Before using Flink 1.15 or later to run jobs, you need to create an agency on the IAM console and add the new agency information. Once set, the system automatically adds the following agency configuration to your job:
  flink.dli.job.agency.name=agency
  For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
- Enable Checkpointing: Select it.
- Runtime Configuration:
  - Type of metadata the Flink job accesses. In this scenario, select lakeformation.
    flink.dli.job.catalog.type=lakeformation
  - Name of the data catalog the Flink job accesses.
    flink.dli.job.catalog.name=[Catalog name in LakeFormation]
    Select the data catalog created on the DLI management console, that is, the mapping to the data catalog of the default LakeFormation instance.
For the catalog parameters in the example, see Table 4.
Table 4 Catalog parameters in the example Flink OpenSource SQL
- type (mandatory): Catalog type. Fixed at hive.
- hive-conf-dir (mandatory): hive-conf path. Fixed at /opt/flink/conf.
- default-database (optional): Default database name. The example uses the default database.
CREATE CATALOG hive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/opt/flink/conf', -- Fixed at /opt/flink/conf
    'default-database'='default'
);

USE CATALOG hive;

CREATE TABLE IF NOT EXISTS dataGenSource612 (user_id string, amount int) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.user_id.kind' = 'random',
    'fields.user_id.length' = '3'
);

CREATE TABLE IF NOT EXISTS printSink612 (user_id string, amount int) WITH ('connector' = 'print');

INSERT INTO printSink612 SELECT * FROM dataGenSource612;
- Example 2: Connecting DLI to LakeFormation using DEW
Create a Flink OpenSource SQL job and set the following parameters:
- Flink Version: Flink 1.15 or later can connect to LakeFormation. Example value: 1.15
- Agency: Before using Flink 1.15 or later to run jobs, you need to create an agency on the IAM console and add the new agency information. Once set, the system automatically adds the following agency configuration to your job:
  flink.dli.job.agency.name=agency
  For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
- Enable Checkpointing: Select it.
- Runtime Configuration:
  - Type of metadata the Flink job accesses. In this scenario, select lakeformation.
    flink.dli.job.catalog.type=lakeformation
  - Name of the data catalog the Flink job accesses.
    flink.dli.job.catalog.name=[Catalog name in LakeFormation]
    Select the data catalog created on the DLI management console, that is, the mapping to the data catalog of the default LakeFormation instance.
For the catalog parameters in the example, see Table 5.
Set properties.catalog.lakeformation.auth.identity.util.class to com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator and configure DEW.
Table 5 Catalog parameters in the example Flink OpenSource SQL (using DEW)
- type (mandatory): Catalog type. Fixed at hive.
- hive-conf-dir (mandatory): hive-conf path. Fixed at /opt/flink/conf.
- default-database (optional): Default database name. If unset, the default database is used.
- properties.catalog.lakeformation.auth.identity.util.class (mandatory when DEW is used): Authentication information acquisition class. Fixed at com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator.
- properties.catalog.dew.projectId (mandatory when DEW is used): ID of the project DEW belongs to. The default value is the ID of the project where the Flink job is.
- properties.catalog.dew.endpoint (mandatory when DEW is used): Endpoint of the DEW service to be used. Example: kms.xxx.com
- properties.catalog.dew.csms.secretName (mandatory when DEW is used): Name of the shared secret in DEW's secret management.
- properties.catalog.dew.csms.version (mandatory when DEW is used): Version number of the shared secret in DEW's secret management.
- properties.catalog.dew.access.key (mandatory when DEW is used): Key corresponding to the access.key value in DEW's secret.
- properties.catalog.dew.secret.key (mandatory when DEW is used): Key corresponding to the secret.key value in DEW's secret.
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/opt/flink/conf',
    'default-database'='default',
    -- The following is the DEW configuration. Change the parameter values based on site requirements.
    'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator',
    'properties.catalog.dew.endpoint'='kms.xxx.com',
    'properties.catalog.dew.csms.secretName'='obsAksK',
    'properties.catalog.dew.access.key' = 'myak',
    'properties.catalog.dew.secret.key' = 'mysk',
    'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxx',
    'properties.catalog.dew.csms.version'='v9'
);

USE CATALOG myhive;

CREATE TABLE IF NOT EXISTS dataGenSource_dew612 (
    user_id string,
    amount int
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.user_id.kind' = 'random',
    'fields.user_id.length' = '3'
);

CREATE TABLE IF NOT EXISTS printSink_dew612 (
    user_id string,
    amount int
) WITH (
    'connector' = 'print'
);

INSERT INTO printSink_dew612 SELECT * FROM dataGenSource_dew612;
- Example 3: Connecting DLI to LakeFormation using an agency to write data to a Hudi table
Create a Flink OpenSource SQL job and set the following parameters:
- Flink Version: Flink 1.15 or later can connect to LakeFormation. Example value: 1.15
- Agency: Before using Flink 1.15 or later to run jobs, you need to create an agency on the IAM console and add the new agency information. Once set, the system automatically adds the following agency configuration to your job:
  flink.dli.job.agency.name=agency
  For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
- Enable Checkpointing: Select it.
- Runtime Configuration:
  - Type of metadata the Flink job accesses. In this scenario, select lakeformation.
    flink.dli.job.catalog.type=lakeformation
  - Name of the data catalog the Flink job accesses.
    flink.dli.job.catalog.name=[Catalog name in LakeFormation]
    Select the data catalog created on the DLI management console, that is, the mapping to the data catalog of the default LakeFormation instance.
For the catalog parameters in the example, see Table 6.
Table 6 Parameters for configuring a catalog of the Hudi type
- type (mandatory): Catalog type. Set to hudi for a Hudi table.
- hive-conf-dir (mandatory): hive-conf path. Fixed at /opt/flink/conf.
- default-database (optional): Default database name.
- mode (mandatory): The value can be 'hms' or 'non-hms'. hms indicates that the created Hudi catalog uses Hive Metastore to store metadata; non-hms indicates that Hive Metastore is not used to store metadata. In this example, fixed at hms.
Table 7 Connector parameters for a Hudi sink table
- connector (mandatory): Flink connector type. If set to hudi, the sink table is a Hudi table. Example value: hudi
- path (mandatory): Base path of the table. If the path does not exist, the system will create it. Refer to the value configured in the sample code.
- hoodie.datasource.write.recordkey.field (optional): Unique key field name of the Hudi table. In this example, order_id is set as the unique key.
- EXTERNAL (mandatory): Whether the table is an external table. Must be set to true for the Hudi table. Example value: true
CREATE CATALOG hive_catalog WITH (
    'type'='hive',
    'hive-conf-dir' = '/opt/flink/conf',
    'default-database'='test'
);

USE CATALOG hive_catalog;

CREATE TABLE IF NOT EXISTS genSource618 (
    order_id STRING,
    order_name STRING,
    price INT,
    weight INT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.order_id.kind' = 'random',
    'fields.order_id.length' = '8',
    'fields.order_name.kind' = 'random',
    'fields.order_name.length' = '5'
);

CREATE CATALOG hoodie_catalog WITH (
    'type'='hudi',
    'hive.conf.dir' = '/opt/flink/conf',
    'mode'='hms' -- supports 'dfs' mode that uses the DFS backend for table DDLs persistence
);

CREATE TABLE IF NOT EXISTS hoodie_catalog.`test`.`hudiSink618` (
    `order_id` STRING PRIMARY KEY NOT ENFORCED,
    `order_name` STRING,
    `price` INT,
    `weight` INT,
    `create_time` BIGINT,
    `create_date` String
) PARTITIONED BY (create_date) WITH (
    'connector' = 'hudi',
    'path' = 'obs://xxx/catalog/dbtest3/hudiSink618',
    'hoodie.datasource.write.recordkey.field' = 'order_id',
    'write.precombine.field' = 'create_time',
    'EXTERNAL' = 'true' -- must be set
);

INSERT INTO hoodie_catalog.`test`.`hudiSink618`
SELECT
    order_id,
    order_name,
    price,
    weight,
    UNIX_TIMESTAMP() as create_time,
    FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date
FROM genSource618;
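Once the job has produced a few checkpointed commits, the written data can be verified through the same catalog, for example from a DLI SQL job connected to the mapped LakeFormation catalog. The query below is a hypothetical check; catalogMapping1 is a placeholder for the catalog mapping name on the DLI side, and the database and table names follow the example above.
-- Read back the Hudi sink table written by the Flink job.
SELECT order_id, order_name, price, create_date FROM catalogMapping1.test.hudiSink618 LIMIT 10;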
- DLI Flink Jar
- Example 1: Connecting DLI to LakeFormation using an agency
- Develop a Flink Jar program, compile and upload the JAR file to OBS. In this example, the file is uploaded to the obs://obs-test/dlitest/ directory.
The sample code is as follows:
In this example, random data is generated using the DataGen table and then output to the Print result table.
For other connector types, see List of Connectors Supported by Flink 1.15.
package com.huawei.test;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;

@SuppressWarnings({"deprecation", "rawtypes", "unchecked"})
public class GenToPrintTaskAgency {
    private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class);
    private static final String datePattern = "yyyy-MM-dd_HH-mm-ss";

    public static void main(String[] args) {
        LOGGER.info("Start task.");
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        String checkpointInterval = "180000000";

        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval));
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern);
        String time = dateTimeFormat.format(System.currentTimeMillis());
        RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(
                new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true);
        env.setStateBackend(rocksDbBackend);

        // Create the Hive catalog that maps to the LakeFormation data catalog.
        String createCatalog = "CREATE CATALOG lf_catalog WITH (\n"
                + " 'type' = 'hive',\n"
                + " 'hive-conf-dir' = '/opt/hadoop/conf'\n"
                + " );";
        tEnv.executeSql(createCatalog);

        String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJar618_1` (\n"
                + " user_id string,\n"
                + " amount int\n"
                + ") WITH (\n"
                + " 'connector' = 'datagen',\n"
                + " 'rows-per-second' = '1',\n"
                + " 'fields.user_id.kind' = 'random',\n"
                + " 'fields.user_id.length' = '3'\n"
                + ")";
        /*testdb is a custom database.*/
        tEnv.executeSql(dataSource);

        String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJar618_1` (\n"
                + " user_id string,\n"
                + " amount int\n"
                + ") WITH ('connector' = 'print')";
        tEnv.executeSql(printSink);

        /*testdb is a custom database.*/
        String query = "insert into lf_catalog.`testdb`.`printSinkJar618_1` "
                + "select * from lf_catalog.`testdb`.`dataGenSourceJar618_1`";
        tEnv.executeSql(query);
    }
}
- Create a Flink Jar job and set the following parameters:
- Flink Version: Flink 1.15 or later can connect to LakeFormation. Example value: 1.15
- Agency: Before using Flink 1.15 or later to run jobs, you need to create an agency on the IAM console and add the new agency information. Once set, the system automatically adds the following agency configuration to your job:
  flink.dli.job.agency.name=agency
  For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
- Runtime Configuration:
  - Type of metadata the Flink job accesses. In this scenario, select lakeformation.
    flink.dli.job.catalog.type=lakeformation
  - Name of the data catalog the Flink job accesses.
    flink.dli.job.catalog.name=[Catalog name in LakeFormation]
    Select the data catalog created on the DLI management console, that is, the mapping to the data catalog of the default LakeFormation instance.
- Example 2: Connecting DLI to LakeFormation using DEW
- Develop a Flink Jar program, compile and upload the JAR file to OBS. In this example, the file is uploaded to the obs://obs-test/dlitest/ directory.
The sample code is as follows:
In this example, random data is generated using the DataGen table and then output to the Print result table.
For other connector types, see List of Connectors Supported by Flink 1.15.
package com.huawei.test;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;

@SuppressWarnings({"deprecation", "rawtypes", "unchecked"})
public class GenToPrintTaskDew {
    private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskDew.class);
    private static final String datePattern = "yyyy-MM-dd_HH-mm-ss";

    public static void main(String[] args) {
        LOGGER.info("Start task.");
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        String checkpointInterval = "180000000";

        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval));
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern);
        String time = dateTimeFormat.format(System.currentTimeMillis());
        RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(
                new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true);
        env.setStateBackend(rocksDbBackend);

        // Create the Hive catalog and configure DEW-based authentication.
        String createCatalog = "CREATE CATALOG lf_catalog WITH (\n"
                + " 'type' = 'hive',\n"
                + " 'hive-conf-dir' = '/opt/hadoop/conf',\n"
                + " 'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator',\n"
                + " 'properties.catalog.dew.endpoint'='kms.xxx.xxx.com',\n"
                + " 'properties.catalog.dew.csms.secretName'='obsAksK',\n"
                + " 'properties.catalog.dew.access.key' = 'ak',\n"
                + " 'properties.catalog.dew.secret.key' = 'sk',\n"
                + " 'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxxx',\n"
                + " 'properties.catalog.dew.csms.version'='v9'\n"
                + " );";
        tEnv.executeSql(createCatalog);

        String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJarDew618_1` (\n"
                + " user_id string,\n"
                + " amount int\n"
                + ") WITH (\n"
                + " 'connector' = 'datagen',\n"
                + " 'rows-per-second' = '1',\n"
                + " 'fields.user_id.kind' = 'random',\n"
                + " 'fields.user_id.length' = '3'\n"
                + ")";
        tEnv.executeSql(dataSource);
        /*testdb is a custom database.*/

        String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJarDew618_1` (\n"
                + " user_id string,\n"
                + " amount int\n"
                + ") WITH ('connector' = 'print')";
        tEnv.executeSql(printSink);
        /*testdb is a custom database.*/

        String query = "insert into lf_catalog.`testdb`.`printSinkJarDew618_1` "
                + "select * from lf_catalog.`testdb`.`dataGenSourceJarDew618_1`";
        tEnv.executeSql(query);
    }
}
- Create a Flink Jar job and set the following parameters:
- Flink Version: Flink 1.15 or later can connect to LakeFormation. Example value: 1.15
- Agency: Before using Flink 1.15 or later to run jobs, you need to create an agency on the IAM console and add the new agency information. Once set, the system automatically adds the following agency configuration to your job:
  flink.dli.job.agency.name=agency
  For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
- Runtime Configuration:
  - Type of metadata the Flink job accesses. In this scenario, select lakeformation.
    flink.dli.job.catalog.type=lakeformation
  - Name of the data catalog the Flink job accesses.
    flink.dli.job.catalog.name=[Catalog name in LakeFormation]
    Select the data catalog created on the DLI management console, that is, the mapping to the data catalog of the default LakeFormation instance.
- Example 3: Flink Jar jobs supporting Hudi tables
- Develop a Flink Jar program, compile and upload the JAR file to OBS. In this example, the file is uploaded to the obs://obs-test/dlitest/ directory.
The sample code is as follows:
In this example, random data is generated using the DataGen table and then output to the Hudi result table.
For other connector types, see List of Connectors Supported by Flink 1.15.
package com.huawei.test;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;

public class GenToHudiTask4 {
    private static final Logger LOGGER = LoggerFactory.getLogger(GenToHudiTask4.class);
    private static final String datePattern = "yyyy-MM-dd_HH-mm-ss";

    public static void main(String[] args) throws IOException {
        LOGGER.info("Start task.");
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        String checkpointInterval = "30000";

        // set up execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval));
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern);
        String time = dateTimeFormat.format(System.currentTimeMillis());
        RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(
                new FsStateBackend("obs://xxx/jobs/testcheckpoint/" + time), true);
        env.setStateBackend(rocksDbBackend);

        // Create the Hudi catalog that uses Hive Metastore (hms mode).
        String catalog = "CREATE CATALOG hoodie_catalog\n"
                + " WITH (\n"
                + " 'type'='hudi',\n"
                + " 'hive.conf.dir' = '/opt/hadoop/conf',\n"
                + " 'mode'='hms'\n"
                + " )";
        tEnv.executeSql(catalog);

        String dwsSource = "CREATE TABLE if not exists genSourceJarForHudi618_1 (\n"
                + " order_id STRING,\n"
                + " order_name STRING,\n"
                + " price INT,\n"
                + " weight INT\n"
                + ") WITH (\n"
                + " 'connector' = 'datagen',\n"
                + " 'rows-per-second' = '1',\n"
                + " 'fields.order_id.kind' = 'random',\n"
                + " 'fields.order_id.length' = '8',\n"
                + " 'fields.order_name.kind' = 'random',\n"
                + " 'fields.order_name.length' = '8'\n"
                + ")";
        tEnv.executeSql(dwsSource);
        /*testdb is a custom database.*/

        String printSinkdws = "CREATE TABLE if not exists hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` (\n"
                + " order_id STRING PRIMARY KEY NOT ENFORCED,\n"
                + " order_name STRING,\n"
                + " price INT,\n"
                + " weight INT,\n"
                + " create_time BIGINT,\n"
                + " create_date String\n"
                + ") WITH ("
                + "'connector' = 'hudi',\n"
                + "'path' = 'obs://xxx/catalog/dbtest3/hudiSinkJarHudi618_1',\n"
                + "'hoodie.datasource.write.recordkey.field' = 'order_id',\n"
                + "'EXTERNAL' = 'true'\n"
                + ")";
        tEnv.executeSql(printSinkdws);
        /*testdb is a custom database.*/

        String query = "insert into hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` select\n"
                + " order_id,\n"
                + " order_name,\n"
                + " price,\n"
                + " weight,\n"
                + " UNIX_TIMESTAMP() as create_time,\n"
                + " FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date\n"
                + " from genSourceJarForHudi618_1";
        tEnv.executeSql(query);
    }
}
Table 8 Connector parameters for a Hudi sink table
- connector (mandatory): Flink connector type. If set to hudi, the sink table is a Hudi table. Example value: hudi
- path (mandatory): Base path of the table. If the path does not exist, the system will create it. Refer to the value configured in the sample code.
- hoodie.datasource.write.recordkey.field (optional): Unique key field name of the Hudi table. In this example, order_id is set as the unique key.
- EXTERNAL (mandatory): Whether the table is an external table. Must be set to true for the Hudi table. Example value: true
- Create a Flink Jar job and set the following parameters:
- Flink Version: Flink 1.15 or later can connect to LakeFormation. Example value: 1.15
- Agency: Before using Flink 1.15 or later to run jobs, you need to create an agency on the IAM console and add the new agency information. Once set, the system automatically adds the following agency configuration to your job:
  flink.dli.job.agency.name=agency
  For agency permission examples, see Creating a Custom DLI Agency and Agency Permission Policies in Common Scenarios.
- Runtime Configuration:
  - Type of metadata the Flink job accesses. In this scenario, select lakeformation.
    flink.dli.job.catalog.type=lakeformation
  - Name of the data catalog the Flink job accesses.
    flink.dli.job.catalog.name=[Catalog name in LakeFormation]
    Select the data catalog created on the DLI management console, that is, the mapping to the data catalog of the default LakeFormation instance.