文档首页/ 智能数据湖 AIDataLake/ 快速入门/ 快速使用多模数据引擎Aura分析智能驾驶数据
更新时间:2026-04-14 GMT+08:00
分享

快速使用多模数据引擎Aura分析智能驾驶数据

操作场景

在智能驾驶数据处理场景中,车辆会采集大量的视频、图像数据,对于这些多模数据的处理,要经过预处理、向量化等环节,才能为构建智驾模型训练数据集提供数据输入,这种数据密集型、计算密集型的分析场景给计算资源的调度和数据存储的I/O能力提出了极高的要求。

在传统的数据分析中,开发人员通过配置串行流水线,先在CPU执行解码任务,然后将数据存盘,再调度NPU资源执行推理任务。这种方式存在以下问题:资源调度效率低、算力利用率低、数据读写效率低等问题,异构资源的管理增加了工程运维的复杂性。

多模数据引擎Aura是专为分析多模态类型数据而设计,支持数据的边读边算能力,避免了传统方式的频繁存盘操作,让视频解码、向量化等预处理环节流水线式执行,让智能驾驶数据生产业务流实现了从串行到并行的架构升级,真正释放了异构算力的全部潜力。

本节操作以智能驾驶场景的数据处理为示例,介绍在AI DataLake使用多模数据引擎Aura完成多模数据处理与分析的操作步骤。

在作业开发过程中使用了aura_frame SDK,请您联系客户支持提前获取aura_frame SDK安装包。

准备工作

  • 已注册账号并实名认证,且账号不能处于欠费或冻结状态。
  • 已开通AI DataLake服务并授权使用云服务资源。
  • 已开通LakeFormation、OBS权限并进行了委托确认。
  • 已开通DataArts Studio服务,并创建了DataArts Studio实例及工作空间。

方案架构

AI DataLake搭载Aura多模数据分析引擎,实现跨异构资源(One Pool)、不落盘(One Flow)的数据处理流水线。

  • AI DataLake将CPU、GPU、NPU等异构资源进行统一池化抽象,实现资源的弹性调度与高效利用。通过统一资源池,将CPU、GPU、NPU异构资源进行统一管理。能够根据业务负载动态调整资源分配,实现资源的高效利用。
  • AI DataLake通过LakeFormation统一管理元数据,实现数据处理过程中全程不落盘,大幅提升处理效率。

    解码算子在CPU节点执行,向量化处理算子在NPU节点执行。Aura引擎自动通过LakeFormation完全避免中间数据落盘。这种设计消除了传统数据处理中磁盘I/O带来的性能瓶颈,显著降低数据处理延迟。

    图1 Aura多模数据分析-智能驾驶数据预处理解决方案架构图
    表1 与传统方案的对比

    特性

    传统方案

    Aura方案

    跨架构协同

    串行且割裂:先在CPU跑解码任务,然后将数据存盘,再去NPU跑推理任务。

    业务逻辑如同在一个单机进程内,资源调度无感知,而物理资源自动跨机流转。

    数据流转

    数据需重复落盘存储:中间图片数据必须写入对象存储,读写I/O效率低,存储资源成本增加。

    数据不落盘,依赖纯内存/网络流转,数据在内存中解码后直接流向NPU节点。

    资源利用率

    计算资源闲置,利用效率低,任务串行,部分资源需要等待数据I/O。

    计算资源利用效率高:流水线并行(Pipeline Parallelism)设计,异构算力不再闲置。

操作流程

图2 操作流程

