UDF概述
UDF(User Defined Function)即用户自定义函数。IoTDB提供多种内建函数及自定义函数来满足用户的计算需求。
UDF类型
IoTDB支持的UDF函数的类型如表1所示。
UDTF(User Defined Timeseries Generating Function)
编写一个UDTF需要继承“org.apache.iotdb.db.query.udf.api.UDTF”类,并至少实现“beforeStart”方法和一种“transform”方法。
表2是所有可供用户实现的接口说明。
接口定义 |
描述 |
是否必须 |
---|---|---|
void validate(UDFParameterValidator validator) throws Exception |
在初始化方法“beforeStart”调用前执行,用于检测“UDFParameters”中用户输入的参数是否合法。 |
否 |
void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception |
初始化方法,在UDTF处理输入数据前,调用用户自定义的初始化行为。用户每执行一次UDTF查询,框架就会构造一个新的UDF类实例,该方法在每个UDF类实例被初始化时调用一次。在每一个UDF类实例的生命周期内,该方法只会被调用一次。 |
是 |
void transform(Row row, PointCollector collector) throws Exception |
该方法由框架调用。当在“beforeStart”中选择以“RowByRowAccessStrategy”的策略消费原始数据时,这个数据处理方法就会被调用。输入参数以“Row”的形式传入,输出结果通过“PointCollector”输出。需要在该方法内自行调用“collector”提供的数据收集方法,以决定最终的输出数据。 |
与“transform(RowWindow rowWindow, PointCollector collector) ”方法二选一 |
void transform(RowWindow rowWindow, PointCollector collector) throws Exception |
该方法由框架调用。当在“beforeStart”中选择以“SlidingSizeWindowAccessStrategy”或者“SlidingTimeWindowAccessStrategy”的策略消费原始数据时,这个数据处理方法就会被调用。输入参数以“RowWindow”的形式传入,输出结果通过“PointCollector”输出。需要在该方法内自行调用“collector”提供的数据收集方法,以决定最终的输出数据。 |
与“transform(Row row, PointCollector collector)”方法二选一 |
void terminate(PointCollector collector) throws Exception |
该方法由框架调用。该方法会在所有的“transform”调用执行完成后,在“beforeDestory”方法执行前被调用。在一个UDF查询过程中,该方法会且只会调用一次。需要在该方法内自行调用“collector”提供的数据收集方法,以决定最终的输出数据。 |
否 |
void beforeDestroy() |
UDTF的结束方法。此方法由框架调用,并且只会被调用一次,即在处理完最后一条记录之后被调用。 |
否 |
调用顺序:
- void validate(UDFParameterValidator validator) throws Exception
- void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception
- void transform(Row row, PointCollector collector) throws Exception或者void transform(RowWindow rowWindow, PointCollector collector) throws Exception
- void terminate(PointCollector collector) throws Exception
- void beforeDestroy()
框架每执行一次UDTF查询,都会构造一个全新的UDF类实例,查询结束时,对应的UDF类实例即被销毁,因此不同UDTF查询(即使是在同一个SQL语句中)UDF类实例内部的数据都是隔离的。用户可以放心地在UDTF中维护一些状态数据,无需考虑并发对UDF类实例内部状态数据的影响。
使用方法:
- void validate(UDFParameterValidator validator) throws Exception
在该方法中限制输入序列的数量和类型,检查用户输入的属性或者进行自定义逻辑的验证。
UDFParameters
UDFParameters的作用是解析SQL语句中的UDF参数(SQL中UDF函数名称后括号中的部分)。参数包括路径(及其序列类型)参数和字符串“key-value”对形式输入的属性参数。
例如:
SELECT UDF(s1, s2, 'key1'='iotdb', 'key2'='123.45') FROM root.sg.d;
用法:
void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception { // parameters for (PartialPath path : parameters.getPaths()) { TSDataType dataType = parameters.getDataType(path); // do something } String stringValue = parameters.getString("key1"); // iotdb Float floatValue = parameters.getFloat("key2"); // 123.45 Double doubleValue = parameters.getDouble("key3"); // null int intValue = parameters.getIntOrDefault("key4", 678); // 678 // do something // configurations // ... }
UDTFConfigurations
使用“UDTFConfigurations”指定UDF访问原始数据时采取的策略和输出结果序列的类型。
用法:
void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception { // parameters // ... // configurations configurations .setAccessStrategy(new RowByRowAccessStrategy()) .setOutputDataType(TSDataType.INT32); }
其中“setAccessStrategy”方法用于设定UDF访问原始数据时采取的策略,“setOutputDataType”用于设定输出结果序列的类型。
- setAccessStrategy
注意:在此处设定的原始数据访问策略决定了框架会调用哪一种“transform”方法,请实现与原始数据访问策略对应的“transform”方法。也可以根据“UDFParameters”解析出来的属性参数,动态决定设定哪一种策略,因此,实现两种“transform”方法也是被允许的。
可以设定的访问原始数据的策略:
接口定义
描述
调用transform方法
RowByRowAccessStrategy
逐行地处理原始数据。框架会为每一行原始数据输入调用一次“transform”方法。当UDF只有一个输入序列时,一行输入就是该输入序列中的一个数据点。当UDF有多个输入序列时,一行输入序列对应的是这些输入序列按时间对齐后的结果(一行数据中,可能存在某一列为null值,但不会全部都是null)。
void transform(Row row, PointCollector collector) throws Exception
SlidingTimeWindowAccessStrategy
以滑动时间窗口的方式处理原始数据。框架会为每一个原始数据输入窗口调用一次“transform”方法。一个窗口可能存在多行数据,每一行数据对应的是输入序列按时间对齐后的结果(一行数据中,可能存在某一列为null值,但不会全部都是null)。
void transform(RowWindow rowWindow, PointCollector collector) throws Exception
SlidingSizeWindowAccessStrategy
以固定行数的方式处理原始数据,即每个数据处理窗口都会包含固定行数的数据(最后一个窗口除外)。框架会为每一个原始数据输入窗口调用一次transform方法。一个窗口可能存在多行数据,每一行数据对应的是输入序列按时间对齐后的结果(一行数据中,可能存在某一列为null值,但不会全部都是null)。
void transform(RowWindow rowWindow, PointCollector collector) throws Exception
“RowByRowAccessStrategy”的构造不需要任何参数。
“SlidingTimeWindowAccessStrategy”有多种构造方法,可以向构造方法提供三类参数:- 时间轴显示时间窗开始和结束时间。
- 划分时间轴的时间间隔参数(必须为正数)。
- 滑动步长(不要求大于等于时间间隔,但是必须为正数)。
时间轴显示时间窗开始和结束时间不是必须要提供的。当不提供这类参数时,时间轴显示时间窗开始时间会被定义为整个查询结果集中最小的时间戳,时间轴显示时间窗结束时间会被定义为整个查询结果集中最大的时间戳。
滑动步长参数也不是必须的。当不提供滑动步长参数时,滑动步长会被设定为划分时间轴的时间间隔。
三类参数的关系可见下图:
注意,最后的一些时间窗口的实际时间间隔可能小于规定的时间间隔参数。另外,可能存在某些时间窗口内数据行数量为0的情况,这种情况框架也会为该窗口调用一次“transform”方法。
“SlidingSizeWindowAccessStrategy”有多种构造方法,用户可以向构造方法提供两个参数:- 窗口大小,即一个数据处理窗口包含的数据行数。注意,最后一些窗口的数据行数可能少于规定的数据行数。
- 滑动步长,即下一窗口第一个数据行与当前窗口第一个数据行间的数据行数(不要求大于等于窗口大小,但是必须为正数)。
滑动步长参数不是必须的。当不提供滑动步长参数时,滑动步长会被设定为窗口大小。
注意:在此处设定的输出结果序列的类型,决定了“transform”方法中“PointCollector”实际能够接收的数据类型。“setOutputDataType”中设定的输出类型和“PointCollector”实际能够接收的数据输出类型关系如下:
setOutputDataType中设定的输出类型
PointCollector实际能够接收的输出类型
INT32
int
INT64
long
FLOAT
float
DOUBLE
double
BOOLEAN
boolean
TEXT
“java.lang.String”和“org.apache.iotdb.tsfile.utils.Binary”
- UDTF输出序列的类型是运行时决定的。可以根据输入序列类型动态决定输出序列类型。
例如:
void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception { // do something // ... configurations .setAccessStrategy(new RowByRowAccessStrategy()) .setOutputDataType(parameters.getDataType(0)); }
- void transform(Row row, PointCollector collector) throws Exception
当在“beforeStart”方法中指定UDF读取原始数据的策略为“RowByRowAccessStrategy”,就需要实现该方法,在该方法中增加对原始数据处理的逻辑。
该方法每次处理原始数据的一行。原始数据由“Row”读入,由“PointCollector”输出。可以选择在一次“transform”方法调用中输出任意数量的数据点。需要注意的是,输出数据点的类型必须与在“beforeStart”方法中设置的一致,而输出数据点的时间戳必须是严格单调递增的。
下面是一个实现了“void transform(Row row, PointCollector collector) throws Exception”方法的完整UDF示例。它是一个加法器,接收两列时间序列输入,当这两个数据点都不为“null”时,输出这两个数据点的代数和。
import org.apache.iotdb.db.query.udf.api.UDTF; import org.apache.iotdb.db.query.udf.api.access.Row; import org.apache.iotdb.db.query.udf.api.collector.PointCollector; import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations; import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters; import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; public class Adder implements UDTF { @Override public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) { configurations .setOutputDataType(TSDataType.INT64) .setAccessStrategy(new RowByRowAccessStrategy()); } @Override public void transform(Row row, PointCollector collector) throws Exception { if (row.isNull(0) || row.isNull(1)) { return; } collector.putLong(row.getTime(), row.getLong(0) + row.getLong(1)); } }
- void transform(RowWindow rowWindow, PointCollector collector) throws Exception
当在“beforeStart”方法中指定UDF读取原始数据的策略为“SlidingTimeWindowAccessStrategy”或者“SlidingSizeWindowAccessStrategy”时,就需要实现该方法,在该方法中增加对原始数据处理的逻辑。
该方法每次处理固定行数或者固定时间间隔内的一批数据,称包含这一批数据的容器为窗口。原始数据由“RowWindow”读入,由“PointCollector”输出。“RowWindow”能够帮助访问某一批次的“Row”,它提供了对这一批次的“Row”进行随机访问和迭代访问的接口。可以选择在一次“transform”方法调用中输出任意数量的数据点,需要注意的是,输出数据点的类型必须与在“beforeStart”方法中设置的一致,而输出数据点的时间戳必须是严格单调递增的。
下面是一个实现了“void transform(RowWindow rowWindow, PointCollector collector) throws Exception”方法的完整UDF示例。它是一个计数器,接收任意列数的时间序列输入,作用是统计并输出指定时间范围内每一个时间窗口中的数据行数。
import java.io.IOException; import org.apache.iotdb.db.query.udf.api.UDTF; import org.apache.iotdb.db.query.udf.api.access.RowWindow; import org.apache.iotdb.db.query.udf.api.collector.PointCollector; import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations; import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters; import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; public class Counter implements UDTF { @Override public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) { configurations .setOutputDataType(TSDataType.INT32) .setAccessStrategy(new SlidingTimeWindowAccessStrategy( parameters.getLong("time_interval"), parameters.getLong("sliding_step"), parameters.getLong("display_window_begin"), parameters.getLong("display_window_end"))); } @Override public void transform(RowWindow rowWindow, PointCollector collector) throws Exception { if (rowWindow.windowSize() != 0) { collector.putInt(rowWindow.getRow(0).getTime(), rowWindow.windowSize()); } } }
- void terminate(PointCollector collector) throws Exception
在一些场景下,UDF需要遍历完所有的原始数据后才能得到最后的输出结果。“terminate”接口为这类UDF提供了支持。
该方法会在所有的“transform”调用执行完成后,在“beforeDestory”方法执行前被调用。可以选择使用“transform”方法进行单纯的数据处理,最后使用“terminate”将处理结果输出。
结果需要由“PointCollector”输出。可以选择在一次“terminate”方法调用中输出任意数量的数据点。需要注意的是,输出数据点的类型必须与在“beforeStart”方法中设置的一致,而输出数据点的时间戳必须是严格单调递增的。
下面是一个实现了“void terminate(PointCollector collector) throws Exception”方法的完整UDF示例。它接收一个“INT32”类型的时间序列输入,作用是输出该序列的最大值点。
import java.io.IOException; import org.apache.iotdb.db.query.udf.api.UDTF; import org.apache.iotdb.db.query.udf.api.access.Row; import org.apache.iotdb.db.query.udf.api.collector.PointCollector; import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations; import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters; import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; public class Max implements UDTF { private Long time; private int value; @Override public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) { configurations .setOutputDataType(TSDataType.INT32) .setAccessStrategy(new RowByRowAccessStrategy()); } @Override public void transform(Row row, PointCollector collector) { int candidateValue = row.getInt(0); if (time == null || value < candidateValue) { time = row.getTime(); value = candidateValue; } } @Override public void terminate(PointCollector collector) throws IOException { if (time != null) { collector.putInt(time, value); } } }
- void transform(Row row, PointCollector collector) throws Exception