- 最新动态
- 功能总览
-
服务公告
- 最新公告
- 产品变更公告
- 集群版本公告
-
漏洞公告
- 漏洞修复策略
- Kubernetes安全漏洞公告(CVE-2024-10220)
- Kubernetes安全漏洞公告(CVE-2024-9486,CVE-2024-9594)
- NVIDIA Container Toolkit容器逃逸漏洞公告(CVE-2024-0132)
- Linux CUPS服务RCE 漏洞公告(CVE-2024-47076、CVE-2024-47175、CVE-2024-47176、CVE-2024-47177)
- NGINX Ingress控制器验证绕过漏洞公告(CVE-2024-7646)
- Docker Engine授权插件AuthZ权限绕过漏洞公告(CVE-2024-41110)
- Linux内核权限提升漏洞公告(CVE-2024-1086)
- OpenSSH远程代码执行漏洞公告(CVE-2024-6387)
- Fluent Bit内存崩溃漏洞公告(CVE-2024-4323)
- runc systemd属性注入漏洞公告(CVE-2024-3154)
- runc漏洞(CVE-2024-21626)对CCE服务的影响说明
- Kubernetes安全漏洞公告(CVE-2022-3172)
- Linux Kernel openvswitch 模块权限提升漏洞预警(CVE-2022-2639)
- nginx-ingress插件安全漏洞预警公告(CVE-2021-25748)
- nginx-ingress插件安全漏洞预警公告(CVE-2021-25745,CVE-2021-25746)
- containerd容器进程权限提升漏洞公告(CVE-2022-24769)
- CRI-O容器运行时引擎任意代码执行漏洞(CVE-2022-0811)
- linux内核导致的容器逃逸漏洞公告(CVE-2022-0492)
- containerd镜像Volume非安全处理漏洞公告(CVE-2022-23648)
- Linux内核整数溢出漏洞(CVE-2022-0185)
- Linux Polkit 权限提升漏洞预警(CVE-2021-4034)
- Kubernetes subpath符号链接交换安全漏洞(CVE-2021- 25741)
- runc符号链接挂载与容器逃逸漏洞预警公告(CVE-2021-30465)
- Docker资源管理错误漏洞公告(CVE-2021-21285)
- NVIDIA GPU驱动漏洞公告(CVE-2021-1056)
- Sudo缓冲区错误漏洞公告(CVE-2021-3156)
- Kubernetes安全漏洞公告(CVE-2020-8554)
- Apache containerd安全漏洞公告(CVE-2020-15257)
- Docker Engine输入验证错误漏洞公告(CVE-2020-13401)
- Kubernetes kube-apiserver输入验证错误漏洞公告(CVE-2020-8559)
- Kubernetes kubelet资源管理错误漏洞公告(CVE-2020-8557)
- Kubernetes kubelet和kube-proxy授权问题漏洞公告(CVE-2020-8558)
- 修复Kubernetes HTTP/2漏洞公告
- 修复Linux内核SACK漏洞公告
- 修复Docker操作系统命令注入漏洞公告(CVE-2019-5736)
- 全面修复Kubernetes权限许可和访问控制漏洞公告(CVE-2018-1002105)
- 修复Kubernetes Dashboard安全漏洞公告(CVE-2018-18264)
-
产品发布记录
-
集群版本发布记录
- Kubernetes版本策略
-
Kubernetes版本发布记录
- Kubernetes 1.31版本说明
- Kubernetes 1.30版本说明
- Kubernetes 1.29版本说明
- Kubernetes 1.28版本说明
- Kubernetes 1.27版本说明
- Kubernetes 1.25版本说明
- Kubernetes 1.23版本说明
- (停止维护)Kubernetes 1.21版本说明
- (停止维护)Kubernetes 1.19版本说明
- (停止维护)Kubernetes 1.17版本说明
- (停止维护)Kubernetes 1.15版本说明
- (停止维护)Kubernetes 1.13版本说明
- (停止维护)Kubernetes 1.11版本说明
- (停止维护)Kubernetes 1.9及之前版本说明
- 补丁版本发布记录
- 操作系统镜像发布记录
-
插件版本发布记录
- CoreDNS域名解析插件版本发布记录
- CCE容器存储插件(Everest)版本发布记录
- CCE节点故障检测插件版本发布记录
- Kubernetes Dashboard插件版本发布记录
- CCE集群弹性引擎版本发布记录
- NGINX Ingress控制器插件版本发布记录
- Kubernetes Metrics Server插件版本发布记录
- CCE容器弹性引擎插件版本发布记录
- CCE突发弹性引擎(对接CCI)插件版本发布记录
- CCE AI套件(NVIDIA GPU)版本发布记录
- CCE AI套件(Ascend NPU)版本发布记录
- Volcano调度器版本发布记录
- CCE密钥管理(对接 DEW)插件版本发布记录
- CCE容器网络扩展指标插件版本发布记录
- 节点本地域名解析加速插件版本发布记录
- 云原生监控插件版本发布记录
- 云原生日志采集插件版本发布记录
- 容器镜像签名验证插件版本发布记录
- Grafana插件版本发布记录
- OpenKruise插件版本发布记录
- Gatekeeper插件版本发布记录
- 容器垂直弹性引擎版本发布记录
- CCE集群备份恢复插件版本发布记录(停止维护)
- Kubernetes Web终端版本发布记录(停止维护)
- Prometheus插件版本发布记录(停止维护)
-
集群版本发布记录
- 产品介绍
- 计费说明
- Kubernetes基础知识
- 快速入门
-
用户指南
- 高危操作一览
-
集群
- 集群概述
-
集群版本发布说明
-
Kubernetes版本发布记录
- Kubernetes 1.31版本说明
- Kubernetes 1.30版本说明
- Kubernetes 1.29版本说明
- Kubernetes 1.28版本说明
- Kubernetes 1.27版本说明
- Kubernetes 1.25版本说明
- Kubernetes 1.23版本说明
- (停止维护)Kubernetes 1.21版本说明
- (停止维护)Kubernetes 1.19版本说明
- (停止维护)Kubernetes 1.17版本说明
- (停止维护)Kubernetes 1.15版本说明
- (停止维护)Kubernetes 1.13版本说明
- (停止维护)Kubernetes 1.11版本说明
- (停止维护)Kubernetes 1.9及之前版本说明
- 补丁版本发布记录
-
Kubernetes版本发布记录
- 购买集群
- 连接集群
- 管理集群
-
升级集群
- 升级集群的流程和方法
- 升级前须知
- 升级后验证
- 集群跨版本业务迁移
-
升级前检查异常问题排查
- 升级前检查项
- 节点限制检查异常处理
- 升级管控检查异常处理
- 插件检查异常处理
- Helm模板检查异常处理
- Master节点SSH连通性检查异常处理
- 节点池检查异常处理
- 安全组检查异常处理
- 残留待迁移节点检查异常处理
- K8s废弃资源检查异常处理
- 兼容性风险检查异常处理
- 节点上CCE Agent版本检查异常处理
- 节点CPU使用率检查异常处理
- CRD检查异常处理
- 节点磁盘检查异常处理
- 节点DNS检查异常处理
- 节点关键目录文件权限检查异常处理
- 节点Kubelet检查异常处理
- 节点内存检查异常处理
- 节点时钟同步服务器检查异常处理
- 节点OS检查异常处理
- 节点CPU数量检查异常处理
- 节点Python命令检查异常处理
- ASM网格版本检查异常处理
- 节点Ready检查异常处理
- 节点journald检查异常处理
- 节点干扰ContainerdSock检查异常处理
- 内部错误异常处理
- 节点挂载点检查异常处理
- K8s节点污点检查异常处理
- everest插件版本限制检查异常处理
- cce-hpa-controller插件限制检查异常处理
- 增强型CPU管理策略检查异常处理
- 用户节点组件健康检查异常处理
- 控制节点组件健康检查异常处理
- K8s组件内存资源限制检查异常处理
- K8s废弃API检查异常处理
- 节点NetworkManager检查异常处理
- 节点ID文件检查异常处理
- 节点配置一致性检查异常处理
- 节点配置文件检查异常处理
- CoreDNS配置一致性检查异常处理
- 节点Sudo检查异常处理
- 节点关键命令检查异常处理
- 节点sock文件挂载检查异常处理
- HTTPS类型负载均衡证书一致性检查异常处理
- 节点挂载检查异常处理
- 节点paas用户登录权限检查异常处理
- ELB IPv4私网地址检查异常处理
- 检查历史升级记录是否满足升级条件
- 检查集群管理平面网段是否与主干配置一致
- GPU插件检查异常处理
- 节点系统参数检查异常处理
- 残留packageversion检查异常处理
- 节点命令行检查异常处理
- 节点交换区检查异常处理
- NGINX Ingress控制器插件升级检查异常处理
- 云原生监控插件升级检查异常处理
- Containerd Pod重启风险检查异常处理
- GPU插件关键参数检查异常处理
- GPU/NPU Pod重建风险检查异常处理
- ELB监听器访问控制配置项检查异常处理
- Master节点规格检查异常处理
- Master节点子网配额检查异常处理
- 节点运行时检查异常处理
- 节点池运行时检查异常处理
- 检查节点镜像数量异常处理
- OpenKruise插件兼容性检查异常处理
- Secret落盘加密特性兼容性检查异常处理
- Ubuntu内核与GPU驱动兼容性提醒
- 排水任务检查异常处理
- 节点镜像层数量异常检查
- 检查集群是否满足滚动升级条件
- 轮转证书文件数量检查
- Ingress与ELB配置一致性检查
- 集群网络组件的NetworkPolicy开关检查
- 集群与节点池配置管理检查
- Master节点时区检查
- 集群管理最佳实践
- 节点
- 节点池
- 工作负载
- 调度
-
网络
- 网络概述
- 容器网络
-
服务(Service)
- 服务概述
- 集群内访问(ClusterIP)
- 节点访问(NodePort)
-
负载均衡(LoadBalancer)
- 创建负载均衡类型的服务
- 使用Annotation配置负载均衡类型的服务
- 为负载均衡类型的Service配置HTTP/HTTPS协议
- 为负载均衡类型的Service配置服务器名称指示(SNI)
- 为负载均衡类型的Service配置跨集群的后端
- 为负载均衡类型的Service配置HTTP/2
- 为负载均衡类型的Service配置HTTP/HTTPS头字段
- 为负载均衡类型的Service配置超时时间
- 为负载均衡类型的Service配置TLS
- 为负载均衡类型的Service配置gzip数据压缩
- 为负载均衡类型的Service配置黑名单/白名单访问策略
- 为负载均衡类型的Service指定多个端口配置健康检查
- 为负载均衡类型的Service配置pass-through能力
- 为负载均衡类型的Service配置获取客户端IP
- 为负载均衡类型的Service配置自定义EIP
- 为负载均衡类型的Service配置区间端口监听
- 通过ELB健康检查设置Pod就绪状态
- 健康检查使用UDP协议的安全组规则说明
- DNAT网关(DNAT)
- Headless Service
-
路由(Ingress)
- 路由概述
- ELB Ingress和Nginx Ingress对比
-
ELB Ingress管理
- 通过控制台创建ELB Ingress
- 通过Kubectl命令行创建ELB Ingress
- 用于配置ELB Ingress的注解(Annotations)
-
ELB Ingress高级配置示例
- 为ELB Ingress配置HTTPS证书
- 更新ELB Ingress的HTTPS证书
- 为ELB Ingress配置服务器名称指示(SNI)
- 为ELB Ingress配置多个转发策略
- 为ELB Ingress配置HTTP/2
- 为ELB Ingress配置HTTPS协议的后端服务
- 为ELB Ingress配置GRPC协议的后端服务
- 为ELB Ingress配置超时时间
- 为ELB Ingress配置慢启动持续时间
- 为ELB Ingress配置灰度发布
- 为ELB Ingress配置黑名单/白名单访问策略
- 为ELB Ingress配置多个监听端口
- 为ELB Ingress配置HTTP/HTTPS头字段
- 为ELB Ingress配置gzip数据压缩
- 为ELB Ingress配置URL重定向
- 为ELB Ingress配置Rewrite重写
- 为ELB Ingress配置HTTP重定向到HTTPS
- 为ELB Ingress配置转发规则优先级
- 为ELB Ingress配置自定义Header转发策略
- 为ELB Ingress配置自定义EIP
- 为ELB Ingress配置跨域访问
- 为ELB Ingress配置高级转发规则
- 为ELB Ingress配置高级转发动作
- ELB Ingress转发策略优先级说明
- 多个Ingress使用同一个ELB对外端口的配置说明
- Nginx Ingress管理
- 自建Nginx Ingress迁移到ELB Ingress
- DNS
- 集群网络配置
- 容器如何访问VPC内部网络
- 从容器访问公网
- 网络管理最佳实践
- 存储
- 弹性伸缩
- 云原生观测
- 云原生成本治理
- 命名空间
- 配置项与密钥
- 插件
- 模板(Helm Chart)
- 权限
- 配置中心
- 存储管理-Flexvolume(已弃用)
-
最佳实践
- CCE最佳实践汇总
- 容器应用部署上云CheckList
- 容器化改造
- 集群备份恢复
- 迁移
- DevOps
- 容灾
- 安全
- 弹性伸缩
- 监控
- 集群
-
网络
- 集群网络地址段规划实践
- 集群网络模型选择及各模型区别
- CCE集群实现访问跨VPC网络通信
- 使用VPC和云专线实现容器与IDC之间的网络通信
- 自建IDC与CCE集群共享域名解析
- 通过负载均衡配置实现会话保持
- 不同场景下容器内获取客户端源IP
- 通过配置容器内核参数增大监听队列长度
- 为负载均衡类型的Service配置pass-through能力
- 从Pod访问集群外部网络
- 通过模板包部署Nginx Ingress Controller
- CoreDNS配置优化实践
- CCE Turbo配置容器网卡动态预热
- 集群通过企业路由器连接对端VPC
- 在VPC网络集群中访问集群外地址时使用Pod IP作为客户端源IP
- 存储
- 容器
- 权限
- 发布
- 批量计算
- API参考
- SDK参考
- 场景代码示例
-
常见问题
- 高频常见问题
- 计费类
- 集群
-
节点
- 节点异常问题排查
- 节点创建
-
节点运行
- 集群可用但节点状态为“不可用”如何解决?
- CCE集群中的节点无法远程登录,如何排查解决?
- 如何重置CCE集群中节点的密码?
- 如何收集CCE集群中节点的日志?
- 如何解决yum update升级操作系统导致的容器网络不可用问题?
- Node节点vdb盘受损,通过重置节点仍无法恢复节点?
- CCE集群节点中安装kubelet的端口主要有哪些?
- 如何配置Pod使用GPU节点的加速能力?
- 容器使用SCSI类型云硬盘偶现IO卡住如何解决?
- docker审计日志量过大影响磁盘IO如何解决?
- thinpool磁盘空间耗尽导致容器或节点异常时,如何解决?
- CCE节点上监听的端口列表
- GPU节点使用nvidia驱动启动容器排查思路
- CCE节点NTP时间不同步如何解决?
- Containerd节点业务容器标准输出日志写入过快导致节点数据盘使用率过高
- 为什么kubectl top命令查看节点内存使用超过100%?
- CCE节点事件中一直出现“镜像回收失败”告警如何解决?
- 规格配置变更
- 操作系统问题说明
- 节点池
- 工作负载
-
网络管理
-
网络异常问题排查
- 工作负载网络异常时,如何定位排查?
- 负载均衡类型Service异常问题排查
- 集群内部无法使用ELB地址访问负载
- 集群外部访问Ingress异常
- CCE集群中域名解析失败
- 为什么访问部署的应用时浏览器返回404错误码?
- 为什么容器无法连接互联网?
- VPC的子网无法删除,怎么办?
- 如何修复出现故障的容器网卡?
- 节点无法连接互联网(公网),如何排查定位?
- 如何解决VPC网段与容器网络冲突的问题?
- ELB四层健康检查导致java报错:Connection reset by peer
- Service事件:Have no node to bind,如何排查?
- 为什么登录虚拟机VNC界面会间歇性出现Dead loop on virtual device gw_11cbf51a, fix it urgently?
- 集群节点使用networkpolicy概率性出现panic问题
- 节点远程登录界面(VNC)打印较多source ip_type日志问题
- 使用IE浏览器访问nginx-ingress出现重定向308无法访问
- NGINX Ingress控制器插件升级导致集群内Nginx类型的Ingress路由访问异常
- 负载均衡型Service更新出现错误:Quota exceeded for resources: members_per_pool
- ELB Ingress出现告警:Invalid input for rules
- 为ELB Ingress配置了HTTPS证书后访问异常的原因有哪些?
- 网络规划
- 安全加固
- 网络指导
-
网络异常问题排查
-
存储管理
- 如何扩容容器的存储空间?
- CCE支持的存储在持久化和多节点挂载方面的有什么区别?
- 创建CCE节点时可以不添加数据盘吗?
- CCE集群中的EVS存储卷被删除或者过期后是否可以恢复?
- 公网访问CCE部署的服务并上传OBS,为何报错找不到host?
- Pod接口ExtendPathMode: PodUID如何与社区client-go兼容?
- 创建存储卷失败如何解决?
- CCE容器云存储PVC能否感知底层存储故障?
- 通用文件存储(SFS 3.0)在OS中的挂载点修改属组及权限报错
- 无法使用kubectl命令删除PV或PVC
- 删除挂载了云存储的Pod时提示target is busy
- 无法自动创建包周期的云硬盘存储卷
- 误卸载存储池的磁盘后如何恢复
- 删除动态创建的PVC之后,底层存储依旧残留
- 命名空间
-
模板插件
- 插件异常问题排查
- 集群安装nginx-ingress插件失败,一直处于创建中?
- NPD插件版本过低导致进程资源残留问题
- 模板格式不正确,无法删除模板实例?
- CCE是否支持nginx-ingress?
- 插件安装失败,提示The release name is already exist如何解决?
- 创建或升级实例失败,提示rendered manifests contain a resource that already exists
- kube-prometheus-stack插件实例调度失败如何解决?
- 上传模板失败如何解决?
- 如何根据集群规格调整插件配额?
- NGINX Ingress控制器插件处于Unknown状态时卸载残留
- NGINX Ingress控制器插件升级后无法使用TLS v1.0和v1.1
- API&kubectl
- 域名DNS
- 镜像仓库
- 权限
- 相关服务
- 配置参考
- 视频帮助
- 文档下载
- 通用参考
链接复制成功!
在CCE集群中部署使用Flink
Flink是一个用于大规模数据处理的分布式流处理框架和计算引擎,可以处理有界(批处理)和无界(流处理)数据,提供低延迟、高吞吐量的实时数据处理能力,同时支持复杂事件处理和数据分析。在CCE集群中部署Flink,可以帮助您构建高效、可靠且灵活的数据处理系统,支持多样化的业务应用,并充分利用大数据环境中的集群资源。本示例将展示如何在CCE集群中部署Flink,并通过WordCount任务演示如何在CCE集群中运行Flink任务。示例中使用Flink standalone模式部署Flink集群,部署流程参考Flink官方文档:Kubernetes | Apache Flink。
前提条件
- 已创建一个集群,且集群有可用节点,具体操作步骤请参见购买Standard/Turbo集群。
- 集群内节点已绑定弹性公网IP,且已配置kubectl命令行工具,具体请参见将弹性公网IP绑定至实例和通过kubectl连接集群。
步骤一:部署Flink集群
在Kubernetes上部署Flink集群通常需要三个关键组件,每个组件对应Flink官网提供的不同资源定义文件,具体说明请参见表1。此外,还需要使用Flink官网的flink-configuration-configmap.yaml配置文件完成Flink集群的基本配置。
关键组件 |
资源定义文件 |
说明 |
---|---|---|
运行JobManager的Deployment |
jobmanager-session-deployment-non-ha.yaml |
JobManager是Flink集群的核心组件,负责协调和管理Flink作业的执行,能够处理任务调度、作业协调、资源分配和故障恢复。 |
运行TaskManager的Deployment |
taskmanager-session-deployment.yaml |
TaskManager是Flink集群的工作节点,负责实际执行分配的任务。每个TaskManager运行一个或多个任务槽(Task Slot),这些任务槽用于执行Flink作业的任务。 |
暴露JobManager的Rest和UI端口的Service |
jobmanager-service.yaml |
暴露Flink JobManager的REST和Web UI端口,使用户可以通过该Service访问JobManager的REST API和Web UI。 |
- 配置Flink集群的基本信息。
- 创建名为flink-configuration-configmap.yaml的YAML文件。
vim flink-configuration-configmap.yaml
文件内容如下,已添加相关注释。apiVersion: v1 kind: ConfigMap metadata: name: flink-config labels: app: flink # data定义了ConfigMap中存储的数据部分。示例中,data包含两个配置文件:config.yaml和log4j-console.properties data: config.yaml: |+ # Flink JobManager的RPC地址,通常对应JobManager的服务名称,这里设置为flink-jobmanager jobmanager.rpc.address: flink-jobmanager # 每个TaskManager中的任务槽数量,这里设置为2,即每个TaskManager可以并行处理2个任务。 taskmanager.numberOfTaskSlots: 2 # Flink BLOB服务的端口,用于传输大对象,如作业代码或大文件 blob.server.port: 6124 jobmanager.rpc.port: 6123 # JobManager的RPC服务端口 taskmanager.rpc.port: 6122 # TaskManager的RPC服务端口 jobmanager.memory.process.size: 1600m # JobManager的总内存配置 taskmanager.memory.process.size: 1728m # TaskManager的总内存配置 parallelism.default: 2 # Flink作业的默认并行度为2 log4j-console.properties: |+ # 如下配置会同时影响用户代码和Flink的日志行为 rootLogger.level = INFO # 日志系统将记录INFO级别及其以上的日志 rootLogger.appenderRef.console.ref = ConsoleAppender # 将日志输出到控制台 rootLogger.appenderRef.rolling.ref = RollingFileAppender # 日志输出到滚动文件 # 如果您只想改变Flink的日志行为则可以取消下面2行的注释部分 #logger.flink.name = org.apache.flink #logger.flink.level = INFO # 下面8行将公共libraries或connectors的日志级别保持在INFO级别 # root logger 的配置不会覆盖此处配置 # 你必须手动修改这里的日志级别 logger.pekko.name = org.apache.pekko logger.pekko.level = INFO logger.kafka.name= org.apache.kafka logger.kafka.level = INFO logger.hadoop.name = org.apache.hadoop logger.hadoop.level = INFO logger.zookeeper.name = org.apache.zookeeper logger.zookeeper.level = INFO # 将所有info级别的日志输出到控制台 appender.console.name = ConsoleAppender appender.console.type = CONSOLE appender.console.layout.type = PatternLayout appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n # 将所有info级别的日志输出到指定的滚动文件 appender.rolling.name = RollingFileAppender appender.rolling.type = RollingFile appender.rolling.append = false appender.rolling.fileName = ${sys:log.file} appender.rolling.filePattern = ${sys:log.file}.%i appender.rolling.layout.type = PatternLayout appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n appender.rolling.policies.type = Policies appender.rolling.policies.size.type = SizeBasedTriggeringPolicy appender.rolling.policies.size.size=100MB appender.rolling.strategy.type = DefaultRolloverStrategy appender.rolling.strategy.max = 10 # 关闭 Netty channel handler 中不相关的(错误)警告 logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline logger.netty.level = OFF
- 利用flink-configuration-configmap.yaml配置Flink集群的基本信息。
kubectl create -f flink-configuration-configmap.yaml
- 根据以下代码,检查名为flink-config的ConfigMap是否建立成功。
kubectl get configmap
若回显内容如下,则说明ConfigMap创建成功。
NAME DATA AGE flink-config 2 59s kube-root-ca.crt 1 16d
- 创建名为flink-configuration-configmap.yaml的YAML文件。
- 创建暴露JobManager的REST和UI端口的Service。
- 创建名为jobmanager-service.yaml的YAML文件。
vim jobmanager-service.yaml
文件内容如下,已添加相关注释。
apiVersion: v1 kind: Service metadata: name: flink-jobmanager spec: type: ClusterIP # Service类型为集群内部访问 ports: # 定义Service暴露的端口列表 - name: rpc port: 6123 - name: blob-server port: 6124 - name: webui port: 8081 selector: #定义Service的标签选择器,用于确定该Service将流量路由到哪些Pod app: flink component: jobmanager
- 根据jobmanager-service.yaml创建名为flink-jobmanager的Service。
kubectl create -f jobmanager-service.yaml
- 利用以下命令检查Service是否创建成功。
kubectl get service flink-jobmanager
回显内容如下,则说明Service创建成功。
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE flink-jobmanager ClusterIP 10.247.199.212 <none> 6123/TCP,6124/TCP,8081/TCP 115s
- 创建名为jobmanager-service.yaml的YAML文件。
- 创建运行JobManager的Deployment。
- 创建名为jobmanager-session-deployment-non-ha.yaml的YAML文件。
vim jobmanager-session-deployment-non-ha.yaml
文件内容如下,已添加相关注释。
apiVersion: apps/v1 kind: Deployment metadata: name: flink-jobmanager spec: replicas: 1 # 指定JobManager的副本数为1 selector: matchLabels: # 定义标签 app: flink component: jobmanager template: metadata: labels: app: flink component: jobmanager spec: containers: - name: jobmanager # 容器的名称为jobmanager image: apache/flink:1.20.0-scala_2.12 # 使用的Flink镜像,版本为1.20.0,Scala版本为2.12 args: ["jobmanager"] # 运行容器时传递的参数,指定这个容器运行为JobManager ports: # 用于暴露容器内相关端口 - containerPort: 6123 # 用于TaskManager与JobManager通信 name: rpc - containerPort: 6124 # 用于传输二进制对象 name: blob-server - containerPort: 8081 # 用于访问Flink的Web管理界面 name: webui livenessProbe: tcpSocket: port: 6123 # 使用TCP检查RPC端口6123的健康状况 initialDelaySeconds: 30 periodSeconds: 60 volumeMounts: # 挂载存储卷 - name: flink-config-volume mountPath: /opt/flink/conf securityContext: runAsUser: 9999 # 参考官方flink镜像中的_flink_用户,如有必要可以修改 volumes: # 定义存储卷,用于存储配置文件 - name: flink-config-volume configMap: name: flink-config items: - key: config.yaml # 将ConfigMap中的config.yaml文件挂载到容器的指定路径 path: config.yaml # 在容器中的路径为/opt/flink/conf/config.yaml - key: log4j-console.properties # 将ConfigMap中的log4j-console.properties文件挂载到容器的指定路径 path: log4j-console.properties # 在容器中的路径为/opt/flink/conf/log4j-console.properties
- 根据jobmanager-session-deployment-non-ha.yaml创建名为flink-jobmanager的Deployment。
kubectl create -f jobmanager-session-deployment-non-ha.yaml
- 利用以下命令检查名为flink-jobmanager的Deployment是否创建成功。
kubectl get pod
回显内容如下,则说明该Deployment创建成功。
NAME READY STATUS RESTARTS AGE flink-jobmanager-789c8777-vhqbv 1/1 Running 0 97s
- 创建名为jobmanager-session-deployment-non-ha.yaml的YAML文件。
- 创建运行TaskManager的Deployment。
- 创建名为taskmanager-session-deployment.yaml的YAML文件。
vim taskmanager-session-deployment.yaml
文件内容如下,已添加相关注释。
apiVersion: apps/v1 kind: Deployment metadata: name: flink-taskmanager spec: replicas: 2 # 指定TaskManager的副本数为2 selector: matchLabels: # 定义标签 app: flink component: taskmanager template: metadata: labels: app: flink component: taskmanager spec: containers: - name: taskmanager # 容器的名称为taskmanager image: apache/flink:1.20.0-scala_2.12 # 使用的Flink镜像,版本为1.20.0,Scala版本为2.12 args: ["taskmanager"] # 运行容器时传递的参数,指定这个容器运行为TaskManager ports: # 用于暴露容器内相关端口 - containerPort: 6122 # 用于TaskManager与JobManager通信 name: rpc livenessProbe: tcpSocket: port: 6122 # 使用TCP检查RPC端口6122的健康状况 initialDelaySeconds: 30 periodSeconds: 60 volumeMounts: # 挂载存储卷 - name: flink-config-volume mountPath: /opt/flink/conf/ securityContext: runAsUser: 9999 # 参考官方flink镜像中的_flink_用户,如有必要可以修改 volumes: # 定义存储卷,用于存储配置文件 - name: flink-config-volume configMap: name: flink-config items: - key: config.yaml path: config.yaml - key: log4j-console.properties path: log4j-console.properties
- 根据taskmanager-session-deployment.yaml创建名为flink-taskmanager的Deployment。
kubectl create -f taskmanager-session-deployment.yaml
- 利用以下命令检查名为flink-taskmanager的Deployment是否创建成功。
kubectl get pod
回显内容如下,则说明该Deployment创建成功。
NAME READY STATUS RESTARTS AGE flink-jobmanager-789c8777-vhqbv 1/1 Running 0 13m flink-taskmanager-579f47cf9f-prrff 1/1 Running 0 23s flink-taskmanager-579f47cf9f-wgt66 1/1 Running 0 23s
- 创建名为taskmanager-session-deployment.yaml的YAML文件。
步骤二:对外发布服务
创建flink-jobmanager工作负载的节点访问类型Service,并允许外部网络通过集群节点的公网IP和自动分配的对外端口号来访问Flink JobManager服务。该节点访问类型Service会将外部请求转发给相应的容器。
- 登录CCE控制台,进入“工作负载 > 无状态负载”页面,选择flink-jobmanager,单击“访问方式”页签,服务模块单击“创建服务”。
- 在创建服务界面,访问类型选择“节点访问”,在端口配置中,容器端口和服务端口都设置为8081,单击“确定”。该服务会自动生成节点访问端口,自动生成的节点端口位置请参见图2,本示例中节点访问端口为30327。您可以通过集群任一节点的弹性公网IP和端口号访问该工作负载。
图1 创建节点访问类型Service
- 检查该服务是否能正常访问。单击“节点管理”,单击“节点”,选择任一节点,复制对应的弹性公网IP。
在浏览器中输入“节点弹性公网IP地址:节点访问端口”,出现Flink的Dashboard页面则说明访问成功。若访问不成功,请检查集群安全组入站规则中是否已将该节点访问端口的源地址设置为“0.0.0.0/0”或“全部”,具体请参见配置安全组规则。
图3 Flink的Dashboard页面
步骤三:执行Flink任务
利用官方示例WordCount.jar文件示范如何在CCE集群中执行Flink任务。WordCount任务的核心功能是计算输入文本中每个单词出现的次数。
- 下载并解压flink-1.20.0-bin-scala_2.12.tgz,该文件来自https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz。检查“flink-1.20.0-bin-scala_2.12\flink-1.20.0\examples\streamin”路径下是否有WordCount.jar包。
- 在Dashboard页面添加Jar包。打开Flink的Dashboard页面,在左侧菜单栏中单击“Submit New Job”,右上角单击“Add New”,选择“flink-1.20.0-bin-scala_2.12\flink-1.20.0\examples\streamin”路径下的WordCount.jar。单击上传的WordCount.jar,在“Program Arguments”栏中指定输出文件的路径,如“--output /opt/flink/output”。
图4 上传WordCount任务
- 在弹出的页面中,单击Overview的右侧蓝色方块,单击“Taskmanager”,查看该任务的Endpoint,根据Endpoint可以进一步得知该任务对应的TaskManager Pod。
图5 查看Endpoint
- 查找Endpoint对应的TaskManager Pod。在命令行界面输入以下命令,查看Flink相关Pod对应的IP地址。
kubectl get pod -o wide | grep flink
回显结果如下,可知“flink-taskmanager-579f47cf9f-prrff”为寻找的TaskManager Pod。
flink-jobmanager-789c8777-vhqbv 1/1 Running 1 (28m ago) 40h 192.168.0.139 192.168.0.53 <none> <none> flink-taskmanager-579f47cf9f-prrff 1/1 Running 1 (28m ago) 40h 192.168.0.92 192.168.0.53 <none> <none> flink-taskmanager-579f47cf9f-wgt66 1/1 Running 1 (28m ago) 40h 192.168.0.194 192.168.0.212 <none> <none>
- 待任务执行完毕,进入“flink-taskmanager-579f47cf9f-prrff”查看是否正确输出每个单词出现的次数。
kubectl exec -it flink-taskmanager-579f47cf9f-prrff -- bash
利用ls命令查找输出结果的具体路径。
ls /opt/flink/output/
回显结果如下:
2024-09-02--01
查看2024-09-02--01文件夹内容。
ls /opt/flink/output/2024-09-02--01
回显结果如下:
part-bd89ad8b-a0dd-4b4d-b771-4c88eaed61e4-0
根据最后确定的路径,查看每个单词的出现次数。
cat /opt/flink/output/2024-09-02--01/part-bd89ad8b-a0dd-4b4d-b771-4c88eaed61e4-0
回显结果如下:
(to,1) (be,1) (or,1) (not,1) (to,2) (be,2) (that,1) ...
步骤四:集群清理
- 删除运行JobManager的Deployment。
kubectl delete -f jobmanager-session-deployment-non-ha.yaml
回显结果如下:
deployment.apps "flink-jobmanager" deleted
- 删除运行TaskManager的Deployment。
kubectl delete -f taskmanager-session-deployment.yaml
回显结果如下:
deployment.apps "flink-taskmanager" deleted
- 删除ConfigMap。
kubectl delete -f flink-configuration-configmap.yaml
回显结果如下:
configmap "flink-config" deleted
- 删除Service。
kubectl delete -f jobmanager-service.yaml
回显结果如下:
service "flink-jobmanager" delete