Python Demo
This section uses Python as an example to describe how to connect an MQTTS client to the platform and receive subscribed messages from the platform.
Prerequisites
You are familiar with basic Python syntax and know how to set up a Python development environment.
Development Environment
In this example, Python 3.8.8 is used.
Dependency
In this example, paho-mqtt (version 2.0.0) is used. You can run the following command to download the dependency:
pip install paho-mqtt==2.0.0
Sample Code
ClientConf code:
from typing import Optional
class ClientConf:
    """Connection settings for the MQTT subscription client.

    Every setting can be supplied to the constructor as a keyword argument
    or assigned afterwards through the property of the same name, so
    ``ClientConf()`` followed by attribute assignment keeps working.
    """

    def __init__(self, *, host: Optional[str] = None, port: Optional[int] = None,
                 access_key: Optional[str] = None, access_code: Optional[str] = None,
                 topic: Optional[str] = None, instance_id: Optional[str] = None,
                 qos: int = 1):
        # MQTT subscription address
        self.__host: Optional[str] = host
        # MQTT subscription port number
        self.__port: Optional[int] = port
        # MQTT access credential access_key
        self.__access_key: Optional[str] = access_key
        # MQTT access credential access_code
        self.__access_code: Optional[str] = access_code
        # MQTT subscription topic
        self.__topic: Optional[str] = topic
        # Instance ID. This parameter is mandatory when multiple instances of
        # the standard edition are purchased in the same region.
        self.__instance_id: Optional[str] = instance_id
        # MQTT quality-of-service level used for the subscription
        self.__qos: int = qos

    def __repr__(self) -> str:
        # The access code is a secret, so it is never included in the output.
        return (
            f"{type(self).__name__}(host={self.__host!r}, port={self.__port!r}, "
            f"access_key={self.__access_key!r}, access_code=<hidden>, "
            f"topic={self.__topic!r}, instance_id={self.__instance_id!r}, "
            f"qos={self.__qos!r})"
        )

    @property
    def host(self) -> Optional[str]:
        """MQTT subscription address."""
        return self.__host

    @host.setter
    def host(self, host: Optional[str]) -> None:
        self.__host = host

    @property
    def port(self) -> Optional[int]:
        """MQTT subscription port number."""
        return self.__port

    @port.setter
    def port(self, port: Optional[int]) -> None:
        self.__port = port

    @property
    def access_key(self) -> Optional[str]:
        """MQTT access credential access_key."""
        return self.__access_key

    @access_key.setter
    def access_key(self, access_key: Optional[str]) -> None:
        self.__access_key = access_key

    @property
    def access_code(self) -> Optional[str]:
        """MQTT access credential access_code."""
        return self.__access_code

    @access_code.setter
    def access_code(self, access_code: Optional[str]) -> None:
        self.__access_code = access_code

    @property
    def topic(self) -> Optional[str]:
        """MQTT subscription topic."""
        return self.__topic

    @topic.setter
    def topic(self, topic: Optional[str]) -> None:
        self.__topic = topic

    @property
    def instance_id(self) -> Optional[str]:
        """Instance ID (see the constructor comment for when it is required)."""
        return self.__instance_id

    @instance_id.setter
    def instance_id(self, instance_id: Optional[str]) -> None:
        self.__instance_id = instance_id

    @property
    def qos(self) -> int:
        """MQTT quality-of-service level; defaults to 1."""
        return self.__qos

    @qos.setter
    def qos(self, qos: int) -> None:
        self.__qos = qos
MqttClient code:
import os
import secrets
import ssl
import threading
import time
import traceback
from typing import Optional

import paho.mqtt.client as mqtt

from client_conf import ClientConf
class MqttClient:
    """MQTTS subscriber that connects to the platform and prints received messages.

    The user name sent to the broker embeds the current timestamp, so paho's
    automatic reconnection is disabled and every (re)connection builds a new
    paho client with a fresh timestamp.
    """

    # CONNACK reason code reported for a wrong user name or password.
    _BAD_CREDENTIALS_RC = 134
    # Seconds to wait for the broker's CONNACK once the socket is open.
    _CONNACK_TIMEOUT_S = 5
    # Upper bound for the backoff exponent; the delay hits the cap long before.
    _MAX_BACKOFF_EXPONENT = 16

    def __init__(self, client_conf: "ClientConf"):
        """Copy the connection settings out of ``client_conf``; no I/O is done here."""
        self.__host = client_conf.host
        self.__port = client_conf.port
        self.__access_key = client_conf.access_key
        self.__access_code = client_conf.access_code
        self.__topic = client_conf.topic
        self.__instance_id = client_conf.instance_id
        self.__qos = client_conf.qos
        self.__paho_client: "Optional[mqtt.Client]" = None
        self.__connect_result_code = -1
        self.__default_backoff = 1000
        self.__retry_times = 0
        self.__min_backoff = 1 * 1000  # 1s
        self.__max_backoff = 30 * 1000  # 30s
        # Set by _on_connect once the broker has answered the CONNECT packet.
        self.__connack_event = threading.Event()
        self.__connack_code = -1
        # True while the current client has an accepted session.
        self.__established = False
        # True after close(); suppresses reconnection.
        self.__closed = False

    def connect(self):
        """Connect to the broker, retrying with exponential backoff and jitter.

        Retries stop when the connection succeeds, when the broker rejects the
        credentials (reason code 134), or when close() is called meanwhile.

        Returns:
            0 on success, otherwise the last non-zero result code.

        Raises:
            ValueError: if a mandatory setting is missing.
        """
        self.__valid_params()
        self.__closed = False
        rc = self.__connect()
        while rc != 0 and rc != self._BAD_CREDENTIALS_RC and not self.__closed:
            # Backoff reconnection
            wait_time_s = self._next_backoff_seconds()
            print("client will try to reconnect after " + str(wait_time_s) + " s")
            time.sleep(wait_time_s)
            self.__retry_times += 1
            self._release_client()  # Release the previous connection.
            rc = self.__connect()
            # If the value of rc is 0, the connection is set up successfully. Otherwise, the connection fails.
            if rc != 0:
                print("connect with result code: " + str(rc))
        if rc == 0:
            # Start the next outage from the shortest delay again.
            self.__retry_times = 0
        elif rc == self._BAD_CREDENTIALS_RC:
            print("connect failed with bad username or password, "
                  "reconnection will not be performed")
            self._release_client()
        return rc

    def _next_backoff_seconds(self):
        """Return the delay before the next attempt, in seconds (capped at the maximum backoff)."""
        low_bound = int(self.__default_backoff * 0.8)
        high_bound = int(self.__default_backoff * 1.0)
        random_backoff = secrets.randbelow(high_bound - low_bound)
        # The exponent is bounded so the intermediate integer cannot grow
        # without limit during a long outage; the result is capped anyway.
        exponent = min(self.__retry_times, self._MAX_BACKOFF_EXPONENT)
        backoff_with_jitter = (2 ** exponent) * (random_backoff + low_bound)
        wait_time_ms = min(self.__max_backoff, self.__min_backoff + backoff_with_jitter)
        return round(wait_time_ms / 1000, 2)

    def __connect(self):
        """Make one connection attempt with a new paho client.

        Returns:
            0 when the broker accepted the session; the CONNACK reason code
            when it refused; -1 on an exception or when no CONNACK arrived
            in time.
        """
        self.__connack_event.clear()
        self.__connack_code = -1
        self.__connect_result_code = -1
        self.__established = False
        try:
            timestamp = self.current_time_millis()
            user_name = "accessKey=" + self.__access_key + "|timestamp=" + timestamp
            if self.__instance_id:
                user_name = user_name + "|instanceId=" + self.__instance_id
            pass_word = self.__access_code
            # Disable automatic retry and update the timestamp by manual retry.
            client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "mqttClient",
                                 reconnect_on_failure=False)
            self.__paho_client = client
            # Set the callback function.
            self._set_callback()
            # Topics are stored in userdata. The callback function directly subscribes to topics.
            client.user_data_set(self.__topic)
            client.username_pw_set(user_name, pass_word)
            # Currently, the MQTT broker supports only TLS 1.2.
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
            # Not verifying the server certificate.
            # NOTE(security): CERT_NONE leaves the link open to man-in-the-middle
            # attacks; load the platform CA certificate and enable verification
            # for production use.
            context.verify_mode = ssl.CERT_NONE
            context.check_hostname = False
            client.tls_set_context(context)
            rc = client.connect(self.__host, self.__port)
            self.__connect_result_code = rc
            if rc == 0:
                threading.Thread(target=client.loop_forever,
                                 kwargs={"timeout": 1, "retry_first_connection": False},
                                 name="MqttThread").start()
                # connect() returning 0 only means the CONNECT packet was sent;
                # the broker's verdict arrives in _on_connect.
                if self.__connack_event.wait(self._CONNACK_TIMEOUT_S):
                    self.__connect_result_code = self.__connack_code
                else:
                    self.__connect_result_code = -1
                    print("Mqtt connection error. no CONNACK received within "
                          + str(self._CONNACK_TIMEOUT_S) + " s")
        except Exception:
            self.__connect_result_code = -1
            print("Mqtt connection error. traceback: " + traceback.format_exc())
        return self.__connect_result_code

    def __valid_params(self):
        """Raise ValueError when a mandatory setting is missing.

        Explicit raises are used instead of ``assert`` so the checks still run
        under ``python -O``.
        """
        required = (
            ("host", self.__host),
            ("port", self.__port),
            ("access_key", self.__access_key),
            ("access_code", self.__access_code),
            ("topic", self.__topic),
        )
        missing = [name for name, value in required if value is None]
        if missing:
            raise ValueError("missing MQTT client settings: " + ", ".join(missing))

    @staticmethod
    def current_time_millis():
        """Return the current Unix time in milliseconds as a decimal string."""
        return str(int(round(time.time() * 1000)))

    def _set_callback(self):
        """Attach this object's handlers to the current paho client."""
        # Execute self._on_connect() when the platform responds to the connection request.
        self.__paho_client.on_connect = self._on_connect
        # Execute self._on_disconnect() when disconnecting from the platform.
        self.__paho_client.on_disconnect = self._on_disconnect
        # Execute self._on_subscribe when subscribing to a topic.
        self.__paho_client.on_subscribe = self._on_subscribe
        # Execute self._on_message() when an original message is received.
        self.__paho_client.on_message = self._on_message

    def _on_connect(self, client, userdata, flags, rc: "mqtt.ReasonCode", properties):
        """Record the broker's CONNACK verdict and subscribe on success."""
        if client is not self.__paho_client:
            # Callback from a client that has already been replaced.
            return
        self.__connack_code = rc.value
        if rc == 0:
            self.__established = True
            print("Connected to Mqtt Broker! topic " + self.__topic)
            client.subscribe(userdata, self.__qos)
        else:
            # The connection was refused: stop this client's network loop.
            # Whether to retry is decided by connect().
            client.disconnect()
            print("Failed to connect. return code :" + str(rc.value) + ", reason" + rc.getName())
        # Wake up __connect(), which is waiting for the verdict.
        self.__connack_event.set()

    def _on_subscribe(self, client, userdata, mid, granted_qos, properties):
        """Log the broker's answer to the subscription request."""
        print("Subscribed: " + str(mid) + " " + str(granted_qos) + " topic: " + self.__topic)

    def _on_message(self, client, userdata, message: "mqtt.MQTTMessage"):
        """Print a received message."""
        # errors="replace" keeps a non-UTF-8 payload from raising inside the
        # network loop thread.
        print("topic " + self.__topic + " Received message: "
              + message.payload.decode(errors="replace"))

    def _on_disconnect(self, client, userdata, flags, rc, properties):
        """Reconnect with a fresh timestamp after an established session is lost."""
        print("Disconnect to Mqtt Broker. topic: " + self.__topic)
        # Shut down the client after the disconnection and manually reconnect the client to refresh the timestamp.
        try:
            client.disconnect()
        except Exception:
            print("Mqtt connection error. traceback: " + traceback.format_exc())
        # No reconnection for a replaced client, after close(), or when the
        # session was never accepted (connect() is already handling that case).
        if client is not self.__paho_client or self.__closed or not self.__established:
            return
        self.__established = False
        # Reconnect on a separate thread so this client's loop thread can exit.
        threading.Thread(target=self.connect, name="MqttReconnectThread").start()

    def _release_client(self):
        """Detach the current paho client and stop its network loop."""
        client, self.__paho_client = self.__paho_client, None
        self.__established = False
        if client is None:
            return
        try:
            client.disconnect()
        except Exception as e:
            print("paho client disconnect failed. exception: " + str(e))

    def close(self):
        """Disconnect from the broker; no reconnection is attempted afterwards."""
        self.__closed = True
        client = self.__paho_client
        if client is not None and client.is_connected():
            try:
                client.disconnect()
                print("Mqtt connection close")
            except Exception as e:
                print("paho client disconnect failed. exception: " + str(e))
MqttDemo code:
from client_conf import ClientConf
from mqtt_client import MqttClient
import os
from typing import Optional
def main():
    """Build the client configuration, connect, and keep receiving messages.

    The credentials are read from the MQTT_ACCESS_KEY and MQTT_ACCESS_CODE
    environment variables. Replace the placeholder host, topic, and instance
    ID with your own values before running.
    """
    # MQTT access credentials are injected using environment variables.
    access_key = os.environ.get("MQTT_ACCESS_KEY")
    access_code = os.environ.get("MQTT_ACCESS_CODE")
    if not access_key or not access_code:
        # Report the missing variables here instead of failing deep inside the client.
        print("init failed: set the MQTT_ACCESS_KEY and MQTT_ACCESS_CODE environment variables")
        return

    client_conf = ClientConf()
    client_conf.host = "your ip host"
    client_conf.port = 8883
    client_conf.topic = "your mqtt topic"
    client_conf.access_key = access_key
    client_conf.access_code = access_code
    client_conf.instance_id = "your instance id"

    mqtt_client = MqttClient(client_conf)
    if mqtt_client.connect() != 0:
        print("init failed")


if __name__ == "__main__":
    main()
Success Example
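After the client connects successfully, it prints a "Connected to Mqtt Broker! topic <your topic>" message followed by a "Subscribed: ..." message, and then prints each message it receives from the subscribed topic.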
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot