文档首页/ 云容器引擎 CCE/ 最佳实践/ 批量计算/ 在CCE集群中部署使用Flink
更新时间:2024-11-25 GMT+08:00

在CCE集群中部署使用Flink

Flink是一个用于大规模数据处理的分布式流处理框架和计算引擎,可以处理有界(批处理)和无界(流处理)数据,提供低延迟、高吞吐量的实时数据处理能力,同时支持复杂事件处理和数据分析。在CCE集群中部署Flink,可以帮助您构建高效、可靠且灵活的数据处理系统,支持多样化的业务应用,并充分利用大数据环境中的集群资源。本示例将展示如何在CCE集群中部署Flink,并通过WordCount任务演示如何在CCE集群中运行Flink任务。示例中使用Flink standalone模式部署Flink集群,部署流程参考Flink官方文档:Kubernetes | Apache Flink

前提条件

步骤一:部署Flink集群

在Kubernetes上部署Flink集群通常需要三个关键组件,每个组件对应Flink官网提供的不同资源定义文件,具体说明请参见表1。此外,还需要使用Flink官网的flink-configuration-configmap.yaml配置文件完成Flink集群的基本配置。

表1 Flink集群关键组件

关键组件

资源定义文件

说明

运行JobManager的Deployment

jobmanager-session-deployment-non-ha.yaml

JobManager是Flink集群的核心组件,负责协调和管理Flink作业的执行,能够处理任务调度、作业协调、资源分配和故障恢复。

运行TaskManager的Deployment

taskmanager-session-deployment.yaml

TaskManager是Flink集群的工作节点,负责实际执行分配的任务。每个TaskManager运行一个或多个任务槽(Task Slot),这些任务槽用于执行Flink作业的任务。

暴露JobManager的Rest和UI端口的Service

jobmanager-service.yaml

