更新时间:2024-06-11 GMT+08:00
GO Demo使用说明
本文以Go语言为例,介绍应用通过MQTTS协议接入平台,接收服务端订阅消息的示例。
前提条件
熟悉Go语言开发环境配置,熟悉Go语言基本语法。
开发环境
本示例使用了Go 1.18版本。
添加依赖
本示例使用的Go语言MQTT客户端库为paho.mqtt.golang(版本v1.4.3),在go.mod中添加依赖的代码如下:
require (
    github.com/eclipse/paho.mqtt.golang v1.4.3
)
代码示例
package main

import (
	"crypto/tls"
	"fmt"
	"os"
	"os/signal"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// Bounds for the exponential backoff used between connection attempts.
const (
	initialRetryDelay = 10 * time.Millisecond
	maxRetryDelay     = 20 * time.Second
)

// MessageHandler is a user callback invoked with the payload of every
// message received on the subscribed topic.
type MessageHandler func(message string)

// MqttClient holds the connection settings for the MQTTS endpoint together
// with the underlying paho client and the registered message handlers.
type MqttClient struct {
	Host       string // broker host name
	Port       int    // broker port
	ClientId   string // MQTT client id
	AccessKey  string // access credential key, sent as part of the user name
	AccessCode string // access credential secret, sent as the password
	Topic      string // topic to subscribe to
	InstanceId string // optional instance id appended to the user name
	Qos        int    // subscription QoS (0, 1 or 2)
	Client     mqtt.Client

	messageHandlers []MessageHandler
}

// Connect establishes the connection and subscribes to the configured topic.
// It blocks, retrying with backoff, until an attempt succeeds.
func (c *MqttClient) Connect() bool {
	return c.connectWithRetry()
}

// connectWithRetry keeps calling connectInternal until it succeeds, sleeping
// between attempts. The delay starts at initialRetryDelay, doubles after every
// failure and is clamped to maxRetryDelay.
func (c *MqttClient) connectWithRetry() bool {
	delay := initialRetryDelay
	for attempt := 1; !c.connectInternal(); attempt++ {
		time.Sleep(delay)
		delay *= 2
		if delay > maxRetryDelay {
			// Without the clamp, doubling from 10ms overshoots the 20s bound.
			delay = maxRetryDelay
		}
		fmt.Println("connect mqttgo broker retry. times: ", attempt)
	}
	return true
}

// connectInternal performs a single connect-and-subscribe attempt and reports
// whether it succeeded. Any previously created client is disconnected first.
func (c *MqttClient) connectInternal() bool {
	// Drop the existing connection before opening a new one.
	c.Close()

	options := mqtt.NewClientOptions()
	options.AddBroker(fmt.Sprintf("mqtts://%s:%d", c.Host, c.Port))
	options.SetClientID(c.ClientId)

	// The user name embeds the current time in milliseconds, so it has to be
	// rebuilt on every attempt.
	userName := fmt.Sprintf("accessKey=%s|timestamp=%d", c.AccessKey, time.Now().UnixMilli())
	if c.InstanceId != "" {
		userName += fmt.Sprintf("|instanceId=%s", c.InstanceId)
	}
	options.SetUsername(userName)
	options.SetPassword(c.AccessCode)
	options.SetConnectTimeout(10 * time.Second)
	options.SetKeepAlive(120 * time.Second)

	// Disable the SDK's built-in reconnect; reconnection is handled by
	// connectWithRetry so that the timestamp in the user name is refreshed.
	options.SetAutoReconnect(false)
	options.SetConnectRetry(false)

	// NOTE(review): InsecureSkipVerify turns off verification of the server
	// certificate. Kept as in the original demo; for production use, consider
	// loading the platform CA certificate into RootCAs instead.
	tlsConfig := &tls.Config{
		InsecureSkipVerify: true,
		MaxVersion:         tls.VersionTLS12,
		MinVersion:         tls.VersionTLS12,
	}
	options.SetTLSConfig(tlsConfig)
	options.OnConnectionLost = c.createConnectionLostHandler()

	client := mqtt.NewClient(options)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		fmt.Println("device create bootstrap client failed,error = ", token.Error().Error())
		return false
	}

	// Store the client before subscribing so that a failed subscription is
	// cleaned up by the Close call at the start of the next attempt.
	c.Client = client
	fmt.Println("connect mqttgo broker success.")
	return c.subscribeTopic() == nil
}

