Updated on 2024-08-16 GMT+08:00

Compiling and Running a Flink Application

After developing the application code, upload it to a Linux client and run the application there. The procedure for running an application on the Flink client is the same whether it was developed in Scala or Java.

Flink applications on a YARN cluster can run only on Linux, not on Windows.

Procedure

  1. In IntelliJ IDEA, configure Artifacts of the project before generating a JAR file.

    1. On the IDEA home page, choose File > Project Structure... to go to the Project Structure page.
    2. On the Project Structure page, select Artifacts, click +, and choose JAR > Empty.
      Figure 1 Adding Artifacts
    3. Set the name, type, and output path of the JAR file as required.
      Figure 2 Setting basic information
    4. Right-click 'FlinkStreamJavaExample' compile output and choose Put into Output Root. Click Apply.
      Figure 3 Put into Output Root
    5. Click OK to complete the configuration.

  2. Generate a JAR file.

    1. On the IDEA home page, choose Build > Build Artifacts....
      Figure 4 Build Artifacts
    2. In the displayed menu, choose FlinkStreamJavaExample > Build to generate the JAR file.
      Figure 5 Build
    3. If information similar to the following is displayed in the event log, the JAR file has been generated successfully. You can obtain the JAR file from the output path configured in substep 3 of step 1.
      21:25:43 Compilation completed successfully in 36 sec

  3. Copy the JAR file generated in step 2 (for example, FlinkStreamJavaExample.jar) to the Flink operating environment on Linux (that is, the Flink client), for example, to /opt/Flink_test. Then run the Flink application.

    Start the Flink cluster before running Flink applications on Linux. To start it, run the yarn-session.sh script on the Flink client. The following is a command example:

    bin/yarn-session.sh -n 3 -jm 1024 -tm 1024

    Do not restart the HDFS service or all DataNode instances while a Flink job is running. Otherwise, the job may fail and some temporary application data may not be cleared.
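The options in the yarn-session.sh example above can be spelled out as in the following sketch. It assumes the option meanings of Flink releases that still accept -n: -n is the number of TaskManager containers, -jm is the JobManager memory in MB, and -tm is the memory of each TaskManager in MB. The variable names are illustrative only, and the sketch prints the command instead of running it.

```shell
#!/usr/bin/env bash
# Illustrative variable names; they are not defined by the Flink client.
TASK_MANAGERS=3            # -n : number of TaskManager containers (assumed meaning)
JOBMANAGER_MEM_MB=1024     # -jm: JobManager memory in MB (assumed meaning)
TASKMANAGER_MEM_MB=1024    # -tm: memory per TaskManager in MB (assumed meaning)

# Build the command as a string so that it can be reviewed before it is run.
session_cmd="bin/yarn-session.sh -n $TASK_MANAGERS -jm $JOBMANAGER_MEM_MB -tm $TASKMANAGER_MEM_MB"
echo "$session_cmd"
```

Run the printed command from the Flink client directory after checking the values against the resources available in the cluster.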

    • Running the DataStream sample application (Scala and Java)

      Open another terminal window. Go to the Flink client directory and invoke the bin/flink run script to run the code. The following are examples for the Java and Scala samples.

      bin/flink run --class com.huawei.flink.example.stream.FlinkStreamJavaExample /opt/Flink_test/flink-examples-1.0.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
      
      bin/flink run --class com.huawei.flink.example.stream.FlinkStreamScalaExample /opt/Flink_test/flink-examples-1.0.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2

      Table 1 Parameters

      | Parameter | Description |
      | --- | --- |
      | <filePath> | Path of the input files in the local file system. The /opt/log1.txt and /opt/log2.txt files must exist on every node. Run chmod 755 <file name> to give the file owner read, write, and execute permissions and to give group users and other users read and execute permissions only. The default value can be retained or changed. |
      | <windowTime> | Duration of a time window, in minutes. The default value can be retained or changed. |
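The permission requirement for the input files can be sketched as follows. prepare_inputs is a hypothetical helper name, not part of the Flink client, and the demonstration works on throwaway files in a temporary directory rather than on /opt/log1.txt and /opt/log2.txt. It takes the same comma-separated list that is passed to --filePath.

```shell
#!/usr/bin/env bash
# prepare_inputs: hypothetical helper that checks each file in a
# comma-separated list and sets its mode to 755.
prepare_inputs() {
  local list="$1" f
  local IFS=','                 # split the list on commas
  for f in $list; do
    if [ ! -f "$f" ]; then
      echo "missing: $f" >&2
      return 1
    fi
    chmod 755 "$f"              # owner rwx, group and others r-x
  done
}

# Demonstration on temporary files only.
demo_dir="$(mktemp -d)"
touch "$demo_dir/log1.txt" "$demo_dir/log2.txt"
prepare_inputs "$demo_dir/log1.txt,$demo_dir/log2.txt"
ls -l "$demo_dir"
```

On a real cluster the same call would have to be repeated on every node with the actual list, for example /opt/log1.txt,/opt/log2.txt.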

    • Running the sample application for producing and consuming data in Kafka (in Java or Scala)

      Run the following command to start the application that produces data:

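      bin/flink run --class com.huawei.flink.example.kafka.WriteIntoKafka /opt/Flink_test/flink-examples-1.0.jar --topic <topic> --bootstrap.servers <bootstrap.servers> [--security.protocol <security.protocol>] [--sasl.kerberos.service.name <sasl.kerberos.service.name>] [--kerberos.domain.name <kerberos.domain.name>] [--ssl.truststore.location <ssl.truststore.location>] [--ssl.truststore.password <ssl.truststore.password>]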

      Run the following command to start the application that consumes data. A command that contains the authentication password is a security risk, so you are advised to disable command recording (history) before running it.

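      bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka /opt/Flink_test/flink-examples-1.0.jar --topic <topic> --bootstrap.servers <bootstrap.servers> [--security.protocol <security.protocol>] [--sasl.kerberos.service.name <sasl.kerberos.service.name>] [--kerberos.domain.name <kerberos.domain.name>] [--ssl.truststore.location <ssl.truststore.location>] [--ssl.truststore.password <ssl.truststore.password>]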

      Table 2 Parameters

      | Parameter | Description | Mandatory |
      | --- | --- | --- |
      | topic | Name of a Kafka topic. | Yes |
      | bootstrap.servers | List of broker addresses, each in IP address:port format. | Yes |
      | security.protocol | Can be set to PLAINTEXT (optional), SASL_PLAINTEXT, SSL, or SASL_SSL, which correspond to ports 21005, 21007, 21008, and 21009 of the MRS Kafka cluster, respectively. If SASL is configured, sasl.kerberos.service.name must be set to kafka and the security.kerberos.login configuration items in conf/flink-conf.yaml must be set. If SSL is configured, ssl.truststore.location (truststore path) and ssl.truststore.password (truststore password) must be set. | No. If this parameter is not configured, Kafka is in non-security mode. |
      | kerberos.domain.name | Kafka domain name. | No. This parameter is mandatory when security.protocol is set to SASL_PLAINTEXT or SASL_SSL. |
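The port and dependency rules in Table 2 can be sketched as a small argument builder. kafka_args is a hypothetical helper and its positional arguments are an assumption of this sketch. It only prints the arguments that would follow the JAR path and does not call Flink.

```shell
#!/usr/bin/env bash
# kafka_args: hypothetical helper, prints arguments only.
# Usage: kafka_args <topic> <broker-ip> [protocol] [kerberos-domain] [truststore] [truststore-password]
kafka_args() {
  local topic="$1" host="$2" proto="${3:-PLAINTEXT}"
  local domain="$4" truststore="$5" password="$6"
  local port args
  case "$proto" in                      # protocol-to-port mapping from Table 2
    PLAINTEXT)      port=21005 ;;
    SASL_PLAINTEXT) port=21007 ;;
    SSL)            port=21008 ;;
    SASL_SSL)       port=21009 ;;
    *) echo "unknown security.protocol: $proto" >&2; return 1 ;;
  esac
  args="--topic $topic --bootstrap.servers $host:$port"
  if [ "$proto" != "PLAINTEXT" ]; then
    args="$args --security.protocol $proto"
  fi
  case "$proto" in
    SASL_*)                             # SASL needs the service name and the domain name
      [ -n "$domain" ] || { echo "kerberos.domain.name is required for $proto" >&2; return 1; }
      args="$args --sasl.kerberos.service.name kafka --kerberos.domain.name $domain" ;;
  esac
  case "$proto" in
    SSL|SASL_SSL)                       # SSL needs the truststore path and password
      [ -n "$truststore" ] && [ -n "$password" ] || { echo "truststore path and password are required for $proto" >&2; return 1; }
      args="$args --ssl.truststore.location $truststore --ssl.truststore.password $password" ;;
  esac
  echo "$args"
}

kafka_args topic1 10.96.101.32 SASL_PLAINTEXT hadoop.hadoop.com
```

The helper rejects a SASL protocol without a domain name and an SSL protocol without truststore settings, which mirrors the conditions stated in the table.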

      The following examples use ReadFromKafka to show the command for each of the four security protocols (PLAINTEXT, SASL_PLAINTEXT, SSL, and SASL_SSL, in that order):

      bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka /opt/Flink_test/flink-examples-1.0.jar --topic topic1 --bootstrap.servers 10.96.101.32:21005
      bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka /opt/Flink_test/flink-examples-1.0.jar --topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.com
      bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka /opt/Flink_test/flink-examples-1.0.jar --topic topic1 --bootstrap.servers 10.96.101.32:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password xxx
      bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka /opt/Flink_test/flink-examples-1.0.jar --topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.com --ssl.truststore.location /home/truststore.jks --ssl.truststore.password xxx
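The SSL and SASL_SSL examples above carry the truststore password on the command line. One way to follow the advice about disabling history is sketched below. It assumes an interactive bash session in which each line is entered as a separate command.

```shell
#!/usr/bin/env bash
# Assumes bash. Enter each line as its own command.
set +o history    # commands entered after this one are not added to the history list
# Run the bin/flink command that contains --ssl.truststore.password here.
set -o history    # resume recording for later commands
```

The set +o history line itself is still recorded, because recording is switched off only after it has been entered.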
    • Running the sample application of the asynchronous checkpoint mechanism (in Scala or Java)

      To diversify the sample code, the Java DataStream sample uses processing time as the timestamp and the Scala DataStream sample uses event time. Example commands are as follows:

      • Saving checkpoint snapshot information to HDFS
        • Java
          bin/flink run --class com.huawei.flink.example.checkpoint.FlinkProcessingTimeAPIChkMain /opt/Flink_test/flink-examples-1.0.jar --chkPath hdfs://hacluster/flink-checkpoint/
          
        • Scala
          bin/flink run --class com.huawei.flink.example.checkpoint.FlinkEventTimeAPIChkMain /opt/Flink_test/flink-examples-1.0.jar --chkPath hdfs://hacluster/flink-checkpoint/
          
      • Saving checkpoint snapshot information to a local file
        • Java
          bin/flink run --class com.huawei.flink.example.checkpoint.FlinkProcessingTimeAPIChkMain /opt/Flink_test/flink-examples-1.0.jar --chkPath file:///home/zzz/flink-checkpoint/
          
        • Scala
          bin/flink run --class com.huawei.flink.example.checkpoint.FlinkEventTimeAPIChkMain /opt/Flink_test/flink-examples-1.0.jar --chkPath file:///home/zzz/flink-checkpoint/
          
        • Path of the checkpoint source file: flink/checkpoint/checkpoint/fd5f5b3d08628d83038a30302b611/chk-X/4f854bf4-ea54-4595-a9d9-9b9080779ffe

          In the path, flink/checkpoint/checkpoint indicates the specified root directory.

          fd5f5b3d08628d83038a30302b611 indicates a second-level directory named after jobID.

          In chk-X, X indicates a checkpoint number, which is a third-level directory.

          4f854bf4-ea54-4595-a9d9-9b9080779ffe indicates a checkpoint source file.

        • In cluster mode, Flink saves checkpoint files to HDFS.
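The directory levels of a checkpoint path described above can be illustrated by splitting the sample path with shell parameter expansion. describe_checkpoint_path is a hypothetical helper name, and the sketch assumes that the last three path components are always the jobID directory, the chk-X directory, and the file.

```shell
#!/usr/bin/env bash
# describe_checkpoint_path: hypothetical helper that splits a checkpoint path
# into the levels described in the text.
describe_checkpoint_path() {
  local path="$1"
  local file="${path##*/}"      # last component: checkpoint source file
  local rest="${path%/*}"
  local chk="${rest##*/}"       # third-level directory: chk-X
  rest="${rest%/*}"
  local job_id="${rest##*/}"    # second-level directory: named after the jobID
  local root="${rest%/*}"       # everything before it: specified root directory
  echo "root=$root"
  echo "jobID=$job_id"
  echo "checkpoint=${chk#chk-}"
  echo "file=$file"
}

describe_checkpoint_path "flink/checkpoint/checkpoint/fd5f5b3d08628d83038a30302b611/chk-X/4f854bf4-ea54-4595-a9d9-9b9080779ffe"
```

For the sample path, the helper reports flink/checkpoint/checkpoint as the root, fd5f5b3d08628d83038a30302b611 as the jobID, and X as the checkpoint number.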
    • Running the Stream SQL Join sample application
      1. Start the application to produce data in Kafka. For details about how to configure Kafka, see Running the sample application for producing and consuming data in Kafka (in Java or Scala).
        bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin /opt/Flink_test/flink-examples-1.0.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:21005
        
      2. Run the netcat command on any node in the cluster to wait for an application connection.
        netcat -l -p 9000
        
      3. Start the application to receive socket data and perform a join query.
        bin/flink run --class com.huawei.flink.example.sqljoin.SqlJoinWithSocket /opt/Flink_test/flink-examples-1.0.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:21005 --hostname xxx.xxx.xxx.xxx --port 9000
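The three steps above share several values: the topic and broker address are common to steps 1 and 3, and the socket port is common to steps 2 and 3. The sketch below makes this explicit. sqljoin_commands is a hypothetical helper that prints the commands in order and runs nothing. It assumes that the --hostname value is the node on which netcat was started, and the xxx.xxx.xxx.xxx placeholders are left for you to replace.

```shell
#!/usr/bin/env bash
# sqljoin_commands: hypothetical helper, prints the three commands in order.
# Usage: sqljoin_commands <broker ip:port> <socket hostname> [socket port]
sqljoin_commands() {
  local broker="$1" host="$2" port="${3:-9000}"
  local jar=/opt/Flink_test/flink-examples-1.0.jar
  local topic=topic-test
  # Step 1: produce data in Kafka.
  echo "bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin $jar --topic $topic --bootstrap.servers $broker"
  # Step 2: listen for the application connection.
  echo "netcat -l -p $port"
  # Step 3: receive socket data and join it with the Kafka data.
  echo "bin/flink run --class com.huawei.flink.example.sqljoin.SqlJoinWithSocket $jar --topic $topic --bootstrap.servers $broker --hostname $host --port $port"
}

sqljoin_commands xxx.xxx.xxx.xxx:21005 xxx.xxx.xxx.xxx
```

Keeping the shared values in one place avoids a port or topic mismatch between the steps.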