更新时间:2022-08-12 GMT+08:00

EdgeHub输入流

功能描述

创建边缘作业source流,从EdgeHub中获取数据。用户数据写入EdgeHub中,Flink边缘作业从中读取数据,作为流计算的数据输入。

适用于物联网IOT场景,将实时流计算能力从云端延伸到边缘,在边缘快速实现对流数据实时、快速、准确地分析处理,增加数据处理计算的速度和效率。同时将数据在边缘预处理,可以有效减少无效的数据上云,减少资源消耗,提升分析效率。边缘作业依赖于智能边缘平台(Intelligent EdgeFabric, IEF),IEF通过纳管用户的边缘节点,提供将云上应用延伸到边缘的能力,联动边缘和云端的数据,同时,在云端提供统一的设备/应用监控、日志采集等运维能力,为企业提供完整的边缘计算解决方案。IEF的更多信息,请参见《智能边缘平台用户指南》。

语法格式

1
2
3
4
5
6
7
8
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
  WITH (
    type = "edgehub", 
    topic = "", 
    encode = "", 
    json_config = "", 
    field_delimiter = ''
  );

关键字

表1 关键字说明

参数

是否必选

说明

type

数据源类型,“edgehub”表示数据源为智能边缘平台的edgehub。

topic

主题,需要消费数据的edgehub中的主题名称。

encode

数据编码格式,可选为“csv”“json”

  • 若编码格式为“csv”,则需配置“field_delimiter”属性。
  • 若编码格式为“json”,则需配置“json_config”属性。

field_delimiter

属性分隔符。当“encode”“csv”时,用于指定csv字段分隔符,默认为“,"。

“encode”“json”时,不需要设置属性之间的分隔符。

json_config

“encode”“json”时,可以通过该参数指定json字段和流定义字段的映射关系,格式为:

"field1=data_json.field1;field2=data_json.field2;field3=$"

其中"field3=$"表示field3的内容为整个json串。

注意事项

在创建Source Stream时可以指定时间模型以便在后续计算中使用,当前DLI支持Processing Time和Event Time两种时间模型,具体使用语法可以参考配置时间模型

示例

从edgehub主题abc中读取数据,数据编码格式为json。数据示例为:{"student":{"score":90,"name":"1bc2"}}。

1
2
3
4
5
6
7
8
9
CREATE SOURCE STREAM student_scores(
  name string,
  score int)
WITH (
  type = "edgehub",
  topic = "abc",
  encode = "json",
  json_config = "score = student.score; name=student.name"
);