Go Demo
This section uses Go as an example to describe how to connect an MQTTS client to the platform and receive subscribed messages from the platform.
Prerequisites
You are familiar with basic Go syntax and know how to set up a Go development environment.
Development Environment
In this example, Go 1.18 is used.
Dependency
In this example, paho.mqtt.golang (version 1.4.3) is used. Add the following require directive to go.mod, or run `go get github.com/eclipse/paho.mqtt.golang@v1.4.3` to add the dependency.
require (
github.com/eclipse/paho.mqtt.golang v1.4.3
)
Sample Code
package main
import (
"crypto/tls"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"os"
"os/signal"
"time"
)
// MessageHandler is a callback that receives the payload of one subscribed
// message, converted to a string.
type MessageHandler func(message string)
// MqttClient holds the settings used to reach the MQTTS broker, the paho
// client of the current connection, and the callbacks that receive messages.
type MqttClient struct {
	// Host and Port form the broker address ("mqtts://Host:Port").
	Host string
	Port int
	// ClientId is the MQTT client identifier sent to the broker.
	ClientId string
	// AccessKey is embedded in the MQTT user name.
	AccessKey string
	// AccessCode is sent as the MQTT password.
	AccessCode string
	// Topic is the topic subscribed to after each successful connection.
	Topic string
	// InstanceId is optional; when non-empty it is appended to the user name.
	InstanceId string
	// Qos is the intended quality-of-service level for the subscription.
	Qos int
	// Client is the paho client of the most recent successful connection;
	// it is nil until the first connection succeeds.
	Client mqtt.Client
	// messageHandlers are invoked, in order, with the payload of every
	// received message.
	messageHandlers []MessageHandler
}
// Connect opens the connection to the broker by delegating to
// connectWithRetry and reports whether the client ended up connected.
func (mqttClient *MqttClient) Connect() bool {
	connected := mqttClient.connectWithRetry()
	return connected
}
// connectWithRetry calls connectInternal until a connection is established,
// sleeping between attempts with exponential backoff. The delay starts at
// 10 ms, doubles after every failed attempt and is capped at 20 s.
//
// The function only returns once a connection attempt has succeeded, so the
// result is always true; the bool is kept for the existing callers.
func (mqttClient *MqttClient) connectWithRetry() bool {
	const (
		initialDelay = 10 * time.Millisecond
		maxDelay     = 20 * time.Second
	)

	delay := initialDelay
	for attempt := 1; !mqttClient.connectInternal(); attempt++ {
		time.Sleep(delay)

		// Double the delay, then clamp it: plain doubling from 10 ms would
		// overshoot the cap (…, 10.24 s, 20.48 s).
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}

		fmt.Println("connect mqttgo broker retry. times: ", attempt)
	}
	return true
}
// connectInternal performs a single connection attempt: it closes any
// existing client, builds fresh options (including a user name with the
// current timestamp), connects, and on success stores the new client and
// subscribes to the topic. It returns false when the connect call fails.
func (mqttClient *MqttClient) connectInternal() bool {
	// Drop the previous connection, if any, before opening a new one.
	mqttClient.Close()

	// The user name carries the access key and the current time in
	// milliseconds, plus the instance ID when one is configured.
	user := fmt.Sprintf("accessKey=%s|timestamp=%d", mqttClient.AccessKey, time.Now().UnixMilli())
	if mqttClient.InstanceId != "" {
		user += fmt.Sprintf("|instanceId=%s", mqttClient.InstanceId)
	}

	// NOTE(review): InsecureSkipVerify disables verification of the broker
	// certificate — confirm this is acceptable outside of a demo.
	tlsSettings := &tls.Config{
		InsecureSkipVerify: true,
		MinVersion:         tls.VersionTLS12,
		MaxVersion:         tls.VersionTLS12,
	}

	opts := mqtt.NewClientOptions().
		AddBroker(fmt.Sprintf("mqtts://%s:%d", mqttClient.Host, mqttClient.Port)).
		SetClientID(mqttClient.ClientId).
		SetUsername(user).
		SetPassword(mqttClient.AccessCode).
		SetConnectTimeout(10 * time.Second).
		SetKeepAlive(120 * time.Second).
		// The SDK's own reconnection is switched off; reconnection goes
		// through the custom handler so the timestamp is regenerated.
		SetAutoReconnect(false).
		SetConnectRetry(false).
		SetTLSConfig(tlsSettings).
		SetConnectionLostHandler(mqttClient.createConnectionLostHandler())

	newClient := mqtt.NewClient(opts)
	connectToken := newClient.Connect()
	connectToken.Wait()
	if err := connectToken.Error(); err != nil {
		fmt.Println("device create bootstrap client failed,error = ", err.Error())
		return false
	}

	mqttClient.Client = newClient
	fmt.Println("connect mqttgo broker success.")
	mqttClient.subscribeTopic()
	return true
}
// subscribeTopic subscribes the current client to Topic at the configured
// Qos level and registers the handler built by createMessageHandler for the
// incoming messages. It waits for the subscribe token to complete and panics
// if the subscription fails.
func (mqttClient *MqttClient) subscribeTopic() {
	// Use the configured Qos rather than a hard-coded 0; the zero value of
	// the field keeps the previous behaviour (QoS 0).
	token := mqttClient.Client.Subscribe(mqttClient.Topic, byte(mqttClient.Qos), mqttClient.createMessageHandler())
	token.Wait()
	if err := token.Error(); err != nil {
		fmt.Printf("sub topic failed,error is %s\n", err)
		panic("subscribe topic failed.")
	}
	fmt.Printf("sub topic success\n")
}
// createMessageHandler returns the callback passed to Subscribe. For every
// received message it logs a line and then, on a new goroutine, passes the
// payload as a string to each registered MessageHandler in order.
func (mqttClient *MqttClient) createMessageHandler() func(client mqtt.Client, message mqtt.Message) {
	return func(_ mqtt.Client, msg mqtt.Message) {
		fmt.Println("receive message from server.")
		// The handlers run on their own goroutine, so this callback
		// returns without waiting for them.
		go func() {
			handlers := mqttClient.messageHandlers
			for i := range handlers {
				handlers[i](string(msg.Payload()))
			}
		}()
	}
}
// createConnectionLostHandler returns the callback installed as the
// connection-lost handler. It logs the reason for the disconnection and then
// reconnects through connectWithRetry, logging again once that succeeds.
func (mqttClient *MqttClient) createConnectionLostHandler() func(client mqtt.Client, reason error) {
	return func(_ mqtt.Client, cause error) {
		fmt.Printf("connection lost from server. begin to reconnect broker. reason: %s\n", cause.Error())
		// Custom reconnection replaces the SDK's built-in one.
		if !mqttClient.connectWithRetry() {
			return
		}
		fmt.Println("reconnect mqttgo broker success.")
	}
}
// Close disconnects the stored client, passing 1000 to Disconnect. It does
// nothing when no client has been stored yet.
func (mqttClient *MqttClient) Close() {
	current := mqttClient.Client
	if current == nil {
		return
	}
	current.Disconnect(1000)
}
// main builds an MqttClient from the placeholder settings below and from the
// MQTT_ACCESS_KEY / MQTT_ACCESS_CODE environment variables, connects to the
// broker, prints every received message, and keeps running until the process
// receives an interrupt signal, at which point the client is disconnected.
func main() {
	// For details about how to set the following parameters, see the connection configuration description.
	// MQTT access domain name
	mqttHost := "your mqtt host"
	// MQTT access port
	mqttPort := 8883
	// Access credential key value
	mqttAccessKey := os.Getenv("MQTT_ACCESS_KEY")
	// Access credential secret
	mqttAccessCode := os.Getenv("MQTT_ACCESS_CODE")
	// Name of the subscribed topic
	mqttTopic := "your mqtt topic"
	// Instance ID
	instanceId := "your instance Id"
	// MQTT client ID
	clientId := "your mqtt client id"

	// Without credentials every connection attempt would fail and the retry
	// loop would never end, so stop early with a clear message.
	if mqttAccessKey == "" || mqttAccessCode == "" {
		fmt.Println("MQTT_ACCESS_KEY and MQTT_ACCESS_CODE must be set.")
		return
	}

	mqttClient := MqttClient{
		Host:       mqttHost,
		Port:       mqttPort,
		Topic:      mqttTopic,
		ClientId:   clientId,
		AccessKey:  mqttAccessKey,
		AccessCode: mqttAccessCode,
		InstanceId: instanceId,
	}
	// Customize the handler for processing messages.
	mqttClient.messageHandlers = []MessageHandler{func(message string) {
		fmt.Println(message)
	}}

	if !mqttClient.Connect() {
		fmt.Println("init mqttgo client failed.")
		return
	}
	// Disconnect from the broker when main returns.
	defer mqttClient.Close()

	// Block until an interrupt signal arrives so the client keeps receiving
	// messages in the background.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	<-interrupt
}
Success Example
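After the access is successful, the client prints `connect mqttgo broker success.` followed by `sub topic success`. Each message pushed by the platform is then printed after a `receive message from server.` line.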
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot