内置MQTT broker默认开启端口进行TLS(Transport Layer Security)安全认证,客户端必须带上证书才能访问MQTT broker。
- 证书与边缘节点绑定,在一个边缘节点下申请的证书只能用来访问该边缘节点的MQTT broker,如果访问其他边缘节点的MQTT broker,会导致认证失败。
- 一个边缘节点最多只能申请10份证书。
- 证书的有效期为5年。
- MQTT使用限制
- 支持QoS 0
- 支持Topic自定义
- 不支持QoS 1和QoS 2
- 不支持will、retain msg
采用TCP通道基础 + TLS协议(TLSV1.2 版本)
- 登录IEF管理控制台。
- 选择左侧导航栏的“边缘资源 > 边缘节点”。
- 单击边缘节点名称,进入边缘节点详情。
- 选择“证书”页签,单击“添加证书”。
- 输入证书名称,单击“确定”。
证书用于终端设备与MQTT broker通信时鉴权。
- 客户端不需要校验服务端证书,单向认证即可。
- 内置MQTT broker默认开启8883端口。
- 样例中的Go语言MQTT Client引用了github.com/eclipse/paho.mqtt.golang开源库。
- 客户端需要处理断连事件,实现掉线重连机制,提高连接可靠性。
package main import ( "crypto/tls" "crypto/x509" "fmt" "math/rand" "sync" "time" MQTT "github.com/eclipse/paho.mqtt.golang" ) func main() { subClient := InitMqttClient(onSubConnectionLost) pubClient := InitMqttClient(onPubConnectionLost) wait := sync.WaitGroup{} wait.Add(1) go func() { for { time.Sleep(1*time.Second) pubClient.Publish("topic", 0, false, "hello world") } }() subClient.Subscribe("topic", 0, onReceived) wait.Wait() } func InitMqttClient(onConnectionLost MQTT.ConnectionLostHandler) MQTT.Client { pool := x509.NewCertPool() cert, err := tls.LoadX509KeyPair("/tmp/example_cert.crt", "/tmp/example_cert.key") if err != nil { panic(err) } tlsConfig := &tls.Config{ RootCAs: pool, Certificates: []tls.Certificate{cert}, // 单向认证,client不校验服务端证书 InsecureSkipVerify: true, } // 使用tls或者ssl协议,连接8883端口 opts := MQTT.NewClientOptions().AddBroker("tls://").SetClientID(fmt.Sprintf("%f",rand.Float64())) opts.SetTLSConfig(tlsConfig) opts.OnConnect = onConnect opts.AutoReconnect = false // 回调函数,客户端与服务端断连后立刻被触发 opts.OnConnectionLost = onConnectionLost client := MQTT.NewClient(opts) loopConnect(client) return client } func onReceived(client MQTT.Client, message MQTT.Message) { fmt.Printf("Receive topic: %s, payload: %s \n", message.Topic(), string(message.Payload())) } // sub客户端与服务端断连后,触发重连机制 func onSubConnectionLost(client MQTT.Client, err error) { fmt.Println("on sub connect lost, try to reconnect") loopConnect(client) client.Subscribe("topic", 0, onReceived) } // pub客户端与服务端断连后,触发重连机制 func onPubConnectionLost(client MQTT.Client, err error) { fmt.Println("on pub connect lost, try to reconnect") loopConnect(client) } func onConnect(client MQTT.Client) { fmt.Println("on connect") } func loopConnect(client MQTT.Client) { for { token := client.Connect() if rs, err := CheckClientToken(token); !rs { fmt.Printf("connect error: %s\n", err.Error()) } else { break } time.Sleep(1 * time.Second) } } func CheckClientToken(token MQTT.Token) (bool, error) { if token.Wait() && token.Error() != nil { return false, token.Error() } return true, nil }
/***************************************************************************** Description: MQTT消息收发JAVA demo。需要先创建边缘节点并下载获取客户端证书 ****************************************************************************/ package com.example.demo; import javax.net.ssl.SSLSocketFactory; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /******************************************************************** * MQTT Demo 展示客户端连接边缘节点broker进行消息的收发,连接进行SSL安全认证,demo演示包括如下: * 1、MQTT接收客户端,接收MQTT消息 * 2、MQTT发送客户端,发送MQTT消息 ********************************************************************/ public class MqttClientDemo { private static int QOS_TYPE = 2; //MQTT服务器地址 private static final String MQTT_HOST = "ssl://x.x.x.x:8883"; //MQTT发送客户端id private static final String MQTT_PUB_CLIENT_ID = "pub_client_1"; //MQTT接收客户端id private static final String MQTT_SUB_CLIENT_ID = "sub_client_1"; //MQTT通道订阅主题topic private static final String TOPIC = "/hello"; //MQTT客户端连接SSL证书配置路径 public static final String CLIENT_CRT_FILE_PATH = "example_cert.crt"; public static final String CLIENT_KEY_FILE_PATH = "example_cert.key"; //MQTT客户端连接超时时间(秒) public static final int TIME_OUT_INTERVAL = 10; //MQTT客户端发送心跳间隔(秒) public static final int HEART_TIME_INTERVAL = 20; //MQTT客户端断线重试间隔(毫秒) public static final int RECONNECT_INTERVAL = 10000; //MQTT客户端发送消息间隔(毫秒) public static final int PUBLISH_MSG_INTERVAL = 3000; //MQTT client客户端 private MqttClient mqttClient; //MQTT client连接MQTT的客户端ID,一般以客户端唯一标识符表示 private String clientId; //MQTT client连接配置项 private MqttConnectOptions connOpts; //初始化MQTT客户端未订阅任何topic private boolean isSubscribe = false; public MqttClientDemo(String id) throws MqttException { setClientId(id); initMqttClient(); initCallback(); initConnectOptions(); connectMqtt(); } /******************************************************************** * 发送消息 * @param message 待发送的消息 * @throws MqttException ********************************************************************/ public void publishMessage(String message) throws MqttException { MqttMessage mqttMessage = new MqttMessage(message.getBytes()); mqttMessage.setQos(QOS_TYPE); mqttMessage.setRetained(false); mqttClient.publish(TOPIC, mqttMessage); System.out.println(String.format("MQTT Client[%s] publish message[%s]", clientId, message)); } /******************************************************************** * 订阅topic * @throws MqttException ********************************************************************/ public void subscribeTopic() throws MqttException { int[] Qos = {QOS_TYPE}; String[] topics = {TOPIC}; mqttClient.subscribe(topics, Qos); isSubscribe = true; } /******************************************************************** * 启动线程定时发送MQTT消息 * @throws MqttException ********************************************************************/ public void startPublishMessage() { new Thread() { @Override public void run() { while (true) { try { Thread.sleep(PUBLISH_MSG_INTERVAL); } catch (InterruptedException e) { e.printStackTrace(); } try { publishMessage("hello world!"); } catch (MqttException e) { System.out.println(String.format("MQTT client[%s] publish message error,errorMsg[%s]", clientId, e.getMessage())); } } } }.start(); } /******************************************************************** * 初始化MQTT客户端 * @throws MqttException 连接异常 ********************************************************************/ private void initMqttClient() throws MqttException { MemoryPersistence persistence = new MemoryPersistence(); mqttClient = new MqttClient(MQTT_HOST, clientId, persistence); } /******************************************************************** * 初始化连接配置 * @throws MqttException 连接异常 ********************************************************************/ private void initConnectOptions() { connOpts = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 connOpts.setCleanSession(true); connOpts.setHttpsHostnameVerificationEnabled(false); // 设置超时时间,单位为秒 connOpts.setConnectionTimeout(TIME_OUT_INTERVAL); // 设置会话心跳时间,单位为秒,服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 connOpts.setKeepAliveInterval(HEART_TIME_INTERVAL); SSLSocketFactory factory = null; try { factory = SslUtil.getSocketFactory(CLIENT_CRT_FILE_PATH, CLIENT_KEY_FILE_PATH); } catch (Exception e) { e.printStackTrace(); } // TLS连接配置 connOpts.setSocketFactory(factory); } /******************************************************************** * 发起连接MQTT connect请求 * @throws MqttException 连接异常 ********************************************************************/ private void connectMqtt() throws MqttException { mqttClient.connect(connOpts); System.out.println(String.format("MQTT client[%s] is connected,the connctOptions: \n%s", clientId, connOpts.toString())); } /******************************************************************** * 设置回调接口 * @throws MqttException 连接异常 ********************************************************************/ private void initCallback() { mqttClient.setCallback(new MqttMessageCallback()); } private void setClientId(String id) { clientId = id; } /******************************************************************** * MQTT Client重连函数,调用连接函数并判断是否订阅过Topic,如果订阅过topic则重新订阅topic * @throws MqttException ********************************************************************/ private void rconnectMqtt() throws MqttException { connectMqtt(); if (isSubscribe) { subscribeTopic(); } } /******************************************************************** * MQTT client 订阅topic后,MQTT 通道有数据,则通过该回调接口接收消息 * @version V1.0 ********************************************************************/ private class MqttMessageCallback implements MqttCallback { @Override public void connectionLost(Throwable cause) { System.out.println(String.format("MQTT Client[%s] connect lost,Retry in 10 seconds,info[%s]", clientId, cause.getMessage())); while (!mqttClient.isConnected()) { try { Thread.sleep(RECONNECT_INTERVAL); System.out.println(String.format("MQTT Client[%s] reconnect ....", clientId)); rconnectMqtt(); } catch (Exception e) { continue; } } } @Override public void messageArrived(String topic, MqttMessage mqttMessage) { String message = new String(mqttMessage.getPayload()); System.out.println(String.format("MQTT Client[%s] receive message[%s] from topic[%s]", clientId, message, topic)); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } } public static void main(String[] args) throws MqttException { try { //订阅MQTT通道 MqttClientDemo mqttsubClientDemo = new MqttClientDemo(MqttClientDemo.MQTT_SUB_CLIENT_ID); mqttsubClientDemo.subscribeTopic(); //往MQTT通道发送:hello world MqttClientDemo mqttpubClientDemo = new MqttClientDemo(MqttClientDemo.MQTT_PUB_CLIENT_ID); mqttpubClientDemo.startPublishMessage(); } catch (MqttException e) { System.out.println(String.format("program start error,errorMessage[%s]", e.getMessage())); } } } |
/***************************************************************************** Description: SSL工具类,加载client ssl证书配置,忽略服务器证书校验 ****************************************************************************/ package com.example.demo; import java.io.ByteArrayInputStream; import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Paths; import java.security.KeyPair; import java.security.KeyStore; import java.security.Security; import java.security.cert.Certificate; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.openssl.PEMReader; import org.bouncycastle.openssl.PasswordFinder; public class SslUtil { /******************************************************************** * 验证并获取SSLSocketFactory ********************************************************************/ public static SSLSocketFactory getSocketFactory(final String crtFile, final String keyFile) throws Exception { Security.addProvider(new BouncyCastleProvider()); // 1、加载客户端证书 PEMReader reader_client = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(crtFile))))); X509Certificate cert = (X509Certificate) reader_client.readObject(); reader_client.close(); // 2、加载客户端key reader_client = new PEMReader( new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(keyFile)))), new PasswordFinder() { @Override public char[] getPassword() { return null; } } ); // 3、发送客户端密钥和证书到服务器进行身份验证 KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); ks.load(null, null); ks.setCertificateEntry("certificate", cert); ks.setKeyEntry("private-key", ((KeyPair) reader_client.readObject()).getPrivate(), "".toCharArray(), new Certificate[]{cert}); KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); kmf.init(ks, "".toCharArray()); // 4、创建socket factory SSLContext context = SSLContext.getInstance("TLSv1.2"); TrustManager[] tms = new TrustManager[1]; TrustManager miTM = new TrustAllManager(); tms[0] = miTM; context.init(kmf.getKeyManagers(), tms, null); reader_client.close(); return context.getSocketFactory(); } /******************************************************************** * 忽略服务端证书校验 ********************************************************************/ static class TrustAllManager implements TrustManager, X509TrustManager { @Override public X509Certificate[] getAcceptedIssuers() { return null; } @Override public void checkServerTrusted(X509Certificate[] certs, String authType) throws CertificateException { } public boolean isServerTrusted(X509Certificate[] certs) { return true; } public boolean isClientTrusted(X509Certificate[] certs) { return true; } @Override public void checkClientTrusted(X509Certificate[] certs, String authType) throws CertificateException { } } } |
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>mqtt.example</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>7</source> <target>7</target> </configuration> </plugin> </plugins> </build> <dependencies> <!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.bouncycastle/bcprov-jdk16 --> <dependency> <groupId>org.bouncycastle</groupId> <artifactId>bcprov-jdk16</artifactId> <version>1.45</version> </dependency> </dependencies> </project> |