本节介绍Aura Job的基本开发流程。技术实现流程:

  1. 环境准备
    1. AI DataLake管理控制台创建工作空间,用于提供独立的作业运行环境。
    2. AI DataLake管理控制台创建计算资源池,为作业的运行提供计算资源。
    3. AI DataLake管理控制台创建端点,配置Aura引擎与计算资源池的关联关系。
  2. 数据接入
    1. 获取Aura端点信息,建立与多模态数据引擎Aura的链接。
    2. 通过Lazy Mode读取元数据表。
  3. 算子注册
    注册数据处理算子,构建数据处理流水线:
    • 图片解码算子:支持多种格式的视频/图像解码。
    • 图片向量化算子:将图像转换为高维特征向量。
  4. 执行作业就查看结果
    1. 执行多模态数据处理流水线。
    2. 查看处理后的多模态数据,输出高质量的特征向量,为智驾模型训练提供数据输入。

步骤一:规划并创建OBS并行文件系统

AI DataLake Aura通过OBS服务实现数据存储,需要先在OBS控制台进行桶及文件夹创建,并导入样例数据。

  1. 登录管理控制台。
  2. 在页面左上角单击图标,选择“存储 > 对象存储服务”,进入对象存储服务页面。
  3. 以并行文件系统为例:
    选择“并行文件系统 > 创建并行文件系统”,进入创建页面,配置相关参数后单击“立即创建”。
    • 文件系统名称:根据界面要求设置并行文件系统名称,例如“aura-serverless”。
    • 其他参数根据实际情况选择。
  4. 在并行文件系统页面,单击已创建的文件系统名称,例如“aura-serverless”。
  5. 在左侧导航栏选择“文件”,单击“新建文件夹”,填写待创建的文件夹名称,单击“确定”。继续单击该新创建的文件夹名称,单击“新建文件夹”,可以创建其子文件夹。
  6. 参考该步骤,依次创建用于存放元数据的路径,例如:
    • Catalog存储路径:aura-serverless/catalog1
    • 数据库存储路径:aura-serverless/catalog1/database1

步骤二:规划并创建Lakeformation实例、Catalog、数据库

AI DataLake Aura通过LakeFormation服务管理数据源,需要在LakeFormation购买实例,并配置该实例的Catalog、数据库信息。

  1. 登录管理控制台。
  2. 在页面左上角单击图标,选择“大数据 > 湖仓构建 LakeFormation”,进入LakeFormation页面。
  3. 在“总览”页面右上角单击“购买实例”,配置相关参数购买LakeFormation实例。
  4. 在页面左上角选择切换到该实例。
    图3 切换目标实例
  5. 创建Catalog。
    1. 在左侧导航栏选择“元数据 > Catalog”。
    2. 单击“创建Catalog”,配置以下参数后,单击“提交”。

      参数

      配置样例

      参数说明

      Catalog名称

      lakeformation_for_auratest

      填写待创建Catalog名称。

      只能包含字母、数字和下划线,长度为1~256个字符。

      Catalog类型

      DEFAULT

      选择Catalog类型:

      • DEFAULT:即默认数据目录,用于管理存储在OBS中的数据资产。
      • CLICKHOUSE:CLICKHOUSE Catalog是用于连接外部ClickHouse数据库的外部目录类型。

      选择位置

      obs://lakeformation-test/catalog1

      Catalog数据存储在OBS桶中的位置。可选参数。

      根据实际需要选择“并行文件系统”或“对象存储桶”,并选择位置后,单击“确定”。

      • 如果配置该参数,则所选位置只能以“obs://”开头,且必须包含一个存储对象,例如选择“obs://lakeformation-test/catalog1”。如果没有合适的OBS桶,可以单击“前往OBS创建”进行创建。
      • 该路径不能与其他LakeFormation实例元数据存储路径重复,以免造成数据冲突。
      • 建议选择未被其他Catalog选中的文件夹。

      描述

      按需配置

      所创建Catalog的描述信息。

      长度为0~4000字节,1个中文字符对应3个字节。

    3. 创建完成后,即可在“Catalog”页面查看相关信息。
  6. 创建数据库。
    1. 在左侧导航栏选择“元数据 > 数据库”。
    2. 在右上角“Catalog”后的下拉框中选择步骤5创建的Catalog。
    3. 单击“创建数据库”,配置相关参数后,单击“提交”。

      在正式使用新Catalog前,请先手动创建default数据库并指定合法OBS路径,可避免后续系统自动创建带来的隐藏风险。

      如果当前已包含名称为“default”的数据库,则跳过数据库的创建操作。

      参数

      配置样例

      参数说明

      库名称

      default

      填写待创建数据库名称。

      只能包含中文、字母、数字、下划线、中划线,长度为1~128个字符。

      所属Catalog

      lakeformation_for_auratest

      待创建数据库所属Catalog。

      本例选择步骤5创建的Catalog。

      选择位置

      obs://lakeformation-test/catalog1/default

      数据库信息存储在OBS桶中的位置。

      根据实际需要选择“并行文件系统”或“对象存储桶”,并选择位置后,单击“确定”。

      • 所选位置只能以“obs://”开头,且必须包含一个存储对象,例如选择“obs://lakeformation-test/catalog1/default”。如果没有合适的OBS桶,可以单击“前往OBS创建”进行创建。
      • 该路径必须与所属的Catalog存储路径(即创建Catalog时配置的“选择位置”参数)不同。
      • 该路径不能与其他LakeFormation实例元数据存储路径重复,以免造成数据冲突。
      • 如果所属Catalog配置了“数据库存储位置”参数,则此处该参数必须选择为所属Catalog“选择位置”的子路径、或“数据库存储位置”的子路径。

      描述

      按需配置

      所创建数据库的描述信息。

      长度为0~4000字节,1个中文字符对应3个字节。

    4. 创建完成后,即可在“数据库”页面查看详细信息。

