File System Sink Stream
Function
You can create a sink stream to export data to a file system such as HDFS or OBS. After the data is generated, a non-DLI table can be created directly on the output directory, processed through DLI SQL, and the output data can be stored in partition tables. This is applicable to scenarios such as data dumping, big data analysis, data backup, and active, deep, or cold archiving.
OBS is an object-based storage service. It provides massive, secure, highly reliable, and low-cost data storage capabilities. For more information about OBS, see the Object Storage Service Console Operation Guide.
Syntax
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  [PARTITIONED BY (attr_name (',' attr_name)*)]
  WITH (
    type = "filesystem",
    file.path = "obs://bucket/xx",
    encode = "parquet",
    ak = "",
    sk = ""
  );
Keyword
| Parameter | Mandatory | Description |
|---|---|---|
| type | Yes | Sink stream type. Set this parameter to filesystem, indicating that data is exported to the file system. |
| file.path | Yes | Output directory, in the format schema://file.path. Currently, only the obs schema is supported, indicating that data is exported to OBS. |
| encode | Yes | Encoding format of the output data. Currently, only parquet is supported. |
| ak | Yes | Access key used for OBS authentication. Global variables can be used to mask this sensitive information. For how to use global variables on the console, see the Data Lake Insight User Guide. |
| sk | Yes | Secret key used for OBS authentication. Global variables can be used to mask this sensitive information. For how to use global variables on the console, see the Data Lake Insight User Guide. |
Precautions
- To ensure job consistency, enable checkpointing if the Flink job uses a file system sink stream.
- To avoid data loss or data overwriting, enable automatic restart upon job exceptions (or restart the job manually) and select Restore Job from Checkpoint.
- Set the checkpoint interval by weighing real-time requirements against file size and recovery time, for example, 10 minutes.
- Two checkpointing modes are supported:
  - At least once: events are processed at least once.
  - Exactly once: events are processed only once.
Example
The following example dumps the car_infos data to OBS, with the day field as the partition field and parquet as the encoding format.
create sink stream car_infos (
carId string,
carOwner string,
average_speed double,
day string
) partitioned by (day)
with (
type = "filesystem",
file.path = "obs://obs-sink/car_infos",
encode = "parquet",
ak = "{{myAk}}",
sk = "{{mySk}}"
);
The data is ultimately stored in OBS, under a directory of the form obs://obs-sink/car_infos/day-xx/part-x-x.
After the data is generated, an OBS partition table can be created for subsequent batch processing using the following SQL statements:
- Create an OBS partition table.
  create table car_infos (
    carId string,
    carOwner string,
    average_speed double
  )
  partitioned by (day string)
  stored as parquet
  location 'obs://obs-sink/car_infos';
- Restore partition information from the associated OBS path.
  alter table car_infos recover partitions;
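After the partitions are recovered, the table can be queried with DLI SQL like any other partition table. As a minimal sketch (the day value below is illustrative, not from the original example), filtering on the partition column prunes the scan to a single day's directory:

  select carId, carOwner, average_speed
  from car_infos
  where day = '20230301';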