文档首页 > > 开发指南> MQS开发指南> Python

Python

分享
更新时间: 2019/10/26 GMT+08:00

本文以Linux Centos环境为例,介绍Python版本的Kafka客户端连接指导,包括Kafka客户端安装,以及生产、消费消息。

使用前请参考收集连接信息收集Kafka所需的连接信息。

准备环境

  • Python

    一般系统预装了Python。在命令行输入python,得到如下回显,说明Python已安装。

    [root@ecs-heru bin]# python
    Python 2.7.5 (default, Oct 30 2018, 23:45:53) 
    [GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    >>> 

    如果未安装Python,请使用以下命令安装:

    yum install python

  • Python版的Kafka客户端

    推荐使用以下命令安装:

    pip install kafka-python

    如需使用指定版本,可参考以下方式安装。依次执行命令:

    wget https://github.com/dpkp/kafka-python/archive/1.1.0.tar.gz

    mv 1.1.0.tar.gz kafka-python-1.1.0.tar.gz

    tar -xvf kafka-python-1.1.0.tar.gz

    cd kafka-python-1.1.0

    python setup.py install

生产消息

  • SASL认证方式
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    from kafka import KafkaProducer
    import ssl
    #连接信息
    conf = {
        'bootstrap_servers': ["ip1:port1,ip2:port2,ip3:port3"],
        'topic_name': 'topic_name',
        'sasl_plain_username': 'username',
        'sasl_plain_password': 'password'
    }
    
    context = ssl.create_default_context()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    #证书文件
    context.load_verify_locations("phy_ca.crt")
    
    print ("start producer")
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            sasl_plain_username=conf['sasl_plain_username'],
                            sasl_plain_password=conf['sasl_plain_password'])
    
    data = "hello kafka!"
    producer.send(conf['topic_name'], data)
    producer.close()
    print ("end producer")
    
  • 非SASL认证方式

    注意,加粗内容需要替换为实例自有信息。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    from kafka import KafkaProducer
    
    conf = {
        'bootstrap_servers': 'ip1:port1,ip2:port2,ip3:port3',
        'topic_name': 'topic-name',
    }
    
    print ("start producer")
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'])
    
    data = "hello kafka!"
    producer.send(conf['topic_name'], data)
    producer.close()
    print ("end producer")
    

消费消息

  • SASL认证方式
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    from kafka import KafkaProducer
    import ssl
    #连接信息
    conf = {
        'bootstrap_servers': ["ip1:port1,ip2:port2,ip3:port3"],
        'topic_name': 'topic_name',
        'sasl_plain_username': 'username',
        'sasl_plain_password': 'password',
        'consumer_id': 'consumer_id'
    }
    
    context = ssl.create_default_context()
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.verify_mode = ssl.CERT_REQUIRED
    #证书文件
    context.load_verify_locations("phy_ca.crt")
    
    print ("start consumer")
    consumer = KafkaConsumer(conf['topic_name'],
                            bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'],
                            sasl_mechanism="PLAIN",
                            ssl_context=context,
                            security_protocol='SASL_SSL',
                            sasl_plain_username=conf['sasl_plain_username'],
                            sasl_plain_password=conf['sasl_plain_password'])
    
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
    
    print ("end consumer")
    
  • 非SASL认证方式

    注意,加粗内容需要替换为实例自有信息。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    from kafka import KafkaConsumer
    
    conf = {
        'bootstrap_servers': 'ip1:port1,ip2:port2,ip3:port3',
        'topic_name': 'topic-name',
        'consumer_id': 'consumer-id'
    }
    
    print ("start consumer")
    consumer = KafkaConsumer(conf['topic_name'],
                            bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'])
    
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
    
    print ("end consumer")
    

分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

跳转到云社区