文档首页/ 智能数据湖 AIDataLake/ 快速入门/ 基于Aura DataFrame的多模数据处理
更新时间:2026-05-22 GMT+08:00
分享

基于Aura DataFrame的多模数据处理

操作场景

多模数据引擎Aura是专为分析多模态类型数据而设计,支持数据的边读边算能力,避免了传统方式的频繁存盘操作,使得视频解码、向量化等预处理环节能够流水线式执行。提供自定义UDF功能,满足多模态场景定制化数据处理的需求。

本章节操作以表格数据处理为示例,介绍在AI DataLake使用多模数据引擎Aura和DataArts Notebook交互式地完成数据的处理与分析的操作。

在作业开发过程中使用到了DataArts Notebook镜像内置的Aura DataFrame SDK,可直接使用魔法命令创建连接。

准备工作

  • 已注册账号并实名认证,且账号不能处于欠费或冻结状态。
  • 已开通AI DataLake服务并授权使用云服务资源。
  • 已开通LakeFormation、OBS权限并进行了委托确认。
  • 已开通DataArts Studio服务,并创建了DataArts Studio实例及工作空间。
  • 已购买Notebook资源组并创建Notebook实例(已绑定Fabric SQL计算资源)。

操作流程

本节介绍Aura Job的基本开发流程。技术实现流程:

  1. 环境准备
    1. AI DataLake管理控制台创建工作空间,用于提供独立的作业运行环境。
    2. AI DataLake管理控制台创建计算资源池,为作业的运行提供计算资源。
    3. AI DataLake管理控制台创建端点,配置Aura引擎与计算资源池的关联关系。
  2. 绑定计算资源和数据接入
    1. 在Notebook页面或DataArts Studio管理中心绑定Aura计算资源。
    2. 在Notebook页面为当前Notebook指定已绑定的Aura计算资源。
    3. 在Notebook交互式开发页面使用%aura_frame魔法命令建立与多模态数据引擎Aura的连接。
  3. 算子注册

    注册数据处理算子,构建数据处理流水线:

    表格处理算子:对表格数据进行自定义处理。

  4. 执行作业就查看结果
    1. 执行算子操作,处理数据。
    2. 在Notebook交互式开发界面查看执行结果。

步骤一:规划并创建OBS并行文件系统

AI DataLake Aura通过OBS服务实现数据存储,需要先在OBS控制台进行桶及文件夹创建,并导入样例数据。

  1. 登录华为云管理控制台
  2. 在页面左上角单击图标,选择“存储 > 对象存储服务 OBS”,进入对象存储服务页面。
  3. 以并行文件系统为例:
    选择“并行文件系统 > 创建并行文件系统”,进入创建页面,配置相关参数后单击“立即创建”。
    • 文件系统名称:根据界面要求设置并行文件系统名称,例如“aura-serverless”。
    • 其他参数根据实际情况选择。
  4. 在并行文件系统页面,单击已创建的文件系统名称,例如“aura-serverless”。
  5. 在左侧导航栏选择“文件”,单击“新建文件夹”,填写待创建的文件夹名称,单击“确定”。继续单击该新创建的文件夹名称,单击“新建文件夹”,可以创建其子文件夹。
  6. 参考该步骤,依次创建用于存放元数据的路径,例如:
    • Catalog存储路径:aura-serverless/catalog1
    • 数据库存储路径:aura-serverless/catalog1/database1

步骤二:规划并创建LakeFormation实例、Catalog、数据库

