更新时间:2025-08-25 GMT+08:00
不带UDF的DF示例
下文以TPC-H基准测试的Query 1为例,展示DataFrame的用法。
查询SQL为:
-- TPC-H query 1: per (l_returnflag, l_linestatus) totals, averages and row
-- count over lineitem rows shipped on or before 1998-09-02.
SELECT
    l_returnflag,
    l_linestatus,
    sum(l_quantity) AS sum_qty,
    sum(l_extendedprice) AS sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
    avg(l_quantity) AS avg_qty,
    avg(l_extendedprice) AS avg_price,
    avg(l_discount) AS avg_disc,
    count(*) AS count_order
FROM lineitem
WHERE l_shipdate <= CAST('1998-09-02' AS date)
GROUP BY
    l_returnflag,
    l_linestatus
ORDER BY
    l_returnflag,
    l_linestatus;
对应的DataFrame逻辑如下:
import datetime

import ibis  # DataFrame API; the DataArtsFabric backend is reached via ibis.fabric

# The FABRIC_*, IBIS_TEST_* and OBS_* names below are placeholders: define them
# with your own environment's values before running this script.
con = ibis.fabric.connect(  # create a connection through the DataArtsFabric backend
    endpoint=FABRIC_ENDPOINT,  # service endpoint of the target region (see regions and endpoints)
    endpoint_id=FABRIC_ENDPOINT_ID,  # endpoint ID; see the appendix of the API Reference
    domain=FABRIC_DOMAIN,  # tenant name
    user=FABRIC_USER,  # IAM user name
    password=FABRIC_PASS,  # IAM password
    project_id=FABRIC_PROJECT_ID,  # project ID
    # NOTE(review): keyword is spelled "catelog" in the original example; kept
    # as-is -- confirm against the backend's connect() signature.
    catelog_name=IBIS_TEST_FABRIC_CATELOG,  # catalog to connect to
    workspace_id=FABRIC_WORKSPACE_ID,  # workspace ID; see the appendix of the API Reference
    lakeformation_instance_id=IBIS_TEST_FABRIC_LAKEFORMATION_INSTANCE_ID,  # LakeFormation instance ID
    obs_directory_base=OBS_DIRECTORY_BASE,  # OBS path where UDFs are stored
    obs_bucket_name=OBS_BUCKET_NAME,  # OBS bucket name
    obs_server=OBS_SERVER,  # OBS access address (endpoint / access domain name)
)

# Build a table expression for tpch.lineitem from the backend's metadata.
t = con.table("lineitem", database="tpch")

# Ship-date cutoff: 1998-12-01 minus 90 days == 1998-09-02, the same date the
# SQL version of this query uses. Computed with the standard library because
# no add_date() helper is defined or imported in this example.
ship_cutoff = datetime.date(1998, 12, 1) - datetime.timedelta(days=90)
q = t.filter(t.l_shipdate <= ibis.date(ship_cutoff.isoformat()))

# Reusable column expressions for the discounted price and the final charge.
discount_price = t.l_extendedprice * (1 - t.l_discount)
charge = discount_price * (1 + t.l_tax)

q = q.group_by(["l_returnflag", "l_linestatus"])
q = q.aggregate(
    sum_qty=t.l_quantity.sum(),
    sum_base_price=t.l_extendedprice.sum(),
    sum_disc_price=discount_price.sum(),
    sum_charge=charge.sum(),
    avg_qty=t.l_quantity.mean(),
    avg_price=t.l_extendedprice.mean(),
    avg_disc=t.l_discount.mean(),
    # Callable form: ibis passes the table being aggregated; count its rows.
    count_order=lambda rows: rows.count(),
)
q = q.order_by(["l_returnflag", "l_linestatus"])

sql = q.compile()  # compile the DataFrame expression to a SQL string
df = q.execute()  # run the expression on the backend and return the result set
父主题: 场景实践