更新时间:2022-12-14 GMT+08:00

开发和应用HetuEngine Function Plugin

用户可以自定义一些函数,用于扩展SQL以满足个性化的需求,这类函数称为UDF。

本章节主要介绍开发和应用HetuEngine Function Plugin的具体步骤。

开发Function Plugin项目

本样例实现两个Function Plugin,说明见下表。

表1 HetuEngine Function Plugin说明

名称

说明

类型

add_two

输入一个整数,返回其加2后的结果

ScalarFunction

avg_double

聚合计算指定列的平均值,且该列的字段类型为double

AggregationFunction

  1. 创建Maven项目,“groupId”配置“com.test.udf”,“artifactId”配置“udf-test”。这个两个值可根据实际情况自定义。
  2. 修改“pom.xml”文件如下:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
           <modelVersion>4.0.0</modelVersion>
           <groupId>com.test.udf</groupId>
           <artifactId>udf-test</artifactId>
           <version>0.0.1-SNAPSHOT</version>
     
           <packaging>hetu-plugin</packaging>
     
           <dependencies>
               <dependency>
                   <groupId>com.google.guava</groupId>
                   <artifactId>guava</artifactId>
                   <version>26.0-jre</version>
               </dependency>
     
               <dependency>
                   <groupId>io.hetu.core</groupId>
                   <artifactId>presto-spi</artifactId>
                   <version>1.2.0</version>
                   <scope>provided</scope>
               </dependency>
     
           </dependencies>
     
           <build>
               <plugins>
                   <plugin>
                       <groupId>org.apache.maven.plugins</groupId>
                       <artifactId>maven-assembly-plugin</artifactId>
                       <version>2.4.1</version>
                       <configuration>
                           <encoding>UTF-8</encoding>
                       </configuration>
                   </plugin>
                   <plugin>
                       <groupId>io.hetu</groupId>
                       <artifactId>presto-maven-plugin</artifactId>
                       <version>9</version>
                       <extensions>true</extensions>
                   </plugin>
               </plugins>
           </build>
       </project>

  3. 创建Function Plugin实现类。

    1.创建Function Plugin实现类com.hadoop.other.TestUDF4,其内容如下:
    public class TestUDF4 {   
         @ScalarFunction("add_two")
         @SqlType(StandardTypes.INTEGER)
         public static long add2(@SqlNullable @SqlType(StandardTypes.INTEGER) Long i)      {
             return i+2;
    }
    2.创建Function Plugin实现类com.hadoop.other.AverageAggregation,其内容如下:
    @AggregationFunction("avg_double")
    public class AverageAggregation
    {
        @InputFunction
        public static void input(
            LongAndDoubleState state,
            @SqlType(StandardTypes.DOUBLE) double value)
        {
            state.setLong(state.getLong() + 1);
            state.setDouble(state.getDouble() + value);
        }
    
        @CombineFunction
        public static void combine(
            LongAndDoubleState state,
            LongAndDoubleState otherState)
        {
            state.setLong(state.getLong() + otherState.getLong());
            state.setDouble(state.getDouble() + otherState.getDouble());
        }
    
        @OutputFunction(StandardTypes.DOUBLE)
        public static void output(LongAndDoubleState state, BlockBuilder out)
        {
            long count = state.getLong();
            if (count == 0) {
                out.appendNull();
            }
            else {
                double value = state.getDouble();
                DOUBLE.writeDouble(out, value / count);
            }
        }
    }

  4. 创建AverageAggregation的依赖接口com.hadoop.other.LongAndDoubleState。

    public interface LongAndDoubleState extends AccumulatorState {
       long getLong();
    
       void setLong(long value);
    
       double getDouble();
    
       void setDouble(double value);
    }

  5. 创建Function Plugin注册类com.hadoop.other.RegisterFunctionTestPlugin,其内容如下:

    public class RegisterFunctionTestPlugin implements Plugin {
    
        @Override
        public Set<Class<?>> getFunctions() {
            return ImmutableSet.<Class<?>>builder()
                    .add(TestUDF4.class)
                    .add(AverageAggregation.class)
                    .build();
        }
    }

  6. 打包Maven项目,获取target目录下的udf-test-0.0.1-SNAPSHOT目录,最终项目整体结构如下图所示。

部署Function Plugin

部署前需要确认:

  • HetuEngine服务处于正常状态。
  • HDFS和HetuEngine客户端已经安装到集群节点,例如“/opt/client”目录下。
  • 已创建HetuEngine用户,用户创建请参考创建HetuEngine用户
  1. 将打包Maven项目得到的udf-test-0.0.1-SNAPSHOT目录上传到安装客户端节点的任意目录。
  2. 将udf-test-0.0.1-SNAPSHOT目录上传到HDFS中。

    1. 登录客户端安装节点,执行安全认证。

      cd /opt/client

      source bigdata_env

      kinit HetuEngine的用户

      根据回显提示输入密码,首次认证需要修改密码。

    2. HDFS中创建如下路径,如已存在则不需创建。

      hdfs dfs -mkdir -p /user/hetuserver/udf/data/externalFunctionsPlugin

    3. 上传udf-test-0.0.1-SNAPSHOT目录到HDFS。

      hdfs dfs -put udf-test-0.0.1-SNAPSHOT /user/hetuserver/udf/data/externalFunctionsPlugin

    4. 修改目录属主。

      hdfs dfs -chown -R hetuserver:hadoop /user/hetuserver/udf/data

  3. 重启HetuEngine计算实例。

使用验证Function Plugin

  1. 登录客户端安装节点,执行安全认证。

    cd /opt/client

    source bigdata_env

    kinit HetuEngine用户

    hetu-cli --catalog hive --schema default

  2. 执行如下命令验证Function Plugin。

    1. 查询表。

      select * from test1;

      select * from test1;
      name  |  price
      --------|-------
      apple   |  17.8
      orange |  25.0
      (2 rows)
    2. 返回平均值。

      select avg_double(price) from test1;

      select avg_double(price) from test1;
      _col0
      -------
        21.4
      (1 row)
    3. 返回输入整数加2的值。

      select add_two(4);

      select add_two(4);
      _col0
      -------
          6
      (1 row)