AI DataLake Aura通过LakeFormation服务管理数据源,需要在LakeFormation购买实例,并配置该实例的Catalog、数据库信息。

  1. 登录华为云管理控制台
  2. 在页面左上角单击图标,选择“大数据 > 湖仓构建 LakeFormation”,进入LakeFormation页面。
  3. 在“总览”页面右上角单击“购买实例”,配置相关参数购买LakeFormation实例。
  4. 在页面左上角选择切换到该实例。
    图1 切换目标实例
  5. 创建Catalog。
    1. 在左侧导航栏选择“元数据 > Catalog”。
    2. 单击“创建Catalog”,配置以下参数后,单击“提交”。

      参数

      配置样例

      参数说明

      Catalog名称

      lakeformation_for_auratest

      填写待创建Catalog名称。

      只能包含字母、数字和下划线,长度为1~256个字符。

      Catalog类型

      DEFAULT

      选择Catalog类型:

      • DEFAULT:即默认数据目录,用于管理存储在OBS中的数据资产。
      • CLICKHOUSE:CLICKHOUSE Catalog是用于连接外部ClickHouse数据库的外部目录类型。

      选择位置

      obs://lakeformation-test/catalog1

      Catalog数据存储在OBS桶中的位置。可选参数。

      根据实际需要选择“并行文件系统”或“对象存储桶”,并选择位置后,单击“确定”。

      • 如果配置该参数,则所选位置只能以“obs://”开头,且必须包含一个存储对象,例如选择“obs://lakeformation-test/catalog1”。如果没有合适的OBS桶,可以单击“前往OBS创建”进行创建。
      • 该路径不能与其他LakeFormation实例元数据存储路径重复,以免造成数据冲突。
      • 建议选择未被其他Catalog选中的文件夹。

      描述

      按需配置

      所创建Catalog的描述信息。

      长度为0~4000字节,1个中文字符对应3个字节。

    3. 创建完成后,即可在“Catalog”页面查看相关信息。
  6. 创建数据库。
    1. 在左侧导航栏选择“元数据 > 数据库”。
    2. 在右上角“Catalog”后的下拉框中选择步骤5创建的Catalog。
    3. 单击“创建数据库”,配置相关参数后,单击“提交”。

      在正式使用新Catalog前,请先手动创建default数据库并指定合法OBS路径,可避免后续系统自动创建带来的隐藏风险。

      如果当前已包含名称为“default”的数据库,则跳过数据库的创建操作。

      参数

      配置样例

      参数说明

      库名称

      default

      填写待创建数据库名称。

      只能包含中文、字母、数字、下划线、中划线,长度为1~128个字符。

      所属Catalog

      lakeformation_for_auratest

      待创建数据库所属Catalog。

      本例选择步骤5创建的Catalog。

      选择位置

      obs://lakeformation-test/catalog1/default

      数据库信息存储在OBS桶中的位置。

      根据实际需要选择“并行文件系统”或“对象存储桶”,并选择位置后,单击“确定”。

      • 所选位置只能以“obs://”开头,且必须包含一个存储对象,例如选择“obs://lakeformation-test/catalog1/default”。如果没有合适的OBS桶,可以单击“前往OBS创建”进行创建。
      • 该路径必须与所属的Catalog存储路径(即创建Catalog时配置的“选择位置”参数)不同。
      • 该路径不能与其他LakeFormation实例元数据存储路径重复,以免造成数据冲突。
      • 如果所属Catalog配置了“数据库存储位置”参数,则此处该参数必须选择为所属Catalog“选择位置”的子路径、或“数据库存储位置”的子路径。

      描述

      按需配置

      所创建数据库的描述信息。

      长度为0~4000字节,1个中文字符对应3个字节。

    4. 创建完成后,即可在“数据库”页面查看详细信息。

步骤三:创建工作空间

  1. 登录AI DataLake管理控制台
  2. 在左侧导航栏中,单击工作空间区域。
  3. 在下拉列表中,选择“创建工作空间” 。
  4. 配置工作空间的相关参数:
    表1 创建工作空间参数说明

    类型

    参数

    配置样例

    说明

    基础信息

    工作空间名称

    workspace_auratest

    工作空间的具体名称。

    • 名称只能包含字母、中文、数字、中划线、下划线。
    • 输入长度为4~32个字符。

    工作空间名称不区分大小写,系统会自动转换为小写。

    描述

    智能驾驶数据分析

    对工作空间的简要描述。

    企业项目

    default

    如果所建工作空间属于企业项目,可选择对应的企业项目。企业项目是一种云资源管理方式,企业项目管理服务提供统一的云资源按项目管理,以及项目内的资源管理、成员管理。

    关于如何设置企业项目请参考《企业管理用户指南》。

    说明:

    只有开通了企业管理服务的用户才显示该参数。

    多模数据管理

    LakeFormation实例

    lakeformation_for_auratest

    选择当前工作空间关联的LakeFormation实例,即步骤二:规划并创建LakeFormation实例、Catalog、数据库创建的LakeFormation实例。

    每个工作空间需关联1个LakeFormation实例,空间创建后已关联的实例不支持修改。

  5. 确认所有配置信息无误。

    单击“立即创建”按钮,完成工作空间创建。

    工作空间创建成功后,您可以在工作空间管理的列表中查看新创建的工作空间。

