更新时间:2025-07-18 GMT+08:00

Kubeflow插件

随着模型和数据量的增加,如何高效地构建、部署和管理复杂的机器学习工作流成为重要挑战,具体如下:

  • 缺乏管理复杂机器学习任务的统一平台。
  • 不同组件之间的依赖关系和数据流难以追踪和管理。
  • 重复执行相同任务导致计算资源浪费。
  • 不同的计算环境之间无法迁移工作流,缺乏灵活性。

为解决上述问题,CCE Standard/Turbo集群基于Kubeflow提供了Kubeflow插件。Kubeflow是一个开源的机器学习(ML)平台,专为Kubernetes环境设计,用于简化机器学习工作流的开发、部署和管理。它为开发者和数据科学家提供了一个一致的、可扩展的框架,用于在Kubernetes上运行机器学习任务和应用。基于Kubeflow的能力,该插件具备以下功能:

  • 简化ML工作流的开发和管理:通过直观的界面和Python SDK,用户可以快速定义和管理复杂的ML工作流。
  • 提高资源利用效率:通过任务并行化和缓存机制,减少冗余计算,进而提高资源利用率。
  • 增强工作流的可移植性:通过平台中立的IR YAML定义,用户可以在不同的环境中轻松迁移工作流。
  • 加强管理和可视化能力:插件提供丰富的管理和可视化工具,帮助用户更好地跟踪和优化工作流。

基本概念

表1 插件基本概念

概念

定义

作用

Pipeline

是一组按顺序执行的机器学习步骤的集合,通常包括数据预处理、训练、评估和推理等过程。通过Pipeline,Kubeflow 插件可以进行整个机器学习生命周期自动化和管理。

Pipeline允许用户通过图形化界面或YAML文件定义机器学习工作流,并自动化各个步骤的执行。它使得复杂的机器学习任务能够在 Kubernetes 上高效地调度和运行。

Run

是一次实际的Pipeline执行实例,表示一个具体的工作流的运行。每个Run是Pipeline的一次执行,用于记录Pipeline的输入参数、执行状态、输出结果等信息。

Run用于执行具体的机器学习任务,提供不同的输入参数或数据集。通Run,用户可以追踪每次执行的进展和结果,进行版本管理和实验对比。

Experiment

是对多个Run的逻辑分组。一般情况下,多个Runs代表相同问题下的不同配置或超参数调整。通过Experiment,用户可以组织和管理多个Run,进行实验管理和结果对比。

Experiment使用户能够对比和跟踪不同机器学习任务的执行结果。通过组织多个Run,用户可以分析不同参数和配置对模型表现的影响,从而进行更精细化的模型调优和性能优化。

Component

是Pipeline中的最小单元,代表一个特定的机器学习任务或步骤,如数据预处理、模型训练、模型评估等。每个Component都是一个容器,包含执行该任务所需的代码和依赖。

Component使Pipeline中的每个步骤可以独立定义和管理,并且可以在不同的Pipelines中复用。这种组件化的设计让用户可以方便地构建、调试和扩展Pipeline。

DAG

是Kubeflow Pipelines中用于表示任务依赖关系的图结构。在一个Pipeline 中,DAG用于描述各个Component之间的执行顺序和依赖关系。

DAG确保任务按照正确的顺序执行,同时避免任务之间的循环依赖。通过DAG,Kubeflow Pipelines能够灵活地管理任务之间的依赖,确保工作流的高效执行。

更多概念请参见Kubeflow基本概念

前提条件

  • 已创建一个CCE Standard/Turbo集群,且集群版本为v1.27及以上,具体操作请参见购买Standard/Turbo集群
  • 安装插件前,请确保集群中的Pod可以访问公网,以便下载镜像。您可以为集群配置SNAT规则以保证Pod能够访问公网,配置SNAT规则将涉及一定费用,具体请参见NAT网关价格计算器
  • 在使用示例中,需要配置NodePort类型的Service以访问Kubeflow插件的Web UI界面,此时需要为集群中任一节点绑定弹性公网IP(EIP)。EIP涉及一定费用,具体请参见价格计算器

约束与限制

  • 目前,Kubeflow插件仅提供Pipeline能力。
  • 该插件当前正处于上线阶段,已发布区域请以控制台实际为准
  • 该插件处于公测阶段,您可体验最新插件特性,但需要注意该版本的稳定性未得到完全的验证,不适用于CCE服务SLA

安装插件

  1. 登录CCE控制台,单击集群名称进入集群。
  2. 在左侧导航栏中选择“插件中心”,在右侧找到Kubeflow插件,单击“安装”
  3. 在安装插件页面,进行相关配置。

    表2 插件规格配置

    参数

    参数说明

    选择版本

    表示插件版本,请按需选择。

    插件规格

    当前仅支持“默认”规格。

  4. 配置完成后,请在右下角单击“安装”。Kubeflow插件状态变为“运行中”时,则说明安装成功。

