快速使用多模数据引擎Aura分析智能驾驶数据
操作场景
在智能驾驶数据处理场景中,车辆会采集大量的视频、图像数据,对于这些多模数据的处理,要经过预处理、向量化等环节,才能为构建智驾模型训练数据集提供数据输入,这种数据密集型、计算密集型的分析场景给计算资源的调度和数据存储的I/O能力提出了极高的要求。
在传统的数据分析中,开发人员通过配置串行流水线,先在CPU执行解码任务,然后将数据存盘,再调度NPU资源执行推理任务。这种方式存在以下问题:资源调度效率低、算力利用率低、数据读写效率低等问题,异构资源的管理增加了工程运维的复杂性。
多模数据引擎Aura是专为分析多模态类型数据而设计,支持数据的边读边算能力,避免了传统方式的频繁存盘操作,让视频解码、向量化等预处理环节流水线式执行,让智能驾驶数据生产业务流实现了从串行到并行的架构升级,真正释放了异构算力的全部潜力。
本节操作以智能驾驶场景的数据处理为示例,介绍在AI DataLake使用多模数据引擎Aura完成多模数据处理与分析的操作步骤。
在作业开发过程中使用了aura_frame SDK,请您联系客户支持提前获取aura_frame SDK安装包。
准备工作
- 已注册账号并实名认证,且账号不能处于欠费或冻结状态。
- 已开通AI DataLake服务并授权使用云服务资源。
- 已开通LakeFormation、OBS权限并进行了委托确认。
- 已开通DataArts Studio服务,并创建了DataArts Studio实例及工作空间。
方案架构
AI DataLake搭载Aura多模数据分析引擎,实现跨异构资源(One Pool)、不落盘(One Flow)的数据处理流水线。
- AI DataLake将CPU、GPU、NPU等异构资源进行统一池化抽象,实现资源的弹性调度与高效利用。通过统一资源池,将CPU、GPU、NPU异构资源进行统一管理。能够根据业务负载动态调整资源分配,实现资源的高效利用。
- AI DataLake通过LakeFormation统一管理元数据,实现数据处理过程中全程不落盘,大幅提升处理效率。
解码算子在CPU节点执行,向量化处理算子在NPU节点执行。Aura引擎自动通过LakeFormation完全避免中间数据落盘。这种设计消除了传统数据处理中磁盘I/O带来的性能瓶颈,显著降低数据处理延迟。
图1 Aura多模数据分析-智能驾驶数据预处理解决方案架构图
表1 与传统方案的对比 特性
传统方案
Aura方案
跨架构协同
串行且割裂:先在CPU跑解码任务,然后将数据存盘,再去NPU跑推理任务。
业务逻辑如同在一个单机进程内,资源调度无感知,而物理资源自动跨机流转。
数据流转
数据需重复落盘存储:中间图片数据必须写入对象存储,读写I/O效率低,存储资源成本增加。
数据不落盘,依赖纯内存/网络流转,数据在内存中解码后直接流向NPU节点。
资源利用率
计算资源闲置,利用效率低,任务串行,部分资源需要等待数据I/O。
计算资源利用效率高:流水线并行(Pipeline Parallelism)设计,异构算力不再闲置。
操作流程
本节介绍Aura Job的基本开发流程。技术实现流程:
- 环境准备
- 在AI DataLake管理控制台创建工作空间,用于提供独立的作业运行环境。
- 在AI DataLake管理控制台创建计算资源池,为作业的运行提供计算资源。
- 在AI DataLake管理控制台创建端点,配置Aura引擎与计算资源池的关联关系。
- 数据接入
- 获取Aura端点信息,建立与多模态数据引擎Aura的链接。
- 通过Lazy Mode读取元数据表。
- 算子注册
注册数据处理算子,构建数据处理流水线:
- 图片解码算子:支持多种格式的视频/图像解码。
- 图片向量化算子:将图像转换为高维特征向量。
- 执行作业就查看结果
- 执行多模态数据处理流水线。
- 查看处理后的多模态数据,输出高质量的特征向量,为智驾模型训练提供数据输入。
步骤一:规划并创建OBS并行文件系统
AI DataLake Aura通过OBS服务实现数据存储,需要先在OBS控制台进行桶及文件夹创建,并导入样例数据。
- 登录管理控制台。
- 在页面左上角单击
图标,选择“存储 > 对象存储服务”,进入对象存储服务页面。 - 以并行文件系统为例:
- 在并行文件系统页面,单击已创建的文件系统名称,例如“aura-serverless”。
- 在左侧导航栏选择“文件”,单击“新建文件夹”,填写待创建的文件夹名称,单击“确定”。继续单击该新创建的文件夹名称,单击“新建文件夹”,可以创建其子文件夹。
- 参考该步骤,依次创建用于存放元数据的路径,例如:
- Catalog存储路径:aura-serverless/catalog1
- 数据库存储路径:aura-serverless/catalog1/database1
步骤二:规划并创建Lakeformation实例、Catalog、数据库
AI DataLake Aura通过LakeFormation服务管理数据源,需要在LakeFormation购买实例,并配置该实例的Catalog、数据库信息。
- 登录管理控制台。
- 在页面左上角单击
图标,选择“大数据 > 湖仓构建 LakeFormation”,进入LakeFormation页面。 - 在“总览”页面右上角单击“购买实例”,配置相关参数购买LakeFormation实例。
- 在页面左上角选择切换到该实例。
图3 切换目标实例
- 创建Catalog。
- 在左侧导航栏选择“元数据 > Catalog”。
- 单击“创建Catalog”,配置以下参数后,单击“提交”。
参数
配置样例
参数说明
Catalog名称
lakeformation_for_auratest
填写待创建Catalog名称。
只能包含字母、数字和下划线,长度为1~256个字符。
Catalog类型
DEFAULT
选择Catalog类型:
- DEFAULT:即默认数据目录,用于管理存储在OBS中的数据资产。
- CLICKHOUSE:CLICKHOUSE Catalog是用于连接外部ClickHouse数据库的外部目录类型。
选择位置
obs://lakeformation-test/catalog1
Catalog数据存储在OBS桶中的位置。可选参数。
根据实际需要选择“并行文件系统”或“对象存储桶”,并选择位置后,单击“确定”。
- 如果配置该参数,则所选位置只能以“obs://”开头,且必须包含一个存储对象,例如选择“obs://lakeformation-test/catalog1”。如果没有合适的OBS桶,可以单击“前往OBS创建”进行创建。
- 该路径不能与其他LakeFormation实例元数据存储路径重复,以免造成数据冲突。
- 建议选择未被其他Catalog选中的文件夹。
描述
按需配置
所创建Catalog的描述信息。
长度为0~4000字节,1个中文字符对应3个字节。
- 创建完成后,即可在“Catalog”页面查看相关信息。
- 创建数据库。
- 在左侧导航栏选择“元数据 > 数据库”。
- 在右上角“Catalog”后的下拉框中选择步骤5创建的Catalog。
- 单击“创建数据库”,配置相关参数后,单击“提交”。
在正式使用新Catalog前,请先手动创建default数据库并指定合法OBS路径,可避免后续系统自动创建带来的隐藏风险。
如果当前已包含名称为“default”的数据库,则跳过数据库的创建操作。
参数
配置样例
参数说明
库名称
default
填写待创建数据库名称。
只能包含中文、字母、数字、下划线、中划线,长度为1~128个字符。
所属Catalog
lakeformation_for_auratest
待创建数据库所属Catalog。
本例选择步骤5创建的Catalog。
选择位置
obs://lakeformation-test/catalog1/default
数据库信息存储在OBS桶中的位置。
根据实际需要选择“并行文件系统”或“对象存储桶”,并选择位置后,单击“确定”。
- 所选位置只能以“obs://”开头,且必须包含一个存储对象,例如选择“obs://lakeformation-test/catalog1/default”。如果没有合适的OBS桶,可以单击“前往OBS创建”进行创建。
- 该路径必须与所属的Catalog存储路径(即创建Catalog时配置的“选择位置”参数)不同。
- 该路径不能与其他LakeFormation实例元数据存储路径重复,以免造成数据冲突。
- 如果所属Catalog配置了“数据库存储位置”参数,则此处该参数必须选择为所属Catalog“选择位置”的子路径、或“数据库存储位置”的子路径。
描述
按需配置
所创建数据库的描述信息。
长度为0~4000字节,1个中文字符对应3个字节。
- 创建完成后,即可在“数据库”页面查看详细信息。
步骤三:创建工作空间
- 登录AI DataLake管理控制台。
- 在左侧导航栏中,单击工作空间区域。
- 在下拉列表中选择“创建工作空间” 。
- 配置工作空间的相关参数:
表2 创建工作空间参数说明 类型
参数
配置样例
说明
基础信息
工作空间名称
workspace_auratest
工作空间的具体名称。
- 名称只能包含字母、中文、数字、中划线、下划线。
- 输入长度为4~32个字符。
工作空间名称不区分大小写,系统会自动转换为小写。
描述
智能驾驶数据分析
对工作空间的简要描述。
企业项目
default
如果所建工作空间属于企业项目,可选择对应的企业项目。
企业项目是一种云资源管理方式,企业项目管理服务提供统一的云资源按项目管理,以及项目内的资源管理、成员管理。
关于如何设置企业项目请参考《企业管理用户指南》。
说明:只有开通了企业管理服务的用户才显示该参数。
多模数据管理
LakeFormation实例
lakeformation_for_auratest
选择当前工作空间关联的LakeFormation实例,即步骤二:规划并创建LakeFormation实例、Catalog、数据库创建的LakeFormation实例。
每个工作空间需关联1个LakeFormation实例, 空间创建后已关联的实例不支持修改。
- 确认所有配置信息无误。
工作空间创建成功后,您可以在工作空间管理的列表中查看新创建的工作空间。
步骤四:创建计算资源池
- 在AI DataLake管理控制台页面,切换页面右上角的工作空间为步骤三:创建工作空间新创建的空间。
- 单击左侧导航栏的“新建”,在下拉框中选择“计算资源池”进入购买计算资源池页面。
- 在“购买计算资源池”界面,填写具体参数,参数填写参考表2。
表3 购买计算资源池参数说明 参数
配置样例
说明
计费模式
按需计费
选择“按需计费”。
按需计费即后付费模式,按实际使用量计费,在购买周期内资源独享,空闲时资源不被释放。
资源池名称
resource_pool_for_auratest
计算资源池的具体名称。
- 名称只能包含数字、小写英文字母和中划线,且只能以字母开头、以字母或数字结尾。
- 输入长度不能超过63个字符。
CPU 资源
本例配置8个通用计算标准型实例。
勾选CPU资源后,选择资源规格并配置购买的实例数量。
CPU是通用型计算资源,适用于各种类型的计算任务。
- CPU资源的特点:通用性强,适合各种类型的计算任务。相比于GPU和NPU的成本更低。擅长处理顺序执行的任务。
- CPU资源的适用场景:ETL 数据抽取、转换、加载;轻量计算场景,例如小规模数据处理、脚本运行,日志分析场景,例如日志采集、解析、统计分析。
- CPU资源的具体规格请参考产品规格
GPU 资源
本例不购买GPU实例
勾选GPU资源后,选择资源规格并配置购买的实例数量。
GPU是图形处理器适用于图形渲染和大规模并行计算场景,适合深度学习训练和科学计算。GPU拥有成百上千个计算核心,可以同时处理大量简单计算任务。
- GPU资源的特点:支持并行计算,适用于批处理作业场景,数千个核心同时计算,处理效率高。天然适合深度学习、神经网络、AI计算场景。
- GPU资源的适用场景:深度学习,例如神经网络训练、模型调优场景;图形处理场景,例如图像识别、目标检测;视频处理场景,例如视频分析、转码、视频渲染场景,等其他AI科学计算场景。
- GPU资源的具体规格请参考产品规格
NPU 资源
本例购买1个昇腾AI加速型(B3)1卡
勾选NPU资源后,选择资源规格并配置购买的实例数量。
NPU适用于AI计算场景。NPU资源采用架构优化和指令集,专门加速AI推理任务,具有高能效比的特点。
- NPU资源的特点:AI计算场景专用,具备高性能、低延迟、推理成本更优的特点。
- NPU资源的适用场景:AI计算场景、图像识别场景,推荐系统等AI计算设计场景。
- NPU资源的具体规格请参考产品规格
AI DataLake 网络
自定义
选择AI DataLake资源池所属网络,该网络基于虚拟私有云(VPC)进行封装。如果不存在可使用的网络,也可单击“创建网络”进行创建。
一个工作空间仅支持创建一个网络。
- 参数填写完成后,单击“立即购买”,在界面上确认当前配置是否正确。
- 单击“提交”完成创建。等待资源池状态变成“可使用”表示当前资源池创建成功。
步骤五:创建运行作业的Aura端点
- 在AI DataLake管理控制台页面,切换页面右上角的工作空间为步骤三:创建工作空间新创建的空间。
- 在左侧导航栏选择“引擎管理 > 多模数据引擎Aura”进入Aura引擎端点列表页面。
- 单击页面右上角的“创建端点”,配置以下参数并单击“立即创建”。
表4 创建Aura引擎端点 参数
配置样例
参数说明
端点类型
Job端点
选择端点类型。
- SQL 端点:提供交互式查询,满足BI报表与实时分析需求。
- Job 端点:用于执行定时调度任务,保障大规模数据稳定处理。
端点名称
aura_endpoint_test001
输入端点名称。
资源使用模式
混合模式
选择资源使用模式。
选择资源池
resource_pool_for_auratest
在下拉框中选择步骤四:创建计算资源池已创建的资源池。
CPU资源
8
配置对应CPU的保障配额及最大配额。
CPU为通用计算处理器,适合数据处理、ETL、批处理等任务。
GPU资源
不涉及
配置对应GPU的保障配额及最大配额。
GPU为神经网络处理器,擅长AI推理任务。
NPU资源
24
配置对应NPU的保障配额及最大配额。
NPU为图形处理器,具有强大的并行计算能力。
日志对接LTS
勾选“日志对接LTD”
是否对接LTS。对接LTS后日志会投递到云日志服务(Log Tank Service,简称LTS)进行管理。可以使用LTS对云服务日志进行关键词搜索、运营数据统计分析、运行状况监控告警等多种操作。
勾选该参数后,还需配置“日志组”和“日志流”参数。
日志组
container_aurajob_test
在下拉框中选择日志组,如果下拉框中没有可选的日志组,可以单击“创建日志组”进行创建。
日志组(LogGroup)是云日志服务进行日志管理的基本单位,用于对日志流进行分类,一个日志组下面可以创建多个日志流。日志组本身不存储任何日志数据,仅方便管理日志流,每个账号下可以创建100个日志组。
日志流
container_job_log
在下拉框中选择日志流,如果下拉框中没有可选的日志流,可以单击“创建日志流”进行创建。
云日志服务是以日志流(LogStream)作为日志管理维度。日志采集后,以日志流为单位,将不同类型的日志分类存储在不同的日志流上,方便对日志进一步分类管理。
- 端点创建后,可在列表中查看相关信息,端点状态变为“已就绪”后即可提交作业到该端点中运行。
步骤六:连接多模态数据引擎Aura的端点
- 获取步骤五:创建运行作业的Aura端点中创建的端点的基本信息。
- 使用aura_frame配置与端点的连接:
from fabric_data.multimodal import ai_lake import logging import os access_key = os.environ.get("access_key") secret_key = os.environ.get("secret_key") target_database = "multimodal_lake" # 建立连接 conn = ai_lake.connect( fabric_endpoint="100.85.xxx.xxx:xxxxx", fabric_endpoint_id="8a708bbf-f862-4c53-9622-xxxxxxxxx", fabric_workspace_id="ca319048-b07c-498c-97df-xxxxxxxxxx", lf_catalog_name="fabricsql_default", lf_instance_id="0102c9cf-0759-4478-b42d-xxxxxxxxxx", access_key=access_key, secret_key=secret_key, default_database=target_database, use_single_cn_mode=True, logging_level=logging.WARNING ) conn连接成功的回显信息:
<fabric_data.multimodal.ai_lake.FabricConnection at 0x2717948e990>
步骤七:加载原始数据(元信息表)
通过 Lazy Mode读取元数据表,加载Schema。
输入数据:包含 camera_type (摄像头类型) 和 image_uri (图片存储路径) 的表。
metadata_table_name = "image_object_table" ds = conn.load_dataset(metadata_table_name).limit(3) ds.execute(
返回结果:
camera_type image_uri 0 FRONT_FISHEYE obs://aura-test/cherry-data/A_1_417.3375.dat\r 1 FRONT_LEFT obs://aura-test/cherry-data/B_30_506.1382.dat\r 2 FRONT_TELE obs://aura-test/cherry-data/D_40_536.7318.dat\r
步骤八:注册数据处理算子
当前案例中涉及的算子如下,需要您先在本地完成算子的开发,然后再按照本节的操作注册算子。
- DownloadAndDecodeImage:下载原始数据并进行图片解码
- ClipExtractEmbedding:使用CLIP模型进行图片向量化处理
- 设置UDF归档存储位置(仅需一次)
conn.set_function_staging_workspace( obs_directory_base="data_ops_dev/user-defined-functions", obs_bucket_name="xxx", obs_server="obs.xxx.xxx.com", access_key=access_key, secret_key=secret_key )
- 注册UDF:以图片Embedding模型离线推理为例。
from fabric_data.multimodal.types import image, embedding from onnx_model import OnnxModel import numpy as np class ClipExtractEmbedding: def __init__(self): self.img_size = 224 self.model = OnnxModel(providers=['CPUExecutionProvider']) self.model.init_onnx_session('img.onnx') def __call__(self, img: image.Image) -> embedding.EmbeddingVector: image_pil = img.pil_image.convert('RGB') image_pil = image_pil.resize((224, 224)) image_array = np.array(image_pil, dtype=np.float32) image_array = image_array.transpose((2, 0, 1)) image_array = image_array[np.newaxis, :] emb = self.model.calc(image_array) vector = emb.tolist()[0] dims = len(vector) return embedding.EmbeddingVector({"vector": vector,"dims": dims}) conn.delete_function('ClipExtractEmbedding', if_it_exists=True) conn.create_scalar_function( ClipExtractEmbedding, imports=['img.onnx', "onnx_model.py"], packages=["pillow", "onnx==1.17.0", "onnxruntime", "numpy==1.25.2", "protobu database=target_database, comment="Extract image embedding using CLIP model", - 执行list_functions查看算子的注册结果。
conn.list_functions(database=target_database)
表5 注册的算子 序号
udf name
signature
description
0
ClipExtractEmbedding
(img: Image) ->EmbeddingVector
Extract imageembedding usingCLIP model
1
CosineSimilarity
(lhs: EmbeddingVector, rhs:EmbeddingVector) -> doubleprecision
Compute cosinesimilarity between2 embeddingvectors
2
DownloadAndDecodeImage
(text) -> Image
Download raw dataand decode image
3
DownloadAudioFromUrl
(text) -> Audio
None
4
PrivateImageEmbedding
(text) -> EmbeddingVector
Apply embeddingmodel on image
5
SpeakerWordsUDAF
(bigint, text) -> Image
None
6
SpeechRecognitionUDF
(audios: Audio) -> text
None
7
VideoContentAnalyzer
(vio: Video) ->struct
None
步骤九:提交数据分析作业
metadata_table_name = "image_object_table"
ds = conn.load_dataset(metadata_table_name, database=target_database)
# 1. 图片解码算子依赖用户提供的动态链接库,必须在x86机器上运行
ds = ds.map(
fn= '{}.DownloadAndDecodeImage'.format(target_database),
on=['image_uri'],
as_col='image',
num_dpus=0.5,
concurrency=6
)
# 2. 图片转Embedding算子需要使用NPU机器上运行
ds = ds.map(
fn='{}.ClipExtractEmbedding'.format(target_database),
on=['image'],
as_col='embedding',
num_apus=1,
apu_model='NPU/Ascend910B3/1'
)
In [7]:
高:流水线并行(Pipeline Parallelism)
设计,打满异构算力。
# 3. 将处理后的多模态数据存储至Iceberg表中
iceberg_table_name="iceberg_mm_tbl"
conn.delete_table(iceberg_table_name, if_it_exists=True)
ds.write_iceberg(
target_name=iceberg_table_name,
database=target_database,
create_if_not_exists=True
)
步骤十:查看处理后的多模态数据
ds = conn.load_dataset(iceberg_table_name).limit(3) ds.show()
|
camera_type |
image_uri |
image |
embedding |
|---|---|---|---|
|
FRONT_FISHEYE |
obs://aura-test/cherry-data/A_1_417.3375.dat\r |
|
Embedding(shape=(512,), data=[0.391,-0.83,-0.105,...]) |
|
FRONT_LEFT |
obs://aura-test/cherry-data/B_30_506.1382.dat\r |
|
Embedding(shape=(512,), data=[0.243,-0.87,-0.191,...]) |
|
FRONT_TELE |
obs://aura-test/cherry-data/D_40_536.7318.dat\r |
|
Embedding(shape=(512,), data=[0.409,-0.802,-0.2,...]) |
步骤十一:断开连接并清理计算资源
conn.close()


