Updated on 2023-11-03 GMT+08:00

User-Defined Functions

Overview

DLI supports the following three types of user-defined functions (UDFs):

  • Regular UDF: takes in one or more input parameters and returns a single result.
  • User-defined table-generating function (UDTF): takes in one or more input parameters and returns multiple rows or columns.
  • User-defined aggregate function (UDAF): aggregates multiple records into one value.

UDFs can only be used in dedicated queues.
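
The three function types differ mainly in how many rows go in and how many come out. The following is a minimal plain-Java sketch of those shapes, not DLI or Flink API code. The class name UdfShapesSketch and the three sample functions are hypothetical and were chosen only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of the three UDF shapes; not DLI API code.
final class UdfShapesSketch {

    // Regular UDF: one input value in, one value out.
    static final Function<String, Integer> UDF = String::length;

    // UDTF: one input value in, zero or more output rows out.
    static final Function<String, List<String>> UDTF =
        s -> Arrays.asList(s.split(","));

    // UDAF: many input values in, one value out.
    static final Function<List<Long>, Long> UDAF =
        rows -> rows.stream().mapToLong(Long::longValue).sum();

    public static void main(String[] args) {
        System.out.println(UDF.apply("a,bc"));                 // prints 4
        System.out.println(UDTF.apply("a,bc"));                // prints [a, bc]
        System.out.println(UDAF.apply(Arrays.asList(1L, 2L))); // prints 3
    }
}
```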

POM Dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.7.2</version>
    <scope>provided</scope>
</dependency>

Precautions

  • Currently, Python is not supported for programming UDFs, UDTFs, and UDAFs.
  • If you use IntelliJ IDEA to debug the created UDF, select include dependencies with "Provided" scope. Otherwise, the dependency packages in the POM file cannot be loaded for local debugging.

    The following uses IntelliJ IDEA 2020.2 as an example:

    1. On the IntelliJ IDEA page, select the configuration file you need to debug and click Edit Configurations.

    2. On the Run/Debug Configurations page, select include dependencies with "Provided" scope.

    3. Click OK.

Using UDFs

  1. Write the code of the user-defined function. For code examples, see the UDF, UDTF, and UDAF sections.
  2. Compile the UDF code, pack it into a JAR package, and upload the package to OBS.
  3. In the left navigation pane of the DLI management console, choose Job Management > Flink Jobs. Locate the row that contains the target job and click Edit in the Operation column to go to the job editing page.
  4. On the Running Parameters tab page, select a dedicated (exclusive) queue for Queue. The UDF Jar parameter is then displayed. Select the JAR file stored on OBS and click Save.

    Before selecting a user-defined function JAR package, upload the JAR package to the created OBS bucket.

    After the JAR package is selected, add a CREATE FUNCTION statement to the SQL statements of the job to register the function. The following is an example:

    CREATE FUNCTION udf_test AS 'com.xxx.udf.UdfScalarFunction';
    

UDF

A regular UDF must extend the ScalarFunction class and implement the eval method. The open and close methods are optional.

Example code

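import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class UdfScalarFunction extends ScalarFunction {
  private int factor;

  // Public no-argument constructor so that the class can be instantiated by name.
  public UdfScalarFunction() {
    this.factor = 12;
  }

  /**
   * (Optional) Initialization, called before the first eval call.
   * @param context runtime context of the function
   */
  @Override
  public void open(FunctionContext context) {}

  /**
   * Custom logic, called for each input value.
   * @param s input value
   * @return hash code of the input multiplied by factor
   */
  public int eval(String s) {
    return s.hashCode() * factor;
  }

  /**
   * (Optional) Cleanup, called after the last eval call.
   */
  @Override
  public void close() {}
}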

Example

CREATE FUNCTION udf_test AS 'com.xxx.udf.UdfScalarFunction';
INSERT INTO sink_stream select udf_test(attr) FROM source_stream;
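
To check what the eval method of UdfScalarFunction returns without deploying a job, its arithmetic can be reproduced in plain Java. The following is a minimal sketch under stated assumptions, not DLI or Flink API code: the class name EvalSketch is hypothetical, the class does not extend ScalarFunction, and the null check is an addition of this sketch (the documented eval would throw a NullPointerException for a null input).

```java
// Hypothetical stand-in that mirrors the eval logic of UdfScalarFunction.
// It has no Flink dependency so that it can run anywhere.
final class EvalSketch {
    private static final int FACTOR = 12;

    // Same arithmetic as the documented eval: hash code times a fixed factor.
    // Returning Integer instead of int is an assumption of this sketch, so
    // that a null input maps to a null result.
    static Integer eval(String s) {
        if (s == null) {
            return null;
        }
        return s.hashCode() * FACTOR;
    }

    public static void main(String[] args) {
        System.out.println(eval("a"));  // "a".hashCode() is 97, so this prints 1164
        System.out.println(eval(null)); // prints null
    }
}
```

Because int multiplication wraps on overflow in Java, long strings can yield negative results. That is expected for this example logic.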

UDTF

A UDTF must extend the TableFunction class and implement the eval method. The open and close methods are optional. If the UDTF needs to return multiple columns, declare the return type as Tuple or Row. If Row is used, you must override the getResultType method to declare the types of the returned fields.

Example code

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UdfTableFunction extends TableFunction<Row> {
  // Static so that the logger is not serialized together with the function instance.
  private static final Logger LOG = LoggerFactory.getLogger(UdfTableFunction.class);

  /**
   * (Optional) Initialization, called before the first eval call.
   * @param context runtime context of the function
   */
  @Override
  public void open(FunctionContext context) {}

  /**
   * Custom logic: emits one row (substring, length) for each part of str.
   * @param str input string
   * @param split separator, interpreted as a regular expression by String.split
   */
  public void eval(String str, String split) {
    for (String s : str.split(split)) {
      Row row = new Row(2);
      row.setField(0, s);
      row.setField(1, s.length());
      collect(row);
    }
  }

  /**
   * Declare the field types of the rows returned by the function.
   * @return row type with a STRING field and an INT field
   */
  @Override
  public TypeInformation<Row> getResultType() {
    return Types.ROW(Types.STRING, Types.INT);
  }

  /**
   * (Optional) Cleanup, called after the last eval call.
   */
  @Override
  public void close() {}
}

Example

The UDTF supports CROSS JOIN and LEFT JOIN. When the UDTF is used, the LATERAL and TABLE keywords must be included.

  • CROSS JOIN: does not output the data of a row in the left table if the UDTF does not output the result for the data of the row.
  • LEFT JOIN: outputs the data of a row in the left table even if the UDTF does not output a result for that row, and pads the UDTF-related fields with null.

CREATE FUNCTION udtf_test AS 'com.xxx.udf.UdfTableFunction';
// CROSS JOIN
INSERT INTO sink_stream SELECT subValue, length FROM source_stream, LATERAL
TABLE(udtf_test(attr, ',')) AS T(subValue, length);
// LEFT JOIN
INSERT INTO sink_stream SELECT subValue, length FROM source_stream LEFT JOIN LATERAL
TABLE(udtf_test(attr, ',')) AS T(subValue, length) ON TRUE;
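
The difference between the two join types only shows up for input rows for which the UDTF emits nothing. The following is a minimal plain-Java sketch of that behavior, not DLI or Flink API code. The class LateralJoinSketch and its methods are hypothetical, and emitting no rows for a null input is an assumption of this sketch (the example UDTF does not handle null).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model of joining a table with a UDTF.
final class LateralJoinSketch {

    // Models the UDTF: one output row (substring, length) per part of str.
    // Emitting nothing for a null input is an assumption of this sketch.
    static List<Object[]> split(String str, String sep) {
        List<Object[]> rows = new ArrayList<>();
        if (str == null) {
            return rows;
        }
        for (String s : str.split(sep)) {
            rows.add(new Object[] {s, s.length()});
        }
        return rows;
    }

    // Joins every left value with the UDTF output for that value.
    // leftJoin = false models CROSS JOIN; true models LEFT JOIN ... ON TRUE.
    static List<String> join(List<String> left, boolean leftJoin) {
        List<String> out = new ArrayList<>();
        for (String attr : left) {
            List<Object[]> rows = split(attr, ",");
            if (rows.isEmpty() && leftJoin) {
                // Keep the left row and pad the UDTF columns with null.
                out.add(attr + " | null | null");
            }
            for (Object[] r : rows) {
                out.add(attr + " | " + r[0] + " | " + r[1]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("a,bc", null);
        System.out.println(join(source, false).size()); // prints 2: the null row is dropped
        System.out.println(join(source, true).size());  // prints 3: the null row is kept and padded
    }
}
```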

UDAF

A UDAF must extend the AggregateFunction class. You need to create an accumulator class that stores the intermediate computing result, for example, WeightedAvgAccum in the following example code.

Example code

// File: WeightedAvgAccum.java
public class WeightedAvgAccum {
  public long sum = 0;
  public int count = 0;
}

// File: UdfAggFunction.java
import org.apache.flink.table.functions.AggregateFunction;
import java.util.Iterator;

/**
 * User-defined aggregate function that returns sum / count of the input values.
 * Despite the accumulator name, no weights are applied in this example.
 * The first type variable is the type returned by the aggregate function,
 * and the second type variable is the accumulator type.
 */
public class UdfAggFunction extends AggregateFunction<Long, WeightedAvgAccum> {
  // Initialize the accumulator.
  @Override
  public WeightedAvgAccum createAccumulator() {
    return new WeightedAvgAccum();
  }

  // Return the result computed from the accumulator, or null if no value was accumulated.
  @Override
  public Long getValue(WeightedAvgAccum acc) {
    if (acc.count == 0) {
      return null;
    } else {
      return acc.sum / acc.count;
    }
  }

  // Update the intermediate computing value according to the input.
  public void accumulate(WeightedAvgAccum acc, long iValue) {
    acc.sum += iValue;
    acc.count += 1;
  }

  // Perform the retraction operation, which is the opposite of the accumulate operation.
  public void retract(WeightedAvgAccum acc, long iValue) {
    acc.sum -= iValue;
    acc.count -= 1;
  }

  // Combine multiple accumulator values into acc.
  public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
    Iterator<WeightedAvgAccum> iter = it.iterator();
    while (iter.hasNext()) {
      WeightedAvgAccum a = iter.next();
      acc.count += a.count;
      acc.sum += a.sum;
    }
  }

  // Reset the intermediate computing value.
  public void resetAccumulator(WeightedAvgAccum acc) {
    acc.count = 0;
    acc.sum = 0L;
  }
}

Example

CREATE FUNCTION udaf_test AS 'com.xxx.udf.UdfAggFunction';
INSERT INTO sink_stream SELECT udaf_test(attr2) FROM source_stream GROUP BY attr1;
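
To make the roles of accumulate, retract, merge, and getValue concrete, the following minimal plain-Java sketch walks one group through those steps by hand. It is not DLI or Flink API code: the class AvgLifecycleSketch and its nested Acc class are hypothetical stand-ins that reuse the sum and count fields of WeightedAvgAccum, and the order of calls in main is chosen for illustration only (at run time the engine decides when each method is called).

```java
// Hypothetical plain-Java model of the accumulator life cycle.
// It has no Flink dependency and is not the code that DLI runs.
final class AvgLifecycleSketch {

    // Same fields as the WeightedAvgAccum accumulator.
    static final class Acc {
        long sum = 0;
        int count = 0;
    }

    // A new value arrives in the group.
    static void accumulate(Acc acc, long value) {
        acc.sum += value;
        acc.count += 1;
    }

    // A previously accumulated value is withdrawn from the group.
    static void retract(Acc acc, long value) {
        acc.sum -= value;
        acc.count -= 1;
    }

    // A partial result computed elsewhere is folded into acc.
    static void merge(Acc acc, Acc other) {
        acc.sum += other.sum;
        acc.count += other.count;
    }

    // Integer average, or null when the group is empty.
    static Long getValue(Acc acc) {
        if (acc.count == 0) {
            return null;
        }
        return acc.sum / acc.count;
    }

    public static void main(String[] args) {
        Acc acc = new Acc();
        accumulate(acc, 10);
        accumulate(acc, 20);
        accumulate(acc, 30);
        System.out.println(getValue(acc)); // 60 / 3, prints 20

        retract(acc, 30);
        System.out.println(getValue(acc)); // 30 / 2, prints 15

        Acc other = new Acc();
        accumulate(other, 45);
        merge(acc, other);
        System.out.println(getValue(acc)); // 75 / 3, prints 25
    }
}
```

Note that the division is integer division on long values, so fractional parts are truncated.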