Updated on 2024-11-06 GMT+08:00

C# SDK Access Example

This topic describes how to connect an AMQP.Net Lite client to the IoT platform and receive subscribed messages from the platform.

Requirements for the Development Environment

.NET Framework 4.6 or later has been installed.

Obtaining the C# SDK (AMQP.Net Lite)

1. Right-click the project directory and choose Manage NuGet Packages.

2. In the NuGet manager, search for AmqpNetLite and install the required version.

Sample Code

For details about the parameters in the demo, see AMQP Client Access.
using Amqp;
using Amqp.Framing;
using Amqp.Sasl;
using System;
using System.Threading;

namespace AmqpDemo
{
    /// <summary>
    /// Console demo that connects an AMQP.Net Lite client to the IoT platform,
    /// subscribes to a single queue, prints the body of every received message,
    /// and tries to reconnect when the connection is closed.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Access domain name. For details, see "AMQP Client Access".
        /// See Connection Configuration Parameters.
        /// </summary>
        static string Host = "${Host}";

        /// <summary>
        /// Port
        /// </summary>
        static int Port = 5671;

        /// <summary>
        /// Access key
        /// </summary>
        static string AccessKey = "${YourAccessKey}";

        /// <summary>
        /// Access code
        /// </summary>
        static string AccessCode = "${yourAccessCode}";

        /// <summary>
        /// Instance ID. This parameter is required when multiple instances of the Standard Edition are purchased in the same region.
        /// </summary>
        static string InstanceId = "${instanceId}";

        /// <summary>
        /// Queue name
        /// </summary>
        static string QueueName = "${yourQueue}";

        /// <summary>
        /// Maximum number of reconnection attempts made after the connection is closed.
        /// </summary>
        const int MaxReconnectAttempts = 5;

        /// <summary>
        /// Delay, in milliseconds, before each reconnection attempt.
        /// </summary>
        const int ReconnectDelayMilliseconds = 1000;

        /// <summary>
        /// Current connection; replaced on every successful (re)connect.
        /// </summary>
        static Connection connection;

        /// <summary>
        /// Session created on the current connection.
        /// </summary>
        static Session session;

        /// <summary>
        /// Receiver link attached to <see cref="QueueName"/> on the current session.
        /// </summary>
        static ReceiverLink receiverLink;

        /// <summary>
        /// Local time at which the most recent connection attempt was started.
        /// </summary>
        static DateTime lastConnectTime = DateTime.Now;

        /// <summary>
        /// Entry point: connects, starts receiving, waits for Enter, then closes everything.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            try
            {
                Connect();
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }

            // Press Enter to exit the program.
            Console.ReadLine();

            ShutDown();
        }

        /// <summary>
        /// Creates the connection, session and receiver link, stores them in the static
        /// fields so that <see cref="ShutDown"/> can close them, and starts receiving.
        /// Used for both the initial connect and every reconnection attempt.
        /// </summary>
        static void Connect()
        {
            connection = CreateConnection();
            // Add a connection exception callback.
            connection.AddClosedCallback(ConnectionClosed);

            // Create a session. Assign the static field (not a local) so ShutDown can close it.
            session = new Session(connection);

            // Create a receiver link.
            receiverLink = new ReceiverLink(session, "receiverName", QueueName);

            // Receive a message.
            ReceiveMessage(receiverLink);
        }

        /// <summary>
        /// Create a connection.
        /// </summary>
        /// <returns>Connection</returns>
        static Connection CreateConnection()
        {
            lastConnectTime = DateTime.Now;

            // The user name carries the access key, the current Unix time in milliseconds and the instance ID.
            long timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
            string userName = "accessKey=" + AccessKey + "|timestamp=" + timestamp + "|instanceId=" + InstanceId;
            Address address = new Address(Host, Port, userName, AccessCode);

            ConnectionFactory factory = new ConnectionFactory();
            factory.SASL.Profile = SaslProfile.External;
            // Trust the server and skip certificate verification.
            // WARNING: this accepts ANY server certificate and is suitable for demos only;
            // validate the platform certificate in production.
            factory.SSL.RemoteCertificateValidationCallback = (sender, certificate, chain, sslPolicyError) => true;
            factory.AMQP.IdleTimeout = 8000;
            factory.AMQP.MaxFrameSize = 8 * 1024;
            factory.AMQP.HostName = "default";

            // Block until the connection is established. GetAwaiter().GetResult() rethrows the
            // original exception instead of wrapping it in an AggregateException like .Result does.
            return factory.CreateAsync(address).GetAwaiter().GetResult();
        }

        /// <summary>
        /// Starts the receiver with a link credit of 20 and hands every incoming message to the thread pool.
        /// </summary>
        /// <param name="receiver">Receiver link to start.</param>
        static void ReceiveMessage(ReceiverLink receiver)
        {
            receiver.Start(20, (link, message) =>
            {
                // Process the message in the thread pool to prevent the thread that pulls the message from being blocked.
                ThreadPool.QueueUserWorkItem((obj) => ProcessMessage(obj), message);
                // Return an ACK message.
                // NOTE: the message is accepted as soon as it is queued, i.e. before ProcessMessage has run.
                link.Accept(message);
            });
        }


        /// <summary>
        /// Prints the body of a received message. Objects that are not a <c>Message</c> are ignored.
        /// </summary>
        /// <param name="obj">State object passed by the thread pool; expected to be a <c>Message</c>.</param>
        static void ProcessMessage(Object obj)
        {
            if (obj is Message message)
            {
                // Body may be null; an unhandled NullReferenceException on a
                // thread-pool thread would terminate the process.
                string body = message.Body?.ToString();
                Console.WriteLine("receive message, body=" + body);
            }
        }


        /// <summary>
        /// Closed callback registered on the connection. Releases the old resources and
        /// retries the connection on a thread-pool thread, up to <see cref="MaxReconnectAttempts"/> times.
        /// </summary>
        /// <param name="amqpObject">The object that was closed.</param>
        /// <param name="e">Error reported with the close, if any.</param>
        static void ConnectionClosed(IAmqpObject amqpObject, Error e)
        {
            // Reconnection upon disconnection
            ThreadPool.QueueUserWorkItem((obj) =>
            {
                ShutDown();
                for (int attempt = 0; attempt < MaxReconnectAttempts; attempt++)
                {
                    try
                    {
                        Thread.Sleep(ReconnectDelayMilliseconds);
                        Connect();
                        break;
                    }
                    catch (Exception exception)
                    {
                        Console.WriteLine("reconnect error, exception =" + exception);
                    }
                }
            });
        }

        /// <summary>
        /// Closes the receiver link, the session and the connection, in that order.
        /// A failure while closing one of them is logged and does not prevent the others from being closed.
        /// </summary>
        static void ShutDown()
        {
            if (receiverLink != null)
            {
                CloseQuietly(() => receiverLink.Close(), "receiverLink");
            }

            if (session != null)
            {
                CloseQuietly(() => session.Close(), "session");
            }

            if (connection != null)
            {
                CloseQuietly(() => connection.Close(), "connection");
            }
        }

        /// <summary>
        /// Runs a close action and logs, rather than propagates, any exception it throws.
        /// </summary>
        /// <param name="close">Action that closes one AMQP object.</param>
        /// <param name="name">Name of the object, used in the log message.</param>
        static void CloseQuietly(Action close, string name)
        {
            try
            {
                close();
            }
            catch (Exception e)
            {
                Console.WriteLine("close " + name + " error, exception =" + e);
            }
        }
    }
}