步骤三:创建工作空间

  1. 登录AI DataLake管理控制台。
  2. 在左侧导航栏中,单击工作空间区域。
  3. 在下拉列表中选择“创建工作空间” 。
  4. 配置工作空间的相关参数:
    表2 创建工作空间参数说明

    类型

    参数

    配置样例

    说明

    基础信息

    工作空间名称

    workspace_auratest

    工作空间的具体名称。

    • 名称只能包含字母、中文、数字、中划线、下划线。
    • 输入长度为4~32个字符。

    工作空间名称不区分大小写,系统会自动转换为小写。

    描述

    智能驾驶数据分析

    对工作空间的简要描述。

    企业项目

    default

    如果所建工作空间属于企业项目,可选择对应的企业项目。

    企业项目是一种云资源管理方式,企业项目管理服务提供统一的云资源按项目管理,以及项目内的资源管理、成员管理。

    关于如何设置企业项目请参考《企业管理用户指南》。

    说明:

    只有开通了企业管理服务的用户才显示该参数。

    多模数据管理

    LakeFormation实例

    lakeformation_for_auratest

    选择当前工作空间关联的LakeFormation实例,即步骤二:规划并创建LakeFormation实例、Catalog、数据库创建的LakeFormation实例。

    每个工作空间需关联1个LakeFormation实例, 空间创建后已关联的实例不支持修改。

  5. 确认所有配置信息无误。

    单击“立即创建”按钮,完成工作空间创建。

    工作空间创建成功后,您可以在工作空间管理的列表中查看新创建的工作空间。

