Kubeflow插件
随着模型和数据量的增加,如何高效地构建、部署和管理复杂的机器学习工作流成为重要挑战,具体如下:
- 缺乏管理复杂机器学习任务的统一平台。
- 不同组件之间的依赖关系和数据流难以追踪和管理。
- 重复执行相同任务导致计算资源浪费。
- 不同的计算环境之间无法迁移工作流,缺乏灵活性。
为解决上述问题,CCE Standard/Turbo集群基于Kubeflow提供了Kubeflow插件。Kubeflow是一个开源的机器学习(ML)平台,专为Kubernetes环境设计,用于简化机器学习工作流的开发、部署和管理。它为开发者和数据科学家提供了一个一致的、可扩展的框架,用于在Kubernetes上运行机器学习任务和应用。基于Kubeflow的能力,该插件具备以下功能:
- 简化ML工作流的开发和管理:通过直观的界面和Python SDK,用户可以快速定义和管理复杂的ML工作流。
- 提高资源利用效率:通过任务并行化和缓存机制,减少冗余计算,进而提高资源利用率。
- 增强工作流的可移植性:通过平台中立的IR YAML定义,用户可以在不同的环境中轻松迁移工作流。
- 加强管理和可视化能力:插件提供丰富的管理和可视化工具,帮助用户更好地跟踪和优化工作流。
基本概念
概念 |
定义 |
作用 |
---|---|---|
Pipeline |
是一组按顺序执行的机器学习步骤的集合,通常包括数据预处理、训练、评估和推理等过程。通过Pipeline,Kubeflow 插件可以进行整个机器学习生命周期自动化和管理。 |
Pipeline允许用户通过图形化界面或YAML文件定义机器学习工作流,并自动化各个步骤的执行。它使得复杂的机器学习任务能够在 Kubernetes 上高效地调度和运行。 |
Run |
是一次实际的Pipeline执行实例,表示一个具体的工作流的运行。每个Run是Pipeline的一次执行,用于记录Pipeline的输入参数、执行状态、输出结果等信息。 |
Run用于执行具体的机器学习任务,提供不同的输入参数或数据集。通Run,用户可以追踪每次执行的进展和结果,进行版本管理和实验对比。 |
Experiment |
是对多个Run的逻辑分组。一般情况下,多个Runs代表相同问题下的不同配置或超参数调整。通过Experiment,用户可以组织和管理多个Run,进行实验管理和结果对比。 |
Experiment使用户能够对比和跟踪不同机器学习任务的执行结果。通过组织多个Run,用户可以分析不同参数和配置对模型表现的影响,从而进行更精细化的模型调优和性能优化。 |
Component |
是Pipeline中的最小单元,代表一个特定的机器学习任务或步骤,如数据预处理、模型训练、模型评估等。每个Component都是一个容器,包含执行该任务所需的代码和依赖。 |
Component使Pipeline中的每个步骤可以独立定义和管理,并且可以在不同的Pipelines中复用。这种组件化的设计让用户可以方便地构建、调试和扩展Pipeline。 |
DAG |
是Kubeflow Pipelines中用于表示任务依赖关系的图结构。在一个Pipeline 中,DAG用于描述各个Component之间的执行顺序和依赖关系。 |
DAG确保任务按照正确的顺序执行,同时避免任务之间的循环依赖。通过DAG,Kubeflow Pipelines能够灵活地管理任务之间的依赖,确保工作流的高效执行。 |
更多概念请参见Kubeflow基本概念。
前提条件
- 已创建一个CCE Standard/Turbo集群,且集群版本为v1.27及以上,具体操作请参见购买Standard/Turbo集群。
- 安装插件前,请确保集群中的Pod可以访问公网,以便下载镜像。您可以为集群配置SNAT规则以保证Pod能够访问公网,配置SNAT规则将涉及一定费用,具体请参见NAT网关价格计算器。
- 在使用示例中,需要配置NodePort类型的Service以访问Kubeflow插件的Web UI界面,此时需要为集群中任一节点绑定弹性公网IP(EIP)。EIP涉及一定费用,具体请参见价格计算器。
约束与限制
- 目前,Kubeflow插件仅提供Pipeline能力。
- 该插件当前正处于上线阶段,已发布区域请以控制台实际为准。
- 该插件处于公测阶段,您可体验最新插件特性,但需要注意该版本的稳定性未得到完全的验证,不适用于CCE服务SLA。
安装插件
- 登录CCE控制台,单击集群名称进入集群。
- 在左侧导航栏中选择“插件中心”,在右侧找到Kubeflow插件,单击“安装”。
- 在安装插件页面,进行相关配置。
表2 插件规格配置 参数
参数说明
选择版本
表示插件版本,请按需选择。
插件规格
当前仅支持“默认”规格。
- 配置完成后,请在右下角单击“安装”。Kubeflow插件状态变为“运行中”时,则说明安装成功。
组件说明
组件名称 |
说明 |
资源类型 |
---|---|---|
cache-deployer |
用于初始化部署缓存所需的配置(如PVC、ConfigMap等),负责将缓存配置注入集群。 |
Deployment |
cache-server |
提供缓存服务,避免重复执行已经完成过的Pipeline Step,提高运行效率。 |
Deployment |
metadata-envoy |
用于在Pipeline执行过程中收集、处理并代理转发元数据至指定的Metadata Store。 |
Deployment |
ml_metadata_store_server |
表示元数据存储服务,用于存储Pipeline执行过程中的输入、输出、模型等信息。 |
Deployment |
metadata-writer |
用于将Pipeline执行的元数据写入Metadata Store。 |
Deployment |
minio |
表示对象存储服务,用于存储Pipeline运行过程中产生的Artifact,如模型、数据等。 |
Deployment |
api-server |
提供REST/gRPC API接口,供SDK、UI与后端通信。 |
Deployment |
persistenceagent |
将Pipeline执行记录持久化到数据库中,支持运行状态恢复。 |
Deployment |
scheduledworkflow |
提供定时触发Pipeline的能力,支持周期性运行。 |
Deployment |
ml-pipeline-ui |
提供Web UI前端服务,供用户查看并管理Pipeline、运行记录等。 |
Deployment |
viewer-crd-controller |
管理Viewer CRD,用于在UI中可视化Artifact(如图片、表格等)。 |
Deployment |
visualization-server |
提供数据可视化服务,如TensorBoard、ROC曲线等,需与Viewer配合使用。 |
Deployment |
mysql |
表示Kubeflow Pipelines使用的关系型数据库,用于存储运行记录、用户信息等。 |
Deployment |
workflow-controller |
是Argo的核心组件,表示工作流控制器,负责调度执行Pipeline中的每个任务(Step)。 |
Deployment |
- 以上组件均部署在kubeflow命名空间中。
- 某些组件(如minio、mysql)为Kubeflow Pipeline提供依赖服务,您可按需替换。

