更新时间:2024-08-06 GMT+08:00

Python

本文以Linux CentOS环境为例,介绍Python版本的Kafka客户端连接指导,包括Kafka客户端安装,以及生产、消费消息。

使用前请参考收集连接信息收集Kafka所需的连接信息。

准备环境

  • Python

    一般系统预装了Python,您可以在命令行输入python或者python3,查看Python是否已经安装。python命令只能查询Python 2.x版本,python3命令只能查询Python 3.x版本,如果无法确认Python版本,请分别输入两个命令查看结果。

    以Python 3.x为例,得到如下回显,说明Python已安装。

    [root@ecs-test python-kafka]# python3
    Python 3.7.1 (default, Jul  5 2020, 14:37:24) 
    [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>>

    如果未安装Python,请使用以下命令安装:

    yum install python

  • Python版的Kafka客户端

    执行以下命令,安装推荐版本的kafka-python:

    • Python 2.x版本:pip install kafka-python==2.0.1
    • Python 3.x版本:pip3 install kafka-python==2.0.1

生产消息

  1. 在客户端创建一个文件,用于存放生产消息的代码示例。

    touch producer.py

    producer.py表示文件名,您可以自定义文件名。

  2. 执行以下命令,编辑文件。

    vim producer.py

  3. 将以下生产消息的代码示例写入文件中,并保存。

    • SASL认证方式
      from kafka import KafkaProducer
      import ssl
      ##连接信息
      conf = {
          'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
          'topic_name': 'topic_name',
          'sasl_username': 'username',
          'sasl_password': 'password'
      }
      
      context = ssl.create_default_context()
      context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
      ##如果Kafka安全协议设置为“SASL_PLAINTEXT”,请注释以下参数。
      context.verify_mode = ssl.CERT_REQUIRED
      ##证书文件,SSL证书参考“收集连接信息”章节获取。如果Kafka安全协议设置为“SASL_PLAINTEXT”,请注释以下参数。
      context.load_verify_locations("phy_ca.crt")
      
      print('start producer')
      producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
                              sasl_mechanism="PLAIN",
                              ssl_context=context,
                              security_protocol='SASL_SSL',
                              sasl_plain_username=conf['sasl_username'],
                              sasl_plain_password=conf['sasl_password'])
      
      data = bytes("hello kafka!", encoding="utf-8")
      producer.send(conf['topic_name'], data)
      producer.close()
      print('end producer')

      示例代码中的参数说明如下,请参考收集连接信息获取参数值。

      • bootstrap_servers:实例连接地址与端口。
      • topic_name:Topic名称。
      • sasl_plain_username/sasl_plain_password:首次开启密文接入时设置的用户名与密码,或者创建用户时设置的用户名和密码。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。
      • context.load_verify_locations:证书文件。如果Kafka安全协议设置为“SASL_SSL”,需要设置此参数。使用Python语言连接实例时,需要用CRT格式的证书。
      • sasl_mechanism:SASL认证机制。在Kafka实例控制台的基本信息页面中获取。如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。很久前创建的Kafka实例在详情页如果未显示“开启的SASL认证机制”,默认使用PLAIN机制。
      • security_protocol:Kafka的安全协议。在Kafka实例控制台的基本信息页面中获取。很久前创建的Kafka实例在详情页如果未显示“启用的安全协议”,默认使用SASL_SSL协议。
        • 安全协议设置为“SASL_SSL”时,采用SASL方式进行认证,数据通过SSL证书进行加密传输,安全性更高。此时需要配置连接实例的用户名和密码,以及证书文件。
        • 安全协议设置为“SASL_PLAINTEXT”时,采用SASL方式进行认证,数据通过明文传输,性能更好。此时需要配置连接实例的用户名和密码,无需配置证书文件。
    • 非SASL认证方式
      from kafka import KafkaProducer
      
      conf = {
          'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
          'topic_name': 'topic-name',
      }
      
      print('start producer')
      producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'])
      
      data = bytes("hello kafka!", encoding="utf-8")
      producer.send(conf['topic_name'], data)
      producer.close()
      print('end producer')

      示例代码中的参数说明如下,请参考收集连接信息获取参数值。

      • bootstrap_servers:实例连接地址与端口。
      • topic_name:Topic名称。

  4. 执行以下命令,运行生产消息的代码示例。

    # Python 2.x版本
    python producer.py
    
    # Python 3.x版本
    python3 producer.py

    运行成功后,返回如下回显。

    [root@ecs-test ~]# python3 producer.py 
    start producer
    end producer
    [root@ecs-test ~]# 

消费消息

  1. 在客户端创建一个文件,用于存放消费消息的代码示例。

    touch consumer.py

    consumer.py表示文件名,您可以自定义文件名。

  2. 执行以下命令,编辑文件。

    vim consumer.py

  3. 将以下消费消息的代码示例写入文件中,并保存。

    • SASL认证方式
      from kafka import KafkaConsumer
      import ssl
      ##连接信息
      conf = {
          'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
          'topic_name': 'topic_name',
          'sasl_username': 'username',
          'sasl_password': 'password',
          'consumer_id': 'consumer_id'
      }
      
      context = ssl.create_default_context()
      context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
      ##如果Kafka安全协议设置为“SASL_PLAINTEXT”,请注释以下参数。
      context.verify_mode = ssl.CERT_REQUIRED
      ##证书文件,SSL证书参考“收集连接信息”章节获取。如果Kafka安全协议设置为“SASL_PLAINTEXT”,请注释以下参数。
      context.load_verify_locations("phy_ca.crt")
      
      print('start consumer')
      consumer = KafkaConsumer(conf['topic_name'],
                              bootstrap_servers=conf['bootstrap_servers'],
                              group_id=conf['consumer_id'],
                              sasl_mechanism="PLAIN",
                              ssl_context=context,
                              security_protocol='SASL_SSL',
                              sasl_plain_username=conf['sasl_username'],
                              sasl_plain_password=conf['sasl_password'])
      
      for message in consumer:
          print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
      
      print('end consumer')

      示例代码中的参数说明如下,请参考收集连接信息获取参数值。

      • bootstrap_servers:实例连接地址与端口。
      • topic_name:Topic名称。
      • sasl_plain_username/sasl_plain_password:首次开启密文接入时设置的用户名与密码,或者创建用户时设置的用户名和密码。为了确保用户名和密码的安全性,建议对用户名和密码进行加密处理,使用时解密。
      • consumer_id:消费组名称。根据业务需求,自定义消费组名称,如果设置的消费组不存在,Kafka会自动创建。
      • context.load_verify_locations:证书文件。如果Kafka安全协议设置为“SASL_SSL”,需要设置此参数。使用Python语言连接实例时,需要用CRT格式的证书。
      • sasl_mechanism:SASL认证机制。在Kafka实例控制台的基本信息页面中获取。如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。很久前创建的Kafka实例在详情页如果未显示“开启的SASL认证机制”,默认使用PLAIN机制。
      • security_protocol:Kafka的安全协议。在Kafka实例控制台的基本信息页面中获取。很久前创建的Kafka实例在详情页如果未显示“启用的安全协议”,默认使用SASL_SSL协议。
        • 安全协议设置为“SASL_SSL”时,采用SASL方式进行认证,数据通过SSL证书进行加密传输,安全性更高。此时需要配置连接实例的用户名和密码,以及证书文件。
        • 安全协议设置为“SASL_PLAINTEXT”时,采用SASL方式进行认证,数据通过明文传输,性能更好。此时需要配置连接实例的用户名和密码,无需配置证书文件。
    • 非SASL认证方式
      from kafka import KafkaConsumer
      
      conf = {
          'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
          'topic_name': 'topic-name',
          'consumer_id': 'consumer-id'
      }
      
      print('start consumer')
      consumer = KafkaConsumer(conf['topic_name'],
                              bootstrap_servers=conf['bootstrap_servers'],
                              group_id=conf['consumer_id'])
      
      for message in consumer:
          print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
      
      print('end consumer')

      示例代码中的参数说明如下,请参考收集连接信息获取参数值。

      • bootstrap_servers:实例连接地址与端口。
      • topic_name:Topic名称。
      • consumer_id:消费组名称。根据业务需求,自定义消费组名称,如果设置的消费组不存在,Kafka会自动创建。

  4. 执行以下命令,运行消费消息的代码示例。

    # Python 2.x版本
    python consumer.py
    
    # Python 3.x版本
    python3 consumer.py

    运行成功后,返回如下回显。

    [root@ecs-test ~]# python3 consumer.py
    start consumer
    

    如需停止消费使用Ctrl+C命令退出。