Compiling and Running the Application

Scenario

After developing the application code, upload it to the Linux client and run the application there. On the Flink client, the procedure for running an application is the same whether it was developed in Scala or Java.

Flink applications in a YARN cluster can run only on Linux.

Procedure

  1. In IntelliJ IDEA, configure Artifacts of the project before generating a JAR file.

    1. On the IDEA home page, choose File > Project Structure... to go to the Project Structure page.
    2. On the Project Structure page, select Artifacts, click +, and choose JAR > Empty.
      Figure 1 Adding Artifacts
    3. Set the name, type, and output path of the JAR file based on site requirements.
      Figure 2 Setting basic information
    4. Right-click 'FlinkStreamJavaExample' compile output and choose Put into Output Root from the shortcut menu. Then, click Apply.
      Figure 3 Put into Output Root
    5. Click OK.

  2. Generate a JAR file.

    1. On the IDEA home page, choose Build > Build Artifacts....
      Figure 4 Build Artifacts
    2. In the displayed menu, choose FlinkStreamJavaExample > Build to generate the JAR file.
      Figure 5 Build
    3. If information similar to the following is displayed in the event log, the JAR file has been generated. You can obtain the JAR file from the path configured in step 1.c.
      21:25:43 Compilation completed successfully in 36 sec

  3. Copy the JAR file generated in step 2 (for example, FlinkStreamJavaExample.jar) to the Flink operating environment on Linux, that is, the Flink client directory (for example, /opt/client). Create a conf directory in that directory and copy the required configuration files to it. For details, see Preparing the Operating Environment. Then run the Flink application.

    Start the Flink cluster before running Flink applications on Linux: run the yarn-session command on the Flink client. An example command is as follows:
    bin/yarn-session.sh -jm 1024 -tm 1024
    • Before running the yarn-session.sh command, copy the dependency packages of the Flink application to the client directory {client_install_home}/Flink/flink/lib. For details about the application's dependency packages, see Reference Information.
    • The dependencies of different sample projects may conflict. Before running a new sample project, remove the dependency packages that the old sample project copied to the {client_install_home}/Flink/flink/lib directory on the client.
    • Run the source bigdata_env command in the client installation directory before running the yarn-session.sh command.
    • The yarn-session.sh command must be run in the Flink/flink directory of the client installation directory, for example, /opt/client/Flink/flink.
    • Do not restart the HDFS service or all DataNode instances during Flink job running. Otherwise, the job may fail and some temporary application data cannot be cleared.
    • Ensure that the user permissions on the JAR file and configuration file are the same as those on the Flink client. For example, the user is omm and the permission is 755.
    • Running the DataStream sample application (in Scala or Java)

      Open another terminal window, go to the Flink client directory, and invoke the bin/flink run script to run the code.

      • Java
        bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/client/FlinkStreamJavaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
      • Scala
        bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/client/FlinkStreamScalaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2

      The log1.txt and log2.txt files must be stored on each node where a NodeManager instance is deployed, with permission 755. A minimal sketch of the sample's core logic is shown after Table 1.

      Table 1 Parameters

      Parameter      Description
      <filePath>     File path in the local file system. The /opt/log1.txt and /opt/log2.txt files must be stored on every node. The default value can be retained or changed.
      <windowTime>   Duration of a time window, in minutes. The default value can be retained or changed.
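
      For a rough illustration only, the following minimal Java sketch reads the two input files, keys each record by its first comma-separated field, and counts records in a tumbling processing-time window of windowTime minutes. The class name, record layout, and defaults are assumptions for illustration, not the actual FlinkStreamJavaExample source.

      import org.apache.flink.api.common.typeinfo.Types;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.api.java.utils.ParameterTool;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
      import org.apache.flink.streaming.api.windowing.time.Time;

      public class WindowCountSketch {
          public static void main(String[] args) throws Exception {
              // Parse --filePath and --windowTime as passed on the bin/flink run command line.
              ParameterTool params = ParameterTool.fromArgs(args);
              String[] filePaths = params.get("filePath", "/opt/log1.txt,/opt/log2.txt").split(",");
              int windowTime = params.getInt("windowTime", 2);

              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              // Read both input files and merge them into a single stream.
              DataStream<String> lines = env.readTextFile(filePaths[0])
                      .union(env.readTextFile(filePaths[1]));

              // Key by the first comma-separated field and count records in a
              // tumbling processing-time window of windowTime minutes.
              lines.map(line -> Tuple2.of(line.split(",")[0], 1))
                      .returns(Types.TUPLE(Types.STRING, Types.INT))
                      .keyBy(record -> record.f0)
                      .window(TumblingProcessingTimeWindows.of(Time.minutes(windowTime)))
                      .sum(1)
                      .print();

              env.execute("FlinkStreamJavaExample sketch");
          }
      }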

    • Running the sample application for producing and consuming data in Kafka (in Java or Scala)

      Command for starting the application to produce data

      bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [ssl.truststore.location] [ssl.truststore.password]
      Command for starting the application to consume data
      bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [ssl.truststore.location] [ssl.truststore.password]
      Table 2 Parameter Description

      Parameter: topic
      Description: Kafka topic name.
      Mandatory: Yes

      Parameter: bootstrap.servers
      Description: List of IP address and port pairs of the Kafka brokers.
      Mandatory: Yes

      Parameter: security.protocol
      Description: Can be set to PLAINTEXT, SASL_PLAINTEXT, SSL, or SASL_SSL, corresponding to ports 21005, 21007, 21008, and 21009 of the FusionInsight Kafka cluster, respectively.
      • If SASL is configured, sasl.kerberos.service.name must be set to kafka, and the security.kerberos.login configuration items in conf/flink-conf.yaml must be set.
      • If SSL is configured, ssl.truststore.location (path of the truststore) and ssl.truststore.password (password of the truststore) must be configured.
      Mandatory: No
      NOTE:
      • If this parameter is not configured, Kafka runs in non-security mode.
      • If SSL needs to be configured, see the section "SSL Encryption Function Used by a Client" in the Kafka Development Guide for details about how to generate the truststore.jks file.

      The following commands use ReadFromKafka as an example, with HADOOP.COM as the cluster domain name; a minimal code sketch follows the commands:

      • Command 1:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:9092
      • Command 2:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21005 --security.protocol PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.com
      • Command 3:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:9093 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei
      • Command 4:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21005 --security.protocol PLAINTEXT --sasl.kerberos.service.name kafka --ssl.truststore.location config/truststore.jks --ssl.truststore.password huawei --kerberos.domain.name hadoop.hadoop.com
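
      For reference, the following minimal Java sketch shows how the command-line parameters above typically map to Kafka connector properties, assuming the universal FlinkKafkaConsumer connector. The class name, group ID, and parameter handling are illustrative assumptions; the shipped ReadFromKafka class may differ in detail.

      import java.util.Properties;

      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.api.java.utils.ParameterTool;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

      public class ReadFromKafkaSketch {
          public static void main(String[] args) throws Exception {
              ParameterTool params = ParameterTool.fromArgs(args);

              // Mandatory connection settings.
              Properties props = new Properties();
              props.setProperty("bootstrap.servers", params.get("bootstrap.servers"));
              props.setProperty("group.id", "flink-example");  // assumed consumer group ID

              // Optional security settings; omit them in non-security mode.
              if (params.has("security.protocol")) {
                  props.setProperty("security.protocol", params.get("security.protocol"));
              }
              if (params.has("sasl.kerberos.service.name")) {
                  props.setProperty("sasl.kerberos.service.name", params.get("sasl.kerberos.service.name"));
              }
              if (params.has("ssl.truststore.location")) {
                  props.setProperty("ssl.truststore.location", params.get("ssl.truststore.location"));
                  props.setProperty("ssl.truststore.password", params.get("ssl.truststore.password"));
              }

              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              // Consume the topic as plain strings and print each record.
              env.addSource(new FlinkKafkaConsumer<>(
                      params.get("topic"), new SimpleStringSchema(), props))
                 .print();

              env.execute("ReadFromKafka sketch");
          }
      }
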
    • Running the sample application of the asynchronous checkpoint mechanism (in Scala or Java)

      To diversify the sample code, the Java sample uses processing time as the timestamp of the data stream, while the Scala sample uses event time. The command reference is as follows:

      Saving checkpoint snapshot information to HDFS
      • Java
        bin/flink run --class com.huawei.bigdata.flink.examples.FlinkProcessingTimeAPIMain /opt/client/FlinkCheckpointJavaExample.jar --chkPath hdfs://hacluster/flink/checkpoint/
      • Scala
        bin/flink run --class com.huawei.bigdata.flink.examples.FlinkEventTimeAPIChkMain /opt/client/FlinkCheckpointScalaExample.jar --chkPath hdfs://hacluster/flink/checkpoint/
      • Checkpoint source file path: flink/checkpoint/fd5f5b3d08628d83038a30302b611/chk-X/4f854bf4-ea54-4595-a9d9-9b9080779ffe

        flink/checkpoint: indicates the specified root directory.

        fd5f5b3d08628d83038a30302b611: indicates the level-2 directory, named after the job ID.

        chk-X: indicates the level-3 directory, where X is the checkpoint number.

        4f854bf4-ea54-4595-a9d9-9b9080779ffe: indicates a checkpoint source file.

      • In cluster mode, Flink stores checkpoint files in HDFS. A local path can be used only when Flink runs in local mode, which facilitates commissioning. A minimal configuration sketch is shown below.
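
      The following minimal Java sketch shows how a job can enable asynchronous checkpointing and direct snapshot data to the --chkPath directory. The checkpoint interval and the trivial pipeline are illustrative assumptions, not the shipped sample's settings.

      import org.apache.flink.api.common.typeinfo.Types;
      import org.apache.flink.api.java.utils.ParameterTool;
      import org.apache.flink.runtime.state.filesystem.FsStateBackend;
      import org.apache.flink.streaming.api.CheckpointingMode;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      public class CheckpointConfigSketch {
          public static void main(String[] args) throws Exception {
              // --chkPath as passed on the bin/flink run command line.
              ParameterTool params = ParameterTool.fromArgs(args);
              String chkPath = params.get("chkPath", "hdfs://hacluster/flink/checkpoint/");

              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              // Trigger a checkpoint every 6 seconds (assumed interval)
              // with exactly-once guarantees.
              env.enableCheckpointing(6000, CheckpointingMode.EXACTLY_ONCE);

              // Snapshot state under chkPath: an HDFS URI in cluster mode,
              // or a file:/// URI in local mode for commissioning.
              env.setStateBackend(new FsStateBackend(chkPath));

              // Trivial pipeline, just to make the sketch runnable.
              env.fromElements(1L, 2L, 3L)
                 .map(v -> v * 2)
                 .returns(Types.LONG)
                 .print();

              env.execute("checkpoint sketch");
          }
      }
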
    • Running the Pipeline sample application
      • Java
        1. Start the publisher job.
          bin/flink run -p 2 --class com.huawei.bigdata.flink.examples.TestPipelineNettySink /opt/client/FlinkPipelineJavaExample.jar
        2. Start the subscriber Job1.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipelineNettySource1 /opt/client/FlinkPipelineJavaExample.jar
        3. Start the subscriber Job2.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipelineNettySource2 /opt/client/FlinkPipelineJavaExample.jar
      • Scala
        1. Start the publisher job.
          bin/flink run -p 2 --class com.huawei.bigdata.flink.examples.TestPipeline_NettySink /opt/client/FlinkPipelineScalaExample.jar
        2. Start the subscriber Job1.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 /opt/client/FlinkPipelineScalaExample.jar
        3. Start the subscriber Job2.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 /opt/client/FlinkPipelineScalaExample.jar
    • Running the Stream SQL Join sample application
      1. Start the application to generate data for Kafka. For details about Kafka configuration, see Running the sample application for producing and consuming data in Kafka (in Java or Scala).
        bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092
      2. Run the netcat command on any node in the cluster to wait for an application connection.
        netcat -l -p 9000

        If "command not found" is displayed, install NetCat and run the command again.

      3. Start the application to receive socket data and perform a joint query (a minimal code sketch follows the command).
        bin/flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/client/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092 --hostname xxx.xxx.xxx.xxx --port 9000
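
      For reference, the following minimal Java sketch shows the shape of such a joint query: a stand-in for the Kafka stream and a socket stream are registered as tables and joined with SQL. The schemas, field names, and placeholder address are illustrative assumptions; the shipped SqlJoinWithSocket class may differ.

      import static org.apache.flink.table.api.Expressions.$;

      import org.apache.flink.api.common.typeinfo.Types;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.types.Row;

      public class SqlJoinSketch {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

              // Stand-in for the Kafka stream (see the Kafka sketch above);
              // records are (name, age) pairs.
              DataStream<Tuple2<String, String>> users =
                      env.fromElements(Tuple2.of("alice", "21"), Tuple2.of("bob", "32"));

              // Socket stream of "name,detail" lines from --hostname/--port
              // (placeholder address below).
              DataStream<Tuple2<String, String>> details =
                      env.socketTextStream("192.168.0.1", 9000)
                         .map(line -> Tuple2.of(line.split(",")[0], line.split(",")[1]))
                         .returns(Types.TUPLE(Types.STRING, Types.STRING));

              // Register both streams as tables and join them with SQL.
              tEnv.createTemporaryView("UserTable", users, $("name"), $("age"));
              tEnv.createTemporaryView("DetailTable", details, $("name"), $("detail"));

              Table result = tEnv.sqlQuery(
                      "SELECT u.name, u.age, d.detail "
                    + "FROM UserTable u JOIN DetailTable d ON u.name = d.name");

              // An unbounded regular join emits retractions, hence toRetractStream.
              tEnv.toRetractStream(result, Row.class).print();

              env.execute("Stream SQL Join sketch");
          }
      }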