User-Defined Functions
Overview
DLI supports the following three types of user-defined functions (UDFs):
- Regular UDF: takes in one or more input parameters and returns a single result.
- User-defined table-generating function (UDTF): takes in one or more input parameters and returns multiple rows or columns.
- User-defined aggregate function (UDAF): aggregates multiple records into one value.
UDFs can only be used in dedicated queues.
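The three function types differ mainly in how many rows go in and how many come out. The following is a minimal plain-Java analogy of those shapes, not DLI or Flink code; the class and method names (UdfShapes, scalar, table, aggregate) are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java analogy of the three UDF shapes; none of these names are DLI or Flink APIs. */
final class UdfShapes {

    // Regular UDF shape: one input row in, exactly one value out.
    static int scalar(String s) {
        return s.length();
    }

    // UDTF shape: one input row in, zero or more output rows out.
    static List<String> table(String s) {
        List<String> rows = new ArrayList<>();
        for (String part : s.split(",")) {
            rows.add(part);
        }
        return rows;
    }

    // UDAF shape: many input rows in, one aggregated value out.
    static long aggregate(List<Long> values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(scalar("a,bb"));                       // a single value
        System.out.println(table("a,bb"));                        // several rows
        System.out.println(aggregate(Arrays.asList(1L, 2L, 3L))); // one value from many rows
    }
}
```

The real base classes (ScalarFunction, TableFunction, and AggregateFunction) are described in the sections that follow.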
POM Dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.7.2</version>
    <scope>provided</scope>
</dependency>
Precautions
- Currently, Python is not supported for programming UDFs, UDTFs, and UDAFs.
- If you use IntelliJ IDEA to debug the created UDF, select include dependencies with "Provided" scope. Otherwise, the dependency packages in the POM file cannot be loaded for local debugging.
The following uses IntelliJ IDEA 2020.2 as an example:
- On the IntelliJ IDEA page, select the configuration file you need to debug and click Edit Configurations.
- On the Run/Debug Configurations page, select include dependencies with "Provided" scope.
- Click OK.
Using UDFs
- Write the code of custom functions. For details about the code examples, see UDF, UDTF, or UDAF.
- Compile the UDF code, pack it into a JAR package, and upload the package to OBS.
- In the left navigation pane of the DLI management console, open the job list. Locate the row where the target job resides and click Edit in the Operation column to switch to the page where you can edit the job.
- On the Running Parameters tab page, select a dedicated queue for Queue. The UDF Jar parameter is displayed. Select the JAR file stored on OBS and click Save.
Before selecting a user-defined function JAR package, upload the JAR package to the created OBS bucket.
After the JAR package is selected, add the UDF statement to the SQL statement.
UDF
A regular UDF must extend the ScalarFunction class and implement the eval method. The open and close methods are optional.
Example code
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class UdfScalarFunction extends ScalarFunction {
    private int factor = 12;

    public UdfScalarFunction() {
        this.factor = 12;
    }

    /**
     * (Optional) Initialization.
     * @param context function context
     */
    @Override
    public void open(FunctionContext context) {}

    /**
     * Custom logic.
     * @param s input string
     * @return hash code of the input multiplied by the factor
     */
    public int eval(String s) {
        return s.hashCode() * factor;
    }

    /**
     * (Optional) Cleanup.
     */
    @Override
    public void close() {}
}
Example
CREATE FUNCTION udf_test AS 'com.xxx.udf.UdfScalarFunction';
INSERT INTO sink_stream SELECT udf_test(attr) FROM source_stream;
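To see what values udf_test would write to the sink, the eval arithmetic can be tried locally without Flink on the classpath. The following is a minimal stand-alone sketch that mirrors the eval logic shown above; the class name UdfScalarSketch is hypothetical and is not part of the example JAR.

```java
/** Stand-alone mirror of the eval logic above, so it runs without Flink on the classpath. */
final class UdfScalarSketch {
    private static final int FACTOR = 12;

    // Same arithmetic as UdfScalarFunction.eval: hash code times the factor.
    // int multiplication wraps silently on overflow.
    static int eval(String s) {
        return s.hashCode() * FACTOR;
    }

    public static void main(String[] args) {
        System.out.println(eval("abc")); // "abc".hashCode() is 96354, so this prints 1156248
        System.out.println(eval(""));    // the empty string hashes to 0, so this prints 0
    }
}
```

As written, eval calls s.hashCode() directly, so a null input would throw a NullPointerException. If attr can be null in source_stream, a null check in eval is probably worth adding.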
UDTF
A UDTF must extend the TableFunction class and implement the eval method. The open and close methods are optional. If the UDTF needs to return multiple columns, declare the return type as Tuple or Row. If Row is used, override the getResultType method to declare the types of the returned fields.
Example code
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UdfTableFunction extends TableFunction<Row> {

    // Static so that the logger is not serialized together with the function instance.
    private static final Logger LOG = LoggerFactory.getLogger(UdfTableFunction.class);

    /**
     * (Optional) Initialization.
     * @param context function context
     */
    @Override
    public void open(FunctionContext context) {}

    /**
     * Splits the input string and emits one (token, token length) row per token.
     * @param str input string
     * @param split separator, interpreted as a regular expression by String.split
     */
    public void eval(String str, String split) {
        for (String s : str.split(split)) {
            Row row = new Row(2);
            row.setField(0, s);
            row.setField(1, s.length());
            collect(row);
        }
    }

    /**
     * Declares the type returned by the function.
     * @return row type with a STRING field and an INT field
     */
    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.INT);
    }

    /**
     * (Optional) Cleanup.
     */
    @Override
    public void close() {}
}
Example
The UDTF supports CROSS JOIN and LEFT JOIN. When the UDTF is used, the LATERAL and TABLE keywords must be included.
- CROSS JOIN: omits a row of the left table if the UDTF produces no result for that row.
- LEFT JOIN: outputs a row of the left table even if the UDTF produces no result for that row, and pads the UDTF-related fields with null.
CREATE FUNCTION udtf_test AS 'com.xxx.udf.UdfTableFunction';
-- CROSS JOIN
INSERT INTO sink_stream SELECT subValue, length
FROM source_stream, LATERAL TABLE(udtf_test(attr, ',')) AS T(subValue, length);
-- LEFT JOIN
INSERT INTO sink_stream SELECT subValue, length
FROM source_stream LEFT JOIN LATERAL TABLE(udtf_test(attr, ',')) AS T(subValue, length) ON TRUE;
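The difference between the two join styles only shows up for input rows where the UDTF emits nothing. The following is a rough plain-Java simulation of that behavior, not Flink code; the class LateralJoinSketch and its methods are hypothetical. It relies on the fact that in Java ",".split(",") returns an empty array, so an attr value of "," makes the example eval loop emit no rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java simulation of the two join styles; not Flink code. */
final class LateralJoinSketch {

    // Mirrors UdfTableFunction.eval: one (token, length) pair per split result.
    static List<Object[]> udtf(String str, String split) {
        List<Object[]> out = new ArrayList<>();
        for (String s : str.split(split)) {
            out.add(new Object[] {s, s.length()});
        }
        return out;
    }

    // CROSS JOIN style: an input row with no UDTF output contributes nothing.
    static List<String> crossJoin(List<String> source) {
        List<String> result = new ArrayList<>();
        for (String attr : source) {
            for (Object[] r : udtf(attr, ",")) {
                result.add(r[0] + ":" + r[1]);
            }
        }
        return result;
    }

    // LEFT JOIN style: an input row with no UDTF output still yields one row,
    // with the UDTF columns set to null.
    static List<String> leftJoin(List<String> source) {
        List<String> result = new ArrayList<>();
        for (String attr : source) {
            List<Object[]> rows = udtf(attr, ",");
            if (rows.isEmpty()) {
                result.add("null:null");
            }
            for (Object[] r : rows) {
                result.add(r[0] + ":" + r[1]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The second source row produces no UDTF output.
        List<String> source = Arrays.asList("a,bb", ",");
        System.out.println(crossJoin(source)); // [a:1, bb:2]
        System.out.println(leftJoin(source));  // [a:1, bb:2, null:null]
    }
}
```

Note that an empty string behaves differently from ",": "".split(",") returns an array containing one empty string, so the example UDTF would emit a single row with an empty token and a length of 0.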
UDAF
A UDAF must extend the AggregateFunction class. You need to create an accumulator that stores the intermediate computing result, for example, WeightedAvgAccum in the following example code.
Example code
public class WeightedAvgAccum {
    public long sum = 0;
    public int count = 0;
}

import org.apache.flink.table.functions.AggregateFunction;
import java.util.Iterator;

/**
 * The first type parameter is the return type of the aggregate function,
 * and the second type parameter is the accumulator type.
 * Average user-defined aggregate function. Despite the accumulator's name,
 * every input value is counted with a weight of 1.
 */
public class UdfAggFunction extends AggregateFunction<Long, WeightedAvgAccum> {

    // Initialize the accumulator.
    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    // Return the result computed from the intermediate values stored in the accumulator.
    // Integer division is used, so the fractional part is dropped.
    @Override
    public Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null;
        } else {
            return acc.sum / acc.count;
        }
    }

    // Update the intermediate computing value according to the input.
    public void accumulate(WeightedAvgAccum acc, long iValue) {
        acc.sum += iValue;
        acc.count += 1;
    }

    // Perform the retraction operation, which is the opposite of the accumulate operation.
    public void retract(WeightedAvgAccum acc, long iValue) {
        acc.sum -= iValue;
        acc.count -= 1;
    }

    // Combine multiple accumulator values.
    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
        Iterator<WeightedAvgAccum> iter = it.iterator();
        while (iter.hasNext()) {
            WeightedAvgAccum a = iter.next();
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }

    // Reset the intermediate computing value.
    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.count = 0;
        acc.sum = 0L;
    }
}
Example
CREATE FUNCTION udaf_test AS 'com.xxx.udf.UdfAggFunction';
INSERT INTO sink_stream SELECT udaf_test(attr2) FROM source_stream GROUP BY attr1;
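The accumulator lifecycle is easier to follow with concrete numbers. The following is a minimal plain-Java walk-through that mirrors the accumulate, retract, merge, and getValue logic of the example code without depending on Flink. The class AccumulatorSketch and its nested Acc type are hypothetical stand-ins for UdfAggFunction and WeightedAvgAccum, and merge takes a single accumulator here for brevity.

```java
/** Plain-Java walk-through of the accumulator lifecycle; mirrors the example logic without Flink. */
final class AccumulatorSketch {

    // Stand-in for WeightedAvgAccum.
    static final class Acc {
        long sum = 0;
        int count = 0;
    }

    static void accumulate(Acc acc, long v) {
        acc.sum += v;
        acc.count += 1;
    }

    // Undoes one earlier accumulate call for the same value.
    static void retract(Acc acc, long v) {
        acc.sum -= v;
        acc.count -= 1;
    }

    // Folds another partial accumulator into acc.
    static void merge(Acc acc, Acc other) {
        acc.sum += other.sum;
        acc.count += other.count;
    }

    // Integer division: the fractional part of the average is dropped.
    static Long getValue(Acc acc) {
        if (acc.count == 0) {
            return null;
        }
        return acc.sum / acc.count;
    }

    public static void main(String[] args) {
        Acc a = new Acc();
        System.out.println(getValue(a)); // null, because nothing has been accumulated yet

        accumulate(a, 10);
        accumulate(a, 5);
        System.out.println(getValue(a)); // 7, because 15 / 2 is truncated

        retract(a, 5);
        System.out.println(getValue(a)); // 10, because only the value 10 remains

        Acc b = new Acc();
        accumulate(b, 4);
        merge(a, b);
        System.out.println(getValue(a)); // 7, because (10 + 4) / 2 = 7
    }
}
```

If fractional averages matter, one option is to return Double from getValue and divide as floating point; the example as published returns a truncated Long.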