目前,mysql和minio的后端PV绑定的是EmptyDir,如果Pod产生漂移或者重启会导致非预期结果。
使用示例
本示例将向您介绍如何使用Kubeflow插件执行一个简单的Pipeline,该示例步骤流程请参见图1。该Pipeline的包含一个任务comp,用于接收一个字符串类型的输入message,并将该消息内容原样输出。comp任务通过一个基于Python 3.9 的Pod实现,主要任务是打印接收到的message,并将其写入到指定的输出文件。
- 在已有的ECS中安装kubectl,并使用kubectl连接集群,具体操作请参见通过kubectl连接集群。
- 安装Kubeflow插件后,系统将自动创建ml-pipeline-ui组件以提供Web UI前端服务,该组件自动绑定名为ml-pipeline-ui的ClusterIP类型Service。如果您需要访问该UI界面,则需要将ClusterIP类型的Service转化为NodePort或LoadBalancer类型的Service。本文以NodePort类型Service为例,具体步骤如下:
执行以下命令,将Cluster IP类型的Service转化为NodePort类型的Service。
kubectl edit svc ml-pipeline-ui -n kubeflow
将文件中的type设置为NodePort,并指定一个与集群内其他服务不冲突的节点端口号。关于NodePort类型Service的详细说明,请参见节点访问(NodePort)。
... spec: ports: - name: http protocol: TCP port: 80 targetPort: 3000 nodePort: 30083 # 节点端口号 selector: app: ml-pipeline-ui application-crd-id: kubeflow-pipelines clusterIP: 10.247.101.116 clusterIPs: - 10.247.101.116 type: NodePort # 将ClusterIP改为NodePort sessionAffinity: None ...
- 在浏览器中输入http://<节点EIP>:<节点端口号>/ ,以访问Kubeflow插件的UI界面。通过该界面,您可以直观地创建和管理机器学习Pipeline,定义包含数据预处理、模型训练、评估部署等步骤的完整工作流。同时,可以配置和启动运行实例(Run),实时监控任务执行状态、查看日志和可视化结果,实现机器学习工作流的自动化编排与执行。
显示如下界面,则说明访问成功。
图2 Kubeflow插件的UI界面 - 在本地创建一个pipeline.yaml文件,用于配置Pipeline。本文仅提供一个简单的Pipeline示例,用于提取Pipelines运行时传入的参数,并将其打印并输出至指定文件中。关于Pipeline更详细的使用说明,请参见Compile a Pipeline。
pipeline.yaml的文件内容如下:
# PIPELINE DEFINITION # Name: my-pipeline # Description: My ML pipeline running on HuaweiCloud CCE. # Inputs: # message: str # Outputs: # Output: str components: comp-comp:# 定义一个名为comp-comp的Component,指定其输入(message)和输出(Output)。 executorLabel: exec-comp # 执行器标签,用于匹配执行器 inputDefinitions: # 组件输入定义 parameters: message: parameterType: STRING outputDefinitions: # 组件输出定义 parameters: Output: parameterType: STRING deploymentSpec: executors: # 配置执行器 exec-comp: container: # 容器配置,表示CCE集群中Pod执行的具体任务 image: python:3.9 # 使用python 3.9镜像 command: # 容器的启动命令 - python3 - -c - | import sys, json # python脚本:从输入参数中提取message,打印并写入输出文件(路径由Kubeflow运行时提供) executor_input = json.loads(sys.argv[1]) inputs = executor_input.get("inputs", {}) parameters = inputs.get("parameterValues", {}) message = parameters.get("message", "Success Message From pipeline") print("Received message:", message) outputs = executor_input.get("outputs", {}) output_file = outputs.get("parameters", {}).get("Output", {}).get("outputFile") if output_file: with open(output_file, "w") as f: f.write(message) - '{{$}}' pipelineInfo: description: My ML pipeline running on HuaweiCloud CCE. name: my-pipeline root: dag: # 定义一个DAG,当存在多Component并行或串行时,DAG可以确定其执行顺序和依赖关系 outputs: parameters: Output: valueFromParameter: outputParameterKey: Output # 将任务comp的Output值作为整个DAG的最终输出 producerSubtask: comp tasks: # 定义一个任务comp,调用组件comp-comp,将python脚本的输入message传递给组件,并将组件的输出Output作为Pipeline的最终输出 comp: cachingOptions: enableCache: true componentRef: name: comp-comp inputs: parameters: message: componentInputParameter: message taskInfo: name: comp inputDefinitions: #声明Pipeline的输入参数(message)和输出参数(Output),类型均为字符串 parameters: message: parameterType: STRING outputDefinitions: parameters: Output: parameterType: STRING schemaVersion: 2.1.0 sdkVersion: kfp-2.12.1
以上YAML文件的整体运行逻辑如下:
表3 运行逻辑 阶段
步骤
步骤说明
关联的YAML字段
输入阶段
用户提交参数
用户通过UI传入message参数
inputDefinitions.parameters.message
Pipeline接收
对流水线运行所需的输入参数进行集中验证、标准化处理,并将合法参数持久化存储
root.inputDefinitions
DAG参数分发
DAG将参数传递给指定任务
tasks.comp.inputs.parameters.message
任务执行阶段
任务调用组件
任务“comp”引用组件“comp-comp”
tasks.comp.componentRef
组件绑定执行器
组件通过“executorLabel”找到容器实现
components.comp-comp.executorLabel
deploymentSpec.exec-comp
容器运行
CCE集群的API Server接收执行请求,启动Pod执行Python脚本任务
deploymentSpec.exec-comp.container.command
输出阶段
组件输出定义
组件声明输出参数“Output”
components.comp-comp.outputDefinitions
DAG结果聚合
从任务“comp”收集输出到流水线
root.dag.outputs.parameters.Output
最终输出
用户可查看的全局输出结果
outputDefinitions.parameters.Output
- 返回UI界面,在右上角单击“Upload pipeline”,上传上述pipeline.yaml,具体示例如下。配置完成后,在下方单击“Create”创建Pipeline。
图3 上传Pipeline
- Pipeline创建完成后,在UI界面自动出现comp任务。在右上角单击“Create run”,用于创建执行实例,参数示例如下。配置完成后,在下方单击“Start”运行上述Pipeline。
图4 创建Run
- Run parameters:根据pipeline.yaml文件内容自动生成的参数,请填入需要输出的message。
单击“Start”后,CCE集群的API Server接收Executor下发的任务请求,将启动Pod执行具体任务,执行过程中的状态和日志会实时反馈至UI界面,用户可以在UI界面中监控任务进度并查看最终结果。当CCE集群的Pod状态为停止时,则说明任务执行完成。此时在UI界面左侧菜单栏单击“Runs”,若对应Run实例前出现
,则说明Pipeline运行成功。
图5 启动Pod执行任务 - 单击comp任务,查看输入信息和输出结果。如图所示,输入的message为“My ML pipeline running on HuaweiCloud CCE.”,输出结果为“My ML pipeline running on HuaweiCloud CCE.”,符合Pipeline定义。
图6 输入信息和输出信息
版本记录
插件版本 |
支持的集群版本 |
更新特性 |
---|---|---|
1.1.4 |
v1.27及以上 |
CCE Standard/Turbo集群支持使用Kubeflow插件 |