组件说明

组件名称

说明

资源类型

cache-deployer

用于初始化部署缓存所需的配置(如PVC、ConfigMap等),负责将缓存配置注入集群。

Deployment

cache-server

提供缓存服务,避免重复执行已经完成过的Pipeline Step,提高运行效率。

Deployment

metadata-envoy

用于在Pipeline执行过程中收集、处理并代理转发元数据至指定的Metadata Store。

Deployment

ml_metadata_store_server

表示元数据存储服务,用于存储Pipeline执行过程中的输入、输出、模型等信息。

Deployment

metadata-writer

用于将Pipeline执行的元数据写入Metadata Store。

Deployment

minio

表示对象存储服务,用于存储Pipeline运行过程中产生的Artifact,如模型、数据等。

Deployment

api-server

提供REST/gRPC API接口,供SDK、UI与后端通信。

Deployment

persistenceagent

将Pipeline执行记录持久化到数据库中,支持运行状态恢复。

Deployment

scheduledworkflow

提供定时触发Pipeline的能力,支持周期性运行。

Deployment

ml-pipeline-ui

提供Web UI前端服务,供用户查看并管理Pipeline、运行记录等。

Deployment

viewer-crd-controller

管理Viewer CRD,用于在UI中可视化Artifact(如图片、表格等)。

Deployment

visualization-server

提供数据可视化服务,如TensorBoard、ROC曲线等,需与Viewer配合使用。

Deployment

mysql

表示Kubeflow Pipelines使用的关系型数据库,用于存储运行记录、用户信息等。

Deployment

workflow-controller

是Argo的核心组件,表示工作流控制器,负责调度执行Pipeline中的每个任务(Step)。

Deployment

  • 以上组件均部署在kubeflow命名空间中。
  • 某些组件(如minio、mysql)为Kubeflow Pipeline提供依赖服务,您可按需替换。

目前,mysql和minio的后端PV绑定的是EmptyDir,如果Pod产生漂移或者重启会导致非预期结果。

使用示例

本示例将向您介绍如何使用Kubeflow插件执行一个简单的Pipeline,该示例步骤流程请参见图1。该Pipeline的包含一个任务comp,用于接收一个字符串类型的输入message,并将该消息内容原样输出。comp任务通过一个基于Python 3.9 的Pod实现,主要任务是打印接收到的message,并将其写入到指定的输出文件。

