C# SDK接入示例
本文介绍使用AMQPNetLite客户端接入华为云物联网平台,接收服务端订阅消息的示例。
开发环境要求
本示例使用的开发环境为.NET Framework V4.6及以上版本。
获取SDK
1、在工程目录上单击鼠标右键打开"管理NuGet程序包"

2、在"NuGet管理器"中搜索到"AmqpNetLite"后安装所需版本

代码示例
using Amqp;
using Amqp.Framing;
using Amqp.Sasl;
using System;
using System.Threading;
namespace AmqpDemo
{
class Program
{
    /// <summary>
    /// Access domain name of the AMQP endpoint. See the AMQP client access
    /// guide and the connection configuration description.
    /// </summary>
    static string Host = "${Host}";

    /// <summary>
    /// Port of the AMQP endpoint.
    /// </summary>
    static int Port = 5671;

    /// <summary>
    /// Access credential key.
    /// </summary>
    static string AccessKey = "${YourAccessKey}";

    /// <summary>
    /// Access credential secret.
    /// </summary>
    static string AccessCode = "${yourAccessCode}";

    /// <summary>
    /// Instance id. Must be set when several standard-edition instances
    /// were purchased in the same region.
    /// </summary>
    static string InstanceId = "${instanceId}";

    /// <summary>
    /// Name of the queue to receive from.
    /// </summary>
    static string QueueName = "${yourQueue}";

    static Connection connection;
    static Session session;
    static ReceiverLink receiverLink;

    // Updated on every connection attempt; not read anywhere in this class.
    static DateTime lastConnectTime = DateTime.Now;

    // Set just before the intentional shutdown so that the closed-callback,
    // which also fires for a locally initiated Close(), does not start a reconnect.
    static volatile bool shuttingDown;

    /// <summary>
    /// Entry point: connects, starts receiving, waits for Enter, then shuts down.
    /// </summary>
    /// <param name="args">Command line arguments (unused).</param>
    static void Main(string[] args)
    {
        try
        {
            OpenReceiver();
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
        }

        // The program exits after "Enter" is pressed.
        Console.ReadLine();
        shuttingDown = true;
        ShutDown();
    }

    /// <summary>
    /// Creates the connection, session and receiver link, stores them in the
    /// static fields (so that <see cref="ShutDown"/> can close all of them)
    /// and starts receiving messages.
    /// </summary>
    static void OpenReceiver()
    {
        connection = CreateConnection();
        // Register the callback invoked when the connection is closed.
        connection.AddClosedCallback(ConnectionClosed);
        // Assign the static field, not a local: a local here would leave the
        // field null and the session would never be closed by ShutDown().
        session = new Session(connection);
        receiverLink = new ReceiverLink(session, "receiverName", QueueName);
        ReceiveMessage(receiverLink);
    }

    /// <summary>
    /// Creates a connection to <see cref="Host"/>:<see cref="Port"/>.
    /// </summary>
    /// <returns>The opened connection.</returns>
    static Connection CreateConnection()
    {
        lastConnectTime = DateTime.Now;
        long timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        // User name format: accessKey=...|timestamp=<unix ms>|instanceId=...
        string userName = "accessKey=" + AccessKey + "|timestamp=" + timestamp + "|instanceId=" + InstanceId;
        Address address = new Address(Host, Port, userName, AccessCode);
        ConnectionFactory factory = new ConnectionFactory();
        factory.SASL.Profile = SaslProfile.External;
        // Trusts the server unconditionally by skipping certificate validation.
        // NOTE(review): acceptable for a demo only; validate the certificate in production.
        factory.SSL.RemoteCertificateValidationCallback = (sender, certificate, chain, sslPolicyError) => { return true; };
        factory.AMQP.IdleTimeout = 8000;
        factory.AMQP.MaxFrameSize = 8 * 1024;
        factory.AMQP.HostName = "default";
        // Blocks until the connection attempt finishes; a failure surfaces
        // as an AggregateException to the caller.
        return factory.CreateAsync(address).Result;
    }

    /// <summary>
    /// Starts the receiver with a credit of 20 and acknowledges every message.
    /// </summary>
    /// <param name="receiver">The receiver link to start.</param>
    static void ReceiveMessage(ReceiverLink receiver)
    {
        receiver.Start(20, (link, message) =>
        {
            // Process on the thread pool so the thread delivering messages is not blocked.
            ThreadPool.QueueUserWorkItem((obj) => ProcessMessage(obj), message);
            // Send the ACK. This happens before processing has completed.
            link.Accept(message);
        });
    }

    /// <summary>
    /// Prints the body of a received message.
    /// </summary>
    /// <param name="obj">The work item state; ignored unless it is a <c>Message</c>.</param>
    static void ProcessMessage(Object obj)
    {
        if (obj is Message message)
        {
            // Body may be null; an unhandled NullReferenceException on a
            // thread-pool thread would terminate the process.
            string body = message.Body?.ToString();
            Console.WriteLine("receive message, body=" + body);
        }
    }

    /// <summary>
    /// Closed-callback of the connection: releases the old objects and tries
    /// to reconnect up to five times, one second apart.
    /// </summary>
    /// <param name="amqpObject">The object that was closed.</param>
    /// <param name="e">The error reported with the close, if any.</param>
    static void ConnectionClosed(IAmqpObject amqpObject, Error e)
    {
        if (shuttingDown)
        {
            // Closed on purpose by Main; do not reconnect.
            return;
        }

        Console.WriteLine("connection closed, error =" + e);

        // Reconnect on the thread pool instead of inside the callback itself.
        ThreadPool.QueueUserWorkItem((obj) =>
        {
            ShutDown();
            int times = 0;
            while (times++ < 5 && !shuttingDown)
            {
                try
                {
                    Thread.Sleep(1000);
                    OpenReceiver();
                    break;
                }
                catch (Exception exception)
                {
                    Console.WriteLine("reconnect error, exception =" + exception);
                }
            }
        });
    }

    /// <summary>
    /// Closes the receiver link, the session and the connection, in that order.
    /// A failure to close one of them is logged and does not prevent closing the rest.
    /// </summary>
    static void ShutDown()
    {
        if (receiverLink != null)
        {
            try
            {
                receiverLink.Close();
            }
            catch (Exception e)
            {
                Console.WriteLine("close receiverLink error, exception =" + e);
            }
        }

        if (session != null)
        {
            try
            {
                session.Close();
            }
            catch (Exception e)
            {
                Console.WriteLine("close session error, exception =" + e);
            }
        }

        if (connection != null)
        {
            try
            {
                connection.Close();
            }
            catch (Exception e)
            {
                Console.WriteLine("close connection error, exception =" + e);
            }
        }
    }
}
}