Updated on 2024-11-06 GMT+08:00

Go Demo

This section uses Go as an example to describe how to connect an MQTTS client to the platform and receive subscribed messages from the platform.

Prerequisites

You are familiar with basic Go syntax and know how to set up a Go development environment.

Development Environment

In this example, Go 1.18 is used.

Dependency

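In this example, paho.mqtt.golang (version 1.4.3) is used. Add the following require directive to go.mod to declare the dependency (alternatively, run go get github.com/eclipse/paho.mqtt.golang@v1.4.3, which adds it for you).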

require (
    github.com/eclipse/paho.mqtt.golang v1.4.3
)

Sample Code

package main

import (
   "crypto/tls"
   "fmt"
   mqtt "github.com/eclipse/paho.mqtt.golang"
   "os"
   "os/signal"
   "time"
)
// MessageHandler is a callback invoked with the payload of a received
// message, converted to a string.
type MessageHandler func(message string)
// MqttClient bundles the connection settings, the underlying paho client and
// the message callbacks used by this demo.
type MqttClient struct {
   // Host is the broker host name placed in the mqtts:// broker URL.
   Host            string
   // Port is the broker port placed in the mqtts:// broker URL.
   Port            int
   // ClientId is passed to the paho options as the MQTT client ID.
   ClientId        string
   // AccessKey is embedded in the MQTT username ("accessKey=...").
   AccessKey       string
   // AccessCode is sent as the MQTT password.
   AccessCode      string
   // Topic is the topic subscribed to after a successful connect.
   Topic           string
   // InstanceId is optional; when non-empty it is appended to the username
   // as "|instanceId=...".
   InstanceId      string
   // Qos is presumably the MQTT QoS level intended for the subscription;
   // verify how subscribeTopic uses it.
   Qos             int
   // Client is the paho client of the current connection. It is nil until
   // connectInternal succeeds for the first time.
   Client          mqtt.Client
   // messageHandlers are the callbacks run for every received message.
   messageHandlers []MessageHandler
}
// Connect connects to the broker, retrying until an attempt succeeds, and
// reports the outcome of connectWithRetry.
func (mqttClient *MqttClient) Connect() bool {
   connected := mqttClient.connectWithRetry()
   return connected
}
// connectWithRetry calls connectInternal until it succeeds, sleeping between
// attempts with an exponential backoff that starts at 10 ms and is capped at
// 20 s. It only returns after a successful attempt, so the result is always
// true.
func (mqttClient *MqttClient) connectWithRetry() bool {
   const (
      initialBackoff = 10 * time.Millisecond
      maxBackoff     = 20 * time.Second
   )
   backoff := initialBackoff
   for attempt := 0; ; attempt++ {
      if attempt > 0 {
         fmt.Println("connect mqttgo broker retry. times: ", attempt)
      }
      if mqttClient.connectInternal() {
         return true
      }
      time.Sleep(backoff)
      // Double the delay, but clamp it: doubling unconditionally below the
      // cap would overshoot it (10 ms * 2^11 = 20.48 s).
      backoff *= 2
      if backoff > maxBackoff {
         backoff = maxBackoff
      }
   }
}
// connectInternal performs a single connection attempt: it closes any
// existing client, builds fresh paho options (including a username carrying
// the current timestamp), connects over TLS 1.2 and, on success, stores the
// new client and subscribes to the configured topic.
//
// It returns false when the connect token reports an error and true otherwise.
func (mqttClient *MqttClient) connectInternal() bool {
   // Close the existing connection before establishing a connection.
   mqttClient.Close()

   options := mqtt.NewClientOptions()
   options.AddBroker(fmt.Sprintf("mqtts://%s:%d", mqttClient.Host, mqttClient.Port))
   options.SetClientID(mqttClient.ClientId)

   // The username embeds a millisecond timestamp taken at connect time, so
   // it is rebuilt on every attempt.
   userName := fmt.Sprintf("accessKey=%s|timestamp=%d", mqttClient.AccessKey, time.Now().UnixMilli())
   if len(mqttClient.InstanceId) != 0 {
      userName = userName + fmt.Sprintf("|instanceId=%s", mqttClient.InstanceId)
   }
   options.SetUsername(userName)
   options.SetPassword(mqttClient.AccessCode)
   options.SetConnectTimeout(10 * time.Second)
   options.SetKeepAlive(120 * time.Second)

   // Disable the SDK internal reconnection and use the custom reconnection to refresh the timestamp.
   options.SetAutoReconnect(false)
   options.SetConnectRetry(false)

   // WARNING: InsecureSkipVerify disables verification of the server
   // certificate and host name, which leaves the connection open to
   // man-in-the-middle attacks. It is kept here for demo purposes only; in
   // production, load the platform CA certificate into RootCAs and remove
   // this flag.
   tlsConfig := &tls.Config{
      InsecureSkipVerify: true,
      MaxVersion:         tls.VersionTLS12,
      MinVersion:         tls.VersionTLS12,
   }
   options.SetTLSConfig(tlsConfig)
   options.OnConnectionLost = mqttClient.createConnectionLostHandler()

   client := mqtt.NewClient(options)
   if token := client.Connect(); token.Wait() && token.Error() != nil {
      fmt.Println("device create bootstrap client failed,error = ", token.Error().Error())
      return false
   }
   mqttClient.Client = client
   fmt.Println("connect mqttgo broker success.")
   mqttClient.subscribeTopic()
   return true
}
// subscribeTopic subscribes the current client to mqttClient.Topic at the
// QoS level given by mqttClient.Qos and registers the message handler built
// by createMessageHandler. It panics if the subscription reports an error.
func (mqttClient *MqttClient) subscribeTopic() {
   // Honor the configured QoS instead of a hard-coded 0; the zero value of
   // Qos keeps the previous behavior for callers that never set it.
   qos := byte(mqttClient.Qos)
   token := mqttClient.Client.Subscribe(mqttClient.Topic, qos, mqttClient.createMessageHandler())
   if token.Wait() && token.Error() != nil {
      fmt.Printf("sub topic failed,error is %s\n", token.Error())
      panic("subscribe topic failed.")
   }
   fmt.Printf("sub topic success\n")
}
// createMessageHandler returns the paho callback for incoming messages. The
// callback logs the arrival and then, in a separate goroutine, passes the
// payload as a string to every registered MessageHandler in order.
func (mqttClient *MqttClient) createMessageHandler() func(client mqtt.Client, message mqtt.Message) {
   return func(_ mqtt.Client, msg mqtt.Message) {
      fmt.Println("receive message from server.")
      // Dispatch asynchronously so the handlers do not run on the callback's
      // own goroutine.
      go func() {
         handlers := mqttClient.messageHandlers
         for _, handle := range handlers {
            handle(string(msg.Payload()))
         }
      }()
   }
}
// createConnectionLostHandler returns the callback installed as
// OnConnectionLost. The callback logs the reason and then reconnects through
// connectWithRetry, logging again once the reconnect has succeeded.
func (mqttClient *MqttClient) createConnectionLostHandler() func(client mqtt.Client, reason error) {
   // Perform custom reconnection after disconnection.
   return func(_ mqtt.Client, cause error) {
      fmt.Printf("connection lost from server. begin to reconnect broker. reason: %s\n", cause.Error())
      if !mqttClient.connectWithRetry() {
         return
      }
      fmt.Println("reconnect mqttgo broker success.")
   }
}
// Close disconnects the current paho client, if there is one, passing 1000
// to Disconnect (per the paho documentation, the number of milliseconds to
// wait for in-flight work to finish). It is a no-op when no client has been
// stored yet.
func (mqttClient *MqttClient) Close() {
   if mqttClient.Client == nil {
      return
   }
   mqttClient.Client.Disconnect(1000)
}
// main builds an MqttClient from placeholder settings and the
// MQTT_ACCESS_KEY / MQTT_ACCESS_CODE environment variables, connects to the
// broker, prints every received message, and blocks until the process
// receives an interrupt signal, at which point the client is disconnected.
func main() {
   // For details about how to set the following parameters, see the connection configuration description.
   // MQTT access domain name
   mqttHost := "your mqtt host"
   // MQTT access port
   mqttPort := 8883
   // Access credential key value
   mqttAccessKey := os.Getenv("MQTT_ACCESS_KEY")
   // Access credential secret
   mqttAccessCode := os.Getenv("MQTT_ACCESS_CODE")
   // Name of the subscribed topic
   mqttTopic := "your mqtt topic"
   // Instance ID
   instanceId := "your instance Id"
   // MQTT client ID
   clientId := "your mqtt client id"

   mqttClient := MqttClient{
      Host:       mqttHost,
      Port:       mqttPort,
      Topic:      mqttTopic,
      ClientId:   clientId,
      AccessKey:  mqttAccessKey,
      AccessCode: mqttAccessCode,
      InstanceId: instanceId,
   }
   // Customize the handler for processing messages.
   mqttClient.messageHandlers = []MessageHandler{func(message string) {
      fmt.Println(message)
   }}
   if !mqttClient.Connect() {
      fmt.Println("init mqttgo client failed.")
      return
   }
   // Disconnect from the broker when main returns instead of dropping the
   // connection on process exit.
   defer mqttClient.Close()

   // Block until an interrupt arrives so the client keeps receiving
   // messages. A single receive is enough; wrapping it in a loop that breaks
   // immediately adds nothing.
   interrupt := make(chan os.Signal, 1)
   signal.Notify(interrupt, os.Interrupt)
   <-interrupt
}

Success Example

After the access is successful, the following information is displayed on the client.

Figure 1 Example of successful MQTT client access using Go