图1 示例整体流程
  1. 在已有的ECS中安装kubectl,并使用kubectl连接集群,具体操作请参见通过kubectl连接集群
  2. 安装Kubeflow插件后,系统将自动创建ml-pipeline-ui组件以提供Web UI前端服务,该组件自动绑定名为ml-pipeline-ui的ClusterIP类型Service。如果您需要访问该UI界面,则需要将ClusterIP类型的Service转化为NodePort或LoadBalancer类型的Service。本文以NodePort类型Service为例,具体步骤如下:

    执行以下命令,将Cluster IP类型的Service转化为NodePort类型的Service。
    kubectl edit svc ml-pipeline-ui -n kubeflow

    将文件中的type设置为NodePort,并指定一个与集群内其他服务不冲突的节点端口号。关于NodePort类型Service的详细说明,请参见节点访问(NodePort)

    ...
    spec:
      ports:
        - name: http
          protocol: TCP
          port: 80
          targetPort: 3000
          nodePort: 30083 # 节点端口号
      selector:
        app: ml-pipeline-ui
        application-crd-id: kubeflow-pipelines
      clusterIP: 10.247.101.116
      clusterIPs:
        - 10.247.101.116
      type: NodePort # 将ClusterIP改为NodePort
      sessionAffinity: None
    ...

  3. 在浏览器中输入http://<节点EIP>:<节点端口号>/ ,以访问Kubeflow插件的UI界面。通过该界面,您可以直观地创建和管理机器学习Pipeline,定义包含数据预处理、模型训练、评估部署等步骤的完整工作流。同时,可以配置和启动运行实例(Run),实时监控任务执行状态、查看日志和可视化结果,实现机器学习工作流的自动化编排与执行。

    显示如下界面,则说明访问成功。

    图2 Kubeflow插件的UI界面

  4. 在本地创建一个pipeline.yaml文件,用于配置Pipeline。本文仅提供一个简单的Pipeline示例,用于提取Pipelines运行时传入的参数,并将其打印并输出至指定文件中。关于Pipeline更详细的使用说明,请参见Compile a Pipeline

    pipeline.yaml的文件内容如下:
    # PIPELINE DEFINITION
    # Name: my-pipeline
    # Description: My ML pipeline running on HuaweiCloud CCE.
    # Inputs:
    #    message: str
    # Outputs:
    #    Output: str
    components:
    comp-comp:# 定义一个名为comp-comp的Component,指定其输入(message)和输出(Output)。
        executorLabel: exec-comp   # 执行器标签,用于匹配执行器
        inputDefinitions:          # 组件输入定义
          parameters:
            message:
              parameterType: STRING
        outputDefinitions:         # 组件输出定义
          parameters:
            Output:
              parameterType: STRING
    deploymentSpec:
      executors:      # 配置执行器
        exec-comp:
          container:  # 容器配置,表示CCE集群中Pod执行的具体任务
            image: python:3.9    # 使用python 3.9镜像
            command:             # 容器的启动命令
              - python3
              - -c
              - |
                  import sys, json      # python脚本:从输入参数中提取message,打印并写入输出文件(路径由Kubeflow运行时提供)
    
                  executor_input = json.loads(sys.argv[1])
                  inputs = executor_input.get("inputs", {})
                  parameters = inputs.get("parameterValues", {})
                  message = parameters.get("message", "Success Message From pipeline")
    
                  print("Received message:", message)
    
                  outputs = executor_input.get("outputs", {})
                  output_file = outputs.get("parameters", {}).get("Output", {}).get("outputFile")
                  if output_file:
                      with open(output_file, "w") as f:
                          f.write(message)
              - '{{$}}'
    pipelineInfo:
      description: My ML pipeline running on HuaweiCloud CCE.
      name: my-pipeline
    root:    
      dag:    # 定义一个DAG,当存在多Component并行或串行时,DAG可以确定其执行顺序和依赖关系
        outputs:
          parameters:
            Output:
              valueFromParameter:
                outputParameterKey: Output   # 将任务comp的Output值作为整个DAG的最终输出
                producerSubtask: comp  
        tasks:     # 定义一个任务comp,调用组件comp-comp,将python脚本的输入message传递给组件,并将组件的输出Output作为Pipeline的最终输出
          comp:    
            cachingOptions:
              enableCache: true
            componentRef:
              name: comp-comp
            inputs:
              parameters:
                message:
                  componentInputParameter: message
            taskInfo:
              name: comp
      inputDefinitions:     #声明Pipeline的输入参数(message)和输出参数(Output),类型均为字符串
        parameters:
          message:
            parameterType: STRING
      outputDefinitions:
        parameters:
          Output:
            parameterType: STRING
    schemaVersion: 2.1.0
    sdkVersion: kfp-2.12.1

    以上YAML文件的整体运行逻辑如下:

    表3 运行逻辑

    阶段

    步骤

    步骤说明

    关联的YAML字段

    输入阶段

    用户提交参数

    用户通过UI传入message参数

    inputDefinitions.parameters.message

    Pipeline接收

    对流水线运行所需的输入参数进行集中验证、标准化处理,并将合法参数持久化存储

    root.inputDefinitions

    DAG参数分发

    DAG将参数传递给指定任务

    tasks.comp.inputs.parameters.message

    任务执行阶段

    任务调用组件

    任务“comp”引用组件“comp-comp”

    tasks.comp.componentRef

    组件绑定执行器

    组件通过“executorLabel”找到容器实现

    components.comp-comp.executorLabel

    deploymentSpec.exec-comp

    容器运行

    CCE集群的API Server接收执行请求,启动Pod执行Python脚本任务

    deploymentSpec.exec-comp.container.command

    输出阶段

    组件输出定义

    组件声明输出参数“Output”

    components.comp-comp.outputDefinitions

    DAG结果聚合

    从任务“comp”收集输出到流水线

    root.dag.outputs.parameters.Output

    最终输出

    用户可查看的全局输出结果

    outputDefinitions.parameters.Output

  5. 返回UI界面,在右上角单击“Upload pipeline”,上传上述pipeline.yaml,具体示例如下。配置完成后,在下方单击“Create”创建Pipeline。

    图3 上传Pipeline

  6. Pipeline创建完成后,在UI界面自动出现comp任务。在右上角单击“Create run”,用于创建执行实例,参数示例如下。配置完成后,在下方单击“Start”运行上述Pipeline。

    图4 创建Run
    • Run parameters:根据pipeline.yaml文件内容自动生成的参数,请填入需要输出的message。

    单击“Start”后,CCE集群的API Server接收Executor下发的任务请求,将启动Pod执行具体任务,执行过程中的状态和日志会实时反馈至UI界面,用户可以在UI界面中监控任务进度并查看最终结果。当CCE集群的Pod状态为停止时,则说明任务执行完成。此时在UI界面左侧菜单栏单击“Runs”,若对应Run实例前出现,则说明Pipeline运行成功。

    图5 启动Pod执行任务

  7. 单击comp任务,查看输入信息和输出结果。如图所示,输入的message为“My ML pipeline running on HuaweiCloud CCE.”,输出结果为“My ML pipeline running on HuaweiCloud CCE.”,符合Pipeline定义。

    图6 输入信息和输出信息

版本记录

表4 Kubeflow插件版本记录

插件版本

支持的集群版本

更新特性

1.1.4

v1.27及以上

CCE Standard/Turbo集群支持使用Kubeflow插件