User-Defined Functions
Overview
CS supports the following three types of user-defined functions (UDFs):
- Regular UDF: takes in one or more input parameters and returns a single result.
- User-defined table-generating function (UDTF): takes in one or more input parameters and returns multiple rows or columns.
- User-defined aggregate function (UDAF): aggregates multiple records into one value.
UDFs can only be used in exclusive clusters.
POM Dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.5.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.5.0</version>
    <scope>provided</scope>
</dependency>
Using UDFs
- Package the UDF classes into a JAR file and upload the JAR file to OBS.
- In the left navigation pane of the CS management console, click Job Management. Locate the row containing the target job and click Edit in the Operation column to open the job editing page.
- On the Running Parameters page, set JAR File to Be Inserted to the JAR file you uploaded to OBS and click Save.
After the JAR file is selected, register each UDF by adding a CREATE FUNCTION statement, which specifies the fully qualified class name of the function, to the job's SQL:
CREATE FUNCTION udf_test AS 'com.huawei.udf.UdfScalarFunction';
UDF
A regular UDF must extend the ScalarFunction class and implement an eval method. Overriding the open and close methods is optional.
Example code
package com.huawei.udf;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class UdfScalarFunction extends ScalarFunction {
    private int factor = 12;

    public UdfScalarFunction() {
        this.factor = 12;
    }

    /**
     * (Optional) Initialization, called before eval is first invoked.
     * @param context function context provided by the runtime
     */
    @Override
    public void open(FunctionContext context) {}

    /**
     * Custom logic, invoked once per input row.
     * @param s input string
     * @return hash code of s multiplied by factor
     */
    public int eval(String s) {
        return s.hashCode() * factor;
    }

    /**
     * (Optional) Cleanup, called when the function is shut down.
     */
    @Override
    public void close() {}
}
Example
CREATE FUNCTION udf_test AS 'com.huawei.udf.UdfScalarFunction';
INSERT INTO sink_stream SELECT udf_test(attr) FROM source_stream;
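To see what udf_test returns for a given attr value, the arithmetic in eval can be run as plain Java without the Flink base class. The sketch below is a hypothetical stand-in, not CS code: the class name UdfScalarEvalDemo is illustrative, and it only copies the eval body with factor fixed at 12.

```java
public class UdfScalarEvalDemo {
    // Same value as the factor field in UdfScalarFunction.
    private static final int FACTOR = 12;

    // Copy of UdfScalarFunction.eval, minus the Flink ScalarFunction base class.
    public static int eval(String s) {
        // int multiplication keeps only the low 32 bits, so large products wrap around.
        return s.hashCode() * FACTOR;
    }

    public static void main(String[] args) {
        // "abc".hashCode() is 96354, so this prints 96354 * 12 = 1156248.
        System.out.println(eval("abc"));

        // The int result always equals the exact long product truncated to 32 bits.
        String s = "hello world";
        long exact = (long) s.hashCode() * FACTOR;
        System.out.println(eval(s) == (int) exact); // prints true
    }
}
```

Because eval returns an int, the product wraps around silently whenever hashCode() * 12 falls outside the int range; if the full value matters, declaring eval to return long and multiplying in long arithmetic would avoid that.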
UDTF
A UDTF must extend the TableFunction class and implement an eval method; each call to collect emits one output row. Overriding the open and close methods is optional. To return multiple columns, declare the return type as Tuple or Row. If Row is used, you must also override the getResultType method to declare the types of the returned fields.
Example code
package com.huawei.udf;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UdfTableFunction extends TableFunction<Row> {
    // static, so the logger is not serialized together with the function instance
    private static final Logger LOG = LoggerFactory.getLogger(UdfTableFunction.class);

    /**
     * (Optional) Initialization
     * @param context function context provided by the runtime
     */
    @Override
    public void open(FunctionContext context) {}

    /**
     * Splits str by the separator split (treated as a regular expression by
     * String.split) and emits one (substring, length) row per piece.
     */
    public void eval(String str, String split) {
        for (String s : str.split(split)) {
            Row row = new Row(2);
            row.setField(0, s);
            row.setField(1, s.length());
            collect(row);
        }
    }

    /**
     * Declares the field types of the returned Row: (STRING, INT).
     * @return row type information
     */
    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.INT);
    }

    /**
     * (Optional) Cleanup
     */
    @Override
    public void close() {}
}
Example
The UDTF supports CROSS JOIN and LEFT JOIN. A UDTF call must be wrapped in the LATERAL and TABLE keywords.
- CROSS JOIN: if the UDTF produces no output for a row of the left table, that row is not output.
- LEFT JOIN: every row of the left table is output; if the UDTF produces no output for a row, the UDTF fields of that row are padded with null.
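The difference between the two join types can be simulated in plain Java. In this sketch, which is a hypothetical stand-in rather than Flink's join implementation, udtf() repeats the splitting logic of UdfTableFunction.eval. The input "," yields no pieces because String.split drops trailing empty strings, so the CROSS JOIN drops that row while the LEFT JOIN keeps it with null fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LateralJoinDemo {
    // Mirrors UdfTableFunction.eval: one (subValue, length) pair per split piece.
    static List<Object[]> udtf(String str, String split) {
        List<Object[]> out = new ArrayList<>();
        for (String s : str.split(split)) {
            out.add(new Object[] {s, s.length()});
        }
        return out;
    }

    // CROSS JOIN LATERAL: left rows for which the UDTF emits nothing are dropped.
    public static List<String> crossJoin(List<String> source) {
        List<String> result = new ArrayList<>();
        for (String attr : source) {
            for (Object[] r : udtf(attr, ",")) {
                result.add(r[0] + ":" + r[1]);
            }
        }
        return result;
    }

    // LEFT JOIN LATERAL ... ON TRUE: such rows are kept and the UDTF fields are null.
    public static List<String> leftJoin(List<String> source) {
        List<String> result = new ArrayList<>();
        for (String attr : source) {
            List<Object[]> rows = udtf(attr, ",");
            if (rows.isEmpty()) {
                result.add("null:null");
            }
            for (Object[] r : rows) {
                result.add(r[0] + ":" + r[1]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("ab,c", ",");
        System.out.println(crossJoin(source)); // [ab:2, c:1]
        System.out.println(leftJoin(source));  // [ab:2, c:1, null:null]
    }
}
```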
CREATE FUNCTION udtf_test AS 'com.huawei.udf.UdfTableFunction';
-- CROSS JOIN
INSERT INTO sink_stream SELECT subValue, length FROM source_stream, LATERAL TABLE(udtf_test(attr, ',')) AS T(subValue, length);
-- LEFT JOIN
INSERT INTO sink_stream SELECT subValue, length FROM source_stream LEFT JOIN LATERAL TABLE(udtf_test(attr, ',')) AS T(subValue, length) ON TRUE;
UDAF
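A UDAF must extend the AggregateFunction class and keep its intermediate state in an accumulator object, for example, WeightedAvgAccum in the following example code. The createAccumulator, getValue, and accumulate methods are required; retract, merge, and resetAccumulator are needed only in certain scenarios (for example, retract for bounded OVER windows and merge for session windows).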
Example code
// WeightedAvgAccum.java
package com.huawei.udf;

/** Accumulator holding the running sum and count of the input values. */
public class WeightedAvgAccum {
    public long sum = 0;
    public int count = 0;
}

// UdfAggFunction.java
package com.huawei.udf;

import org.apache.flink.table.functions.AggregateFunction;
import java.util.Iterator;

/**
 * Average user-defined aggregate function. Despite the accumulator's name,
 * every input value has equal weight.
 * The first type parameter is the type returned by the aggregate function,
 * and the second type parameter is the accumulator type.
 */
public class UdfAggFunction extends AggregateFunction<Long, WeightedAvgAccum> {
    // Create and initialize the accumulator.
    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    // Compute the result from the accumulator (integer average, or null if no input).
    @Override
    public Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null;
        } else {
            return acc.sum / acc.count;
        }
    }

    // Update the accumulator with an input value.
    public void accumulate(WeightedAvgAccum acc, long iValue) {
        acc.sum += iValue;
        acc.count += 1;
    }

    // Retract an input value; the inverse of accumulate.
    public void retract(WeightedAvgAccum acc, long iValue) {
        acc.sum -= iValue;
        acc.count -= 1;
    }

    // Merge other accumulators into acc.
    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
        Iterator<WeightedAvgAccum> iter = it.iterator();
        while (iter.hasNext()) {
            WeightedAvgAccum a = iter.next();
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }

    // Reset the accumulator to its initial state.
    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.count = 0;
        acc.sum = 0L;
    }
}
Example
CREATE FUNCTION udaf_test AS 'com.huawei.udf.UdfAggFunction';
INSERT INTO sink_stream SELECT udaf_test(attr2) FROM source_stream GROUP BY attr1;
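How accumulate, merge, retract, and getValue fit together can be checked without a Flink runtime. This plain-Java sketch is hypothetical: AvgAccumulatorDemo and its nested Accum class are illustrative stand-ins that copy the fields of WeightedAvgAccum and the method bodies of UdfAggFunction. It shows that merging two partial accumulators gives the same average as accumulating every value into one, and that retract undoes an earlier accumulate.

```java
import java.util.Arrays;

public class AvgAccumulatorDemo {
    // Same fields as WeightedAvgAccum.
    public static class Accum {
        public long sum = 0;
        public int count = 0;
    }

    // The methods below copy the corresponding UdfAggFunction bodies.
    public static void accumulate(Accum acc, long value) {
        acc.sum += value;
        acc.count += 1;
    }

    public static void retract(Accum acc, long value) {
        acc.sum -= value;
        acc.count -= 1;
    }

    public static void merge(Accum acc, Iterable<Accum> others) {
        for (Accum a : others) {
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }

    public static Long getValue(Accum acc) {
        if (acc.count == 0) {
            return null;
        }
        return acc.sum / acc.count;
    }

    public static void main(String[] args) {
        // Two partial accumulators, built independently and then combined.
        Accum a = new Accum();
        accumulate(a, 10);
        accumulate(a, 20);
        Accum b = new Accum();
        accumulate(b, 40);

        merge(a, Arrays.asList(b));
        System.out.println(getValue(a)); // (10 + 20 + 40) / 3 = 23 (integer division)

        retract(a, 40);
        System.out.println(getValue(a)); // (10 + 20) / 2 = 15
    }
}
```

Integer division truncates, which matches the original getValue; a fractional average would need a Double result type.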