C# Demo使用说明
本文以C#语言为例,介绍应用通过MQTTS协议接入平台,接收服务端订阅消息的示例。
前提条件
熟悉.NETFramework开发环境配置,熟悉C#语言基本语法。
开发环境
本示例所使用的开发环境为.NETFramework 4.6.2版本,.Net SDK 6.0.421版本。请前往.NET官网下载。安装成功之后可以通过以下命令查看.Net SDK版本。
dotnet -v
添加依赖
本示例使用C#语言的Mqtt依赖为MQTTnet和MQTTnet.Extension.ManagedClient(使用版本为3.0.11),可以在NuGet管理器中搜索到"MQTTnet"后安装所需版本。
代码示例
ClientConf.cs代码如下:
using MQTTnet.Protocol;
namespace mqttcs
{
public class ClientConf
{
// mqtt订阅地址
public string ServerUri { get; set; }
// mqtt订阅端口号
public int Port { get; set; }
// mqtt接入凭据access_key
public string AccessKey { get; set; }
// mqtt接入凭据access_code
public string AccessCode { get; set; }
// mqtt 客户端ID
public string ClientId { get; set; }
// 实例Id,同一Region购买多个标准版实例时需要填写该参数
public string InstanceId { get; set; }
// mqtt订阅topic
public string Topic { get; set; }
// mqtt qos
public MqttQualityOfServiceLevel Qos { get; set; }
}
}
MqttListener代码如下:
using System;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Extensions.ManagedClient;
namespace mqttcs
{
public interface MqttListener
{
// mqtt客户端与服务端断链回调函数
void ConnectionLost(MqttClientDisconnectedEventArgs e);
// mqtt客户端与服务端建链成功回调函数
void ConnectComplete(MqttClientConnectResultCode resultCode, String reason);
// mqtt客户端消费消息回调函数
void OnMessageReceived(String message);
// mqtt客户端与服务端建链失败回调函数
void ConnectFail(ManagedProcessFailedEventArgs e);
}
}
MqttConnection.cs代码示例如下:
using System;
using System.Text;
using System.Threading;
using MQTTnet;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
namespace mqttcs
{
public class MqttConnection
{
private static IManagedMqttClient client = null;
private static ManualResetEvent mre = new ManualResetEvent(false);
private static readonly ushort DefaultKeepLive = 120;
private static int _retryTimes = 0;
private readonly int _retryTimeWait = 1000;
private readonly ClientConf _clientConf;
private MqttListener _listener;
public MqttConnection(ClientConf clientConf, MqttListener listener)
{
_clientConf = clientConf;
_listener = listener;
}
public int Connect()
{
client?.StopAsync();
// 退避重试,从1s直到20s
var duration = 1000;
var maxDuration = 20 * 1000;
var rc = InternalConnect();
while (rc != 0)
{
Thread.Sleep((int)duration);
if (duration < maxDuration)
{
duration *= 2;
}
client?.StopAsync();
_retryTimes++;
Console.WriteLine("connect mqtt broker retry. times: " + _retryTimes);
rc = InternalConnect();
}
return rc;
}
private int InternalConnect()
{
try
{
client = new MqttFactory().CreateManagedMqttClient();
client.ApplicationMessageReceivedHandler =
new MqttApplicationMessageReceivedHandlerDelegate(ApplicationMessageReceiveHandlerMethod);
client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnMqttClientConnected);
client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnMqttClientDisconnected);
client.ConnectingFailedHandler = new ConnectingFailedHandlerDelegate(OnMqttClientConnectingFailed);
IManagedMqttClientOptions options = GetOptions();
// Connects to the platform.
client.StartAsync(options);
mre.Reset();
mre.WaitOne();
if (!client.IsConnected)
{
return -1;
}
var mqttTopicFilter = new MqttTopicFilterBuilder().WithTopic(_clientConf.Topic).WithQualityOfServiceLevel(_clientConf.Qos).Build();
client.SubscribeAsync(mqttTopicFilter).Wait();
Console.WriteLine("subscribe topic success.");
return 0;
}
catch (Exception e)
{
Console.WriteLine("Connect to mqtt server failed. err: " + e);
return -1;
}
}
private void ApplicationMessageReceiveHandlerMethod(MqttApplicationMessageReceivedEventArgs e)
{
string payload = null;
if (e.ApplicationMessage.Payload != null)
{
payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
}
try
{
_listener?.OnMessageReceived(payload);
}
catch (Exception ex)
{
Console.WriteLine("Message received error, the message is " + payload);
}
}
private void OnMqttClientConnected(MqttClientConnectedEventArgs e)
{
try
{
_retryTimes = 0;
_listener?.ConnectComplete(e.AuthenticateResult.ResultCode, e.AuthenticateResult.ReasonString);
mre.Set();
}
catch (Exception exception)
{
Console.WriteLine("handle connect callback failed. e: " + exception.Message);
}
}
private void OnMqttClientDisconnected(MqttClientDisconnectedEventArgs e)
{
try
{
_listener?.ConnectionLost(e);
}
catch (Exception exception)
{
Console.WriteLine("handle disConnect callback failed. e: " + exception.Message);
}
}
private void OnMqttClientConnectingFailed(ManagedProcessFailedEventArgs e)
{
try
{
if (_listener != null)
{
_listener.ConnectFail(e);
}
Thread.Sleep(_retryTimeWait);
Connect();
}
catch (Exception exception)
{
Console.WriteLine("handle connect failed callback failed. e: " + exception.Message);
}
}
private IManagedMqttClientOptions GetOptions()
{
IManagedMqttClientOptions options = null;
long timestamp = new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds();
string userName = "accessKey=" + _clientConf.AccessKey + "|timestamp=" + timestamp + "|instanceId=" + _clientConf.InstanceId;
options = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(new MqttClientOptionsBuilder()
.WithTcpServer(_clientConf.ServerUri, _clientConf.Port)
.WithCredentials(userName, _clientConf.AccessCode)
.WithClientId(_clientConf.ClientId)
.WithKeepAlivePeriod(TimeSpan.FromSeconds(DefaultKeepLive))
.WithTls(new MqttClientOptionsBuilderTlsParameters()
{
AllowUntrustedCertificates = true,
UseTls = true,
CertificateValidationHandler = delegate { return true; },
IgnoreCertificateChainErrors = false,
IgnoreCertificateRevocationErrors = false,
SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
})
.WithProtocolVersion(MqttProtocolVersion.V500)
.Build())
.Build();
return options;
}
}
}
MqttClient.cs代码示例如下:
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Protocol;
namespace mqttcs
{
class MqttClient: MqttListener
{
private static ManualResetEvent mre = new ManualResetEvent(false);
public static async Task Main(string[] args)
{
ClientConf clientConf = new ClientConf();
clientConf.ClientId = "your mqtt clientId";
clientConf.ServerUri = "your mqtt host";
clientConf.Port = 8883;
clientConf.AccessKey = Environment.GetEnvironmentVariable("MQTT_ACCESS_KEY");
clientConf.AccessCode = Environment.GetEnvironmentVariable("MQTT_ACCESS_CODE");
clientConf.InstanceId = "your instanceId";
clientConf.Topic = "your mqtt topic";
clientConf.Qos = MqttQualityOfServiceLevel.AtMostOnce;
MqttConnection connection = new MqttConnection(clientConf, new MqttClient());
var connect = connection.Connect();
if (connect == 0)
{
Console.WriteLine("success to init mqtt connection.");
mre.WaitOne();
}
}
public void ConnectionLost(MqttClientDisconnectedEventArgs e)
{
if (e?.Exception != null)
{
Console.WriteLine("connect was lost. exception: " + e.Exception.Message);
return;
}
Console.WriteLine("connect was lost");
}
public void ConnectComplete(MqttClientConnectResultCode resultCode, String reason)
{
Console.WriteLine("connect success. resultCode: " + resultCode + " reason: " + reason);
}
public void OnMessageReceived(string message)
{
Console.WriteLine("receive msg: " + message);
}
public void ConnectFail(ManagedProcessFailedEventArgs e)
{
Console.WriteLine("connect mqtt broker failed. e: " + e.Exception.Message);
}
}
}
成功示例
接入成功后,客户端打印如下: