Edge-Hub Source Stream

Function

This API is used to create an edge job source stream that obtains data from the edge-hub. User data is written to the edge-hub, and Flink edge jobs read it from the edge-hub as the data input for stream computing.

It is applicable to IoT scenarios. It extends real-time stream computing from the cloud to the edge, analyzing and processing stream data quickly and accurately at the edge, which improves the speed and efficiency of data processing and computing. In addition, preprocessing data at the edge effectively reduces the amount of invalid data sent to the cloud, lowering resource consumption and improving analysis efficiency. By managing users' edge nodes, Intelligent EdgeFabric (IEF) extends cloud applications to edge nodes and associates edge and cloud data. IEF also provides unified on-cloud O&M capabilities, such as device/application monitoring and log collection, offering a complete edge computing solution with integrated services under edge-cloud collaboration. For more information about IEF, see the Intelligent EdgeFabric User Guide.

Syntax

CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "edgehub", 
    topic = "", 
    encode = "", 
    json_config = "", 
    field_delimiter = ''
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);

timeindicator: PROCTIME '.' PROCTIME | ID '.' ROWTIME

Keywords

Table 1 Keyword description

Parameter

Mandatory

Description

type

Yes

Indicates the data source type. edgehub indicates that the data source is the edge-hub of IEF.

topic

Yes

Topic name. Data is read from the topic with this name in the edge-hub.

encode

Yes

Data encoding format. The value can be csv or json.

  • field_delimiter must be specified if this parameter is set to csv.
  • json_config must be specified if this parameter is set to json.

field_delimiter

No

Separator between attributes. If encode is set to csv, use this parameter to specify the separator between CSV fields. The default separator is the comma (,).

If encode is set to json, you do not need to set separators between attributes.

json_config

No

If encode is set to json, this parameter specifies the mapping between JSON fields and stream definition fields. The format is as follows:

"field1=data_json.field1;field2=data_json.field2;field3=$"

In the preceding format, field3=$ indicates that the value of field3 is the entire JSON string.

timeindicator

No

Timestamp added to the source stream. The value can be processing time or event time.

  • If this parameter is set to processing time, the format is proctime.proctime. An attribute named proctime is appended to the original attributes. For example, if the stream definition has three attributes, four attributes are exported after processing time is set.
  • If this parameter is set to event time, select an attribute in the stream as the timestamp. The format is attr_name.rowtime. The attribute fields do not change.
  • Processing time and event time can be set at the same time.
NOTE:

The attribute type used as the timestamp must be long or timestamp.
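As an illustration of the csv encoding and field_delimiter parameters described above, a CSV-encoded source with a custom delimiter could be declared as follows. The stream, topic, and attribute names here are hypothetical:

```sql
CREATE SOURCE STREAM sensor_readings(
  device_id string,
  temperature double)
WITH (
  type = "edgehub",
  topic = "sensor_data",   -- hypothetical topic name
  encode = "csv",
  field_delimiter = ";"    -- fields separated by semicolons instead of the default comma
);
```

With this declaration, a record such as dev001;23.5 in the sensor_data topic would be parsed into the device_id and temperature attributes.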

Example

JSON-encoded data is read from the edge-hub topic abc. Example: {"student":{"score":90,"name":"1bc2"}}.

CREATE SOURCE STREAM student_scores(
  name string,
  score int)
WITH (
  type = "edgehub",
  topic = "abc",
  encode = "json",
  json_config = "score=student.score;name=student.name"
);
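Building on the example above, a processing-time attribute can be appended with a TIMESTAMP BY clause, following the syntax shown earlier. This is a sketch; the clause is optional:

```sql
CREATE SOURCE STREAM student_scores(
  name string,
  score int)
WITH (
  type = "edgehub",
  topic = "abc",
  encode = "json",
  json_config = "score=student.score;name=student.name"
)
TIMESTAMP BY proctime.proctime;   -- appends a processing-time attribute named proctime
```

The resulting stream exports three attributes: name, score, and the appended proctime.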