文档首页/ 智能数据湖 AIDataLake/ 快速入门/ 基于Ray Data的数据处理
更新时间:2026-04-30 GMT+08:00
分享

基于Ray Data的数据处理

操作场景

AI DataLake是华为云提供的全托管的湖仓一体大数据分析服务,提供端到端的数据管理与分析能力,支持多种引擎的作业开发,满足多样化的大数据处理需求。

本文以通过创建计算资源池、配置Ray类型的端点、连接数据、使用API提交作业为例,演示通过AI DataLake分析数据的操作指导。

操作流程

开始使用如下样例前,请务必按准备工作指导完成必要操作。

  1. 规划并创建OBS桶:创建一个用于存储作业脚本的OBS桶。
  2. 创建工作空间:创建一个工作空间。
  3. 创建计算资源池:创建运行作业的计算资源。
  4. 创建运行作业的Ray端点:创建运行作业的Ray端点,并关联资源空间以获得运行作业的资源。
  5. 使用API提交作业:使用API封装rest请求提交AI DataLake作业至Ray端点运行,本入门以提交一个简单文本处理的AI DataLake作业为例进行演示。

准备工作

  • 已注册账号并实名认证,且账号不能处于欠费或冻结状态。
  • 已开通AI DataLake服务并授权使用云服务资源。
  • 已开通OBS权限并进行了委托确认。
  • 已构建镜像并上传至SWR中,并将镜像注册至AI-DataLake。
  • 已经获取IAM Token,详细操作请参见获取IAM用户Token

步骤一:规划并创建OBS桶

AI DataLake Ray通过OBS服务实现作业提交与数据存储,需要先在OBS控制台进行桶及文件夹创建,并导入样例数据或作业脚本。

  1. 登录管理控制台。
  2. 在页面左上角单击图标,选择“存储 > 对象存储服务 OBS”,进入对象存储服务页面。
  3. 选择“桶列表 > 创建桶”,进入创建桶页面,配置相关参数后单击“立即创建”。
    • 桶名称:根据界面要求设置桶名称。
    • 其他参数根据实际情况选择。
  4. 在桶列表页面,单击已创建的桶名称。
  5. 在“对象”页签,单击“上传对象”,将作业脚本上传至所创建的OBS桶中。

    例如文本处理脚本“text-job.py”,内容示例如下:

    import ray
    import re
    from collections import Counter
    import time
    ray.init()
    def simple_text_preprocessing():
        sample_texts = [
            "Ray is a fast and simple framework for building distributed applications.",
            "This example shows how to preprocess text data using Ray Data API.",
            "We will perform tokenization, lowercase conversion, and word counting.",
            "The processing will be distributed across multiple workers automatically.",
            "Ray makes it easy to scale your data processing pipelines.",
            "You can process large datasets without changing your code structure.",
            "This is a demonstration of basic text preprocessing operations.",
            "We are using a small sample for demonstration purposes.",
            "In real scenarios, you would process much larger text corpora.",
            "Ray provides high-level APIs for distributed data processing."
        ]
        print("开始文本预处理流程...")
        print(f"原始文本数量: {len(sample_texts)}")
        # 2. 将数据转换为Ray Dataset
        ds = ray.data.from_items([{"text": text, "doc_id": i} for i, text in enumerate(sample_texts)])
        print(f"数据集已创建: {ds.count()} 个文档")
        # 3. 定义文本预处理函数
        def clean_text(batch):
            all_tokens = []  # 变成一个扁平的列表
            doc_ids = []
            for text, doc_id in zip(batch["text"], batch["doc_id"]):
                text_lower = text.lower()
                text_clean = re.sub(r'[^a-zA-Z\s\.\,\!]', '', text_lower)
                tokens = text_clean.split()
                # 将当前文档的所有token合并为一个字符串,用空格分隔
                all_tokens.append(" ".join(tokens)) # 例如 "hello world"
                doc_ids.append(doc_id)
            return {"tokens": all_tokens, "doc_id": doc_ids}
        # 4. 应用预处理(分布式执行)
        print("开始分布式文本清洗...")
        processed_ds = ds.map_batches(
            clean_text,
            batch_size=2,  # 小批量适合演示
            num_cpus=0.5   # 每个任务需要的CPU资源
        )
        # 5. 定义词频统计函数
        def count_words(batch):
            """统计每个文档的词频"""
            word_counts = []
            for tokens in batch["tokens"]:
                count = Counter(tokens)
                word_counts.append(dict(count))
            return {"word_counts": word_counts, "doc_id": batch["doc_id"]}
        # 6. 统计词频
        print("开始词频统计...")
        counted_ds = processed_ds.map_batches(
            count_words,
            batch_size=2,
            num_cpus=0.5
        )
        # 7. 收集并汇总结果
        print("汇总最终结果...")
        results = counted_ds.take_all()
        # 汇总所有文档的词频
        global_word_count = Counter()
        doc_stats = []
        for result in results:
            doc_id = result["doc_id"]
            word_count = result["word_counts"]
            doc_stats.append({
                "doc_id": doc_id,
                "unique_words": len(word_count),
                "total_words": sum(word_count.values())
            })
            # 更新全局词频
            global_word_count.update(word_count)
        # 8. 打印结果摘要
        print("\n" + "="*60)
        print("文本预处理完成!")
        print("="*60)
        print(f"处理文档总数: {len(doc_stats)}")
        print(f"发现唯一词汇总数: {len(global_word_count)}")
        print(f"词汇出现总次数: {sum(global_word_count.values())}")
        print(f"\n文档统计信息:")
        for stat in doc_stats[:3]:  # 显示前3个文档的统计
            print(f"  文档 {stat['doc_id']}: {stat['unique_words']} 个唯一词汇, {stat['total_words']} 个总词汇")
        if len(doc_stats) > 3:
            print(f"  ... 和 {len(doc_stats) - 3} 个其他文档")
        print(f"\n出现频率最高的10个词汇:")
        for word, count in global_word_count.most_common(10):
            print(f"  '{word}': {count} 次")
        print("="*60)
        return global_word_count, doc_stats
    if __name__ == "__main__":
        try:
            start_time = time.time()
            # 运行基础示例
            word_count, doc_stats = simple_text_preprocessing()
            total_time = time.time() - start_time
            print(f"\n总执行时间: {total_time:.2f} 秒")
        finally:
            # 清理资源
            ray.shutdown()
            print("Ray已关闭。")

