更新时间:2024-11-29 GMT+08:00

开发和应用HetuEngine Function Plugin

用户可以自定义一些函数,用于扩展SQL以满足个性化的需求,这类函数称为UDF。

本章节主要介绍开发和应用HetuEngine Function Plugin的具体步骤。

需要基于JDK17.0.4及以上版本开发。

开发Function Plugin项目

本样例实现两个Function Plugin,说明见下表。

表1 HetuEngine Function Plugin说明

名称

说明

类型

add_two

输入一个整数,返回其加2后的结果

ScalarFunction

avg_double

聚合计算指定列的平均值,且该列的字段类型为double

AggregationFunction

  1. 创建Maven项目,“groupId”配置“com.test.functions”,“artifactId”配置“myfunctions”。这个两个值可根据实际情况自定义。
  2. 修改“pom.xml”文件如下:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
           <modelVersion>4.0.0</modelVersion>
           <groupId>com.test.functions</groupId>
           <artifactId>myfunctions</artifactId>
           <version>0.0.1-SNAPSHOT</version>
           <packaging>trino-plugin</packaging>
          <properties>
              <project.build.targetJdk>17</project.build.targetJdk>
              <dep.guava.version>31.1-jre</dep.guava.version>
               <dep.hetu.version>399-h0.cbu.mrs.321.r13</dep.hetu.version>
          </properties>
    
           <dependencies>
               <dependency>
                   <groupId>com.google.guava</groupId>
                   <artifactId>guava</artifactId>
                   <version>${dep.guava.version}</version>
               </dependency>
     
               <dependency>
                   <groupId>io.trino</groupId>
                   <artifactId>trino-spi</artifactId>
                   <version>${dep.hetu.version}</version>
                   <scope>provided</scope>
               </dependency>
    
           </dependencies>
     
           <build>
               <plugins>
                   <plugin>
                       <groupId>org.apache.maven.plugins</groupId>
                       <artifactId>maven-assembly-plugin</artifactId>
                       <version>3.3.0</version>
                       <configuration>
                           <encoding>UTF-8</encoding>
                       </configuration>
                   </plugin>
                   <plugin>
                       <groupId>io.trino</groupId>
                       <artifactId>trino-maven-plugin</artifactId>
                       <version>11</version>
                       <extensions>true</extensions>
                   </plugin>
               </plugins>
           </build>
       </project>

  3. 创建Function Plugin实现类。

    1. 创建Function Plugin实现类com.test.functions.scalar.MyFunction,其内容如下:
      package com.test.functions.scalar;
      import io.trino.spi.function.ScalarFunction;
      import io.trino.spi.function.SqlNullable;
      import io.trino.spi.function.SqlType;
      import io.trino.spi.type.StandardTypes;
      import jdk.jfr.Description;
      public class MyFunction {
          private MyFunction() {
          }
          @Description("Add two")
          @ScalarFunction("add_two")
          @SqlType(StandardTypes.INTEGER)
          public static long add2(@SqlNullable @SqlType(StandardTypes.INTEGER) Long i) {
              return i + 2;
          }
      }
    2. 创建Function Plugin实现类com.test.function.aggregation.MyAverageAggregationFunction,其内容如下:
      package com.test.functions.aggregation;
      
      import static io.trino.spi.type.DoubleType.DOUBLE;
      
      import io.trino.spi.block.BlockBuilder;
      import io.trino.spi.function.AggregationFunction;
      import io.trino.spi.function.AggregationState;
      import io.trino.spi.function.CombineFunction;
      import io.trino.spi.function.InputFunction;
      import io.trino.spi.function.OutputFunction;
      import io.trino.spi.function.SqlType;
      import io.trino.spi.type.StandardTypes;
      
      @AggregationFunction("avg_double")
      public class MyAverageAggregationFunction
      {
          private MyAverageAggregationFunction() {}
      
          @InputFunction
          public static void input(
              @AggregationState LongAndDoubleState state,
              @SqlType(StandardTypes.DOUBLE) double value)
          {
              state.setLong(state.getLong() + 1);
              state.setDouble(state.getDouble() + value);
          }
      
          @CombineFunction
          public static void combine(
              @AggregationState LongAndDoubleState state,
              @AggregationState LongAndDoubleState otherState)
          {
              state.setLong(state.getLong() + otherState.getLong());
              state.setDouble(state.getDouble() + otherState.getDouble());
          }
      
          @OutputFunction(StandardTypes.DOUBLE)
          public static void output(@AggregationState LongAndDoubleState state, BlockBuilder out)
          {
              long count = state.getLong();
              if (count == 0) {
                  out.appendNull();
              }
              else {
                  double value = state.getDouble();
                  DOUBLE.writeDouble(out, value / count);
              }
          }
      }

  4. 创建AverageAggregation的依赖接口com.test.functions.aggregation.LongAndDoubleState

    package com.test.functions.aggregation;
    
    import io.trino.spi.function.AccumulatorState;
    
    public interface LongAndDoubleState
        extends AccumulatorState
    {
        long getLong();
    
        void setLong(long value);
    
        double getDouble();
    
        void setDouble(double value);
    }

  5. 创建Function Plugin注册类com.test.functions.MyFunctionsPlugin,其内容如下:

    package com.test.functions;
    
    import com.google.common.collect.ImmutableSet;
    import com.test.functions.aggregation.MyAverageAggregationFunction;
    import com.test.functions.scalar.MyFunction;
    
    import io.trino.spi.Plugin;
    
    import java.util.Set;
    
    public class MyFunctionsPlugin
        implements Plugin
    {
        @Override
        public Set<Class<?>> getFunctions() {
            return ImmutableSet.<Class<?>>builder()
                .add(MyFunction.class)
                .add(MyAverageAggregationFunction.class)
                .build();
        }
    }

  6. 打包Maven项目,获取target目录下的myfunctions-0.0.1-SNAPSHOT目录,最终项目整体结构如下图所示。

