Python Demo
This section uses Python as an example to describe how to connect an MQTTS client to the platform and receive subscribed messages from the platform.
Prerequisites
Knowledge of basic Python syntax and how to configure development environments.
Development Environment
In this example, Python 3.8.8 is used.
Dependency
In this example, paho-mqtt (version 2.0.0) is used. You can run the following command to install the dependency:
pip install paho-mqtt==2.0.0
Sample Code
ClientConf code:
from typing import Optional


class ClientConf:
    """Mutable holder for the settings the MQTT subscriber needs.

    Every setting starts as ``None`` except ``qos``, which starts at 1.
    Values are read and written through the properties below.
    """

    def __init__(self):
        # Broker endpoint.
        self.__host: Optional[str] = None  # MQTT subscription address
        self.__port: Optional[int] = None  # MQTT subscription port number
        # Access credentials.
        self.__access_key: Optional[str] = None
        self.__access_code: Optional[str] = None
        # What to subscribe to.
        self.__topic: Optional[str] = None
        # Instance ID. Mandatory when multiple instances of the standard
        # edition are purchased in the same region.
        self.__instance_id: Optional[str] = None
        # MQTT quality-of-service level.
        self.__qos = 1

    @property
    def host(self) -> Optional[str]:
        """MQTT subscription address."""
        return self.__host

    @host.setter
    def host(self, value: Optional[str]) -> None:
        self.__host = value

    @property
    def port(self) -> Optional[int]:
        """MQTT subscription port number."""
        return self.__port

    @port.setter
    def port(self, value: Optional[int]) -> None:
        self.__port = value

    @property
    def access_key(self) -> Optional[str]:
        """MQTT access credential ``access_key``."""
        return self.__access_key

    @access_key.setter
    def access_key(self, value: Optional[str]) -> None:
        self.__access_key = value

    @property
    def access_code(self) -> Optional[str]:
        """MQTT access credential ``access_code``."""
        return self.__access_code

    @access_code.setter
    def access_code(self, value: Optional[str]) -> None:
        self.__access_code = value

    @property
    def topic(self) -> Optional[str]:
        """MQTT subscription topic."""
        return self.__topic

    @topic.setter
    def topic(self, value: Optional[str]) -> None:
        self.__topic = value

    @property
    def instance_id(self) -> Optional[str]:
        """Instance ID of the platform instance to connect to."""
        return self.__instance_id

    @instance_id.setter
    def instance_id(self, value: Optional[str]) -> None:
        self.__instance_id = value

    @property
    def qos(self):
        """MQTT quality-of-service level (initially 1)."""
        return self.__qos

    @qos.setter
    def qos(self, value) -> None:
        self.__qos = value
MqttClient code:
import os
import secrets
import ssl
import threading
import time
import traceback
from typing import Optional

import paho.mqtt.client as mqtt

from client_conf import ClientConf

# Result code this module treats as "bad user name or password".
_BAD_CREDENTIALS_RC = 134


