文档首页/ 智能数据湖 AIDataLake/ 快速入门/ 基于PySpark的数据处理
更新时间:2026-04-30 GMT+08:00
分享

基于PySpark的数据处理

操作场景

AI DataLake是华为云提供的全托管的湖仓一体大数据分析服务,提供端到端的数据管理与分析能力,支持多种引擎的作业开发,满足多样化的大数据处理需求。

本文以通过创建Spark引擎端点、连接数据、使用API提交作业为例,演示通过AI DataLake分析数据的操作指导。

操作流程

开始使用如下样例前,请务必按准备工作指导完成必要操作。

  1. 规划并创建OBS桶:创建一个用于存储作业脚本的OBS桶。
  2. 创建工作空间:创建一个工作空间并关联LakeFormation实例。
  3. 创建运行作业的Spark Job端点:创建运行作业的Spark Job端点。
  4. 使用API提交作业:使用API封装rest请求提交AI DataLake作业至Spark Job端点运行,本入门以提交一个简单线性回归模型推理的AI DataLake作业为例进行演示。

准备工作

  • 已注册账号并实名认证,且账号不能处于欠费或冻结状态。
  • 已开通AI DataLake服务并授权使用云服务资源。
  • 已开通LakeFormation服务并创建实例。
  • 已开通OBS权限并进行了委托确认。
  • 已经获取IAM Token,详细操作请参见获取IAM用户Token

步骤一:规划并创建OBS桶

AI DataLake Spark通过OBS服务实现作业提交与数据存储,需要先在OBS控制台进行桶及文件夹创建,并导入样例数据或作业脚本。

  1. 登录管理控制台。
  2. 在页面左上角单击图标,选择“存储 > 对象存储服务”,进入对象存储服务页面。
  3. 选择“桶列表 > 创建桶”,进入创建桶页面,配置相关参数后单击“立即创建”。
    • 桶名称:根据界面要求设置桶名称。
    • 其他参数根据实际情况选择。
  4. 在桶列表页面,单击已创建的桶名称。
  5. 在“对象”页签,单击“上传对象”,将作业脚本上传至所创建的OBS桶中。

    例如文本处理脚本“text-job.py”,完整脚本示例如下:

    from __future__ import print_function
    from pyspark.sql import SparkSession
    if __name__ == "__main__":
        spark = SparkSession \
            .builder \
            .appName("SimpleSpark") \
            .getOrCreate()
        # 简单的 RDD 操作
        rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5], 3)
        data = rdd.map(lambda x: x + 2).collect()
        print("Result:", data)
        spark.stop()

步骤二:创建工作空间

  1. 登录AI DataLake管理控制台。
  2. 在左侧导航栏中,单击工作空间区域。
  3. 在下拉列表中选择“创建工作空间”。
  4. 配置工作空间的相关参数。
    表1 工作空间的基本信息

    类型

    参数

    是否必填

    说明

    基础信息

    工作空间名称

    工作空间的具体名称,同一个账号下的工作空间不可重名。

    • 名称只能包含字母、数字、中划线、下划线。
    • 名称字符长度为4~32位。

    工作空间名称不区分大小写,系统会自动转换为小写。

    描述

    对工作空间的简要描述,最长不超过255个字符。

    企业项目

    如果所建工作空间属于企业项目,可选择对应的企业项目。

    企业项目是一种云资源管理方式,企业项目管理服务提供统一的云资源按项目管理,以及项目内的资源管理、成员管理。

    关于如何设置企业项目请参考《企业管理用户指南》。

    说明:

    只有开通了企业管理服务的用户才显示该参数。

    多模数据管理

    LakeFormation实例

    选择当前工作空间关联的LakeFormation实例,每个工作空间需关联1个LakeFormation实例,空间创建后已关联的实例不支持修改。

    • 选择已创建的LakeFormation实例,用于统一管理元数据和数据访问控制。
    • 如无可用实例,请先创建LakeFormation实例,具体操作请参考创建LakeFormation实例
  5. 确认所有配置信息无误。

    单击“立即创建”按钮,完成工作空间创建。

    工作空间创建成功后,您可以在工作空间管理的列表中查看新创建的工作空间。