暴露Flink JobManager的REST和Web UI端口,使用户可以通过该Service访问JobManager的REST API和Web UI。

  1. 配置Flink集群的基本信息。

    1. 创建名为flink-configuration-configmap.yaml的YAML文件。
      vim flink-configuration-configmap.yaml
      文件内容如下,已添加相关注释。
      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: flink-config
        labels:
          app: flink
      # data定义了ConfigMap中存储的数据部分。示例中,data包含两个配置文件:config.yaml和log4j-console.properties
      data:   
        config.yaml: |+
          # Flink JobManager的RPC地址,通常对应JobManager的服务名称,这里设置为flink-jobmanager
          jobmanager.rpc.address: flink-jobmanager
          # 每个TaskManager中的任务槽数量,这里设置为2,即每个TaskManager可以并行处理2个任务。   
          taskmanager.numberOfTaskSlots: 2
          # Flink BLOB服务的端口,用于传输大对象,如作业代码或大文件
          blob.server.port: 6124
          jobmanager.rpc.port: 6123              # JobManager的RPC服务端口
          taskmanager.rpc.port: 6122             # TaskManager的RPC服务端口
          jobmanager.memory.process.size: 1600m  # JobManager的总内存配置
          taskmanager.memory.process.size: 1728m # TaskManager的总内存配置
          parallelism.default: 2                 # Flink作业的默认并行度为2
        log4j-console.properties: |+
          # 如下配置会同时影响用户代码和Flink的日志行为
          rootLogger.level = INFO                # 日志系统将记录INFO级别及其以上的日志
          rootLogger.appenderRef.console.ref = ConsoleAppender      # 将日志输出到控制台
          rootLogger.appenderRef.rolling.ref = RollingFileAppender  # 日志输出到滚动文件
          # 如果您只想改变Flink的日志行为则可以取消下面2行的注释部分
          #logger.flink.name = org.apache.flink
          #logger.flink.level = INFO
       
          # 下面8行将公共libraries或connectors的日志级别保持在INFO级别
          # root logger 的配置不会覆盖此处配置
          # 你必须手动修改这里的日志级别
          logger.pekko.name = org.apache.pekko
          logger.pekko.level = INFO
          logger.kafka.name= org.apache.kafka
          logger.kafka.level = INFO
          logger.hadoop.name = org.apache.hadoop
          logger.hadoop.level = INFO
          logger.zookeeper.name = org.apache.zookeeper
          logger.zookeeper.level = INFO
       
          # 将所有info级别的日志输出到控制台
          appender.console.name = ConsoleAppender
          appender.console.type = CONSOLE
          appender.console.layout.type = PatternLayout
          appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
       
          # 将所有info级别的日志输出到指定的滚动文件
          appender.rolling.name = RollingFileAppender
          appender.rolling.type = RollingFile
          appender.rolling.append = false
          appender.rolling.fileName = ${sys:log.file}
          appender.rolling.filePattern = ${sys:log.file}.%i
          appender.rolling.layout.type = PatternLayout
          appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
          appender.rolling.policies.type = Policies
          appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
          appender.rolling.policies.size.size=100MB
          appender.rolling.strategy.type = DefaultRolloverStrategy
          appender.rolling.strategy.max = 10
       
          # 关闭 Netty channel handler 中不相关的(错误)警告
          logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
          logger.netty.level = OFF      
    2. 利用flink-configuration-configmap.yaml配置Flink集群的基本信息。
      kubectl create -f flink-configuration-configmap.yaml
    3. 根据以下代码,检查名为flink-config的ConfigMap是否建立成功。
      kubectl get configmap

      若回显内容如下,则说明ConfigMap创建成功。

      NAME               DATA   AGE
      flink-config       2      59s
      kube-root-ca.crt   1      16d

  2. 创建暴露JobManager的REST和UI端口的Service。

    1. 创建名为jobmanager-service.yaml的YAML文件。
      vim jobmanager-service.yaml

      文件内容如下,已添加相关注释。

      apiVersion: v1
      kind: Service
      metadata:
        name: flink-jobmanager
      spec:
        type: ClusterIP   # Service类型为集群内部访问
        ports:            # 定义Service暴露的端口列表
        - name: rpc
          port: 6123
        - name: blob-server
          port: 6124
        - name: webui
          port: 8081
        selector:         #定义Service的标签选择器,用于确定该Service将流量路由到哪些Pod
          app: flink
          component: jobmanager
    2. 根据jobmanager-service.yaml创建名为flink-jobmanager的Service。
      kubectl create -f jobmanager-service.yaml
    3. 利用以下命令检查Service是否创建成功。
      kubectl get service flink-jobmanager

  3. 创建运行JobManager的Deployment。

    1. 创建名为jobmanager-session-deployment-non-ha.yaml的YAML文件。
      vim jobmanager-session-deployment-non-ha.yaml

      文件内容如下,已添加相关注释。

      apiVersion: apps/v1
      kind: Deployment
      metadata:
        name: flink-jobmanager
      spec:
        replicas: 1                  # 指定JobManager的副本数为1
        selector:
          matchLabels:               # 定义标签 
            app: flink           
            component: jobmanager
        template:
          metadata:
            labels:
              app: flink
              component: jobmanager
          spec:
            containers:
            - name: jobmanager       # 容器的名称为jobmanager
              image: apache/flink:1.20.0-scala_2.12    # 使用的Flink镜像,版本为1.20.0,Scala版本为2.12
              args: ["jobmanager"]   # 运行容器时传递的参数,指定这个容器运行为JobManager
              ports:                 # 用于暴露容器内相关端口
              - containerPort: 6123  # 用于TaskManager与JobManager通信
                name: rpc
              - containerPort: 6124  # 用于传输二进制对象
                name: blob-server
              - containerPort: 8081  # 用于访问Flink的Web管理界面
                name: webui
              livenessProbe:
                tcpSocket:
                  port: 6123         # 使用TCP检查RPC端口6123的健康状况
                initialDelaySeconds: 30
                periodSeconds: 60
              volumeMounts:          # 挂载存储卷
              - name: flink-config-volume
                mountPath: /opt/flink/conf
              securityContext:
                runAsUser: 9999      # 参考官方flink镜像中的_flink_用户,如有必要可以修改
            volumes:                 # 定义存储卷,用于存储配置文件
            - name: flink-config-volume
              configMap:
                name: flink-config
                items:
                - key: config.yaml   # 将ConfigMap中的config.yaml文件挂载到容器的指定路径
                  path: config.yaml  # 在容器中的路径为/opt/flink/conf/config.yaml
                - key: log4j-console.properties    # 将ConfigMap中的log4j-console.properties文件挂载到容器的指定路径
                  path: log4j-console.properties   # 在容器中的路径为/opt/flink/conf/log4j-console.properties
    2. 根据jobmanager-session-deployment-non-ha.yaml创建名为flink-jobmanager的Deployment。
      kubectl create -f jobmanager-session-deployment-non-ha.yaml
    3. 利用以下命令检查名为flink-jobmanager的Deployment是否创建成功。
      kubectl get pod

  4. 创建运行TaskManager的Deployment。

    1. 创建名为taskmanager-session-deployment.yaml的YAML文件。
      vim taskmanager-session-deployment.yaml

      文件内容如下,已添加相关注释。

      apiVersion: apps/v1
      kind: Deployment
      metadata:
        name: flink-taskmanager
      spec:
        replicas: 2                    # 指定TaskManager的副本数为2
        selector:
          matchLabels:                 # 定义标签
            app: flink
            component: taskmanager
        template:
          metadata:
            labels:
              app: flink
              component: taskmanager
          spec:
            containers:
            - name: taskmanager        # 容器的名称为taskmanager
              image: apache/flink:1.20.0-scala_2.12    # 使用的Flink镜像,版本为1.20.0,Scala版本为2.12
              args: ["taskmanager"]    # 运行容器时传递的参数,指定这个容器运行为TaskManager
              ports:                   # 用于暴露容器内相关端口
              - containerPort: 6122    # 用于TaskManager与JobManager通信
                name: rpc
              livenessProbe: 
                tcpSocket:
                  port: 6122           # 使用TCP检查RPC端口6122的健康状况
                initialDelaySeconds: 30
                periodSeconds: 60
              volumeMounts:            # 挂载存储卷
              - name: flink-config-volume
                mountPath: /opt/flink/conf/
              securityContext:
                runAsUser: 9999       # 参考官方flink镜像中的_flink_用户,如有必要可以修改
            volumes:                  # 定义存储卷,用于存储配置文件
            - name: flink-config-volume
              configMap:
                name: flink-config
                items:
                - key: config.yaml
                  path: config.yaml
                - key: log4j-console.properties
                  path: log4j-console.properties
    2. 根据taskmanager-session-deployment.yaml创建名为flink-taskmanager的Deployment。
      kubectl create -f taskmanager-session-deployment.yaml
    3. 利用以下命令检查名为flink-taskmanager的Deployment是否创建成功。
      kubectl get pod

