OBS Sink Stream
Function
Create a sink stream to export DLI data to OBS. DLI can export job analysis results to OBS. OBS is suitable for various scenarios, such as big data analytics, cloud-native application data, static website hosting, backup/active archiving, and deep/cold archiving.
OBS is an object-based storage service. It provides massive, secure, highly reliable, and low-cost data storage capabilities. For more information about OBS, see the Object Storage Service Console Operation Guide.
You are advised to use the File System Sink Stream (Recommended).
Prerequisites
Before exporting data, check the version of the destination OBS bucket. The OBS sink stream supports exporting data only to buckets running OBS 3.0 or later.
Syntax
CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "obs",
    region = "",
    encode = "",
    field_delimiter = "",
    row_delimiter = "",
    obs_dir = "",
    file_prefix = "",
    rolling_size = "",
    rolling_interval = "",
    quote = "",
    array_bracket = "",
    append = "",
    max_record_num_per_file = "",
    dump_interval = "",
    dis_notice_channel = ""
  )
Keyword
| Parameter | Mandatory | Description |
|---|---|---|
| type | Yes | Output channel type. obs indicates that data is exported to OBS. |
| region | Yes | Region to which OBS belongs. |
| ak | No | Access Key ID (AK). For details about how to obtain the access key, see My Credentials. |
| sk | No | Secret Access Key (SK) used together with the AK. For details about how to obtain the access key, see My Credentials. |
| encode | Yes | Encoding format. Currently, CSV, JSON, ORC, Avro, Avro-Merge, and Parquet are supported. |
| field_delimiter | No | Separator between attributes. This parameter applies only when the CSV encoding format is adopted. If it is not specified, the default separator, the comma (,), is used. |
| row_delimiter | No | Row delimiter. This parameter does not need to be configured if the CSV or JSON encoding format is adopted. |
| json_config | No | If encode is set to json, this parameter specifies the mapping between JSON fields and stream definition fields, for example, field1=data_json.field1;field2=data_json.field2. |
| obs_dir | Yes | Directory for storing files, in the format {Bucket name}/{Directory name}, for example, obs-a1/dir1/subdir. Parameterization is supported when encode is csv (append is false), json (append is false), avro_merge, or parquet. |
| file_prefix | No | Prefix of the exported file name. Generated files are named in the format file_prefix.x, for example, file_prefix.1 and file_prefix.2. If this parameter is not specified, the default prefix temp is used. |
| rolling_size | No | Maximum size of a file. |
| rolling_interval | No | Time mode, in which data is saved to the corresponding directory. |
| quote | No | Modifier added before and after each attribute. This parameter can be configured only when the CSV encoding format is adopted. You are advised to use invisible characters, such as u0007, as the value. |
| array_bracket | No | Array brackets, which can be configured only when the CSV encoding format is adopted. The available options are (), {}, and []. For example, if you set this parameter to {}, the array output format is {a1, a2}. |
| append | No | The value can be true or false. The default value is true. If OBS does not support the append mode and the encoding format is CSV or JSON, set this parameter to false. If append is set to false, max_record_num_per_file and dump_interval must also be set. |
| max_record_num_per_file | No | Maximum number of records in a file. This parameter needs to be set if encode is csv (append is false), json (append is false), orc, avro, avro_merge, or parquet. When the maximum number of records is reached, a new file is generated. |
| dump_interval | No | Triggering period. This parameter needs to be configured when the ORC encoding format is adopted or notification to DIS is enabled. |
| dis_notice_channel | No | DIS channel to which DLI periodically sends a record containing the OBS directory, indicating that no more new files will be generated in that directory. |
| encoded_data | No | Data to be encoded. This parameter is set if encode is json (append is false), avro_merge, or parquet. The format is ${field_name}, indicating that the stream field content is encoded as a complete record. |
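To show how several of these parameters interact, the following is a minimal sketch of an Avro-Merge sink that combines encoded_data, max_record_num_per_file, dump_interval, and a DIS notification channel. The bucket and directory (obs-sink/avro_out), the channel name (obs_notice), and the field name (content) are illustrative assumptions only, not values prescribed by this guide.

CREATE SINK STREAM avro_merge_sink (
  content STRING
)
  WITH (
    type = "obs",
    region = "xxx",
    encode = "avro_merge",
    obs_dir = "obs-sink/avro_out",          -- assumed bucket/directory
    encoded_data = "${content}",            -- the content field is encoded as the complete record
    max_record_num_per_file = "100000",     -- a new file is generated after this many records
    dump_interval = "60",                   -- triggering period, needed when DIS notification is enabled
    dis_notice_channel = "obs_notice"       -- assumed DIS channel notified when a directory is complete
  );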
Precautions
If a configuration item can be specified through parameter configurations, one or more columns in a record can be used as part of its value. For example, if the configuration item is set to car_${car_brand} and the value of car_brand in a record is BMW, the value of the configuration item for that record is car_BMW.
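As a hedged sketch of this precaution, the following CSV sink parameterizes obs_dir with a record column. Per the table above, a parameterized obs_dir with CSV requires append to be false, which in turn requires max_record_num_per_file and dump_interval. The bucket name, directory, and field names are assumptions for illustration.

CREATE SINK STREAM car_infos (
  car_id STRING,
  car_brand STRING
)
  WITH (
    type = "obs",
    region = "xxx",
    encode = "csv",
    append = "false",                             -- required for a parameterized obs_dir with CSV
    max_record_num_per_file = "100000",           -- required when append is false
    dump_interval = "60",                         -- required when append is false
    obs_dir = "obs-sink/car_infos/${car_brand}"   -- records with car_brand = BMW land in .../car_infos/BMW
  );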
Example
- Export the car_infos data to the obs-sink bucket in OBS. The output directory is car_infos, and the output file uses greater_30 as the file name prefix. The maximum size of a single file is 100 MB; when the data size exceeds 100 MB, a new file is generated. The data is encoded in CSV format, the comma (,) is used as the attribute delimiter, and the newline character is used as the row delimiter.
CREATE SINK STREAM car_infos (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_price INT,
  car_timestamp LONG
)
  WITH (
    type = "obs",
    encode = "csv",
    region = "xxx",
    field_delimiter = ",",
    row_delimiter = "\n",
    obs_dir = "obs-sink/car_infos",
    file_prefix = "greater_30",
    rolling_size = "100m"
  );
- Example of the ORC encoding format
CREATE SINK STREAM car_infos (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_price INT,
  car_timestamp LONG
)
  WITH (
    type = "obs",
    region = "xxx",
    encode = "orc",
    obs_dir = "dli-append-2/obsorc",
    file_prefix = "es_info",
    max_record_num_per_file = "100000",
    dump_interval = "60"
  );
- For details about the parquet encoding example, see the example in File System Sink Stream (Recommended).