C# Demo使用说明
概述
本文以C#语言为例,介绍通过MQTTS/MQTT协议接入平台,基于平台接口实现“属性上报”、“订阅接收命令”等功能。
本文中使用的代码为样例代码,仅用于体验平台通信功能,如需进行商用,可以参考资源获取获取对应语言的IoT Device SDK进行集成。
前提条件
准备工作
- 访问Microsoft官网,选择合适系统的版本下载Microsoft Visual Studio。(本文以windows 64-bit系统,Microsoft Visual Studio 2017和.NET Framework 4.5.1为例)。
- 下载完成后,运行安装文件,根据界面提示安装。
导入代码样例
- 下载quickStart(C#)样例。
- 运行Microsoft Visual Studio 2017,单击“打开项目/解决方案”,选择步骤1中下载的样例。
- 完成代码导入。
代码目录简述:
- App.config:Server地址和设备信息配置文件;
- C#:项目C#代码;
EncryptUtil.cs:设备密钥加密辅助类;
FrmMqttDemo.cs:窗体界面;
Program.cs:Demo程序启动入口。
- dll:项目中使用到了第三方库
MQTTnet:v3.0.11,是一个基于 MQTT 通信的高性能 .NET 开源库,它同时支持 MQTT 服务器端和客户端,引用库文件包含MQTTnet.dll。
MQTTnet.Extensions.ManagedClient:v3.0.11,这是一个扩展库,它使用MQTTnet为托管MQTT客户机提供附加功能。
- Demo里的工程配置参数。
界面展示
新建连接
设备或网关在接入物联网平台时首先需要和平台建立连接,从而将设备或网关与平台进行关联。开发者通过传入设备信息,将设备或网关连接到物联网平台。
- FrmMqttDemo类主要提供建立MQTT/MQTTS连接等方法,MQTT默认使用1883端口,MQTTS默认使用8883端口(需要加载设备校验平台身份的证书DigiCertGlobalRootCA.crt.pem,用于设备侧接入物联网平台登录鉴权使用,可以在资源获取中下载证书文件),ManagedMqttClientOptionsBuilder中提供了设置初始化KeepAlivePeriod的属性。mqtt连接心跳时间的建议值是120秒,有使用限制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
int portIsSsl = int.Parse(ConfigurationManager.AppSettings["PortIsSsl"]); int portNotSsl = int.Parse(ConfigurationManager.AppSettings["PortNotSsl"]); if (client == null) { client = new MqttFactory().CreateManagedMqttClient(); } string timestamp = DateTime.Now.ToString("yyyyMMddHH"); string clientID = txtDeviceId.Text + "_0_0_" + timestamp; // 对密码进行HmacSHA256加密 string secret = string.Empty; if (!string.IsNullOrEmpty(txtDeviceSecret.Text)) { secret = EncryptUtil.HmacSHA256(txtDeviceSecret.Text, timestamp); } // 判断是否为安全连接 if (!cbSSLConnect.Checked) { options = new ManagedMqttClientOptionsBuilder() .WithAutoReconnectDelay(TimeSpan.FromSeconds(RECONNECT_TIME)) .WithClientOptions(new MqttClientOptionsBuilder() .WithTcpServer(txtServerUri.Text, portNotSsl) .WithCommunicationTimeout(TimeSpan.FromSeconds(DEFAULT_CONNECT_TIMEOUT)) .WithCredentials(txtDeviceId.Text, secret) .WithClientId(clientID) .WithKeepAlivePeriod(TimeSpan.FromSeconds(DEFAULT_KEEPLIVE)) .WithCleanSession(false) .WithProtocolVersion(MqttProtocolVersion.V311) .Build()) .Build(); } else { string caCertPath = Environment.CurrentDirectory + @"\certificate\rootcert.pem"; X509Certificate2 crt = new X509Certificate2(caCertPath); options = new ManagedMqttClientOptionsBuilder() .WithAutoReconnectDelay(TimeSpan.FromSeconds(RECONNECT_TIME)) .WithClientOptions(new MqttClientOptionsBuilder() .WithTcpServer(txtServerUri.Text, portIsSsl) .WithCommunicationTimeout(TimeSpan.FromSeconds(DEFAULT_CONNECT_TIMEOUT)) .WithCredentials(txtDeviceId.Text, secret) .WithClientId(clientID) .WithKeepAlivePeriod(TimeSpan.FromSeconds(DEFAULT_KEEPLIVE)) .WithCleanSession(false) .WithTls(new MqttClientOptionsBuilderTlsParameters() { AllowUntrustedCertificates = true, UseTls = true, Certificates = new List<X509Certificate> { crt }, CertificateValidationHandler = delegate { return true; }, IgnoreCertificateChainErrors = false, IgnoreCertificateRevocationErrors = false }) .WithProtocolVersion(MqttProtocolVersion.V311) .Build()) .Build(); }
- FrmMqttDemo类提供了Mqtt客户端建立连接的的方法StartAsync,连接成功后会通过回调函数OnMqttClientConnected打印连接成功日志。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
Invoke((new Action(() => { ShowLogs($"{"try to connect to server " + txtServerUri.Text}{Environment.NewLine}"); }))); if (client.IsStarted) { await client.StopAsync(); } // 注册事件 client.ApplicationMessageProcessedHandler = new ApplicationMessageProcessedHandlerDelegate(new Action<ApplicationMessageProcessedEventArgs>(ApplicationMessageProcessedHandlerMethod)); // 消息发布回调 client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceivedEventArgs>(MqttApplicationMessageReceived)); // 命令下发回调 client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Action<MqttClientConnectedEventArgs>(OnMqttClientConnected)); // 连接成功回调 client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(new Action<MqttClientDisconnectedEventArgs>(OnMqttClientDisconnected)); // 连接断开回调 // 连接平台设备 await client.StartAsync(options);
注:如果连接失败,在OnMqttClientDisconnected函数中已实现退避重连,代码样例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
private void OnMqttClientDisconnected(MqttClientDisconnectedEventArgs e) { try { Invoke((new Action(() => { ShowLogs("mqtt server is disconnected" + Environment.NewLine); txtSubTopic.Enabled = true; btnConnect.Enabled = true; btnDisconnect.Enabled = false; btnPublish.Enabled = false; btnSubscribe.Enabled = false; }))); if (cbReconnect.Checked) { Invoke((new Action(() => { ShowLogs("reconnect is starting" + Environment.NewLine); }))); //退避重连 int lowBound = (int)(defaultBackoff * 0.8); int highBound = (int)(defaultBackoff * 1.2); long randomBackOff = random.Next(highBound - lowBound); long backOffWithJitter = (int)(Math.Pow(2.0, retryTimes)) * (randomBackOff + lowBound); long waitTImeUtilNextRetry = (int)(minBackoff + backOffWithJitter) > maxBackoff ? maxBackoff : (minBackoff + backOffWithJitter); Invoke((new Action(() => { ShowLogs("next retry time: " + waitTImeUtilNextRetry + Environment.NewLine); }))); Thread.Sleep((int)waitTImeUtilNextRetry); retryTimes++; Task.Run(async () => { await ConnectMqttServerAsync(); }); } } catch (Exception ex) { Invoke((new Action(() => { ShowLogs("mqtt demo error: " + ex.Message + Environment.NewLine); }))); } }
订阅Topic
订阅某Topic的设备才能接收broker发布的关于该Topic的消息,关于平台预置Topic可参考Topic定义。
在FrmMqttDemo类中提供了订阅命令下发Topic的功能:
List<MqttTopicFilter> listTopic = new List<MqttTopicFilter>(); var topicFilterBulderPreTopic = new MqttTopicFilterBuilder().WithTopic(topic).Build(); listTopic.Add(topicFilterBulderPreTopic); // 订阅Topic client.SubscribeAsync(listTopic.ToArray()).Wait();
建链后,如果成功订阅Topic,主界面日志栏显示如下信息:
接收下发命令
在FrmMqttDemo类中提供了接收平台下发命令的功能,在MQTT建链完成并成功订阅Topic后,可以在管理控制台设备详情中命令下发或使用应用侧Demo对该设备ID进行命令下发。下发成功后,在MQTT的回调函数中接收到平台下发给设备的命令。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
private void MqttApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) { Invoke((new Action(() => { ShowLogs($"received message is {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}"); string msg = "{\"result_code\": 0,\"response_name\": \"COMMAND_RESPONSE\",\"paras\": {\"result\": \"success\"}}"; string topic = "$oc/devices/" + txtDeviceId.Text + "/sys/commands/response/request_id=" + e.ApplicationMessage.Topic.Split('=')[1]; ShowLogs($"{"response message msg = " + msg}{Environment.NewLine}"); var appMsg = new MqttApplicationMessage(); appMsg.Payload = Encoding.UTF8.GetBytes(msg); appMsg.Topic = topic; appMsg.QualityOfServiceLevel = int.Parse(cbOosSelect.SelectedValue.ToString()) == 0 ? MqttQualityOfServiceLevel.AtMostOnce : MqttQualityOfServiceLevel.AtLeastOnce; appMsg.Retain = false; // 上行响应 client.PublishAsync(appMsg).Wait(); }))); } |
例如下发参数名为SmokeDetectorControl: SILENCE,参数值为50的命令。
命令下发成功后,Demo界面显示如下:
发布Topic
发布Topic是指设备主动向平台上报自己的属性或消息,详细见设备属性上报接口文档。
在FrmMqttDemo中实现了上报Topic、属性上报功能。
1 2 3 4 5 6 7 8 |
var appMsg = new MqttApplicationMessage(); appMsg.Payload = Encoding.UTF8.GetBytes(inputString); appMsg.Topic = topic; appMsg.QualityOfServiceLevel = int.Parse(cbOosSelect.SelectedValue.ToString()) == 0 ? MqttQualityOfServiceLevel.AtMostOnce : MqttQualityOfServiceLevel.AtLeastOnce; appMsg.Retain = false; // 上行响应 client.PublishAsync(appMsg).Wait(); |
发布Topic后,Demo界面显示如下:
设备上报属性成功后可在设备详情页面查看到上报的属性:
如果在“设备详情”页面没有最新上报数据,请修改产品模型中服务和属性的内容,确保设备上报的服务/属性和产品模型中的服务/属性一致,或者进入 页面,删除所有服务。
由于是同步命令需要端侧回复响应可参考接口。