步骤四:创建计算资源池

  1. AI DataLake管理控制台页面,切换页面右上角的工作空间为步骤三:创建工作空间新创建的空间。
  2. 单击左侧导航栏的“新建”,在下拉框中选择“计算资源池”进入购买计算资源池页面。
  3. 在“购买计算资源池”界面,填写具体参数,参数填写参考表2
    表3 购买计算资源池参数说明

    参数

    配置样例

    说明

    计费模式

    按需计费

    选择“按需计费”。

    按需计费即后付费模式,按实际使用量计费,在购买周期内资源独享,空闲时资源不被释放。

    资源池名称

    resource_pool_for_auratest

    计算资源池的具体名称。

    • 名称只能包含数字、小写英文字母和中划线,且只能以字母开头、以字母或数字结尾。
    • 输入长度不能超过63个字符。

    CPU 资源

    本例配置8个通用计算标准型实例。

    勾选CPU资源后,选择资源规格并配置购买的实例数量。

    CPU是通用型计算资源,适用于各种类型的计算任务。

    • CPU资源的特点:通用性强,适合各种类型的计算任务。相比于GPU和NPU的成本更低。擅长处理顺序执行的任务。
    • CPU资源的适用场景:ETL 数据抽取、转换、加载;轻量计算场景,例如小规模数据处理、脚本运行,日志分析场景,例如日志采集、解析、统计分析。
    • CPU资源的具体规格请参考产品规格

    GPU 资源

    本例不购买GPU实例

    勾选GPU资源后,选择资源规格并配置购买的实例数量。

    GPU是图形处理器适用于图形渲染和大规模并行计算场景,适合深度学习训练和科学计算。GPU拥有成百上千个计算核心,可以同时处理大量简单计算任务。

    • GPU资源的特点:支持并行计算,适用于批处理作业场景,数千个核心同时计算,处理效率高。天然适合深度学习、神经网络、AI计算场景。
    • GPU资源的适用场景:深度学习,例如神经网络训练、模型调优场景;图形处理场景,例如图像识别、目标检测;视频处理场景,例如视频分析、转码、视频渲染场景,等其他AI科学计算场景。
    • GPU资源的具体规格请参考产品规格

    NPU 资源

    本例购买1个昇腾AI加速型(B3)1卡

    勾选NPU资源后,选择资源规格并配置购买的实例数量。

    NPU适用于AI计算场景。NPU资源采用架构优化和指令集,专门加速AI推理任务,具有高能效比的特点。

    • NPU资源的特点:AI计算场景专用,具备高性能、低延迟、推理成本更优的特点。
    • NPU资源的适用场景:AI计算场景、图像识别场景,推荐系统等AI计算设计场景。
    • NPU资源的具体规格请参考产品规格

    AI DataLake 网络

    自定义

    选择AI DataLake资源池所属网络,该网络基于虚拟私有云(VPC)进行封装。如果不存在可使用的网络,也可单击“创建网络”进行创建。

    一个工作空间仅支持创建一个网络。

  4. 参数填写完成后,单击“立即购买”,在界面上确认当前配置是否正确。
  5. 单击“提交”完成创建。等待资源池状态变成“可使用”表示当前资源池创建成功。

步骤五:创建运行作业的Aura端点

  1. AI DataLake管理控制台页面,切换页面右上角的工作空间为步骤三:创建工作空间新创建的空间。
  2. 在左侧导航栏选择“引擎管理 > 多模数据引擎Aura”进入Aura引擎端点列表页面。
  3. 单击页面右上角的“创建端点”,配置以下参数并单击“立即创建”。
    表4 创建Aura引擎端点

    参数

    配置样例

    参数说明

    端点类型

    Job端点

    选择端点类型。

    • SQL 端点:提供交互式查询,满足BI报表与实时分析需求。
    • Job 端点:用于执行定时调度任务,保障大规模数据稳定处理。

    端点名称

    aura_endpoint_test001

    输入端点名称。

    资源使用模式

    混合模式

    选择资源使用模式。

    • 预留资源:独享性能,成本最优。预留资源,单价最低,确保业务基线稳定。

      适用于负载稳定业务场景。

    • 混合模式:基线保障,自动扩容。优先消耗预留资源,高峰期自动触发弹性补位。

      适用于有规律波动的业务

    选择资源池

    resource_pool_for_auratest

    在下拉框中选择步骤四:创建计算资源池已创建的资源池。

    CPU资源

    8

    配置对应CPU的保障配额及最大配额。

    CPU为通用计算处理器,适合数据处理、ETL、批处理等任务。

    GPU资源

    不涉及

    配置对应GPU的保障配额及最大配额。

    GPU为神经网络处理器,擅长AI推理任务。

    NPU资源

    24

    配置对应NPU的保障配额及最大配额。

    NPU为图形处理器,具有强大的并行计算能力。

    日志对接LTS

    勾选“日志对接LTD”

    是否对接LTS。对接LTS后日志会投递到云日志服务(Log Tank Service,简称LTS)进行管理。可以使用LTS对云服务日志进行关键词搜索、运营数据统计分析、运行状况监控告警等多种操作。

    勾选该参数后,还需配置“日志组”和“日志流”参数。

    日志组

    container_aurajob_test

    在下拉框中选择日志组,如果下拉框中没有可选的日志组,可以单击“创建日志组”进行创建。

    日志组(LogGroup)是云日志服务进行日志管理的基本单位,用于对日志流进行分类,一个日志组下面可以创建多个日志流。日志组本身不存储任何日志数据,仅方便管理日志流,每个账号下可以创建100个日志组。

    日志流

    container_job_log

    在下拉框中选择日志流,如果下拉框中没有可选的日志流,可以单击“创建日志流”进行创建。

    云日志服务是以日志流(LogStream)作为日志管理维度。日志采集后,以日志流为单位,将不同类型的日志分类存储在不同的日志流上,方便对日志进一步分类管理。

  4. 端点创建后,可在列表中查看相关信息,端点状态变为“已就绪”后即可提交作业到该端点中运行。

