GO Demo使用说明
本文以Go语言为例,介绍应用通过MQTTS协议接入平台,接收服务端订阅消息的示例。
前提条件
熟悉Go语言开发环境配置,熟悉Go语言基本语法。
开发环境
本示例使用了Go 1.18版本。
添加依赖
本示例使用的Go语言的Mqtt依赖为paho.mqtt.golang(本示例使用版本为v1.4.3),在go.mod中添加依赖的代码如下:
require (
github.com/eclipse/paho.mqtt.golang v1.4.3
)
代码示例
package main
import (
"crypto/tls"
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"os"
"os/signal"
"time"
)
type MessageHandler func(message string)
type MqttClient struct {
Host string
Port int
ClientId string
AccessKey string
AccessCode string
Topic string
InstanceId string
Qos int
Client mqtt.Client
messageHandlers []MessageHandler
}
func (mqttClient *MqttClient) Connect() bool {
return mqttClient.connectWithRetry()
}
func (mqttClient *MqttClient) connectWithRetry() bool {
// 退避重试,从10ms依次x2,直到20s。
duration := 10 * time.Millisecond
maxDuration := 20000 * time.Millisecond
// 建链失败进行重试
internal := mqttClient.connectInternal()
times := 0
for !internal {
time.Sleep(duration)
if duration < maxDuration {
duration *= 2
}
times++
fmt.Println("connect mqttgo broker retry. times: ", times)
internal = mqttClient.connectInternal()
}
return internal
}
func (mqttClient *MqttClient) connectInternal() bool {
// 建链前先关闭已有连接
mqttClient.Close()
options := mqtt.NewClientOptions()
options.AddBroker(fmt.Sprintf("mqtts://%s:%d", mqttClient.Host, mqttClient.Port))
options.SetClientID(mqttClient.ClientId)
userName := fmt.Sprintf("accessKey=%s|timestamp=%d", mqttClient.AccessKey, time.Now().UnixNano()/1000000)
if len(mqttClient.InstanceId) != 0 {
userName = userName + fmt.Sprintf("|instanceId=%s", mqttClient.InstanceId)
}
options.SetUsername(userName)
options.SetPassword(mqttClient.AccessCode)
options.SetConnectTimeout(10 * time.Second)
options.SetKeepAlive(120 * time.Second)
// 关闭sdk内部重连,使用自定义重连刷新时间戳
options.SetAutoReconnect(false)
options.SetConnectRetry(false)
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
MaxVersion: tls.VersionTLS12,
MinVersion: tls.VersionTLS12,
}
options.SetTLSConfig(tlsConfig)
options.OnConnectionLost = mqttClient.createConnectionLostHandler()
client := mqtt.NewClient(options)
if token := client.Connect(); token.Wait() && token.Error() != nil {
fmt.Println("device create bootstrap client failed,error = ", token.Error().Error())
return false
}
mqttClient.Client = client
fmt.Println("connect mqttgo broker success.")
mqttClient.subscribeTopic()
return true
}
func (mqttClient *MqttClient) subscribeTopic() {
subRes := mqttClient.Client.Subscribe(mqttClient.Topic, 0, mqttClient.createMessageHandler())
if subRes.Wait() && subRes.Error() != nil {
fmt.Printf("sub topic failed,error is %s\n", subRes.Error())
panic("subscribe topic failed.")
} else {
fmt.Printf("sub topic success\n")
}
}
func (mqttClient *MqttClient) createMessageHandler() func(client mqtt.Client, message mqtt.Message) {
messageHandler := func(client mqtt.Client, message mqtt.Message) {
fmt.Println("receive message from server.")
go func() {
for _, handler := range mqttClient.messageHandlers {
handler(string(message.Payload()))
}
}()
}
return messageHandler
}
func (mqttClient *MqttClient) createConnectionLostHandler() func(client mqtt.Client, reason error) {
// 断链后进行自定义重连
connectionLostHandler := func(client mqtt.Client, reason error) {
fmt.Printf("connection lost from server. begin to reconnect broker. reason: %s\n", reason.Error())
connected := mqttClient.connectWithRetry()
if connected {
fmt.Println("reconnect mqttgo broker success.")
}
}
return connectionLostHandler
}
func (mqttClient *MqttClient) Close() {
if mqttClient.Client != nil {
mqttClient.Client.Disconnect(1000)
}
}
func main() {
// 以下参数配置请参考连接配置说明
// mqtt接入域名
mqttHost := "your mqtt host"
// mqtt接入端口
mqttPort := 8883
//接入凭证键值
mqttAccessKey := os.Getenv("MQTT_ACCESS_KEY")
//接入凭证密钥
mqttAccessCode := os.Getenv("MQTT_ACCESS_CODE")
//订阅topic名称
mqttTopic := "your mqtt topic"
//实例Id
instanceId := "your instance Id"
//mqttgo client id
clientId := "your mqtt client id"
mqttClient := MqttClient{
Host: mqttHost,
Port: mqttPort,
Topic: mqttTopic,
ClientId: clientId,
AccessKey: mqttAccessKey,
AccessCode: mqttAccessCode,
InstanceId: instanceId,
}
//自定义消息处理handler
mqttClient.messageHandlers = []MessageHandler{func(message string) {
fmt.Println(message)
}}
connect := mqttClient.Connect()
if !connect {
fmt.Println("init mqttgo client failed.")
return
}
//阻塞方法,保持mqtt客户端一直拉消息
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
for {
<-interrupt
break
}
}
成功示例
接入成功后,客户端打印信息如下: