Updated on 2024-08-06 GMT+08:00

Python

This section uses CentOS Linux as an example to describe how to access a Kafka instance with a Kafka client in Python, including how to install the client and how to produce and consume messages.

Before getting started, ensure that you have collected the information listed in Collecting Connection Information.

Preparing the Environment

  • Python

    Python is usually pre-installed on the system. To check, run python or python3 on the command line. The python command typically starts Python 2.x and the python3 command starts Python 3.x, if they are installed. If you are not sure which version is installed, try both.

    For example, if the following information is displayed, Python 3.x has been installed:

    [root@ecs-test python-kafka]# python3
    Python 3.7.1 (default, Jul  5 2020, 14:37:24) 
    [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>>

    If Python is not installed, run the following command:

    yum install python

  • Kafka clients in Python

    Run the command that matches your Python version to install the recommended version of the kafka-python client:

    • Python 2.x: pip install kafka-python==2.0.1
    • Python 3.x: pip3 install kafka-python==2.0.1
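
After installation, it can help to confirm that the interpreter you plan to use can actually find the client. The following is a minimal sketch for Python 3.x only. The function name describe_environment is hypothetical and not part of kafka-python. The check assumes that kafka-python is imported under the module name kafka, and it confirms only that a module with that name is importable, not which version is installed.

```python
import importlib.util
import sys


def describe_environment(module_name="kafka"):
    """Return the interpreter version and whether the given module can be imported."""
    return {
        # Version of the interpreter that is running this script, e.g. "3.7.1"
        "python": "%d.%d.%d" % sys.version_info[:3],
        # find_spec() returns None when no top-level module with this name is found
        "client_installed": importlib.util.find_spec(module_name) is not None,
    }


if __name__ == "__main__":
    env = describe_environment()
    print("Python %s" % env["python"])
    print("kafka module importable: %s" % env["client_installed"])
```

If the second line reports False, the client was probably installed for a different interpreter. In that case, rerun the pip or pip3 command that matches the interpreter you use to run the samples.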

Producing Messages

  1. Create a file on the client to store the message production sample code.

    touch producer.py

    producer.py is an example file name. You can use a different name.

  2. Run the following command to edit the file:

    vim producer.py

  3. Write the following sample code into the file and save the file.

    • With SASL
      from kafka import KafkaProducer
      import ssl
      ## Connection information
      conf = {
          'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
          'topic_name': 'topic_name',
          'sasl_username': 'username',
          'sasl_password': 'password'
      }
      
      ## PROTOCOL_SSLv23 selects the highest protocol version that both the client and the server support.
      context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
      ## If Security Protocol is set to SASL_PLAINTEXT, comment out the following line:
      context.verify_mode = ssl.CERT_REQUIRED
      ## The certificate file. Obtain the SSL certificate by referring to "Collecting Connection Information". If Security Protocol is set to SASL_PLAINTEXT, comment out the following line:
      context.load_verify_locations("phy_ca.crt")
      
      print('start producer')
      producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
                              sasl_mechanism="PLAIN",
                              ssl_context=context,
                              security_protocol='SASL_SSL',
                              sasl_plain_username=conf['sasl_username'],
                              sasl_plain_password=conf['sasl_password'])
      
      ## str.encode() works in both Python 2.x and Python 3.x.
      data = "hello kafka!".encode("utf-8")
      producer.send(conf['topic_name'], data)
      producer.close()
      print('end producer')

      The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

      • bootstrap_servers: instance connection address and port
      • topic_name: topic name
      • sasl_plain_username/sasl_plain_password: username and password that were set when ciphertext access was first enabled or when the user was created. For security purposes, you are advised to encrypt the username and password.
      • context.load_verify_locations: certificate file. This parameter is mandatory when Security Protocol is set to SASL_SSL. CRT certificates are used for accessing instances in Python.
      • sasl_mechanism: SASL authentication mechanism. View it on the Basic Information page of the Kafka instance on the console. If both SCRAM-SHA-512 and PLAIN are enabled, you can use either one in the connection configuration. For older instances whose details page does not display SASL Mechanism, PLAIN is used by default.
      • security_protocol: Kafka security protocol. View it on the Basic Information page of the Kafka instance on the console. For older instances whose details page does not display Security Protocol, SASL_SSL is used by default.
        • When Security Protocol is set to SASL_SSL, SASL is used for authentication and data is encrypted with SSL certificates for secure transmission. You need to configure the instance username, password, and certificate file.
        • When Security Protocol is set to SASL_PLAINTEXT, SASL is used for authentication and data is transmitted in plaintext, which gives higher performance. You need to configure the instance username and password.
    • Without SASL
      from kafka import KafkaProducer
      
      conf = {
          'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
          'topic_name': 'topic-name',
      }
      
      print('start producer')
      producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'])
      
      ## str.encode() works in both Python 2.x and Python 3.x.
      data = "hello kafka!".encode("utf-8")
      producer.send(conf['topic_name'], data)
      producer.close()
      print('end producer')

      The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

      • bootstrap_servers: instance connection address and port
      • topic_name: topic name

  4. Run the command that matches your Python version to execute the sample code.

    # Python 2.x
    python producer.py
    
    # Python 3.x
    python3 producer.py

    If the command succeeds, the following information is displayed:

    [root@ecs-test ~]# python3 producer.py 
    start producer
    end producer
    [root@ecs-test ~]# 
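
The output above shows only that the script ran to the end, not that the broker accepted the message. In kafka-python, send() returns a future, and calling get() on that future blocks until the broker acknowledges the record or an error is raised. The following is a minimal sketch of that pattern. The helper name send_and_confirm and the default timeout of 10 seconds are assumptions made for illustration, not part of the sample code above.

```python
def send_and_confirm(producer, topic, payload, timeout=10):
    """Send one message and block until the broker acknowledges it.

    `producer` is expected to behave like kafka.KafkaProducer: send() returns
    a future whose get() returns the record metadata or raises on failure.
    """
    future = producer.send(topic, payload)
    # Blocks for up to `timeout` seconds; raises an exception if the send failed
    metadata = future.get(timeout=timeout)
    return metadata.topic, metadata.partition, metadata.offset


# Possible use in producer.py, replacing the plain producer.send() call:
#   topic, partition, offset = send_and_confirm(producer, conf['topic_name'], data)
#   print("%s:%d:%d" % (topic, partition, offset))
```

Waiting on every message this way trades throughput for certainty, so it suits connectivity checks better than high-volume production.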

Consuming Messages

  1. Create a file on the client to store the message consumption sample code.

    touch consumer.py

    consumer.py is an example file name. You can use a different name.

  2. Run the following command to edit the file:

    vim consumer.py

  3. Write the following sample code into the file and save the file.

    • With SASL
      from kafka import KafkaConsumer
      import ssl
      ## Connection information
      conf = {
          'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
          'topic_name': 'topic_name',
          'sasl_username': 'username',
          'sasl_password': 'password',
          'consumer_id': 'consumer_id'
      }
      
      ## PROTOCOL_SSLv23 selects the highest protocol version that both the client and the server support.
      context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
      ## If Security Protocol is set to SASL_PLAINTEXT, comment out the following line:
      context.verify_mode = ssl.CERT_REQUIRED
      ## The certificate file. Obtain the SSL certificate by referring to "Collecting Connection Information". If Security Protocol is set to SASL_PLAINTEXT, comment out the following line:
      context.load_verify_locations("phy_ca.crt")
      
      print('start consumer')
      consumer = KafkaConsumer(conf['topic_name'],
                              bootstrap_servers=conf['bootstrap_servers'],
                              group_id=conf['consumer_id'],
                              sasl_mechanism="PLAIN",
                              ssl_context=context,
                              security_protocol='SASL_SSL',
                              sasl_plain_username=conf['sasl_username'],
                              sasl_plain_password=conf['sasl_password'])
      
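      for message in consumer:
          print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                               message.offset, message.key, message.value))
      
      ## The loop above blocks while waiting for messages, so this line is reached only if iteration stops (for example, when consumer_timeout_ms is set).
      print('end consumer')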

      The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

      • bootstrap_servers: instance connection address and port
      • topic_name: topic name
      • sasl_plain_username/sasl_plain_password: username and password that were set when ciphertext access was first enabled or when the user was created. For security purposes, you are advised to encrypt the username and password.
      • consumer_id: custom consumer group name. If the specified consumer group does not exist, Kafka automatically creates one.
      • context.load_verify_locations: certificate file. This parameter is mandatory when Security Protocol is set to SASL_SSL. CRT certificates are used for accessing instances in Python.
      • sasl_mechanism: SASL authentication mechanism. View it on the Basic Information page of the Kafka instance on the console. If both SCRAM-SHA-512 and PLAIN are enabled, you can use either one in the connection configuration. For older instances whose details page does not display SASL Mechanism, PLAIN is used by default.
      • security_protocol: Kafka security protocol. View it on the Basic Information page of the Kafka instance on the console. For older instances whose details page does not display Security Protocol, SASL_SSL is used by default.
        • When Security Protocol is set to SASL_SSL, SASL is used for authentication and data is encrypted with SSL certificates for secure transmission. You need to configure the instance username, password, and certificate file.
        • When Security Protocol is set to SASL_PLAINTEXT, SASL is used for authentication and data is transmitted in plaintext, which gives higher performance. You need to configure the instance username and password.
    • Without SASL
      from kafka import KafkaConsumer
      
      conf = {
          'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
          'topic_name': 'topic-name',
          'consumer_id': 'consumer-id'
      }
      
      print('start consumer')
      consumer = KafkaConsumer(conf['topic_name'],
                              bootstrap_servers=conf['bootstrap_servers'],
                              group_id=conf['consumer_id'])
      
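      for message in consumer:
          print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                               message.offset, message.key, message.value))
      
      ## The loop above blocks while waiting for messages, so this line is reached only if iteration stops (for example, when consumer_timeout_ms is set).
      print('end consumer')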

      The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

      • bootstrap_servers: instance connection address and port
      • topic_name: topic name
      • consumer_id: custom consumer group name. If the specified consumer group does not exist, Kafka automatically creates one.

  4. Run the command that matches your Python version to execute the sample code.

    # Python 2.x
    python consumer.py
    
    # Python 3.x
    python3 consumer.py

    The following information is displayed after the consumer starts. Received messages are printed as they arrive.

    [root@ecs-test ~]# python3 consumer.py
    start consumer
    

    Press Ctrl+C to stop the consumer.
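
When the sample consumer is stopped with Ctrl+C, Python raises KeyboardInterrupt inside the loop and the script exits with a traceback, without closing the consumer. The following is a minimal sketch of one way to shut down more cleanly. The helper name consume and its max_messages parameter are hypothetical. The only client method it relies on is close(), which KafkaConsumer provides.

```python
def consume(consumer, handle, max_messages=None):
    """Pass each message to `handle`, then close the consumer.

    Stops on Ctrl+C, when iteration ends, or after `max_messages` messages.
    Returns the number of messages handled.
    """
    count = 0
    try:
        for message in consumer:
            handle(message)
            count += 1
            if max_messages is not None and count >= max_messages:
                break
    except KeyboardInterrupt:
        pass  # Ctrl+C: fall through to the cleanup below
    finally:
        consumer.close()  # always release the connection, even after an error
    return count


# Possible use in consumer.py, replacing the plain for loop:
#   total = consume(consumer, lambda m: print(m.topic, m.partition, m.offset, m.value))
#   print('end consumer, %d messages' % total)
```

With this structure, the final print statement runs after Ctrl+C instead of being skipped.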