Updated on 2024-11-06 GMT+08:00

Python Demo

This section uses Python as an example to describe how to connect an MQTTS client to the platform and receive subscribed messages from the platform.

Prerequisites

Knowledge of basic Python syntax and how to configure development environments.

Development Environment

In this example, Python 3.8.8 is used.

Dependency

In this example, paho-mqtt (version 2.0.0) is used. You can run the following command to download the dependency:

pip install paho-mqtt==2.0.0

Sample Code

ClientConf code:

from typing import Optional
class ClientConf:
    def __init__(self):
        # MQTT subscription address
        self.__host: Optional[str] = None
        # MQTT subscription port number
        self.__port: Optional[int] = None
        # MQTT access credential access_key
        self.__access_key: Optional[str] = None
        # MQTT access credential access_code
        self.__access_code: Optional[str] = None
        # MQTT subscription topic
        self.__topic: Optional[str] = None
        # Instance ID. This parameter is mandatory when multiple instances of the standard edition are purchased in the same region.
        self.__instance_id: Optional[str] = None
        # mqtt qos
        self.__qos = 1

    @property
    def host(self):
        return self.__host
    @host.setter
    def host(self, host):
        self.__host = host
    @property
    def port(self):
        return self.__port
    @port.setter
    def port(self, port):
        self.__port = port
    @property
    def access_key(self):
        return self.__access_key
    @access_key.setter
    def access_key(self, access_key):
        self.__access_key = access_key
    @property
    def access_code(self):
        return self.__access_code
    @access_code.setter
    def access_code(self, access_code):
        self.__access_code = access_code
    @property
    def topic(self):
        return self.__topic
    @topic.setter
    def topic(self, topic):
        self.__topic = topic
    @property
    def instance_id(self):
        return self.__instance_id
    @instance_id.setter
    def instance_id(self, instance_id):
        self.__instance_id = instance_id
    @property
    def qos(self):
        return self.__qos
    @qos.setter
    def qos(self, qos):
        self.__qos = qos

MqttClient code:

import os
import ssl
import threading
import time
import traceback
import secrets
from client_conf import ClientConf
import paho.mqtt.client as mqtt
class MqttClient:
    def __init__(self, client_conf: ClientConf):
        self.__host = client_conf.host
        self.__port = client_conf.port
        self.__access_key = client_conf.access_key
        self.__access_code = client_conf.access_code
        self.__topic = client_conf.topic
        self.__instance_id = client_conf.instance_id
        self.__qos = client_conf.qos
        self.__paho_client: Optional[mqtt.Client] = None
        self.__connect_result_code = -1
        self.__default_backoff = 1000
        self.__retry_times = 0
        self.__min_backoff = 1 * 1000  # 1s
        self.__max_backoff = 30 * 1000  # 30s

    def connect(self):
        self.__valid_params()
        rc = self.__connect()
        while rc != 0:
            # Backoff reconnection
            low_bound = int(self.__default_backoff * 0.8)
            high_bound = int(self.__default_backoff * 1.0)
            random_backoff = secrets.randbelow(high_bound - low_bound)
            backoff_with_jitter = int(pow(2, self.__retry_times)) * (random_backoff + low_bound)
            wait_time_ms = self.__max_backoff if (self.__min_backoff + backoff_with_jitter) > self.__max_backoff else (
                    self.__min_backoff + backoff_with_jitter)
            wait_time_s = round(wait_time_ms / 1000, 2)
            print("client will try to reconnect after " + str(wait_time_s) + " s")
            time.sleep(wait_time_s)
            self.__retry_times += 1
            self.close() # Release the previous connection.
            rc = self.__connect()
            # If the value of rc is 0, the connection is set up successfully. If not, the connection fails.
            if rc != 0:
                print("connect with result code: " + str(rc))
                if rc == 134:
                    print("connect failed with bad username or password, "
                          "reconnection will not be performed")
                    pass
        return rc
    def __connect(self):
        try:
            timestamp = self.current_time_millis()
            user_name = "accessKey=" + self.__access_key + "|timestamp=" + timestamp
            if self.__instance_id:
                user_name = user_name + "|instanceId=" + self.__instance_id
            pass_word = self.__access_code
            self.__paho_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "mqttClient")
            # Disable automatic retry and update the timestamp by manual retry.
            self.__paho_client._reconnect_on_failure = False
            # Set the callback function.
            self._set_callback()
            # Topics are stored in userdata. The callback function directly subscribes to topics.
            self.__paho_client.user_data_set(self.__topic)
            self.__paho_client.username_pw_set(user_name, pass_word)
            # Currently, the MQTT broker supports only TLS 1.2.
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
            # Not verifying the server certificate.
            context.verify_mode = ssl.CERT_NONE
            context.check_hostname = False
            self.__paho_client.tls_set_context(context)
            rc = self.__paho_client.connect(self.__host, self.__port)
            self.__connect_result_code = rc
            if rc == 0:
                threading.Thread(target=self.__paho_client.loop_forever, args=(1, False), name="MqttThread").start()
            # Wait for connection establishment.
            time.sleep(1)
        except Exception as e:
            self.__connect_result_code = -1
            print("Mqtt connection error. traceback: " + traceback.format_exc())
        if self.__paho_client.is_connected():
            return 0
        else:
            return self.__connect_result_code
    def __valid_params(self):
        assert self.__access_key is not None
        assert self.__access_code is not None
        assert self.__topic is not None

    @staticmethod
    def current_time_millis():
        return str(int(round(time.time() * 1000)))
    def _set_callback(self):
        # Execute self._on_connect() when the platform responds to the connection request.
        self.__paho_client.on_connect = self._on_connect
        # Execute self._on_disconnect() when disconnecting from the platform.
        self.__paho_client.on_disconnect = self._on_disconnect
        # Execute self._on_subscribe when subscribing to a topic.
        self.__paho_client.on_subscribe = self._on_subscribe
        # Execute self._on_message() when an original message is received.
        self.__paho_client.on_message = self._on_message
    def _on_connect(self, client, userdata, flags, rc: mqtt.ReasonCode, properties):
        if rc == 0:
            print("Connected to Mqtt Broker! topic " + self.__topic)
            client.subscribe(userdata, 1)
        else:
            # Automatic reconnection is not performed only when the username or password is incorrect.
            # If the disconnect() method is not used here, loop_forever keeps reconnecting.
            if rc == 134:
                self.__paho_client.disconnect()
            print("Failed to connect. return code :" + str(rc.value) + ", reason" + rc.getName())
    def _on_subscribe(self, client, userdata, mid, granted_qos, properties):
        print("Subscribed: " + str(mid) + " " + str(granted_qos) + " topic: " + self.__topic)
    def _on_message(self, client, userdata, message: mqtt.MQTTMessage):
        print("topic " + self.__topic + " Received message: " + message.payload.decode())
    def _on_disconnect(self, client, userdata, flags, rc, properties):
        print("Disconnect to Mqtt Broker. topic: " + self.__topic)
        # Shut down the client after the disconnection and manually reconnect the client to refresh the timestamp.
        try:
            self.__paho_client.disconnect()
        except Exception as e:
            print("Mqtt connection error. traceback: " + traceback.format_exc())
        self.connect()
    def close(self):
        if self.__paho_client is not None and self.__paho_client.is_connected():
            try:
                self.__paho_client.disconnect()
                print("Mqtt connection close")
            except Exception as e:
                print("paho client disconnect failed. exception: " + str(e))
        else:
            pass

MqttDemo code:

from client_conf import ClientConf
from mqtt_client import MqttClient
import os
from typing import Optional
def main():
    client_conf = ClientConf()
    client_conf.host = "your ip host"
    client_conf.port = 8883
    client_conf.topic = "your mqtt topic"
    # MQTT access credential access_key can be injected using environment variables.
    client_conf.access_key = os.environ.get("MQTT_ACCESS_KEY")
    # MQTT access credential access_code can be injected using environment variables.
    client_conf.access_code = os.environ.get("MQTT_ACCESS_CODE")
    client_conf.instance_id = "your instance id"
    mqtt_client = MqttClient(client_conf)
    if mqtt_client.connect() != 0:
        print("init failed")
        return
if __name__ == "__main__":
    main()

Success Example

After the access is successful, the following information is displayed on the client.

Figure 1 Example of successful MQTT subscription using Python