Configuring Kafka Clients in Python
This section uses the Linux Centos environment as an example to describe how to connect a Python Kafka client to MQS, including Kafka client installation, message production, and message consumption.
Before getting started, ensure that you have collected the information listed in Collecting Connection Information.
Preparing the Environment
- Python:
Generally, the Python has been installed in the system. Enter python in a CLI. If the following information is displayed, Python 3.7.1 has already been installed:
[root@ecs-test python-kafka]# python3 Python 3.7.1 (default, Jul 5 2020, 14:37:24) [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux Type "help", "copyright", "credits" or "license" for more information. >>>
If Python has not been installed, run the following command:
yum install python
- Kafka clients in Python:
Run the following command to install kafka-python of the recommended version:
pip install kafka-python
Producing Messages
- SASL authentication mode
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
from kafka import KafkaProducer import ssl ##Connection information conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_plain_username': 'username', 'sasl_plain_password': 'password' } context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED ##Certificate file context.load_verify_locations("phy_ca.crt") print('start producer') producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_plain_username'], sasl_plain_password=conf['sasl_plain_password']) data = bytes("hello kafka!", encoding="utf-8") producer.send(conf['topic_name'], data) producer.close() print('end producer')
- Non-SASL authentication mode
Replace the information in bold with the actual values.
1 2 3 4 5 6 7 8 9 10 11 12 13 14
from kafka import KafkaProducer conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic-name', } print('start producer') producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers']) data = bytes("hello kafka!", encoding="utf-8") producer.send(conf['topic_name'], data) producer.close() print('end producer')
Consuming Messages
- SASL authentication mode
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
from kafka import KafkaConsumer import ssl ##Connection information conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_plain_username': 'username', 'sasl_plain_password': 'password', 'consumer_id': 'consumer_id' } context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED ##Certificate file context.load_verify_locations("phy_ca.crt") print('start consumer') consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_plain_username'], sasl_plain_password=conf['sasl_plain_password']) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value)) print('end consumer')
- Non-SASL authentication mode
Replace the information in bold with the actual values.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
from kafka import KafkaConsumer conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic-name', 'consumer_id': 'consumer-id' } print('start consumer') consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id']) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value)) print('end consumer')
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot