Running Flink in CCE
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded (streaming) and bounded (batch) data streams. It processes data streams in real time with low latency and high throughput and can handle complex event processing. Deploying Flink in CCE clusters enables you to build a high-performance, reliable, and flexible data processing system for a wide range of big data applications while keeping resource usage optimal. This section describes how to deploy a Flink cluster in CCE and how to run a Flink WordCount job in the CCE cluster. In this example, the Flink cluster is deployed in standalone mode. For details about the deployment process, see Kubernetes | Apache Flink in the official Flink documentation.
Prerequisites
- A CCE cluster with available nodes is ready for use. For details, see Buying a CCE Standard/Turbo Cluster.
- An EIP has been bound to a node in the cluster, and kubectl has been configured. For details, see Binding an EIP to an Instance and Connecting to a Cluster Using kubectl.
Step 1: Deploy a Flink Cluster
Three key components are required to deploy a Flink cluster. The Flink official website provides a resource definition file for each component. For details, see Table 1. In addition, you need the flink-configuration-configmap.yaml configuration file from the Flink official website to configure the Flink cluster.
Table 1 Key components of a Flink cluster

| Key Component | Resource Definition File | Description |
|---|---|---|
| Deployment for running JobManager | jobmanager-session-deployment-non-ha.yaml | JobManager serves as the central coordinator in a Flink cluster. It coordinates Flink jobs, including task distribution, job scheduling, resource allocation, and fault tolerance. |
| Deployment for running TaskManager | taskmanager-session-deployment.yaml | TaskManager is a worker node in a Flink cluster and is responsible for executing data processing tasks. Each TaskManager runs one or more task slots, which are isolated units of execution. |
| Service exposing the JobManager's REST and UI ports | jobmanager-service.yaml | The REST and web UI ports of JobManager are exposed so that users can access the REST API and web UI of JobManager through the Service. |
- Configure basic information about the Flink cluster.
- Create a YAML file named flink-configuration-configmap.yaml.
vim flink-configuration-configmap.yaml
The file content, including explanatory comments, is as follows:
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
# data defines the data stored in the ConfigMap. In this example, data contains two configuration files: config.yaml and log4j-console.properties.
data:
  config.yaml: |+
    # RPC address of the Flink JobManager. It is usually the JobManager name. In this example, it is flink-jobmanager.
    jobmanager.rpc.address: flink-jobmanager
    # Number of task slots in each TaskManager. The value 2 means that each TaskManager can process two tasks concurrently.
    taskmanager.numberOfTaskSlots: 2
    # Port of the Flink BLOB server, which is used to transfer large objects such as job code or large files.
    blob.server.port: 6124
    jobmanager.rpc.port: 6123                 # RPC port of JobManager
    taskmanager.rpc.port: 6122                # RPC port of TaskManager
    jobmanager.memory.process.size: 1600m     # Total memory of JobManager
    taskmanager.memory.process.size: 1728m    # Total memory of TaskManager
    parallelism.default: 2                    # Default parallelism of Flink jobs
  log4j-console.properties: |+
    # The following configuration affects logging for both user code and Flink.
    # Log messages of INFO level and above.
    rootLogger.level = INFO
    # Send the logs to the console.
    rootLogger.appenderRef.console.ref = ConsoleAppender
    # Also write the logs to a rolling file.
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # To change the logging for Flink only, uncomment the following two lines:
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following eight lines keep the log level of common libraries and connectors at INFO.
    # The root logger configuration does not override these settings; change the log levels manually if needed.
    logger.pekko.name = org.apache.pekko
    logger.pekko.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Send all INFO-level logs to the console.
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Write all INFO-level logs to the specified rolling file.
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress false alarms in the Netty channel handler.
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
- Use flink-configuration-configmap.yaml to configure basic information about the Flink cluster.
kubectl create -f flink-configuration-configmap.yaml
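If the creation is successful, information similar to the following is displayed:
configmap/flink-config created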
- Check whether the ConfigMap named flink-config is successfully created.
kubectl get configmap
If the following information is displayed, the ConfigMap was created successfully.
NAME               DATA   AGE
flink-config       2      59s
kube-root-ca.crt   1      16d
- Create a Service that exposes the REST and UI ports of JobManager.
- Create a YAML file named jobmanager-service.yaml.
vim jobmanager-service.yaml
The file content, including explanatory comments, is as follows:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP        # The Service is used for internal communication within the cluster.
  ports:                 # Define the list of ports exposed by the Service.
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:              # Label selector that determines the pods to which the Service routes traffic.
    app: flink
    component: jobmanager
- Use jobmanager-service.yaml to create a Service named flink-jobmanager.
kubectl create -f jobmanager-service.yaml
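If the creation is successful, information similar to the following is displayed:
service/flink-jobmanager created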
- Check whether the Service is created successfully.
kubectl get service flink-jobmanager
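If information similar to the following is displayed, the Service was created successfully. (The CLUSTER-IP value is assigned by the cluster and will differ in your environment.)
NAME               TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
flink-jobmanager   ClusterIP   10.247.x.x   <none>        6123/TCP,6124/TCP,8081/TCP   59s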
- Create a Deployment for running JobManager.
- Create a YAML file named jobmanager-session-deployment-non-ha.yaml.
vim jobmanager-session-deployment-non-ha.yaml
The file content, including explanatory comments, is as follows:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1                # Set the number of JobManager replicas to 1.
  selector:
    matchLabels:             # Define labels.
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager                       # Set the container name to jobmanager.
        image: apache/flink:1.20.0-scala_2.12  # Use the Flink 1.20.0 image built with Scala 2.12.
        args: ["jobmanager"]                   # Run the container as JobManager.
        ports:                                 # Expose ports in the container.
        - containerPort: 6123                  # Used for communication between TaskManager and JobManager.
          name: rpc
        - containerPort: 6124                  # Used to transfer binary objects.
          name: blob-server
        - containerPort: 8081                  # Used to access the Flink web management page.
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123                         # Use TCP to check the health of RPC port 6123.
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:                          # Mount a storage volume.
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999                      # UID of the flink user in the official Flink image. Change it if necessary.
      volumes:                                 # Define storage volumes to store the configuration files.
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: config.yaml                   # Mount the config.yaml file in the ConfigMap to the specified path in the container.
            path: config.yaml                  # The path in the container is /opt/flink/conf/config.yaml.
          - key: log4j-console.properties      # Mount the log4j-console.properties file in the ConfigMap to the specified path in the container.
            path: log4j-console.properties     # The path in the container is /opt/flink/conf/log4j-console.properties.
- Use jobmanager-session-deployment-non-ha.yaml to create a Deployment named flink-jobmanager.
kubectl create -f jobmanager-session-deployment-non-ha.yaml
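If the creation is successful, information similar to the following is displayed:
deployment.apps/flink-jobmanager created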
- Check whether the Deployment flink-jobmanager is successfully created.
kubectl get pod
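If the flink-jobmanager pod is in the Running state, the Deployment was created successfully. Information similar to the following is displayed (the pod name suffix is generated randomly):
NAME                              READY   STATUS    RESTARTS   AGE
flink-jobmanager-789c8777-vhqbv   1/1     Running   0          2m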
- Create a Deployment for running TaskManager.
- Create a YAML file named taskmanager-session-deployment.yaml.
vim taskmanager-session-deployment.yaml
The file content, including explanatory comments, is as follows:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2                # Set the number of TaskManager replicas to 2.
  selector:
    matchLabels:             # Define labels.
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager                      # Set the container name to taskmanager.
        image: apache/flink:1.20.0-scala_2.12  # Use the Flink 1.20.0 image built with Scala 2.12.
        args: ["taskmanager"]                  # Run the container as TaskManager.
        ports:                                 # Expose ports in the container.
        - containerPort: 6122                  # Used for communication between TaskManager and JobManager.
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122                         # Use TCP to check the health of RPC port 6122.
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:                          # Mount a storage volume.
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999                      # UID of the flink user in the official Flink image. Change it if necessary.
      volumes:                                 # Define storage volumes to store the configuration files.
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: config.yaml
            path: config.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
- Use taskmanager-session-deployment.yaml to create a Deployment named flink-taskmanager.
kubectl create -f taskmanager-session-deployment.yaml
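If the creation is successful, information similar to the following is displayed:
deployment.apps/flink-taskmanager created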
- Check whether the Deployment flink-taskmanager is successfully created.
kubectl get pod
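If the two flink-taskmanager pods are in the Running state, the Deployment was created successfully. Information similar to the following is displayed (the flink-jobmanager pod created earlier is also listed):
NAME                                 READY   STATUS    RESTARTS   AGE
flink-taskmanager-579f47cf9f-prrff   1/1     Running   0          2m
flink-taskmanager-579f47cf9f-wgt66   1/1     Running   0          2m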
Step 2: Publish the Service
Create a NodePort Service for flink-jobmanager so that external networks can access it through a node's public IP address and an automatically allocated node port. The Service forwards external requests to the corresponding pods. The following steps use the CCE console; a kubectl alternative is sketched after Figure 1.
- Log in to the CCE console. Choose Workloads > Deployments, click flink-jobmanager, click the Access Mode tab, and then click Create Service.
- On the Create Service page, set Service Type to NodePort. In the Ports area, set both Container Port and Service Port to 8081, and click OK. The Service automatically generates a node port, which is displayed in the Service list. In this example, the port is 30327. You can access the workload using the EIP of any node in the cluster and this node port.
Figure 1 Creating a NodePort Service
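Alternatively, you can create an equivalent NodePort Service with kubectl instead of the console. The following is a minimal sketch; the Service name flink-jobmanager-rest and the file name jobmanager-rest-service.yaml are examples, and nodePort is omitted so that Kubernetes allocates a port from its node port range (30000-32767 by default) automatically:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest   # Example name; any unused Service name works.
spec:
  type: NodePort                # Expose the Service on a port of every node.
  ports:
  - name: webui
    port: 8081                  # Service port.
    targetPort: 8081            # Container port of the JobManager web UI.
  selector:                     # Route traffic to the JobManager pod.
    app: flink
    component: jobmanager
After creating the Service with kubectl create -f jobmanager-rest-service.yaml, run kubectl get service flink-jobmanager-rest to view the allocated node port.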
- Check whether the Service is accessible. In the navigation pane, choose Nodes, click the Nodes tab, select a node, and copy its EIP.
In the address box of a browser, enter {EIP of the node}:{node port}. If the Flink dashboard page is displayed, the access is successful. If the access fails, check whether the source IP address for the node port is set to 0.0.0.0/0 or All in the inbound rule of the cluster security group. For details, see Configuring Security Group Rules.
Figure 2 Flink Dashboard
Step 3: Run the Flink Job
Use the official WordCount.jar file to demonstrate how to run Flink jobs in a CCE cluster. The WordCount job counts the number of occurrences of each word in a text.
- Download and decompress the flink-1.20.0-bin-scala_2.12.tgz file, which can be obtained at https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz. Check that the WordCount.jar package exists in the flink-1.20.0-bin-scala_2.12\flink-1.20.0\examples\streaming directory.
- Upload the JAR package on the dashboard. Open the Apache Flink Dashboard page, choose Submit New Job in the navigation pane, click Add New in the upper right corner, and select WordCount.jar in the flink-1.20.0-bin-scala_2.12\flink-1.20.0\examples\streaming directory. Click the uploaded WordCount.jar file and enter the output file path, for example, --output /opt/flink/output, in the Program Arguments text box. (For a command-line alternative, see the sketch after Figure 3.)
Figure 3 Uploading a WordCount job
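If you prefer the command line to the dashboard, you can also submit the job with the Flink CLI from the distribution downloaded in the first step. The following is a sketch; replace <EIP> with a node EIP, and note that 30327 is only the example node port from Step 2:
# Run from the decompressed flink-1.20.0 directory.
# -m points the client at the JobManager REST endpoint exposed by the NodePort Service.
./bin/flink run -m <EIP>:30327 examples/streaming/WordCount.jar --output /opt/flink/output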
- Click the blue box in the lower right of the Overview area and then click Taskmanager to check the endpoint of the job.
Figure 4 Checking an endpoint
- Use the endpoint to locate the TaskManager pod. Run the following command to query the IP addresses of the Flink pods:
kubectl get pod -o wide | grep flink
If the following information is displayed, flink-taskmanager-579f47cf9f-prrff is the TaskManager pod whose IP address matches the endpoint:
flink-jobmanager-789c8777-vhqbv      1/1   Running   1 (28m ago)   40h   192.168.0.139   192.168.0.53    <none>   <none>
flink-taskmanager-579f47cf9f-prrff   1/1   Running   1 (28m ago)   40h   192.168.0.92    192.168.0.53    <none>   <none>
flink-taskmanager-579f47cf9f-wgt66   1/1   Running   1 (28m ago)   40h   192.168.0.194   192.168.0.212   <none>   <none>
- After the job is complete, access the flink-taskmanager-579f47cf9f-prrff pod to check whether the number of occurrences of each word is correctly displayed.
kubectl exec -it flink-taskmanager-579f47cf9f-prrff -- bash
Run the ls command to query the output path.
ls /opt/flink/output/
Information similar to the following is displayed:
2024-09-02--01
Check the content of the 2024-09-02--01 folder.
ls /opt/flink/output/2024-09-02--01
Information similar to the following is displayed:
part-bd89ad8b-a0dd-4b4d-b771-4c88eaed61e4-0
Check the number of occurrences of each word.
cat /opt/flink/output/2024-09-02--01/part-bd89ad8b-a0dd-4b4d-b771-4c88eaed61e4-0
Information similar to the following is displayed:
(to,1)
(be,1)
(or,1)
(not,1)
(to,2)
(be,2)
(that,1)
...
Step 4: Clean Up the Flink Cluster
- Delete the Deployment that runs the JobManager.
kubectl delete -f jobmanager-session-deployment-non-ha.yaml
Information similar to the following is displayed:
deployment.apps "flink-jobmanager" deleted
- Delete the Deployment that runs the TaskManager.
kubectl delete -f taskmanager-session-deployment.yaml
Information similar to the following is displayed:
deployment.apps "flink-taskmanager" deleted
- Delete the ConfigMap.
kubectl delete -f flink-configuration-configmap.yaml
Information similar to the following is displayed:
configmap "flink-config" deleted
- Delete the Service.
kubectl delete -f jobmanager-service.yaml
Information similar to the following is displayed:
service "flink-jobmanager" delete