步骤二:创建工作空间

  1. 登录AI DataLake管理控制台。
  2. 在左侧导航栏中,单击工作空间区域。
  3. 在下拉列表中选择“创建工作空间”。
  4. 配置工作空间的相关参数。
    表1 工作空间的基本信息

    类型

    参数

    是否必填

    说明

    基础信息

    工作空间名称

    工作空间的具体名称,同一个账号下的工作空间不可重名。

    • 名称只能包含字母、数字、中划线、下划线。
    • 名称字符长度为4~32位。

    工作空间名称不区分大小写,系统会自动转换为小写。

    描述

    对工作空间的简要描述,最长不超过255个字符。

    企业项目

    如果所建工作空间属于企业项目,可选择对应的企业项目。

    企业项目是一种云资源管理方式,企业项目管理服务提供统一的云资源按项目管理,以及项目内的资源管理、成员管理。

    关于如何设置企业项目请参考《企业管理用户指南》。

    说明:

    只有开通了企业管理服务的用户才显示该参数。

    多模数据管理

    LakeFormation实例

    选择当前工作空间关联的LakeFormation实例,每个工作空间需关联1个LakeFormation实例,空间创建后已关联的实例不支持修改。

    • 选择已创建的LakeFormation实例,用于统一管理元数据和数据访问控制。
    • 如无可用实例,请先创建LakeFormation实例,具体操作请参考创建LakeFormation实例
  5. 确认所有配置信息无误。

    单击“立即创建”按钮,完成工作空间创建。

    工作空间创建成功后,您可以在工作空间管理的列表中查看新创建的工作空间。

