Running a Flink Job
Flink jobs are distributed data processing tasks built on the Flink framework. They are mainly used for stream processing and stateful computations. Flink is a unified framework for computations over unbounded (stream) and bounded (batch) data. It is designed around stream processing and treats batch processing (finite data streams) as a special case of stream processing. Flink provides real-time data processing with high throughput, low latency, and exactly-once semantics, and is widely used in real-time monitoring, log analysis, and financial transactions.
MRS allows you to submit your own programs, run them, and obtain the results. This section describes how to submit a Flink job in an MRS cluster.
You can create a job online and run it on the MRS console, or submit a job from the command line on the MRS cluster client.
Prerequisites
- You have uploaded the program packages and data files required by jobs to OBS or HDFS.
- If the job program needs to read and analyze data in the OBS file system, you need to configure storage-compute decoupling for the MRS cluster. For details, see Configuring Storage-Compute Decoupling for an MRS Cluster.
Notes and Constraints
- When the policy of the user group to which an IAM user belongs changes from MRS ReadOnlyAccess to MRS CommonOperations, MRS FullAccess, or MRS Administrator, or vice versa, wait five minutes after user synchronization so that the System Security Services Daemon (SSSD) cache on the cluster nodes can refresh. Submit jobs on the MRS console only after the new policy takes effect; otherwise, job submission may fail.
- If the IAM username contains spaces (for example, admin 01), you cannot create jobs on the MRS console.
Video Tutorial
This video tutorial demonstrates how to use the MRS console to submit a Flink job that processes data stored in OBS, in a cluster with Kerberos authentication disabled.
The UI may vary depending on the version. This tutorial is for reference only.
Submitting a Job
You can create and run jobs online on the management console, or submit jobs by running commands on the cluster client. The console procedure is described first, followed by the client procedure.
- Prepare the application and data.
This section uses the Flink word count application as an example. You can obtain the sample program from the MRS cluster client (Client installation directory/Flink/flink/examples/batch/WordCount.jar) and upload the program to a specified directory in HDFS or OBS. For details, see Uploading Application Data to an MRS Cluster.
To run the application, you need to specify the following parameters:
- input: data file to be analyzed, which must be uploaded to the HDFS or OBS file system in advance.
For example, upload data file data2.txt. The file content is as follows:
This is a test demo for MRS Flink. Flink is a unified computing framework that supports both batch processing and stream processing. It provides a stream data processing engine that supports data distribution and parallel computing.
- output: file for storing the word count result.
- Log in to the MRS console.
- On the Active Clusters page, select a running cluster and click its name to switch to the cluster details page.
- On the Dashboard page, click Synchronize on the right side of IAM User Sync to synchronize IAM users.
Perform this step only when Kerberos authentication is enabled for the cluster.
After IAM user synchronization, wait for five minutes before submitting a job. For details about IAM user synchronization, see Synchronizing IAM Users to MRS.
- Click Job Management. On the displayed job list page, click Create.
- Set Type to Flink and configure Flink job information by referring to Table 1.
Figure 1 Creating a Flink job
Table 1 Job parameters

| Parameter | Description | Example |
|---|---|---|
| Name | Job name. It can contain 1 to 64 characters. Only letters, digits, hyphens (-), and underscores (_) are allowed. | flink_job |
| Program Path | Path of the program package to be executed. You can enter the path or click HDFS or OBS to select a file. The path can contain a maximum of 1,023 characters. It cannot contain special characters (;\|&>,<'$) and cannot be left blank or contain only spaces. An OBS program path starts with obs://, and an HDFS program path starts with hdfs://hacluster, for example, hdfs://hacluster/user/XXX.jar. The Flink job execution program must end with .jar. | obs://mrs-demotest/program/WordCount.jar |
| Tuning Parameters | (Optional) Optimization parameters, such as threads, memory, and vCPUs, used to improve resource usage and job execution performance. Table 2 lists the common program parameters of Flink jobs. Configure them based on the execution program and cluster resources. If you do not configure them, the default values of the cluster are used. | - |
| Runtime Parameters | (Optional) Key parameters for program execution. The parameters are defined by the functions of your program; MRS is only responsible for loading them. Separate multiple parameters with spaces. The value can contain a maximum of 150,000 characters and can be left blank. It cannot contain special characters such as ;\|&><'$. CAUTION: When entering a parameter that contains sensitive information (for example, a login password), you can add an at sign (@) before the parameter name to encrypt the parameter value. This prevents the sensitive information from being persisted in plaintext. When you view job information on the MRS console, the sensitive information is displayed as asterisks (*). Example: username=testuser @password=User password | --input obs://mrs-demotest/input/data2.txt --output obs://mrs-demotest/output.txt |
| Service Parameter | (Optional) Service parameters for the job. Changes made here apply only to the current job. To make permanent changes for the entire cluster, modify the cluster component parameters as described in Modifying the Configuration Parameters of an MRS Cluster Component. For example, if storage-compute decoupling is not configured for the MRS cluster and jobs need to access OBS using an AK/SK, you can add the following service parameters: fs.obs.access.key (key ID for accessing OBS) and fs.obs.secret.key (secret key corresponding to the key ID). | - |
| Command Reference | Command that is submitted to the background for execution when the job is submitted. | N/A |
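The Name, Program Path, and Runtime Parameters constraints in Table 1 can be pre-checked before you fill in the form. The following Python sketch is a hypothetical client-side check transcribed from the table (the function names are illustrative, not an MRS API); the console performs its own validation, which may differ in details.

```python
import re

# Constraints transcribed from Table 1 (illustrative pre-check only).
NAME_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")
PATH_FORBIDDEN = set(";|&>,<'$")
ARGS_FORBIDDEN = set(";|&><'$")


def check_job_name(name: str) -> bool:
    """Name: 1 to 64 letters, digits, hyphens, or underscores."""
    return NAME_RE.fullmatch(name) is not None


def check_program_path(path: str) -> bool:
    """Program Path: not blank, at most 1,023 characters, no forbidden
    characters, an obs:// or hdfs://hacluster prefix, and a .jar suffix."""
    if not path.strip() or len(path) > 1023:
        return False
    if PATH_FORBIDDEN & set(path):
        return False
    if not path.startswith(("obs://", "hdfs://hacluster")):
        return False
    return path.endswith(".jar")


def check_runtime_args(args: str) -> bool:
    """Runtime Parameters: may be empty, at most 150,000 characters,
    and none of the forbidden special characters."""
    return len(args) <= 150_000 and not (ARGS_FORBIDDEN & set(args))
```

For the example values in Table 1, all three checks return True.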
Table 2 Parameters for running a Flink job

| Parameter | Description | Example |
|---|---|---|
| -ytm | Memory size of each TaskManager container. A unit can be specified; the default unit is MB. | 1024 |
| -yjm | Memory size of the JobManager container. A unit can be specified; the default unit is MB. | 1024 |
| -ys | Number of slots per TaskManager. Unless configured otherwise, this is also the number of vCPUs requested for each TaskManager container. | 2 |
| -ynm | Custom name of the application on YARN. | test |
| -c | Class that contains the program entry point (for example, the main() or getPlan() method). Required only if the JAR file does not specify the entry class in its manifest. | com.bigdata.mrs.test |
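When you submit from the cluster client instead of the console, the Table 2 options go on the flink run -m yarn-cluster command line before the JAR path, and everything after the JAR path is passed to the program itself. The following Python sketch assembles such a command as an argument list. The function name build_flink_run is hypothetical, and the sketch assumes the client's Flink version still accepts the -m yarn-cluster and -y* options used elsewhere in this section.

```python
from typing import Optional


def build_flink_run(jar: str, program_args: list[str], *,
                    tm_memory: Optional[str] = None,
                    jm_memory: Optional[str] = None,
                    slots: Optional[int] = None,
                    yarn_name: Optional[str] = None,
                    entry_class: Optional[str] = None) -> list[str]:
    """Assemble a per-job `flink run -m yarn-cluster` argument list.

    Flink options must precede the JAR path; arguments after the JAR
    (for WordCount, --input and --output) are handed to the program.
    """
    cmd = ["flink", "run", "-m", "yarn-cluster"]
    if tm_memory is not None:
        cmd += ["-ytm", tm_memory]       # TaskManager container memory
    if jm_memory is not None:
        cmd += ["-yjm", jm_memory]       # JobManager container memory
    if slots is not None:
        cmd += ["-ys", str(slots)]       # slots per TaskManager
    if yarn_name is not None:
        cmd += ["-ynm", yarn_name]       # application name on YARN
    if entry_class is not None:
        cmd += ["-c", entry_class]       # only if the manifest lacks it
    return cmd + [jar] + program_args
```

With tm_memory="1024", jm_memory="1024", slots=2, and yarn_name="test", the list corresponds to flink run -m yarn-cluster -ytm 1024 -yjm 1024 -ys 2 -ynm test followed by the JAR path and the --input/--output arguments. The sample WordCount.jar declares its entry class in the manifest, so -c can be omitted for it.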
- Confirm job configuration information and click OK.
- After the job is submitted, you can view the job running status and execution result in the job list. After the job status changes to Completed, you can view the analysis result of related programs.
In this sample program, you can click View Log to view the detailed execution process of the Flink job.
After the job is executed, you can view the word count result in the specified result output directory.
For example, the result is stored in the obs://mrs-demotest/output.txt file, and the file content is as follows:
a 3
and 2
batch 1
both 1
computing 2
data 2
demo 1
distribution 1
engine 1
flink 2
for 1
framework 1
is 2
it 1
mrs 1
parallel 1
processing 3
provides 1
stream 2
supports 2
test 1
that 2
this 1
unified 1
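To sanity-check the result, the following Python sketch recomputes the counts locally for the content of data2.txt. It assumes the sample tokenizes the way Flink's bundled WordCount example does (lowercase the text, split on runs of non-word characters, drop empty tokens); the line order in the real output file may differ from the alphabetical order printed here.

```python
import re
from collections import Counter


def word_count(text: str) -> Counter:
    """Lowercase the text, split on non-word characters, count tokens."""
    tokens = re.split(r"\W+", text.lower())
    return Counter(t for t in tokens if t)


# Content of data2.txt from the example above.
SAMPLE = ("This is a test demo for MRS Flink. Flink is a unified computing "
          "framework that supports both batch processing and stream processing. "
          "It provides a stream data processing engine that supports data "
          "distribution and parallel computing.")

if __name__ == "__main__":
    # Print "word count" pairs in alphabetical order, one per line.
    for word, n in sorted(word_count(SAMPLE).items()):
        print(word, n)
```

Running it prints the 24 word/count pairs listed above.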
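To submit the job by running commands on the MRS cluster client instead, perform the following steps:
- Prepare the application and data.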
This section uses the Flink word count application as an example. You can obtain the sample program from the MRS cluster client (Client installation directory/Flink/flink/examples/batch/WordCount.jar) and upload the program to a specified directory in HDFS or OBS. For details, see Uploading Application Data to an MRS Cluster.
To run the application, you need to specify the following parameters:
- input: data file to be analyzed, which must be uploaded to the HDFS or OBS file system in advance.
For example, upload data file data2.txt. The file content is as follows:
This is a test demo for MRS Flink. Flink is a unified computing framework that supports both batch processing and stream processing. It provides a stream data processing engine that supports data distribution and parallel computing.
- output: file for storing the word count result.
- If Kerberos authentication has been enabled for the current cluster, create a service user with job submission permissions on FusionInsight Manager in advance. For details, see Creating an MRS Cluster User.
- In this example, create human-machine user testuser, and associate the user with user group supergroup and role System_administrator.
- On FusionInsight Manager, choose System > Permission > User. On the displayed page, locate the row that contains the added user, click More in the Operation column, and select Download Authentication Credential.
- Decompress the downloaded authentication credential package and copy the extracted files to the client node, for example, to the /opt/flinkclient/Flink/flink/conf directory. If the client is installed on a node outside the cluster, copy the extracted files to the /etc/ directory on that node.
- Install an MRS cluster client.
For details, see Installing an MRS Cluster Client. The subsequent operations use the installation directory /opt/flinkclient as an example.
- Log in to the node where the client is located as the MRS cluster client installation user.
For details, see Logging In to an MRS Cluster Node.
- Run the following command to go to the client installation directory:
cd /opt/flinkclient
Run the following command to load the environment variables:
source bigdata_env
- If Kerberos authentication has been enabled for the current cluster, modify Flink security configuration parameters.
- MRS 3.x or later: In a cluster with Kerberos authentication enabled, add the IP address of the client installation node and the floating IP address of the cluster's FusionInsight Manager to the jobmanager.web.allow-access-address configuration item in the flink-conf.yaml file. Separate the IP addresses with commas (,).
vi Flink/flink/conf/flink-conf.yaml
For example, the changes are as follows:
jobmanager.web.allow-access-address: 192.168.0.219,192.168.0.153,192.168.0.47,192.168.0.127
To find the floating IP address of the cluster's FusionInsight Manager, run the ifconfig command on the OMS node of the cluster. The address of the ethX:wsom interface is the floating IP address.
- Search for and configure the keytab path and username in the flink-conf.yaml file.
For example, the changes are as follows:
security.kerberos.login.keytab: /opt/flinkclient/Flink/flink/conf/user.keytab
security.kerberos.login.principal: testuser
- Run a command to perform Flink security hardening.
Run the following command to go to the Flink client directory:
cd Flink/flink/bin
Run the following script and set a custom password when prompted. The script automatically replaces the SSL-related configuration values in Client installation directory/Flink/flink/conf/flink-conf.yaml. In versions earlier than MRS 3.x, external SSL is disabled by default for clusters in security mode. To enable external SSL, configure the parameters described in Table 3 and run the script again.
sh generate_keystore.sh
Table 3 Parameter description

| Parameter | Description | Example Value |
|---|---|---|
| security.ssl.rest.enabled | Switch for enabling external SSL. | true |
| security.ssl.rest.keystore | Path for storing the keystore. | ${path}/flink.keystore |
| security.ssl.rest.keystore-password | Password of the keystore. 123456 is a placeholder; replace it with a user-defined password. | 123456 |
| security.ssl.rest.key-password | Password of the SSL key. 123456 is a placeholder; replace it with a user-defined password. | 123456 |
| security.ssl.rest.truststore | Path for storing the truststore. | ${path}/flink.truststore |
| security.ssl.rest.truststore-password | Password of the truststore. 123456 is a placeholder; replace it with a user-defined password. | 123456 |
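After generate_keystore.sh finishes, you can confirm that flink-conf.yaml contains the Table 3 keys. The following Python sketch assumes the file uses Flink's flat key: value lines and treats a commented-out key as missing. The helper names are hypothetical, and the sketch only checks that the keys are present; it does not validate paths or passwords.

```python
# Keys from Table 3.
TABLE3_KEYS = [
    "security.ssl.rest.enabled",
    "security.ssl.rest.keystore",
    "security.ssl.rest.keystore-password",
    "security.ssl.rest.key-password",
    "security.ssl.rest.truststore",
    "security.ssl.rest.truststore-password",
]


def parse_flat_yaml(text: str) -> dict[str, str]:
    """Parse flat `key: value` lines, skipping blanks and comments.
    A later duplicate key overwrites an earlier one."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, value = line.split(":", 1)
        conf[key.strip()] = value.strip()
    return conf


def missing_rest_ssl_keys(text: str) -> list[str]:
    """Return the Table 3 keys that are absent from the configuration."""
    conf = parse_flat_yaml(text)
    return [k for k in TABLE3_KEYS if k not in conf]


def rest_ssl_enabled(text: str) -> bool:
    """True if security.ssl.rest.enabled is set to true."""
    value = parse_flat_yaml(text).get("security.ssl.rest.enabled", "")
    return value.lower() == "true"
```

For example, pass the text of Flink/flink/conf/flink-conf.yaml to missing_rest_ssl_keys(); an empty list means every Table 3 key is set.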
- Use relative paths for flink.keystore and flink.truststore files.
- Create a new directory, for example, ssl, in Flink/flink/conf/.
mkdir /opt/flinkclient/Flink/flink/conf/ssl
- Move the flink.keystore and flink.truststore files to the /opt/flinkclient/Flink/flink/conf/ssl/ directory.
mv flink.keystore /opt/flinkclient/Flink/flink/conf/ssl/
mv flink.truststore /opt/flinkclient/Flink/flink/conf/ssl/
- For MRS 3.x or later: Change the values of the following parameters in the flink-conf.yaml file to relative paths:
security.ssl.keystore: ssl/flink.keystore
security.ssl.truststore: ssl/flink.truststore
- For versions earlier than MRS 3.x: Change the values of the following parameters in the flink-conf.yaml file to relative paths:
security.ssl.internal.keystore: ssl/flink.keystore
security.ssl.internal.truststore: ssl/flink.truststore
- If the cluster client is installed on a node outside the cluster, add the following configurations to the flink-conf.yaml file, setting them to the IP address of the node where the client is located:
web.access-control-allow-origin: 192.168.0.150
jobmanager.web.allow-access-address: 192.168.0.150
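The flink-conf.yaml edits in this step can also be scripted. The following Python sketch sets flat key: value entries in place and appends any keys that are missing. The values in SETTINGS are the example values from this section and must be replaced with your own; the key names follow the MRS 3.x or later steps. The helper names are hypothetical, and the sketch does not replace running generate_keystore.sh.

```python
import re
from pathlib import Path

# Example values from this section; replace them with your own addresses,
# keytab path, and principal. Key names follow the MRS 3.x or later steps.
SETTINGS = {
    "jobmanager.web.allow-access-address": ",".join(
        ["192.168.0.219", "192.168.0.153", "192.168.0.47", "192.168.0.127"]
    ),
    "security.kerberos.login.keytab": "/opt/flinkclient/Flink/flink/conf/user.keytab",
    "security.kerberos.login.principal": "testuser",
    "security.ssl.keystore": "ssl/flink.keystore",
    "security.ssl.truststore": "ssl/flink.truststore",
}

# Matches an uncommented "key:" at the start of a line and captures the key.
KEY_RE = re.compile(r"\s*([^#\s][^:]*):")


def set_conf(text: str, settings: dict[str, str]) -> str:
    """Return flink-conf.yaml text with each key in `settings` set.

    Every uncommented line for a key is rewritten (so a stale duplicate
    cannot override the new value); missing keys are appended at the end.
    """
    lines = text.splitlines()
    seen = set()
    for i, line in enumerate(lines):
        match = KEY_RE.match(line)
        key = match.group(1).strip() if match else None
        if key in settings:
            lines[i] = f"{key}: {settings[key]}"
            seen.add(key)
    lines += [f"{k}: {v}" for k, v in settings.items() if k not in seen]
    return "\n".join(lines) + "\n"


def update_file(path: Path, settings: dict[str, str]) -> None:
    """Back up the file to <name>.bak, then write the updated text."""
    original = path.read_text()
    path.with_name(path.name + ".bak").write_text(original)
    path.write_text(set_conf(original, settings))
```

For example, update_file(Path("/opt/flinkclient/Flink/flink/conf/flink-conf.yaml"), SETTINGS) applies the settings and keeps a flink-conf.yaml.bak copy. Rewriting every occurrence of a key matters because Flink reads the file top to bottom, so a later duplicate line would otherwise win.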
- Run the Flink job.
This section uses the word count sample program provided by the client as an example.
- Security-mode cluster (Kerberos authentication enabled)
- Start a session from the directory that contains the ssl directory, and then submit the job to the session.
For example, if the ssl directory is located in /opt/flinkclient/Flink/flink/conf/, run the following command in the /opt/flinkclient/Flink/flink/conf/ directory:
yarn-session.sh -t ssl/ -nm "session-name"
Open another client connection window and run the following command to initialize environment variables:
source /opt/flinkclient/bigdata_env
Run the following command to submit the job:
flink run /opt/flinkclient/Flink/flink/examples/batch/WordCount.jar --input obs://mrs-demotest/input/data2.txt --output obs://mrs-demotest/output.txt
- Alternatively, run the following command to submit the job as a single job on YARN (per-job mode):
flink run -m yarn-cluster /opt/flinkclient/Flink/flink/examples/batch/WordCount.jar --input obs://mrs-demotest/input/data2.txt --output obs://mrs-demotest/output.txt
- Normal-mode cluster (Kerberos authentication disabled)
- Run the following commands in the /opt/flinkclient/Flink/flink/conf/ directory to start a session and submit the job in the session:
yarn-session.sh -nm "session-name" -d
Open another client connection window and run the following command to submit the job:
flink run /opt/flinkclient/Flink/flink/examples/batch/WordCount.jar --input obs://mrs-demotest/input/data2.txt --output obs://mrs-demotest/output.txt
- Alternatively, run the following command to submit the job as a single job on YARN (per-job mode):
flink run -m yarn-cluster /opt/flinkclient/Flink/flink/examples/batch/WordCount.jar --input obs://mrs-demotest/input/data2.txt --output obs://mrs-demotest/output.txt
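The session and single-job submission methods above differ only in whether a long-running YARN session is started first. The following Python sketch builds both sets of command lines from the paths used in this section. The helper names are hypothetical, and the sketch assumes bigdata_env has already been sourced so that yarn-session.sh and flink are on PATH.

```python
import subprocess
from typing import Optional

FLINK_HOME = "/opt/flinkclient/Flink/flink"  # client path used in this section
WORDCOUNT_ARGS = [
    f"{FLINK_HOME}/examples/batch/WordCount.jar",
    "--input", "obs://mrs-demotest/input/data2.txt",
    "--output", "obs://mrs-demotest/output.txt",
]


def session_start_cmd(name: str, secure: bool) -> list[str]:
    """yarn-session.sh command, to be run from FLINK_HOME/conf.

    Security mode ships the ssl/ directory with -t and stays attached;
    normal mode starts the session detached with -d.
    """
    if secure:
        return ["yarn-session.sh", "-t", "ssl/", "-nm", name]
    return ["yarn-session.sh", "-nm", name, "-d"]


def submit_cmd(per_job: bool) -> list[str]:
    """flink run command: into the running session, or as a single YARN job."""
    cmd = ["flink", "run"]
    if per_job:
        cmd += ["-m", "yarn-cluster"]
    return cmd + WORDCOUNT_ARGS


def run(cmd: list[str], cwd: Optional[str] = None) -> int:
    """Run a command and return its exit code (no exception on failure)."""
    return subprocess.run(cmd, cwd=cwd, check=False).returncode
```

For a normal-mode cluster, run(session_start_cmd("session-name", secure=False), cwd=f"{FLINK_HOME}/conf") followed by run(submit_cmd(per_job=False)) mirrors the session steps. In security mode the session stays in the foreground, so start it in one terminal and submit from another, as described above.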
- Log in to FusionInsight Manager as user testuser, choose Cluster > Services > Yarn, and click the hyperlink on the right of ResourceManager Web UI to access the YARN Web UI. Click the application ID of the job to view the job running information and related logs.
Figure 2 Viewing Flink job details
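If you prefer the command line to the YARN Web UI, the application ID shown there (or printed in the client output) can be used with the standard yarn logs -applicationId command. The following Python sketch extracts an ID of the form application_<timestamp>_<sequence> from captured client output and builds that command. It assumes log aggregation is enabled on the cluster, and the helper names are hypothetical.

```python
import re
from typing import Optional

# YARN application IDs have the form application_<clusterTimestamp>_<sequence>.
APP_ID_RE = re.compile(r"\bapplication_\d+_\d+\b")


def find_application_id(client_output: str) -> Optional[str]:
    """Return the last YARN application ID mentioned in the text, if any."""
    ids = APP_ID_RE.findall(client_output)
    return ids[-1] if ids else None


def yarn_logs_cmd(app_id: str) -> list[str]:
    """Command that fetches the aggregated container logs of one application."""
    return ["yarn", "logs", "-applicationId", app_id]
```

Run the resulting command on the client node after sourcing bigdata_env (and, in security mode, after authenticating as the service user).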
- After the job is executed, you can view the word count result in the specified result output directory.
For example, the result is stored in the obs://mrs-demotest/output.txt file, and the file content is as follows:
a 3
and 2
batch 1
both 1
computing 2
data 2
demo 1
distribution 1
engine 1
flink 2
for 1
framework 1
is 2
it 1
mrs 1
parallel 1
processing 3
provides 1
stream 2
supports 2
test 1
that 2
this 1
unified 1
Helpful Links
- You can view logs of each job created on the MRS console. For details, see Viewing MRS Job Details and Logs.
- If Kerberos authentication is enabled for a cluster and IAM users have not been synchronized, an error is reported when you submit a job. For details about how to handle the error, see What Can I Do If the System Displays a Message Indicating that the Current User Does Not Exist on Manager When I Submit a Job?
- After a job is submitted, you can view the logs of a specified YARN task. For details, see How Do I View Logs of a Specified YARN Task?
- For more Flink job troubleshooting cases, see Job Management FAQs and Flink Troubleshooting.
- For more MRS application development sample programs, see MRS Developer Guide.