Updated on 2024-05-29 GMT+08:00

HetuEngine Function Plugin Development and Application

You can customize functions to extend SQL statements to meet personalized requirements. These functions are called UDFs.

This section describes how to develop and apply HetuEngine function plugins.

The development must be based on JDK 17.0.4 or later for MRS 3.3.0 or later. This section uses MRS 3.3.0 as an example.

Developing Function Plugins

This sample implements two function plugins described in the following table.

Table 1 HetuEngine function plugins

Parameter

Description

Type

add_two

Adds 2 to the input integer and returns the result.

ScalarFunction

avg_double

Aggregates and calculates the average value of a specified column. The field type of the column is double.

AggregationFunction

  1. Create a Maven project. Set groupId to com.test.functions and artifactId to myfunctions. The two values can be customized based on the site requirements.
  2. Modify the pom.xml file as follows:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
           <modelVersion>4.0.0</modelVersion>
           <groupId>com.test.functions</groupId>
           <artifactId>myfunctions</artifactId>
           <version>0.0.1-SNAPSHOT</version>
           <packaging>trino-plugin</packaging>
          <properties>
              <project.build.targetJdk>17</project.build.targetJdk>
              <dep.guava.version>31.1-jre</dep.guava.version>
               <dep.hetu.version>399-h0.cbu.mrs.321.r13</dep.hetu.version>
          </properties>
    
           <dependencies>
               <dependency>
                   <groupId>com.google.guava</groupId>
                   <artifactId>guava</artifactId>
                   <version>${dep.guava.version}</version>
               </dependency>
     
               <dependency>
                   <groupId>io.trino</groupId>
                   <artifactId>trino-spi</artifactId>
                   <version>${dep.hetu.version}</version>
                   <scope>provided</scope>
               </dependency>
    
           </dependencies>
     
           <build>
               <plugins>
                   <plugin>
                       <groupId>org.apache.maven.plugins</groupId>
                       <artifactId>maven-assembly-plugin</artifactId>
                       <version>3.3.0</version>
                       <configuration>
                           <encoding>UTF-8</encoding>
                       </configuration>
                   </plugin>
                   <plugin>
                       <groupId>io.trino</groupId>
                       <artifactId>trino-maven-plugin</artifactId>
                       <version>11</version>
                       <extensions>true</extensions>
                   </plugin>
               </plugins>
           </build>
       </project>

  3. Create the implementation class of the function plugin.

    1. Create the function plugin implementation class com.test.functions.scalar.MyFunction. The code is as follows:
      package com.test.functions.scalar;
      import io.trino.spi.function.ScalarFunction;
      import io.trino.spi.function.SqlNullable;
      import io.trino.spi.function.SqlType;
      import io.trino.spi.type.StandardTypes;
      import jdk.jfr.Description;
      public class MyFunction {
          private MyFunction() {
          }
          @Description("Add two")
          @ScalarFunction("add_two")
          @SqlType(StandardTypes.INTEGER)
          public static long add2(@SqlNullable @SqlType(StandardTypes.INTEGER) Long i) {
              return i + 2;
          }
      }
    2. Create the function plugin implementation class com.test.function.aggregation.MyAverageAggregationFunction. The code is as follows:
      package com.test.functions.aggregation;
      
      import static io.trino.spi.type.DoubleType.DOUBLE;
      
      import io.trino.spi.block.BlockBuilder;
      import io.trino.spi.function.AggregationFunction;
      import io.trino.spi.function.AggregationState;
      import io.trino.spi.function.CombineFunction;
      import io.trino.spi.function.InputFunction;
      import io.trino.spi.function.OutputFunction;
      import io.trino.spi.function.SqlType;
      import io.trino.spi.type.StandardTypes;
      
      @AggregationFunction("avg_double")
      public class MyAverageAggregationFunction
      {
          private MyAverageAggregationFunction() {}
      
          @InputFunction
          public static void input(
              @AggregationState LongAndDoubleState state,
              @SqlType(StandardTypes.DOUBLE) double value)
          {
              state.setLong(state.getLong() + 1);
              state.setDouble(state.getDouble() + value);
          }
      
          @CombineFunction
          public static void combine(
              @AggregationState LongAndDoubleState state,
              @AggregationState LongAndDoubleState otherState)
          {
              state.setLong(state.getLong() + otherState.getLong());
              state.setDouble(state.getDouble() + otherState.getDouble());
          }
      
          @OutputFunction(StandardTypes.DOUBLE)
          public static void output(@AggregationState LongAndDoubleState state, BlockBuilder out)
          {
              long count = state.getLong();
              if (count == 0) {
                  out.appendNull();
              }
              else {
                  double value = state.getDouble();
                  DOUBLE.writeDouble(out, value / count);
              }
          }
      }

  4. Create the com.test.functions.aggregation.LongAndDoubleState API on which AverageAggregation depends.

    package com.test.functions.aggregation;
    
    import io.trino.spi.function.AccumulatorState;
    
    public interface LongAndDoubleState
        extends AccumulatorState
    {
        long getLong();
    
        void setLong(long value);
    
        double getDouble();
    
        void setDouble(double value);
    }

  5. Create the function plugin registration class com.test.functions.MyFunctionsPlugin. The code is as follows:

    package com.test.functions;
    
    import com.google.common.collect.ImmutableSet;
    import com.test.functions.aggregation.MyAverageAggregationFunction;
    import com.test.functions.scalar.MyFunction;
    
    import io.trino.spi.Plugin;
    
    import java.util.Set;
    
    public class MyFunctionsPlugin
        implements Plugin
    {
        @Override
        public Set<Class<?>> getFunctions() {
            return ImmutableSet.<Class<?>>builder()
                .add(MyFunction.class)
                .add(MyAverageAggregationFunction.class)
                .build();
        }
    }

  6. Pack the Maven project and obtain the udf-test-0.0.1-SNAPSHOT directory in the target directory. The following figure shows the overall structure of the project.