// subscribeTopic subscribes to c.Topic at the configured QoS and returns the
// subscription error, if any. The caller treats a failure as a failed
// connection attempt and retries, rather than crashing the process.
func (c *MqttClient) subscribeTopic() error {
	// Values outside 0..2 fall back to QoS 0, the level that was previously
	// hard-coded here.
	qos := byte(0)
	if c.Qos >= 0 && c.Qos <= 2 {
		qos = byte(c.Qos)
	}

	token := c.Client.Subscribe(c.Topic, qos, c.createMessageHandler())
	if token.Wait() && token.Error() != nil {
		fmt.Printf("sub topic failed,error is %s\n", token.Error())
		return token.Error()
	}
	fmt.Printf("sub topic success\n")
	return nil
}

// createMessageHandler returns the paho callback that forwards each received
// payload to every registered MessageHandler.
func (c *MqttClient) createMessageHandler() func(client mqtt.Client, message mqtt.Message) {
	return func(client mqtt.Client, message mqtt.Message) {
		fmt.Println("receive message from server.")
		// Run the user handlers on their own goroutine so the paho callback
		// returns immediately.
		go func() {
			for _, handler := range c.messageHandlers {
				handler(string(message.Payload()))
			}
		}()
	}
}

// createConnectionLostHandler returns the paho callback that runs the custom
// reconnect loop after the connection is lost.
func (c *MqttClient) createConnectionLostHandler() func(client mqtt.Client, reason error) {
	return func(client mqtt.Client, reason error) {
		fmt.Printf("connection lost from server. begin to reconnect broker. reason: %s\n", reason.Error())
		if c.connectWithRetry() {
			fmt.Println("reconnect mqttgo broker success.")
		}
	}
}

// Close disconnects the underlying client, if one has been created, allowing
// up to 1000 ms for in-flight work to complete.
func (c *MqttClient) Close() {
	if c.Client != nil {
		c.Client.Disconnect(1000)
	}
}

func main() {
	// For the values below, refer to the connection configuration guide.
	// MQTT access domain name.
	mqttHost := "your mqtt host"
	// MQTT access port.
	mqttPort := 8883
	// Access credential key.
	mqttAccessKey := os.Getenv("MQTT_ACCESS_KEY")
	// Access credential secret.
	mqttAccessCode := os.Getenv("MQTT_ACCESS_CODE")
	// Name of the topic to subscribe to.
	mqttTopic := "your mqtt topic"
	// Instance id.
	instanceId := "your instance Id"
	// MQTT client id.
	clientId := "your mqtt client id"

	mqttClient := &MqttClient{
		Host:       mqttHost,
		Port:       mqttPort,
		Topic:      mqttTopic,
		ClientId:   clientId,
		AccessKey:  mqttAccessKey,
		AccessCode: mqttAccessCode,
		InstanceId: instanceId,
	}

	// Custom message handler: print every payload.
	mqttClient.messageHandlers = []MessageHandler{func(message string) {
		fmt.Println(message)
	}}

	if !mqttClient.Connect() {
		fmt.Println("init mqttgo client failed.")
		return
	}
	// Disconnect cleanly when the program exits.
	defer mqttClient.Close()

	// Block until interrupted so the client keeps receiving messages.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	<-interrupt
}
成功示例
接入成功后,客户端打印信息如下:
图1 go mqtt客户端接入成功示例
父主题: 使用MQTT转发