Updated on 2023-10-10 GMT+08:00

Configuring a Kafka Client in Python

Scenarios

This section uses the Linux CentOS environment as an example to describe how to connect a Python Kafka client to MQS (including Kafka client installation), and how to produce and consume messages.

Prerequisites

  • You have obtained MQS connection information. For details, see Preparations.
  • You have installed the development tool and Python development environment. For details, see Preparations.

Installing the Kafka Client

MQS is developed based on Kafka 1.1.0 and 2.7. View the Kafka version information in the MQS Information area on the Instance Information page on the ROMA Connect console. For details about how to use the Python open-source client, see suggested client versions.

Run the following command to install the Python Kafka client of the corresponding version:

pip install kafka-python==2.0.1

Producing Messages

  • SASL authentication mode

    Replace the information in bold with the actual values.

    from kafka import KafkaProducer
    import ssl
    ##Connection information
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic_name',
        'sasl_plain_username': 'username',
        'sasl_plain_password': 'password'
    }
    
    context = ssl.create_default_context()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    ##Certificate file
    context.load_verify_locations("phy_ca.crt")
    
    print('start producer')
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            sasl_plain_username=conf['sasl_plain_username'],
                            sasl_plain_password=conf['sasl_plain_password'])
    
    data = bytes("hello kafka!", encoding="utf-8")
    producer.send(conf['topic_name'], data)
    producer.close()
    print('end producer')

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • bootstrap_servers: MQS connection addresses and ports
    • topic_name: name of the topic that produces messages
    • sasl_plain_username and sasl_plain_password: username and password used for SASL_SSL authentication
    • context.load_verify_locations: client certificate used for SASL_SSL authentication
  • Non-SASL authentication mode

    Replace the information in bold with the actual values.

    from kafka import KafkaProducer
    
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic_name',
    }
    
    print('start producer')
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'])
    
    data = bytes("hello kafka!", encoding="utf-8")
    producer.send(conf['topic_name'], data)
    producer.close()
    print('end producer')

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • bootstrap_servers: MQS connection addresses and ports
    • topic_name: name of the topic that produces messages

Consuming Messages

  • SASL authentication mode

    Replace the information in bold with the actual values.

    from kafka import KafkaConsumer
    import ssl
    ##Connection information
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic_name',
        'sasl_plain_username': 'username',
        'sasl_plain_password': 'password',
        'consumer_id': 'consumer_id'
    }
    
    context = ssl.create_default_context()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    ##Certificate file
    context.load_verify_locations("phy_ca.crt")
    
    print('start consumer')
    consumer = KafkaConsumer(conf['topic_name'],
                            bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'],
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            sasl_plain_username=conf['sasl_plain_username'],
                            sasl_plain_password=conf['sasl_plain_password'])
    
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
    
    print('end consumer')

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • bootstrap_servers: MQS connection addresses and ports
    • topic_name: name of the topic that consumes messages
    • sasl_plain_username and sasl_plain_password: username and password used for SASL_SSL authentication
    • consumer_id: consumer group name. If the specified consumer group does not exist, the system automatically creates one.
    • context.load_verify_locations: client certificate used for SASL_SSL authentication
  • Non-SASL authentication mode

    Replace the information in bold with the actual values.

    from kafka import KafkaConsumer
    
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic_name',
        'consumer_id': 'consumer_id'
    }
    
    print('start consumer')
    consumer = KafkaConsumer(conf['topic_name'],
                            bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'])
    
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
    
    print('end consumer')

    The parameters in the example code are as follows. For details about how to obtain the parameter values, see Obtaining MQS Connection Information.

    • bootstrap_servers: MQS connection addresses and ports
    • topic_name: name of the topic that consumes messages
    • consumer_id: consumer group name. If the specified consumer group does not exist, the system automatically creates one.