更新时间:2022-02-21 GMT+08:00

Python SDK

本节以Linux系统为例介绍Python客户端如何连接DMS服务的Kafka队列,获取相关配置信息请参考准备环境章节。

DMS服务兼容Kafka原生API,相关调用方法请参考Kafka官网说明

客户端环境搭建

  1. 根据python编码格式选择对应版本的SDK包。

    1. 下载SDK包,解压后获取如下两个文件。
      • kafka-dms-python-code2.tar.gz
      • kafka-dms-python-code4.tar.gz
    2. 在任意目录下,执行如下命令新增test.py文件。

      vi test.py

      将如下内容写入文件中,并保存。

      import sys
          if sys.maxunicode > 65535:
              print 'Your python is suitable for kafka-dms-python-code4.tar.gz'
          else:
              print 'Your python is suitable for kafka-dms-python-code2.tar.gz'
    3. 执行如下命令选择Python SDK包。

      python test.py

    4. 根据界面提示选择SDK包。

  2. 解压缩SDK文件。

    以下假设解压目录为{root_dir}。

  3. 执行如下命令配置环境变量。

    vi ~/.bash_profile

    将如下行写入配置文件中,保存退出。

    export LD_LIBRARY_PATH={root_dir}/kafka-dms-python/confluent_kafka/lib:{root_dir}/kafka-dms-python/confluent_kafka/thirdPart

    执行如下命令使修改生效。

    source ~/.bash_profile

  4. 编辑{root_dir}/kafka-dms-python/conf.ini文件,配置如下项:

    sasl.project.id=your project
    sasl.access.key=your ak
    sasl.security.key=your sk

    获取方法请参考准备环境,修改完成后保存文件。

运行示例

  1. 执行如下命令生产消息。

    cd {root_dir}/kafka-dms-python/

    python producer.py <bootstrap-brokers> <topic> <msg1> <msg2> <msg3> ..

    说明:bootstrap-brokers配置为Kafka的endpoint地址,topic配置为Kafka Topic的id,获取方法请参考准备环境

    示例:

    python producer.py dms-kafka.cn-north-1.myhuaweicloud.com:37003 k-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299 123 456 789

  2. 执行如下命令从队列中最早的消息开始消费消息。

    python consumer.py <bootstrap-brokers> <group> <topic>

    说明:group配置为Kafka队列的消费组id,获取方法请参考准备环境

    示例:

    python consumer.py dms-kafka.cn-north-1.myhuaweicloud.com:37003 g-7ec0caac-01fb-4f91-a4f2-0a9dd48f8af7 k-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299

示例代码

生产消息

    # Read lines from stdin, produce each line to Kafka
    for msg in msgs:
        try:
            # Produce line (without newline)
            p.produce(topic, msg.rstrip(), callback=delivery_callback)
    
        except BufferError as e:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                             len(p))

    # Serve delivery callback queue.
    # NOTE: Since produce() is an asynchronous API this poll() call
    #       will most likely not serve the delivery callback for the
    #       last produce()d message.
    p.poll(0)

消费消息

    # Read messages from Kafka, print to stdout
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                # Error or event
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    # Error
                    raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')