步骤二:对外发布服务

创建flink-jobmanager工作负载的节点访问类型Service,并允许外部网络通过集群节点的公网IP和自动分配的对外端口号来访问Flink JobManager服务。该节点访问类型Service会将外部请求转发给相应的容器。

  1. 登录CCE控制台,进入“工作负载 > 无状态负载”页面,选择flink-jobmanager,单击“访问方式”页签,服务模块单击“创建服务”
  2. 在创建服务界面,访问类型选择“节点访问”,在端口配置中,容器端口和服务端口都设置为8081,单击“确定”。该服务会自动生成节点访问端口,自动生成的节点端口位置请参见图2,本示例中节点访问端口为30327。您可以通过集群任一节点的弹性公网IP和端口号访问该工作负载。

    图1 创建节点访问类型Service

    图2 节点端口位置

  3. 检查该服务是否能正常访问。单击“节点管理”,单击“节点”,选择任一节点,复制对应的弹性公网IP。

    在浏览器中输入“节点弹性公网IP地址:节点访问端口”,出现Flink的Dashboard页面则说明访问成功。若访问不成功,请检查集群安全组入站规则中是否已将该节点访问端口的源地址设置为“0.0.0.0/0”“全部”,具体请参见配置安全组规则

    图3 Flink的Dashboard页面

