Compiling and Running the Application

Updated on 2024-08-10 GMT+08:00

Scenarios

After the program code is developed, upload it to the Linux client to run it. The procedure for running the application is the same whether it is developed in Scala or Java.

NOTE:

Applications in Flink on YARN mode can run only in a Linux environment; they cannot run in a Windows environment.

Procedure

  1. In IntelliJ IDEA, click Reload All Maven Projects in the Maven window on the right of IDEA to import Maven project dependencies.

    Figure 1 Reload projects

  2. Compile and run the application.

    Use either of the following methods:

    • Method 1:
      1. Choose Maven, locate the target project name, and double-click clean under Lifecycle to run the clean command of Maven.
      2. Choose Maven, locate the target project name, and double-click install under Lifecycle to run the install command of Maven.
      Figure 2 Maven clean and install
    • Method 2: In the Terminal window of IDEA, go to the directory containing the pom.xml file and run the mvn clean install command to build the project.
      Figure 3 Entering mvn clean install in the IDEA Terminal text box

  3. After the compilation is complete, the message "BUILD SUCCESS" is printed and the target directory is generated. Obtain the JAR package in the directory.

    Figure 4 Compilation completed

  4. Copy the JAR package created in 3 (for example, FlinkStreamJavaExample.jar) to the Flink running environment (the Flink client), for example, /opt/client. In that directory, create a conf folder and copy the required configuration files into it. For details, see Preparing an Operating Environment. Then run the Flink application.

    In a Linux environment, the Flink cluster must be started in advance. Run the yarn-session command on the Flink client to start the Flink cluster. For example:
    bin/yarn-session.sh -jm 1024 -tm 4096
    NOTE:
    • Before running the yarn-session.sh command, copy the running dependency package of the Flink application to the client directory {client_install_home}/Flink/flink/lib. For details about the dependency package, see the reference information about the dependency package for running the sample project.
    • The dependencies of different sample projects may conflict. Before running a new sample project, remove the dependencies that were copied from the old sample project to the {client_install_home}/Flink/flink/lib directory on the client.
    • Run the source bigdata_env command in the client installation directory before running the yarn-session.sh command.
    • The yarn-session.sh command must be run in the Flink/flink directory under the client installation directory, for example, /opt/client/Flink/flink.
    • When a Flink task is running, do not restart the HDFS service or all DataNode instances. Otherwise, the Flink task may fail, resulting in the loss of temporary data.
    • Ensure that the user permissions on the JAR package and configuration file are the same as those on the Flink client. For example, the user is omm and the permissions are 755.
    • The following applies to MRS 3.2.1 or later: the TaskManager memory specified with the -tm option must be at least 4096 MB.
    • Run the DataStream sample program in Scala and Java.

      Go to the Flink client directory and call the bin/flink run script to run the code.

      • Java
        bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample /opt/client/FlinkStreamJavaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
      • Scala
        bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamScalaExample /opt/client/FlinkStreamScalaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
      NOTE:

      The log1.txt and log2.txt files must be stored on each node where the Yarn NodeManager instance is deployed, with permission 755.

      Table 1 Parameter description

      • <filePath>: Path of files in the local file system. The /opt/log1.txt and /opt/log2.txt files must be placed on each node. The default value can be retained or changed.
      • <windowTime>: Duration of the window, in minutes. The default value can be retained or changed.

    • Interconnect with Kafka (in Scala and Java).

      Run the following command to start the producer program:

      bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [ssl.truststore.location] [ssl.truststore.password]
      Run the following command to start the consumer program:
      bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [ssl.truststore.location] [ssl.truststore.password]
      Table 2 Parameter description

      • topic: Name of a Kafka topic. Mandatory: Yes.
      • bootstrap.servers: List of broker IP addresses and ports. Mandatory: Yes.
      • security.protocol: Set this parameter to PLAINTEXT, SASL_PLAINTEXT, SSL, or SASL_SSL; the corresponding FusionInsight Kafka ports are 21005, 21007, 21008, and 21009, respectively. Mandatory: No.
        • If SASL is configured, sasl.kerberos.service.name must be set to kafka, and the security.kerberos.login parameter in conf/flink-conf.yaml is mandatory.
        • If SSL is configured, ssl.truststore.location (path of the truststore) and ssl.truststore.password (password of the truststore) must be set.

      NOTE:
      • If this parameter is not configured, Kafka runs in non-secure mode.
      • If SSL needs to be configured, see "Kafka Development Guide" > "SSL Encryption Function Used by a Client" to learn how to generate the truststore.jks file.
      The following example commands correspond to the four protocols, using ReadFromKafka as an example; the system domain name is HADOOP.COM:
      • Command 1:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:9092
      • Command 2:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21005 --security.protocol PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.com
      • Command 3:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:9093 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password xxx
      • Command 4:
        bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka /opt/client/FlinkKafkaJavaExample.jar --topic topic1 --bootstrap.servers 10.96.101.32:21005 --security.protocol PLAINTEXT --sasl.kerberos.service.name kafka --ssl.truststore.location /config/truststore.jks --ssl.truststore.password xxx --kerberos.domain.name hadoop.hadoop.com
    • Asynchronous checkpoint mechanism (in Scala and Java).

      Processing time (proctime) is used as the timestamp for the DataStream in Java, and event time is used as the timestamp for the DataStream in Scala. Example commands:

      Save the Checkpoint snapshot information to HDFS.
      • Java
        bin/flink run --class com.huawei.bigdata.flink.examples.FlinkProcessingTimeAPIMain /opt/client/FlinkCheckpointJavaExample.jar --chkPath hdfs://hacluster/flink/checkpoint/
      • Scala
        bin/flink run --class com.huawei.bigdata.flink.examples.FlinkEventTimeAPIChkMain /opt/client/FlinkCheckpointScalaExample.jar --chkPath hdfs://hacluster/flink/checkpoint/
      NOTE:
      • Path of the checkpoint source file: flink/checkpoint/checkpoint/fd5f5b3d08628d83038a30302b611/chk-X/4f854bf4-ea54-4595-a9d9-9b9080779ffe

        flink/checkpoint // Specified root directory.

        fd5f5b3d08628d83038a30302b611 // Second-level directory, named after the job ID.

        chk-X // Third-level directory. X indicates the checkpoint number.

        4f854bf4-ea54-4595-a9d9-9b9080779ffe // Checkpoint source file.

      • If Flink is in cluster mode, use the HDFS path, because a local path can only be used when Flink is in local mode.
    • Run the pipeline sample program.
      • Java
        1. Start the publisher job.
          bin/flink run -p 2 --class com.huawei.bigdata.flink.examples.TestPipelineNettySink /opt/client/FlinkPipelineJavaExample.jar
        2. Start the subscriber Job1.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipelineNettySource1 /opt/client/FlinkPipelineJavaExample.jar
        3. Start the subscriber Job2.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipelineNettySource2 /opt/client/FlinkPipelineJavaExample.jar
      • Scala
        1. Start the publisher job.
          bin/flink run -p 2 --class com.huawei.bigdata.flink.examples.TestPipeline_NettySink /opt/client/FlinkPipelineScalaExample.jar
        2. Start the subscriber Job1.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource1 /opt/client/FlinkPipelineScalaExample.jar
        3. Start the subscriber Job2.
          bin/flink run --class com.huawei.bigdata.flink.examples.TestPipeline_NettySource2 /opt/client/FlinkPipelineScalaExample.jar
    • Run the Stream SQL Join sample program.
      • Java
        1. Start the program to generate data for Kafka. For details about the Kafka configuration, see the Kafka interconnection example above.
          bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092
        2. Run the netcat command on any node in the cluster to wait for the connection of the application.
          netcat -l -p 9000
          NOTE:

          If "command not found" is displayed, install Netcat and run the command again.

        3. Start the application to receive socket data and perform the join query.
          bin/flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/client/FlinkStreamSqlJoinExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092 --hostname xxx.xxx.xxx.xxx --port 9000
      • Scala (for MRS 3.3.0 or later)
        1. Start the application to produce data in Kafka. For details about how to configure Kafka, see the Kafka interconnection example above.
          bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka /opt/client/FlinkStreamSqlJoinScalaExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092
        2. Run the netcat command on any node in the cluster to wait for an application connection.
          netcat -l -p 9000
        3. Start the application to receive socket data and perform the join query.
          bin/flink run --class com.huawei.bigdata.flink.examples.SqlJoinWithSocket /opt/client/FlinkStreamSqlJoinScalaExample.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:9092 --hostname xxx.xxx.xxx.xxx --port 9000
    • Run the Flink HBase sample program (MRS 3.2.0 or later clusters).
      • yarn-session
        1. Start the Flink cluster.
          ./bin/yarn-session.sh -t config -jm 1024 -tm 1024
        2. Run the Flink program and set parameters.
          bin/flink run --class com.huawei.bigdata.flink.examples.WriteHBase /opt/client1/Flink/flink/FlinkHBaseJavaExample-xxx.jar  --tableName xxx --confDir xxx
          
          bin/flink run --class com.huawei.bigdata.flink.examples.ReadHBase /opt/client1/Flink/flink/FlinkHBaseJavaExample-xxx.jar  --tableName xxx --confDir xxx
      • yarn-cluster
        bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.WriteHBase /opt/client1/Flink/flink/FlinkHBaseJavaExample-xxx.jar  --tableName xxx --confDir xxx
        
        bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.ReadHBase /opt/client1/Flink/flink/FlinkHBaseJavaExample-xxx.jar  --tableName xxx --confDir xxx
    • Run the Flink Hudi sample application (MRS 3.2.1 or later).
      • yarn-session mode
        1. Start the Flink cluster.
          ./bin/yarn-session.sh -jm 1024 -tm 4096
        2. Run the Flink application and enter parameters.
          ./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoHudi /opt/test.jar --hudiTableName hudiSinkTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable
          
          ./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromHudi /opt/test.jar --hudiTableName hudiSourceTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable --read.start-commit xxx
      • yarn-cluster mode
        ./bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.WriteIntoHudi /opt/test.jar --hudiTableName hudiSinkTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable
        
        ./bin/flink run -m yarn-cluster --class com.huawei.bigdata.flink.examples.ReadFromHudi /opt/test.jar --hudiTableName hudiSourceTable --hudiPath hdfs://hacluster/tmp/flinkHudi/hudiTable --read.start-commit xxx
    • Run the REST API sample application for creating a tenant. The TestCreateTenants application is used as an example.
      • yarn-session mode
        1. Start the Flink cluster.
          ./bin/yarn-session.sh -t config -jm 1024 -tm 4096
        2. Run the Flink application and enter parameters.
          ./bin/flink run --class com.huawei.bigdata.flink.examples.TestCreateTenants /opt/client/FlinkRESTAPIJavaExample-xxx.jar --hostName xx-xx-xx-xx
      • yarn-cluster mode
        ./bin/flink run -m yarn-cluster -yt config -yjm 1024 -ytm 4096 --class com.huawei.bigdata.flink.examples.TestCreateTenants /opt/client/FlinkRESTAPIJavaExample-xxx.jar --hostName xx-xx-xx-xx
    • Run a SQL task to submit Flink JAR jobs (applicable to MRS 3.2.1 or later).
      • yarn-session mode
        1. Start the Flink cluster.
          ./bin/yarn-session.sh -t config -jm 1024 -tm 4096
        2. Run the Flink application and enter parameters.
          bin/flink run -d --class com.huawei.mrs.FlinkSQLExecutor /opt/flink-sql-xxx.jar --sql ./sql/datagen2kafka.sql
      • yarn-cluster mode
        bin/flink run -m yarn-cluster -yt config -yjm 1024 -ytm 4096 -d --class com.huawei.mrs.FlinkSQLExecutor /opt/flink-sql-xxx.jar --sql ./sql/datagen2kafka.sql
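Taken together, the submission flow in this section is: build the JAR with Maven, copy the JAR and configuration files to the Flink client, source the environment, start a yarn-session, and submit the job with bin/flink run. The following shell sketch illustrates the file-staging part only. The paths are illustrative stand-ins for /opt/client from the examples above, and the Maven/YARN steps are shown as comments; the staging itself is simulated in a temporary directory so the sketch is self-contained.

```shell
# Illustrative sketch of staging a Flink application on the client.
# A temporary directory stands in for the real client path (/opt/client).
WORK=$(mktemp -d)
CLIENT="$WORK/client"          # stands in for /opt/client
mkdir -p "$CLIENT" "$WORK/target"

# 1. Build the project (in IDEA, or with: mvn clean install).
#    This produces target/FlinkStreamJavaExample.jar; simulated here:
echo "jar" > "$WORK/target/FlinkStreamJavaExample.jar"

# 2. Copy the JAR to the client directory.
cp "$WORK/target/FlinkStreamJavaExample.jar" "$CLIENT/"

# 3. Create the conf folder for the required configuration files.
mkdir -p "$CLIENT/conf"

# 4. On a real client you would then run:
#    source bigdata_env
#    bin/yarn-session.sh -jm 1024 -tm 4096
#    bin/flink run --class com.huawei.bigdata.flink.examples.FlinkStreamJavaExample \
#        /opt/client/FlinkStreamJavaExample.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
ls "$CLIENT"
```

The ls at the end simply confirms that the JAR and the conf folder are in place before submission.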
