Help Center>
IoT Device Access>
User Guide>
Message Communications>
Subscription and Push>
AMQP Subscription/Push>
Go SDK Access Example
Updated on 2023-01-09 GMT+08:00
Go SDK Access Example
This topic describes how to use a Go SDK to connect to the Huawei Cloud IoT platform and receive subscribed messages from the platform based on AMQP.
Development Environment
The development environment used in this example is as follows:
Go 1.16 or later.
Adding Dependencies
Add the following dependencies to go.mod:
require (
	pack.ag/amqp v0.12.5 // v0.12.5 is used in this example. Select a version as required.
)
Sample Code
// Command main is a sample AMQP consumer that connects to an AMQP endpoint
// over TLS, subscribes to a queue and prints every message it receives.
package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"time"

	"pack.ag/amqp"
)

const (
	// initialBackoff is the delay before the first reconnection attempt.
	initialBackoff = 10 * time.Millisecond
	// maxBackoff is the upper bound of the delay between reconnection attempts.
	maxBackoff = 20 * time.Second
	// closeTimeout bounds how long the final cleanup may take.
	closeTimeout = 5 * time.Second
)

// AmqpClient holds the user-supplied connection settings and the live AMQP
// handles (connection, session, receiver link) derived from them.
type AmqpClient struct {
	Title      string // label printed in the dial log line
	Host       string // AMQP host name, without scheme or port
	AccessKey  string // access key embedded in the SASL user name
	AccessCode string // access code used as the SASL password
	QueueName  string // queue to receive from; InitConnect defaults it to "DefaultQueue"

	address  string
	userName string
	password string
	client   *amqp.Client
	session  *amqp.Session
	receiver *amqp.Receiver
}

// MessageHandler processes a single received AMQP message.
type MessageHandler interface {
	Handle(message *amqp.Message)
}

// InitConnect derives the dial address and SASL credentials from the exported
// fields. It must be called before StartReceiveMessage.
func (ac *AmqpClient) InitConnect() {
	if ac.QueueName == "" {
		ac.QueueName = "DefaultQueue"
	}
	ac.address = "amqps://" + ac.Host + ":5671"
	ac.refreshCredentials()
}

// refreshCredentials rebuilds the SASL user name, which embeds the current
// time in milliseconds, and copies the access code into the password.
func (ac *AmqpClient) refreshCredentials() {
	millis := time.Now().UnixNano() / int64(time.Millisecond)
	ac.userName = fmt.Sprintf("accessKey=%s|timestamp=%d", ac.AccessKey, millis)
	ac.password = ac.AccessCode
}

// StartReceiveMessage connects, then receives messages until ctx is cancelled.
// Every received message is passed to handler in its own goroutine and then
// accepted. When receiving fails and ctx is still active, the connection is
// re-established with exponential backoff.
func (ac *AmqpClient) StartReceiveMessage(ctx context.Context, handler MessageHandler) {
	childCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Registered before the first connection attempt so that a partially
	// established connection is released as well.
	defer ac.shutdown()

	if err := ac.generateReceiverWithRetry(childCtx); err != nil {
		return
	}

	for {
		// Receive blocks until a message arrives, the context is done or the
		// link reports an error.
		message, err := ac.receiver.Receive(childCtx)
		if err == nil {
			// The message is accepted without waiting for the handler to
			// finish, so a handler failure does not cause redelivery.
			go handler.Handle(message)
			if acceptErr := message.Accept(); acceptErr != nil {
				fmt.Println("amqp accept message error: ", acceptErr)
			}
			continue
		}

		fmt.Println("amqp receive data error: ", err)

		// Stop when the caller cancelled the context.
		select {
		case <-childCtx.Done():
			return
		default:
		}

		// Otherwise rebuild the receiver, reconnecting if necessary.
		if err := ac.generateReceiverWithRetry(childCtx); err != nil {
			return
		}
	}
}

// shutdown releases the receiver, session and connection on a best-effort
// basis. It uses its own short-lived context because the receive context is
// usually already cancelled when shutdown runs.
func (ac *AmqpClient) shutdown() {
	closeCtx, cancel := context.WithTimeout(context.Background(), closeTimeout)
	defer cancel()

	// Close errors are ignored on purpose: nothing can be done about them
	// here, and they are expected when the connection is already broken.
	if ac.receiver != nil {
		_ = ac.receiver.Close(closeCtx)
	}
	if ac.session != nil {
		_ = ac.session.Close(closeCtx)
	}
	if ac.client != nil {
		_ = ac.client.Close()
	}
}

