Using UDFs in FlinkServer Jobs
This section applies to MRS 3.1.2 or later.
You can write custom functions, called user-defined functions (UDFs), to extend SQL statements when the built-in functions do not meet your requirements. You can upload and manage UDF JAR files on the Flink web UI and call the UDFs when running jobs.
Flink supports the following three types of UDFs, as described in Table 1.
Type | Description |
---|---|
User-defined scalar function (UDF) | Supports one or more input parameters and returns a single result value. For details, see UDF Java and SQL Examples. |
User-defined aggregation function (UDAF) | Aggregates multiple records into one value. For details, see UDAF Java and SQL Examples. |
User-defined table-valued function (UDTF) | Supports one or more input parameters and returns multiple rows or columns. For details, see UDTF Java and SQL Examples. |
Prerequisites
You have prepared a UDF JAR file whose size does not exceed 200 MB.
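The size limit can be checked locally before you upload. The following is a minimal sketch, not part of FlinkServer: the class name `UdfJarSizeCheck` is hypothetical, and it assumes that 1 MB means 1024 × 1024 bytes, which this page does not specify.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class UdfJarSizeCheck {
    // 200 MB limit from the prerequisite; 1 MB = 1024 * 1024 bytes is an assumption.
    public static final long MAX_JAR_BYTES = 200L * 1024 * 1024;

    // Returns true if a file of the given size does not exceed the limit.
    public static boolean withinLimit(long sizeInBytes) {
        return sizeInBytes >= 0 && sizeInBytes <= MAX_JAR_BYTES;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: java UdfJarSizeCheck <path-to-udf.jar>");
            return;
        }
        Path jar = Paths.get(args[0]);
        long size = Files.size(jar);
        System.out.println(jar + ": " + size + " bytes, within limit: " + withinLimit(size));
    }
}
```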
Uploading a UDF
- Access the Flink web UI. For details, see Accessing the FlinkServer Web UI.
- Click UDF Management. The UDF Management page is displayed.
- Click Add UDF. For Local .jar File, select and upload the prepared UDF JAR file.
- Enter the UDF name and description and click OK.
  - You can add a maximum of 10 UDF names. Name can be customized. Type must match the fully qualified class name of the UDF in the uploaded UDF JAR file.
  - The server retains an uploaded UDF JAR file for 5 minutes by default. If you click OK within 5 minutes, the UDF is created. If you click OK after 5 minutes, the UDF creation fails and an error message is displayed, indicating that the local UDF file path is incorrect.
- In the UDF list, you can view information about all UDFs in the current application. You can edit or delete UDF information in the Operation column. (Only unused UDF items can be deleted.)
- (Optional) If you need to run or develop a job immediately, configure the job on the Job Management page. For details, see Creating a FlinkServer Job.
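Because Type must match the fully qualified class name inside the JAR file, it can help to list the class names a JAR file contains before you fill in the form. The sketch below uses only the standard `java.util.jar` API. The class name `UdfJarClassLister` and its methods are hypothetical helpers, not part of FlinkServer. Nested classes keep the `$` separator, which is the form used in the UDAF SQL example (`com.xxx.udf.UdfClass_UDAF$Average`).

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class UdfJarClassLister {
    private static final String CLASS_SUFFIX = ".class";

    // Converts a JAR entry name such as "com/xxx/udf/UdfClass_UDF.class" into
    // "com.xxx.udf.UdfClass_UDF". Returns null for entries that are not class files.
    public static String toClassName(String entryName) {
        if (!entryName.endsWith(CLASS_SUFFIX)) {
            return null;
        }
        String withoutSuffix =
                entryName.substring(0, entryName.length() - CLASS_SUFFIX.length());
        return withoutSuffix.replace('/', '.');
    }

    // Collects the class names of all .class entries in the given JAR file.
    public static List<String> listClassNames(String jarPath) throws IOException {
        List<String> names = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = toClassName(entries.nextElement().getName());
                if (name != null) {
                    names.add(name);
                }
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: java UdfJarClassLister <path-to-udf.jar>");
            return;
        }
        for (String name : listClassNames(args[0])) {
            System.out.println(name);
        }
    }
}
```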
UDF Java and SQL Examples
- UDF Java example
    package com.xxx.udf;

    import org.apache.flink.table.functions.ScalarFunction;

    public class UdfClass_UDF extends ScalarFunction {
        // Returns the length of the input string.
        public int eval(String s) {
            return s.length();
        }
    }
- UDF SQL example
    CREATE TEMPORARY FUNCTION udf AS 'com.xxx.udf.UdfClass_UDF';
    CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen', 'rows-per-second' = '1');
    CREATE TABLE udfSink (a VARCHAR, b INT) WITH ('connector' = 'print');
    INSERT INTO udfSink SELECT a, udf(a) FROM udfSource;
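The `eval` method in the UDF Java example calls `s.length()` directly, so a NULL input value would cause a `NullPointerException`. The `datagen` source in the SQL example is not expected to produce NULL values, but other sources may. The following is a minimal sketch of a null-safe variant of the same logic. It is written as plain Java so that it runs without Flink on the classpath, and the class name `NullSafeLengthSketch` is hypothetical. In a real UDF, the method would be placed in a class that extends `ScalarFunction`.

```java
public class NullSafeLengthSketch {
    // Same logic as eval in UdfClass_UDF, but returns null instead of
    // throwing when the input is null. The return type is Integer (not int)
    // so that null can be returned.
    public static Integer eval(String s) {
        return s == null ? null : s.length();
    }

    public static void main(String[] args) {
        System.out.println(eval("flink")); // prints 5
        System.out.println(eval(null));    // prints null
    }
}
```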
UDAF Java and SQL Examples
- UDAF Java example
    package com.xxx.udf;

    import org.apache.flink.table.functions.AggregateFunction;

    public class UdfClass_UDAF {
        // Holds the intermediate aggregation state.
        public static class AverageAccumulator {
            public int sum;
        }

        // Note: despite its name, this example function returns the sum of the input values.
        public static class Average extends AggregateFunction<Integer, AverageAccumulator> {
            // Called for each input row to update the accumulator.
            public void accumulate(AverageAccumulator acc, Integer value) {
                acc.sum += value;
            }

            @Override
            public Integer getValue(AverageAccumulator acc) {
                return acc.sum;
            }

            @Override
            public AverageAccumulator createAccumulator() {
                return new AverageAccumulator();
            }
        }
    }
- UDAF SQL example
    CREATE TEMPORARY FUNCTION udaf AS 'com.xxx.udf.UdfClass_UDAF$Average';
    CREATE TABLE udfSource (a INT) WITH ('connector' = 'datagen', 'rows-per-second' = '1', 'fields.a.min' = '1', 'fields.a.max' = '3');
    CREATE TABLE udfSink (b INT, c INT) WITH ('connector' = 'print');
    INSERT INTO udfSink SELECT a, udaf(a) FROM udfSource GROUP BY a;
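To see what the UDAF example computes, the accumulator lifecycle (`createAccumulator`, `accumulate`, `getValue`) can be simulated in plain Java. The sketch below is an illustration only, not how Flink runs the function internally. The class `UdafLifecycleSketch` and the method `aggregateByKey` are hypothetical, and the accumulator mirrors the one in the example, which keeps a running sum. It models `GROUP BY a` by keeping one accumulator for each distinct key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UdafLifecycleSketch {
    // Mirrors the accumulator in the UDAF example: a single running sum.
    static class SumAccumulator {
        int sum;
    }

    static SumAccumulator createAccumulator() {
        return new SumAccumulator();
    }

    static void accumulate(SumAccumulator acc, Integer value) {
        acc.sum += value;
    }

    static Integer getValue(SumAccumulator acc) {
        return acc.sum;
    }

    // Simulates "SELECT a, udaf(a) ... GROUP BY a" over the given rows:
    // one accumulator per distinct key, updated once for every row with that key.
    public static Map<Integer, Integer> aggregateByKey(int[] rows) {
        Map<Integer, SumAccumulator> state = new LinkedHashMap<>();
        for (int a : rows) {
            SumAccumulator acc = state.computeIfAbsent(a, k -> createAccumulator());
            accumulate(acc, a);
        }
        Map<Integer, Integer> result = new LinkedHashMap<>();
        state.forEach((key, acc) -> result.put(key, getValue(acc)));
        return result;
    }

    public static void main(String[] args) {
        // Rows with values in the range 1 to 3, as configured for the datagen source.
        System.out.println(aggregateByKey(new int[] {1, 2, 1, 3, 1})); // prints {1=3, 2=2, 3=3}
    }
}
```

The sketch returns only the final value for each key. In a streaming job, the result for a key is updated as new rows with that key arrive.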
UDTF Java and SQL Examples
- UDTF Java example
    package com.xxx.udf;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;

    public class UdfClass_UDTF extends TableFunction<Tuple2<String, Integer>> {
        // Emits one row containing the input string and its length.
        public void eval(String str) {
            Tuple2<String, Integer> tuple2 = Tuple2.of(str, str.length());
            collect(tuple2);
        }
    }
- UDTF SQL example
    CREATE TEMPORARY FUNCTION udtf AS 'com.xxx.udf.UdfClass_UDTF';
    CREATE TABLE udfSource (a VARCHAR) WITH ('connector' = 'datagen', 'rows-per-second' = '1');
    CREATE TABLE udfSink (b VARCHAR, c INT) WITH ('connector' = 'print');
    INSERT INTO udfSink SELECT str, strLength FROM udfSource, LATERAL TABLE(udtf(udfSource.a)) AS T(str, strLength);