更新时间:2022-02-22 GMT+08:00

EdgeHub输入流

功能描述

创建边缘作业source流,从EdgeHub中获取数据。用户数据写入EdgeHub中,Flink边缘作业从中读取数据,作为流计算的数据输入。

适用于物联网IOT场景,将实时流计算能力从云端延伸到边缘,在边缘快速实现对流数据实时、快速、准确地分析处理,增加数据处理计算的速度和效率。同时将数据在边缘预处理,可以有效减少无效的数据上云,减少资源消耗,提升分析效率。边缘作业依赖于智能边缘平台(Intelligent EdgeFabric, IEF),IEF通过纳管用户的边缘节点,提供将云上应用延伸到边缘的能力,联动边缘和云端的数据,同时,在云端提供统一的设备/应用监控、日志采集等运维能力,为企业提供完整的边缘计算解决方案。IEF的更多信息,请参见《智能边缘平台用户指南》。

语法格式

1
2
3
4
5
6
7
8
9
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "edgehub", 
    topic = "", 
    encode = "", 
    json_config = "", 
    field_delimiter = ''
  )
  (TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME

关键字

表1 关键字说明

参数

是否必选

说明

type

数据源类型,“edgehub”表示数据源为智能边缘平台的edgehub。

topic

主题,需要消费数据的edgehub中的主题名称。

encode

数据编码格式,可选为“csv”“json”

  • 若编码格式为“csv”,则需配置“field_delimiter”属性。
  • 若编码格式为“json”,则需配置“json_config”属性。

field_delimiter

属性分隔符。当“encode”“csv”时,用于指定csv字段分隔符,默认为“,"。

“encode”“json”时,不需要设置属性之间的分隔符。

json_config

“encode”“json”时,可以通过该参数指定json字段和流定义字段的映射关系,格式为:

"field1=data_json.field1;field2=data_json.field2;field3=$"

其中"field3=$"表示field3的内容为整个json串。

timeindicator

在流中增加时间戳,可增加“processing time”时间戳或者“event time”时间戳。

  • 若设置“processing time”,则为“proctime.proctime”
    • 设置“proctime.proctime”时,会在原有属性字段基础上多增加一个proctime系统时间戳属性,假设原有字段为3个,设置了“proctime.proctime”后会变成4个.
    • 设置“rowtime”,属性字段不会发生变化。
  • 若设置“event time”,可选择流中的某个属性来作为时间戳,格式为“attr_name.rowtime”
  • 以上两者可以同时设置。
说明:

用来做时间戳的属性类型必须为“long”或者“timestamp”

示例

从edgehub主题abc中读取数据,数据编码格式为json。数据示例为:{"student":{"score":90,"name":"1bc2"}}。

1
2
3
4
5
6
7
8
9
CREATE SOURCE STREAM student_scores(
  name string,
  score int)
WITH (
  type = "edgehub",
  topic = "abc",
  encode = "json",
  json_config = "score = student.score; name=student.name"
);