步骤四:创建计算资源池

  1. AI DataLake管理控制台页面,切换页面右上角的工作空间为步骤三:创建工作空间中所创建的空间。
  2. 单击左侧导航栏的“新建”,在下拉框中选择“计算资源池”进入购买计算资源池页面。
  3. 在“购买计算资源池”界面,填写具体参数,参数填写参考表2
    表2 购买计算资源池参数说明

    参数

    配置样例

    说明

    计费模式

    按需计费

    选择“按需计费”。

    按需计费即后付费模式,按实际使用量计费,在购买周期内资源独享,空闲时资源不被释放。

    资源池名称

    resource_pool_for_auratest

    计算资源池的具体名称。

    • 名称只能包含数字、小写英文字母和中划线,且只能以字母开头、以字母或数字结尾。
    • 输入长度不能超过63个字符。

    CPU 资源

    本例配置8个通用计算标准型实例。

    勾选CPU资源后,选择资源规格并配置购买的实例数量。

    CPU是通用型计算资源,适用于各种类型的计算任务。

    • CPU资源的特点:通用性强,适合各种类型的计算任务。相比于GPU和NPU的成本更低。擅长处理顺序执行的任务。
    • CPU资源的适用场景:ETL数据抽取、转换、加载;轻量计算场景,例如小规模数据处理、脚本运行,日志分析场景,例如日志采集、解析、统计分析。
    • CPU资源的具体规格请参考产品规格

    GPU 资源

    本例不购买GPU实例

    勾选GPU资源后,选择资源规格并配置购买的实例数量。

    GPU是图形处理器适用于图形渲染和大规模并行计算场景,适合深度学习训练和科学计算。GPU拥有成百上千个计算核心,可以同时处理大量简单计算任务。

    • GPU资源的特点:支持并行计算,适用于批处理作业场景,数千个核心同时计算,处理效率高。天然适合深度学习、神经网络、AI计算场景。
    • GPU资源的适用场景:深度学习,例如神经网络训练、模型调优场景;图形处理场景,例如图像识别、目标检测;视频处理场景,例如视频分析、转码、视频渲染场景,等其他AI科学计算场景。
    • GPU资源的具体规格请参考产品规格

    NPU 资源

    本例购买1个昇腾AI加速型(B3)1卡

    勾选NPU资源后,选择资源规格并配置购买的实例数量。

    NPU适用于AI计算场景。NPU资源采用架构优化和指令集,专门加速AI推理任务,具有高能效比的特点。

    • NPU资源的特点:AI计算场景专用,具备高性能、低延迟、推理成本更优的特点。
    • NPU资源的适用场景:AI计算场景、图像识别场景,推荐系统等AI计算设计场景。
    • NPU资源的具体规格请参考产品规格

    AI DataLake 网络

    自定义

    选择AI DataLake资源池所属网络,该网络基于虚拟私有云(VPC)进行封装。如果不存在可使用的网络,也可单击“创建网络”进行创建。

    一个工作空间仅支持创建一个网络。

  4. 参数填写完成后,单击“立即购买”,在界面上确认当前配置是否正确。
  5. 单击“提交”完成创建。等待资源池状态变成“可使用”表示当前资源池创建成功。

