更新时间:2024-06-11 GMT+08:00

C# SDK接入示例

本文介绍使AMQPNetLite客户端接入华为云物联网平台,接收服务端订阅消息的示例。

开发环境要求

本示例使用的开发环境为.NETFramework V4.6及以上版本。

获取SDK

1、在工程目录上单击鼠标右键打开"管理NuGet程序包"

2、在"NuGet管理器"中搜索到"AmqpNetLite"后安装所需版本

代码示例

Demo中涉及的参数说明,请参考AMQP客户端接入说明
using Amqp;
using Amqp.Framing;
using Amqp.Sasl;
using System;
using System.Threading;

namespace AmqpDemo
{
    class Program
    {
        /// <summary>
        /// 接入域名,请参见AMQP客户端接入说明文档
        /// 请参考连接配置说明
        /// </summary>
        static string Host = "${Host}";

        /// <summary>
        /// 端口
        /// </summary>
        static int Port = 5671;

        /// <summary>
        /// 接入凭证键值
        /// </summary>
        static string AccessKey = "${YourAccessKey}";

        /// <summary>
        /// 接入凭证密钥
        /// </summary>
        static string AccessCode = "${yourAccessCode}";

        /// <summary>
        /// 实例Id信息,同一个Region购买多个标准版实例时需设置
        /// </summary>
        static string InstanceId = "${instanceId}";

        /// <summary>
        /// 队列名
        /// </summary>
        static string QueueName = "${yourQueue}";

        static Connection connection;

        static Session session;

        static ReceiverLink receiverLink;

        static DateTime lastConnectTime = DateTime.Now;

        static void Main(string[] args)
        {
            try
            {
                connection = CreateConnection();
                // 添加Connection Exception回调
                connection.AddClosedCallback(ConnectionClosed);

                // 创建Session。
                var session = new Session(connection);

                // 创建ReceiverLink
                receiverLink = new ReceiverLink(session, "receiverName", QueueName);

                //接收消息。
                ReceiveMessage(receiverLink);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }

            // 按下"Enter"键后程序退出
            Console.ReadLine();

            ShutDown();
        }

        /// <summary>
        /// 创建Connection
        /// </summary>
        /// <returns>Connection</returns>
        static Connection CreateConnection()
        {
            lastConnectTime = DateTime.Now;
            long timestamp = new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds();
            string userName = "accessKey=" + AccessKey + "|timestamp=" + timestamp + "|instanceId=" + InstanceId;
            Address address = new Address(Host, Port, userName, AccessCode);
            ConnectionFactory factory = new ConnectionFactory();
            factory.SASL.Profile = SaslProfile.External;
            // 信任服务端,跳过证书校验
            factory.SSL.RemoteCertificateValidationCallback = (sender, certificate, chain, sslPolicyError) => { return true; };
            factory.AMQP.IdleTimeout = 8000;
            factory.AMQP.MaxFrameSize = 8 * 1024;
            factory.AMQP.HostName = "default";
            var connection = factory.CreateAsync(address).Result;
            return connection;
        }

        static void ReceiveMessage(ReceiverLink receiver)
        {
            receiver.Start(20, (link, message) =>
            {
                // 在线程池中处理消息,防止阻塞拉取消息的线程
                ThreadPool.QueueUserWorkItem((obj) => ProcessMessage(obj), message);
                // 回ACK
                link.Accept(message);
            });
        }


        static void ProcessMessage(Object obj)
        {
            if (obj is Message message)
            {
                string body = message.Body.ToString();
                Console.WriteLine("receive message, body=" + body);
            }
        }


        static void ConnectionClosed(IAmqpObject amqpObject, Error e)
        {
            // 断线重连
            ThreadPool.QueueUserWorkItem((obj) =>
            {
                ShutDown();
                int times = 0;
                while (times++ < 5)
                {
                    try
                    {
                        Thread.Sleep(1000);
                        connection = CreateConnection();
                        // 添加Connection Exception回调
                        connection.AddClosedCallback(ConnectionClosed);

                        // 创建Session。
                        session = new Session(connection);

                        // 创建ReceiverLink
                        receiverLink = new ReceiverLink(session, "receiverName", QueueName);

                        //接收消息。
                        ReceiveMessage(receiverLink);
                        break;
                    }
                    catch (Exception exception)
                    {
                        Console.WriteLine("reconnect error, exception =" + exception);
                    }
                }
            });
        }

        static void ShutDown()
        {
            if (receiverLink != null)
            {
                try
                {
                    receiverLink.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("close receiverLink error, exception =" + e);
                }

            }
            if (session != null)
            {
                try
                {
                    session.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("close session error, exception =" + e);
                }

            }

            if (connection != null)
            {
                try
                {
                    connection.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("close connection error, exception =" + e);
                }

            }
        }
    }
}