Compiling and Running the Application
Scenario
After programming is complete, upload the application code to the Linux client and run it there. The procedure for running applications developed in Scala or Java is the same on the Flink client.
Flink applications for Yarn clusters can run only on Linux.
Procedure
- In IntelliJ IDEA, configure Artifacts of the project before generating a JAR file.
- On the IDEA home page, choose File > Project Structure.
- On the Project Structure page, select Artifacts, click +, and choose JAR > Empty.
Figure 1 Adding Artifacts
- Set the name, type, and output path of the JAR file based on site requirements.
Figure 2 Setting basic information
- Right-click 'FlinkStreamJavaExample' compile output and choose Put into Output Root from the shortcut menu. Then, click Apply.
Figure 3 Put into Output Root
- Click OK.
- Generate a JAR file.
- On the IDEA home page, choose Build > Build Artifacts.
Figure 4 Building Artifacts
- In the displayed menu, choose Build to generate the JAR file.
Figure 5 Build
- If information similar to the following is displayed in the event log, the JAR file is generated. You can obtain the JAR file from the path configured in 1.c.
21:25:43 Compilation completed successfully in 36 sec
- Copy the JAR file generated in 2 (for example, FlinkStreamJavaExample.jar) to the Flink operating environment on Linux (that is, the Flink client), for example, /opt/client. Create the conf directory in that directory and copy the required configuration files to the conf directory. For details, see Preparing the Operating Environment. Then run the Flink application.
Start the Flink cluster before running the Flink applications on Linux. Run the yarn session command on the Flink client to start the Flink cluster. An example command is as follows:
bin/yarn-session.sh -jm 1024 -tm 1024
- Before running the yarn-session.sh command, copy the dependency package of the Flink application to the client directory {client_install_home}/Flink/flink/lib. For details about the dependency packages of the application, see Reference Information.
- The dependencies of different sample projects may conflict. When running a new sample project, you need to remove the dependencies copied from the old sample project to the {client_install_home}/Flink/flink/lib directory on the client.
- Run the source bigdata_env command in the client installation directory before running the yarn-session.sh command.
- The yarn-session.sh command must be run in the Flink/flink directory of the client installation directory, for example, /opt/client/Flink/flink.
- Do not restart the HDFS service or all DataNode instances during Flink job running. Otherwise, the job may fail and some temporary application data cannot be cleared.
- Ensure that the user permissions on the JAR file and configuration file are the same as those on the Flink client. For example, the user is omm and the permission is 755.
- Running the DataStream sample application (in Scala or Java)
Open another window on the terminal. Go to the Flink client directory and use the bin/flink run script to run code.
- Java
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/client/FlinkStreamJavaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
- Scala
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/client/FlinkStreamScalaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
The log1.txt and log2.txt files must be stored on each node where NodeManager instances are deployed, and the permission is 755.
Table 1 Parameters
- <filePath>: Path of the files in the local file system. The /opt/log1.txt and /opt/log2.txt files must be stored on every node. The default value can be retained or changed.
- <windowTime>: Duration of a time window, in minutes. The default value can be retained or changed.
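The sample applications accept these parameters in --key value form on the command line. The sketch below shows one plausible way such arguments could be parsed; parseArgs is a hypothetical helper for illustration, not the sample project's actual parsing code.

```java
import java.util.HashMap;
import java.util.Map;

public class ArgsSketch {
    // Hypothetical helper for illustration: turns a "--key value" argument
    // list into a map, mirroring the command-line style shown above.
    static Map<String, String> parseArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (args[i].startsWith("--")) {
                params.put(args[i].substring(2), args[i + 1]);
            }
        }
        return params;
    }

    public static void main(String[] args) {
        String[] argv = {"--filePath", "/opt/log1.txt,/opt/log2.txt", "--windowTime", "2"};
        Map<String, String> params = parseArgs(argv);
        String[] files = params.get("filePath").split(",");             // the comma-separated input files
        int windowMinutes = Integer.parseInt(params.get("windowTime")); // window duration in minutes
        System.out.println(files.length + " files, window " + windowMinutes + " min"); // prints "2 files, window 2 min"
    }
}
```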
- Running the sample application for producing and consuming data in Kafka (in Java or Scala)
Bootstrap program for producing data
bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [ssl.truststore.location] [ssl.truststore.password]
Bootstrap program for consuming data
bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [ssl.truststore.location] [ssl.truststore.password]
Table 2 Parameters
- topic (mandatory): Kafka topic name.
- bootstrap.servers (mandatory): List of IP address:port pairs of the broker cluster.
- security.protocol (optional): Can be set to PLAINTEXT, SASL_PLAINTEXT, SSL, or SASL_SSL, corresponding to port 21005, 21007, 21008, or 21009 of the FusionInsight Kafka cluster, respectively.
  - If SASL is configured, sasl.kerberos.service.name must be set to kafka, and the security.kerberos.login configuration items in conf/flink-conf.yaml must be configured.
  - If SSL is configured, ssl.truststore.location (path of the truststore) and ssl.truststore.password (password of the truststore) must be configured.
  NOTE:
  - If this parameter is not configured, Kafka runs in non-security mode.
  - If SSL needs to be configured, see section "SSL Encryption Function Used by a Client" in the Kafka Development Guide for details about how to generate the truststore.jks file.
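The client properties that Table 2 describes could be assembled as shown below. This is a minimal sketch, not the sample project's actual code; the truststore path and password are placeholders.

```java
import java.util.Properties;

public class KafkaParamsSketch {
    // Hypothetical builder mirroring Table 2: only topic and bootstrap.servers
    // are mandatory; the security settings are added only for SASL/SSL modes.
    static Properties secureProps(String bootstrapServers, String protocol) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", protocol);
        if (protocol.contains("SASL")) {
            // Required when SASL is configured, per the table above
            props.put("sasl.kerberos.service.name", "kafka");
        }
        if (protocol.contains("SSL")) {
            props.put("ssl.truststore.location", "/home/truststore.jks"); // example path
            props.put("ssl.truststore.password", "xxx");                  // placeholder
        }
        return props;
    }

    public static void main(String[] args) {
        Properties p = secureProps("10.96.101.32:21009", "SASL_SSL");
        System.out.println(p.getProperty("sasl.kerberos.service.name")); // prints "kafka"
    }
}
```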
The following commands use ReadFromKafka as an example and the cluster domain name is HADOOP.COM:
- Command 1:
bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:9092
- Command 2:
bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21005 --security.protocol PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.com
- Command 3:
bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:9093 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password xxx
- Command 4:
bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21005 --security.protocol PLAINTEXT --sasl.kerberos.service.name kafka --ssl.truststore.location config/truststore.jks --ssl.truststore.password xxx --kerberos.domain.name hadoop.hadoop.com
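The commands above pair each security.protocol value with a specific broker port. That mapping, as stated in the security.protocol description in Table 2, can be captured in a small lookup sketch for quick reference:

```java
import java.util.Map;

public class KafkaPortSketch {
    // Protocol-to-port mapping from the security.protocol description above
    // (FusionInsight Kafka cluster ports).
    static final Map<String, Integer> PORT = Map.of(
        "PLAINTEXT", 21005,
        "SASL_PLAINTEXT", 21007,
        "SSL", 21008,
        "SASL_SSL", 21009
    );

    public static void main(String[] args) {
        System.out.println("SASL_SSL -> " + PORT.get("SASL_SSL")); // prints "SASL_SSL -> 21009"
    }
}
```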
- Running the sample application of the asynchronous checkpoint mechanism (in Scala or Java)
In Java sample code, the processing time is used as a timestamp for data stream. In Scala sample code, the event time is used as a timestamp for data stream.
Save checkpoint snapshot information to HDFS.
- Java code
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkProcessingTimeAPIMain /opt/client/FlinkCheckpointJavaExample.jar --chkPath hdfs://hacluster/flink/checkpoint/
- Scala code
bin/flink run --class com.huawei.bigdata.flink.examples.FlinkEventTimeAPIChkMain /opt/client/FlinkCheckpointScalaExample.jar --chkPath hdfs://hacluster/flink/checkpoint/
- Checkpoint source file path: flink/checkpoint/fd5f5b3d08628d83038a30302b611/chk-X/4f854bf4-ea54-4595-a9d9-9b9080779ffe
flink/checkpoint: indicates the specified root directory.
fd5f5b3d08628d83038a30302b611: indicates the level-2 directory named after jobID.
chk-X: "X" indicates the checkpoint number, which is the level-3 directory.
4f854bf4-ea54-4595-a9d9-9b9080779ffe: indicates a checkpoint source file.
- If Flink runs in cluster mode, checkpoint files are stored in HDFS. A local path can be used only when Flink is in local mode, which facilitates commissioning.
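The path layout described above (root directory, jobID, chk-X, source file) can be decomposed programmatically. The sketch below is illustrative only; chk-7 stands in for the chk-X placeholder.

```java
import java.util.Arrays;

public class CheckpointPathSketch {
    // Splits a checkpoint source-file path of the form root/jobID/chk-X/file
    // into {root, jobID, checkpoint directory, source file}.
    static String[] decompose(String path) {
        String[] p = path.split("/");
        int n = p.length;
        String root = String.join("/", Arrays.copyOfRange(p, 0, n - 3));
        return new String[] { root, p[n - 3], p[n - 2], p[n - 1] };
    }

    public static void main(String[] args) {
        // Example path from the section above; chk-7 is an arbitrary example number
        String path = "flink/checkpoint/fd5f5b3d08628d83038a30302b611/chk-7/4f854bf4-ea54-4595-a9d9-9b9080779ffe";
        String[] parts = decompose(path);
        // prints "root=flink/checkpoint jobID=fd5f5b3d08628d83038a30302b611 checkpoint=chk-7"
        System.out.println("root=" + parts[0] + " jobID=" + parts[1] + " checkpoint=" + parts[2]);
    }
}
```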
- Running the Pipeline sample application
- Java code
- Start the publisher job.
bin/flink run -p 2 --class com.huawei.bigdata.flink.examples.TestPipelineNettySink /opt/client/FlinkPipelineJavaExample.jar
- Start the subscriber Job1.
bin/flink run --class com.huawei.bigdata.flink.examples.TestPipelineNettySource1 /opt/client/FlinkPipelineJavaExample.jar
- Start the subscriber Job2.
bin/flink run --class com.huawei.bigdata.flink.examples.TestPipelineNettySource2 /opt/client/FlinkPipelineJavaExample.jar
- Scala
- Start the publisher job.
bin/flink run -p 2 --class com.huawei.bigdata.flink.examples.TestPipeline_NettySink /opt/client/FlinkPipelineScalaExample.jar
- Start the subscriber Job1.
bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 /opt/client/FlinkPipelineScalaExample.jar
- Start the subscriber Job2.
bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 /opt/client/FlinkPipelineScalaExample.jar
- Running the Stream SQL Join sample application
- Start the application to generate data for Kafka. For details about Kafka configuration, see Running the sample application for producing and consuming data in Kafka (in Java or Scala).
bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092
- Run the netcat command on any node in the cluster to wait for an application connection.
netcat -l -p 9000
If "command not found" is displayed, install NetCat and run the command again.
- Start the application to receive socket data and perform a joint query.
bin/flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/client/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092 --hostname xxx.xxx.xxx.xxx --port 9000
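If NetCat is unavailable, a minimal Java stand-in for step 2 could listen on the port and feed newline-delimited text to the job started in step 3. This is a sketch under the assumption that the job simply reads text lines from the socket; the demo connects to its own listener on port 9099 so it terminates on its own, whereas the documented procedure uses port 9000.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketFeedSketch {
    // Listens once on the given port and writes the given lines to the first
    // client that connects (in the procedure above, the Flink job of step 3).
    static void serveOnce(int port, String[] lines) throws Exception {
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept();
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            for (String line : lines) {
                out.println(line); // one record per line, like typing into netcat
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 9099; // demo port; the documented procedure uses 9000
        Thread server = new Thread(() -> {
            try {
                serveOnce(port, new String[] {"example input line"});
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        server.start();
        // Connect to our own listener to show the data a client would receive;
        // retry briefly in case the listener is not up yet.
        Socket probe = null;
        for (int i = 0; i < 50 && probe == null; i++) {
            try {
                probe = new Socket("127.0.0.1", port);
            } catch (IOException e) {
                Thread.sleep(100);
            }
        }
        try (BufferedReader in = new BufferedReader(new InputStreamReader(probe.getInputStream()))) {
            System.out.println(in.readLine()); // prints "example input line"
        }
        server.join();
    }
}
```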