步骤五:创建运行作业的Aura端点

  1. AI DataLake管理控制台页面,切换页面右上角的工作空间为步骤三:创建工作空间新创建的空间。
  2. 在左侧导航栏选择“引擎端点 > 多模数据引擎Aura”进入Aura引擎端点列表页面。
  3. 单击页面右上角的“创建端点”,配置以下参数并单击“立即创建”。
    表3 创建Aura引擎端点

    参数

    配置样例

    参数说明

    端点类型

    AuraJob

    选择端点类型。

    • AuraJob:用于执行定时调度任务,保障大规模数据稳定处理。

    端点版本

    v2.0

    选择端点版本,基于Aura DataFrame的多模数据处理需选择“v2.0”。

    • v1.0:自定义镜像算子,智算、通算混合资源池统一调度,支持Job级弹性。
    • v2.0(推荐):细粒度Actor级算子编排调度,数据处理Pipeline执行,性能和效率极大提升。

    端点名称

    aura-endpoint-test001

    输入端点名称,是端点的唯一标识符,不可与已存在的端点重名,且创建后不支持修改。

    • 名称只能包含小写字母、数字、中划线,且只能以字母开头,以字母或数字结尾。
    • 输入长度不能超过63个字符。

    端点显示名称

    智能驾驶团队专用Aura端点-02

    输入端点显示名称,创建后可修改,用于用户界面展示,方便用户识别和记忆。

    • 名称只能包含中文、英文、数字、中划线。
    • 输入长度不能超过63个字符。

    资源使用模式

    混合模式

    选择资源使用模式。

    • 预留资源:独享性能,成本最优。预留资源,单价最低,确保业务基线稳定。

      适用于负载稳定业务场景。

    • 混合模式:基线保障,自动扩容。优先消耗预留资源,高峰期自动触发弹性补位。

      适用于有规律波动的业务

    选择资源池

    resource_pool_for_auratest

    在下拉框中选择步骤四:创建计算资源池已创建的资源池。

    工作组配置

    -

    配置Aura端点的工作组资源。

    • 资源规格:在下拉框中选择需要配置的资源规格,支持选择CPU、GPU、NPU资源。
    • 每Worker卡数:配置Worker卡数。

      分配了GPU/NPU资源的每个Worker的GPU/NPU卡数。只有GPU和NPU场景可以配置卡数,CPU场景不支持配置。且GPU/NPU场景不支持调整Worker CPU和内存。

    • 每Worker CPU及内存:配置Worker CPU和内存。

      分配了CPU资源的工作组,不支持自定义GPU/NPU卡数。

    • Worker 数量 (Min-Max):每个工作组的Worker最大值和最小值。
      • 预留资源:Worker最小值需与最大值相同,且需大于0。
      • 混合模式:Worker最小值需小于等于最大值,且需大于0。

    可单击“添加工作组”添加多个工作组配置,也可单击“删除”删除工作组,至少需保留一个工作组。

    选择 OBS 桶

    obs-aura

    选择用于存储作业运行过程中的结果缓存和日志文件的OBS桶,端点创建后不支持修改。

    如果没有可用的OBS桶可单击“新建OBS”进行创建。

    日志对接LTS

    勾选“日志对接LTS”

    是否对接LTS。对接LTS后日志会投递到云日志服务(Log Tank Service,简称LTS)进行管理。可以使用LTS对云服务日志进行关键词搜索、运营数据统计分析、运行状况监控告警等多种操作。

    勾选该参数后,还需配置“日志组”和“日志流”参数。

    日志组

    container_aurajob_test

    在下拉框中选择日志组,如果下拉框中没有可选的日志组,可以单击“创建日志组”进行创建。

    日志组(LogGroup)是云日志服务进行日志管理的基本单位,用于对日志流进行分类,一个日志组下面可以创建多个日志流。日志组本身不存储任何日志数据,仅方便管理日志流,每个账号下可以创建100个日志组。

    日志流

    container_job_log

    在下拉框中选择日志流,如果下拉框中没有可选的日志流,可以单击“创建日志流”进行创建。

    云日志服务是以日志流(LogStream)作为日志管理维度。日志采集后,以日志流为单位,将不同类型的日志分类存储在不同的日志流上,方便对日志进一步分类管理。

  4. 端点创建后,可在列表中查看相关信息,端点状态变为“已就绪”后即可提交作业到该端点中运行。

