- 最新动态
- 功能总览
- 服务公告
- 产品介绍
- 计费说明
- 快速入门
-
用户指南
- DLI作业开发流程
- 准备工作
- 创建弹性资源池和队列
- 创建数据库和表
- 数据迁移与数据传输
- 配置DLI访问其他云服务的委托权限
- 使用DLI提交SQL作业
- 使用DLI提交Flink作业
- 使用DLI提交Spark作业
- 使用Notebook实例提交DLI作业
- 使用CES监控DLI服务
- 使用AOM监控DLI服务
- 使用CTS审计DLI服务
- 权限管理
- DLI常用管理操作
- 最佳实践
-
开发指南
- 使用客户端工具连接DLI
- SQL作业开发指南
- Flink作业开发指南
- Spark Jar作业开发指南
-
语法参考
-
Spark SQL语法参考
- Spark SQL常用配置项说明
- Spark SQL语法概览
- Spark开源命令支持说明
- 数据库相关
- 表相关
- 数据相关
- 导出查询结果
- 跨源连接相关
- 视图相关
- 查看计划
- 数据权限相关
- 数据类型
- 自定义函数
-
内置函数
-
日期函数
- 日期函数概览
- add_months
- current_date
- current_timestamp
- date_add
- dateadd
- date_sub
- date_format
- datediff
- datediff1
- datepart
- datetrunc
- day/dayofmonth
- from_unixtime
- from_utc_timestamp
- getdate
- hour
- isdate
- last_day
- lastday
- minute
- month
- months_between
- next_day
- quarter
- second
- to_char
- to_date
- to_date1
- to_utc_timestamp
- trunc
- unix_timestamp
- weekday
- weekofyear
- year
-
字符串函数
- 字符串函数概览
- ascii
- concat
- concat_ws
- char_matchcount
- encode
- find_in_set
- get_json_object
- instr
- instr1
- initcap
- keyvalue
- length
- lengthb
- levenshtein
- locate
- lower/lcase
- lpad
- ltrim
- parse_url
- printf
- regexp_count
- regexp_extract
- replace
- regexp_replace
- regexp_replace1
- regexp_instr
- regexp_substr
- repeat
- reverse
- rpad
- rtrim
- soundex
- space
- substr/substring
- substring_index
- split_part
- translate
- trim
- upper/ucase
- 数学函数
- 聚合函数
- 分析窗口函数
- 其他函数
-
日期函数
- SELECT
-
标示符
- aggregate_func
- alias
- attr_expr
- attr_expr_list
- attrs_value_set_expr
- boolean_expression
- class_name
- col
- col_comment
- col_name
- col_name_list
- condition
- condition_list
- cte_name
- data_type
- db_comment
- db_name
- else_result_expression
- file_format
- file_path
- function_name
- groupby_expression
- having_condition
- hdfs_path
- input_expression
- input_format_classname
- jar_path
- join_condition
- non_equi_join_condition
- number
- num_buckets
- output_format_classname
- partition_col_name
- partition_col_value
- partition_specs
- property_name
- property_value
- regex_expression
- result_expression
- row_format
- select_statement
- separator
- serde_name
- sql_containing_cte_name
- sub_query
- table_comment
- table_name
- table_properties
- table_reference
- view_name
- view_properties
- when_expression
- where_condition
- window_function
- 运算符
-
Flink SQL语法参考
- Flink Opensource SQL1.15语法参考
- Flink Opensource SQL1.12语法参考
- Flink Opensource SQL1.10语法参考
-
HetuEngine SQL语法参考
-
HetuEngine SQL语法
- 使用前必读
- 数据类型
-
DDL 语法
- CREATE SCHEMA
- CREATE TABLE
- CREATE TABLE AS
- CREATE TABLE LIKE
- CREATE VIEW
- ALTER TABLE
- ALTER VIEW
- ALTER SCHEMA
- DROP SCHEMA
- DROP TABLE
- DROP VIEW
- TRUNCATE TABLE
- COMMENT
- VALUES
- SHOW语法使用概要
- SHOW SCHEMAS(DATABASES)
- SHOW TABLES
- SHOW TBLPROPERTIES TABLE|VIEW
- SHOW TABLE/PARTITION EXTENDED
- SHOW FUNCTIONS
- SHOW PARTITIONS
- SHOW COLUMNS
- SHOW CREATE TABLE
- SHOW VIEWS
- SHOW CREATE VIEW
- DML 语法
- DQL 语法
- 辅助命令语法
- 预留关键字
- SQL函数和操作符
- 数据类型隐式转换
- 附录
-
HetuEngine SQL语法
- Hudi SQL语法参考
- Delta SQL语法参考
-
Spark SQL语法参考
-
API参考
- API使用前必读
- API概览
- 如何调用API
- API快速入门
- 权限相关API
- 全局变量相关API
- 资源标签相关API
- 增强型跨源连接相关API
- 跨源认证相关API
- 弹性资源池相关API
- 队列相关API(推荐)
- SQL作业相关API
- SQL模板相关API
- Flink作业相关API
- Flink作业模板相关API
- Flink作业管理相关API
- Spark作业相关API
- Spark作业模板相关API
- 权限策略和授权项
- 历史API
- 公共参数
- SDK参考
- 场景代码示例
-
常见问题
- DLI产品咨询类
- DLI弹性资源池和队列类
-
DLI数据库和表类
- 为什么在DLI控制台中查询不到表?
- OBS表压缩率较高怎么办?
- 字符码不一致导致数据乱码怎么办?
- 删除表后再重新创建同名的表,需要对操作该表的用户和项目重新赋权吗?
- DLI分区内表导入的文件不包含分区列的数据,导致数据导入完成后查询表数据失败怎么办?
- 创建OBS外表,由于OBS文件中的某字段存在换行符导致表字段数据错误怎么办?
- join表时没有添加on条件,造成笛卡尔积查询,导致队列资源爆满,作业运行失败怎么办?
- 手动在OBS表的分区目录下添加了数据,但是无法查询到数据怎么办?
- 为什么insert overwrite覆盖分区表数据的时候,覆盖了全量数据?
- 跨源连接RDS表中create_date字段类型是datetime,为什么DLI中查出来的是时间戳呢?
- SQL作业执行完成后,修改表名导致datasize不正确怎么办?
- 从DLI导入数据到OBS,数据量不一致怎么办?
-
增强型跨源连接类
- 增强型跨源连接绑定队列失败怎么办?
- DLI增强型跨源连接DWS失败怎么办?
- 创建跨源成功但测试网络连通性失败怎么办?
- 怎样配置DLI队列与数据源的网络连通?
- 为什么DLI增强型跨源连接要创建对等连接?
- DLI创建跨源连接,绑定队列一直在创建中怎么办?
- 新建跨源连接,显示已激活,但使用时提示communication link failure错误怎么办?
- 跨源访问MRS HBase,连接超时,日志未打印错误怎么办?
- DLI跨源连接报错找不到子网怎么办?
- 跨源RDS表,执行insert overwrite提示Incorrect string value错误怎么办?
- 创建RDS跨源表提示空指针错误怎么办?
- 对跨源DWS表执行insert overwrite操作,报错:org.postgresql.util.PSQLException: ERROR: tuple concurrently updated
- 通过跨源表向CloudTable Hbase表导入数据,executor报错:RegionTooBusyException
- 通过DLI跨源写DWS表,非空字段出现空值异常怎么办?
- 更新跨源目的端源表后,未同时更新对应跨源表,导致insert作业失败怎么办?
- RDS表有自增主键时怎样在DLI插入数据?
-
SQL作业类
- SQL作业开发类
-
SQL作业运维类
- 用户导表到OBS报“path obs://xxx already exists”错误
- 对两个表进行join操作时,提示:SQL_ANALYSIS_ERROR: Reference 't.id' is ambiguous, could be: t.id, t.id.;
- 执行查询语句报错:The current account does not have permission to perform this operation,the current account was restricted. Restricted for no budget.
- 执行查询语句报错:There should be at least one partition pruning predicate on partitioned table XX.YYY
- LOAD数据到OBS外表报错:IllegalArgumentException: Buffer size too small. size
- SQL作业运行报错:DLI.0002 FileNotFoundException
- 用户通过CTAS创建hive表报schema解析异常错误
- 在DataArts Studio上运行DLI SQL脚本,执行结果报org.apache.hadoop.fs.obs.OBSIOException错误
- 使用CDM迁移数据到DLI,迁移作业日志上报UQUERY_CONNECTOR_0001:Invoke DLI service api failed错误
- SQL作业访问报错:File not Found
- SQL作业访问报错:DLI.0003: AccessControlException XXX
- SQL作业访问外表报错:DLI.0001: org.apache.hadoop.security.AccessControlException: verifyBucketExists on {{桶名}}: status [403]
- 执行SQL语句报错:The current account does not have permission to perform this operation,the current account was restricted. Restricted for no budget.
-
Flink作业类
- Flink作业咨询类
-
Flink SQL作业类
- 怎样将OBS表映射为DLI的分区表?
- Flink SQL作业Kafka分区数增加或减少,怎样不停止Flink作业实现动态感知?
- 在Flink SQL作业中创建表使用EL表达式,作业运行提示DLI.0005错误怎么办?
- Flink作业输出流写入数据到OBS,通过该OBS文件路径创建的DLI表查询无数据
- Flink SQL作业运行失败,日志中有connect to DIS failed java.lang.IllegalArgumentException: Access key cannot be null错误
- Flink SQL作业消费Kafka后sink到es集群,作业执行成功,但未写入数据
- Flink Opensource SQL如何解析复杂嵌套 JSON?
- Flink Opensource SQL从RDS数据库读取的时间和RDS数据库存储的时间为什么会不一致?
- Flink Opensource SQL Elasticsearch结果表failure-handler参数填写retry_rejected导致提交失败
- Kafka Sink配置发送失败重试机制
- 如何在一个Flink作业中将数据写入到不同的Elasticsearch集群中?
- 作业语义检验时提示DIS通道不存在怎么处理?
- Flink jobmanager日志一直报Timeout expired while fetching topic metadata怎么办?
- Flink Jar作业类
- Flink作业性能调优类
-
Spark作业相类
- Spark作业开发类
-
Spark作业运维类
- 运行Spark作业报java.lang.AbstractMethodError
- Spark作业访问OBS数据时报ResponseCode: 403和ResponseStatus: Forbidden错误
- 有访问OBS对应的桶的权限,但是Spark作业访问时报错 verifyBucketExists on XXXX: status [403]
- Spark作业运行大批量数据时上报作业运行超时异常错误
- 使用Spark作业访问sftp中的文件,作业运行失败,日志显示访问目录异常
- 执行作业的用户数据库和表权限不足导致作业运行失败
- 为什么Spark3.x的作业日志中打印找不到global_temp数据库
- 在使用Spark2.3.x访问元数据时,DataSource语法创建avro类型的OBS表创建失败
- DLI资源配额类
- DLI权限管理类
- DLI API类
- 视频帮助
- 文档下载
- 通用参考
展开导读
链接复制成功!
窗口聚合
窗口表值函数(TVF)聚合
窗口聚合是通过GROUP BY子句定义的,其特征是包含窗口表值函数产生的 “window_start” 和 “window_end” 列。和普通的 GROUP BY 子句一样,窗口聚合对于每个组会计算出一行数据。和其他连续表上的聚合不同,窗口聚合不产生中间结果,只在窗口结束产生一个总的聚合结果,另外,窗口聚合会清除不需要的中间状态。
更多介绍和使用请参考开源社区文档:窗口聚合。
分组窗口的开始和结束时间戳可以通过 window_start 和 window_end 来选定。
- 窗口表值函数
Flink 支持在 TUMBLE, HOP 和 CUMULATE 上进行窗口聚合。
- 在流模式下,窗口表值函数的时间属性字段必须是事件时间或处理时间。关于窗口函数更多信息,参见 窗口表值函数(Windowing TVFs)。
- 在批模式下,窗口表值函数的时间属性字段必须是 TIMESTAMP 或 TIMESTAMP_LTZ 类型的。
-- tables must have time attribute, e.g. `bidtime` in this table Flink SQL> desc Bid; +-------------+------------------------+------+-----+--------+---------------------------------+ | name | type | null | key | extras | watermark | +-------------+------------------------+------+-----+--------+---------------------------------+ | bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND | | price | DECIMAL(10, 2) | true | | | | | item | STRING | true | | | | | supplier_id | STRING | true | | | | +-------------+------------------------+------+-----+--------+---------------------------------+ Flink SQL> SELECT * FROM Bid; +------------------+-------+------+-------------+ | bidtime | price | item | supplier_id | +------------------+-------+------+-------------+ | 2020-04-15 08:05 | 4.00 | C | supplier1 | | 2020-04-15 08:07 | 2.00 | A | supplier1 | | 2020-04-15 08:09 | 5.00 | D | supplier2 | | 2020-04-15 08:11 | 3.00 | B | supplier2 | | 2020-04-15 08:13 | 1.00 | E | supplier1 | | 2020-04-15 08:17 | 6.00 | F | supplier2 | +------------------+-------+------+-------------+ -- tumbling window aggregation Flink SQL> SELECT window_start, window_end, SUM(price) FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end; +------------------+------------------+-------+ | window_start | window_end | price | +------------------+------------------+-------+ | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | +------------------+------------------+-------+ -- hopping window aggregation Flink SQL> SELECT window_start, window_end, SUM(price) FROM TABLE( HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)) GROUP BY window_start, window_end; +------------------+------------------+-------+ | window_start | window_end | price | +------------------+------------------+-------+ | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | | 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 | | 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 | +------------------+------------------+-------+ -- cumulative window aggregation Flink SQL> SELECT window_start, window_end, SUM(price) FROM TABLE( CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)) GROUP BY window_start, window_end; +------------------+------------------+-------+ | window_start | window_end | price | +------------------+------------------+-------+ | 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 | | 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 | | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 | | 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 | | 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 | | 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 | | 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
- GROUPING SETS
窗口聚合也支持 GROUPING SETS 语法。Grouping Sets 可以通过一个标准的 GROUP BY 语句来描述更复杂的分组操作。数据按每个指定的 Grouping Sets 分别分组,并像简单的 GROUP BY 子句一样为每个组进行聚合。
GROUPING SETS 窗口聚合中 GROUP BY 子句必须包含 window_start 和 window_end 列,但 GROUPING SETS 子句中不能包含这两个字段。
Flink SQL> SELECT window_start, window_end, supplier_id, SUM(price) as price FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ()); +------------------+------------------+-------------+-------+ | window_start | window_end | supplier_id | price | +------------------+------------------+-------------+-------+ | 2020-04-15 08:00 | 2020-04-15 08:10 | (NULL) | 11.00 | | 2020-04-15 08:00 | 2020-04-15 08:10 | supplier2 | 5.00 | | 2020-04-15 08:00 | 2020-04-15 08:10 | supplier1 | 6.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | (NULL) | 10.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | supplier2 | 9.00 | | 2020-04-15 08:10 | 2020-04-15 08:20 | supplier1 | 1.00 | +------------------+------------------+-------------+-------+
GROUPING SETS 的每个子列表可以是空的,多列或表达式,它们的解释方式和直接使用 GROUP BY 子句是一样的。一个空的 Grouping Sets 表示所有行都聚合在一个分组下,即使没有数据,也会输出结果。
对于 Grouping Sets 中的空子列表,结果数据中的分组或表达式列会用NULL代替。例如,上例中的 GROUPING SETS ((supplier_id), ()) 里的 () 就是空子列表,与其对应的结果数据中的 supplier_id 列使用 NULL 填充。
- ROLLUP
ROLLUP 是一种特定通用类型 Grouping Sets 的简写。代表着指定表达式和所有前缀的列表,包括空列表。
例如:ROLLUP (one,two) 等效于 GROUPING SET((one,two),(one),()).
ROLLUP 窗口聚合中 GROUP BY 子句必须包含 window_start 和 window_end 列,但 ROLLUP 子句中不能包含这两个字段。
例如:下面这个查询和上个例子中的效果是一样的。
SELECT window_start, window_end, supplier_id, SUM(price) as price FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end, ROLLUP (supplier_id);
- CUBE
CUBE 是一种特定通用类型 Grouping Sets 的简写。代表着指定列表以及所有可能的子集和幂集。
CUBE 窗口聚合中 GROUP BY 子句必须包含 window_start 和 window_end 列,但 CUBE 子句中不能包含这两个字段。
例如:下面两个查询是等效的。
SELECT window_start, window_end, item, supplier_id, SUM(price) as price FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end, CUBE (supplier_id, item); SELECT window_start, window_end, item, supplier_id, SUM(price) as price FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end, GROUPING SETS ( (supplier_id, item), (supplier_id ), ( item), ( ) )
- 多级窗口聚合
window_start 和 window_end 列是普通的时间戳字段,并不是时间属性。因此它们不能在后续的操作中当做时间属性进行基于时间的操作。
为了传递时间属性,需要在 GROUP BY 子句中添加 window_time 列。window_time 是窗口表值函数(Windowing TVFs)产生的三列之一,它是窗口的时间属性。 window_time 添加到 GROUP BY 子句后就能被选定了。下面的查询可以把它用于后续基于时间的操作,比如:多级窗口聚合和Window TopN。
下面展示了一个多级窗口聚合:第一个窗口聚合后把时间属性传递给第二个窗口聚合。
-- tumbling 5 minutes for each supplier_id CREATE VIEW window1 AS -- Note: The window start and window end fields of inner Window TVF are optional in the select clause. However, if they appear in the clause, they need to be aliased to prevent name conflicting with the window start and window end of the outer Window TVF. SELECT window_start as window_5mintumble_start, window_end as window_5mintumble_end, window_time as rowtime, SUM(price) as partial_price FROM TABLE( TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY supplier_id, window_start, window_end, window_time; -- tumbling 10 minutes on the first window SELECT window_start, window_end, SUM(partial_price) as total_price FROM TABLE( TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTES)) GROUP BY window_start, window_end;