设备接入 IoTDA
设备接入 IoTDA
- 最新动态
- 功能总览
- 服务公告
- 计费说明
- 产品介绍
- 快速入门
- 用户指南
- 最佳实践
- 开发指南
-
API参考
- 应用侧API参考
- 设备侧MQTT/MQTTS接口参考
- 设备侧HTTPS接口参考
- 设备侧LwM2M接口参考
- 安全隧道WebSocket接口参考
- 模组AT指令参考
- 修订记录
- SDK参考
- 场景代码示例
-
常见问题
- 热门问题
-
方案咨询
- 连接IoT平台的业务场景有哪些?
- 设备管理服务和设备接入服务合一后的差异点是什么?
- IAM子用户或子项目是否可以开通物联网平台服务?
- 物联网平台支持在华为云的哪些区域开通?
- 华为是否提供模组/硬件终端/应用软件等?
- IAM用户访问API提示没有权限?(是否区分版本?)
- 创建规则或者设置资源文件存储时候提示赋予Security Administrator权限
- 物联网平台设置默认资源空间的规则是什么?
- 设备接入服务如何获取设备数据?
- 物联网平台的资源空间和设备可以无限创建吗?
- 物联网平台支持批量注册设备吗?
- 物联网平台对应用侧和设备侧在开发或使用时有限制吗?
- 物联网平台支持的DTLS加密算法有哪些?
- 物联网平台支持二进制大小端模式切换吗?
- 什么是NB-IoT?
- 物联网平台支持的硬件架构和使用的相关组件有哪些?
- 如何获取平台接入地址?
- 设备集成相关问题
- 设备侧SDK相关问题
- 设备发放相关问题
- LWM2M/CoAP接入相关问题
- MQTT接入相关问题
- 泛协议接入相关问题
- 物模型相关问题
- 消息通信相关问题
- 订阅推送相关问题
- 编解码插件相关问题
- OTA升级相关问题
- 应用集成相关问题
- 实例管理相关问题
- 视频帮助
- 文档下载
- 通用参考
链接复制成功!
GO Demo使用说明
本文以Go语言为例,介绍应用通过MQTTS协议接入平台,接收服务端订阅消息的示例。
前提条件
熟悉Go语言开发环境配置,熟悉Go语言基本语法。
开发环境
本示例使用了Go 1.18版本。
添加依赖
本示例使用的Go语言的Mqtt依赖为paho.mqtt.golang(本示例使用版本为v1.4.3),在go.mod中添加依赖的代码如下:
require ( github.com/eclipse/paho.mqtt.golang v1.4.3 )
代码示例
package main import ( "crypto/tls" "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" "os" "os/signal" "time" ) type MessageHandler func(message string) type MqttClient struct { Host string Port int ClientId string AccessKey string AccessCode string Topic string InstanceId string Qos int Client mqtt.Client messageHandlers []MessageHandler } func (mqttClient *MqttClient) Connect() bool { return mqttClient.connectWithRetry() } func (mqttClient *MqttClient) connectWithRetry() bool { // 退避重试,从10ms依次x2,直到20s。 duration := 10 * time.Millisecond maxDuration := 20000 * time.Millisecond // 建链失败进行重试 internal := mqttClient.connectInternal() times := 0 for !internal { time.Sleep(duration) if duration < maxDuration { duration *= 2 } times++ fmt.Println("connect mqttgo broker retry. times: ", times) internal = mqttClient.connectInternal() } return internal } func (mqttClient *MqttClient) connectInternal() bool { // 建链前先关闭已有连接 mqttClient.Close() options := mqtt.NewClientOptions() options.AddBroker(fmt.Sprintf("mqtts://%s:%d", mqttClient.Host, mqttClient.Port)) options.SetClientID(mqttClient.ClientId) userName := fmt.Sprintf("accessKey=%s|timestamp=%d", mqttClient.AccessKey, time.Now().UnixNano()/1000000) if len(mqttClient.InstanceId) != 0 { userName = userName + fmt.Sprintf("|instanceId=%s", mqttClient.InstanceId) } options.SetUsername(userName) options.SetPassword(mqttClient.AccessCode) options.SetConnectTimeout(10 * time.Second) options.SetKeepAlive(120 * time.Second) // 关闭sdk内部重连,使用自定义重连刷新时间戳 options.SetAutoReconnect(false) options.SetConnectRetry(false) tlsConfig := &tls.Config{ InsecureSkipVerify: true, MaxVersion: tls.VersionTLS12, MinVersion: tls.VersionTLS12, } options.SetTLSConfig(tlsConfig) options.OnConnectionLost = mqttClient.createConnectionLostHandler() client := mqtt.NewClient(options) if token := client.Connect(); token.Wait() && token.Error() != nil { fmt.Println("device create bootstrap client failed,error = ", token.Error().Error()) return false } mqttClient.Client = client fmt.Println("connect mqttgo broker success.") mqttClient.subscribeTopic() return true } func (mqttClient *MqttClient) subscribeTopic() { subRes := mqttClient.Client.Subscribe(mqttClient.Topic, 0, mqttClient.createMessageHandler()) if subRes.Wait() && subRes.Error() != nil { fmt.Printf("sub topic failed,error is %s\n", subRes.Error()) panic("subscribe topic failed.") } else { fmt.Printf("sub topic success\n") } } func (mqttClient *MqttClient) createMessageHandler() func(client mqtt.Client, message mqtt.Message) { messageHandler := func(client mqtt.Client, message mqtt.Message) { fmt.Println("receive message from server.") go func() { for _, handler := range mqttClient.messageHandlers { handler(string(message.Payload())) } }() } return messageHandler } func (mqttClient *MqttClient) createConnectionLostHandler() func(client mqtt.Client, reason error) { // 断链后进行自定义重连 connectionLostHandler := func(client mqtt.Client, reason error) { fmt.Printf("connection lost from server. begin to reconnect broker. reason: %s\n", reason.Error()) connected := mqttClient.connectWithRetry() if connected { fmt.Println("reconnect mqttgo broker success.") } } return connectionLostHandler } func (mqttClient *MqttClient) Close() { if mqttClient.Client != nil { mqttClient.Client.Disconnect(1000) } } func main() { // 以下参数配置请参考连接配置说明 // mqtt接入域名 mqttHost := "your mqtt host" // mqtt接入端口 mqttPort := 8883 //接入凭证键值 mqttAccessKey := os.Getenv("MQTT_ACCESS_KEY") //接入凭证密钥 mqttAccessCode := os.Getenv("MQTT_ACCESS_CODE") //订阅topic名称 mqttTopic := "your mqtt topic" //实例Id instanceId := "your instance Id" //mqttgo client id clientId := "your mqtt client id" mqttClient := MqttClient{ Host: mqttHost, Port: mqttPort, Topic: mqttTopic, ClientId: clientId, AccessKey: mqttAccessKey, AccessCode: mqttAccessCode, InstanceId: instanceId, } //自定义消息处理handler mqttClient.messageHandlers = []MessageHandler{func(message string) { fmt.Println(message) }} connect := mqttClient.Connect() if !connect { fmt.Println("init mqttgo client failed.") return } //阻塞方法,保持mqtt客户端一直拉消息 interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt) for { <-interrupt break } }
成功示例
接入成功后,客户端打印信息如下:
图1 go mqtt客户端接入成功示例
data:image/s3,"s3://crabby-images/ff2e3/ff2e3e06c7c702387831edb0f005f9cdee33e1e0" alt="点击放大"
父主题: 使用MQTT转发