class MqttClient:
    """MQTT-over-TLS subscriber with manual, backoff-based reconnection.

    A fresh paho client is created for every connection attempt so that the
    timestamp embedded in the user name is regenerated each time. paho's own
    automatic reconnection is therefore switched off and reconnection is
    driven by ``connect()`` and the disconnect callback.
    """

    def __init__(self, client_conf: ClientConf):
        """Copy the connection settings out of *client_conf*."""
        self.__host = client_conf.host
        self.__port = client_conf.port
        self.__access_key = client_conf.access_key
        self.__access_code = client_conf.access_code
        self.__topic = client_conf.topic
        self.__instance_id = client_conf.instance_id
        self.__qos = client_conf.qos
        self.__paho_client: Optional[mqtt.Client] = None
        self.__connect_result_code = -1
        self.__default_backoff = 1000
        self.__retry_times = 0
        self.__min_backoff = 1 * 1000  # 1s
        self.__max_backoff = 30 * 1000  # 30s
        # Set by close() and by a bad-credentials response so that the
        # disconnect callback and the retry loop stop reconnecting.
        self.__stop_reconnect = False

    def connect(self):
        """Validate the settings and connect, retrying with backoff.

        Returns:
            0 once a connection attempt succeeds; otherwise the last non-zero
            result code (134 for a bad user name or password, in which case
            no further attempts are made).

        Raises:
            ValueError: if access_key, access_code or topic is not set.
        """
        self.__valid_params()
        # An explicit connect() re-enables reconnection after a close().
        self.__stop_reconnect = False
        return self.__connect_with_retry()

    def __connect_with_retry(self):
        """Attempt to connect until it succeeds or reconnection is stopped."""
        rc = self.__connect()
        # If the value of rc is 0, the connection is set up successfully.
        # If not, the connection fails.
        while rc != 0:
            print("connect with result code: " + str(rc))
            if rc == _BAD_CREDENTIALS_RC:
                print("connect failed with bad username or password, "
                      "reconnection will not be performed")
                break
            # Backoff reconnection
            wait_time_s = self.__next_backoff_seconds()
            print("client will try to reconnect after " + str(wait_time_s) + " s")
            time.sleep(wait_time_s)
            if self.__stop_reconnect:
                # close() was called while waiting; do not reconnect.
                break
            self.__retry_times += 1
            self.__release()  # Release the previous connection.
            rc = self.__connect()
        if rc == 0:
            # Start the next outage from the shortest backoff again.
            self.__retry_times = 0
        return rc

    def __next_backoff_seconds(self):
        """Return the next wait time in seconds: exponential with jitter, capped."""
        low_bound = int(self.__default_backoff * 0.8)
        high_bound = int(self.__default_backoff * 1.0)
        random_backoff = secrets.randbelow(high_bound - low_bound)
        backoff_with_jitter = (2 ** self.__retry_times) * (random_backoff + low_bound)
        wait_time_ms = min(self.__max_backoff, self.__min_backoff + backoff_with_jitter)
        return round(wait_time_ms / 1000, 2)

    def __connect(self):
        """Create a new paho client and make a single connection attempt.

        Returns:
            0 if the client reports being connected after the one-second
            wait; otherwise the stored result code (-1 if an exception was
            raised, 134 if the broker rejected the credentials, else the
            value returned by paho's ``connect()``).
        """
        try:
            timestamp = self.current_time_millis()
            user_name = "accessKey=" + self.__access_key + "|timestamp=" + timestamp
            if self.__instance_id:
                user_name = user_name + "|instanceId=" + self.__instance_id
            pass_word = self.__access_code
            self.__paho_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "mqttClient")
            # Disable automatic retry and update the timestamp by manual retry.
            self.__paho_client._reconnect_on_failure = False
            # Set the callback function.
            self._set_callback()
            # Topics are stored in userdata. The callback function directly subscribes to topics.
            self.__paho_client.user_data_set(self.__topic)
            self.__paho_client.username_pw_set(user_name, pass_word)
            # Currently, the MQTT broker supports only TLS 1.2.
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
            # Not verifying the server certificate.
            # NOTE(review): with CERT_NONE and check_hostname disabled the
            # broker's identity is not checked; confirm this is acceptable
            # outside of a demo.
            context.verify_mode = ssl.CERT_NONE
            context.check_hostname = False
            self.__paho_client.tls_set_context(context)
            rc = self.__paho_client.connect(self.__host, self.__port)
            self.__connect_result_code = rc
            if rc == 0:
                threading.Thread(target=self.__paho_client.loop_forever, args=(1, False), name="MqttThread").start()
                # Wait for connection establishment.
                time.sleep(1)
        except Exception:
            self.__connect_result_code = -1
            print("Mqtt connection error. traceback: " + traceback.format_exc())
        # The client is still None if its construction raised on the first attempt.
        if self.__paho_client is not None and self.__paho_client.is_connected():
            return 0
        return self.__connect_result_code

    def __valid_params(self):
        """Raise ValueError if a mandatory setting is missing.

        An explicit raise is used instead of ``assert`` so the check is not
        removed when Python runs with ``-O``.
        """
        required = {
            "access_key": self.__access_key,
            "access_code": self.__access_code,
            "topic": self.__topic,
        }
        missing = [name for name, value in required.items() if value is None]
        if missing:
            raise ValueError("missing required MQTT settings: " + ", ".join(missing))

    @staticmethod
    def current_time_millis():
        """Return the current Unix time in milliseconds as a decimal string."""
        return str(int(round(time.time() * 1000)))

    def _set_callback(self):
        """Register this object's handlers on the current paho client."""
        # Execute self._on_connect() when the platform responds to the connection request.
        self.__paho_client.on_connect = self._on_connect
        # Execute self._on_disconnect() when disconnecting from the platform.
        self.__paho_client.on_disconnect = self._on_disconnect
        # Execute self._on_subscribe when subscribing to a topic.
        self.__paho_client.on_subscribe = self._on_subscribe
        # Execute self._on_message() when an original message is received.
        self.__paho_client.on_message = self._on_message

    def _on_connect(self, client, userdata, flags, rc: mqtt.ReasonCode, properties):
        """Subscribe on success; on a credentials failure stop reconnecting."""
        if rc == 0:
            print("Connected to Mqtt Broker! topic " + self.__topic)
            # userdata carries the topic; subscribe with the configured QoS.
            client.subscribe(userdata, self.__qos)
        else:
            # Automatic reconnection is not performed only when the username or password is incorrect.
            # If the disconnect() method is not used here, loop_forever keeps reconnecting.
            if rc == _BAD_CREDENTIALS_RC:
                # Record the failure so connect() can report it and the
                # disconnect callback does not start another attempt.
                self.__stop_reconnect = True
                self.__connect_result_code = _BAD_CREDENTIALS_RC
                self.__paho_client.disconnect()
            print("Failed to connect. return code :" + str(rc.value) + ", reason" + rc.getName())

    def _on_subscribe(self, client, userdata, mid, granted_qos, properties):
        """Log the subscription acknowledgement."""
        print("Subscribed: " + str(mid) + " " + str(granted_qos) + " topic: " + self.__topic)

    def _on_message(self, client, userdata, message: mqtt.MQTTMessage):
        """Log the payload of a received message."""
        print("topic " + self.__topic + " Received message: " + message.payload.decode())

    def _on_disconnect(self, client, userdata, flags, rc, properties):
        """Reconnect after a disconnection unless reconnection was stopped."""
        print("Disconnect to Mqtt Broker. topic: " + self.__topic)
        # Shut down the client after the disconnection and manually reconnect the client to refresh the timestamp.
        try:
            self.__paho_client.disconnect()
        except Exception:
            print("Mqtt connection error. traceback: " + traceback.format_exc())
        if self.__stop_reconnect:
            # close() was called or the credentials were rejected.
            return
        self.__connect_with_retry()

    def close(self):
        """Disconnect from the broker and stop any further reconnection."""
        self.__stop_reconnect = True
        self.__release()

    def __release(self):
        """Disconnect the current paho client if it is connected."""
        if self.__paho_client is not None and self.__paho_client.is_connected():
            try:
                self.__paho_client.disconnect()
                print("Mqtt connection close")
            except Exception as e:
                print("paho client disconnect failed. exception: " + str(e))
MqttDemo code:
import os
from typing import Optional

from client_conf import ClientConf
from mqtt_client import MqttClient


def main():
    """Build the subscriber configuration and open the connection.

    Host, port, topic and instance ID are placeholder literals to be edited;
    the two access credentials are read from environment variables.
    """
    settings = ClientConf()

    # Endpoint and subscription target.
    settings.host = "your ip host"
    settings.port = 8883
    settings.topic = "your mqtt topic"
    settings.instance_id = "your instance id"

    # MQTT access credentials can be injected using environment variables.
    settings.access_key = os.environ.get("MQTT_ACCESS_KEY")
    settings.access_code = os.environ.get("MQTT_ACCESS_CODE")

    subscriber = MqttClient(settings)
    if subscriber.connect() == 0:
        return
    print("init failed")


if __name__ == "__main__":
    main()
Success Example
After the client connects successfully, the following information is displayed on the client.
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot