Updated on 2023-11-14 GMT+08:00

Go SDK Access Example

This topic describes how to use a Go SDK to connect to the Huawei Cloud IoT platform and receive subscribed messages from the platform based on AMQP.

Development Environment Requirements

Go 1.16 or later has been installed.

Adding Dependencies

Add the following dependencies to go.mod:

require (
    pack.ag/amqp v0.12.5 // v0.12.5 is used in this example. Select a version as required.
)

Sample Code

package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"pack.ag/amqp"
	"time"
)

type AmqpClient struct {
	Title      string
	Host       string
	AccessKey  string
	AccessCode string
        InstanceId string
	QueueName  string

	address  string
	userName string
	password string

	client   *amqp.Client
	session  *amqp.Session
	receiver *amqp.Receiver
}

type MessageHandler interface {
	Handle(message *amqp.Message)
}

func (ac *AmqpClient) InitConnect() {
	if ac.QueueName == "" {
		ac.QueueName = "DefaultQueue"
	}
	ac.address = "amqps://" + ac.Host + ":5671"
	ac.userName = fmt.Sprintf("accessKey=%s|timestamp=%d|instanceId=%s", ac.AccessKey, time.Now().UnixNano()/1000000, ac.InstanceId)
	ac.password = ac.AccessCode
}

func (ac *AmqpClient) StartReceiveMessage(ctx context.Context, handler MessageHandler) {
	childCtx, _ := context.WithCancel(ctx)
	err := ac.generateReceiverWithRetry(childCtx)
	if nil != err {
		return
	}
	defer func() {
		_ = ac.receiver.Close(childCtx)
		_ = ac.session.Close(childCtx)
		_ = ac.client.Close()
	}()

	for {
		// Block message receiving. If ctx is a context created based on the background function, message receiving will not be blocked.
		message, err := ac.receiver.Receive(ctx)
		if nil == err {
			go handler.Handle(message)
			_ = message.Accept()
		} else {
			fmt.Println("amqp receive data error: ", err)

			// If message receiving is manually disabled, exit the program.
			select {
			case <-childCtx.Done():
				return
			default:
			}

			// If message receiving is not manually disabled, retry the connection.
			err := ac.generateReceiverWithRetry(childCtx)
			if nil != err {
				return
			}
		}
	}
}

func (ac *AmqpClient) generateReceiverWithRetry(ctx context.Context) error {
	// Retries with exponential backoff, from 10 ms to 20s.
	duration := 10 * time.Millisecond
	maxDuration := 20000 * time.Millisecond
	times := 1

	// Retries with exponential backoff
	for {
		select {
		case <-ctx.Done():
			return amqp.ErrConnClosed
		default:
		}

		err := ac.generateReceiver()
		if nil != err {
			fmt.Println("amqp ac.generateReceiver error ", err)
			time.Sleep(duration)
			if duration < maxDuration {
				duration *= 2
			}
			fmt.Println("amqp connect retry,times:", times, ",duration:", duration)
			times++
			return nil
		} else {
			fmt.Println("amqp connect init success")
			return nil
		}
	}
}

// The statuses of the connection and session cannot be determined because the packets are unavailable. Retry the connection to obtain the information.
func (ac *AmqpClient) generateReceiver() error {

	if ac.session != nil {
		receiver, err := ac.session.NewReceiver(
			amqp.LinkSourceAddress(ac.QueueName),
			amqp.LinkCredit(20),
		)
		// If a network disconnection error occurs, the connection is ended and the session fails to be established. Otherwise, the connection is established.
		if err == nil {
			ac.receiver = receiver
			return nil
		}
	}

	// Delete the previous connection.
	if ac.client != nil {
		_ = ac.client.Close()
	}
        ac.userName = fmt.Sprintf("accessKey=%s|timestamp=%d|instanceId=%s", ac.AccessKey, time.Now().UnixNano()/1000000, ac.InstanceId)
	fmt.Println("[" + ac.Title + "] Dial... addr=[" + ac.address + "], username=[" + ac.userName + "], password=[" + ac.password + "]")
	client, err := amqp.Dial(ac.address,
		amqp.ConnSASLPlain(ac.userName, ac.password),
		amqp.ConnProperty("vhost", "default"),
		amqp.ConnServerHostname("default"),
		amqp.ConnTLSConfig(&tls.Config{InsecureSkipVerify: true,
			MaxVersion: tls.VersionTLS12,
		}),
		amqp.ConnConnectTimeout(8*time.Second))
	if err != nil {
		fmt.Println("Dial", err)
		return err
	}
	ac.client = client
	session, err := client.NewSession()
	if err != nil {
		XDebug("Error: NewSession", err)
		return err
	}
	ac.session = session

	receiver, err := ac.session.NewReceiver(
		amqp.LinkTargetDurability(amqp.DurabilityUnsettledState),
		amqp.LinkSourceAddress(ac.QueueName),
		amqp.LinkCredit(100),
	)
	if err != nil {
		XDebug("Error: NewReceiver", err)
		return err
	}
	ac.receiver = receiver

	return nil
}

func XDebug(s string, err error) {
	fmt.Println(s, err)
}

type CustomerMessageHandler struct {
}

func (c *CustomerMessageHandler) Handle(message *amqp.Message) {
	fmt.Println("AMQP receives messages.", message.Value)
}

func main() {
	// For details about how to set the following parameters, see Connection Configuration Parameters.
	// AMQP access domain name
	amqpHost := "127.0.0.1"

	// Access key
	amqpAccessKey := "your accessKey"

	// Access code
	amqpAccessCode := "your accessCode"

        // Instance ID
	instanceId:= "your intanceId"

	// Name of the subscription queue
	amqpQueueName := "DefaultQueue"

	amqpClient := &AmqpClient{
		Title:      "test",
		Host:       amqpHost,
		AccessKey:  amqpAccessKey,
		AccessCode: amqpAccessCode,
                InstanceId: instanceId,
		QueueName:  amqpQueueName,
	}

	handle := CustomerMessageHandler{}
	amqpClient.InitConnect()
	ctx := context.Background()
	amqpClient.StartReceiveMessage(ctx, &handle)
}