Help Center/
IoT Device Access/
User Guide/
Rules/
Data Forwarding to Third-Party Applications/
AMQP Data Forwarding/
Go SDK Access Example
Updated on 2025-04-23 GMT+08:00
Go SDK Access Example
This topic describes how to use a Go SDK to connect to the Huawei Cloud IoT platform and receive subscribed messages from the platform based on AMQP.
Development Environment
Go 1.16 or later has been installed.
Dependency
Add the following dependencies to go.mod:
require (
pack.ag/amqp v0.12.5 // v0.12.5 is used in this example. Select a version as required.
)
Sample Code
package main
import (
"context"
"crypto/tls"
"fmt"
"pack.ag/amqp"
"time"
)
// AmqpClient bundles the settings needed to reach the platform's AMQP endpoint
// together with the live connection objects created from them.
type AmqpClient struct {
	// Title is a label printed in the dial log line; Host is the server host
	// used to build the amqps:// address on port 5671.
	Title, Host string
	// AccessKey and InstanceId are embedded in the SASL user name; AccessCode
	// is used as the SASL password.
	AccessKey, AccessCode, InstanceId string
	// QueueName is the source address of the receiver link. InitConnect
	// replaces an empty value with "DefaultQueue".
	QueueName string

	// Derived connection settings, filled in by InitConnect.
	address, userName, password string

	// Live AMQP objects, created by generateReceiver.
	client   *amqp.Client
	session  *amqp.Session
	receiver *amqp.Receiver
}
// MessageHandler is implemented by anything that wants to consume the
// messages received by AmqpClient.StartReceiveMessage.
type MessageHandler interface {
	// Handle is called once per received message. StartReceiveMessage invokes
	// it in a new goroutine for each message.
	Handle(msg *amqp.Message)
}
// InitConnect derives the connection settings from the exported fields: it
// defaults the queue name, builds the amqps address on port 5671, and composes
// the SASL user name (with a millisecond timestamp) and password.
func (ac *AmqpClient) InitConnect() {
	if len(ac.QueueName) == 0 {
		ac.QueueName = "DefaultQueue"
	}

	// Current Unix time in milliseconds.
	nowMillis := time.Now().UnixNano() / int64(time.Millisecond)

	ac.password = ac.AccessCode
	ac.userName = fmt.Sprintf("accessKey=%s|timestamp=%d|instanceId=%s", ac.AccessKey, nowMillis, ac.InstanceId)
	ac.address = "amqps://" + ac.Host + ":5671"
}
// StartReceiveMessage connects to the platform and then receives messages in a
// loop, handing each one to handler in its own goroutine. It returns when ctx
// is cancelled or when (re)connecting fails; before returning it closes the
// receiver, session, and connection.
func (ac *AmqpClient) StartReceiveMessage(ctx context.Context, handler MessageHandler) {
	// Keep the cancel function so the derived context is always released.
	childCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// The nil check guards against a connect helper that reports success
	// without having created a receiver.
	if err := ac.generateReceiverWithRetry(childCtx); err != nil || ac.receiver == nil {
		return
	}
	defer func() {
		// childCtx is normally cancelled by the time we get here, so give the
		// close handshake its own short-lived context.
		closeCtx, closeCancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer closeCancel()
		if ac.receiver != nil {
			_ = ac.receiver.Close(closeCtx) // best-effort cleanup
		}
		if ac.session != nil {
			_ = ac.session.Close(closeCtx) // best-effort cleanup
		}
		if ac.client != nil {
			_ = ac.client.Close() // best-effort cleanup
		}
	}()

	for {
		// Receive blocks until a message arrives, the link fails, or the
		// context is cancelled.
		message, err := ac.receiver.Receive(childCtx)
		if err == nil {
			// The message is accepted right after the handler is started, not
			// after it finishes.
			go handler.Handle(message)
			if acceptErr := message.Accept(); acceptErr != nil {
				fmt.Println("amqp accept message error: ", acceptErr)
			}
			continue
		}

		fmt.Println("amqp receive data error: ", err)
		// If receiving was stopped on purpose (context cancelled), exit.
		select {
		case <-childCtx.Done():
			return
		default:
		}
		// Otherwise the link is presumed broken: reconnect and keep receiving.
		if err := ac.generateReceiverWithRetry(childCtx); err != nil || ac.receiver == nil {
			return
		}
	}
}
// generateReceiverWithRetry calls generateReceiver until it succeeds or ctx is
// cancelled, waiting between attempts with an exponential backoff that starts
// at 10 ms and is capped at 20 s.
//
// It returns nil once a receiver has been created, and amqp.ErrConnClosed if
// ctx is cancelled first.
func (ac *AmqpClient) generateReceiverWithRetry(ctx context.Context) error {
	const (
		initialDelay = 10 * time.Millisecond
		maxDelay     = 20 * time.Second
	)

	delay := initialDelay
	for times := 1; ; times++ {
		// Stop before dialing if the caller has already given up.
		select {
		case <-ctx.Done():
			return amqp.ErrConnClosed
		default:
		}

		err := ac.generateReceiver()
		if err == nil {
			fmt.Println("amqp connect init success")
			return nil
		}
		fmt.Println("amqp ac.generateReceiver error ", err)
		fmt.Println("amqp connect retry,times:", times, ",duration:", delay)

		// Wait for the backoff delay, but wake up early on cancellation.
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return amqp.ErrConnClosed
		case <-timer.C:
		}

		// Double the delay for the next attempt without exceeding the cap.
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}
// generateReceiver makes ac.receiver usable, reusing the existing session when
// possible and otherwise building a new connection, session, and receiver.
//
// The code has no direct way to ask whether the current connection or session
// is still alive, so it probes by trying to open a receiver on the existing
// session; if that fails, everything is rebuilt from scratch.
//
// It returns the first error from dialing, creating the session, or creating
// the receiver.
func (ac *AmqpClient) generateReceiver() error {
	if ac.session != nil {
		// NOTE(review): this path requests a link credit of 20 and sets no
		// target durability, unlike the fresh-connection path below (credit
		// 100, DurabilityUnsettledState) — confirm whether that is intended.
		receiver, err := ac.session.NewReceiver(
			amqp.LinkSourceAddress(ac.QueueName),
			amqp.LinkCredit(20),
		)
		// Success means the existing session still works; any error is
		// treated as a dead connection and falls through to a full reconnect.
		if err == nil {
			ac.receiver = receiver
			return nil
		}
	}

	// Drop the previous connection before dialing a new one.
	if ac.client != nil {
		_ = ac.client.Close() // best-effort: the connection is being replaced anyway
	}

	// Refresh the user name so it carries the current millisecond timestamp.
	ac.userName = fmt.Sprintf("accessKey=%s|timestamp=%d|instanceId=%s", ac.AccessKey, time.Now().UnixNano()/1000000, ac.InstanceId)
	// The password (access code) is a secret and is deliberately not logged.
	fmt.Println("[" + ac.Title + "] Dial... addr=[" + ac.address + "], username=[" + ac.userName + "]")

	client, err := amqp.Dial(ac.address,
		amqp.ConnSASLPlain(ac.userName, ac.password),
		amqp.ConnProperty("vhost", "default"),
		amqp.ConnServerHostname("default"),
		// NOTE(security): InsecureSkipVerify disables verification of the
		// server certificate, and MaxVersion caps the connection at TLS 1.2.
		// Review both settings before using this code in production.
		amqp.ConnTLSConfig(&tls.Config{InsecureSkipVerify: true,
			MaxVersion: tls.VersionTLS12,
		}),
		amqp.ConnConnectTimeout(8*time.Second))
	if err != nil {
		fmt.Println("Dial", err)
		return err
	}
	ac.client = client

	session, err := client.NewSession()
	if err != nil {
		XDebug("Error: NewSession", err)
		return err
	}
	ac.session = session

	receiver, err := ac.session.NewReceiver(
		amqp.LinkTargetDurability(amqp.DurabilityUnsettledState),
		amqp.LinkSourceAddress(ac.QueueName),
		amqp.LinkCredit(100),
	)
	if err != nil {
		XDebug("Error: NewReceiver", err)
		return err
	}
	ac.receiver = receiver
	return nil
}
// XDebug writes the label s and the error err to standard output on one line,
// separated by a single space. A nil err is printed as "<nil>".
func XDebug(s string, err error) {
	fmt.Printf("%s %v\n", s, err)
}
// CustomerMessageHandler is the sample message handler used by main. It
// carries no state.
type CustomerMessageHandler struct{}
// Handle prints the Value field of the received message to standard output.
func (c *CustomerMessageHandler) Handle(message *amqp.Message) {
	payload := message.Value
	fmt.Println("AMQP receives messages.", payload)
}
// main builds an AmqpClient from the placeholder settings below, initializes
// it, and receives messages, printing each one via CustomerMessageHandler.
// Replace every placeholder value with your own settings before running.
func main() {
	// For details about how to set the following parameters, see Connection Configuration.
	// AMQP access domain name
	amqpHost := "127.0.0.1"
	// Access key
	amqpAccessKey := "your accessKey"
	// Access code
	amqpAccessCode := "your accessCode"
	// Instance ID
	instanceId := "your instanceId"
	// Name of the subscription queue
	amqpQueueName := "DefaultQueue"

	amqpClient := &AmqpClient{
		Title:      "test",
		Host:       amqpHost,
		AccessKey:  amqpAccessKey,
		AccessCode: amqpAccessCode,
		InstanceId: instanceId,
		QueueName:  amqpQueueName,
	}
	amqpClient.InitConnect()

	// context.Background() is never cancelled, so with this context
	// StartReceiveMessage can only return if connecting fails.
	ctx := context.Background()
	handler := &CustomerMessageHandler{}
	amqpClient.StartReceiveMessage(ctx, handler)
}
Parent topic: AMQP Data Forwarding
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
The system is busy. Please try again later.
For any further questions, feel free to contact us through the chatbot.
Chatbot