Updated on 2024-01-18 GMT+08:00

Python

This section describes how to access a Kafka premium instance using a Kafka client in Python on Linux (CentOS), including how to install the client and how to produce and consume messages.

Before getting started, ensure that you have collected the information listed in Collecting Connection Information.

Preparing the Environment

  • Python

    Generally, Python is pre-installed in the system. Enter python3 in a CLI. If information similar to the following is displayed, Python is already installed.

    [root@ecs-test python-kafka]# python3
    Python 3.7.1 (default, Jul  5 2020, 14:37:24) 
    [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>>

    If Python is not installed, run the following command:

    yum install python3

  • Kafka clients in Python

    Run the following command to install a Python client of the recommended version:

    pip install kafka-python==2.0.1
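
The two checks above (interpreter present, client installed) can also be scripted. The following is a minimal sketch using only the standard library. The helper name check_environment and its min_python parameter are illustrative assumptions, not part of kafka-python. It relies on the fact that the kafka-python package is imported as the top-level module kafka.

```python
import importlib.util
import sys


def check_environment(min_python=(3, 0), module_name="kafka"):
    """Report whether the interpreter and the Kafka client look usable.

    check_environment is a hypothetical helper written for this example.
    """
    return {
        # Version of the interpreter running this script, for example "3.7.1".
        "python_version": "%d.%d.%d" % sys.version_info[:3],
        # True when the interpreter is at least min_python.
        "python_ok": sys.version_info[:2] >= min_python,
        # find_spec returns None when the top-level module is not installed.
        "client_installed": importlib.util.find_spec(module_name) is not None,
    }


if __name__ == "__main__":
    print(check_environment())
```

If client_installed is False, rerun the pip command above with the same interpreter that runs this script.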

Producing Messages

Replace the placeholder values in the following examples (connection addresses, topic name, username, password, and certificate file) with the actual values.

  • With SASL
    from kafka import KafkaProducer
    import ssl
    # Connection information
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic_name',
        'sasl_username': 'username',
        'sasl_password': 'password'
    }
    
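    # Verify the server certificate chain against the CA file loaded below.
    # Hostname checking stays disabled; only the certificate chain is verified.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_REQUIRED
    # Certificate file. For details about how to obtain the SSL certificate, see "Collecting Connection Information".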
    context.load_verify_locations("phy_ca.crt")
    
    print('start producer')
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            sasl_plain_username=conf['sasl_username'],
                            sasl_plain_password=conf['sasl_password'])
    
    data = bytes("hello kafka!", encoding="utf-8")
    producer.send(conf['topic_name'], data)
    producer.close()
    print('end producer')

    The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

    • bootstrap_servers: instance connection address and port
    • topic_name: topic name
    • sasl_plain_username and sasl_plain_password: username and password you set when enabling SASL_SSL during Kafka instance creation or when creating a SASL_SSL user.
    • context.load_verify_locations: certificate file. CRT certificates are used for connecting to instances in Python.
    • sasl_mechanism: SASL authentication mechanism. View it on the Basic Information page of the Kafka instance console. If both SCRAM-SHA-512 and PLAIN are enabled, configure either of them for connections. If SASL Mechanism is not displayed, PLAIN is used by default.
    • security_protocol: Kafka security protocol. View it on the Basic Information page of the Kafka instance console. If Security Protocol is not displayed, SASL_SSL is used by default.
  • Without SASL
    from kafka import KafkaProducer
    
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic-name',
    }
    
    print('start producer')
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'])
    
    data = bytes("hello kafka!", encoding="utf-8")
    producer.send(conf['topic_name'], data)
    producer.close()
    print('end producer')

    The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

    • bootstrap_servers: instance connection address and port
    • topic_name: topic name
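
The With SASL and Without SASL examples differ only in the keyword arguments passed to the client. The sketch below gathers that choice in one place, including the option of SCRAM-SHA-512 instead of PLAIN mentioned in the sasl_mechanism description. The helper names build_ssl_context and build_client_kwargs are assumptions made for illustration. The keyword names themselves (security_protocol, sasl_mechanism, sasl_plain_username, sasl_plain_password, ssl_context) are the ones used in the examples above. The sketch also assumes that, as in those examples, hostname checking is disabled and only the certificate chain is verified.

```python
import ssl

# Mechanisms named in the sasl_mechanism parameter description.
SUPPORTED_MECHANISMS = ("PLAIN", "SCRAM-SHA-512")


def build_ssl_context(ca_file=None):
    """Create a client-side TLS context that requires a valid server certificate."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # Assumption: hostname checking is turned off to mirror the examples above.
    context.check_hostname = False
    context.verify_mode = ssl.CERT_REQUIRED
    if ca_file:
        # For example "phy_ca.crt", the certificate file used above.
        context.load_verify_locations(ca_file)
    return context


def build_client_kwargs(bootstrap_servers, username=None, password=None,
                        mechanism="PLAIN", ca_file=None):
    """Return keyword arguments for KafkaProducer or KafkaConsumer.

    Without a username, the result describes a connection without SASL.
    """
    kwargs = {"bootstrap_servers": list(bootstrap_servers)}
    if username is None:
        return kwargs
    if mechanism not in SUPPORTED_MECHANISMS:
        raise ValueError("unsupported SASL mechanism: %s" % mechanism)
    kwargs.update(
        security_protocol="SASL_SSL",
        sasl_mechanism=mechanism,
        sasl_plain_username=username,
        sasl_plain_password=password,
        ssl_context=build_ssl_context(ca_file),
    )
    return kwargs
```

A producer could then be created with KafkaProducer(**build_client_kwargs(conf['bootstrap_servers'], conf['sasl_username'], conf['sasl_password'], ca_file="phy_ca.crt")). The same dictionary can be passed to KafkaConsumer together with the topic name and group_id.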

Consuming Messages

  • With SASL
    from kafka import KafkaConsumer
    import ssl
    # Connection information
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic_name',
        'sasl_username': 'username',
        'sasl_password': 'password',
        'consumer_id': 'consumer_id'
    }
    
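    # Verify the server certificate chain against the CA file loaded below.
    # Hostname checking stays disabled; only the certificate chain is verified.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_REQUIRED
    # Certificate file. For details about how to obtain the SSL certificate, see "Collecting Connection Information".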
    context.load_verify_locations("phy_ca.crt")
    
    print('start consumer')
    consumer = KafkaConsumer(conf['topic_name'],
                            bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'],
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            sasl_plain_username=conf['sasl_username'],
                            sasl_plain_password=conf['sasl_password'])
    
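    # This loop blocks and keeps waiting for new messages. Press Ctrl+C to stop it,
    # or set consumer_timeout_ms on KafkaConsumer to end iteration after an idle period.
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))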
    
    print('end consumer')

    The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

    • bootstrap_servers: instance connection address and port
    • topic_name: topic name
    • sasl_plain_username and sasl_plain_password: username and password you set when enabling SASL_SSL during Kafka instance creation or when creating a SASL_SSL user.
    • consumer_id: custom consumer group name. If the specified consumer group does not exist, Kafka automatically creates one.
    • context.load_verify_locations: certificate file. CRT certificates are used for connecting to instances in Python.
    • sasl_mechanism: SASL authentication mechanism. View it on the Basic Information page of the Kafka instance console. If both SCRAM-SHA-512 and PLAIN are enabled, configure either of them for connections. If SASL Mechanism is not displayed, PLAIN is used by default.
    • security_protocol: Kafka security protocol. View it on the Basic Information page of the Kafka instance console. If Security Protocol is not displayed, SASL_SSL is used by default.
  • Without SASL

    Replace the placeholder values in the following example (connection addresses, topic name, and consumer group name) with the actual values.

    from kafka import KafkaConsumer
    
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic-name',
        'consumer_id': 'consumer-id'
    }
    
    print('start consumer')
    consumer = KafkaConsumer(conf['topic_name'],
                            bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'])
    
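    # This loop blocks and keeps waiting for new messages. Press Ctrl+C to stop it,
    # or set consumer_timeout_ms on KafkaConsumer to end iteration after an idle period.
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))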
    
    print('end consumer')

    The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

    • bootstrap_servers: instance connection address and port
    • topic_name: topic name
    • consumer_id: custom consumer group name. If the specified consumer group does not exist, Kafka automatically creates one.
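
The consumer examples print keys and values as raw bytes and iterate without a stopping condition. The following sketch shows one way to decode each record and stop after a fixed number of records. The helpers decode, format_record, and drain are hypothetical names introduced for this example, and the namedtuple is only a stand-in with the same field names (topic, partition, offset, key, value) that the examples above read from each message. UTF-8 is assumed because the producer examples encode their payload as UTF-8.

```python
from collections import namedtuple


def decode(raw, encoding="utf-8"):
    """Decode a bytes payload. Keys and values may be None."""
    return None if raw is None else raw.decode(encoding)


def format_record(record):
    """Render one record in the same layout as the examples above."""
    return "%s:%d:%d: key=%s value=%s" % (
        record.topic, record.partition, record.offset,
        decode(record.key), decode(record.value))


def drain(consumer, limit=None):
    """Collect formatted records until iteration ends or limit is reached."""
    lines = []
    for record in consumer:
        lines.append(format_record(record))
        if limit is not None and len(lines) >= limit:
            break
    return lines


if __name__ == "__main__":
    # Stand-in record type for a dry run without a broker (illustrative only).
    Record = namedtuple("Record", "topic partition offset key value")
    sample = [Record("topic-name", 0, 7, None, b"hello kafka!")]
    for line in drain(sample):
        print(line)  # topic-name:0:7: key=None value=hello kafka!
```

With a real consumer, passing consumer_timeout_ms (for example 10000) to KafkaConsumer makes iteration end after that many milliseconds without new messages, so drain(consumer) returns and the final print('end consumer') statement is reached.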