Flink SQL语法增强
本章节适用于MRS 3.3.0及以后版本。
FlinkSQL DISTRIBUTEBY
FlinkSQL新增DISTRIBUTEBY特性,根据指定的字段进行分区,支持单字段及多字段,解决数据仅需要分区的场景。示例如下:
SELECT /*+ DISTRIBUTEBY('id') */ id, name FROM t1;
SELECT /*+ DISTRIBUTEBY('id', 'name') */ id, name FROM t1;
SELECT /*+ DISTRIBUTEBY('id1') */ id as id1, name FROM t1;
FlinkSQL窗口函数支持迟到数据
FlinkSQL新增窗口函数支持迟到数据特性,解决迟到数据需要处理的场景。目前支持TUMBLE、HOP、OVER、CUMULATE窗口函数的迟到数据,示例如下:
CREATE TABLE T1 (
`int` INT,
`double` DOUBLE,
`float` FLOAT,
`bigdec` DECIMAL(10, 2),
`string` STRING,
`name` STRING,
`rowtime` TIMESTAMP(3),
WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND
) WITH (
'connector' = 'values',
);
-- 该Sink的字段必须和窗口的输入数据保持一致,但顺序不要求一致
CREATE TABLE LD_SINK(
`float` FLOAT, `string` STRING, `name` STRING, `rowtime` TIMESTAMP(3)
) WITH (
'connector' = 'print',
);
SELECT /*+ LATE_DATA_SINK('sink.name'='LD_SINK') */
`name`,
MIN(`float`),
COUNT(DISTINCT `string`)
FROM TABLE(
TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))
GROUP BY `name`, window_start, window_end
该特性还支持窗口接收到迟到数据时输出当前窗口的开始时间和结束时间,可通过添加在Hint中'window.start.field'和'window.end.field'使用,字段类型必须是timestamp,示例如下:
CREATE TABLE LD_SINK(
`float` FLOAT, `string` STRING, `name` STRING, `rowtime` TIMESTAMP(3), `windowStart` TIMESTAMP(3), `windowEnd` TIMESTAMP(3)
) WITH (
'connector' = 'print',
);
SELECT /*+ LATE_DATA_SINK('sink.name'='LD_SINK', 'window.start.field'='windowStart', 'window.end.field'='windowEnd') */
`name`,
MIN(`float`),
COUNT(DISTINCT `string`)
FROM TABLE(
TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))
GROUP BY `name`, window_start, window_end
FlinkSQL支持设置Source的并发
本章节适用于MRS 3.3.0及以后版本。
FlinkSQL支持通过使用参数“source.parallelism”设置Source算子的并发数,解决下游算子的并发数引起的一些问题,例如下游算子发送数据倾斜、背压、作业性能慢等问题。
该特性会将Source和下游算子的Forward分区改为Rebalance分区,所以当Source算子的并发数和下游算子的并发数(parallelism数)不一致时,且作业不允许数据乱序,需要在启用该特性的同时开启DISTRIBUTEBY特性,可参考Flink SQL语法增强。
如设置Source并发数为“2”并开启DISTRIBUTEBY特性:
CREATE TABLE KafkaSource (
`user_id` VARCHAR,
`user_name` VARCHAR,
`age` INT
) WITH (
'connector' = 'kafka',
'topic' = 'test_source',
'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv',
'properties.sasl.kerberos.service.name' = 'kafka',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.系统域名',
-- 设置Source并发数
'source.parallelism' = '2'
);
CREATE TABLE KafkaSink(
`user_id` VARCHAR,
`user_name` VARCHAR,
`age` INT
) WITH (
'connector' = 'kafka',
'topic' = 'test_sink',
'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
'value.format' = 'csv',
'properties.sasl.kerberos.service.name' = 'kafka',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.系统域名'
);
-- Insert into KafkaSink select user_id, user_name, age from KafkaSource;(未开启DISTRIBUTEBY特性)
-- 开启DISTRIBUTEBY特性
Insert into KafkaSink select/*+ DISTRIBUTEBY('user_id') */ user_id, user_name, age from KafkaSource;
- Kafka Broker实例IP地址及端口号说明:
- 服务的实例IP地址可通过登录FusionInsight Manager后,单击“集群 > 服务 > Kafka > 实例”,在实例列表页面中查询。
- 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
- 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:
登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。
- 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。