步骤六:连接多模态数据引擎Aura的端点

  1. AI DataLake管理控制台页面,单击导航栏下方“DataArts Studio”右侧的按钮进入DataArts Studio作业开发界面。
  2. 在“数据开发 > 作业开发”页面的右下角,单击“运行环境为空”,选择“新建环境”,配置以下参数并单击“创建”创建Notebook:
    • 运行环境名称:输入自定义名称。
    • 所属资源组:选择已创建的Notebook资源组。
    • 运行资源规格:选择所需的运行资源规格。
    • 镜像选择:选择Notebook镜像。
  3. 在“Notebook环境目录”区域右键单击“我的文件”选择“新建 Notebook”,配置Notebook名称并选择存放的目录,单击“确定”。
  4. 在Notebook中的单元格执行以下魔法命令连接运行作业的Aura端点:
    conn = %aura_frame --lf_catalog_name xxx --obs_bucket_name xxx --obs_directory_base xxx/test --lf_instance_id xxx --default_database xxx

    其中:

    • lf_catalog_name:连接的LakeFormation中的Catalog名称。
    • --obs_bucket_name:用于存储UDF的OBS并行文件系统名称。
    • --obs_directory_base:用于存储UDF的OBS路径。
    • --lf_instance_id:连接的LakeFormation中的实例ID。
    • --default_database:连接的LakeFormation中的默认数据库。

    单击左上角的运行按钮运行命令连接Aura端点,回显以下信息,即表示连接Aura端点成功:

    [INFO] [Aura Frame] 2026-04-29 09:21:18 Setting guc options: 'SET timezone = UTC; SET obs_result_format = 2;SET current_schema = xxx;'
    [INFO] [Aura Frame] 2026-04-29 09:21:19 Created session 019dd6d3-7fe1-743d-8017-ce7974125079 successfully.

步骤七:导入Python和Aura DataFrame SDK依赖包

在Notebook中的新增单元格,执行以下命令导入Python和Aura DataFrame SDK依赖包:

import sys
import requests
import pandas as pd

import aura_frame as aura
from aura_frame.multimodal.types import image, embedding
from aura_frame.ibis.backends.sql.datatype import advance_dtype

单击左上角的运行按钮运行命令。

步骤八:创建数据表

  1. 在Notebook中的新增单元格,执行以下命令创建数据表:
    df = pd.DataFrame({
        'a': [1],
        'b': [3],
        'c': [1]
    })
    ds = conn.from_pandas(df)
    ds.show()
  2. 单击左上角的运行按钮运行命令,命令运行后,结果示例如下则表示创建表成功:
    generated sql statment: CREATE TEMP TABLE aura_pandas_dataset_vp2r6lk7fna7zaldkcfsk3pajm (
      "a" BIGINT,"b" BIGINT,"c" BIGINT
    )
    STORE AS PANDAS
    [INFO] [Aura Frame] 2026-04-29 09:26:52 Loading dataset pg_temp.aura_pandas_dataset_vp2r6lk7fna7zaldkcfsk3pajm
    [INFO] [Aura Frame] 2026-04-29 09:26:55 read pandas aura_pandas_dataset_vp2r6lk7fna7zaldkcfsk3pajm
    [INFO] [Aura Frame] 2026-04-29 09:26:55 SQL statement for executing:
    SELECT *
    FROM "pg_temp"."aura_pandas_dataset_vp2r6lk7fna7zaldkcfsk3pajm" AS "t0"
    LIMIT 10
    [INFO] [Aura Frame] 2026-04-29 09:26:58 Running SQL statement, id is 019dd6d8-a5a2-747c-901b-1fb08d25f379
       a  b  c
    0  1  3  1

