C# Demo使用说明
本文以C#语言为例,介绍应用通过MQTTS协议接入平台,接收服务端订阅消息的示例。
前提条件
熟悉.NET Framework开发环境配置,熟悉C#语言基本语法。
开发环境
本示例所使用的开发环境为.NET Framework 4.6.2版本,.NET SDK 6.0.421版本。请前往.NET官网下载。安装成功之后可以通过以下命令查看.NET SDK版本。
dotnet --version
添加依赖
本示例使用C#语言的MQTT依赖为MQTTnet和MQTTnet.Extensions.ManagedClient(使用版本为3.0.11),可以在NuGet管理器中搜索到"MQTTnet"后安装所需版本。
代码示例
ClientConf.cs代码如下:
using MQTTnet.Protocol;

namespace mqttcs
{
    /// <summary>
    /// Settings the demo needs to connect to the MQTT broker and subscribe to a topic.
    /// </summary>
    public class ClientConf
    {
        /// <summary>MQTT subscription address.</summary>
        public string ServerUri { get; set; }

        /// <summary>MQTT subscription port.</summary>
        public int Port { get; set; }

        /// <summary>MQTT access credential: access_key.</summary>
        public string AccessKey { get; set; }

        /// <summary>MQTT access credential: access_code.</summary>
        public string AccessCode { get; set; }

        /// <summary>MQTT client ID.</summary>
        public string ClientId { get; set; }

        /// <summary>
        /// Instance ID. Required when several standard-edition instances
        /// have been purchased in the same region.
        /// </summary>
        public string InstanceId { get; set; }

        /// <summary>MQTT topic to subscribe to.</summary>
        public string Topic { get; set; }

        /// <summary>MQTT quality-of-service level used for the subscription.</summary>
        public MqttQualityOfServiceLevel Qos { get; set; }
    }
}
MqttListener.cs代码如下:
using System;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Extensions.ManagedClient;

namespace mqttcs
{
    /// <summary>
    /// Callbacks raised by the MQTT connection wrapper for connection
    /// life-cycle events and incoming messages.
    /// </summary>
    public interface MqttListener
    {
        /// <summary>Called when the link between the MQTT client and the server is lost.</summary>
        /// <param name="e">Disconnect event details.</param>
        void ConnectionLost(MqttClientDisconnectedEventArgs e);

        /// <summary>Called when the MQTT client has successfully connected to the server.</summary>
        /// <param name="resultCode">Result code of the connect attempt.</param>
        /// <param name="reason">Reason string accompanying the result.</param>
        void ConnectComplete(MqttClientConnectResultCode resultCode, string reason);

        /// <summary>Called when the MQTT client consumes a message.</summary>
        /// <param name="message">The received message text.</param>
        void OnMessageReceived(string message);

        /// <summary>Called when the MQTT client fails to connect to the server.</summary>
        /// <param name="e">Failure event details.</param>
        void ConnectFail(ManagedProcessFailedEventArgs e);
    }
}
MqttConnection.cs代码示例如下:
using System;
using System.Text;
using System.Threading;
using MQTTnet;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;

namespace mqttcs
{
    /// <summary>
    /// Wraps a managed MQTT client: connects over TLS with the configured
    /// credentials, subscribes to the configured topic and forwards
    /// connection events and messages to an <see cref="MqttListener"/>.
    /// </summary>
    public class MqttConnection
    {
        // NOTE: the client, the wait handle and the retry counter are static,
        // so they are shared by every MqttConnection instance in the process.
        private static IManagedMqttClient client = null;

        // Signalled by the connected callback; InternalConnect blocks on it.
        private static ManualResetEvent mre = new ManualResetEvent(false);

        // Keep-alive period, in seconds.
        private static readonly ushort DefaultKeepLive = 120;

        private static int _retryTimes = 0;

        // Delay, in milliseconds, before reconnecting from the connect-failed callback.
        private readonly int _retryTimeWait = 1000;

        private readonly ClientConf _clientConf;

        private readonly MqttListener _listener;

        /// <summary>Creates a connection wrapper.</summary>
        /// <param name="clientConf">Broker address, credentials and topic settings.</param>
        /// <param name="listener">Receiver of connection and message callbacks; may be null.</param>
        public MqttConnection(ClientConf clientConf, MqttListener listener)
        {
            _clientConf = clientConf;
            _listener = listener;
        }