步骤三:创建运行作业的Spark Job端点

  1. 在AI DataLake管理控制台页面,切换页面右上角的工作空间为步骤二:创建工作空间新创建的空间。
  2. 在左侧导航栏选择“引擎端点 > 批处理引擎Spark”进入Spark端点列表页面。
  3. 单击页面右上角的“创建”,根据界面提示配置参数。
    1. 配置端点的基础信息。
      表2 端点的基础信息配置

      参数

      配置样例

      参数说明

      端点类型

      SparkJob

      选择端点类型。

      • SparkSQL:适用于数据查询和分析的Spark SQL场景,通过编写SQL语句快速完成数据的筛选、聚合、计算等操作,满足数据分析师需要快速探索数据的业务需求。
      • SparkJob:适用于复杂数据处理的Spark Job场景,适合执行大规模数据转换、机器学习模型训练、ETL作业等复杂的数据处理任务。

      端点名称

      spark-job-01

      端点名称是系统内部的唯一标识,在API调用、CLI命令中引用该端点。

      说明:

      端点名称创建后不可修改。

      命名规则:

      • 名称只能包含小写字母、数字、中划线,且只能以字母开头,以字母或数字结尾。
      • 输入长度不能超过63个字符。

      端点显示名称

      智能驾驶团队专用SparkJob端点-01

      端点显示名称创建后可修改。

      端点显示名称用于用户界面展示,方便用户识别和记忆。

      命名规则:

      • 名称只能输入中文、英文、数字、中划线。
      • 输入长度不能超过63个字符

      添加描述

      智能驾驶团队专用

      填写描述信息,用于详细说明端点的用途、用途背景等信息,帮助其他用户理解该端点的创建目的和使用场景。

    2. 配置端点的“资源配置”类信息。
      表3 端点的资源配置

      参数

      配置样例

      参数说明

      资源使用模式

      按需弹性

      当前Spark端点仅支持选择“按需弹性”的资源模式。

      “按需弹性”的资源模式使用的是弹性资源,无需提前购买,按实际使用量计费,无业务运行不产生费用。

      适用于短期或偶发性的业务需求,如开发测试或临时任务。

      端点CPU使用最大值

      216

      选择“按需弹性”的资源模式时需要配置端点CPU使用最大值,即设置弹性资源的使用上限,防止突发流量导致资源无限扩展,耗尽集群算力。

      同时系统根据设置的CPU使用最大值可以进行合理的资源调度和分配。

    3. 配置端点的“调度配置”类信息。
      表4 存储配置信息

      参数

      配置样例

      参数说明

      端点最大并发作业数量

      5

      设置端点同时执行的作业任务数,通过资源分流避免单点过载,确保多用户并发访问时的响应稳定性。

      作业调度策略

      随机(RANDOM)

      可选随机(RANDOM),先进先出(FIFO)。

    4. 配置Spark参数。

      以“key=value”的形式设置提交Spark作业的属性。

      • 键(Key):输入Spark作业所需的配置参数名称,例如spark.executor.memory等。
      • 值(Value):为对应的键设置参数值,例如true、4g等。
    5. 配置端点的“存储配置”信息。
      表4 存储配置信息

      参数

      配置样例

      参数说明

      选择OBS桶

      test

      通过下拉菜单选择已有的OBS桶。用于存储作业日志和临时数据等信息。

      创建后不支持修改,若未配置,可点击“新建OBS桶”进行创建。

      委托名

      test01

      配置用于读写作业event log、日志和临时数据等信息所使用的委托(即权限委托,如IAM委托)。

      通过下拉菜单选择已有的委托。创建后不支持修改,若未配置,可点击“新建委托”进行创建。

    6. 配置端点的“作业委托配置”信息。

      参数

      配置样例

      参数说明

      委托名

      test02

      委托给该端点的权限,会在运行时自动换取凭证并刷新至Spark集群。用户可在Spark集群中使用该委托凭证调用对应服务。

  4. 完成上述信息的配置后,单击“确认”,系统按照配置要求开始创建端点。

    端点创建完成后,可在列表中查看相关信息,端点状态变为“运行中”后,即可提交作业到该端点中运行。