部署Function Plugin

部署前需要确认:

  • HetuEngine服务处于正常状态。
  • HDFS和HetuEngine客户端已经安装到集群节点,例如“/opt/client”目录下。
  • 已创建HetuEngine用户,用户创建请参考创建HetuEngine用户
  1. 将打包Maven项目得到的myfunctions-0.0.1-SNAPSHOT目录上传到安装客户端节点的任意目录。
  2. myfunctions-0.0.1-SNAPSHOT目录上传到HDFS中。

    1. 登录客户端安装节点,执行安全认证。

      cd /opt/client

      source bigdata_env

      kinit HetuEngine的用户

      根据回显提示输入密码,首次认证需要修改密码。

    2. HDFS中创建如下路径,如已存在则不需创建。

      hdfs dfs -mkdir -p /user/hetuserver/udf/data/externalFunctionsPlugin

    3. 上传myfunctions-0.0.1-SNAPSHOT目录到HDFS。

      hdfs dfs -put myfunctions-0.0.1-SNAPSHOT /user/hetuserver/udf/data/externalFunctionsPlugin

    4. 修改目录属主。

      hdfs dfs -chown -R hetuserver:hadoop /user/hetuserver/udf/data

  3. 重启HetuEngine计算实例。

使用验证Function Plugin

  1. 登录客户端安装节点,执行安全认证。

    cd /opt/client

    source bigdata_env

    kinit HetuEngine用户

    hetu-cli

  2. 选择验证环境上有数值(int或double类型)列的表,此处选择hive.default.test1,执行如下命令验证Function Plugin。

    1. 查询表。

      select * from hive.default.test1;

      select * from hive.default.test1;
      name  |  price
      --------|-------
      apple   |  17.8
      orange |  25.0
      (2 rows)
    2. 返回平均值。

      select avg_double(price) from hive.default.test1;

      select avg_double(price) from hive.default.test1;
      _col0
      -------
        21.4
      (1 row)
    3. 返回输入整数加2的值。

      select add_two(4);

      select add_two(4);
      _col0
      -------
          6
      (1 row)