EdgeHub输入流
功能描述
创建边缘作业source流,从EdgeHub中获取数据。用户数据写入EdgeHub中,Flink边缘作业从中读取数据,作为流计算的数据输入。
适用于物联网IOT场景,将实时流计算能力从云端延伸到边缘,在边缘快速实现对流数据实时、快速、准确地分析处理,增加数据处理计算的速度和效率。同时将数据在边缘预处理,可以有效减少无效的数据上云,减少资源消耗,提升分析效率。边缘作业依赖于智能边缘平台(Intelligent EdgeFabric, IEF),IEF通过纳管用户的边缘节点,提供将云上应用延伸到边缘的能力,联动边缘和云端的数据,同时,在云端提供统一的设备/应用监控、日志采集等运维能力,为企业提供完整的边缘计算解决方案。IEF的更多信息,请参见《智能边缘平台用户指南》。
语法格式
1 2 3 4 5 6 7 8 |
CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* )
WITH (
type = "edgehub",
topic = "",
encode = "",
json_config = "",
field_delimiter = ''
);
|
关键字
参数 |
是否必选 |
说明 |
---|---|---|
type |
是 |
数据源类型,“edgehub”表示数据源为智能边缘平台的edgehub。 |
topic |
是 |
主题,需要消费数据的edgehub中的主题名称。 |
encode |
是 |
数据编码格式,可选为“csv”和“json”。
|
field_delimiter |
否 |
属性分隔符。当“encode”为“csv”时,用于指定csv字段分隔符,默认为“,"。 当“encode”为“json”时,不需要设置属性之间的分隔符。 |
json_config |
否 |
当“encode”为“json”时,可以通过该参数指定json字段和流定义字段的映射关系,格式为: "field1=data_json.field1;field2=data_json.field2;field3=$" 其中"field3=$"表示field3的内容为整个json串。 |
注意事项
在创建Source Stream时可以指定时间模型以便在后续计算中使用,当前DLI支持Processing Time和Event Time两种时间模型,具体使用语法可以参考配置时间模型。
示例
从edgehub主题abc中读取数据,数据编码格式为json。数据示例为:{"student":{"score":90,"name":"1bc2"}}。
1 2 3 4 5 6 7 8 9 |
CREATE SOURCE STREAM student_scores(
name string,
score int)
WITH (
type = "edgehub",
topic = "abc",
encode = "json",
json_config = "score = student.score; name=student.name"
);
|