        /// <summary>
        /// Connects to the broker and subscribes to the topic, retrying with
        /// exponential backoff until an attempt succeeds.
        /// </summary>
        /// <returns>0 once connected and subscribed.</returns>
        public int Connect()
        {
            // Stop any previous client; the returned task is intentionally not awaited.
            client?.StopAsync();

            // Exponential backoff between attempts: 1s, 2s, 4s, ... capped at 20s.
            const int maxDuration = 20 * 1000;
            var duration = 1000;

            var rc = InternalConnect();
            while (rc != 0)
            {
                Thread.Sleep(duration);
                // Clamp after doubling so the delay never exceeds the 20s cap.
                duration = Math.Min(duration * 2, maxDuration);

                client?.StopAsync();
                _retryTimes++;
                Console.WriteLine("connect mqtt broker retry. times: " + _retryTimes);
                rc = InternalConnect();
            }

            return rc;
        }

        /// <summary>
        /// Performs one connect attempt: creates a new managed client, wires
        /// the handlers, starts it, waits for the connected signal and subscribes.
        /// </summary>
        /// <returns>0 on success, -1 on failure.</returns>
        private int InternalConnect()
        {
            try
            {
                client = new MqttFactory().CreateManagedMqttClient();
                client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(ApplicationMessageReceiveHandlerMethod);
                client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnMqttClientConnected);
                client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnMqttClientDisconnected);
                client.ConnectingFailedHandler = new ConnectingFailedHandlerDelegate(OnMqttClientConnectingFailed);

                IManagedMqttClientOptions options = GetOptions();

                // Clear the signal BEFORE starting the client. Resetting after
                // StartAsync could wipe a Set() raised by a fast connect and
                // leave WaitOne() blocked forever.
                mre.Reset();

                // Connects to the platform.
                client.StartAsync(options);
                mre.WaitOne();

                if (!client.IsConnected)
                {
                    return -1;
                }

                var mqttTopicFilter = new MqttTopicFilterBuilder()
                    .WithTopic(_clientConf.Topic)
                    .WithQualityOfServiceLevel(_clientConf.Qos)
                    .Build();
                client.SubscribeAsync(mqttTopicFilter).Wait();
                Console.WriteLine("subscribe topic success.");
                return 0;
            }
            catch (Exception e)
            {
                Console.WriteLine("Connect to mqtt server failed. err: " + e);
                return -1;
            }
        }

        /// <summary>Decodes the payload as UTF-8 and passes it to the listener.</summary>
        private void ApplicationMessageReceiveHandlerMethod(MqttApplicationMessageReceivedEventArgs e)
        {
            string payload = null;
            if (e.ApplicationMessage.Payload != null)
            {
                payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
            }

            try
            {
                _listener?.OnMessageReceived(payload);
            }
            catch (Exception ex)
            {
                // Include the exception detail so listener failures can be diagnosed.
                Console.WriteLine("Message received error, the message is " + payload + ", err: " + ex.Message);
            }
        }

        /// <summary>Resets the retry counter, notifies the listener and releases the waiting connect call.</summary>
        private void OnMqttClientConnected(MqttClientConnectedEventArgs e)
        {
            try
            {
                _retryTimes = 0;
                _listener?.ConnectComplete(e.AuthenticateResult.ResultCode, e.AuthenticateResult.ReasonString);
                mre.Set();
            }
            catch (Exception exception)
            {
                Console.WriteLine("handle connect callback failed. e: " + exception.Message);
            }
        }

        /// <summary>Notifies the listener that the connection was lost.</summary>
        private void OnMqttClientDisconnected(MqttClientDisconnectedEventArgs e)
        {
            try
            {
                _listener?.ConnectionLost(e);
            }
            catch (Exception exception)
            {
                Console.WriteLine("handle disConnect callback failed. e: " + exception.Message);
            }
        }

        /// <summary>
        /// Notifies the listener of the failed attempt, waits briefly and then
        /// reconnects through <see cref="Connect"/>.
        /// </summary>
        private void OnMqttClientConnectingFailed(ManagedProcessFailedEventArgs e)
        {
            try
            {
                _listener?.ConnectFail(e);

                Thread.Sleep(_retryTimeWait);
                // Connect() builds new options, so the user name carries a fresh timestamp.
                Connect();
            }
            catch (Exception exception)
            {
                Console.WriteLine("handle connect failed callback failed. e: " + exception.Message);
            }
        }

        /// <summary>
        /// Builds the managed-client options: TLS 1.2 connection, MQTT 5.0,
        /// and a user name composed of the access key, the current UTC
        /// timestamp in milliseconds and the instance ID.
        /// </summary>
        private IManagedMqttClientOptions GetOptions()
        {
            long timestamp = new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds();
            string userName = "accessKey=" + _clientConf.AccessKey + "|timestamp=" + timestamp + "|instanceId=" + _clientConf.InstanceId;

            // SECURITY NOTE(review): the validation handler below accepts every
            // server certificate and untrusted certificates are allowed, so the
            // broker's identity is not verified. Acceptable for a demo only;
            // validate the certificate chain before using this in production.
            var tlsParameters = new MqttClientOptionsBuilderTlsParameters()
            {
                AllowUntrustedCertificates = true,
                UseTls = true,
                CertificateValidationHandler = delegate { return true; },
                IgnoreCertificateChainErrors = false,
                IgnoreCertificateRevocationErrors = false,
                SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
            };

            return new ManagedMqttClientOptionsBuilder()
                .WithClientOptions(new MqttClientOptionsBuilder()
                    .WithTcpServer(_clientConf.ServerUri, _clientConf.Port)
                    .WithCredentials(userName, _clientConf.AccessCode)
                    .WithClientId(_clientConf.ClientId)
                    .WithKeepAlivePeriod(TimeSpan.FromSeconds(DefaultKeepLive))
                    .WithTls(tlsParameters)
                    .WithProtocolVersion(MqttProtocolVersion.V500)
                    .Build())
                .Build();
        }
    }
}
MqttClient.cs代码示例如下:
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Protocol;

