
Running Flink in CCE

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. It processes data streams in real time with low latency and high throughput and supports complex event processing. Deploying Flink in a CCE cluster lets you build a high-performance, reliable, and flexible data processing system for a wide range of big data applications while keeping resource usage efficient. This section describes how to deploy Flink in a CCE cluster and run a Flink WordCount job in that cluster. In this example, the Flink cluster is deployed in standalone mode. For details about the deployment process, see Kubernetes | Apache Flink on the Flink official website.

Prerequisites

A CCE cluster is available, an EIP has been bound to at least one node in the cluster, and kubectl has been configured to access the cluster.

Step 1: Deploy a Flink Cluster

Three key components are required for deploying a Flink cluster. The Flink official website provides a resource definition file for each component. For details, see Table 1. In addition, you need the flink-configuration-configmap.yaml configuration file from the Flink official website to configure the Flink cluster.

Table 1 Key components of the Flink cluster

  1. Deployment for running JobManager (defined in jobmanager-session-deployment-non-ha.yaml): JobManager serves as the central coordinator in a Flink cluster. It coordinates Flink jobs, including task distribution, job scheduling, resource allocation, and fault tolerance.
  2. Deployment for running TaskManager (defined in taskmanager-session-deployment.yaml): TaskManager is a worker node in a Flink cluster and is responsible for executing data processing tasks. Each TaskManager provides one or more task slots, which are isolated units of execution.
  3. Service exposing the JobManager's REST and UI ports (defined in jobmanager-service.yaml): Exposes the REST and web UI ports of the Flink JobManager so that users can access the JobManager's REST API and web UI through the Service.

  1. Configure basic information about the Flink cluster.

    1. Create a YAML file named flink-configuration-configmap.yaml.
      vim flink-configuration-configmap.yaml
      The file content is as follows (the comments explain the key configuration items):
      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: flink-config
        labels:
          app: flink
      # data defines the data stored in the ConfigMap. In this example, data contains two configuration files: config.yaml and log4j-console.properties.
      data:
        config.yaml: |+
          # RPC address of the Flink JobManager. It is usually the name of the JobManager Service. In this example, the RPC address is flink-jobmanager.
          jobmanager.rpc.address: flink-jobmanager
          # Number of task slots in each TaskManager. The value 2 indicates that each TaskManager can process two tasks concurrently.
          taskmanager.numberOfTaskSlots: 2
          # Port of the Flink BLOB service, which is used to transfer large objects such as job code or large files.
          blob.server.port: 6124
          jobmanager.rpc.port: 6123              # RPC port of JobManager
          taskmanager.rpc.port: 6122             # RPC port of TaskManager
          jobmanager.memory.process.size: 1600m  # Total process memory of JobManager
          taskmanager.memory.process.size: 1728m # Total process memory of TaskManager
          parallelism.default: 2                 # Default parallelism of Flink jobs
        log4j-console.properties: |+
          # The following configuration affects the logging of both user code and Flink.
          # Log messages of the INFO level and above, send them to the console, and export them to a rolling file.
          rootLogger.level = INFO
          rootLogger.appenderRef.console.ref = ConsoleAppender
          rootLogger.appenderRef.rolling.ref = RollingFileAppender
          # To change only the logging of Flink itself, uncomment the following two lines:
          #logger.flink.name = org.apache.flink
          #logger.flink.level = INFO
       
          # The following lines keep the log level of common libraries and connectors at INFO.
          # The root logger configuration does not override these settings.
          # Change these log levels manually if needed.
          logger.pekko.name = org.apache.pekko
          logger.pekko.level = INFO
          logger.kafka.name = org.apache.kafka
          logger.kafka.level = INFO
          logger.hadoop.name = org.apache.hadoop
          logger.hadoop.level = INFO
          logger.zookeeper.name = org.apache.zookeeper
          logger.zookeeper.level = INFO
       
          # Send all logs of the INFO level to the console.
          appender.console.name = ConsoleAppender
          appender.console.type = CONSOLE
          appender.console.layout.type = PatternLayout
          appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
       
          # Export all logs of the INFO level to a rolling file.
          appender.rolling.name = RollingFileAppender
          appender.rolling.type = RollingFile
          appender.rolling.append = false
          appender.rolling.fileName = ${sys:log.file}
          appender.rolling.filePattern = ${sys:log.file}.%i
          appender.rolling.layout.type = PatternLayout
          appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
          appender.rolling.policies.type = Policies
          appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
          appender.rolling.policies.size.size = 100MB
          appender.rolling.strategy.type = DefaultRolloverStrategy
          appender.rolling.strategy.max = 10
       
          # Suppress irrelevant warnings from the Netty channel handler.
          logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
          logger.netty.level = OFF      
    2. Use flink-configuration-configmap.yaml to configure basic information about the Flink cluster.
      kubectl create -f flink-configuration-configmap.yaml
    3. Check whether the ConfigMap named flink-config is successfully created.
      kubectl get configmap

      If the following information is displayed, the ConfigMap was created successfully.

      NAME               DATA   AGE
      flink-config       2      59s
      kube-root-ca.crt   1      16d
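
      To verify the stored file contents, you can also print the ConfigMap in YAML format:

      kubectl get configmap flink-config -o yaml

      The data field of the output should contain the complete config.yaml and log4j-console.properties files.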

  2. Create a Service that exposes the REST and UI ports of JobManager.

    1. Create a YAML file named jobmanager-service.yaml.
      vim jobmanager-service.yaml

      The file content is as follows (the comments explain the key configuration items):

      apiVersion: v1
      kind: Service
      metadata:
        name: flink-jobmanager
      spec:
        type: ClusterIP   # The Service is used for internal communication within the cluster.
        ports:            # Define the list of ports exposed by the Service.
        - name: rpc
          port: 6123
        - name: blob-server
          port: 6124
        - name: webui
          port: 8081
        selector:         # Define the Service's label selector, which is used to determine the pods to which the Service routes traffic.
          app: flink
          component: jobmanager
    2. Use jobmanager-service.yaml to create a Service named flink-jobmanager.
      kubectl create -f jobmanager-service.yaml
    3. Check whether the Service is created successfully.
      kubectl get service flink-jobmanager
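
      If the Service exists, output similar to the following is displayed (the cluster IP address and age vary):

      NAME               TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)                      AGE
      flink-jobmanager   ClusterIP   10.247.x.x     <none>        6123/TCP,6124/TCP,8081/TCP   12s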

  3. Create a Deployment for running JobManager.

    1. Create a YAML file named jobmanager-session-deployment-non-ha.yaml.
      vim jobmanager-session-deployment-non-ha.yaml

      The file content is as follows (the comments explain the key configuration items):

      apiVersion: apps/v1
      kind: Deployment
      metadata:
        name: flink-jobmanager
      spec:
        replicas: 1                  # Set the number of JobManager replicas to 1.
        selector:
          matchLabels:               # Define labels.
            app: flink           
            component: jobmanager
        template:
          metadata:
            labels:
              app: flink
              component: jobmanager
          spec:
            containers:
            - name: jobmanager       # Set the container name to jobmanager.
              image: apache/flink:1.20.0-scala_2.12    # Use the official Flink 1.20.0 image built with Scala 2.12.
              args: ["jobmanager"]   # Designate the container to run as JobManager.
              ports:                 # Expose ports in the container.
              - containerPort: 6123  # Used for communication between TaskManager and JobManager.
                name: rpc
              - containerPort: 6124  # Used to transfer binary objects.
                name: blob-server
              - containerPort: 8081  # Used to access the Flink web management page.
                name: webui
              livenessProbe:
                tcpSocket:
                  port: 6123         # Use TCP to check the health status of RPC port 6123.
                initialDelaySeconds: 30
                periodSeconds: 60
              volumeMounts:          # Mount a storage volume.
              - name: flink-config-volume
                mountPath: /opt/flink/conf
              securityContext:
                runAsUser: 9999      # UID of the flink user in the official Flink image. Change it if necessary.
            volumes:                 # Define storage volumes to store configuration files.
            - name: flink-config-volume
              configMap:
                name: flink-config
                items:
                - key: config.yaml   # Mount the config.yaml file in the ConfigMap to the specified path of the container.
                  path: config.yaml  # The path in the container is /opt/flink/conf/config.yaml.
                - key: log4j-console.properties    # Mount the log4j-console.properties file in the ConfigMap to the specified path of the container.
                  path: log4j-console.properties   # The path in the container is /opt/flink/conf/log4j-console.properties.
    2. Use jobmanager-session-deployment-non-ha.yaml to create a Deployment named flink-jobmanager.
      kubectl create -f jobmanager-session-deployment-non-ha.yaml
    3. Check whether the Deployment flink-jobmanager is successfully created.
      kubectl get pod
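
      If the cluster contains many pods, you can filter the output by the labels defined in the Deployment:

      kubectl get pod -l app=flink,component=jobmanager

      The JobManager is ready when the pod's STATUS is Running and READY is 1/1.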

  4. Create a Deployment for running TaskManager.

    1. Create a YAML file named taskmanager-session-deployment.yaml.
      vim taskmanager-session-deployment.yaml

      The file content is as follows (the comments explain the key configuration items):

      apiVersion: apps/v1
      kind: Deployment
      metadata:
        name: flink-taskmanager
      spec:
        replicas: 2                    # Set the number of TaskManager replicas to 2.
        selector:
          matchLabels:                 # Define labels.
            app: flink
            component: taskmanager
        template:
          metadata:
            labels:
              app: flink
              component: taskmanager
          spec:
            containers:
            - name: taskmanager        # Set the container name to taskmanager.
              image: apache/flink:1.20.0-scala_2.12    # Use the official Flink 1.20.0 image built with Scala 2.12.
              args: ["taskmanager"]    # Designate the container to run as TaskManager.
              ports:                   # Expose ports in the container.
              - containerPort: 6122    # Used for communication between TaskManager and JobManager.
                name: rpc
              livenessProbe: 
                tcpSocket:
                  port: 6122           # Use TCP to check the health status of RPC port 6122.
                initialDelaySeconds: 30
                periodSeconds: 60
              volumeMounts:            # Mount a storage volume.
              - name: flink-config-volume
                mountPath: /opt/flink/conf/
              securityContext:
                runAsUser: 9999       # UID of the flink user in the official Flink image. Change it if necessary.
            volumes:                  # Define storage volumes to store configuration files.
            - name: flink-config-volume
              configMap:
                name: flink-config
                items:
                - key: config.yaml
                  path: config.yaml
                - key: log4j-console.properties
                  path: log4j-console.properties
    2. Use taskmanager-session-deployment.yaml to create a Deployment named flink-taskmanager.
      kubectl create -f taskmanager-session-deployment.yaml
    3. Check whether the Deployment flink-taskmanager is successfully created.
      kubectl get pod
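
      You can also wait for the rollout to finish and confirm that both TaskManager replicas are running:

      kubectl rollout status deployment/flink-taskmanager
      kubectl get pod -l app=flink,component=taskmanager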

Step 2: Expose the Service

Create a NodePort Service for flink-jobmanager so that external networks can access it through a node's public IP address and an automatically allocated node port. The Service forwards external requests to the corresponding container.
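
If you prefer kubectl to the console, a NodePort Service equivalent to the console configuration might look like the following sketch. The Service name flink-jobmanager-nodeport is only an example, and Kubernetes allocates the node port automatically unless you set nodePort explicitly:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-nodeport   # Example name; any unique name works.
spec:
  type: NodePort                    # Expose the Service on a port of every node.
  ports:
  - name: webui
    port: 8081                      # Service port
    targetPort: 8081                # Container port of the JobManager web UI
  selector:                         # Route traffic to the JobManager pods.
    app: flink
    component: jobmanager

The console steps below achieve the same result: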

  1. Log in to the CCE console. Choose Workloads > Deployments, click flink-jobmanager, click the Access Mode tab, and then click Create Service.
  2. On the Create Service page, set Service Type to NodePort. In the Ports area, set both Container Port and Service Port to 8081, and click OK. The Service then automatically allocates a node port, which is displayed in Figure 2. In this example, the port is 30327. You can access the workload through the EIP of any node in the cluster and this port.

    Figure 1 Creating a NodePort Service

    Figure 2 NodePort

  3. Check whether the Service can be accessed. In the navigation pane, choose Nodes, click the Nodes tab, select a node, and copy its EIP.

    In the address box of a browser, enter <EIP of the node>:<node port>. If the Flink dashboard is displayed, the access is successful. If the access fails, check whether the source IP address for the node port is set to 0.0.0.0/0 or All in the inbound rules of the cluster security group. For details, see Configuring Security Group Rules.

    Figure 3 Flink Dashboard
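
    You can also test the connection from a terminal by calling the Flink REST API. Replace the EIP and port with your own values (30327 in this example):

    curl http://<EIP of the node>:30327/overview

    If Flink is reachable, a JSON summary of the cluster is returned.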

Step 3: Run the Flink Job

Use the official WordCount.jar file to demonstrate how to run Flink jobs in a CCE cluster. The WordCount job counts the number of occurrences of each word in a text.
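
As an alternative to uploading the JAR file through the web UI as described below, you can submit the job with the Flink CLI from the extracted Flink distribution. A sketch, assuming the node EIP and the node port allocated in Step 2 (30327 in this example):

./bin/flink run -m <EIP of the node>:30327 ./examples/streaming/WordCount.jar --output /opt/flink/output

The following steps use the web UI: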

  1. Download and decompress the flink-1.20.0-bin-scala_2.12.tgz file, which can be obtained at https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz. Check that the WordCount.jar package exists in the flink-1.20.0-bin-scala_2.12\flink-1.20.0\examples\streaming directory.
  2. Upload the JAR package on the Dashboard page. Open the Apache Flink Dashboard, choose Submit New Job in the navigation pane, click Add New in the upper right corner, and select WordCount.jar in the flink-1.20.0-bin-scala_2.12\flink-1.20.0\examples\streaming directory. Then click the uploaded WordCount.jar entry and specify the output file path, for example, --output /opt/flink/output, in the Program Arguments text box.

    Figure 4 Uploading a WordCount job

  3. Click the blue box in the lower right of the Overview area, and then click TaskManagers to check the endpoint of the job.

    Figure 5 Checking an endpoint

  4. Use the endpoint to locate the TaskManager pod. Run the following command to query the IP addresses of the Flink pods:

    kubectl get pod -o wide | grep flink

    If the following information is displayed, flink-taskmanager-579f47cf9f-prrff is the TaskManager pod that matches the endpoint:

    flink-jobmanager-789c8777-vhqbv         1/1     Running             1 (28m ago)    40h     192.168.0.139   192.168.0.53    <none>           <none>
    flink-taskmanager-579f47cf9f-prrff      1/1     Running             1 (28m ago)    40h     192.168.0.92    192.168.0.53    <none>           <none>
    flink-taskmanager-579f47cf9f-wgt66      1/1     Running             1 (28m ago)    40h     192.168.0.194   192.168.0.212   <none>           <none>

  5. After the job is complete, access flink-taskmanager-579f47cf9f-prrff to check whether the number of occurrences of each word is correctly displayed.

    kubectl exec -it flink-taskmanager-579f47cf9f-prrff -- bash

    Run the ls command to query the output path.

    ls /opt/flink/output/

    Information similar to the following is displayed:

    2024-09-02--01

    Check the content of the 2024-09-02--01 folder.

    ls /opt/flink/output/2024-09-02--01

    Information similar to the following is displayed:

    part-bd89ad8b-a0dd-4b4d-b771-4c88eaed61e4-0

    Check the number of occurrences of each word.

    cat /opt/flink/output/2024-09-02--01/part-bd89ad8b-a0dd-4b4d-b771-4c88eaed61e4-0

    Information similar to the following is displayed:

    (to,1)
    (be,1)
    (or,1)
    (not,1)
    (to,2)
    (be,2)
    (that,1)
    ...
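
    To keep a copy of the results outside the pod, you can copy the output directory to your local machine, for example (kubectl cp requires the tar binary in the container image):

    kubectl cp flink-taskmanager-579f47cf9f-prrff:/opt/flink/output ./flink-output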

Step 4: Clean Up the Flink Cluster

  1. Delete the Deployment that runs the JobManager.

    kubectl delete -f jobmanager-session-deployment-non-ha.yaml

    Information similar to the following is displayed:

    deployment.apps "flink-jobmanager" deleted

  2. Delete the Deployment that runs the TaskManager.

    kubectl delete -f taskmanager-session-deployment.yaml

    Information similar to the following is displayed:

    deployment.apps "flink-taskmanager" deleted

  3. Delete the ConfigMap.

    kubectl delete -f flink-configuration-configmap.yaml

    Information similar to the following is displayed:

    configmap "flink-config" deleted

  4. Delete the Service.

    kubectl delete -f jobmanager-service.yaml

    Information similar to the following is displayed:

    service "flink-jobmanager" delete