基于PySpark的数据处理
操作场景
AI DataLake是华为云提供的全托管的湖仓一体大数据分析服务,提供端到端的数据管理与分析能力,支持多种引擎的作业开发,满足多样化的大数据处理需求。
本文以通过创建Spark引擎端点、连接数据、使用API提交作业为例,演示通过AI DataLake分析数据的操作指导。
操作流程
开始使用如下样例前,请务必按准备工作指导完成必要操作。
- 规划并创建OBS桶:创建一个用于存储作业脚本的OBS桶。
- 创建工作空间:创建一个工作空间并关联LakeFormation实例。
- 创建运行作业的Spark Job端点:创建运行作业的Spark Job端点。
- 使用API提交作业:使用API封装rest请求提交AI DataLake作业至Spark Job端点运行,本入门以提交一个简单线性回归模型推理的AI DataLake作业为例进行演示。
准备工作
- 已注册账号并实名认证,且账号不能处于欠费或冻结状态。
- 已开通AI DataLake服务并授权使用云服务资源。
- 已开通LakeFormation服务并创建实例。
- 已开通OBS权限并进行了委托确认。
- 已经获取IAM Token,详细操作请参见获取IAM用户Token。
步骤一:规划并创建OBS桶
AI DataLake Spark通过OBS服务实现作业提交与数据存储,需要先在OBS控制台进行桶及文件夹创建,并导入样例数据或作业脚本。
- 登录管理控制台。
- 在页面左上角单击
图标,选择“存储 > 对象存储服务”,进入对象存储服务页面。 - 选择“桶列表 > 创建桶”,进入创建桶页面,配置相关参数后单击“立即创建”。
- 桶名称:根据界面要求设置桶名称。
- 其他参数根据实际情况选择。
- 在桶列表页面,单击已创建的桶名称。
- 在“对象”页签,单击“上传对象”,将作业脚本上传至所创建的OBS桶中。
例如文本处理脚本“text-job.py”,完整脚本示例如下:
from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession \ .builder \ .appName("SimpleSpark") \ .getOrCreate() # 简单的 RDD 操作 rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5], 3) data = rdd.map(lambda x: x + 2).collect() print("Result:", data) spark.stop()
步骤二:创建工作空间
- 登录AI DataLake管理控制台。
- 在左侧导航栏中,单击工作空间区域。
- 在下拉列表中选择“创建工作空间”。
- 配置工作空间的相关参数。
表1 工作空间的基本信息 类型
参数
是否必填
说明
基础信息
工作空间名称
是
工作空间的具体名称,同一个账号下的工作空间不可重名。
- 名称只能包含字母、数字、中划线、下划线。
- 名称字符长度为4~32位。
工作空间名称不区分大小写,系统会自动转换为小写。
描述
否
对工作空间的简要描述,最长不超过255个字符。
企业项目
否
如果所建工作空间属于企业项目,可选择对应的企业项目。
企业项目是一种云资源管理方式,企业项目管理服务提供统一的云资源按项目管理,以及项目内的资源管理、成员管理。
关于如何设置企业项目请参考《企业管理用户指南》。
说明:只有开通了企业管理服务的用户才显示该参数。
多模数据管理
LakeFormation实例
是
选择当前工作空间关联的LakeFormation实例,每个工作空间需关联1个LakeFormation实例,空间创建后已关联的实例不支持修改。
- 选择已创建的LakeFormation实例,用于统一管理元数据和数据访问控制。
- 如无可用实例,请先创建LakeFormation实例,具体操作请参考创建LakeFormation实例。
- 确认所有配置信息无误。
工作空间创建成功后,您可以在工作空间管理的列表中查看新创建的工作空间。
步骤三:创建运行作业的Spark Job端点
- 在AI DataLake管理控制台页面,切换页面右上角的工作空间为步骤二:创建工作空间新创建的空间。
- 在左侧导航栏选择“引擎端点 > 批处理引擎Spark”进入Spark端点列表页面。
- 单击页面右上角的“创建”,根据界面提示配置参数。
- 配置端点的基础信息。
表2 端点的基础信息配置 参数
配置样例
参数说明
端点类型
SparkJob
选择端点类型。
- SparkSQL:适用于数据查询和分析的Spark SQL场景,通过编写SQL语句快速完成数据的筛选、聚合、计算等操作,满足数据分析师需要快速探索数据的业务需求。
- SparkJob:适用于复杂数据处理的Spark Job场景,适合执行大规模数据转换、机器学习模型训练、ETL作业等复杂的数据处理任务。
端点名称
spark-job-01
端点名称是系统内部的唯一标识,在API调用、CLI命令中引用该端点。
说明:端点名称创建后不可修改。
命名规则:
- 名称只能包含小写字母、数字、中划线,且只能以字母开头,以字母或数字结尾。
- 输入长度不能超过63个字符。
端点显示名称
智能驾驶团队专用SparkJob端点-01
端点显示名称创建后可修改。
端点显示名称用于用户界面展示,方便用户识别和记忆。
命名规则:
- 名称只能输入中文、英文、数字、中划线。
- 输入长度不能超过63个字符
添加描述
智能驾驶团队专用
填写描述信息,用于详细说明端点的用途、用途背景等信息,帮助其他用户理解该端点的创建目的和使用场景。
- 配置端点的“资源配置”类信息。
表3 端点的资源配置 参数
配置样例
参数说明
资源使用模式
按需弹性
当前Spark端点仅支持选择“按需弹性”的资源模式。
“按需弹性”的资源模式使用的是弹性资源,无需提前购买,按实际使用量计费,无业务运行不产生费用。
适用于短期或偶发性的业务需求,如开发测试或临时任务。
端点CPU使用最大值
216
选择“按需弹性”的资源模式时需要配置端点CPU使用最大值,即设置弹性资源的使用上限,防止突发流量导致资源无限扩展,耗尽集群算力。
同时系统根据设置的CPU使用最大值可以进行合理的资源调度和分配。
- 配置端点的“调度配置”类信息。
- 配置Spark参数。
以“key=value”的形式设置提交Spark作业的属性。
- 键(Key):输入Spark作业所需的配置参数名称,例如spark.executor.memory等。
- 值(Value):为对应的键设置参数值,例如true、4g等。
- 配置端点的“存储配置”信息。
表4 存储配置信息 参数
配置样例
参数说明
选择OBS桶
test
通过下拉菜单选择已有的OBS桶。用于存储作业日志和临时数据等信息。
创建后不支持修改,若未配置,可点击“新建OBS桶”进行创建。
委托名
test01
配置用于读写作业event log、日志和临时数据等信息所使用的委托(即权限委托,如IAM委托)。
通过下拉菜单选择已有的委托。创建后不支持修改,若未配置,可点击“新建委托”进行创建。
- 配置端点的“作业委托配置”信息。
参数
配置样例
参数说明
委托名
test02
委托给该端点的权限,会在运行时自动换取凭证并刷新至Spark集群。用户可在Spark集群中使用该委托凭证调用对应服务。
- 配置端点的基础信息。
- 完成上述信息的配置后,单击“确认”,系统按照配置要求开始创建端点。
步骤四:使用API提交作业
本示例通过requests库展示一个简单示例,例如“request.py”,模拟通过OpenAPI从创建Spark Job到提交Spark Job的完整流程,请求脚本示例如下:
# -*- coding: utf-8 -*-
import requests
import urllib3
# 禁用 SSL 警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# ==================== 配置区域 ====================
# 替换为实际值
TOKEN = "XXX" # IAM的token
WORKSPACE_ID = "3fa4b5a6-9208-4254-b823-119aa29c4016" # 工作空间的ID
X-Client-Token = "68d2002f-866c-4c4d-84d2-747137b12e9d" # 服务事务ID,用于追踪请求,可自定义
# 请求头
HEADERS = {
"X-Client-Token": X-Client-Token,
"X-auth-token": TOKEN,
"Content-Type": "application/json"
}
ENDPOINT = "https://fabric.cn-southwest-2.myhuaweicloud.com" # 实际使用时这个ENDPOINT为对应region endpoint的值,此处为贵阳一地址
# API URL
URL_SUBMIT = f"https://{ENDPOINT}/v2/workspaces/{WORKSPACE_ID}/spark-jobs"
# 作业配置
JOB_CONFIG = {
"execution_mode": "batch",
"name": "sparkpython_airan", # 作业名称
"endpoint_name": "test", # spark job endpoint名称
"catalog_name": "test", # catalog名称
"description": "test", # 作业描述
"job_agency": "dli_obs_agency_access", # 作业代理名称
"spark_version": "4.0.0", # spark版本号
"job_config": {
"job_type": "spark_python_job",
"spark_py_parameter": {
"main_python_file": "obs://bucket/dir1/", # 即作业脚本存储的obs路径
"main_args": []
}
},
"resource_config": {
"executor_number": 3,
"driver_resource_spec": {
"cpu": 2,
"memory": "4GB",
"disk": 10
},
"executor_resource_spec": {
"cpu": 2,
"memory": "4GB",
"disk": 10
}
},
"spark_config": {
"spark.executor.memoryOverhead": "1g",
"spark.driver.memoryOverhead": "512m",
"spark.dynamicAllocation.enabled": "true",
"spark.dynamicAllocation.maxExecutors": "3",
"spark.shuffle.service.enabled": "false",
"spark.dynamicAllocation.shuffleTracking.enabled": "true",
"spark.default.parallelism": "4000",
"spark.hadoop.fs.obs.security.provider": "com.huawei.spark.provider.JobAgencyCredentialProvider"
},
"restore_strategy": {
"max_retry": 0,
"retry_delay": 30,
"launching_timeout": 10800,
"running_timeout": -1
},
"labels": [
{
"key": "environment0",
"value": "dev0"
}
]
}
def submit_spark_job(config=None):
"""提交 Spark Job"""
payload = config or JOB_CONFIG
try:
response = requests.post(
URL_SUBMIT, headers=HEADERS, json=payload, verify=False
)
if response.status_code in [200, 201, 202]:
result = response.json()
job_id = result.get("id", "unknown")
print(f"作业提交成功!")
print(f"作业ID: {job_id}")
print(f"响应: {response.text}")
return job_id
else:
print(f"作业提交失败 (Code: {response.status_code})")
print(response.text)
return None
except Exception as e:
print(f"请求异常: {e}")
return None
if __name__ == "__main__":
submit_spark_job() 执行脚本“request.py”,即可提交一个文本处理AI DataLake Job至指定Spark Job端点。