Compiling and Commissioning the Flink Application
After the program code is developed, you can upload the code to the Linux client for running. The running procedures of applications developed in Scala or Java are the same.

Applications in Flink On YARN mode are allowed to run in a Linux-based environment, but not in a Windows-based environment.
- In IntelliJ IDEA, click Reload All Maven Projects in the Maven window on the right of IDEA to import Maven project dependencies.
Figure 1 Reload projects
- Compile and run the application.
Use either of the following two methods:
- Method 1:
- Choose Maven, locate the target project name, and double-click clean under Lifecycle to run the clean command of Maven.
- Choose Maven, locate the target project name, and double-click install under Lifecycle to run the install command of Maven.
Figure 2 Maven clean and install
- Method 2: Go to the directory where the pom.xml file is located in the Terminal window of IDEA, and run the mvn clean install command to build the file.
Figure 3 Entering mvn clean install in the IDEA Terminal text box
- Method 1:
- After the compilation is complete, the message "BUILD SUCCESS" is printed and the target directory is generated. Obtain the JAR package in the directory.
Figure 4 Compilation completed
- Copy the .jar package (for example FlinkStreamJavaExample.jar) created in 3 to the Flink running environment (Flink client), for example, /ophadoopclient, and then in that directory, create the conf folder and copy the required configuration files to the conf folder. For details, see Preparing an Operating Environment,to run the Flink application.
In a Linux-based environment, Flink cluster needs to be started in advance. Run the yarn session command on Flink client and start Flink clusters.
- Before running the command, copy the running dependency package of the Flink application to client directory #{client_install_home}/Flink/flink/lib. For details about the application running dependency package, see Sample Project Dependency Package Reference.
- When a Flink task is running, do not restart the HDFS service or all DataNode instances. Otherwise, the Flink task may fail, resulting loss of temporary data.
- The ssl/ directory is used to store SSL configuration files of SSL keystore and truststore.
- The following applies to MRS 3.2.1 or later. The memory size of TaskManagers specified by using the -tm command must be at least 4,096 MB.
cd /opt/hadoopclient/Flink/flink
bin/ -jm 1024 -tm 4096 -t conf/ssl/
- Run the DataStream sample program in Scala and Java.
Go to the Flink client directory and call the bin/flink run script to run the code.
- Java
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/hadoopclient/FlinkStreamJavaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
- Scala
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/hadoopclient/FlinkStreamScalaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
The log1.txt and log2.txt files must be stored on each node where the Yarn NodeManager instance is deployed and the permission is 755.
Table 1 Parameter description Parameter
File path in the local file system. The /opt/log1.txt and /opt/log2.txt files must be placed on each node. The default value can be retained or changed.
Duration of the window. The unit is minute. The default value can be retained or changed.
- Java
- Run the following code to interconnecting with Kafka (in Scala and Java.)
Execution of the production data command to start the program.
bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/hadoopclient/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [] [ssl.truststore.location] [ssl.truststore.password] []
Execution of the consumption data command to start the program.bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/hadoopclient/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [] [ssl.truststore.location] [ssl.truststore.password] []
Table 2 Parameter description Parameter
Mandatory (Yes/No)
The name of a Kafka topic.
The list of IP addresses or ports of broker clusters.
The parameter need be set to protocols PLAINTEXT (optional), SASL_PLAINTEXT, SSL, and SASL_SSL, and the corresponding FusionInsight Kafka ports are 9092, 21007, 9093, and 21009.
- If the SASL is configured, the value of must be set to kafka, the value of must be set to hadoop. System domain name and the security.kerberos.login parameter in conf/flink-conf.yaml are mandatory.
You can log in to Manager, choose System > Permission > Domain and Mutual Trust > Local Domain, can view the system domain name. All letters in the system domain name must be converted to lowercase letters.
- If the SSL is configured, ssl.truststore.location (path of truststore) and ssl.truststore.password (password of truststore) must be set.
NOTE:- If this parameter is not configured, the Kafka is non-secure.
- If SSL needs to be configured, see "Kafka Development Guide" > "SSL Encryption Function Used by a Client" to determine the file generation mode of the truststore.jks file.
- To run this example project, set to true. For details, see Configuring Kafka.
- Following .jar packages need to be added to Kafka applications:
- flink-dist_*.jar in the lib directory under the installation directory of Flink server.
- flink-connector-kafka_*.jar in the opt directory under the installation directory of Flink server.
- kafka-clients-*.jar in the lib directory under the installation directory of Kafka server or client.
- If the truststore.jks file path is an absolute path, the truststore.jks file must be stored in the specified directory of each Yarn nodemanager. If the path is a relative path, add the upload directory when running the script. For example, run the -t config/ *** command before running ▪Command 4:
Following is the example code corresponding to four protocols, take ReadFromKafka as an example, System domain name is HADOOP.COM:
- Command 1:
bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/hadoopclient/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers
- Command 2:
bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/hadoopclient/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers --security.protocol SASL_PLAINTEXT kafka
- Command 3:
bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/hadoopclient/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password xxx
- Command 4:
bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/hadoopclient/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers --security.protocol SASL_SSL kafka --ssl.truststore.location /config/truststore.jks --ssl.truststore.password xxx
- If the SASL is configured, the value of must be set to kafka, the value of must be set to hadoop. System domain name and the security.kerberos.login parameter in conf/flink-conf.yaml are mandatory.
- Asynchronous checkpoint mechanism (in Scala and Java).
The proctime is used as the timestamp for DataStream in Java, and the event time is used as the timestamp for DataStream in Scala. Following are examples of commands:
- Save the Checkpoint snapshot information to HDFS.
- Java
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkProcessingTimeAPIMain /opt/hadoopclient/FlinkCheckpointJavaExample.jar --chkPath hdfs://hacluster/flink/checkpoint/
- Scala
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkEventTimeAPIChkMain /opt/hadoopclient/FlinkCheckpointScalaExample.jar --chkPath hdfs://hacluster/flink/checkpoint/
- Java
- Save the Checkpoint snapshot information to a local path.
- Java
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkProcessingTimeAPIMain /opt/hadoopclient/FlinkCheckpointJavaExample.jar --chkPath file:///home/zzz/flink-checkpoint/
- Scala
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkEventTimeAPIChkMain /opt/hadoopclient/FlinkCheckpointScalaExample.jar --ckkPath file:///home/zzz/flink-checkpoint/
- Java
- Path to checkpoint source file: flink/checkpoint/checkpoint/fd5f5b3d08628d83038a30302b611/chk-X/4f854bf4-ea54-4595-a9d9-9b9080779ffe
flink/checkpoint //The specified root directory.
fd5f5b3d08628d83038a30302b611 //The second-level directory named after jobID
chk-X // The third-level directory. X indicates the checkpoint numbers.
4f854bf4-ea54-4595-a9d9-9b9080779ffe //Source files of checkpoint.
- If Flink is in cluster mode, use the HDFS path, because a local path can only be used when Flink is in local mode.
- Save the Checkpoint snapshot information to HDFS.
- Run the pipeline sample program.
- Java
- Start the publisher job.
bin/flink run -p 2 --class com.huawei.bigdata.flink.examples.TestPipeline_NettySink /opt/hadoopclient/FlinkPipelineJavaExample.jar
- Start the subscriber Job1.
bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 /opt/hadoopclient/FlinkPipelineJavaExample.jar
- Start the subscriber Job2.
bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 /opt/hadoopclient/FlinkPipelineJavaExample.jar
- Start the publisher job.
- Scala
- Start the publisher job.
bin/flink run -p 2 --class com.huawei.bigdata.flink.examples.TestPipeline_NettySink /opt/hadoopclient/FlinkPipelineScalaExample.jar
- Start the subscriber Job1.
bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 /opt/hadoopclient/FlinkPipelineScalaExample.jar
- Start the subscriber Job1.
bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 /opt/hadoopclient/FlinkPipelineScalaExample.jar
- Start the publisher job.
- Java
- Running the Stream SQL Join sample program
- Java
- Start the program to generate data for Kafka. For details about Kafka configuration, see • Run the following code to....
bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/hadoopclient/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers
- Run the netcat command on any node in the cluster to wait for the connection of the application.
netcat -l -p 9000
- Start the application to accept the socket data and perform the combined query.
bin/flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/hadoopclient/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers --hostname --port 9000
- Start the program to generate data for Kafka. For details about Kafka configuration, see • Run the following code to....
- Scala (for MRS 3.3.0 or later)
- Start the application to produce data in Kafka. For details about how to configure Kafka, see • Run the following code to.....
bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/FlinkStreamSqlJoinScalaExample.jar --topic topic-test --bootstrap.servers
- Run the netcat command on any node in the cluster to wait for an application connection.
netcat -l -p 9000
- Start the application to receive socket data and perform a joint query.
bin/flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/client/FlinkStreamSqlJoinScalaExample.jar --topic topic-test --bootstrap.servers --hostname --port 9000
- Start the application to produce data in Kafka. For details about how to configure Kafka, see • Run the following code to.....
- Java
- Running the Flink HBase sample program(MRS 3.2.0 or later clusters.)
- yarn-session
- Start the flink cluster.
./bin/ -t config -jm 1024 -tm 1024
- Run the Flink program and set parameters.
bin/flink run --class com.huawei.bigdata.flink.examples.WriteHBase /opt/client1/Flink/flink/FlinkHBaseJavaExample-xxx.jar --tableName xxx --confDir xxx bin/flink run --class com.huawei.bigdata.flink.examples.ReadHBase /opt/client1/Flink/flink/FlinkHBaseJavaExample-xxx.jar --tableName xxx --confDir xxx
- Start the flink cluster.
- yarn-cluster
bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.WriteHBase /opt/client1/Flink/flink/FlinkHBaseJavaExample-xxx.jar --tableName xxx --confDir xxx bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.ReadHBase /opt/client1/Flink/flink/FlinkHBaseJavaExample-xxx.jar --tableName xxx --confDir xxx
- yarn-session
- Running the Flink Hudi sample application (MRS 3.2.1 or later).
- yarn-session mode
- Start the Flink cluster.
./bin/ -t config -jm 1024 -tm 4096
- Run the Flink application and enter parameters.
./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoHudi /opt/test.jar --hudiTableName hudiSinkTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable ./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromHudi /opt/test.jar --hudiTableName hudiSourceTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable --read.start-commit xxx
- Start the Flink cluster.
- yarn-cluster mode
./bin/flink run -m yarn-cluster -yt config --class com.huawei.bigdata.flink.examples.WriteIntoHudi /opt/test.jar --hudiTableName hudiSinkTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable ./bin/flink run -m yarn-cluster -yt config --class com.huawei.bigdata.flink.examples.ReadFromHudi /opt/test.jar --hudiTableName hudiSourceTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable --read.start-commit xxx
- yarn-session mode
- Running the REST APIs for tenant sample program creation (The TestCreateTenants program is used as an example.)
- yarn-session mode
- Start the Flink cluster.
./bin/ -t config -jm 1024 -tm 4096
- Run the Flink program and enter parameters.
./bin/flink run --class com.huawei.bigdata.flink.examples.TestCreateTenants /opt/client/FlinkRESTAPIJavaExample-xxx.jar --hostName xx-xx-xx-xx --keytab xx/xx/user.keytab --krb5 xx/xx/krb5.conf --principal username
- Start the Flink cluster.
- yarn-cluster mode
./bin/flink run -m yarn-cluster -yt config -yjm 1024 -ytm 4096 --class com.huawei.bigdata.flink.examples.TestCreateTenants /opt//opt/client/FlinkRESTAPIJavaExample-xxx.jar --hostName xx-xx-xx-xx --keytab xx/xx/user.keytab --krb5 xx/xx/krb5.conf --principal username
- yarn-session mode
- Run a SQL task to submit Flink JAR jobs (applicable to MRS 3.2.1 or later).
- yarn-session mode
- Start the Flink cluster.
./bin/ -t config -jm 1024 -tm 4096
- Run the Flink application and enter parameters.
bin/flink run -d --class com.huawei.mrs.FlinkSQLExecutor /opt/flink-sql-xxx.jar --sql ./sql/datagen2kafka.sql
- Start the Flink cluster.
- yarn-cluster mode
bin/flink run -m yarn-cluster -yt config -yjm 1024 -ytm 4096 -d --class com.huawei.mrs.FlinkSQLExecutor /opt/flink-sql-xxx.jar --sql ./sql/datagen2kafka.sql
- yarn-session mode

For details about sample projects of dependency packages provided by Flink, see Sample Project Dependency Package Reference.
If HA is enabled for Flink (MRS 3.2.0 or later), the value of --hostName in the RESTAPI tenant creation example is the floating IP address of FlinkServer.
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.