步骤六:连接多模态数据引擎Aura的端点

  1. 获取步骤五:创建运行作业的Aura端点中创建的端点的基本信息。
  2. 使用aura_frame配置与端点的连接:
    from fabric_data.multimodal import ai_lake
    import logging
    import os
    access_key = os.environ.get("access_key")
    secret_key = os.environ.get("secret_key")
    target_database = "multimodal_lake"
    # 建立连接
    conn = ai_lake.connect(
    fabric_endpoint="100.85.xxx.xxx:xxxxx",
    fabric_endpoint_id="8a708bbf-f862-4c53-9622-xxxxxxxxx",
    fabric_workspace_id="ca319048-b07c-498c-97df-xxxxxxxxxx",
    lf_catalog_name="fabricsql_default",
    lf_instance_id="0102c9cf-0759-4478-b42d-xxxxxxxxxx",
    access_key=access_key,
    secret_key=secret_key,
    default_database=target_database,
    use_single_cn_mode=True,
    logging_level=logging.WARNING
    )
    conn

    连接成功的回显信息:

    <fabric_data.multimodal.ai_lake.FabricConnection at 0x2717948e990>

步骤七:加载原始数据(元信息表)

通过 Lazy Mode读取元数据表,加载Schema。

输入数据:包含 camera_type (摄像头类型) 和 image_uri (图片存储路径) 的表。