步骤三:创建计算资源池

  1. 在AI DataLake管理控制台页面,切换页面右上角的工作空间为步骤二:创建工作空间新创建的空间。
  2. 单击左侧导航栏的“新建”,在下拉框中选择“计算资源池”进入购买计算资源池页面。
  3. 在“购买计算资源池”界面,填写具体参数,参数填写参考表2 购买计算资源池参数说明
    表2 购买计算资源池参数说明

    参数

    配置样例

    说明

    计费模式

    按需计费

    选择“按需计费”。

    按需计费即后付费模式,按实际使用量计费,在购买周期内资源独享,空闲时资源不被释放。

    资源池名称

    resource-pool-for-raytest

    计算资源池的具体名称。

    • 名称只能包含数字、小写英文字母和中划线,且只能以字母开头、以字母或数字结尾。
    • 输入长度不能超过63个字符。

    CPU资源

    本例配置8个通用计算标准型实例。

    CPU是通用型计算资源,适用于各种类型的计算任务。

    • CPU资源的特点:通用性强,适合各种类型的计算任务。相比于GPU和NPU的成本更低。擅长处理顺序执行的任务。
    • CPU资源的适用场景:ETL数据抽取、转换、加载;轻量计算场景,例如小规模数据处理、脚本运行,日志分析场景,例如日志采集、解析、统计分析。
    • CPU资源的具体规格请参考产品规格

    GPU资源

    本例不购买GPU实例

    GPU是图形处理器适用于图形渲染和大规模并行计算场景,适合深度学习训练和科学计算。GPU拥有成百上千个计算核心,可以同时处理大量简单计算任务。

    • GPU资源的特点:支持并行计算,适用于批处理作业场景,数千个核心同时计算,处理效率高。天然适合深度学习、神经网络、AI计算场景。
    • GPU资源的适用场景:深度学习,例如神经网络训练、模型调优场景;图形处理场景,例如图像识别、目标检测;视频处理场景,例如视频分析、转码、视频渲染场景,等其他AI科学计算场景。
    • GPU资源的具体规格请参考产品规格

    NPU资源

    本例购买1个昇腾AI加速型(B3)1卡

    NPU适用于AI计算场景。NPU资源采用架构优化和指令集,专门加速AI推理任务,具有高能效比的特点。

    • NPU资源的特点:AI计算场景专用,具备高性能、低延迟、推理成本更优的特点。
    • NPU资源的适用场景:AI计算场景、图像识别场景,推荐系统等AI计算设计场景。
    • NPU资源的具体规格请参考产品规格

    AI DataLake网络

    自定义

    选择AI DataLake资源池所属网络,该网络基于虚拟私有云(VPC)进行封装。如果不存在可使用的网络,也可单击“创建网络”进行创建。

    一个工作空间仅支持创建一个网络。

  4. 参数填写完成后,单击“立即购买”,在界面上确认当前配置是否正确。
  5. 单击“提交”完成创建。等待资源池状态变成“可使用”表示当前资源池创建成功。