步骤九:定义和注册UDF

  1. 在Notebook中的新增单元格,执行以下命令定义和注册UDF:
    target_database = xxx
    
    import ibis.expr.datatypes as dt
    import aura_frame as aura
    
    func_signature = aura.Signature(
        parameters=[
            aura.Parameter(name="start", annotation=int),
            aura.Parameter(name="endnum", annotation=int),
            aura.Parameter(name="step", annotation=int)
        ],
        return_annotation=dt.Struct({"col1": int}),
    )
    
    @aura.udf.python(database=target_database, signature=func_signature)
    def py_generate_series(start, endnum, step):
        current = start
        while current <= endnum:
            yield {'col1': current}
            current += step
    
     # 如果未注册UDF,则无需执行删除操作,需注释掉delete_function操作
    conn.delete_function("py_generate_series", database=target_database)
    
    fn = conn.create_table_function(
        py_generate_series,
        name="py_generate_series",
        database=target_database,
        signature=func_signature
    )

    “target_database”表示LakeFormation中用于管理注册的UDF的信息的数据库名称。

  2. 单击左上角的运行按钮运行命令,命令运行后,结果示例如下则表示定义和注册UDF成功:
    [INFO] [Aura Frame] 2026-04-29 09:26:58 Start to register function xxx.py_generate_series
    [INFO] [Aura Frame] 2026-04-29 09:26:58 Function main entry code has been serialized by cloudpickle.
    [INFO] [Aura Frame] 2026-04-29 09:26:58 The Python package dependencies have been resolved.
    [INFO] [Aura Frame] 2026-04-29 09:26:58 Appending /opt/cloud/3rdComponent/python3.11/lib/python3.11/site-packages/aura_frame/multimodal/types(4.0 KB) to archive zipfile as aura_frame/multimodal/types
    [INFO] [Aura Frame] 2026-04-29 09:26:58 Appending /opt/cloud/3rdComponent/python3.11/lib/python3.11/site-packages/aura_frame/multimodal/types/__init__.py(0 bytes) to archive zipfile as aura_frame/multimodal/types/__init__.py
    [INFO] [Aura Frame] 2026-04-29 09:26:58 Appending /opt/cloud/3rdComponent/python3.11/lib/python3.11/site-packages/aura_frame/multimodal/types/audio.py(8.93 KB) to archive zipfile as aura_frame/multimodal/types/audio.py
    [INFO] [Aura Frame] 2026-04-29 09:26:58 Appending /opt/cloud/3rdComponent/python3.11/lib/python3.11/site-packages/aura_frame/multimodal/types/embedding.py(798 bytes) to archive zipfile as aura_frame/multimodal/types/embedding.py
    [INFO] [Aura Frame] 2026-04-29 09:26:58 Appending /opt/cloud/3rdComponent/python3.11/lib/python3.11/site-packages/aura_frame/multimodal/types/image.py(5.83 KB) to archive zipfile as aura_frame/multimodal/types/image.py
    [INFO] [Aura Frame] 2026-04-29 09:26:58 Appending /opt/cloud/3rdComponent/python3.11/lib/python3.11/site-packages/aura_frame/multimodal/types/vectorized.py(508 bytes) to archive zipfile as aura_frame/multimodal/types/vectorized.py
    [INFO] [Aura Frame] 2026-04-29 09:26:58 Appending /opt/cloud/3rdComponent/python3.11/lib/python3.11/site-packages/aura_frame/multimodal/types/video.py(10.83 KB) to archive zipfile as aura_frame/multimodal/types/video.py
    [INFO] [Aura Frame] 2026-04-29 09:26:58 Appending /opt/cloud/3rdComponent/python3.11/lib/python3.11/site-packages/aura_frame/multimodal/utils.py(1.84 KB) to archive zipfile as aura_frame/multimodal/utils.py
    [INFO] [Aura Frame] 2026-04-29 09:26:58 Function py_generate_series's archive zipfile has been uploaded to OBS.
    [INFO] [Aura Frame] 2026-04-29 09:26:58 SQL statement for registering function is 
    CREATE  
     FUNCTION 
    xxx.py_generate_series(start BIGINT, endnum BIGINT, step BIGINT)
    RETURNS SETOF STRUCT<"col1":BIGINT>
    LANGUAGE PYTHON
    RUNTIME_VERSION = '3.11'
    
    STRICT
    VOLATILE
    IMPORTS = ('https://xxx.com/xxx/xxx/py_generate_series/py_generate_series_20260429092658.zip')
    
    HANDLER = 'py_generate_series'
    AS $$
    # Comment for UDF 'py_generate_series'
    
    
    # === Function Metadata ===
    # UDTFMeta(
    #     _runtime_version='3.11',
    #     _runtime_env=RuntimeEnv({}),
    #     _registered=False,
    #     _input_type=<InputType.PYTHON: 4>,
    #     _replace=False,
    #     _temporary=False,
    #     _if_not_exists=False,
    #     _strict=True,
    #     _volatility=<VolatilityType.VOLATILE: 3>,
    #     _imports=('/opt/cloud/3rdComponent/python3.11/lib/python3.11/site-packages/aura_frame/multimodal/types', '/opt/cloud/3rdComponent/python3.11/lib/python3.11/site-packages/aura_frame/multimodal/utils.py'),
    #     _packages=(),
    #     _register_type=None,
    #     _comment=None,
    #     _work_group_name=None,
    #     _database='xxx',
    #     _func_name='py_generate_series',
    #     _signature=<Signature (start: int, endnum: int, step: int) -> struct<col1: int64>>,
    #     _udf=<function py_generate_series at 0x7fc5be4d5c60>,
    #     _module='__main__',
    # )
    
    
    # === Function Source Code ===
    # @aura.udf.python(database=target_database, signature=func_signature)
    # def py_generate_series(start, endnum, step):
    #     current = start
    #     while current <= endnum:
    #         yield {'col1': current}
    #         current += step
    
    import cloudpickle
    py_generate_series = cloudpickle.loads(bytes.fromhex("xxx"))
    
    $$
    COMMENT '{"description": null, "pretty_name": "py_generate_series"}'
    [INFO] [Aura Frame] 2026-04-29 09:27:01 User-defined function xxx.py_generate_series registered successfully.

