Go Demo
This section uses Go as an example to describe how to connect an MQTTS client to the platform and receive subscribed messages from the platform.
Prerequisites
You have basic knowledge of Go syntax and know how to configure a Go development environment.
Development Environment
In this example, Go 1.18 is used.
Dependency
In this example, paho.mqtt.golang (version 1.4.3) is used. Add the following dependency declaration to go.mod.
require ( github.com/eclipse/paho.mqtt.golang v1.4.3 )
Sample Code
package main import ( "crypto/tls" "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" "os" "os/signal" "time" ) type MessageHandler func(message string) type MqttClient struct { Host string Port int ClientId string AccessKey string AccessCode string Topic string InstanceId string Qos int Client mqtt.Client messageHandlers []MessageHandler } func (mqttClient *MqttClient) Connect() bool { return mqttClient.connectWithRetry() } func (mqttClient *MqttClient) connectWithRetry() bool { // Retries with exponential backoff, from 10 ms to 20s. duration := 10 * time.Millisecond maxDuration := 20000 * time.Millisecond // Retry upon connection establishment failure. internal := mqttClient.connectInternal() times := 0 for !internal { time.Sleep(duration) if duration < maxDuration { duration *= 2 } times++ fmt.Println("connect mqttgo broker retry. times: ", times) internal = mqttClient.connectInternal() } return internal } func (mqttClient *MqttClient) connectInternal() bool { // Close the existing connection before establishing a connection. mqttClient.Close() options := mqtt.NewClientOptions() options.AddBroker(fmt.Sprintf("mqtts://%s:%d", mqttClient.Host, mqttClient.Port)) options.SetClientID(mqttClient.ClientId) userName := fmt.Sprintf("accessKey=%s|timestamp=%d", mqttClient.AccessKey, time.Now().UnixNano()/1000000) if len(mqttClient.InstanceId) != 0 { userName = userName + fmt.Sprintf("|instanceId=%s", mqttClient.InstanceId) } options.SetUsername(userName) options.SetPassword(mqttClient.AccessCode) options.SetConnectTimeout(10 * time.Second) options.SetKeepAlive(120 * time.Second) // Disable the SDK internal reconnection and use the custom reconnection to refresh the timestamp. 
options.SetAutoReconnect(false) options.SetConnectRetry(false) tlsConfig := &tls.Config{ InsecureSkipVerify: true, MaxVersion: tls.VersionTLS12, MinVersion: tls.VersionTLS12, } options.SetTLSConfig(tlsConfig) options.OnConnectionLost = mqttClient.createConnectionLostHandler() client := mqtt.NewClient(options) if token := client.Connect(); token.Wait() && token.Error() != nil { fmt.Println("device create bootstrap client failed,error = ", token.Error().Error()) return false } mqttClient.Client = client fmt.Println("connect mqttgo broker success.") mqttClient.subscribeTopic() return true } func (mqttClient *MqttClient) subscribeTopic() { subRes := mqttClient.Client.Subscribe(mqttClient.Topic, 0, mqttClient.createMessageHandler()) if subRes.Wait() && subRes.Error() != nil { fmt.Printf("sub topic failed,error is %s\n", subRes.Error()) panic("subscribe topic failed.") } else { fmt.Printf("sub topic success\n") } } func (mqttClient *MqttClient) createMessageHandler() func(client mqtt.Client, message mqtt.Message) { messageHandler := func(client mqtt.Client, message mqtt.Message) { fmt.Println("receive message from server.") go func() { for _, handler := range mqttClient.messageHandlers { handler(string(message.Payload())) } }() } return messageHandler } func (mqttClient *MqttClient) createConnectionLostHandler() func(client mqtt.Client, reason error) { // Perform custom reconnection after disconnection. connectionLostHandler := func(client mqtt.Client, reason error) { fmt.Printf("connection lost from server. begin to reconnect broker. reason: %s\n", reason.Error()) connected := mqttClient.connectWithRetry() if connected { fmt.Println("reconnect mqttgo broker success.") } } return connectionLostHandler } func (mqttClient *MqttClient) Close() { if mqttClient.Client != nil { mqttClient.Client.Disconnect(1000) } } func main() { // For details about how to set the following parameters, see the connection configuration description. 
// MQTT access domain name mqttHost := "your mqtt host" // MQTT access port mqttPort := 8883 // Access credential key value mqttAccessKey := os.Getenv("MQTT_ACCESS_KEY") // Access credential secret mqttAccessCode := os.Getenv("MQTT_ACCESS_CODE") // Name of the subscribed topic mqttTopic := "your mqtt topic" // Instance ID instanceId := "your instance Id" //mqttgo client id clientId := "your mqtt client id" mqttClient := MqttClient{ Host: mqttHost, Port: mqttPort, Topic: mqttTopic, ClientId: clientId, AccessKey: mqttAccessKey, AccessCode: mqttAccessCode, InstanceId: instanceId, } // Customize the handler for processing messages. mqttClient.messageHandlers = []MessageHandler{func(message string) { fmt.Println(message) }} connect := mqttClient.Connect() if !connect { fmt.Println("init mqttgo client failed.") return } // Block method to keep the MQTT client always pulling messages. interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt) for { <-interrupt break } }
Success Example
After the access is successful, the following information is displayed on the client.
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot