Reusing Flink UDFs
Scenario
Flink SQL supports UDF reuse. When the same UDF call would be executed multiple times (for example, when it appears in both the SELECT list and the WHERE condition of a query), the UDF is executed only once, and its first result is reused for every subsequent (Nth, N > 1) execution. This keeps the results of the repeated calls consistent with each other and avoids redundant evaluation, which improves operator performance.
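To make the idea concrete, the following plain-Scala sketch models reuse with a per-record cache. It is an assumption-based illustration, not Flink's implementation: `RowScopedReuse` and `call` are hypothetical names, and keying the cache by function name plus argument is a simplification chosen for the example.

```scala
import scala.collection.mutable

// Hypothetical model of UDF reuse (not Flink's actual code generation):
// create one instance per input record; each distinct (function name, argument)
// call is evaluated once, and later occurrences reuse the cached result.
final class RowScopedReuse {
  private val cache = mutable.Map.empty[(String, Any), Any]

  def call[A, R](name: String, arg: A)(f: A => R): R =
    // getOrElseUpdate evaluates f(arg) only when the key is not cached yet
    cache.getOrElseUpdate((name, arg), f(arg)).asInstanceOf[R]
}

// Counts how often the underlying function body really runs.
var evaluations = 0
val expensive: String => Int = s => { evaluations += 1; s.length }

val reuse  = new RowScopedReuse
val first  = reuse.call("len", "flink")(expensive) // body runs, result cached
val second = reuse.call("len", "flink")(expensive) // cached result reused

println(s"first=$first second=$second evaluations=$evaluations")
```

Running it prints `first=5 second=5 evaluations=1`: the function body executes once even though the call is made twice.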
How to Use
When configuring a Flink job, you can set table.optimizer.function-reuse-enabled to true on the Flink job development page of the Flink server web UI to enable the UDF reuse function. For details, see Creating a Job.
Example
- UDF:
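```scala
import org.apache.flink.table.functions.ScalarFunction
import scala.collection.mutable

// Returns true if the item has been seen before; otherwise records it and returns false.
// The set is instance state, so the result depends on how many times eval is called.
class ItemExist extends ScalarFunction {
  val items: mutable.Set[String] = mutable.Set[String]()

  def eval(item: String): Boolean = {
    val exist = items.contains(item)
    if (!exist) {
      items.add(item)
    }
    exist
  }
}
```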
- SQL statement (assuming the ItemExist class is registered under the function name IfExist):
```sql
SELECT *
FROM (
  SELECT `a`, IfExist(b) AS `exist`, `c`
  FROM Table1
)
WHERE exist IS FALSE;
```
- Execution result:
- Return value when the UDF reuse function is disabled:
a,true,c
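With reuse disabled, IfExist(b) is executed twice for the row. It is first executed for the WHERE condition: b is not yet in the items set, so it returns false, the row passes the filter, and b is added to the set. It is then executed again for the SELECT list, and because b is now in the set, it returns true.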
- Return value when the UDF reuse function is enabled:
a,false,c
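The two results above can be reproduced without a Flink cluster. In this hypothetical sketch, `ItemExistModel` copies the ItemExist logic without extending ScalarFunction, and `Record`, `runWithoutReuse`, and `runWithReuse` are invented names. Each function processes one record at a time and hard-codes the evaluation order described above (filter first, then projection); the plan Flink actually generates may differ.

```scala
import scala.collection.mutable

// Plain-Scala copy of the ItemExist logic (no Flink dependency).
final class ItemExistModel {
  private val items = mutable.Set.empty[String]
  def eval(item: String): Boolean = {
    val exist = items.contains(item)
    if (!exist) items.add(item)
    exist
  }
}

final case class Record(a: String, b: String, c: String)

// Reuse disabled: the WHERE condition and the SELECT list each call the UDF.
def runWithoutReuse(rows: Seq[Record]): Seq[(String, Boolean, String)] = {
  val udf = new ItemExistModel
  rows.flatMap { r =>
    if (!udf.eval(r.b)) Some((r.a, udf.eval(r.b), r.c)) // second call for SELECT
    else None
  }
}

// Reuse enabled: the UDF runs once per record and the result feeds both places.
def runWithReuse(rows: Seq[Record]): Seq[(String, Boolean, String)] = {
  val udf = new ItemExistModel
  rows.flatMap { r =>
    val exist = udf.eval(r.b) // evaluated once, shared by WHERE and SELECT
    if (!exist) Some((r.a, exist, r.c)) else None
  }
}

val input = Seq(Record("a", "b", "c"))
println(runWithoutReuse(input)) // List((a,true,c))
println(runWithReuse(input))    // List((a,false,c))
```

Because the model keeps one `ItemExistModel` per run, the difference comes only from how many times `eval` is called per record, which is exactly what the reuse option changes.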