Using DLI to Submit a Flink Jar Job
Scenario
Flink Jar jobs are suitable for data analysis scenarios that require custom stream processing logic, complex state management, or integration with specific libraries. You need to write and build a Jar job package. Before submitting a Flink Jar job, upload the Jar job package to OBS and submit it together with the data and job parameters to run the job.
This example introduces the basic process of submitting a Flink Jar job package through the DLI console. Due to different service requirements, the specific writing of the Jar package may vary. It is recommended that you refer to the sample code provided by DLI and edit and customize it according to your actual business scenario. Get DLI Sample Code.
Procedure
Table 1 describes the procedure for submitting a Flink Jar job using DLI.
Complete the preparations in Preparations before performing the following operations.
Table 1 Procedure for submitting a Flink Jar job using DLI

| Procedure | Description |
| --- | --- |
| Step 1: Develop a JAR File and Upload It to OBS | Prepare a Flink Jar job package and upload it to OBS. |
| Step 2: Buy an Elastic Resource Pool and Add Queues to the Pool | Create compute resources required for submitting the Flink Jar job. |
| Step 3: Use DEW to Manage Access Credentials | In cross-source analysis scenarios, use DEW to manage access credentials of data sources. |
| Step 4: Create a Custom Agency to Allow DLI to Access DEW and Read Credentials | Create an agency to allow DLI to access DEW. |
| Step 5: Create a Flink Jar Job and Configure Job Information | Create a Flink Jar job to analyze data. |
Preparations
- Register a Huawei ID and enable Huawei Cloud services. Make sure your account is not in arrears or frozen.
- Configure an agency for DLI.
To use DLI, you need to access services such as Object Storage Service (OBS), Virtual Private Cloud (VPC), and Simple Message Notification (SMN). If it is your first time using DLI, you will need to configure an agency to allow access to these dependent services.
- Log in to the DLI management console using your account. In the navigation pane on the left, choose Global Configuration > Service Authorization.
- On the agency settings page, select the agency permissions under Basic Usage, Datasource, and O&M and click Update.
- Check and understand the notes for updating the agency, and click OK. The DLI agency permissions are updated.
Figure 1 Configuring an agency for DLI
- Once configured, you can check the agency dli_management_agency in the agency list on the IAM console.
Step 1: Develop a JAR File and Upload It to OBS
Develop a JAR file offline as the DLI console does not support this capability. For development examples, refer to "Flink Jar Job Examples".
Develop a Flink Jar job program by referring to Flink Job Sample Code, compile it, and package it into flink-examples.jar.
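The exact content of the JAR depends on your service logic. As a rough orientation only, the following minimal sketch shows the general shape of a Flink Jar main class; it assumes the Flink 1.15 DataStream API, and the package name, class name, and pipeline are illustrative placeholders rather than DLI's sample code.

```java
// Minimal illustrative Flink Jar main class; package, class name, and pipeline are placeholders.
package com.example.dli;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkExamplesJob {

    public static void main(String[] args) throws Exception {
        // Obtain the execution environment provided by the Flink runtime on the DLI queue.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder pipeline: transform a small in-memory collection and print the results.
        env.fromElements("hello", "dli", "flink")
           .map(String::toUpperCase)
           .print();

        // Submit the job graph for execution; the name appears in the Flink web UI.
        env.execute("flink-examples");
    }
}
```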
Before submitting the Flink Jar job, perform the following steps to upload the JAR file (and any data files your job requires) to OBS:
- Log in to the DLI console.
- In the service list, click Object Storage Service under Storage.
- Create a bucket. In this example, name it dli-test-obs01.
- On the displayed Buckets page, click Create Bucket in the upper right corner.
- On the displayed Create Bucket page, set Region and Bucket Name. Retain the default values for other parameters or modify them as needed.
Select a region that matches the location of the DLI console.
- Click Create Now.
- In the bucket list, click the name of the dli-test-obs01 bucket you just created to access its Objects tab.
- Click Upload Object. In the displayed dialog box, drag or add files or folders, for example, flink-examples.jar, to the upload area. Then, click Upload.
In this example, the path after upload is obs://dli-test-obs01/flink-examples.jar.
For more operations on the OBS console, see the Object Storage Service User Guide.
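If you prefer to upload the JAR file programmatically instead of through the console, the OBS Java SDK can also be used. The following is a sketch under the assumption that the esdk-obs-java dependency is on the classpath; the endpoint, environment variable names, and local file path are placeholders.

```java
// Illustrative upload of the job package with the OBS Java SDK; endpoint and paths are placeholders.
import com.obs.services.ObsClient;

import java.io.File;

public class UploadJarToObs {

    public static void main(String[] args) throws Exception {
        // Use the OBS endpoint of the region where the bucket was created (placeholder below).
        String endpoint = "https://obs.<region>.myhuaweicloud.com";
        String ak = System.getenv("OBS_AK");   // access key ID, read from the environment
        String sk = System.getenv("OBS_SK");   // secret access key, read from the environment

        ObsClient obsClient = new ObsClient(ak, sk, endpoint);
        try {
            // Upload the built package to the bucket created in this example.
            obsClient.putObject("dli-test-obs01", "flink-examples.jar",
                    new File("target/flink-examples.jar"));
        } finally {
            obsClient.close();
        }
    }
}
```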
Step 2: Buy an Elastic Resource Pool and Add Queues to the Pool
To run the Flink Jar job, you must use your own queue as the existing default queue cannot be used. In this example, create an elastic resource pool named dli_resource_pool and a queue named dli_queue_01.
- Log in to the DLI management console.
- In the navigation pane on the left, choose Resources > Resource Pool.
- On the displayed page, click Buy Resource Pool in the upper right corner.
- On the displayed page, set the parameters.
- In this example, we will buy the resource pool in the CN East-Shanghai2 region. Table 2 describes the parameters.
Table 2 Parameters

| Parameter | Description | Example Value |
| --- | --- | --- |
| Region | Select a region where you want to buy the elastic resource pool. | CN East-Shanghai2 |
| Project | Project uniquely preset by the system for each region. | Default |
| Name | Name of the elastic resource pool. | dli_resource_pool |
| Specifications | Specifications of the elastic resource pool. | Standard |
| CU Range | The maximum and minimum CUs allowed for the elastic resource pool. | 64-64 |
| CIDR Block | CIDR block the elastic resource pool belongs to. If you use an enhanced datasource connection, this CIDR block cannot overlap that of the data source. Once set, this CIDR block cannot be changed. | 172.16.0.0/19 |
| Enterprise Project | Select an enterprise project for the elastic resource pool. | default |
- Click Buy.
- Click Submit.
- In the elastic resource pool list, locate the pool you just created and click Add Queue in the Operation column.
- Set the basic parameters listed below.
Table 3 Basic parameters for adding a queue

| Parameter | Description | Example Value |
| --- | --- | --- |
| Name | Name of the queue to add. | dli_queue_01 |
| Type | Type of the queue. To execute SQL jobs, select For SQL. To execute Flink or Spark jobs, select For general purpose. | - |
| Engine | SQL queue engine. The options include Spark and Trino. | - |
| Enterprise Project | Select an enterprise project. | default |
- Click Next and configure scaling policies for the queue.
Click Create to add a scaling policy with varying priority, period, minimum CUs, and maximum CUs.
Figure 2 shows the scaling policy configured in this example.

Table 4 Scaling policy parameters

| Parameter | Description | Example Value |
| --- | --- | --- |
| Priority | Priority of the scaling policy in the current elastic resource pool. A larger value indicates a higher priority. In this example, only one scaling policy is configured, so its priority is set to 1 by default. | 1 |
| Period | The first scaling policy is the default policy, and its Period parameter configuration cannot be deleted or modified. The period for the scaling policy is from 00 to 24. | 00–24 |
| Min CU | Minimum number of CUs allowed by the scaling policy. | 16 |
| Max CU | Maximum number of CUs allowed by the scaling policy. | 64 |
- Click OK.
Step 3: Use DEW to Manage Access Credentials
In cross-source analysis scenarios, you need to set attributes such as the username and password in the connector. However, information such as usernames and passwords is highly sensitive and needs to be encrypted to ensure user data privacy.
Data Encryption Workshop (DEW) is a secure, reliable, and easy-to-use solution for encrypting and decrypting private data while ensuring its security.
This example introduces how to create a shared secret in DEW.
- Log in to the DEW management console.
- In the navigation pane on the left, choose Cloud Secret Management Service > Secrets.
- Click Create Secret. On the displayed page, configure basic secret information.
- In this example, the key in the first line is the user's access key ID (AK).
- In this example, the key in the second line is the user's secret access key (SK).
Figure 3 Configuring access credentials in DEW
- Set access credential parameters on the DLI Flink Jar job editing page.
flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.access.key=USER_AK_CSMS_KEY_obstest1
flink.hadoop.fs.obs.bucket.USER_BUCKET_NAME.dew.secret.key=USER_SK_CSMS_KEY_obstest1
flink.hadoop.fs.obs.security.provider=com.dli.provider.UserObsBasicCredentialProvider
flink.hadoop.fs.dew.csms.secretName=obsAksK
flink.hadoop.fs.dew.endpoint=kmsendpoint
flink.hadoop.fs.dew.csms.version=v6
flink.hadoop.fs.dew.csms.cache.time.second=3600
flink.dli.job.agency.name=agencyname
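For reference, once these properties are configured for the job, the application code can reference obs:// paths directly and the OBS connector resolves the AK/SK from DEW at run time. The sketch below is illustrative only; the bucket and object names are placeholders, and whether your job reads from OBS at all depends on your service logic.

```java
// Illustrative only: with the DEW-backed OBS credentials configured above,
// obs:// paths can be referenced directly; bucket and file names are placeholders.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObsReadExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read a text file from the user bucket; credentials are fetched from DEW/CSMS, not hard-coded.
        env.readTextFile("obs://USER_BUCKET_NAME/input/sample.txt")
           .print();

        env.execute("obs-read-example");
    }
}
```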
Step 4: Create a Custom Agency to Allow DLI to Access DEW and Read Credentials
- Log in to the management console.
- In the upper right corner of the page, hover over the username and select Identity and Access Management.
- In the navigation pane of the IAM console, choose Agencies.
- On the displayed page, click Create Agency.
- On the Create Agency page, set the following parameters:
- Agency Name: Enter an agency name, for example, dli_dew_agency_access.
- Agency Type: Select Cloud service.
- Cloud Service: This parameter is available only when you select Cloud service for Agency Type. Select Data Lake Insight (DLI) from the drop-down list.
- Validity Period: Select Unlimited.
- Description: This parameter is optional. You can enter a description, for example, an agency that allows DLI to read credentials from DEW.
- Click Next.
- Click the agency name. On the displayed page, click the Permissions tab. Click Authorize. On the displayed page, click Create Policy.
- Configure policy information.
- Enter a policy name, for example, dli-dew-agency.
- Select JSON.
- In the Policy Content area, paste a custom policy.
{ "Version": "1.1", "Statement": [ { "Effect": "Allow", "Action": [ "csms:secretVersion:get", "csms:secretVersion:list", "kms:dek:decrypt" ] } ] }
- Enter a policy description as required.
- Click Next.
- On the Select Policy/Role page, select Custom policy from the first drop-down list and select the custom policy you just created.
- Click Next. On the Select Scope page, set the authorization scope. In this example, select All resources.
For details about authorization operations, see Creating a User Group and Assigning Permissions.
- Click OK.
It takes 15 to 30 minutes for the authorization to take effect.
Step 5: Create a Flink Jar Job and Configure Job Information
- Create a Flink Jar job.
- In the navigation pane on the left of the DLI management console, choose Job Management > Flink Jobs.
- On the displayed page, click Create Job in the upper right corner.
In this example, set Type to Flink Jar and Name to Flink_Jar_for_test.
Figure 4 Creating a Flink Jar job
- Click OK.
- Configure basic job information.
Configure basic job information based on Table 5. A sketch showing how the main class can receive its class arguments follows the table.
Table 5 Parameters

| Parameter | Mandatory | Description |
| --- | --- | --- |
| Queue | Yes | Select a queue where you want to run your job. |
| Application | Yes | Select the custom package uploaded in Step 1: Develop a JAR File and Upload It to OBS. |
| Main Class | Yes | Class name of the JAR file to load. This parameter specifies the entry point of the Flink job, that is, the class that contains the main method, which is executed first when the job is started. If the application program is a .jar file, the main class name must be provided; it is case-sensitive and must be correct. Default: specified based on the Manifest file in the JAR file. Manually assign: you must enter the class name and confirm the class arguments (separated by spaces). NOTE: When a class belongs to a package, the package path must be included, for example, packagePath.KafkaMessageStreaming. |
| Flink Version | Yes | Flink version used for job running. If you choose Flink 1.15, make sure to configure the agency information for the cloud services that DLI is allowed to access in the job. |
| Agency | No | If you choose Flink 1.15, configure the agency name yourself to ensure smooth job running. |
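As mentioned above, the class arguments entered on the console are passed to the main method as space-separated values. The following sketch shows one way a main class can consume them using Flink's ParameterTool; the argument names and defaults are placeholders.

```java
// Hedged sketch: consuming space-separated class arguments entered on the DLI console.
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkExamplesJob {

    public static void main(String[] args) throws Exception {
        // Arguments such as "--input obs://dli-test-obs01/input" arrive in the standard args array.
        ParameterTool params = ParameterTool.fromArgs(args);
        String inputPath = params.get("input", "obs://dli-test-obs01/input/sample.txt");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder pipeline that uses the parsed argument.
        env.readTextFile(inputPath).print();

        env.execute("flink-examples");
    }
}
```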
- Configure advanced settings for the Flink Jar job.
Configure the Flink Jar job based on Table 6. A sketch of the checkpoint configuration expected in the application code follows the table.
Table 6 Advanced settings for the Flink Jar job

| Parameter | Mandatory | Description |
| --- | --- | --- |
| CUs | Yes | One CU consists of one vCPU and 4 GB of memory. The number of CUs ranges from 2 to 400. |
| Job Manager CUs | Yes | Number of CUs allowed for the job manager. The value ranges from 1 to 4. The default value is 1. |
| Parallelism | Yes | Maximum number of parallel operators in a job. NOTE: The value cannot exceed four times the number of compute units (CUs minus Job Manager CUs). For example, with 16 CUs and 1 Job Manager CU, the parallelism can be at most 4 × (16 − 1) = 60. You are advised to set this parameter to a value greater than that configured in the code; otherwise, job submission may fail. |
| Task Manager Config | No | Whether Task Manager resource parameters are set. If this option is selected, set CU(s) per TM (number of resources occupied by each Task Manager) and Slot(s) per TM (number of slots contained in each Task Manager). |
| Save Job Log | No | Whether job running logs are saved to OBS. If this option is selected, set OBS Bucket: select an OBS bucket to store job logs. If the OBS bucket you selected is unauthorized, click Authorize. |
| Alarm on Job Exception | No | Whether to notify users of any job exceptions, such as running exceptions or arrears, via SMS or email. If this option is selected, set SMN Topic: select a custom SMN topic. For how to create a custom SMN topic, see "Creating a Topic" in the Simple Message Notification User Guide. |
| Auto Restart on Exception | No | Whether automatic restart is enabled. If enabled, jobs are automatically restarted and restored when exceptions occur. If this option is selected, set Max. Retry Attempts (maximum number of retries upon an exception, in times/hour; either Unlimited or a user-defined Limited value) and whether to Restore Job from Checkpoint (restore the job from the latest checkpoint). If you select Restore Job from Checkpoint, you also need to set Checkpoint Path: select a path for storing checkpoints that matches the path configured in the application package. Each job must have a unique checkpoint path, or you will not be able to obtain the checkpoint. |
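The checkpoint path selected here must match the one configured in the application code. The following is a sketch of what that configuration can look like, assuming Flink 1.15; the checkpoint interval and obs:// path are placeholders.

```java
// Hedged sketch of checkpoint settings in the application code; interval and path are placeholders.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 60 seconds.
        env.enableCheckpointing(60_000);

        // Store checkpoints under a job-specific OBS path that matches the console's Checkpoint Path.
        env.getCheckpointConfig()
           .setCheckpointStorage("obs://dli-test-obs01/checkpoints/Flink_Jar_for_test");

        // Placeholder pipeline.
        env.fromElements(1, 2, 3).print();

        env.execute("checkpoint-config-example");
    }
}
```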
- Click Save in the upper right of the page.
- Click Start in the upper right corner.
- On the displayed Start Flink Job page, confirm the job specifications and fee and click Start Now to start the job.
Once the job is started, the system automatically switches to the Flink Jobs page. Locate the job you created and check its status in the Status column.
Once a job is successfully submitted, its status changes from Submitting to Running. After the execution is complete, the status changes to Completed.
If the job status is Submission failed or Running exception, the job failed to submit or run. In this case, hover over the status icon in the Status column of the job list to view and copy the error details. Rectify the fault based on the error information and resubmit the job.