Compiling and Running the Application

Updated on 2024-08-12 GMT+08:00

Scenario

After the application is developed, upload the application code to the Linux client and run it there. On the Flink client, the procedure for running an application is the same whether it was developed in Scala or Java.

NOTE:

Flink applications for Yarn clusters can run only on Linux.

Procedure

  1. In IntelliJ IDEA, configure the project's Artifacts settings before generating a JAR file.

    1. On the IDEA home page, choose File > Project Structure....
    2. On the Project Structure page, select Artifacts, click +, and choose JAR > Empty.
      Figure 1 Adding Artifacts
    3. Set the name, type, and output path of the JAR file based on site requirements.
      Figure 2 Setting basic information
    4. Right-click 'FlinkStreamJavaExample' compile output and choose Put into Output Root from the shortcut menu. Then, click Apply.
      Figure 3 Put into Output Root
    5. Click OK.

  2. Generate a JAR file.

    1. On the IDEA home page, choose Build > Build Artifacts....
      Figure 4 Building Artifacts
    2. In the displayed menu, choose FlinkStreamJavaExample > Build to generate the JAR file.
      Figure 5 Build
    3. If information similar to the following is displayed in the event log, the JAR file is generated. You can obtain the JAR file from the path configured in 1.c.
      21:25:43 Compilation completed successfully in 36 sec
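
    Alternatively, if the sample project is Maven-based (an assumption; check for a pom.xml in the project root), an equivalent JAR file can be produced from the command line. A minimal sketch; the project path is an example:
      # Build the sample project JAR without the IDE (assumes a standard Maven project).
      cd /path/to/FlinkStreamJavaExample
      mvn clean package -DskipTests
      # The JAR file is written to the target/ directory.
      ls target/*.jar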

  3. Copy the JAR file generated in 2 (for example, FlinkStreamJavaExample.jar) to the Flink operating environment on Linux (that is, the Flink client), for example, /opt/client. Create a conf directory under that directory and copy the required configuration files into it. For details, see Preparing the Operating Environment. Then run the Flink application, as sketched below.
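
    A minimal sketch of these copy steps (the client node address, user, and configuration file paths are examples):
      # On the build machine: copy the JAR file to the Flink client node.
      scp FlinkStreamJavaExample.jar omm@client-node:/opt/client/
      # On the client node: create the conf directory and copy the configuration files into it.
      mkdir -p /opt/client/conf
      cp /path/to/required-config-files/* /opt/client/conf/
      # Make the file permissions match the Flink client user (see the notes below).
      chmod 755 /opt/client/FlinkStreamJavaExample.jar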

    Start the Flink cluster before running Flink applications on Linux. To do so, run the yarn-session.sh script on the Flink client. An example command is as follows (a consolidated startup sketch follows the notes below):
    bin/yarn-session.sh -jm 1024 -tm 1024
    NOTE:
    • Before running the yarn-session.sh command, copy the dependency package of the Flink application to the client directory {client_install_home}/Flink/flink/lib. For details about the dependency packages of the application, see Reference Information.
    • The dependencies of different sample projects may conflict. When running a new sample project, you need to remove the dependencies copied from the old sample project to the {client_install_home}/Flink/flink/lib directory on the client.
    • Run the source bigdata_env command in the client installation directory before running the yarn-session.sh command.
    • The yarn-session.sh command must be run in the {client_install_home}/Flink/flink directory, for example, /opt/client/Flink/flink.
    • Do not restart the HDFS service or all DataNode instances during Flink job running. Otherwise, the job may fail and some temporary application data cannot be cleared.
    • Ensure that the user permissions on the JAR file and configuration file are the same as those on the Flink client. For example, the user is omm and the permission is 755.
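
    Putting these notes together, a typical startup sequence looks as follows (a sketch; /opt/client is an example installation path):
      # Load the client environment variables first.
      cd /opt/client
      source bigdata_env
      # Copy the sample application's dependency JARs into the Flink lib directory,
      # after removing any conflicting JARs left over from a previous sample project.
      cp /path/to/dependency-jars/*.jar /opt/client/Flink/flink/lib/
      # yarn-session.sh must be run from the Flink directory of the client.
      cd /opt/client/Flink/flink
      bin/yarn-session.sh -jm 1024 -tm 1024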
    • Running the DataStream sample application (in Scala or Java)

      Open another terminal window, go to the Flink client directory, and run the application using the bin/flink run script.

      • Java
        bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/client/FlinkStreamJavaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
      • Scala
        bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/client/FlinkStreamScalaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
      NOTE:

      The log1.txt and log2.txt files must be stored on each node where a NodeManager instance is deployed, with permission 755 (a distribution sketch follows Table 1).

      Table 1 Parameters

      <filePath>
        Path of files in the local file system. The /opt/log1.txt and /opt/log2.txt files must be stored on every node. The default value can be retained or changed.

      <windowTime>
        Duration of a time window, in minutes. The default value can be retained or changed.
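
      Because the input files must exist on every NodeManager node, you can distribute them before submitting the job. A minimal sketch assuming SSH access (host names and user are examples):
        # Copy the input files to each NodeManager node and set the required permission.
        for host in node1 node2 node3; do
          scp /opt/log1.txt /opt/log2.txt omm@$host:/opt/
          ssh omm@$host "chmod 755 /opt/log1.txt /opt/log2.txt"
        done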

    • Running the sample application for producing and consuming data in Kafka (in Java or Scala)

      Command for starting the program that produces data:

      bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [ssl.truststore.location] [ssl.truststore.password]
      Command for starting the program that consumes data:
      bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [ssl.truststore.location] [ssl.truststore.password]
      Table 2 Parameters

      topic (mandatory)
        Kafka topic name.

      bootstrap.servers (mandatory)
        List of broker IP addresses and ports.

      security.protocol (optional)
        Can be set to PLAINTEXT, SASL_PLAINTEXT, SSL, or SASL_SSL, corresponding to port 21005, 21007, 21008, or 21009 of the FusionInsight Kafka cluster, respectively.
        • If SASL is configured, sasl.kerberos.service.name must be set to kafka, and the security.kerberos.login configuration items in conf/flink-conf.yaml must be set (see the sketch below).
        • If SSL is configured, ssl.truststore.location (path of the truststore) and ssl.truststore.password (password of the truststore) must be configured.
        NOTE:
        • If this parameter is not configured, Kafka runs in non-security mode.
        • For details about how to generate the truststore.jks file, see "SSL Encryption Function Used by a Client" in the Kafka Development Guide.
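
      For the SASL case, the security.kerberos.login items mentioned above are standard Flink configuration options. A minimal sketch of setting them on the client (the keytab path and principal are examples):
        # Append Kerberos login settings to the Flink client configuration.
        echo "security.kerberos.login.keytab: /opt/client/user.keytab" >> /opt/client/Flink/flink/conf/flink-conf.yaml
        echo "security.kerberos.login.principal: developuser" >> /opt/client/Flink/flink/conf/flink-conf.yaml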

      The following commands use ReadFromKafka as an example, assuming the cluster domain name is HADOOP.COM (a topic-creation sketch follows Command 4):

      • Command 1:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:9092
      • Command 2:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21005 --security.protocol PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.com
      • Command 3:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:9093 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password xxx
      • Command 4:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21005 --security.protocol PLAINTEXT --sasl.kerberos.service.name kafka --ssl.truststore.location config/truststore.jks --ssl.truststore.password xxx --kerberos.domain.name hadoop.hadoop.com
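
      Before running the producer, make sure the topic exists or automatic topic creation is enabled on the Kafka cluster. A minimal sketch using the stock Kafka client script (the broker address is the example one above; Kafka versions earlier than 2.2 use --zookeeper instead of --bootstrap-server):
        # Create and then verify the topic used by the examples.
        kafka-topics.sh --create --topic topic1 --partitions 3 --replication-factor 2 --bootstrap-server 10.96.101.32:9092
        kafka-topics.sh --list --bootstrap-server 10.96.101.32:9092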
    • Running the sample application of the asynchronous checkpoint mechanism (in Scala or Java)

      In the Java sample code, the processing time is used as the timestamp for the data stream; in the Scala sample code, the event time is used as the timestamp.

      Save checkpoint snapshot information to HDFS.
      • Java code
        bin/flink run --class com.huawei.bigdata.flink.examples.FlinkProcessingTimeAPIMain /opt/client/FlinkCheckpointJavaExample.jar --chkPath hdfs://hacluster/flink/checkpoint/
      • Scala code
        bin/flink run --class com.huawei.bigdata.flink.examples.FlinkEventTimeAPIChkMain /opt/client/FlinkCheckpointScalaExample.jar --chkPath hdfs://hacluster/flink/checkpoint/
      NOTE:
      • Checkpoint source file path: flink/checkpoint/fd5f5b3d08628d83038a30302b611/chk-X/4f854bf4-ea54-4595-a9d9-9b9080779ffe
        flink/checkpoint: the specified root directory.
        fd5f5b3d08628d83038a30302b611: the level-2 directory, named after the job ID.
        chk-X: the level-3 directory; X is the checkpoint number.
        4f854bf4-ea54-4595-a9d9-9b9080779ffe: a checkpoint source file.

      • In cluster mode, Flink stores checkpoint files in HDFS. A local path can be used only when Flink runs in local mode, which facilitates commissioning.
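
      To verify that snapshots are being written, you can list the checkpoint directory with the standard HDFS CLI. A minimal sketch (run after source bigdata_env; the job ID is the example one above):
        # List the checkpoint root directory, then one job's chk-X subdirectories.
        hdfs dfs -ls hdfs://hacluster/flink/checkpoint/
        hdfs dfs -ls hdfs://hacluster/flink/checkpoint/fd5f5b3d08628d83038a30302b611/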
    • Running the Pipeline sample application
      • Java code
        1. Start the publisher job.
          bin/flink run -p 2 --class com.huawei.bigdata.flink.examples.TestPipelineNettySink /opt/client/FlinkPipelineJavaExample.jar
        2. Start the subscriber Job1.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipelineNettySource1 /opt/client/FlinkPipelineJavaExample.jar
        3. Start the subscriber Job2.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipelineNettySource2 /opt/client/FlinkPipelineJavaExample.jar
      • Scala code
        1. Start the publisher job.
          bin/flink run -p 2 --class com.huawei.bigdata.flink.examples.TestPipeline_NettySink /opt/client/FlinkPipelineScalaExample.jar
        2. Start the subscriber Job1.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 /opt/client/FlinkPipelineScalaExample.jar
        3. Start the subscriber Job2.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 /opt/client/FlinkPipelineScalaExample.jar
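
      After the publisher and both subscriber jobs are submitted, you can confirm that all three are running with the standard Flink CLI (a minimal sketch, run from the Flink client directory):
        # List running Flink jobs; the NettySink publisher and both NettySource subscribers should appear.
        bin/flink list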
    • Running the Stream SQL Join sample application
      1. Start the application to generate data for Kafka. For details about Kafka configuration, see Running the sample application for producing and consuming data in Kafka (in Java or Scala).
        bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092
      2. Run the netcat command on any node in the cluster to wait for an application connection.
        netcat -l -p 9000
        NOTE:

        If "command not found" is displayed, install NetCat and run the command again.

      3. Start the application to receive socket data and perform a joint query.
        bin/flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/client/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092 --hostname xxx.xxx.xxx.xxx --port 9000
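
      With all three parts running, type lines into the netcat session to feed the socket source and observe the join results. To inspect the output of a job running on YARN, you can use the standard YARN log command (a sketch; the application ID is an example):
        # Fetch the aggregated logs of the Flink-on-YARN application.
        yarn logs -applicationId application_1469168412595_0002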
