Deploying and Using Flink in a CCE Cluster

Updated on 2024-12-28 GMT+08:00

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded (streaming) and bounded (batch) data streams. It processes data streams in real time with low latency and high throughput and supports complex event processing. Deploying Flink in a CCE cluster lets you build a high-performance, reliable, and flexible data processing system for a wide range of big data applications while keeping resource usage efficient. This section describes how to deploy a Flink cluster in CCE and run a Flink WordCount job in it. In this example, Flink is deployed in standalone session mode. For details about the deployment process, see Kubernetes | Apache Flink in the official Flink documentation.

Prerequisites

Step 1: Deploy a Flink Cluster

Three key components are required to deploy a Flink cluster. The Flink official website provides a resource definition file for each component. For details, see Table 1. You also need the flink-configuration-configmap.yaml file from the Flink official website to configure the Flink cluster.

Table 1 Key components of the Flink cluster

  - Deployment for running JobManager (jobmanager-session-deployment-non-ha.yaml): JobManager serves as the central coordinator in a Flink cluster. It coordinates Flink jobs, including task distribution, job scheduling, resource allocation, and fault tolerance.
  - Deployment for running TaskManager (taskmanager-session-deployment.yaml): TaskManager is a worker node in a Flink cluster and is responsible for executing data processing tasks. Each TaskManager runs one or more task slots, which are isolated units of execution.
  - Service exposing the JobManager's REST and Web UI ports (jobmanager-service.yaml): Exposes the REST and Web UI ports of the Flink JobManager so that users can access the JobManager's REST API and Web UI through the Service.

  1. Configure basic information about the Flink cluster.

    1. Create a YAML file named flink-configuration-configmap.yaml.
      vim flink-configuration-configmap.yaml
      The file content, including the comments, is as follows:
      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: flink-config
        labels:
          app: flink
      # data defines the data stored in ConfigMap. In the example, data contains two configuration files: config.yaml and log4j-console.properties.
      data:   
        config.yaml: |+
          # RPC address of Flink JobManager. It is usually the JobManager Service name. In this example, it is flink-jobmanager.
          jobmanager.rpc.address: flink-jobmanager
          # Number of task slots in each TaskManager. The value 2 means each TaskManager can run two tasks concurrently.
          taskmanager.numberOfTaskSlots: 2
          # Port of the Flink BLOB server, which transfers large objects such as job code or large files.
          blob.server.port: 6124
          jobmanager.rpc.port: 6123              # RPC port of JobManager
          taskmanager.rpc.port: 6122             # RPC port of TaskManager
          jobmanager.memory.process.size: 1600m  # Total memory of JobManager
          taskmanager.memory.process.size: 1728m # Total memory of TaskManager
          parallelism.default: 2                 # The default degree of parallelism is 2 for Flink jobs.
        log4j-console.properties: |+
          # The following configuration affects the logging for user code and Flink logs.
          rootLogger.level = INFO                # Log messages of level INFO and above.
          rootLogger.appenderRef.console.ref = ConsoleAppender      # Send the logs to the console.
          rootLogger.appenderRef.rolling.ref = RollingFileAppender  # Export the logs to a rolling file.
          # To adjust logging only for Flink's own classes, uncomment the following lines:
          #logger.flink.name = org.apache.flink
          #logger.flink.level = INFO
       
          # The following eight lines pin the log levels of common libraries and connectors at INFO.
          # The root logger configuration does not override these settings;
          # change these levels manually if needed.
          logger.pekko.name = org.apache.pekko
          logger.pekko.level = INFO
          logger.kafka.name= org.apache.kafka
          logger.kafka.level = INFO
          logger.hadoop.name = org.apache.hadoop
          logger.hadoop.level = INFO
          logger.zookeeper.name = org.apache.zookeeper
          logger.zookeeper.level = INFO
       
          # Send all logs of the INFO level to the console.
          appender.console.name = ConsoleAppender
          appender.console.type = CONSOLE
          appender.console.layout.type = PatternLayout
          appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
       
          # Export all logs of the INFO level and above to a rolling file.
          appender.rolling.name = RollingFileAppender
          appender.rolling.type = RollingFile
          appender.rolling.append = false
          appender.rolling.fileName = ${sys:log.file}
          appender.rolling.filePattern = ${sys:log.file}.%i
          appender.rolling.layout.type = PatternLayout
          appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
          appender.rolling.policies.type = Policies
          appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
          appender.rolling.policies.size.size=100MB
          appender.rolling.strategy.type = DefaultRolloverStrategy
          appender.rolling.strategy.max = 10
       
          # Suppress noisy warnings from the Netty channel handler.
          logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
          logger.netty.level = OFF      
    2. Use flink-configuration-configmap.yaml to configure basic information about the Flink cluster.
      kubectl create -f flink-configuration-configmap.yaml
    3. Check whether the ConfigMap named flink-config is successfully created.
      kubectl get configmap

      If the following information is displayed, the ConfigMap was created successfully.

      NAME               DATA   AGE
      flink-config       2      59s
      kube-root-ca.crt   1      16d

  2. Create a Service that exposes the REST and UI ports of JobManager.

    1. Create a YAML file named jobmanager-service.yaml.
      vim jobmanager-service.yaml

      The file content, including the comments, is as follows:

      apiVersion: v1
      kind: Service
      metadata:
        name: flink-jobmanager
      spec:
        type: ClusterIP   # The Service is used for internal communication within the cluster.
        ports:            # Define the list of ports to be exposed by Service.
        - name: rpc
          port: 6123
        - name: blob-server
          port: 6124
        - name: webui
          port: 8081
        selector:         # Define the Service's label selector, which is used to determine the pods to which the Service routes traffic.
          app: flink
          component: jobmanager
    2. Use jobmanager-service.yaml to create a Service named flink-jobmanager.
      kubectl create -f jobmanager-service.yaml
    3. Check whether the Service is created successfully.
      kubectl get service flink-jobmanager

  3. Create a Deployment for running JobManager.

    1. Create a YAML file named jobmanager-session-deployment-non-ha.yaml.
      vim jobmanager-session-deployment-non-ha.yaml

      The file content, including the comments, is as follows:

      apiVersion: apps/v1
      kind: Deployment
      metadata:
        name: flink-jobmanager
      spec:
        replicas: 1                  # Set the number of JobManager replicas to 1.
        selector:
          matchLabels:               # Define labels.
            app: flink           
            component: jobmanager
        template:
          metadata:
            labels:
              app: flink
              component: jobmanager
          spec:
            containers:
            - name: jobmanager       # Set the container name to jobmanager.
              image: apache/flink:1.20.0-scala_2.12    # Use the Flink image of v1.20.0 and the Scala of v2.12.
              args: ["jobmanager"]   # Designate the container to run as JobManager.
              ports:                 # Expose ports in the container.
              - containerPort: 6123  # Used for communication between TaskManager and JobManager.
                name: rpc
              - containerPort: 6124  # Used to transfer binary objects.
                name: blob-server
              - containerPort: 8081  # Used to access the Flink web management page.
                name: webui
              livenessProbe:
                tcpSocket:
                  port: 6123         # Use TCP to check the health status of RPC port 6123.
                initialDelaySeconds: 30
                periodSeconds: 60
              volumeMounts:          # Mount a storage volume.
              - name: flink-config-volume
                mountPath: /opt/flink/conf
              securityContext:
                runAsUser: 9999      # UID of the flink user defined in the official Flink image. Change it if necessary.
            volumes:                 # Define storage volumes to store configuration files.
            - name: flink-config-volume
              configMap:
                name: flink-config
                items:
                - key: config.yaml   # Mount the config.yaml file in the ConfigMap to the specified path of the container.
                  path: config.yaml  # The path in the container is /opt/flink/conf/config.yaml.
                - key: log4j-console.properties    # Mount the log4j-console.properties file in the ConfigMap to the specified path of the container.
                  path: log4j-console.properties   # The path in the container is /opt/flink/conf/log4j-console.properties.
    2. Use jobmanager-session-deployment-non-ha.yaml to create a Deployment named flink-jobmanager.
      kubectl create -f jobmanager-session-deployment-non-ha.yaml
    3. Check whether the Deployment flink-jobmanager is successfully created.
      kubectl get pod

  4. Create a Deployment for running TaskManager.

    1. Create a YAML file named taskmanager-session-deployment.yaml.
      vim taskmanager-session-deployment.yaml

      The file content, including the comments, is as follows:

      apiVersion: apps/v1
      kind: Deployment
      metadata:
        name: flink-taskmanager
      spec:
        replicas: 2                    # Set the number of TaskManager replicas to 2.
        selector:
          matchLabels:                 # Define labels.
            app: flink
            component: taskmanager
        template:
          metadata:
            labels:
              app: flink
              component: taskmanager
          spec:
            containers:
            - name: taskmanager        # Set the container name to taskmanager.
              image: apache/flink:1.20.0-scala_2.12    # Use the Flink image of v1.20.0 and the Scala of v2.12.
              args: ["taskmanager"]    # Designate the container to run as TaskManager.
              ports:                   # Expose ports in the container.
              - containerPort: 6122    # Used for communication between TaskManager and JobManager.
                name: rpc
              livenessProbe: 
                tcpSocket:
                  port: 6122           # Use TCP to check the health status of RPC port 6122.
                initialDelaySeconds: 30
                periodSeconds: 60
              volumeMounts:            # Mount a storage volume.
              - name: flink-config-volume
                mountPath: /opt/flink/conf/
              securityContext:
                runAsUser: 9999       # UID of the flink user defined in the official Flink image. Change it if necessary.
            volumes:                  # Define storage volumes to store configuration files.
            - name: flink-config-volume
              configMap:
                name: flink-config
                items:
                - key: config.yaml
                  path: config.yaml
                - key: log4j-console.properties
                  path: log4j-console.properties
    2. Use taskmanager-session-deployment.yaml to create a Deployment named flink-taskmanager.
      kubectl create -f taskmanager-session-deployment.yaml
    3. Check whether the Deployment flink-taskmanager is successfully created.
      kubectl get pod
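As a quick check of the sizing used above, the arithmetic below shows the cluster's task-slot capacity. This is a sketch with values copied from config.yaml and the two Deployments in this example, not part of the deployment itself.

```python
# Values from this example's config.yaml and Deployments.
taskmanager_replicas = 2       # replicas in taskmanager-session-deployment.yaml
slots_per_taskmanager = 2      # taskmanager.numberOfTaskSlots in config.yaml
default_parallelism = 2        # parallelism.default in config.yaml

# Total task slots available in the session cluster.
total_slots = taskmanager_replicas * slots_per_taskmanager
print(total_slots)

# Number of jobs at the default parallelism that can run concurrently,
# assuming each job occupies one slot per parallel subtask chain.
concurrent_jobs = total_slots // default_parallelism
print(concurrent_jobs)
```

With four slots and a default parallelism of 2, a single job such as the WordCount example below uses half of the cluster's capacity.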

Step 2: Publish the Service

Create a NodePort Service for flink-jobmanager to allow external networks to access flink-jobmanager through the public IP address and automatically allocated external port number. The Service will forward external requests to the corresponding container.

  1. Log in to the CCE console. Choose Workloads > Deployments, click flink-jobmanager, click the Access Mode tab, and then click Create Service.
  2. On the Create Service page, set Service Type to NodePort. In the Ports area, set both Container Port and Service Port to 8081, and click OK. A node port is automatically allocated for access and is displayed in Figure 2. In this example, the port is 30327. You can access the workload through the EIP and this port on any node in the cluster.

    Figure 1 Creating a NodePort Service

    Figure 2 NodePort

  3. Check whether the Service can be accessed. Choose Nodes, click the Nodes tab, select a node, and copy its EIP.

    In the address box of a browser, enter <EIP of the node>:<node port>. If the Flink dashboard page is displayed, the access is successful. If the access fails, check whether the inbound rule of the cluster security group allows source 0.0.0.0/0 (or All) on the node port. For details, see Configuring Security Group Rules.

    Figure 3 Flink Dashboard

Step 3: Run the Flink Job

This example uses the official WordCount.jar file to demonstrate how to run Flink jobs in a CCE cluster. The WordCount job counts the number of occurrences of each word in a text.
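The logic that WordCount.jar implements can be sketched in a few lines of Python. This is only an illustration of what the job computes, not the Flink program itself (the actual job tokenizes input slightly differently):

```python
from collections import Counter

def word_count(text: str) -> dict:
    # Lowercase the text, split it into words, and tally occurrences.
    return dict(Counter(text.lower().split()))

print(word_count("to be or not to be"))
```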

  1. Download the flink-1.20.0-bin-scala_2.12.tgz file from https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz and decompress it. Check that the WordCount.jar package exists in the flink-1.20.0/examples/streaming directory.
  2. Upload the JAR package on the dashboard. Open the Apache Flink Dashboard page, choose Submit New Job in the navigation pane, click Add New in the upper right corner, and select WordCount.jar from the flink-1.20.0/examples/streaming directory. Click the uploaded WordCount.jar entry and specify the output file path in the Program Arguments text box, for example, --output /opt/flink/output.

    Figure 4 Uploading a WordCount job

  3. Click the blue box on the lower right of Overview and click Taskmanager to check the endpoint of the job.

    Figure 5 Checking an endpoint

  4. Use the endpoint to locate the TaskManager pod that ran the job. Run the following command to query the IP addresses of the Flink pods:

    kubectl get pod -o wide | grep flink

    If the following information is displayed, flink-taskmanager-579f47cf9f-prrff is the TaskManager pod whose IP address matches the endpoint:

    flink-jobmanager-789c8777-vhqbv         1/1     Running             1 (28m ago)    40h     192.168.0.139   192.168.0.53    <none>           <none>
    flink-taskmanager-579f47cf9f-prrff      1/1     Running             1 (28m ago)    40h     192.168.0.92    192.168.0.53    <none>           <none>
    flink-taskmanager-579f47cf9f-wgt66      1/1     Running             1 (28m ago)    40h     192.168.0.194   192.168.0.212   <none>           <none>

  5. After the job is complete, access flink-taskmanager-579f47cf9f-prrff to check whether the number of occurrences of each word is correctly displayed.

    kubectl exec -it flink-taskmanager-579f47cf9f-prrff -- bash

    Run the ls command to query the output path.

    ls /opt/flink/output/

    Information similar to the following is displayed:

    2024-09-02--01

    Check the content of the 2024-09-02--01 folder.

    ls /opt/flink/output/2024-09-02--01

    Information similar to the following is displayed:

    part-bd89ad8b-a0dd-4b4d-b771-4c88eaed61e4-0

    Check the number of occurrences of each word.

    cat /opt/flink/output/2024-09-02--01/part-bd89ad8b-a0dd-4b4d-b771-4c88eaed61e4-0

    Information similar to the following is displayed:

    (to,1)
    (be,1)
    (or,1)
    (not,1)
    (to,2)
    (be,2)
    (that,1)
    ...
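Note that the output contains one record per input word, each carrying the running count at the moment that word arrived. This per-record, stateful emission is what distinguishes the streaming job from a batch total. A minimal Python sketch of that behavior (an illustration, not Flink code):

```python
def streaming_word_count(words):
    # Keep per-word state and emit the updated count for every record,
    # mirroring how the streaming WordCount job produces its output.
    counts = {}
    for word in words:
        counts[word] = counts.get(word, 0) + 1
        yield (word, counts[word])

for record in streaming_word_count("to be or not to be that".split()):
    print(record)
```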

Step 4: Clear the Cluster

  1. Delete the Deployment that runs the JobManager.

    kubectl delete -f jobmanager-session-deployment-non-ha.yaml

    Information similar to the following is displayed:

    deployment.apps "flink-jobmanager" deleted

  2. Delete the Deployment that runs the TaskManager.

    kubectl delete -f taskmanager-session-deployment.yaml

    Information similar to the following is displayed:

    deployment.apps "flink-taskmanager" deleted

  3. Delete the ConfigMap.

    kubectl delete -f flink-configuration-configmap.yaml

    Information similar to the following is displayed:

    configmap "flink-config" deleted

  4. Delete the Service.

    kubectl delete -f jobmanager-service.yaml

    Information similar to the following is displayed:

    service "flink-jobmanager" deleted