// generateReceiverWithRetry calls generateReceiver until it succeeds or ctx is
// done. The delay between attempts starts at initialBackoff and doubles after
// every failure, capped at maxBackoff. It returns nil once a receiver is
// available and amqp.ErrConnClosed when ctx is done first.
func (ac *AmqpClient) generateReceiverWithRetry(ctx context.Context) error {
	backoff := initialBackoff
	for times := 1; ; times++ {
		select {
		case <-ctx.Done():
			return amqp.ErrConnClosed
		default:
		}

		err := ac.generateReceiver()
		if err == nil {
			fmt.Println("amqp connect init success")
			return nil
		}
		XDebug("amqp ac.generateReceiver error", err)
		fmt.Println("amqp connect retry,times:", times, ",duration:", backoff)

		// Wait for the backoff period, but give up at once on cancellation.
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			return amqp.ErrConnClosed
		case <-timer.C:
		}

		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}

// generateReceiver makes ac.receiver usable. It first tries to open a new
// receiver link on the existing session; if there is no session or that
// fails, it closes the old connection and dials, opens a session and creates
// the receiver from scratch.
//
// NOTE(review): the client presumably has no direct way to query whether the
// connection or session is still alive, which is why the existing session is
// probed by opening a link on it; confirm against the library.
func (ac *AmqpClient) generateReceiver() error {
	if ac.session != nil {
		receiver, err := ac.session.NewReceiver(
			amqp.LinkSourceAddress(ac.QueueName),
			amqp.LinkCredit(20),
		)
		// Success means the existing connection and session are still usable.
		if err == nil {
			ac.receiver = receiver
			return nil
		}
	}

	// Drop the previous connection and every handle that depended on it, so
	// that a failed dial cannot leave stale handles behind.
	if ac.client != nil {
		_ = ac.client.Close() // best effort; the connection is being replaced
	}
	ac.client, ac.session, ac.receiver = nil, nil, nil

	// Rebuild the user name so that its timestamp reflects this attempt.
	// NOTE(review): presumably the server validates the timestamp, in which
	// case a value computed long ago could be rejected; confirm.
	ac.refreshCredentials()

	// The password (access code) is a secret and is deliberately not logged.
	fmt.Println("[" + ac.Title + "] Dial... addr=[" + ac.address + "], username=[" + ac.userName + "]")

	client, err := amqp.Dial(ac.address,
		amqp.ConnSASLPlain(ac.userName, ac.password),
		amqp.ConnProperty("vhost", "default"),
		amqp.ConnServerHostname("default"),
		// NOTE(review): InsecureSkipVerify disables verification of the
		// server certificate. It is kept from the original sample; verify
		// the certificate in production.
		amqp.ConnTLSConfig(&tls.Config{
			InsecureSkipVerify: true,
			MaxVersion:         tls.VersionTLS12,
		}),
		amqp.ConnConnectTimeout(8*time.Second))
	if err != nil {
		return fmt.Errorf("dial %s: %w", ac.address, err)
	}
	ac.client = client

	session, err := client.NewSession()
	if err != nil {
		return fmt.Errorf("new session: %w", err)
	}
	ac.session = session

	receiver, err := ac.session.NewReceiver(
		amqp.LinkTargetDurability(amqp.DurabilityUnsettledState),
		amqp.LinkSourceAddress(ac.QueueName),
		amqp.LinkCredit(100),
	)
	if err != nil {
		return fmt.Errorf("new receiver for queue %q: %w", ac.QueueName, err)
	}
	ac.receiver = receiver
	return nil
}

// XDebug prints s followed by err to standard output.
func XDebug(s string, err error) {
	fmt.Println(s, err)
}

// CustomerMessageHandler is a MessageHandler that prints the message value.
type CustomerMessageHandler struct{}

// Handle prints the value of the received message.
func (c *CustomerMessageHandler) Handle(message *amqp.Message) {
	fmt.Println("AMQP receives messages.", message.Value)
}

func main() {
	// AMQP access domain name
	// Reference: https://support.huaweicloud.com/intl/en-us/usermanual-iothub/iot_01_00100_2.html#section2
	amqpHost := "127.0.0.1"

	// Access key
	// Reference: https://support.huaweicloud.com/intl/en-us/usermanual-iothub/iot_01_00100_2.html#section3
	amqpAccessKey := "your accessKey"

	// Access code
	// Reference: https://support.huaweicloud.com/intl/en-us/usermanual-iothub/iot_01_00100_2.html#section3
	amqpAccessCode := "your accessCode"

	// Name of the subscription queue
	amqpQueueName := "DefaultQueue"

	amqpClient := &AmqpClient{
		Title:      "test",
		Host:       amqpHost,
		AccessKey:  amqpAccessKey,
		AccessCode: amqpAccessCode,
		QueueName:  amqpQueueName,
	}

	handle := CustomerMessageHandler{}
	amqpClient.InitConnect()

	// With a background context the client receives until the process exits.
	ctx := context.Background()
	amqpClient.StartReceiveMessage(ctx, &handle)
}
Parent topic: AMQP Subscription/Push
Feedback
Was this page helpful?
Provide feedback
Thank you very much for your feedback. We will continue working to improve the
documentation.
The system is busy. Please try again later.
For any further questions, feel free to contact us through the chatbot.
Chatbot