更新时间:2024-01-15 GMT+08:00
分享

Python

本文以Linux CentOS环境为例,介绍Python版本的Kafka客户端连接指导,包括Kafka客户端安装,以及生产、消费消息。

使用前请参考收集连接信息收集Kafka所需的连接信息。

准备环境

  • Python

    一般系统预装了Python。在命令行输入python,得到如下回显,说明Python已安装。

    [root@ecs-test python-kafka]# python3
    Python 3.7.1 (default, Jul  5 2020, 14:37:24) 
    [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>>

    如果未安装Python,请使用以下命令安装:

    yum install python

  • Python版的Kafka客户端

    执行以下命令,安装推荐版本的kafka-python:

    pip install kafka-python==2.0.1

生产消息

以下加粗内容需要替换为实例自有信息,请根据实际情况替换。

  • SASL认证方式
    from kafka import KafkaProducer
    import ssl
    ##连接信息
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic_name',
        'sasl_username': 'username',
        'sasl_password': 'password'
    }
    
    context = ssl.create_default_context()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    ##证书文件,SSL证书参考“收集连接信息”章节获取
    context.load_verify_locations("phy_ca.crt")
    
    print('start producer')
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            sasl_plain_username=conf['sasl_username'],
                            sasl_plain_password=conf['sasl_password'])
    
    data = bytes("hello kafka!", encoding="utf-8")
    producer.send(conf['topic_name'], data)
    producer.close()
    print('end producer')

    示例代码中的参数说明如下,请参考收集连接信息获取参数值。

    • bootstrap_servers:实例连接地址与端口。
    • topic_name:Topic名称。
    • sasl_plain_username/sasl_plain_password:开启SASL_SSL时输入的用户名与密码,或者创建SASL_SSL用户时设置的用户名和密码。
    • context.load_verify_locations:证书文件。使用Python语言连接实例时,需要用CRT格式的证书。
    • sasl_mechanism:SASL认证机制。在Kafka实例控制台的基本信息页面中获取。如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。如果页面未显示“开启的SASL认证机制”,默认使用PLAIN机制。
    • security_protocol:Kafka的安全协议。在Kafka实例控制台的基本信息页面中获取。如果页面未显示“启用的安全协议”,默认使用SASL_SSL协议。
  • 非SASL认证方式
    from kafka import KafkaProducer
    
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic-name',
    }
    
    print('start producer')
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'])
    
    data = bytes("hello kafka!", encoding="utf-8")
    producer.send(conf['topic_name'], data)
    producer.close()
    print('end producer')

    示例代码中的参数说明如下,请参考收集连接信息获取参数值。

    • bootstrap_servers:实例连接地址与端口。
    • topic_name:Topic名称。

消费消息

  • SASL认证方式
    from kafka import KafkaConsumer
    import ssl
    ##连接信息
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic_name',
        'sasl_username': 'username',
        'sasl_password': 'password',
        'consumer_id': 'consumer_id'
    }
    
    context = ssl.create_default_context()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    ##证书文件,SSL证书参考“收集连接信息”章节获取
    context.load_verify_locations("phy_ca.crt")
    
    print('start consumer')
    consumer = KafkaConsumer(conf['topic_name'],
                            bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'],
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            sasl_plain_username=conf['sasl_username'],
                            sasl_plain_password=conf['sasl_password'])
    
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
    
    print('end consumer')

    示例代码中的参数说明如下,请参考收集连接信息获取参数值。

    • bootstrap_servers:实例连接地址与端口。
    • topic_name:Topic名称。
    • sasl_plain_username/sasl_plain_password:开启SASL_SSL时输入的用户名与密码,或者创建SASL_SSL用户时设置的用户名和密码。
    • consumer_id:消费组名称。根据业务需求,自定义消费组名称,如果设置的消费组不存在,Kafka会自动创建。
    • context.load_verify_locations:证书文件。使用Python语言连接实例时,需要用CRT格式的证书。
    • sasl_mechanism:SASL认证机制。在Kafka实例控制台的基本信息页面中获取。如果SCRAM-SHA-512和PLAIN都开启了,根据实际情况选择其中任意一种配置连接。如果页面未显示“开启的SASL认证机制”,默认使用PLAIN机制。
    • security_protocol:Kafka的安全协议。在Kafka实例控制台的基本信息页面中获取。如果页面未显示“启用的安全协议”,默认使用SASL_SSL协议。
  • 非SASL认证方式

    注意,加粗内容需要替换为实例自有信息。

    from kafka import KafkaConsumer
    
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic-name',
        'consumer_id': 'consumer-id'
    }
    
    print('start consumer')
    consumer = KafkaConsumer(conf['topic_name'],
                            bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'])
    
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
    
    print('end consumer')

    示例代码中的参数说明如下,请参考收集连接信息获取参数值。

    • bootstrap_servers:实例连接地址与端口。
    • topic_name:Topic名称。
    • consumer_id:消费组名称。根据业务需求,自定义消费组名称,如果设置的消费组不存在,Kafka会自动创建。
分享:

    相关文档

    相关产品