OBS Source Stream

Overview

Create a source stream to obtain data from OBS. CS reads data stored by users in OBS as input data for jobs. OBS applies to various scenarios, such as big data analysis, cloud-native application program data, static website hosting, backup/active archive, and deep/cold archive.

OBS is an object-based storage service. It provides massive, secure, highly reliable, and low-cost data storage capabilities. For more information, see the Object Storage Service Console Operation Guide.

Syntax

Syntax

CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )WITH (type = "obs",region = "",bucket = "",object_name = "",row_delimiter = "\n",field_delimiter = '',version_id = "")(TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

Description

Table 1 Syntax description

Parameter

Mandatory

Description

type

Yes

Data source type. Value obs indicates that the data source is OBS.

region

Yes

Region to which OBS belongs.

bucket

Yes

Name of the OBS bucket where data is located.

object_name

Yes

Name of the object stored in the OBS bucket where data is located.

row_delimiter

Yes

Separator used to separate every two rows.

field_delimiter

Yes

Separator used to separate every two attributes.

  • You can set this parameter only when encode is set to csv. The default value is a comma (,).
  • This parameter is not required if the JSON encoding format is adopted.

quote

No

Quoted symbol in a data format. The attribute delimiters between two quoted symbols are treated as common characters.

  • If double quotation marks are used as the quoted symbol, set this parameter to "\u005c\u0022" for character conversion.
  • If a single quotation mark is used as the quoted symbol, set this parameter to a comma (,).
NOTE:

After this parameter is specified, ensure that each field does not contain quoted symbols or contains an even number of quoted symbols. Otherwise, parsing will fail.

version_id

No

Version number. This parameter is optional and required only when the OBS bucket or object has version settings.

timeindicator

No

Timestamp added in the source stream. The value can be processing time or event time.

NOTE:
  • If this parameter is set to processing time, the format is proctime.proctime.

    In this case, an attribute proctime will be added to the original attribute field. If there are three attributes in the original attribute field, four attributes will be exported after this parameter is set to processing time. However, the attribute length remains unchanged if the rowtime attribute is specified.

  • If this parameter is set to event time, you can select an attribute in the stream as the timestamp. The format is attr_name.rowtime.
  • This parameter can be simultaneously set to processing time and event time.

Precautions

The attribute type used as the timestamp must be long or timestamp.

Example

The input.csv file is read from the OBS bucket. Rows are separated by '\n' and columns are separated by ','.

CREATE SOURCE STREAM car_infos (
  car_id STRING,
  car_owner STRING,
  car_brand STRING,
  car_price INT,
  car_timestamp LONG
)
WITH (
  type = "obs",
  bucket = "obssource",
  region = "cn-north-1" ,
  object_name = "input.csv",
  row_delimiter = "\n",
  field_delimiter = ",",

)TIMESTAMP BY car_timestamp.rowtime;