metadata_table_name = "image_object_table"
ds = conn.load_dataset(metadata_table_name).limit(3)
ds.execute(

返回结果:

camera_type image_uri
0 FRONT_FISHEYE obs://aura-test/cherry-data/A_1_417.3375.dat\r
1 FRONT_LEFT obs://aura-test/cherry-data/B_30_506.1382.dat\r
2 FRONT_TELE obs://aura-test/cherry-data/D_40_536.7318.dat\r

步骤八:注册数据处理算子

当前案例中涉及的算子如下,需要您先在本地完成算子的开发,然后再按照本节的操作注册算子。

  • DownloadAndDecodeImage:下载原始数据并进行图片解码
  • ClipExtractEmbedding:使用CLIP模型进行图片向量化处理
  1. 设置UDF归档存储位置(仅需一次)
    conn.set_function_staging_workspace(
        obs_directory_base="data_ops_dev/user-defined-functions",
        obs_bucket_name="xxx",
        obs_server="obs.xxx.xxx.com",
        access_key=access_key,
        secret_key=secret_key
    )
  1. 注册UDF:以图片Embedding模型离线推理为例。
    from fabric_data.multimodal.types import image, embedding
    from onnx_model import OnnxModel
    import numpy as np
    class ClipExtractEmbedding:
        def __init__(self):
            self.img_size = 224
            self.model = OnnxModel(providers=['CPUExecutionProvider'])
            self.model.init_onnx_session('img.onnx')
        def __call__(self, img: image.Image) -> embedding.EmbeddingVector:
            image_pil = img.pil_image.convert('RGB')
            image_pil = image_pil.resize((224, 224))
            image_array = np.array(image_pil, dtype=np.float32)
            image_array = image_array.transpose((2, 0, 1))
            image_array = image_array[np.newaxis, :]
            emb = self.model.calc(image_array)
            vector = emb.tolist()[0]
            dims = len(vector)
            return embedding.EmbeddingVector({"vector": vector,"dims": dims})
    conn.delete_function('ClipExtractEmbedding', if_it_exists=True)
    conn.create_scalar_function(
        ClipExtractEmbedding,
        imports=['img.onnx', "onnx_model.py"],
        packages=["pillow", "onnx==1.17.0", "onnxruntime", "numpy==1.25.2", "protobu
        database=target_database,
        comment="Extract image embedding using CLIP model",
  2. 执行list_functions查看算子的注册结果。
    conn.list_functions(database=target_database)
    表5 注册的算子

    序号

    udf name

    signature

    description

    0

    ClipExtractEmbedding

    (img: Image) ->EmbeddingVector

    Extract imageembedding usingCLIP model

    1

    CosineSimilarity

    (lhs: EmbeddingVector, rhs:EmbeddingVector) -> doubleprecision

    Compute cosinesimilarity between2 embeddingvectors

    2

    DownloadAndDecodeImage

    (text) -> Image

    Download raw dataand decode image

    3

    DownloadAudioFromUrl

    (text) -> Audio

    None

    4

    PrivateImageEmbedding

    (text) -> EmbeddingVector

    Apply embeddingmodel on image

    5

    SpeakerWordsUDAF

    (bigint, text) -> Image

    None

    6

    SpeechRecognitionUDF

    (audios: Audio) -> text

    None

    7

    VideoContentAnalyzer

    (vio: Video) ->struct

    None

步骤九:提交数据分析作业

metadata_table_name = "image_object_table"
ds = conn.load_dataset(metadata_table_name, database=target_database)
# 1. 图片解码算子依赖用户提供的动态链接库,必须在x86机器上运行
ds = ds.map(
fn= '{}.DownloadAndDecodeImage'.format(target_database),
on=['image_uri'],
as_col='image',
num_dpus=0.5, 
concurrency=6
)
# 2. 图片转Embedding算子需要使用NPU机器上运行
ds = ds.map(
fn='{}.ClipExtractEmbedding'.format(target_database),
on=['image'],
as_col='embedding',
num_apus=1,
apu_model='NPU/Ascend910B3/1'
)
In [7]:
高:流水线并行(Pipeline Parallelism)
设计,打满异构算力。
# 3. 将处理后的多模态数据存储至Iceberg表中
iceberg_table_name="iceberg_mm_tbl"
conn.delete_table(iceberg_table_name, if_it_exists=True)
ds.write_iceberg(
target_name=iceberg_table_name,
database=target_database,
create_if_not_exists=True
)

步骤十:查看处理后的多模态数据

ds = conn.load_dataset(iceberg_table_name).limit(3)
ds.show()

camera_type

image_uri

image

embedding

FRONT_FISHEYE

obs://aura-test/cherry-data/A_1_417.3375.dat\r

Embedding(shape=(512,), data=[0.391,-0.83,-0.105,...])

FRONT_LEFT

obs://aura-test/cherry-data/B_30_506.1382.dat\r

Embedding(shape=(512,), data=[0.243,-0.87,-0.191,...])

FRONT_TELE

obs://aura-test/cherry-data/D_40_536.7318.dat\r

Embedding(shape=(512,), data=[0.409,-0.802,-0.2,...])

步骤十一:断开连接并清理计算资源

 conn.close()

相关文档