Using Flink from Scratch

Updated on 2024-11-29 GMT+08:00

Scenario

Use Flink to run a wordcount job.

Prerequisites

  • Flink has been installed in the MRS cluster and all components in the cluster are running properly.
  • The cluster client has been installed in a directory, for example, /opt/hadoopclient.

Procedure

  1. Log in to the node where the client is installed as the client installation user.
  2. Run the following command to go to the client installation directory:

    cd /opt/hadoopclient

  3. Run the following command to initialize environment variables:

    source /opt/hadoopclient/bigdata_env
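
    Optionally, verify that the environment variables took effect and the Flink client is on the PATH (the printed path depends on your installation):

    which flink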

  4. If Kerberos authentication is enabled for the cluster, perform the following substeps. Otherwise, skip them.

    1. Create a user, for example, test, for submitting Flink jobs.

      Log in to FusionInsight Manager and choose System > Permission > Role. Click Create Role and configure Role Name and Description. In Configure Resource Permission, choose Name of the desired cluster > Flink and select FlinkServer Admin Privilege. Then click OK.

      Choose System > Permission > User and click Create User. Configure Username, set User Type to Human-Machine, and configure Password and Confirm Password. Click Add next to User Group to add the hadoop, yarnviewgroup, and hadooppmanager user groups as needed, click Add next to Role to add System_administrator, default, and the created role, and then click OK. (If the Flink job user is newly created, log in to FusionInsight Manager as that user and change the password first.)

      NOTE:

      When a user submits or runs a job in Flink, the user must have the following permissions based on whether Ranger authentication is enabled for related services (such as HDFS and Kafka):

      • If Ranger authentication is enabled, the current user must belong to the hadoop group or the user has been granted the /flink read and write permissions in Ranger.
      • If Ranger authentication is disabled, the current user must belong to the hadoop group.
    2. Log in to FusionInsight Manager and choose System > Permission > User. On the displayed page, locate the row that contains the added user, click More in the Operation column, and select Download Authentication Credential to download the authentication credential file of the user to the local PC and decompress the file.
    3. Copy the decompressed user.keytab and krb5.conf files to the /opt/hadoopclient/Flink/flink/conf directory on the client node.
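
      For example, if the credential was decompressed on your local machine, the files could be copied over with scp (the host name client-node is illustrative):

      scp user.keytab krb5.conf root@client-node:/opt/hadoopclient/Flink/flink/conf/
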
    4. Log in to the client node and add the service IP address of the client node and the floating IP address of FusionInsight Manager to the jobmanager.web.allow-access-address configuration item in the /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml file. Use commas (,) to separate the IP addresses.

      vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml
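
      For example, the resulting entry might look as follows (both IP addresses are illustrative):

      jobmanager.web.allow-access-address: 192.168.1.101,192.168.1.102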

    5. Configure security authentication.

      Add the keytab path and username to the /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml configuration file.

      security.kerberos.login.keytab: <user.keytab file path>
      security.kerberos.login.principal: <Username>

      Example:

      security.kerberos.login.keytab: /opt/hadoopclient/Flink/flink/conf/user.keytab
      security.kerberos.login.principal: test
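
      Optionally, confirm that the keytab contains the expected principal with the standard Kerberos tool klist:

      klist -kt /opt/hadoopclient/Flink/flink/conf/user.keytab
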
    6. Configure security hardening by referring to Authentication and Encryption. Run the following commands to set a password for submitting jobs.

      cd /opt/hadoopclient/Flink/flink/bin

      sh generate_keystore.sh

      The script automatically changes the SSL-related parameter values in the /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml file.
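
      For reference, the keystore and truststore path entries it sets look like the following (shown with the default absolute paths; the script also sets the related password values):

      security.ssl.keystore: /opt/hadoopclient/Flink/flink/conf/flink.keystore
      security.ssl.truststore: /opt/hadoopclient/Flink/flink/conf/flink.truststore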

    7. Configure paths for the client to access the flink.keystore and flink.truststore files.
      • Absolute path

        After the generate_keystore.sh script is executed, the flink.keystore and flink.truststore file paths are set to absolute paths in the flink-conf.yaml file by default. In this case, place the flink.keystore and flink.truststore files from the conf directory at those absolute paths on the Flink client node and on each Yarn node.

      • Relative path (recommended)
        Perform the following steps to set the flink.keystore and flink.truststore file paths to relative paths, so that they can be resolved from the directory where the Flink client command is executed.
        1. Create a directory, for example, ssl, in /opt/hadoopclient/Flink/flink/conf/.

          cd /opt/hadoopclient/Flink/flink/conf/

          mkdir ssl

        2. Move the flink.keystore and flink.truststore files to the new paths.

          mv flink.keystore ssl/

          mv flink.truststore ssl/

        3. Change the values of the following parameters to relative paths in the flink-conf.yaml file:
          vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml
          security.ssl.keystore: ssl/flink.keystore
          security.ssl.truststore: ssl/flink.truststore
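
          To confirm that the relative paths resolve, run the Flink client from the conf directory and list the ssl directory:

          cd /opt/hadoopclient/Flink/flink/conf/
          ls ssl/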

  5. Run a wordcount job.

    NOTE:

    Job submission modes are as follows:

    • Session

      In this mode, a YARN property file is created in Client installation directory/Flink/tmp/.yarn-properties-<Username>. Jobs are submitted to the application corresponding to the application ID recorded in the file. After jobs are complete, the Flink cluster is not closed. In session mode, jobs can be submitted in either of the following modes:

      • attached (default)

        The yarn-session.sh client submits the Flink cluster to YARN, but the client keeps running to trace the cluster status. If the cluster fails, the client displays an error. If the client is terminated, a shutdown signal is sent to the cluster.

      • detached (-d or --detached)

        The yarn-session.sh client submits the Flink cluster to YARN and then exits. You need to invoke the client or YARN again to stop the Flink cluster. (See the example after this list.)

    • Application

      In this mode, a Flink cluster is started on YARN, and the main() method of the application JAR file is executed on the JobManager in YARN. Dependency packages stored on HDFS can be used. The cluster is shut down as soon as the application is complete; you can also stop it manually by running yarn application -kill <ApplicationId> or by canceling the Flink job.

    • Per-Job Cluster

      In this mode, a Flink cluster is started on YARN, the provided application JAR file is run locally, and JobGraph is submitted to JobManager in YARN. If the --detached parameter is used, the client stops after the job is submitted. After the job is complete, the Flink cluster is also closed.

    • Yarn-cluster mode

      This mode is similar to the Per-Job Cluster mode.
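
      For example, a session cluster can be started in detached mode as follows (the session name is illustrative); jobs are then submitted to it with flink run, as in the examples below:

      yarn-session.sh -d -nm "session-name"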

    • If the job registration function is enabled, that is, job.alarm.enable and job.register.enable are set to true and flinkServer.tenant.name is set to CLIENT_APP in the /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml file, specify the job name in the job running command by adding the -Dyarn.application.name=<Job name> parameter, as shown in the example below.
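
      For example (the job name flink_wordcount is illustrative):

      flink run -t yarn-per-job -Dyarn.application.name=flink_wordcount /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
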
    • Normal cluster (Kerberos authentication disabled)
      • Submitting a job in session's attached mode

        yarn-session.sh -nm "session-name"

        Start a new client connection and submit the job:

        source /opt/hadoopclient/bigdata_env

        flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • Submitting a job in application mode

        flink run-application -t yarn-application /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • Submitting a job in per-job mode

        flink run -t yarn-per-job /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • Submitting a job in yarn-cluster mode

        flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

    • Security cluster (Kerberos authentication enabled)
      • If the flink.keystore and flink.truststore file paths are relative paths:
        • Submitting a job in session's attached mode (ssl/ is a relative path)

          cd /opt/hadoopclient/Flink/flink/conf/

          yarn-session.sh -t ssl/ -nm "session-name"

          ...
          Cluster started: Yarn cluster with application id application_1624937999496_0017
          JobManager Web Interface: http://192.168.1.150:32261

          Start a new client connection and submit the job:

          source /opt/hadoopclient/bigdata_env

          flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

          ...
          Job has been submitted with JobID 587d5498fff18d8b2501fdf7ebb9c4fb
          Program execution finished
          Job with JobID 587d5498fff18d8b2501fdf7ebb9c4fb has finished.
          Job Runtime: 19917 ms
        • Submitting a job in application mode

          cd /opt/hadoopclient/Flink/flink/conf/

          flink run-application -t yarn-application -Dyarn.ship-files="ssl/" /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

          ...
          Submitted application application_1669179911005_0070
          Waiting for the cluster to be allocated
          Deploying cluster, current state ACCEPTED
          YARN application has been deployed successfully.
          Found Web Interface xxx:xxx of application 'application_1669179911005_0070'.
        • Submitting a job in per-job mode

          cd /opt/hadoopclient/Flink/flink/conf/

          flink run -t yarn-per-job -Dyarn.ship-files="ssl/" /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

          ...
          Cluster started: Yarn cluster with application id application_1669179911005_0071
          Job has been submitted with JobID 75011429a29f230121809f54f4570ed0
          Program execution finished
          Job with JobID 75011429a29f230121809f54f4570ed0 has finished.
          Job Runtime: 21245 ms
        • Submitting a job in yarn-cluster mode

          flink run -m yarn-cluster -yt ssl/ /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • If the flink.keystore and flink.truststore file paths are absolute paths:
        • Submitting a job in session's attached mode

          cd /opt/hadoopclient/Flink/flink/conf/

          yarn-session.sh -nm "session-name"

          Start a new client connection and submit the job:

          source /opt/hadoopclient/bigdata_env

          flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

          Alternatively:

          flink run -t yarn-session -Dyarn.application.id=application_xxx /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

        • Submitting a job in application mode

          cd /opt/hadoopclient/Flink/flink/conf/

          flink run-application -t yarn-application /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

        • Submitting a job in per-job mode

          cd /opt/hadoopclient/Flink/flink/conf/

          flink run -t yarn-per-job /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

        • Submitting a job in yarn-cluster mode

          flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
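
    When a session cluster started with yarn-session.sh is no longer needed, it can be stopped through YARN, for example:

      yarn application -kill <ApplicationId>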

    NOTE:

    If the function of registering jobs with FlinkServer is not enabled, jobs submitted through the client cannot be started, developed, or stopped on the FlinkServer web UI. For details about how to enable the function, see Verifying Flink's Job Inspection.

  6. Log in to FusionInsight Manager as the user who runs the job, go to the native Yarn service page, find the application corresponding to the job, and click the application name to go to the job details page.

    • If the job is not completed, click Tracking URL to go to the native Flink page and view the job running information.
    • If the job submitted in a session has been completed, you can click Tracking URL to log in to the native Flink service page to view job information.
      Figure 1 Application