步骤十:使用UDF向数据表中写入数据

  1. 在Notebook中的新增单元格,执行以下命令使用UDF向数据表中写入数据:
    fn = conn.create_table_function(
        py_generate_series,
        name="py_generate_series",
        database=target_database,
        signature=func_signature
    )
    ds = ds.flat_map(
        fn=fn,
        on=[ds.a, ds.b, ds.c],
        as_col='new_col'
    )
    ds.show()
  2. 单击左上角的运行按钮运行命令,命令运行后,结果示例如下则表示向数据表中写入数据成功:
    [INFO] [Aura Frame] 2026-04-29 09:27:01 SQL statement for executing:
    SELECT "t2"."a",
           "t2"."b",
           "t2"."c",
           "t2"."new_col"
    FROM
      (SELECT "t1"."a",
              "t1"."b",
              "t1"."c",
              "t1"."udtf_tmp_col", ("t1"."udtf_tmp_col")."col1" AS "new_col"
       FROM
         (SELECT "t0"."a",
                 "t0"."b",
                 "t0"."c",
                 xxx.PY_GENERATE_SERIES("t0"."a", "t0"."b", "t0"."c") WITH ARGUMENTS(cpu = 0.5, memory = 1024, arch = 'x86') AS "udtf_tmp_col"
          FROM "pg_temp"."aura_pandas_dataset_vp2r6lk7fna7zaldkcfsk3pajm" AS "t0") AS "t1") AS "t2"
    LIMIT 10
    [INFO] [Aura Frame] 2026-04-29 09:27:05 Running SQL statement, id is 019dd6d8-be56-71e9-9196-d7f7e04a4c8f
       a  b  c  new_col
    0  1  3  1        1
    1  1  3  1        2
    2  1  3  1        3

步骤十一:关闭数据库连接

在使用数据库连接完成相应的数据操作后,需要关闭数据库连接。

 conn.close()

相关文档