IoTDA结合DLI+DWS+Astro实现大数据分析
概述
在物联网解决方案中,设备产生庞大的数据,使用传统的Mysql数据库已经无法达到要求,为了降低存储成本,提高数据查询效率,华为云物联网平台可以通过规则引擎,将数据转发到华为云其他云服务,例如可以将海量数据经过数据湖探索服务(DLI)进行数据清洗,再由云数据仓库(DWS)进行存储,再由Astro大屏应用读取数据进行可视化展示。实现数据一站式存储、处理和分析。
前提条件
- 已注册华为官方账号。未注册可参考注册华为账户完成注册。
- 已完成实名制认证。未完成可在华为云上单击实名认证完成认证,否则会影响后续云服务的开通。
- 已开通设备接入服务。未开通则访问设备接入服务,单击“免费试用”或单击“价格计算器”购买并开通该服务。
- 已开通Kafka服务。未开通则访问Kafka服务,单击“立即购买”后开通该服务。
- 已开通数据湖探索服务。未开通则访问数据湖探索服务,单击“立即购买”后开通该服务。
- 已开通云数据仓库GaussDB(DWS)服务。未开通则访问云数据仓库GaussDB(DWS)服务,单击“购买”后开通该服务。
- 已开通Astro大屏服务, 未开通则访问Astro大屏应用,单击“立即使用”后开通该服务。
示例场景
在本示例中,我们实现以下场景:
新能源设备上报发电量、地区等数据到物联网平台,物联网平台将数据转发至kafka,由DLI进行大数据清洗将数据存储至DWS,再由Astro大屏应用读取数据进行可视化展示实现各个地区发电量的统计分析。
操作步骤如下:
1、在物联网平台创建产品和设备。
2、在物联网平台配置数据转发规则实现将设备上报的数据转发到kafka中。
4、配置DWS数据仓库服务。
5、配置DLI数据湖探索服务实现将kafka中的数据进行清洗并转发至DWS。
6、配置Astro大屏服务,从数据仓库服务DWS中获取数据并进行展示。
7、模拟数据上报及结果验证。
![点击放大](https://support.huaweicloud.com/bestpractice-iothub/figure/zh-cn_image_0000001693434184.png)
数据转发规则配置
- 选择左侧导航栏的“规则>数据转发”,单击“创建规则”。
- 参考下表参数说明,填写规则内容。以下参数取值仅为示例,您可参考用户指南创建自己的规则,填写完成后单击“创建规则”。
图3 新建消息上报流转规则-数据转发至Kafka
表1 参数说明 参数名
参数说明
规则名称
自定义,如iotda-kafka。
规则描述
自定义,如数据转发至Kafka服务。
数据来源
选择“设备消息”。
触发事件
自动匹配“设备消息上报”。
资源空间
和上一步创建的产品所在的资源空间保持一致。
数据过滤语句
通过编写SQL来解析和处理上报的JSON数据。
该示例使用如下数据过滤语句进行转发:
notify_data.body.content.id as id, notify_data.body.content.device_id as device_id, notify_data.body.content.report_time as report_time, notify_data.body.content.province as province, notify_data.body.content.city as city, notify_data.body.content.daily_power_generation as daily_power_generation, notify_data.body.content.total_power_generation as total_power_generation, notify_data.body.content.total_power as total_power,notify_data.body.content.running_status as running_status
- 单击“设置转发目标”页签,单击“添加”,设置转发目标。
图4 新建转发目标-转发至Kafka
参考下表参数说明,填写转发目标。填写完成后单击“确定”。
表2 参数说明 参数名
参数说明
转发目标
选择“分布式消息服务(Kafka)”
区域
选择“分布式消息服务”区域。
对接地址
选择kafka服务的对接地址。
主题
填写接收推送消息的kafka topic。
SASL认证
若开启SASL认证,请填写您在购买Kafka实例中所选安全协议、SASL认证机制以及所填的SASL用户名和密码。
Kafka安全协议
选择您在购买Kafka实例中所启用的kafka安全协议。
SASL认证机制
选择您在购买Kafka实例中所开启的SASL认证机制。
SASL用户名
填写您在购买Kafka实例中输入的SASL用户名。
密码
填写您在购买Kafka实例中输入的密码。
- 单击“启动规则”,激活配置好的数据转发规则。
图5 启动规则-转发至Kafka
数据上报及验证数据是否转发成功
- 使用MQTT模拟器连接到平台(模拟器使用请参考:使用MQTT.fx调测)。
- 使用模拟器进行消息上报(参考:设备消息上报)。上报内容如下:
{ "id": "2", "device_id" : "********", "report_time":"2023-10-19 19:39:42", "province":"guangzhou", "city": "shengzhen", "daily_power_generation": 32, "total_power_generation": 108, "total_power": 1023, "running_status": "ONLINE" }
- 前往Kafka控制台,查看是否收到转发的消息。
图6 查看Kafka消息
配置DWS数据仓库服务
- 登录华为云官方网站,访问数据仓库服务。
- 单击“进入控制台”。
- 访问数据仓库服务控制台“集群管理”页面,选择购买的集群,若无集群,则参考购买集群,购买集群后,单击登录集群。
- 登录集群后,单击新建数据库,创建数据库。
图7 创建数据库
- 单击SQL查询, 进入SQL查询页面, 执行以下SQL语句创建数据表。
CREATE TABLE IF NOT EXISTS power_test_dws ( "id" BIGINT, "report_time" TIMESTAMP, "province" varchar(255), "city" varchar(255), "device_id" varchar(255) NOT NULL, "daily_power_generation" numeric(10,2), "total_power_generation" numeric(10,2), "total_power" numeric(10,1), "running_status" varchar(10), CONSTRAINT "device_uniqe" UNIQUE ("device_id", "report_time") ) WITH (ORIENTATION = COLUMN, PERIOD='15 days', storage_policy = 'LMT:180') PARTITION BY RANGE (report_time);
配置DLI数据湖探索服务
- 登录华为云官方网站,访问数据湖探索服务。
- 单击“进入控制台”。
- 在跨源管理中创建增强型跨源,用于打通Flink队列与DMS Kafka和DWS之间的网络。弹性资源池选择购买的DLI队列,虚拟私有云选择Kafka和DWS所属的虚拟私有云以及子网。
图8 跨源管理
- 创建完跨源管理后,可在“资源管理 > 队列管理 > 更多 > 测试地址连通性”中进行Kafka和DWS的连通性测试。
图9 连通性测试图10 dws联通测试
- 选择
中创建作业。图11 创建Flink作业
- 单击“编辑”, 在作业详情中编写SQL语句,详情请参考从Kafka读取数据写入到DWS。
图12 配置Flink作业
- 配置完Flink作业后,启动作业,完成Flink配置。
模拟数据上报及结果验证
- 使用MQTT模拟器连接到平台(模拟器使用请参考:使用MQTT.fx调测)。
- 使用模拟器进行消息上报,详情请参考:设备消息上报。
上报内容如下:
{ "id": "2", "device_id" : "********", "report_time":"2023-10-24 19:39:42", "province":"guangzhou", "city": "guangdong", "daily_power_generation": 11, "total_power_generation": 22, "total_power": 324, "running_status": "OFFLINE" }
- 前往DWS控制台,查看是否收到转发的消息。
图18 查看DWS消息
- 前往Astro大屏服务查看监控视图。
图19 监控大屏数据