Compiling and Running a Flink Application

Updated on 2024-08-16 GMT+08:00

After you finish developing the application code, upload it to the Linux client and run it there. The procedure for running an application on the Flink client is the same whether the application was developed in Scala or Java.

NOTE:

Flink applications on a YARN cluster can run only on Linux, not on Windows.

Procedure

  1. In IntelliJ IDEA, configure Artifacts of the project before generating a JAR file.

    1. On the IDEA home page, choose File > Project Structure... to go to the Project Structure page.
    2. On the Project Structure page, select Artifacts, click +, and choose JAR > Empty.
      Figure 1 Adding Artifacts
    3. Set the name, type, and output path of the JAR file as required.
      Figure 2 Setting basic information
    4. Right-click 'FlinkStreamJavaExample' compile output, and choose Put into Output Root. Click Apply.
      Figure 3 Put into Output Root
    5. Click OK to complete the configuration.

  2. Generate a JAR file.

    1. On the IDEA home page, choose Build > Build Artifacts....
      Figure 4 Build Artifacts
    2. In the displayed menu, choose FlinkStreamJavaExample > Build to generate the JAR file.
      Figure 5 Build
    3. If information similar to the following is displayed in the event log, the JAR file has been successfully generated. You can obtain the JAR file from the path configured in 1.c.
      21:25:43 Compilation completed successfully in 36 sec

  3. Copy the JAR file generated in step 2 (for example, FlinkStreamJavaExample.jar) to the Flink operating environment on Linux (that is, the Flink client), for example, /opt/Flink_test, and then run the Flink application.

    Start the Flink cluster before running the Flink applications on Linux. Run the yarn session command on the Flink client to start the Flink cluster. The following is a command example:

    bin/yarn-session.sh -n 3 -jm 1024 -tm 1024
    NOTE:

    Do not restart the HDFS service or all DataNode instances while a Flink job is running. Otherwise, the job may fail and some temporary application data cannot be cleared.

    • Running the DataStream sample application (Scala and Java)

      Open another terminal window. Go to the Flink client directory and invoke the bin/flink run script to run the code. The following are examples.

      bin/flink run --class com.huawei.flink.example.stream.FlinkStreamJavaExample /opt/Flink_test/flink-examples-1.0.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
      
      bin/flink run --class com.huawei.flink.example.stream.FlinkStreamScalaExample /opt/Flink_test/flink-examples-1.0.jar --filePath /opt/log1.txt,/opt/log2.txt --windowTime 2
      Table 1 Parameters

      | Parameter | Description |
      |---|---|
      | <filePath> | Path of the input files in the local file system. The /opt/log1.txt and /opt/log2.txt files must be stored on each node. Run chmod 755 <file name> so that the file owner has read, write, and execute permissions, and the owner group and other users have only read and execute permissions. The default value can be retained or changed. |
      | <windowTime> | Duration of a time window, in minutes. The default value can be retained or changed. |
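
As a minimal sketch of the permission step described in Table 1, the following shell function applies chmod 755 to one input file and prints the resulting mode string. The name prepare_input_file is a hypothetical helper introduced only for illustration. It is not part of the Flink client, and it would need to be run on every node that stores the file.

```shell
# Hypothetical helper (not part of the Flink client): apply the permissions
# described in Table 1 to one input file and show the result.
prepare_input_file() {
  local file
  file="$1"
  if [ ! -f "$file" ]; then
    echo "missing input file: $file" >&2
    return 1
  fi
  # 755 = rwx for the owner, r-x for the owner group and for other users.
  chmod 755 "$file" || return 1
  # Print the mode string (the first 10 characters of the ls -l output).
  ls -l "$file" | cut -c1-10
}
```

For example, running prepare_input_file /opt/log1.txt and prepare_input_file /opt/log2.txt on each node should print the mode string of each file after the change.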

    • Running the sample application for producing and consuming data in Kafka (in Java or Scala)

      Run the following command to start the application to generate data:

      bin/flink run --class com.huawei.flink.example.kafka.WriteIntoKafka /opt/Flink_test/flink-examples-1.0.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [kerberos.domain.name] [ssl.truststore.location] [ssl.truststore.password]

      Run the following commands to start the application to consume data. There can be security risks if a command contains the authentication password. You are advised to disable the command recording function (history) before running the command.

      bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka /opt/Flink_test/flink-examples-1.0.jar <topic> <bootstrap.servers> [security.protocol] [sasl.kerberos.service.name] [kerberos.domain.name] [ssl.truststore.location] [ssl.truststore.password]
      Table 2 Parameters

      | Parameter | Description | Mandatory |
      |---|---|---|
      | topic | Name of a Kafka topic | Yes |
      | bootstrap.servers | List of broker addresses, each in IP address:port format | Yes |
      | security.protocol | Can be set to PLAINTEXT (optional), SASL_PLAINTEXT, SSL, or SASL_SSL, corresponding to ports 21005, 21007, 21008, and 21009 of the MRS Kafka cluster, respectively. If SASL is configured, sasl.kerberos.service.name must be set to kafka, and the configuration items related to security.kerberos.login in conf/flink-conf.yaml must be set. If SSL is configured, ssl.truststore.location (path of the truststore) and ssl.truststore.password (password of the truststore) must be set. | No. NOTE: If this parameter is not configured, Kafka is in non-security mode. |
      | kerberos.domain.name | Kafka domain name | No. NOTE: This parameter is mandatory when security.protocol is set to a SASL protocol (SASL_PLAINTEXT or SASL_SSL). |
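
The protocol-to-port correspondence in Table 2 can be sketched as a small lookup. The function name kafka_port_for_protocol is hypothetical, and treating a missing value as PLAINTEXT is an assumption based on the note that an unconfigured security.protocol means non-security mode. The ports are the ones listed in the table and may differ if the cluster configuration has been changed.

```shell
# Hypothetical helper: map a security.protocol value to the MRS Kafka port
# listed in Table 2. An empty or missing value is treated as PLAINTEXT
# (an assumption based on the non-security-mode note).
kafka_port_for_protocol() {
  case "${1:-PLAINTEXT}" in
    PLAINTEXT)      echo 21005 ;;
    SASL_PLAINTEXT) echo 21007 ;;
    SSL)            echo 21008 ;;
    SASL_SSL)       echo 21009 ;;
    *) echo "unknown security.protocol: $1" >&2; return 1 ;;
  esac
}
```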

      The following examples use ReadFromKafka to show four types of commands:

      bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka /opt/Flink_test/flink-examples-1.0.jar --topic topic1 --bootstrap.servers 10.96.101.32:21005
      bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka /opt/Flink_test/flink-examples-1.0.jar --topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.com
      bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka /opt/Flink_test/flink-examples-1.0.jar --topic topic1 --bootstrap.servers 10.96.101.32:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password xxx
      bin/flink run --class com.huawei.flink.example.kafka.ReadFromKafka /opt/Flink_test/flink-examples-1.0.jar --topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --kerberos.domain.name hadoop.hadoop.com --ssl.truststore.location /home/truststore.jks --ssl.truststore.password xxx
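
The four commands above differ only in their security-related options. As a rough sketch, the pattern can be captured in a helper that prints the extra options for a given protocol. The function name kafka_security_options and its argument order (protocol, Kerberos domain name, truststore path, truststore password) are assumptions made for illustration. The option names themselves are taken from the commands above.

```shell
# Hypothetical helper: print the security-related options that each
# security.protocol value needs, following the four example commands.
# Arguments: protocol, Kerberos domain name, truststore path, truststore password.
kafka_security_options() {
  case "$1" in
    ""|PLAINTEXT)
      # Non-security mode: no extra options.
      ;;
    SASL_PLAINTEXT)
      echo "--security.protocol $1 --sasl.kerberos.service.name kafka --kerberos.domain.name $2" ;;
    SSL)
      echo "--security.protocol $1 --ssl.truststore.location $3 --ssl.truststore.password $4" ;;
    SASL_SSL)
      echo "--security.protocol $1 --sasl.kerberos.service.name kafka --kerberos.domain.name $2 --ssl.truststore.location $3 --ssl.truststore.password $4" ;;
    *)
      echo "unknown security.protocol: $1" >&2; return 1 ;;
  esac
}
```

The printed options would be appended after --topic and --bootstrap.servers in the bin/flink run command. Because the output can contain the truststore password, the same caution about command history applies.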
    • Running the sample application of the asynchronous checkpoint mechanism (in Scala or Java)

      To show both approaches, the Java sample uses the processing time as the DataStream timestamp, and the Scala sample uses the event time. Example commands are as follows:

      • Saving checkpoint snapshot information to HDFS
        • Java
          bin/flink run --class com.huawei.flink.example.checkpoint.FlinkProcessingTimeAPIChkMain /opt/Flink_test/flink-examples-1.0.jar --chkPath hdfs://hacluster/flink-checkpoint/
        • Scala
          bin/flink run --class com.huawei.flink.example.checkpoint.FlinkEventTimeAPIChkMain /opt/Flink_test/flink-examples-1.0.jar --chkPath hdfs://hacluster/flink-checkpoint/
      • Saving checkpoint snapshot information to a local file
        • Java
          bin/flink run --class com.huawei.flink.example.checkpoint.FlinkProcessingTimeAPIChkMain /opt/Flink_test/flink-examples-1.0.jar --chkPath file:///home/zzz/flink-checkpoint/
        • Scala
          bin/flink run --class com.huawei.flink.example.checkpoint.FlinkEventTimeAPIChkMain /opt/Flink_test/flink-examples-1.0.jar --chkPath file:///home/zzz/flink-checkpoint/
        NOTE:
        • Example path of a checkpoint source file: flink/checkpoint/checkpoint/fd5f5b3d08628d83038a30302b611/chk-X/4f854bf4-ea54-4595-a9d9-9b9080779ffe

          flink/checkpoint/checkpoint is the specified root directory.

          fd5f5b3d08628d83038a30302b611 is the second-level directory, named after the job ID.

          chk-X is the third-level directory, where X is the checkpoint number.

          4f854bf4-ea54-4595-a9d9-9b9080779ffe is the checkpoint source file.

        • In cluster mode, Flink saves checkpoint files to HDFS.
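
To make the directory layout in the note concrete, here is a minimal sketch that splits a checkpoint file path into the four parts the note describes. The function name describe_checkpoint_path and its output labels are hypothetical, and the sketch assumes the path always ends in <root>/<jobID>/chk-<number>/<file>, as in the example path.

```shell
# Hypothetical helper: split a checkpoint file path into the parts described
# in the note and print them one per line.
describe_checkpoint_path() {
  local path file rest chk_dir job_id root
  path="${1%/}"            # drop a trailing slash, if any
  file="${path##*/}"       # checkpoint source file
  rest="${path%/*}"
  chk_dir="${rest##*/}"    # third-level directory, chk-X
  rest="${rest%/*}"
  job_id="${rest##*/}"     # second-level directory, named after the job ID
  root="${rest%/*}"        # specified root directory
  case "$chk_dir" in
    chk-*) ;;
    *) echo "not a checkpoint path: $1" >&2; return 1 ;;
  esac
  echo "root=$root"
  echo "jobID=$job_id"
  echo "checkpoint=${chk_dir#chk-}"
  echo "file=$file"
}
```

Calling the function with the example path, with X replaced by a concrete checkpoint number, prints the root directory, job ID, checkpoint number, and file name on separate lines.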
    • Running the Stream SQL Join sample application
      1. Start the application that produces data in Kafka. For details about how to configure Kafka, see Running the sample application for producing and consuming data in Kafka (in Java or Scala).
        bin/flink run --class com.huawei.flink.example.sqljoin.WriteIntoKafka4SQLJoin /opt/Flink_test/flink-examples-1.0.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:21005
      2. Run the netcat command on any node in the cluster to wait for an application connection.
        netcat -l -p 9000
      3. Start the application that receives socket data and performs a join query.
        bin/flink run --class com.huawei.flink.example.sqljoin.SqlJoinWithSocket /opt/Flink_test/flink-examples-1.0.jar --topic topic-test --bootstrap.servers xxx.xxx.xxx.xxx:21005 --hostname xxx.xxx.xxx.xxx --port 9000
