Help Center/
IoT Device Access/
User Guide/
Rules/
Data Forwarding to Third-Party Applications/
AMQP Data Forwarding/
Go SDK Access Example
Updated on 2024-11-06 GMT+08:00
Go SDK Access Example
This topic describes how to use a Go SDK to connect to the Huawei Cloud IoT platform and receive subscribed messages from the platform based on AMQP.
Development Environment Requirements
Go 1.16 or later has been installed.
Adding Dependencies
Add the following dependencies to go.mod:
require (
	pack.ag/amqp v0.12.5 // v0.12.5 is used in this example. Select a version as required.
)
Sample Code
package main import ( "context" "crypto/tls" "fmt" "pack.ag/amqp" "time" ) type AmqpClient struct { Title string Host string AccessKey string AccessCode string InstanceId string QueueName string address string userName string password string client *amqp.Client session *amqp.Session receiver *amqp.Receiver } type MessageHandler interface { Handle(message *amqp.Message) } func (ac *AmqpClient) InitConnect() { if ac.QueueName == "" { ac.QueueName = "DefaultQueue" } ac.address = "amqps://" + ac.Host + ":5671" ac.userName = fmt.Sprintf("accessKey=%s|timestamp=%d|instanceId=%s", ac.AccessKey, time.Now().UnixNano()/1000000, ac.InstanceId) ac.password = ac.AccessCode } func (ac *AmqpClient) StartReceiveMessage(ctx context.Context, handler MessageHandler) { childCtx, _ := context.WithCancel(ctx) err := ac.generateReceiverWithRetry(childCtx) if nil != err { return } defer func() { _ = ac.receiver.Close(childCtx) _ = ac.session.Close(childCtx) _ = ac.client.Close() }() for { // Block message receiving. If ctx is a context created based on the background function, message receiving will not be blocked. message, err := ac.receiver.Receive(ctx) if nil == err { go handler.Handle(message) _ = message.Accept() } else { fmt.Println("amqp receive data error: ", err) // If message receiving is manually disabled, exit the program. select { case <-childCtx.Done(): return default: } // If message receiving is not manually disabled, retry the connection. err := ac.generateReceiverWithRetry(childCtx) if nil != err { return } } } } func (ac *AmqpClient) generateReceiverWithRetry(ctx context.Context) error { // Retries with exponential backoff, from 10 ms to 20s. 
duration := 10 * time.Millisecond maxDuration := 20000 * time.Millisecond times := 1 // Retries with exponential backoff for { select { case <-ctx.Done(): return amqp.ErrConnClosed default: } err := ac.generateReceiver() if nil != err { fmt.Println("amqp ac.generateReceiver error ", err) time.Sleep(duration) if duration < maxDuration { duration *= 2 } fmt.Println("amqp connect retry,times:", times, ",duration:", duration) times++ return nil } else { fmt.Println("amqp connect init success") return nil } } } // The statuses of the connection and session cannot be determined because the packets are unavailable. Retry the connection to obtain the information. func (ac *AmqpClient) generateReceiver() error { if ac.session != nil { receiver, err := ac.session.NewReceiver( amqp.LinkSourceAddress(ac.QueueName), amqp.LinkCredit(20), ) // If a network disconnection error occurs, the connection is ended and the session fails to be established. Otherwise, the connection is established. if err == nil { ac.receiver = receiver return nil } } // Delete the previous connection. if ac.client != nil { _ = ac.client.Close() } ac.userName = fmt.Sprintf("accessKey=%s|timestamp=%d|instanceId=%s", ac.AccessKey, time.Now().UnixNano()/1000000, ac.InstanceId) fmt.Println("[" + ac.Title + "] Dial... 
addr=[" + ac.address + "], username=[" + ac.userName + "], password=[" + ac.password + "]") client, err := amqp.Dial(ac.address, amqp.ConnSASLPlain(ac.userName, ac.password), amqp.ConnProperty("vhost", "default"), amqp.ConnServerHostname("default"), amqp.ConnTLSConfig(&tls.Config{InsecureSkipVerify: true, MaxVersion: tls.VersionTLS12, }), amqp.ConnConnectTimeout(8*time.Second)) if err != nil { fmt.Println("Dial", err) return err } ac.client = client session, err := client.NewSession() if err != nil { XDebug("Error: NewSession", err) return err } ac.session = session receiver, err := ac.session.NewReceiver( amqp.LinkTargetDurability(amqp.DurabilityUnsettledState), amqp.LinkSourceAddress(ac.QueueName), amqp.LinkCredit(100), ) if err != nil { XDebug("Error: NewReceiver", err) return err } ac.receiver = receiver return nil } func XDebug(s string, err error) { fmt.Println(s, err) } type CustomerMessageHandler struct { } func (c *CustomerMessageHandler) Handle(message *amqp.Message) { fmt.Println("AMQP receives messages.", message.Value) } func main() { // For details about how to set the following parameters, see Connection Configuration Parameters. // AMQP access domain name amqpHost := "127.0.0.1" // Access key amqpAccessKey := "your accessKey" // Access code amqpAccessCode := "your accessCode" // Instance ID instanceId:= "your intanceId" // Name of the subscription queue amqpQueueName := "DefaultQueue" amqpClient := &AmqpClient{ Title: "test", Host: amqpHost, AccessKey: amqpAccessKey, AccessCode: amqpAccessCode, InstanceId: instanceId, QueueName: amqpQueueName, } handle := CustomerMessageHandler{} amqpClient.InitConnect() ctx := context.Background() amqpClient.StartReceiveMessage(ctx, &handle) }
Parent topic: AMQP Data Forwarding
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
The system is busy. Please try again later.
For any further questions, feel free to contact us through the chatbot.
Chatbot