步骤四:创建运行作业的Ray端点

  1. 在AI DataLake管理控制台页面,切换页面右上角的工作空间为步骤二:创建工作空间新创建的空间。
  2. 在左侧导航栏选择“引擎端点 > AI 计算引擎 Ray”进入Ray端点列表页面。
  3. 单击页面右上角的“创建端点”,配置以下参数并单击“立即创建”。
    表3 创建Ray引擎端点

    参数

    配置样例

    参数说明

    端点类型

    RayCluster

    选择端点类型。

    RayCluster:常驻模式,可多次提交Ray作业。

    端点名称

    Ray-Test

    输入端点名称,是端点的唯一标识符,不可与已存在的端点重名,且创建后不支持修改。

    名称只能包含小写字母、数字、中划线,且只能以字母开头,以字母或数字结尾。

    输入长度不能超过63个字符。

    端点显示名称

    开发团队专用Ray端点-01

    输入端点显示名称,创建后可修改,用于用户界面展示,方便用户识别和记忆。

    • 名称只能包含中文、英文、数字、中划线。
    • 输入长度不能超过63个字符。

    资源使用模式

    预留资源

    选择资源使用模式。

    预留资源:预留模式通过预留专属计算资源,确保业务所需的计算资源的稳定性,同时获得最优的单价成本。

    选择计算资源池

    resource_pool_for_raytest

    在下拉框中选择步骤三:创建计算资源池已创建的资源池。

    集群配置

    • Head:64vCPU,128GiB
    • Workergroups:64vCPU,128GiB

    配置对应Ray集群的Head节点规格、Workergroup规格,可创建多个Workergroup,根据业务需求选择。

    规格选择列表中可以看到所有的规格,选择的规格可根据创建的Ray资源向下兼容,例如创建了一个aidatalake.cpu.x86.general.1c2g的资源,那么在选择head规格的时候可以选择aidatalake.cpu.x86.general.1c2g,之后配置vCPU与内存资源。

    镜像配置

    自定义镜像“Ray”

    选择集群镜像版本。

    预置镜像:预置镜像为AI DataLake提供的基础镜像版本。

    自定义镜像:自定义镜像为用户自行注册的SWR镜像。

    存储配置

    • 选择临时存储:sfs-turbo-01
    • 存储路径:/
    • 挂载路径:/home/service/works

    基于共享存储(SFS Turbo),通过挂载路径规范容器内访问路径,并配合存储路径为Head与Worker节点划定独立的物理子目录,从而在实现跨节点数据互通的同时,解决多节点数据写入冲突的问题。

    GCS高可用

    勾选“GCS高可用”还需要配置:

    • DCS Redis缓存实例:test
    • 账号:default
    • DEW 凭据名称:test

    是否启用Redis为GCS提供可靠性。GCS承担集群元数据的管理职责。未启用高可用时,一旦 GCS发生故障,会引发整个集群故障;启用高可用后,GCS可通过 Redis恢复元数据,从而防止集群级故障。

    勾选该参数后,还需配置“DCS Redis缓存实例”和“账号”与“DEW凭据名称”参数。

  4. 端点创建后,可在列表中查看相关信息,端点状态变为“运行中”后即可提交作业到该端点中运行。

步骤五:使用API提交作业

本示例通过request库展示一个简单示例,例如“request.py”,模拟通过OpenApi从创建Ray Job到提交Ray Job的完整流程,请求脚本示例如下:

import requests
import json
import urllib3
import time

# 禁用 SSL 警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# 公共配置
TOKEN = "XXX"  # IAM的token
WORKSPACE_ID = "3fa4b5a6-9208-4254-b823-119aa29c4016"  # 工作空间的ID
HEADERS = {"X-Auth-Token": TOKEN, "Content-Type": "application/json"}
ENDPOINT = "https://fabric.cn-southwest-2.myhuaweicloud.com" # 实际使用时这个ENDPOINT为对应region endpoint的值,此处为贵阳一地址
# 提交运行 URL
URL_SUBMIT_RUN = f"https://{ENDPOINT}/v2/workspaces/{WORKSPACE_ID}/ray-jobs"

def submit_run():
    payload = {
        "name": "Ray-Test",
        "description": "description",
        "endpoint_name": "Ray-Test",
        "config": {
            "entrypoint": "python test-job.py",
            "runtime_env": {
                "working_dir": "obs://bucket/dir1/",  # 即作业脚本存储的obs路径
                "py_modules": [
                    "string"
                ],
                "pip": [
                    "numpy=1.16.1"  # 可选值,填写值为涉及pip依赖库,若不涉及可不填
                ],
                "env_vars": {
                    "additionalProp1": "value1",  # 可选值,填写值为环境变量,若不涉及可不填
                    "additionalProp2": "value2",
                    "additionalProp3": "value3"
                }
            }
        }
    }

    try:
        response = requests.post(
            URL_SUBMIT_RUN, headers=HEADERS, json=payload, verify=False
        )
        if response.status_code in [200, 201, 202]:
            print(
                f"提交运行成功!响应码:{response.status_code}。响应内容:{response.text}"
            )
        else:
            print(f"提交运行失败 (Code: {response.status_code})")
            print(response.text)
    except Exception as e:
        print(f"请求异常: {e}")

if __name__ == "__main__":
        submit_run()

执行脚本“request.py”即可提交一个文本处理AI DataLake Job至指定Ray集群。

相关文档