步骤四:使用API提交作业

本示例通过requests库展示一个简单示例,例如“request.py”,模拟通过OpenAPI从创建Spark Job到提交Spark Job的完整流程,请求脚本示例如下:

# -*- coding: utf-8 -*-
import requests
import urllib3
# 禁用 SSL 警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ==================== 配置区域 ====================
# 替换为实际值
TOKEN = "XXX"  # IAM的token
WORKSPACE_ID = "3fa4b5a6-9208-4254-b823-119aa29c4016"  # 工作空间的ID
X-Client-Token = "68d2002f-866c-4c4d-84d2-747137b12e9d" # 服务事务ID,用于追踪请求,可自定义
# 请求头
HEADERS = {
    "X-Client-Token": X-Client-Token,
    "X-auth-token": TOKEN,
    "Content-Type": "application/json"
}
ENDPOINT = "https://fabric.cn-southwest-2.myhuaweicloud.com" # 实际使用时这个ENDPOINT为对应region endpoint的值,此处为贵阳一地址

# API URL
URL_SUBMIT = f"https://{ENDPOINT}/v2/workspaces/{WORKSPACE_ID}/spark-jobs"

# 作业配置
JOB_CONFIG = {
    "execution_mode": "batch",
    "name": "sparkpython_airan", # 作业名称
    "endpoint_name": "test", # spark job endpoint名称
    "catalog_name": "test",  # catalog名称
    "description": "test", # 作业描述
    "job_agency": "dli_obs_agency_access",  # 作业代理名称
    "spark_version": "4.0.0",  # spark版本号
    "job_config": {
        "job_type": "spark_python_job",
        "spark_py_parameter": {
            "main_python_file": "obs://bucket/dir1/",  # 即作业脚本存储的obs路径
            "main_args": []
        }
    },
    "resource_config": {
        "executor_number": 3,
        "driver_resource_spec": {
            "cpu": 2,
            "memory": "4GB",
            "disk": 10
        },
        "executor_resource_spec": {
            "cpu": 2,
            "memory": "4GB",
            "disk": 10
        }
    },
    "spark_config": {
        "spark.executor.memoryOverhead": "1g",
        "spark.driver.memoryOverhead": "512m",
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.maxExecutors": "3",
        "spark.shuffle.service.enabled": "false",
        "spark.dynamicAllocation.shuffleTracking.enabled": "true",
        "spark.default.parallelism": "4000",
        "spark.hadoop.fs.obs.security.provider": "com.huawei.spark.provider.JobAgencyCredentialProvider"
    },
    "restore_strategy": {
        "max_retry": 0,
        "retry_delay": 30,
        "launching_timeout": 10800,
        "running_timeout": -1
    },
    "labels": [
        {
            "key": "environment0",
            "value": "dev0"
        }
    ]
}
def submit_spark_job(config=None):
    """提交 Spark Job"""
    payload = config or JOB_CONFIG
    try:
        response = requests.post(
            URL_SUBMIT, headers=HEADERS, json=payload, verify=False
        )
        if response.status_code in [200, 201, 202]:
            result = response.json()
            job_id = result.get("id", "unknown")
            print(f"作业提交成功!")
            print(f"作业ID: {job_id}")
            print(f"响应: {response.text}")
            return job_id
        else:
            print(f"作业提交失败 (Code: {response.status_code})")
            print(response.text)
            return None
    except Exception as e:
        print(f"请求异常: {e}")
        return None


if __name__ == "__main__":
    submit_spark_job()

执行脚本“request.py”,即可提交一个文本处理AI DataLake Job至指定Spark Job端点。

相关文档