在CCE集群中部署使用Flink
Flink是一个用于大规模数据处理的分布式流处理框架和计算引擎,可以处理有界(批处理)和无界(流处理)数据,提供低延迟、高吞吐量的实时数据处理能力,同时支持复杂事件处理和数据分析。在CCE集群中部署Flink,可以帮助您构建高效、可靠且灵活的数据处理系统,支持多样化的业务应用,并充分利用大数据环境中的集群资源。本示例将展示如何在CCE集群中部署Flink,并通过WordCount任务演示如何在CCE集群中运行Flink任务。示例中使用Flink standalone模式部署Flink集群,部署流程参考Flink官方文档:Kubernetes | Apache Flink。
前提条件
- 已创建一个集群,且集群有可用节点,具体操作步骤请参见购买Standard/Turbo集群。
- 集群内节点已绑定弹性公网IP,且已配置kubectl命令行工具,具体请参见将弹性公网IP绑定至实例和通过kubectl连接集群。
步骤一:部署Flink集群
在Kubernetes上部署Flink集群通常需要三个关键组件,每个组件对应Flink官网提供的不同资源定义文件,具体说明请参见表1。此外,还需要使用Flink官网的flink-configuration-configmap.yaml配置文件完成Flink集群的基本配置。
关键组件 |
资源定义文件 |
说明 |
---|---|---|
运行JobManager的Deployment |
jobmanager-session-deployment-non-ha.yaml |
JobManager是Flink集群的核心组件,负责协调和管理Flink作业的执行,能够处理任务调度、作业协调、资源分配和故障恢复。 |
运行TaskManager的Deployment |
taskmanager-session-deployment.yaml |
TaskManager是Flink集群的工作节点,负责实际执行分配的任务。每个TaskManager运行一个或多个任务槽(Task Slot),这些任务槽用于执行Flink作业的任务。 |
暴露JobManager的Rest和UI端口的Service |
jobmanager-service.yaml |
暴露Flink JobManager的REST和Web UI端口,使用户可以通过该Service访问JobManager的REST API和Web UI。 |
- 配置Flink集群的基本信息。
- 创建名为flink-configuration-configmap.yaml的YAML文件。
vim flink-configuration-configmap.yaml
文件内容如下,已添加相关注释。apiVersion: v1 kind: ConfigMap metadata: name: flink-config labels: app: flink # data定义了ConfigMap中存储的数据部分。示例中,data包含两个配置文件:config.yaml和log4j-console.properties data: config.yaml: |+ # Flink JobManager的RPC地址,通常对应JobManager的服务名称,这里设置为flink-jobmanager jobmanager.rpc.address: flink-jobmanager # 每个TaskManager中的任务槽数量,这里设置为2,即每个TaskManager可以并行处理2个任务。 taskmanager.numberOfTaskSlots: 2 # Flink BLOB服务的端口,用于传输大对象,如作业代码或大文件 blob.server.port: 6124 jobmanager.rpc.port: 6123 # JobManager的RPC服务端口 taskmanager.rpc.port: 6122 # TaskManager的RPC服务端口 jobmanager.memory.process.size: 1600m # JobManager的总内存配置 taskmanager.memory.process.size: 1728m # TaskManager的总内存配置 parallelism.default: 2 # Flink作业的默认并行度为2 log4j-console.properties: |+ # 如下配置会同时影响用户代码和Flink的日志行为 rootLogger.level = INFO # 日志系统将记录INFO级别及其以上的日志 rootLogger.appenderRef.console.ref = ConsoleAppender # 将日志输出到控制台 rootLogger.appenderRef.rolling.ref = RollingFileAppender # 日志输出到滚动文件 # 如果您只想改变Flink的日志行为则可以取消下面2行的注释部分 #logger.flink.name = org.apache.flink #logger.flink.level = INFO # 下面8行将公共libraries或connectors的日志级别保持在INFO级别 # root logger 的配置不会覆盖此处配置 # 你必须手动修改这里的日志级别 logger.pekko.name = org.apache.pekko logger.pekko.level = INFO logger.kafka.name= org.apache.kafka logger.kafka.level = INFO logger.hadoop.name = org.apache.hadoop logger.hadoop.level = INFO logger.zookeeper.name = org.apache.zookeeper logger.zookeeper.level = INFO # 将所有info级别的日志输出到控制台 appender.console.name = ConsoleAppender appender.console.type = CONSOLE appender.console.layout.type = PatternLayout appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n # 将所有info级别的日志输出到指定的滚动文件 appender.rolling.name = RollingFileAppender appender.rolling.type = RollingFile appender.rolling.append = false appender.rolling.fileName = ${sys:log.file} appender.rolling.filePattern = ${sys:log.file}.%i appender.rolling.layout.type = PatternLayout appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n appender.rolling.policies.type = Policies appender.rolling.policies.size.type = SizeBasedTriggeringPolicy appender.rolling.policies.size.size=100MB appender.rolling.strategy.type = DefaultRolloverStrategy appender.rolling.strategy.max = 10 # 关闭 Netty channel handler 中不相关的(错误)警告 logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline logger.netty.level = OFF
- 利用flink-configuration-configmap.yaml配置Flink集群的基本信息。
kubectl create -f flink-configuration-configmap.yaml
- 根据以下代码,检查名为flink-config的ConfigMap是否建立成功。
kubectl get configmap
若回显内容如下,则说明ConfigMap创建成功。
NAME DATA AGE flink-config 2 59s kube-root-ca.crt 1 16d
- 创建名为flink-configuration-configmap.yaml的YAML文件。
- 创建暴露JobManager的REST和UI端口的Service。
- 创建名为jobmanager-service.yaml的YAML文件。
vim jobmanager-service.yaml
文件内容如下,已添加相关注释。
apiVersion: v1 kind: Service metadata: name: flink-jobmanager spec: type: ClusterIP # Service类型为集群内部访问 ports: # 定义Service暴露的端口列表 - name: rpc port: 6123 - name: blob-server port: 6124 - name: webui port: 8081 selector: #定义Service的标签选择器,用于确定该Service将流量路由到哪些Pod app: flink component: jobmanager
- 根据jobmanager-service.yaml创建名为flink-jobmanager的Service。
kubectl create -f jobmanager-service.yaml
- 利用以下命令检查Service是否创建成功。
kubectl get service flink-jobmanager
- 创建名为jobmanager-service.yaml的YAML文件。
- 创建运行JobManager的Deployment。
- 创建名为jobmanager-session-deployment-non-ha.yaml的YAML文件。
vim jobmanager-session-deployment-non-ha.yaml
文件内容如下,已添加相关注释。
apiVersion: apps/v1 kind: Deployment metadata: name: flink-jobmanager spec: replicas: 1 # 指定JobManager的副本数为1 selector: matchLabels: # 定义标签 app: flink component: jobmanager template: metadata: labels: app: flink component: jobmanager spec: containers: - name: jobmanager # 容器的名称为jobmanager image: apache/flink:1.20.0-scala_2.12 # 使用的Flink镜像,版本为1.20.0,Scala版本为2.12 args: ["jobmanager"] # 运行容器时传递的参数,指定这个容器运行为JobManager ports: # 用于暴露容器内相关端口 - containerPort: 6123 # 用于TaskManager与JobManager通信 name: rpc - containerPort: 6124 # 用于传输二进制对象 name: blob-server - containerPort: 8081 # 用于访问Flink的Web管理界面 name: webui livenessProbe: tcpSocket: port: 6123 # 使用TCP检查RPC端口6123的健康状况 initialDelaySeconds: 30 periodSeconds: 60 volumeMounts: # 挂载存储卷 - name: flink-config-volume mountPath: /opt/flink/conf securityContext: runAsUser: 9999 # 参考官方flink镜像中的_flink_用户,如有必要可以修改 volumes: # 定义存储卷,用于存储配置文件 - name: flink-config-volume configMap: name: flink-config items: - key: config.yaml # 将ConfigMap中的config.yaml文件挂载到容器的指定路径 path: config.yaml # 在容器中的路径为/opt/flink/conf/config.yaml - key: log4j-console.properties # 将ConfigMap中的log4j-console.properties文件挂载到容器的指定路径 path: log4j-console.properties # 在容器中的路径为/opt/flink/conf/log4j-console.properties
- 根据jobmanager-session-deployment-non-ha.yaml创建名为flink-jobmanager的Deployment。
kubectl create -f jobmanager-session-deployment-non-ha.yaml
- 利用以下命令检查名为flink-jobmanager的Deployment是否创建成功。
kubectl get pod
- 创建名为jobmanager-session-deployment-non-ha.yaml的YAML文件。
- 创建运行TaskManager的Deployment。
- 创建名为taskmanager-session-deployment.yaml的YAML文件。
vim taskmanager-session-deployment.yaml
文件内容如下,已添加相关注释。
apiVersion: apps/v1 kind: Deployment metadata: name: flink-taskmanager spec: replicas: 2 # 指定TaskManager的副本数为2 selector: matchLabels: # 定义标签 app: flink component: taskmanager template: metadata: labels: app: flink component: taskmanager spec: containers: - name: taskmanager # 容器的名称为taskmanager image: apache/flink:1.20.0-scala_2.12 # 使用的Flink镜像,版本为1.20.0,Scala版本为2.12 args: ["taskmanager"] # 运行容器时传递的参数,指定这个容器运行为TaskManager ports: # 用于暴露容器内相关端口 - containerPort: 6122 # 用于TaskManager与JobManager通信 name: rpc livenessProbe: tcpSocket: port: 6122 # 使用TCP检查RPC端口6122的健康状况 initialDelaySeconds: 30 periodSeconds: 60 volumeMounts: # 挂载存储卷 - name: flink-config-volume mountPath: /opt/flink/conf/ securityContext: runAsUser: 9999 # 参考官方flink镜像中的_flink_用户,如有必要可以修改 volumes: # 定义存储卷,用于存储配置文件 - name: flink-config-volume configMap: name: flink-config items: - key: config.yaml path: config.yaml - key: log4j-console.properties path: log4j-console.properties
- 根据taskmanager-session-deployment.yaml创建名为flink-taskmanager的Deployment。
kubectl create -f taskmanager-session-deployment.yaml
- 利用以下命令检查名为flink-taskmanager的Deployment是否创建成功。
kubectl get pod
- 创建名为taskmanager-session-deployment.yaml的YAML文件。
步骤二:对外发布服务
创建flink-jobmanager工作负载的节点访问类型Service,并允许外部网络通过集群节点的公网IP和自动分配的对外端口号来访问Flink JobManager服务。该节点访问类型Service会将外部请求转发给相应的容器。
- 登录CCE控制台,进入“工作负载 > 无状态负载”页面,选择flink-jobmanager,单击“访问方式”页签,服务模块单击“创建服务”。
- 在创建服务界面,访问类型选择“节点访问”,在端口配置中,容器端口和服务端口都设置为8081,单击“确定”。该服务会自动生成节点访问端口,自动生成的节点端口位置请参见图2,本示例中节点访问端口为30327。您可以通过集群任一节点的弹性公网IP和端口号访问该工作负载。
图1 创建节点访问类型Service
- 检查该服务是否能正常访问。单击“节点管理”,单击“节点”,选择任一节点,复制对应的弹性公网IP。
在浏览器中输入“节点弹性公网IP地址:节点访问端口”,出现Flink的Dashboard页面则说明访问成功。若访问不成功,请检查集群安全组入站规则中是否已将该节点访问端口的源地址设置为“0.0.0.0/0”或“全部”,具体请参见配置安全组规则。
图3 Flink的Dashboard页面
步骤三:执行Flink任务
利用官方示例WordCount.jar文件示范如何在CCE集群中执行Flink任务。WordCount任务的核心功能是计算输入文本中每个单词出现的次数。
- 下载并解压flink-1.20.0-bin-scala_2.12.tgz,该文件来自https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz。检查“flink-1.20.0-bin-scala_2.12\flink-1.20.0\examples\streamin”路径下是否有WordCount.jar包。
- 在Dashboard页面添加Jar包。打开Flink的Dashboard页面,在左侧菜单栏中单击“Submit New Job”,右上角单击“Add New”,选择“flink-1.20.0-bin-scala_2.12\flink-1.20.0\examples\streamin”路径下的WordCount.jar。单击上传的WordCount.jar,在“Program Arguments”栏中指定输出文件的路径,如“--output /opt/flink/output”。
图4 上传WordCount任务
- 在弹出的页面中,单击Overview的右侧蓝色方块,单击“Taskmanager”,查看该任务的Endpoint,根据Endpoint可以进一步得知该任务对应的TaskManager Pod。
图5 查看Endpoint
- 查找Endpoint对应的TaskManager Pod。在命令行界面输入以下命令,查看Flink相关Pod对应的IP地址。
kubectl get pod -o wide | grep flink
回显结果如下,可知“flink-taskmanager-579f47cf9f-prrff”为寻找的TaskManager Pod。
flink-jobmanager-789c8777-vhqbv 1/1 Running 1 (28m ago) 40h 192.168.0.139 192.168.0.53 <none> <none> flink-taskmanager-579f47cf9f-prrff 1/1 Running 1 (28m ago) 40h 192.168.0.92 192.168.0.53 <none> <none> flink-taskmanager-579f47cf9f-wgt66 1/1 Running 1 (28m ago) 40h 192.168.0.194 192.168.0.212 <none> <none>
- 待任务执行完毕,进入“flink-taskmanager-579f47cf9f-prrff”查看是否正确输出每个单词出现的次数。
kubectl exec -it flink-taskmanager-579f47cf9f-prrff -- bash
利用ls命令查找输出结果的具体路径。
ls /opt/flink/output/
回显结果如下:
2024-09-02--01
查看2024-09-02--01文件夹内容。
ls /opt/flink/output/2024-09-02--01
回显结果如下:
part-bd89ad8b-a0dd-4b4d-b771-4c88eaed61e4-0
根据最后确定的路径,查看每个单词的出现次数。
cat /opt/flink/output/2024-09-02--01/part-bd89ad8b-a0dd-4b4d-b771-4c88eaed61e4-0
回显结果如下:
(to,1) (be,1) (or,1) (not,1) (to,2) (be,2) (that,1) ...
步骤四:集群清理
- 删除运行JobManager的Deployment。
kubectl delete -f jobmanager-session-deployment-non-ha.yaml
回显结果如下:
deployment.apps "flink-jobmanager" deleted
- 删除运行TaskManager的Deployment。
kubectl delete -f taskmanager-session-deployment.yaml
回显结果如下:
deployment.apps "flink-taskmanager" deleted
- 删除ConfigMap。
kubectl delete -f flink-configuration-configmap.yaml
回显结果如下:
configmap "flink-config" deleted
- 删除Service。
kubectl delete -f jobmanager-service.yaml
回显结果如下:
service "flink-jobmanager" delete