步骤三:执行Flink任务

利用官方示例WordCount.jar文件示范如何在CCE集群中执行Flink任务。WordCount任务的核心功能是计算输入文本中每个单词出现的次数。

  1. 下载并解压flink-1.20.0-bin-scala_2.12.tgz,该文件来自https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz。检查“flink-1.20.0-bin-scala_2.12\flink-1.20.0\examples\streamin”路径下是否有WordCount.jar包。
  2. 在Dashboard页面添加Jar包。打开Flink的Dashboard页面,在左侧菜单栏中单击“Submit New Job”,右上角单击“Add New”,选择“flink-1.20.0-bin-scala_2.12\flink-1.20.0\examples\streamin”路径下的WordCount.jar。单击上传的WordCount.jar,在“Program Arguments”栏中指定输出文件的路径,如“--output /opt/flink/output”

    图4 上传WordCount任务

  3. 在弹出的页面中,单击Overview的右侧蓝色方块,单击“Taskmanager”,查看该任务的Endpoint,根据Endpoint可以进一步得知该任务对应的TaskManager Pod。

    图5 查看Endpoint

  4. 查找Endpoint对应的TaskManager Pod。在命令行界面输入以下命令,查看Flink相关Pod对应的IP地址。

    kubectl get pod -o wide | grep flink

    回显结果如下,可知“flink-taskmanager-579f47cf9f-prrff”为寻找的TaskManager Pod。

    flink-jobmanager-789c8777-vhqbv         1/1     Running             1 (28m ago)    40h     192.168.0.139   192.168.0.53    <none>           <none>
    flink-taskmanager-579f47cf9f-prrff      1/1     Running             1 (28m ago)    40h     192.168.0.92    192.168.0.53    <none>           <none>
    flink-taskmanager-579f47cf9f-wgt66      1/1     Running             1 (28m ago)    40h     192.168.0.194   192.168.0.212   <none>           <none>

  5. 待任务执行完毕,进入“flink-taskmanager-579f47cf9f-prrff”查看是否正确输出每个单词出现的次数。

    kubectl exec -it flink-taskmanager-579f47cf9f-prrff bash

    利用ls命令查找输出结果的具体路径。

    ls /opt/flink/output/

    回显结果如下:

    2024-09-02--01

    查看2024-09-02--01文件夹内容。

    ls /opt/flink/output/2024-09-02--01

    回显结果如下:

    part-bd89ad8b-a0dd-4b4d-b771-4c88eaed61e4-0

    根据最后确定的路径,查看每个单词的出现次数。

    cat /opt/flink/output/2024-09-02--01/part-bd89ad8b-a0dd-4b4d-b771-4c88eaed61e4-0

    回显结果如下:

    (to,1)
    (be,1)
    (or,1)
    (not,1)
    (to,2)
    (be,2)
    (that,1)
    ...

步骤四:集群清理

  1. 删除运行JobManager的Deployment。

    kubectl delete -f jobmanager-session-deployment-non-ha.yaml

    回显结果如下:

    deployment.apps "flink-jobmanager" deleted

  2. 删除运行TaskManager的Deployment。

    kubectl delete -f taskmanager-session-deployment.yaml

    回显结果如下:

    deployment.apps "flink-taskmanager" deleted

  3. 删除ConfigMap。

    kubectl delete -f flink-configuration-configmap.yaml

    回显结果如下:

    configmap "flink-config" deleted

  4. 删除Service。

    kubectl delete -f jobmanager-service.yaml

    回显结果如下:

    service "flink-jobmanager" delete