更新时间:2022-03-11 GMT+08:00
分享

Python客户端使用说明

操作场景

本文以Linux CentOS环境为例,介绍Python版本的Kafka客户端连接指导,包括Kafka客户端安装,以及生产、消费消息。

前提条件

  • 已获取MQS连接信息,具体请参见开发准备
  • 已安装开发工具和Python开发语言环境,具体请参见开发准备

引入Kafka客户端

MQS基于Kafka社区版本1.1.0和2.3.0,您可以在ROMA Connect实例控制台的“实例信息”页面,在“MQS基本信息”下查看Kafka版本信息。Python开源客户端的版本使用请参见客户端版本使用建议

执行以下命令,安装对应版本的Python Kafka客户端:

pip install kafka-python==2.0.1

生产消息

  • SASL认证方式
    注意,加粗内容需要替换为实例自有信息。
    from kafka import KafkaProducer
    import ssl
    ##连接信息
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic_name',
        'sasl_plain_username': 'username',
        'sasl_plain_password': 'password'
    }
    
    context = ssl.create_default_context()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    ##证书文件
    context.load_verify_locations("phy_ca.crt")
    
    print('start producer')
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            sasl_plain_username=conf['sasl_plain_username'],
                            sasl_plain_password=conf['sasl_plain_password'])
    
    data = bytes("hello kafka!", encoding="utf-8")
    producer.send(conf['topic_name'], data)
    producer.close()
    print('end producer')
  • 非SASL认证方式

    注意,加粗内容需要替换为实例自有信息。

    from kafka import KafkaProducer
    
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic-name',
    }
    
    print('start producer')
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'])
    
    data = bytes("hello kafka!", encoding="utf-8")
    producer.send(conf['topic_name'], data)
    producer.close()
    print('end producer')

消费消息

  • SASL认证方式
    注意,加粗内容需要替换为实例自有信息。
    from kafka import KafkaConsumer
    import ssl
    ##连接信息
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic_name',
        'sasl_plain_username': 'username',
        'sasl_plain_password': 'password',
        'consumer_id': 'consumer_id'
    }
    
    context = ssl.create_default_context()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    ##证书文件
    context.load_verify_locations("phy_ca.crt")
    
    print('start consumer')
    consumer = KafkaConsumer(conf['topic_name'],
                            bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'],
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            sasl_plain_username=conf['sasl_plain_username'],
                            sasl_plain_password=conf['sasl_plain_password'])
    
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
    
    print('end consumer')
  • 非SASL认证方式

    注意,加粗内容需要替换为实例自有信息。

    from kafka import KafkaConsumer
    
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic-name',
        'consumer_id': 'consumer-id'
    }
    
    print('start consumer')
    consumer = KafkaConsumer(conf['topic_name'],
                            bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'])
    
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
    
    print('end consumer')
分享:

    相关文档

    相关产品

close