namespace mqttcs
{
    /// <summary>
    /// Demo entry point. Builds the client configuration, opens the MQTT
    /// connection and prints connection events and received messages.
    /// </summary>
    class MqttClient : MqttListener
    {
        // Never signalled in this class; waiting on it keeps the process
        // alive so that callbacks can keep arriving.
        private static ManualResetEvent mre = new ManualResetEvent(false);

        /// <summary>
        /// Reads the access credentials from the MQTT_ACCESS_KEY and
        /// MQTT_ACCESS_CODE environment variables, connects, and then blocks.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        public static async Task Main(string[] args)
        {
            ClientConf clientConf = new ClientConf();
            clientConf.ClientId = "your mqtt clientId";
            clientConf.ServerUri = "your mqtt host";
            clientConf.Port = 8883;
            clientConf.AccessKey = Environment.GetEnvironmentVariable("MQTT_ACCESS_KEY");
            clientConf.AccessCode = Environment.GetEnvironmentVariable("MQTT_ACCESS_CODE");
            clientConf.InstanceId = "your instanceId";
            clientConf.Topic = "your mqtt topic";
            clientConf.Qos = MqttQualityOfServiceLevel.AtMostOnce;

            // Fail fast: without credentials every connect attempt would be
            // made with an empty access key or access code.
            if (string.IsNullOrEmpty(clientConf.AccessKey) || string.IsNullOrEmpty(clientConf.AccessCode))
            {
                Console.WriteLine("mqtt access key or access code is empty. set the MQTT_ACCESS_KEY and MQTT_ACCESS_CODE environment variables.");
                return;
            }

            MqttConnection connection = new MqttConnection(clientConf, new MqttClient());
            var connect = connection.Connect();
            if (connect == 0)
            {
                Console.WriteLine("success to init mqtt connection.");
                mre.WaitOne();
            }
        }

        /// <summary>Prints that the connection was lost, with the exception message when one is available.</summary>
        public void ConnectionLost(MqttClientDisconnectedEventArgs e)
        {
            if (e?.Exception != null)
            {
                Console.WriteLine("connect was lost. exception: " + e.Exception.Message);
                return;
            }

            Console.WriteLine("connect was lost");
        }

        /// <summary>Prints the result code and reason of a successful connect.</summary>
        public void ConnectComplete(MqttClientConnectResultCode resultCode, String reason)
        {
            Console.WriteLine("connect success. resultCode: " + resultCode + " reason: " + reason);
        }

        /// <summary>Prints a received message.</summary>
        public void OnMessageReceived(string message)
        {
            Console.WriteLine("receive msg: " + message);
        }

        /// <summary>Prints the reason of a failed connect attempt.</summary>
        public void ConnectFail(ManagedProcessFailedEventArgs e)
        {
            // Null-safe, matching ConnectionLost: a missing exception must not
            // throw inside the callback.
            Console.WriteLine("connect mqtt broker failed. e: " + e?.Exception?.Message);
        }
    }
}
成功示例
接入成功后,客户端打印如下: