Configuring a Kafka Client in Python
Scenarios
This section uses the Linux CentOS environment as an example to describe how to connect a Python Kafka client to MQS (including Kafka client installation), and how to produce and consume messages.
Prerequisites
- You have obtained MQS connection information. For details, see Preparations.
- You have installed the development tool and Python development environment. For details, see Preparations.
Installing the Kafka Client
MQS is developed based on Kafka 1.1.0 and 2.7. View the Kafka version information in the MQS Information area on the Instance Information page on the ROMA Connect console. For details about how to use the Python open-source client, see suggested client versions.
Run the following command to install the Python Kafka client of the corresponding version:
pip install kafka-python==2.0.1
Producing Messages
- SASL authentication mode
Replace the information in bold with the actual values.
from kafka import KafkaProducer import ssl ##Connection information conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_plain_username': 'username', 'sasl_plain_password': 'password' } context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED ##Certificate file context.load_verify_locations("phy_ca.crt") print('start producer') producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_plain_username'], sasl_plain_password=conf['sasl_plain_password']) data = bytes("hello kafka!", encoding="utf-8") producer.send(conf['topic_name'], data) producer.close() print('end producer')
The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.
- bootstrap_servers: MQS connection addresses and ports
- topic_name: name of the topic that produces messages
- sasl_plain_username and sasl_plain_password: username and password used for SASL_SSL authentication
- context.load_verify_locations: client certificate used for SASL_SSL authentication
- Non-SASL authentication mode
Replace the information in bold with the actual values.
from kafka import KafkaProducer conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', } print('start producer') producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers']) data = bytes("hello kafka!", encoding="utf-8") producer.send(conf['topic_name'], data) producer.close() print('end producer')
The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.
- bootstrap_servers: MQS connection addresses and ports
- topic_name: name of the topic that produces messages
Consuming Messages
- SASL authentication mode
Replace the information in bold with the actual values.
from kafka import KafkaConsumer import ssl ##Connection information conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_plain_username': 'username', 'sasl_plain_password': 'password', 'consumer_id': 'consumer_id' } context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED ##Certificate file context.load_verify_locations("phy_ca.crt") print('start consumer') consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_plain_username'], sasl_plain_password=conf['sasl_plain_password']) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value)) print('end consumer')
The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.
- bootstrap_servers: MQS connection addresses and ports
- topic_name: name of the topic that consumes messages
- sasl_plain_username and sasl_plain_password: username and password used for SASL_SSL authentication
- consumer_id: consumer group name. If the specified consumer group does not exist, the system automatically creates one.
- context.load_verify_locations: client certificate used for SASL_SSL authentication
- Non-SASL authentication mode
Replace the information in bold with the actual values.
from kafka import KafkaConsumer conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'consumer_id': 'consumer_id' } print('start consumer') consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id']) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value)) print('end consumer')
The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.
- bootstrap_servers: MQS connection addresses and ports
- topic_name: name of the topic that consumes messages
- consumer_id: consumer group name. If the specified consumer group does not exist, the system automatically creates one.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot