开发和部署HetuEngine Function Plugin
用户可以自定义一些函数,用于扩展SQL以满足个性化的需求,这类函数称为UDF。
本章节主要介绍开发和应用HetuEngine Function Plugin的具体步骤。
MRS 3.2.1及以后版本,需要基于JDK17.0.4及以上版本开发。本章节以MRS 3.3.0版本为例。
开发Function Plugin项目
本样例实现两个Function Plugin,说明见下表。
名称 |
说明 |
类型 |
---|---|---|
add_two |
输入一个整数,返回其加2后的结果 |
ScalarFunction |
avg_double |
聚合计算指定列的平均值,且该列的字段类型为double |
AggregationFunction |
- 创建Maven项目,“groupId”配置“com.test.functions”,“artifactId”配置“myfunctions”。这个两个值可根据实际情况自定义。
- 修改“pom.xml”文件如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.test.functions</groupId> <artifactId>myfunctions</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>trino-plugin</packaging> <properties> <project.build.targetJdk>17</project.build.targetJdk> <dep.guava.version>31.1-jre</dep.guava.version> <dep.hetu.version>399-h0.cbu.mrs.321.r13</dep.hetu.version> </properties> <dependencies> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${dep.guava.version}</version> </dependency> <dependency> <groupId>io.trino</groupId> <artifactId>trino-spi</artifactId> <version>${dep.hetu.version}</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <configuration> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>io.trino</groupId> <artifactId>trino-maven-plugin</artifactId> <version>11</version> <extensions>true</extensions> </plugin> </plugins> </build> </project>
- 创建Function Plugin实现类。
- 创建Function Plugin实现类com.test.functions.scalar.MyFunction,其内容如下:
package com.test.functions.scalar; import io.trino.spi.function.ScalarFunction; import io.trino.spi.function.SqlNullable; import io.trino.spi.function.SqlType; import io.trino.spi.type.StandardTypes; import jdk.jfr.Description; public class MyFunction { private MyFunction() { } @Description("Add two") @ScalarFunction("add_two") @SqlType(StandardTypes.INTEGER) public static long add2(@SqlNullable @SqlType(StandardTypes.INTEGER) Long i) { return i + 2; } }
- 创建Function Plugin实现类com.test.function.aggregation.MyAverageAggregationFunction,其内容如下:
package com.test.functions.aggregation; import static io.trino.spi.type.DoubleType.DOUBLE; import io.trino.spi.block.BlockBuilder; import io.trino.spi.function.AggregationFunction; import io.trino.spi.function.AggregationState; import io.trino.spi.function.CombineFunction; import io.trino.spi.function.InputFunction; import io.trino.spi.function.OutputFunction; import io.trino.spi.function.SqlType; import io.trino.spi.type.StandardTypes; @AggregationFunction("avg_double") public class MyAverageAggregationFunction { private MyAverageAggregationFunction() {} @InputFunction public static void input( @AggregationState LongAndDoubleState state, @SqlType(StandardTypes.DOUBLE) double value) { state.setLong(state.getLong() + 1); state.setDouble(state.getDouble() + value); } @CombineFunction public static void combine( @AggregationState LongAndDoubleState state, @AggregationState LongAndDoubleState otherState) { state.setLong(state.getLong() + otherState.getLong()); state.setDouble(state.getDouble() + otherState.getDouble()); } @OutputFunction(StandardTypes.DOUBLE) public static void output(@AggregationState LongAndDoubleState state, BlockBuilder out) { long count = state.getLong(); if (count == 0) { out.appendNull(); } else { double value = state.getDouble(); DOUBLE.writeDouble(out, value / count); } } }
- 创建Function Plugin实现类com.test.functions.scalar.MyFunction,其内容如下:
- 创建AverageAggregation的依赖接口com.test.functions.aggregation.LongAndDoubleState。
package com.test.functions.aggregation; import io.trino.spi.function.AccumulatorState; public interface LongAndDoubleState extends AccumulatorState { long getLong(); void setLong(long value); double getDouble(); void setDouble(double value); }
- 创建Function Plugin注册类com.test.functions.MyFunctionsPlugin,其内容如下:
package com.test.functions; import com.google.common.collect.ImmutableSet; import com.test.functions.aggregation.MyAverageAggregationFunction; import com.test.functions.scalar.MyFunction; import io.trino.spi.Plugin; import java.util.Set; public class MyFunctionsPlugin implements Plugin { @Override public Set<Class<?>> getFunctions() { return ImmutableSet.<Class<?>>builder() .add(MyFunction.class) .add(MyAverageAggregationFunction.class) .build(); } }
- 打包Maven项目,获取target目录下的myfunctions-0.0.1-SNAPSHOT目录,最终项目整体结构如下图所示。
部署Function Plugin
部署前需要确认:
- HetuEngine服务处于正常状态。
- HDFS和HetuEngine客户端已经安装到集群节点,例如“/opt/client”目录下。
- 已创建HetuEngine用户,用户创建请参考创建HetuEngine权限角色。
- 将打包Maven项目得到的myfunctions-0.0.1-SNAPSHOT目录上传到安装客户端节点的任意目录。
- 将myfunctions-0.0.1-SNAPSHOT目录上传到HDFS中。
同时上传6中target目录下的所有jar包。
- 登录客户端安装节点,执行安全认证。
source bigdata_env
kinit HetuEngine的用户
根据回显提示输入密码,首次认证需要修改密码。
- HDFS中创建如下路径,如已存在则不需创建。
hdfs dfs -mkdir -p /user/hetuserver/udf/data/externalFunctionsPlugin
- 上传myfunctions-0.0.1-SNAPSHOT目录到HDFS。
hdfs dfs -put myfunctions-0.0.1-SNAPSHOT /user/hetuserver/udf/data/externalFunctionsPlugin
- 修改目录属主。
hdfs dfs -chown -R hetuserver:hadoop /user/hetuserver/udf/data
- 登录客户端安装节点,执行安全认证。
- 重启HetuEngine计算实例。
使用验证Function Plugin
- 登录客户端安装节点,执行安全认证。
cd /opt/client
source bigdata_env
kinit HetuEngine用户
hetu-cli
- 选择验证环境上有数值(int或double类型)列的表,此处选择hive.default.test1,执行如下命令验证Function Plugin。
- 查询表。
select * from hive.default.test1;
select * from hive.default.test1; name | price --------|------- apple | 17.8 orange | 25.0 (2 rows)
- 返回平均值。
select avg_double(price) from hive.default.test1;
select avg_double(price) from hive.default.test1; _col0 ------- 21.4 (1 row)
- 返回输入整数加2的值。
select add_two(4); _col0 ------- 6 (1 row)
- 查询表。