Deploying Function Plugins

Before the deployment, ensure that:

  • The HetuEngine service is normal.
  • The HDFS and HetuEngine client have been installed in a directory on the cluster node, for example, /opt/client.
  • A HetuEngine user has been created. For details about how to create a user, see Creating a HetuEngine User.
  1. Upload the myfunctions-0.0.1-SNAPSHOT directory obtained in packing the Maven project to any directory on the node where the client is installed.
  2. Upload the myfunctions-0.0.1-SNAPSHOT directory to HDFS.

    1. Log in to the node where the client is installed and perform security authentication.

      cd /opt/client

      source bigdata_env

      kinit HetuEngine user

      Enter the password as prompted and change the password upon the first authentication.

    2. Create the following paths in HDFS. If the paths already exist, skip this step.

      hdfs dfs -mkdir -p /user/hetuserver/udf/data/externalFunctionsPlugin

    3. Upload the myfunctions-0.0.1-SNAPSHOT directory to HDFS.

      hdfs dfs -put myfunctions-0.0.1-SNAPSHOT /user/hetuserver/udf/data/externalFunctionsPlugin

    4. Change the directory owner and owner group.

      hdfs dfs -chown -R hetuserver:hadoop /user/hetuserver/udf/data

  3. Restart the HetuEngine compute instance.

Verifying Function Plugins

  1. Log in to the node where the client is installed and perform security authentication.

    cd /opt/client

    source bigdata_env

    kinit HetuEngine user

    hetu-cli

  2. Verify function plugins.

    1. Query a table.

      select * from hive.default.test1;

      select * from hive.default.test1;
      name  |  price
      --------|-------
      apple   |  17.8
      orange |  25.0
      (2 rows)
    2. Return the average value.

      select avg_double(price) from hive.default.test1;

      select avg_double(price) from hive.default.test1;
      _col0
      -------
        21.4
      (1 row)
    3. Return the value of the input integer plus 2.

      select add_two(4);

      select add_two(4);
      _col0
      -------
          6
      (1 row)