Using the Flink Client

Updated on 2024-10-08 GMT+08:00

This section describes how to use Flink to run wordcount jobs.

Prerequisites

  • Flink has been installed in an MRS cluster.
  • The cluster runs properly and the client has been correctly installed, for example, in the /opt/hadoopclient directory. The client directory in the following operations is only an example. Change it to the actual installation directory.

Using the Flink Client (MRS 3.x or Later)

  1. Install a client.

    The following example installs the Flink client on a node in the cluster:

    1. Log in to FusionInsight Manager, choose Cluster, click the name of the desired cluster, click More, and select Download Client.
    2. In the Download Cluster Client dialog box that is displayed, select Complete Client for Select Client Type, select the platform type that matches the architecture of the node where the client is to be installed, select Save to Path, and click OK.
      • The generated file is stored in the /tmp/FusionInsight-Client directory on the active management node by default.
      • The name of the client software package is in the following format: FusionInsight_Cluster_<Cluster ID>_Services_Client.tar. In this section, cluster ID 1 is used as an example. Replace it with the actual cluster ID.
    3. Log in to the server where the client is to be installed as the client installation user.
    4. Go to the directory where the installation package is stored and run the following commands to decompress the package:

      cd /tmp/FusionInsight-Client

      tar -xvf FusionInsight_Cluster_1_Services_Client.tar

    5. Run the following command to verify the decompressed file and check whether the command output is consistent with the information in the sha256 file:

      sha256sum -c FusionInsight_Cluster_1_Services_ClientConfig.tar.sha256

      FusionInsight_Cluster_1_Services_ClientConfig.tar: OK
    6. Decompress the obtained installation file.

      tar -xvf FusionInsight_Cluster_1_Services_ClientConfig.tar

    7. Go to the directory where the installation package is stored, and run the following command to install the client to a specified directory (absolute path), for example, /opt/hadoopclient:

      cd /tmp/FusionInsight-Client/FusionInsight_Cluster_1_Services_ClientConfig

      ./install.sh /opt/hadoopclient

      The client is installed if information similar to the following is displayed:

      The component client is installed successfully

  2. Log in to the node where the client is installed as the client installation user.
  3. Run the following command to go to the client installation directory:

    cd /opt/hadoopclient

  4. Run the following command to initialize environment variables:

    source /opt/hadoopclient/bigdata_env
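
    To confirm that the environment variables have taken effect, you can check that the flink command is now resolvable. This is an optional sanity check; it assumes that bigdata_env adds the Flink client to the PATH, as MRS clients typically do:

    which flink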

  5. Perform the following operations if Kerberos authentication is enabled for the cluster. Otherwise, skip these operations.

    1. Prepare a user for submitting Flink jobs.

      Log in to FusionInsight Manager and choose System > Permission > Role. Click Create Role and configure Role Name and Description. In Configure Resource Permission, choose Name of the desired cluster > Flink and select FlinkServer Admin Privilege. Then click OK.

      Choose System > Permission > User and click Create User. Configure Username, set User Type to Human-Machine, configure Password and Confirm Password, click Add next to User Group to add the hadoop, yarnviewgroup, and hadooppmanager user groups as needed, click Add next to Role to add the System_administrator role, the default role, and the created role, and click OK. (If you are creating the Flink job user for the first time, log in to FusionInsight Manager as the user and change the password.)

    2. Log in to Manager and download the authentication credential.

      Log in to FusionInsight Manager. For details, see Accessing FusionInsight Manager (MRS 3.x or Later). Choose System > Permission > User. In the Operation column of the created user, click More and select Download Authentication Credential.

      Figure 1 Downloading the authentication credential
    3. Decompress the downloaded authentication credential package and copy the obtained file to a directory on the client node, for example, /opt/hadoopclient/Flink/flink/conf. If the client is installed on a node outside the cluster, copy the obtained file to the /etc/ directory on this node.
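
      For example, the following is a minimal sketch of this step. The package name and download directory are placeholders, and the exact file names may differ; a downloaded credential package typically contains a user.keytab file and a krb5.conf file:

      tar -xvf <Username>_keytab.tar

      cp user.keytab krb5.conf /opt/hadoopclient/Flink/flink/conf/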
    4. Add the service IP address of the node where the client is installed and IP addresses of all master nodes to the jobmanager.web.access-control-allow-origin and jobmanager.web.allow-access-address configuration items in the /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml file. Use commas (,) to separate the IP addresses.
      jobmanager.web.access-control-allow-origin: xx.xx.xxx.xxx,xx.xx.xxx.xxx,xx.xx.xxx.xxx
      jobmanager.web.allow-access-address: xx.xx.xxx.xxx,xx.xx.xxx.xxx,xx.xx.xxx.xxx
      NOTE:
      To obtain the service IP address of the node where the client is installed, perform the following operations:
      • Node inside the cluster:

        In the navigation tree of the MRS management console, choose Clusters > Active Clusters, select a cluster, and click its name to switch to the cluster details page.

        On the Nodes tab page, view the IP address of the node where the client is installed.

      • Node outside the cluster: IP address of the ECS where the client is installed.
    5. Configure security authentication by adding the keytab path and username in the /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml configuration file.
      security.kerberos.login.keytab: <user.keytab file path>
      security.kerberos.login.principal: <Username>

      Example:

      security.kerberos.login.keytab: /opt/hadoopclient/Flink/flink/conf/user.keytab
      security.kerberos.login.principal: test
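
      To verify the keytab before submitting any job, you can obtain a Kerberos ticket with it manually. This optional check assumes that the MIT Kerberos client tools are available on the node; the path and username follow the example above:

      kinit -kt /opt/hadoopclient/Flink/flink/conf/user.keytab test

      klist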
    6. In the bin directory of the Flink client, run the following command to perform security hardening and configure a password used for submitting jobs:

      cd /opt/hadoopclient/Flink/flink/bin

      sh generate_keystore.sh

      The script automatically updates the SSL-related configuration items in the /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml file.

      NOTE:
      After authentication and encryption, the flink.keystore and flink.truststore files are generated in the conf directory on the Flink client and the following configuration items are set to the default values in the flink-conf.yaml file:
      • Set security.ssl.keystore to the absolute path of the flink.keystore file.
      • Set security.ssl.truststore to the absolute path of the flink.truststore file.
      • Set security.cookie to a random password automatically generated by the generate_keystore.sh script.
      • security.ssl.encrypt.enabled is set to false in the flink-conf.yaml file by default. The generate_keystore.sh script sets security.ssl.key-password, security.ssl.keystore-password, and security.ssl.truststore-password to the password entered when the generate_keystore.sh script is called. There can be security risks if a configuration file contains the authentication password. You are advised to delete the configuration file or use other secure methods to keep the password.
      • For MRS 3.x or later, if ciphertext is required and security.ssl.encrypt.enabled is set to true in the flink-conf.yaml file, the generate_keystore.sh script does not set security.ssl.key-password, security.ssl.keystore-password, and security.ssl.truststore-password. To obtain the values, use the Manager plaintext encryption API by running the following command:

        curl -k -i -u Username:Password -X POST -HContent-type:application/json -d '{"plainText":"Password"}' 'https://x.x.x.x:28443/web/api/v2/tools/encrypt'

        In the preceding command, Username:Password indicates the username and password for logging in to the system. The password in "plainText" indicates the one used to call the generate_keystore.sh script. x.x.x.x indicates the floating IP address of Manager. There can be security risks if a command contains the authentication password. You are advised to disable the command recording function (history) before running the command.

    7. Configure paths for the client to access the flink.keystore and flink.truststore files.
      • Relative path (recommended):
        Perform the following steps to set the paths of the flink.keystore and flink.truststore files to relative paths, and ensure that the directory where Flink client commands are executed can directly access these relative paths.
        1. Create a directory, for example, ssl, in /opt/hadoopclient/Flink/flink/conf/.

          cd /opt/hadoopclient/Flink/flink/conf/

          mkdir ssl

        2. Move the flink.keystore and flink.truststore files to the /opt/hadoopclient/Flink/flink/conf/ssl/ directory.

          mv flink.keystore ssl/

          mv flink.truststore ssl/

        3. Change the values of the following parameters to relative paths in the flink-conf.yaml file:
          security.ssl.keystore: ssl/flink.keystore
          security.ssl.truststore: ssl/flink.truststore
      • Absolute path:

        After the generate_keystore.sh script is executed, the file path of flink.keystore and flink.truststore is automatically set to the absolute path /opt/hadoopclient/Flink/flink/conf/ in the flink-conf.yaml file. In this case, you need to move the flink.keystore and flink.truststore files from the conf directory to this absolute path on the Flink client and YARN nodes.
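
        For example, the following sketch copies the files to another node. The remote user and host are placeholders, and the target directory must already exist on the remote node:

        scp /opt/hadoopclient/Flink/flink/conf/flink.keystore /opt/hadoopclient/Flink/flink/conf/flink.truststore <user>@<YARN node IP address>:/opt/hadoopclient/Flink/flink/conf/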

  6. Run a wordcount job.

    NOTICE:

    When a user submits or runs a job in Flink, the user must have the following permissions based on whether Ranger authentication is enabled for related services (such as HDFS and Kafka):

    • If Ranger authentication is enabled, the current user must belong to the hadoop group or have been granted read and write permissions on /flink in Ranger.
    • If Ranger authentication is disabled, the current user must belong to the hadoop group.
    • For a normal cluster (Kerberos authentication disabled), you can submit jobs in either of the following ways:
      • Run the following commands to start a session and submit a job in the session:

        yarn-session.sh -nm "session-name" -d

        flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • Run the following command to submit a single job on Yarn:

        flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

    • For a security cluster (Kerberos authentication enabled), you can submit jobs in either of the following ways based on the paths of the flink.keystore and flink.truststore files:
      • If the flink.keystore and flink.truststore files are stored in the relative path:
        • Run the following command in the directory at the same level as ssl to start the session and submit the job in the session:

          Here, ssl is a relative path. For example, if the ssl directory is in /opt/hadoopclient/Flink/flink/conf/, run the commands in the /opt/hadoopclient/Flink/flink/conf/ directory.

          cd /opt/hadoopclient/Flink/flink/conf

          yarn-session.sh -t ssl/ -nm "session-name" -d

          flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

        • Run the following command to submit a single job on Yarn:

          cd /opt/hadoopclient/Flink/flink/conf

          flink run -m yarn-cluster -yt ssl/ /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • If the flink.keystore and flink.truststore files are stored in the absolute path:
        • Run the following commands to start a session and submit a job in the session:

          cd /opt/hadoopclient/Flink/flink/conf

          yarn-session.sh -nm "session-name" -d

          flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

        • Run the following command to submit a single job on Yarn:

          flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

  7. After the job has been successfully submitted, the following information is displayed on the client:

    Figure 2 Job submitted successfully on Yarn
    Figure 3 Session started successfully
    Figure 4 Job submitted successfully in the session
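
    You can also confirm the submission from the command line. The following optional check lists the running Yarn applications using the Yarn client from the same installation; the submitted job or session should appear under the name you specified:

    yarn application -list -appStates RUNNING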

  8. Go to the native Yarn service page, find the application of the job, and click the application name to go to the job details page. For details, see Viewing Flink Job Information.

    • If the job is not completed, click Tracking URL to go to the native Flink page and view the job running information.
    • If the job submitted in a session has been completed, you can click Tracking URL to log in to the native Flink service page to view job information.
      Figure 5 Application

Using the Flink Client (Versions Earlier Than MRS 3.x)

  1. Install a client.

    The following example installs the Flink client on a core node:

    1. Log in to MRS Manager and choose Services > Download Client to download the client installation package to the active management node.
    2. Use the IP address to search for the active management node, and log in to the active management node over VNC.
    3. Log in to the active management node, and run the following command to switch the user:

      sudo su - omm

    4. On the MRS management console, view the IP address on the Nodes tab page of the specified cluster.

      Record the IP address of the core node where the client is to be used.

    5. On the active management node, run the following command to copy the client installation package to the core node:

      scp -p /tmp/MRS-client/MRS_Services_Client.tar <IP address of the core node>:/opt/client

    6. Log in to the core node as user root.

      Master nodes support Cloud-Init. The preset username for Cloud-Init is root, and the password is the one set during cluster creation.

    7. Run the following commands to install the client:

      cd /opt/client

      tar -xvf MRS_Services_Client.tar

      tar -xvf MRS_Services_ClientConfig.tar

      cd /opt/client/MRS_Services_ClientConfig

      ./install.sh <Client installation directory>

      For example, run the following command:

      ./install.sh /opt/hadoopclient

  2. Log in to the node where the client is installed as the client installation user.
  3. Run the following command to go to the client installation directory:

    cd /opt/hadoopclient

  4. Run the following command to initialize environment variables:

    source /opt/hadoopclient/bigdata_env

  5. If Kerberos authentication is enabled for the cluster, perform the following steps. If not, skip this whole step.

    1. Prepare a user for submitting Flink jobs.

      Log in to MRS Manager and choose System > Manage Role. Click Add Role to add a role, for example, flinkrole. In Permission, choose HDFS > File System > hdfs://hacluster/ and select Read, Write, and Execute. After you finish configuring this service, click Service in the Permission area. Choose Yarn > Scheduler Queue > root, select Submit for default, and click OK.

      Choose System > Manage User Group. Click Create User Group to create a user group for the sample project, for example, flinkgroup. Choose System > Manage User. Click Create User to create a user for the sample project. Enter a username, for example, flinkuser. Set User Type to Human-machine, add the user to user groups flinkgroup and hadoop, bind the role flinkrole to the user to obtain permissions, and click OK. (If you create a user for the first time, log in to MRS Manager as this user to change the password.)

    2. Log in to Manager and download the authentication credential.

      Log in to Manager of the cluster. For details, see Accessing MRS Manager (Versions Earlier Than MRS 3.x). Choose System Settings > User Management. In the Operation column of the row that contains the added user, choose More > Download Authentication Credential.

      Figure 6 Downloading the authentication credential
    3. Decompress the downloaded authentication credential package and copy the obtained file to a directory on the client node, for example, /opt/hadoopclient/Flink/flink/conf. If the client is installed on a node outside the cluster, copy the obtained file to the /etc/ directory on this node.
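
      For example, the following is a minimal sketch of this step, with placeholder names (a downloaded credential package typically contains a user.keytab file and a krb5.conf file):

      tar -xvf <Username>_keytab.tar

      cp user.keytab krb5.conf /opt/hadoopclient/Flink/flink/conf/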
    4. Configure security authentication by adding the keytab path and username in the /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml configuration file.
      security.kerberos.login.keytab: <user.keytab file path>
      security.kerberos.login.principal: <Username>

      Example:

      security.kerberos.login.keytab: /opt/hadoopclient/Flink/flink/conf/user.keytab
      security.kerberos.login.principal: test
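
      To verify the keytab before submitting any job, you can obtain a Kerberos ticket with it manually (an optional check that assumes the MIT Kerberos client tools are available on the node; the path and username follow the example above):

      kinit -kt /opt/hadoopclient/Flink/flink/conf/user.keytab test

      klist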
    5. Generate the generate_keystore.sh script by referring to Example of Issuing a Certificate and place it in the bin directory on the Flink client. In the bin directory of the Flink client, run the following command to perform security hardening and configure a password used for submitting jobs:

      cd /opt/hadoopclient/Flink/flink/bin

      sh generate_keystore.sh

      This script automatically updates the SSL-related configuration items in the /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml file. For MRS 2.x or earlier, external SSL is disabled for security clusters by default. To enable external SSL, configure it by referring to "Authentication and Encryption" and run this script again.

      NOTE:
      • You do not need to manually generate the generate_keystore.sh script.
      • After authentication and encryption, the generated flink.keystore, flink.truststore, and security.cookie items are automatically filled in the corresponding configuration items in flink-conf.yaml.
    6. Configure paths for the client to access the flink.keystore and flink.truststore files.
      • Relative path (recommended):
        Perform the following steps to set the paths of the flink.keystore and flink.truststore files to relative paths, and ensure that the directory where Flink client commands are executed can directly access these relative paths.
        1. Create a directory, for example, ssl, in /opt/hadoopclient/Flink/flink/conf/.

          cd /opt/hadoopclient/Flink/flink/conf/

          mkdir ssl

        2. Move the flink.keystore and flink.truststore files to the /opt/hadoopclient/Flink/flink/conf/ssl/ directory.

          mv flink.keystore ssl/

          mv flink.truststore ssl/

        3. Change the values of the following parameters to relative paths in the flink-conf.yaml file:
          security.ssl.internal.keystore: ssl/flink.keystore
          security.ssl.internal.truststore: ssl/flink.truststore
      • Absolute path:

        After the generate_keystore.sh script is executed, the file path of flink.keystore and flink.truststore is automatically set to the absolute path /opt/hadoopclient/Flink/flink/conf/ in the flink-conf.yaml file. In this case, you need to move the flink.keystore and flink.truststore files from the conf directory to this absolute path on the Flink client and YARN nodes.
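
        For example, the following sketch copies the files to another node (the remote user and host are placeholders, and the target directory must already exist on the remote node):

        scp /opt/hadoopclient/Flink/flink/conf/flink.keystore /opt/hadoopclient/Flink/flink/conf/flink.truststore <user>@<YARN node IP address>:/opt/hadoopclient/Flink/flink/conf/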

  6. Run a wordcount job.

    NOTICE:

    When a user submits or runs a job in Flink, the user must have the following permissions based on whether Ranger authentication is enabled for related services (such as HDFS and Kafka):

    • If Ranger authentication is enabled, the current user must belong to the hadoop group or have been granted read and write permissions on /flink in Ranger.
    • If Ranger authentication is disabled, the current user must belong to the hadoop group.
    • For a normal cluster (Kerberos authentication disabled), you can submit jobs in either of the following ways:
      • Run the following commands to start a session and submit a job in the session:

        yarn-session.sh -nm "session-name" -d

        flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • Run the following command to submit a single job on Yarn:

        flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

    • For a security cluster (Kerberos authentication enabled), you can submit jobs in either of the following ways based on the paths of the flink.keystore and flink.truststore files:
      • If the flink.keystore and flink.truststore files are stored in the relative path:
        • Run the following command in the directory at the same level as ssl to start the session and submit the job in the session:

          Here, ssl is a relative path. For example, if the ssl directory is in /opt/hadoopclient/Flink/flink/conf/, run the commands in the /opt/hadoopclient/Flink/flink/conf/ directory.

          cd /opt/hadoopclient/Flink/flink/conf

          yarn-session.sh -t ssl/ -nm "session-name" -d

          flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

        • Run the following command to submit a single job on Yarn:

          cd /opt/hadoopclient/Flink/flink/conf

          flink run -m yarn-cluster -yt ssl/ /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

      • If the flink.keystore and flink.truststore files are stored in the absolute path:
        • Run the following commands to start a session and submit a job in the session:

          cd /opt/hadoopclient/Flink/flink/conf

          yarn-session.sh -nm "session-name" -d

          flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

        • Run the following command to submit a single job on Yarn:

          flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar

  7. After the job has been successfully submitted, the following information is displayed on the client:

    Figure 7 Job submitted successfully on Yarn
    Figure 8 Session started successfully
    Figure 9 Job submitted successfully in the session
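
    You can also confirm the submission from the command line by listing the running Yarn applications (optional); the submitted job or session should appear under the name you specified:

    yarn application -list -appStates RUNNING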

  8. Go to the native Yarn service page, find the application of the job, and click the application name to go to the job details page. For details, see Viewing Flink Job Information.

    • If the job is not completed, click Tracking URL to go to the native Flink page and view the job running information.
    • If the job submitted in a session has been completed, you can click Tracking URL to log in to the native Flink service page to view job information.
      Figure 10 Application
