Configuring Time Models
Flink provides two time models: processing time and event time.
DLI allows you to specify the time model during creation of the source stream and temporary stream.
Configuring Processing Time
Processing time is the system time of the machine that processes the data. It is independent of any timestamp carried in the data itself.
Syntax
CREATE SOURCE STREAM stream_name(...) WITH (...)
TIMESTAMP BY proctime.proctime;
CREATE TEMP STREAM stream_name(...)
TIMESTAMP BY proctime.proctime;
Description
To use processing time, add proctime.proctime after TIMESTAMP BY. The proctime field can then be referenced directly in later statements.
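As a conceptual illustration only (this is not DLI code), the following Python sketch shows what a processing-time attribute amounts to: each record is stamped with the wall clock at the moment it is handled, and any timestamp in the payload is ignored. The function name stamp_with_proctime and the clock parameter are hypothetical names introduced for this sketch.

```python
import time
from typing import Callable, Iterable, Iterator


def stamp_with_proctime(
    records: Iterable[dict],
    clock: Callable[[], float] = time.time,
) -> Iterator[dict]:
    """Attach a 'proctime' field holding the system time at which each record is handled."""
    for record in records:
        # The payload's own timestamp fields (if any) play no role here:
        # only the clock reading at handling time is used.
        yield {**record, "proctime": clock()}
```

Passing a fake clock makes the behavior easy to check: the proctime values follow the clock readings, not the timestamps inside the records.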
Precautions
None
Example
CREATE SOURCE STREAM student_scores (
student_number STRING, /* Student ID */
student_name STRING, /* Name */
subject STRING, /* Subject */
score INT /* Score */
)
WITH (
type = "dis",
region = "",
channel = "dliinput",
partition_count = "1",
encode = "csv",
field_delimiter=","
)
TIMESTAMP BY proctime.proctime;
INSERT INTO score_greate_90
SELECT student_name, sum(score) over (order by proctime RANGE UNBOUNDED PRECEDING)
FROM student_scores;
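The INSERT statement in this example computes a running total: for each incoming row, it outputs the sum of all scores seen so far in processing-time order. A minimal Python sketch of that result is shown here. It assumes rows are already listed in arrival order with distinct timestamps, and running_score_totals is a hypothetical helper name, not part of DLI.

```python
from itertools import accumulate
from typing import List, Tuple


def running_score_totals(rows: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
    """Pair each student name with the cumulative sum of scores up to and including that row."""
    names = [name for name, _ in rows]
    totals = accumulate(score for _, score in rows)
    return list(zip(names, totals))


rows = [("Alice", 95), ("Bob", 80), ("Carol", 70)]
print(running_score_totals(rows))
# prints [('Alice', 95), ('Bob', 175), ('Carol', 245)]
```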
Configuring Event Time
Event time is the time at which an event was generated, that is, the timestamp assigned to the data when it was produced.
Syntax
CREATE SOURCE STREAM stream_name(...) WITH (...)
TIMESTAMP BY {attr_name}.rowtime
SET WATERMARK (RANGE {time_interval} | ROWS {literal}, {time_interval});
Description
To use event time, select an attribute in the stream as the timestamp and set a watermark policy.
Events may arrive out of order or late, for example because of network faults. A watermark tells the job how long to wait for such events before a window is triggered for calculation, so that out-of-order data can still be processed during stream processing.
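The following Python sketch is a simplified model of this idea, not DLI's implementation. It assumes the watermark equals the largest event time seen so far minus the maximum allowed delay, that an event is late when its timestamp is behind the watermark, and that a window may fire once the watermark reaches the window end. WatermarkTracker and window_can_fire are hypothetical names used only for illustration.

```python
class WatermarkTracker:
    """Tracks the largest event time seen and derives a watermark from it."""

    def __init__(self, max_delay_s: int) -> None:
        self.max_delay_s = max_delay_s
        self.max_event_time = None  # no event observed yet

    def current(self) -> float:
        """Return the watermark: largest event time seen minus the allowed delay."""
        if self.max_event_time is None:
            return float("-inf")
        return self.max_event_time - self.max_delay_s

    def observe(self, event_time: int) -> bool:
        """Record an event and return True if it arrived behind the watermark (late)."""
        late = event_time < self.current()
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return late


def window_can_fire(window_end: int, watermark: float) -> bool:
    """A window is ready for calculation once the watermark has reached its end."""
    return watermark >= window_end
```

With a maximum delay of 20 seconds, an event stamped 85 that arrives after an event stamped 100 is out of order but still accepted, whereas an event stamped 70 is behind the watermark of 80 and counts as late.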
The following two watermark policies are available:
- By time interval
  SET WATERMARK(range interval {time_unit}, interval {time_unit})
- By event quantity
  SET WATERMARK(rows literal, interval {time_unit})
Parameters are separated by commas (,). The first parameter specifies how often a watermark is sent (a time interval for RANGE, or a number of events for ROWS). The second parameter specifies the maximum event delay.
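To make the difference between the two policies concrete, here is a hedged Python sketch of when watermarks would be sent under each one. It is a conceptual model under stated assumptions, not DLI's implementation: the watermark value is taken to be the largest event time seen minus the maximum delay, and the RANGE interval is assumed to be measured against arrival time. The function names and the (arrival_time, event_time) tuple layout are hypothetical.

```python
from typing import Iterable, List, Tuple


def watermarks_by_rows(
    event_times: Iterable[int], rows_per_watermark: int, max_delay_s: int
) -> List[float]:
    """ROWS policy: send one watermark after every `rows_per_watermark` events."""
    emitted = []
    max_seen = float("-inf")
    for count, event_time in enumerate(event_times, start=1):
        max_seen = max(max_seen, event_time)
        if count % rows_per_watermark == 0:
            emitted.append(max_seen - max_delay_s)
    return emitted


def watermarks_by_interval(
    events: Iterable[Tuple[int, int]], interval_s: int, max_delay_s: int
) -> List[float]:
    """RANGE policy: send a watermark once `interval_s` seconds of arrival time have elapsed.

    Each element of `events` is (arrival_time_s, event_time_s).
    """
    emitted = []
    max_seen = float("-inf")
    next_emit = None
    for arrival, event_time in events:
        if next_emit is None:
            next_emit = arrival + interval_s  # the first deadline starts at the first arrival
        if arrival >= next_emit:
            # The deadline passed: send a watermark based on the events seen before this one.
            emitted.append(max_seen - max_delay_s)
            next_emit = arrival + interval_s
        max_seen = max(max_seen, event_time)
    return emitted
```

For example, with a maximum delay of 20 seconds, watermarks_by_rows([100, 105, 103, 110], 2, 20) returns [85, 90]: one watermark after the second event and one after the fourth.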
Precautions
None
Example
- Use time2 as the event time and send a watermark every 10 seconds. The maximum event latency is 20 seconds.
CREATE SOURCE STREAM student_scores (
student_number STRING, /* Student ID */
student_name STRING, /* Name */
subject STRING, /* Subject */
score INT, /* Score */
time2 TIMESTAMP
)
WITH (
type = "dis",
region = "",
channel = "dliinput",
partition_count = "1",
encode = "csv",
field_delimiter=","
)
TIMESTAMP BY time2.rowtime
SET WATERMARK (RANGE interval 10 second, interval 20 second);
INSERT INTO score_greate_90
SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING)
FROM student_scores;
- Send a watermark each time 10 data records are received. The maximum event latency is 20 seconds.
CREATE SOURCE STREAM student_scores (
student_number STRING, /* Student ID */
student_name STRING, /* Name */
subject STRING, /* Subject */
score INT, /* Score */
time2 TIMESTAMP
)
WITH (
type = "dis",
region = "",
channel = "dliinput",
partition_count = "1",
encode = "csv",
field_delimiter=","
)
TIMESTAMP BY time2.rowtime
SET WATERMARK (ROWS 10, interval 20 second);
INSERT INTO score_greate_90
SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING)
FROM student_scores;