Updated on 2022-11-18 GMT+08:00

Compiling and Running the Application

Scenarios

After the program code is developed, you can upload the code to the Linux client for running. The running procedures of applications developed in Scala or Java are the same.

Applications in Flink On YARN mode are allowed to run in a Linux-based environment, but not in a Windows-based environment.

Procedure

  1. In IntelliJ IDEA, configure the Artifacts information about the project before the .jar package is created.

    1. On the main page of IntelliJ IDEA, choose File >Project Structures... to access the Project Structure page.
    2. On the Project Structure page, select Artifacts, click the icon "+" and choose JAR > Empty
      Figure 1 Adding Artifacts
    3. Configure the name, type, and output directory of the .jar package based on the actual condition.
      Figure 2 Setting basic information
    4. Right-click 'FlinkStreamJavaExample' compile output, choose Put into Output Root, and click Apply.
      Figure 3 Choosing Put into Output Root
    5. Click OK to complete the configuration.

  2. Generate the .jar package.

    1. On the main page of IntelliJ IDEA, choose Build > Build Artifacts....
      Figure 4 Choosing Build Artifacts
    2. On the displayed menu, choose FlinkStreamJavaExample > Build to create a jar.
      Figure 5 Generating the .jar package
    3. If the following information is displayed in the event log, the .jar package is created successfully. You can obtain the .jar package from the directory configured in 1.c.
      21:25:43 Compilation completed successfully in 36 sec

  3. Copy the .jar package (for example FlinkStreamJavaExample.jar) created in 2 to the Flink running environment (Flink client), for example, /opt/client, and then in that directory, create the conf folder and copy the required configuration files to the conf folder. For details, see Preparing an Operating Environment,to run the Flink application.

    In a Linux-based environment, Flink cluster needs to be started in advance. Run the yarn session command on Flink client and start Flink clusters. For example:

    cd /opt/client/Flink/flink

    bin/yarn-session.sh -jm 1024 -tm 1024 -t conf/ssl/

    • Before running the yarn-session.sh command, copy the running dependency package of the Flink application to client directory #{client_install_home}/Flink/flink/lib. For details about the application running dependency package, see Reference Information.
    • When a Flink task is running, do not restart the HDFS service or all DataNode instances. Otherwise, the Flink task may fail, resulting loss of temporary data.
    • The ssl/ directory is used to store SSL configuration files of SSL keystore and truststore.
    • Run the DataStream sample program in Scala and Java.

      Go to the Flink client directory and call the bin/flink run script to run the code.

      • Java

        bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/client/FlinkStreamJavaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2

      • Scala

        bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/client/FlinkStreamScalaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2

      The log1.txt and log2.txt files must be stored on each node where the Yarn NodeManager instance is deployed and the permission is 755.

      Table 1 Parameter description

      Parameter

      Description

      <filePath>

      File path in the local file system. The /opt/log1.txt and /opt/log2.txt files must be placed on each node. The default value can be retained or changed.

      <windowTime>

      Duration of the window. The unit is minute. The default value can be retained or changed.

    • Run the following code to interconnecting with Kafka (in Scala and Java.)

      Execution of the production data command to start the program.

      bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [ssl.truststore.location] [ssl.truststore.password] [kerberos.domain.name]
      Execution of the consumption data command to start the program.
      bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [ssl.truststore.location] [ssl.truststore.password] [kerberos.domain.name]
      Table 2 Parameter description

      Parameter

      Description

      Mandatory (Yes/No)

      topic

      The name of a Kafka topic.

      Yes

      bootstrap.server

      The list of IP addresses or ports of broker clusters.

      Yes

      security.protocol

      The parameter need be set to protocols PLAINTEXT (optional), SASL_PLAINTEXT, SSL, and SASL_SSL, and the corresponding FusionInsight Kafka ports are 9092, 21007, 9093, and 21009.

      • If the SASL is configured, the value of sasl.kerberos.service.name must be set to kafka, the value of kerberos.domain.name must be set to kerberos domain name and the security.kerberos.login parameter in conf/flink-conf.yaml are mandatory.
      • If SSL is configured, ssl.truststore.location (which species the path of truststore) and ssl.truststore.password (which specifies the password of truststore) are mandatory.

      No

      NOTE:
      • If this parameter is not configured, the Kafka is non-secure.
      • If SSL needs to be configured, see "Kafka Development Guide" > "SSL Encryption Function Used by a Client" to determine the file generation mode of the truststore.jks file.
      • To run this example project, set allow.everyone.if.no.acl.found to true. For details, see .Configuring Kafka
      • Following .jar packages need to be added to Kafka applications:
        • flink-dist_*.jar in the lib directory under the installation directory of Flink server.
        • flink-connector-kafka_*.jar in the opt directory under the installation directory of Flink server.
        • kafka-clients-*.jar in the lib directory under the installation directory of Kafka server or client.
        • If the truststore.jks file path is an absolute path, the truststore.jks file must be stored in the specified directory of each Yarn nodemanager. If the path is a relative path, add the upload directory when running the yarn-session.sh script. For example, run the yarn-session.sh -t config/ *** command before running ▪Command 4:

      Following is the example code corresponding to four protocols, take ReadFromKafka as an example:

      • Command 1:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:9092
      • Command 2:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.com
      • Command 3:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:9093 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei
      • Command 4:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /config/truststore.jks --ssl.truststore.password huawei --kerberos.domain.name hadoop.hadoop.com
    • Asynchronous checkpoint mechanism (in Scala and Java).
      The proctime is used as the timestamp for DataStream in Java, and the event time is used as the timestamp for DataStream in Scala. Following are examples of commands:
      • Save the Checkpoint snapshot information to HDFS.
        • Java
          bin/flink run --class com.huawei.bigdata.flink.examples.FlinkProcessingTimeAPIMain /opt/client/FlinkCheckpointJavaExample.jar --chkPath hdfs://hacluster/flink/checkpoint/
        • Scala
          bin/flink run --class com.huawei.bigdata.flink.examples.FlinkEventTimeAPIChkMain /opt/client/FlinkCheckpointScalaExample.jar --chkPath hdfs://hacluster/flink/checkpoint/
      • Save the Checkpoint snapshot information to a local path.
        • Java
          bin/flink run --class com.huawei.bigdata.flink.examples.FlinkProcessingTimeAPIMain /opt/client/FlinkCheckpointJavaExample.jar --chkPath file:///home/zzz/flink-checkpoint/
        • Scala
          bin/flink run --class com.huawei.bigdata.flink.examples.FlinkEventTimeAPIChkMain /opt/client/FlinkCheckpointScalaExample.jar --ckkPath file:///home/zzz/flink-checkpoint/
      • Path to checkpoint source file: flink/checkpoint/checkpoint/fd5f5b3d08628d83038a30302b611/chk-X/4f854bf4-ea54-4595-a9d9-9b9080779ffe

        flink/checkpoint //The specified root directory.

        fd5f5b3d08628d83038a30302b611 //The second-level directory named after jobID

        chk-X // The third-level directory. X indicates the checkpoint numbers.

        4f854bf4-ea54-4595-a9d9-9b9080779ffe //Source files of checkpoint.

      • If Flink is in cluster mode, use the HDFS path, because a local path can only be used when Flink is in local mode.
    • Run the pipeline sample program.
      • Java
        1. Start the publisher job.
          bin/flink run -p 2 --class com.huawei.bigdata.flink.examples.TestPipeline_NettySink /opt/client/FlinkPipelineJavaExample.jar
        2. Start the subscriber Job1.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 /opt/client/FlinkPipelineJavaExample.jar
        3. Start the subscriber Job2.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 /opt/client/FlinkPipelineJavaExample.jar
      • Scala
        1. Start the publisher job.
          bin/flink run -p 2 --class com.huawei.bigdata.flink.examples.TestPipeline_NettySink /opt/client/FlinkPipelineScalaExample.jar
        2. Start the subscriber Job1.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 /opt/client/FlinkPipelineScalaExample.jar
        3. Start the subscriber Job1.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 /opt/client/FlinkPipelineScalaExample.jar
    • Running the Stream SQL Join sample program
      1. Start the program to generate data for Kafka. For details about Kafka configuration, see • Run the following code to....
        bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092
      2. Run the netcat command on any node in the cluster to wait for the connection of the application.
        netcat -l -p 9000
      3. Start the application to accept the socket data and perform the combined query.
        bin/flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/client/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092 --hostname xxx.xxx.xxx.xxx --port 9000

For details about sample projects of dependency packages provided by Flink, see Reference Information.