设备接入 IoTDA设备接入 IoTDA

更新时间:2021/08/18 GMT+08:00
分享

C# SDK接入实例

本文介绍使AMQPNetLite客户端接入华为云物联网平台,接收服务端订阅消息的示例。

开发环境要求

本示例使用的开发环境为.NETFramework V3.5及以上版本。

获取SDK

1、在工程目录上点击鼠标右键打开"管理NuGet程序包"

2、在"NuGet管理器"中搜索到"AmqpNetLite"后安装所需版本

代码示例

您可以单击这里获取C# SDK接入示例,Demo中涉及的参数说明,请参考AMQP客户端接入说明

using Amqp;
using Amqp.Framing;
using Amqp.Sasl;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace AmqpDemo
{
    class Program
    {
        /// <summary>
        /// 接入域名,请参见AMQP客户端接入说明文档
        /// 请参考 https://support.huaweicloud.com/devg-iothub/iot_01_00100_2.html#section2
        /// </summary>
        static string Host = "${Host}";

        /// <summary>
        /// 端口
        /// </summary>
        static int Port = 5671;

        /// <summary>
        /// 接入凭证键值
        /// </summary>
        static string AccessKey = "${YourAccessKey}";

        /// <summary>
        /// 接入凭证密钥
        /// </summary>
        static string AccessCode = "${yourAccessCode}";

        /// <summary>
        /// 队列名
        /// </summary>
        static string QueueName = "${yourQueue}";

        static Connection connection;

        static Session session;

        static ReceiverLink receiverLink;

        static DateTime lastConnectTime = DateTime.Now;

        static void Main(string[] args)
        {
            try
            {
                var connection = CreateConnection();
                // 添加Connection Exception回调
                connection.AddClosedCallback(ConnectionClosed);

                // 创建Session。
                var session = new Session(connection);

                // 创建ReceiverLink
                var receiver = new ReceiverLink(session, "receiverName", QueueName);

                //接收消息。
                ReceiveMessage(receiver);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }

            // 防止主程序退出,这里休眠了60s,60s后程序会结束
            Thread.Sleep(60 * 1000);

            ShutDown();
        }

        /// <summary>
        /// 创建Connection
        /// </summary>
        /// <returns>Connection</returns>
        static Connection CreateConnection()
        {
            lastConnectTime = DateTime.Now;
            long timestamp = new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds();
            string userName = "accessKey=" + AccessKey + "|timestamp=" + timestamp;
            Address address = new Address(Host, Port, userName, AccessCode);
            ConnectionFactory factory = new ConnectionFactory();
            factory.SASL.Profile = SaslProfile.External;
            // 信任服务端,跳过证书校验
            factory.SSL.RemoteCertificateValidationCallback = (sender, certificate, chain, sslPolicyError) => { return true; };
            factory.AMQP.IdleTimeout = 8000;
            factory.AMQP.MaxFrameSize = 8 * 1024;
            factory.AMQP.HostName = "default";
            var connection = factory.CreateAsync(address).Result;
            return connection;
        }

        static void ReceiveMessage(ReceiverLink receiver)
        {
            receiver.Start(20, (link, message) =>
            {
                // 在线程池中处理消息,防止阻塞拉取消息的线程
                ThreadPool.QueueUserWorkItem((obj) => ProcessMessage(obj), message);
                // 回ACK
                link.Accept(message);
            });
        }

        static void ProcessMessage(Object obj)
        {
            if (obj is Message message)
            {
                string body = message.Body.ToString();
                Console.WriteLine("receive message, body=" + body);
            }

            var connection = CreateConnection();
            // 添加Connection Exception回调
            connection.AddClosedCallback(ConnectionClosed);

            // 创建Session。
            var session = new Session(connection);

            // 创建ReceiverLink
            var receiver = new ReceiverLink(session, "receiverName", QueueName);

            //接收消息。
            ReceiveMessage(receiver);
        }


        static void ConnectionClosed(IAmqpObject amqpObject, Error e)
        {
            // 断线重连,15s重连一次
            while (DateTime.Now.CompareTo(lastConnectTime.AddSeconds(15)) < 0)
            {
                Thread.Sleep(1000);
            }
            ShutDown();

            var connection = CreateConnection();
            // 添加Connection Exception回调
            connection.AddClosedCallback(ConnectionClosed);

            // 创建Session。
            var session = new Session(connection);

            // 创建ReceiverLink
            var receiver = new ReceiverLink(session, "receiverName", QueueName);

            //接收消息。
            ReceiveMessage(receiver);
        }

        static void ShutDown()
        {
            if (receiverLink != null)
            {
                try
                {
                    receiverLink.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("close receiverLink error, exception =" + e);
                }

            }
            if (session != null)
            {
                try
                {
                    session.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("close session error, exception =" + e);
                }

            }

            if (connection != null)
            {
                try
                {
                    connection.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("close connection error, exception =" + e);
                }

            }
        }
    }
}
分享:

    相关文档

    相关产品