Updated on 2023-11-30 GMT+08:00

Python SDK

This chapter describes how to access a DMS Kafka queue by using a Linux Python client. To obtain related configuration information, see Preparing the Environment.

DMS is compatible with native Kafka APIs. For details on how to call native Kafka APIs, see the Kafka Documentation.

Preparing the Client Environment

  1. Select the SDK package that matches the Unicode build of your Python interpreter (UCS-2 or UCS-4).

    1. Download the SDK package and extract the following two packages from it:
      • kafka-dms-python-code2.tar.gz
      • kafka-dms-python-code4.tar.gz
    2. Run the following command in any directory to create a test.py file:

      vi test.py

      Add the following content to the file and save it.

      import sys

      # Wide (UCS-4) builds support code points above U+FFFF
      if sys.maxunicode > 65535:
          print('Your python is suitable for kafka-dms-python-code4.tar.gz')
      else:
          print('Your python is suitable for kafka-dms-python-code2.tar.gz')
    3. Run the following command to check which SDK package suits your Python:

      python test.py

    4. Select the required SDK package as prompted.

  2. Decompress the SDK package.

    The following assumes that the SDK package is decompressed in the {root_dir} directory.

  3. Run the following commands to configure environment variables:

    vi ~/.bash_profile

    Add the following content to the configuration file, and then save and exit.

    export LD_LIBRARY_PATH={root_dir}/kafka-dms-python/confluent_kafka/lib:{root_dir}/kafka-dms-python/confluent_kafka/thirdPart${LD_LIBRARY_PATH:+:$LD_LIBRARY_PATH}

    Run the following command to make the modification take effect:

    source ~/.bash_profile

  4. Configure the following parameters in the {root_dir}/kafka-dms-python/conf.ini file:

    sasl.project.id=your project
    sasl.access.key=your ak
    sasl.security.key=your sk

    For details on how to obtain the values of these parameters, see Preparing the Environment. After the modification is complete, save the file.
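
Before running the examples, you may want to confirm that steps 3 and 4 took effect. The following is a minimal sketch that is not part of the SDK. It checks that LD_LIBRARY_PATH contains the two library directories from step 3 and that conf.ini defines the three SASL parameters from step 4 with non-placeholder values. It assumes conf.ini holds plain key=value lines as shown above. The function names check_ld_library_path, parse_conf, load_conf, and unset_parameters are hypothetical.

```python
import os

# Parameters that step 4 requires in conf.ini
REQUIRED_KEYS = ("sasl.project.id", "sasl.access.key", "sasl.security.key")
# Placeholder values shown in step 4 that must be replaced
PLACEHOLDERS = ("your project", "your ak", "your sk")


def check_ld_library_path(root_dir, environ=None):
    """Return the SDK library directories from step 3 missing from LD_LIBRARY_PATH."""
    if environ is None:
        environ = os.environ
    base = os.path.join(root_dir, "kafka-dms-python", "confluent_kafka")
    expected = [os.path.join(base, "lib"), os.path.join(base, "thirdPart")]
    # Normalize entries so that trailing slashes do not cause false alarms
    present = set(os.path.normpath(e)
                  for e in environ.get("LD_LIBRARY_PATH", "").split(":") if e)
    return [d for d in expected if os.path.normpath(d) not in present]


def parse_conf(lines):
    """Parse key=value lines (assumed conf.ini format) into a dict."""
    settings = {}
    for line in lines:
        line = line.strip()
        # Skip blank lines, comments, section headers, and malformed lines
        if not line or line[0] in "#;[" or "=" not in line:
            continue
        key, value = line.split("=", 1)
        settings[key.strip()] = value.strip()
    return settings


def load_conf(path):
    """Read and parse a conf.ini file from disk."""
    with open(path) as f:
        return parse_conf(f)


def unset_parameters(settings):
    """Return required keys that are missing, empty, or still placeholders."""
    return [k for k in REQUIRED_KEYS
            if not settings.get(k) or settings[k] in PLACEHOLDERS]
```

For example, with /opt/dms standing in for {root_dir}, running check_ld_library_path('/opt/dms') and unset_parameters(load_conf('/opt/dms/kafka-dms-python/conf.ini')) from a shell where ~/.bash_profile has been sourced should return two empty lists once both steps are complete.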

Running the Example

  1. Run the following command to produce messages:

    cd {root_dir}/kafka-dms-python/

    python producer.py <bootstrap-brokers> <topic> <msg1> <msg2> <msg3> ...

    Set bootstrap-brokers to a Kafka endpoint and topic to a Kafka topic ID. For details on how to obtain an endpoint and a Kafka topic ID, see Preparing the Environment.

    Example:

    python producer.py dms-kafka.cn-north-1.myhuaweicloud.com:37003 k-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299 123 456 789

  2. Run the following command to consume messages, starting from the earliest message in the queue:

    python consumer.py <bootstrap-brokers> <group> <topic>

    Set group to the ID of a consumer group for the Kafka queue. For details on how to obtain a consumer group ID, see Preparing the Environment.

    Example:

    python consumer.py dms-kafka.cn-north-1.myhuaweicloud.com:37003 g-7ec0caac-01fb-4f91-a4f2-0a9dd48f8af7 k-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299
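
Both commands take positional arguments in a fixed order. The sketch below shows one way to validate them before connecting, which catches a missing port or a swapped argument early. It is an assumption about how such arguments could be handled, not the code in producer.py or consumer.py. The helper names validate_brokers, parse_producer_args, and parse_consumer_args are hypothetical; bootstrap.servers and group.id are the standard Kafka client property names for the endpoint and consumer group.

```python
def validate_brokers(brokers):
    """Check a comma-separated list of host:port endpoints; return (host, port) pairs."""
    endpoints = []
    for endpoint in brokers.split(","):
        host, sep, port = endpoint.strip().rpartition(":")
        # Require a host, a colon, and a numeric port in the valid TCP range
        if not sep or not host or not port.isdigit() or not 0 < int(port) < 65536:
            raise ValueError("expected host:port, got %r" % endpoint)
        endpoints.append((host, int(port)))
    return endpoints


def parse_producer_args(argv):
    """argv excludes the script name: <bootstrap-brokers> <topic> <msg1> ..."""
    if len(argv) < 3:
        raise ValueError("usage: producer.py <bootstrap-brokers> <topic> <msg1> ...")
    validate_brokers(argv[0])
    return {"bootstrap.servers": argv[0], "topic": argv[1], "messages": argv[2:]}


def parse_consumer_args(argv):
    """argv excludes the script name: <bootstrap-brokers> <group> <topic>"""
    if len(argv) != 3:
        raise ValueError("usage: consumer.py <bootstrap-brokers> <group> <topic>")
    validate_brokers(argv[0])
    return {"bootstrap.servers": argv[0], "group.id": argv[1], "topic": argv[2]}
```

A script would call these as parse_consumer_args(sys.argv[1:]), so that sys.argv[0] (the script name) is excluded.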

Code Example

Producer code

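    import sys

    # p (a confluent_kafka Producer), topic, and msgs (the messages passed on
    # the command line) are assumed to be defined earlier, e.g. in producer.py.

    def delivery_callback(err, msg):
        # Called from poll() or flush() once per message with the delivery result
        if err:
            sys.stderr.write('%% Message failed delivery: %s\n' % err)
        else:
            sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' %
                             (msg.topic(), msg.partition(), msg.offset()))

    # Produce each message to Kafka
    for msg in msgs:
        try:
            # Produce the message (without a trailing newline)
            p.produce(topic, msg.rstrip(), callback=delivery_callback)
        except BufferError:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                             len(p))

        # Serve the delivery callback queue. produce() is asynchronous, so this
        # poll() will usually not serve the callback for the last message.
        p.poll(0)

    # Wait until all outstanding messages have been delivered
    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()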
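
In the producer code, a message that hits BufferError is reported but not resent. A common confluent_kafka pattern, sketched below as an assumption rather than part of the SDK sample, is to call poll() so that pending delivery reports are served and queue space is freed, and then retry. produce_with_retry and its max_retries and poll_timeout parameters are hypothetical; the function relies only on the produce() and poll() methods already used above.

```python
def produce_with_retry(producer, topic, value, callback=None,
                       max_retries=3, poll_timeout=1.0):
    """Enqueue one message, waiting for queue space if the buffer is full.

    Returns True if the message was enqueued, False if every attempt
    failed with BufferError. Delivery itself remains asynchronous.
    """
    # One initial attempt plus up to max_retries retries
    for _ in range(max_retries + 1):
        try:
            producer.produce(topic, value, callback=callback)
            return True
        except BufferError:
            # Serve delivery reports, which frees space in the local queue
            producer.poll(poll_timeout)
    return False
```

Inside the producer loop, produce_with_retry(p, topic, msg.rstrip(), delivery_callback) could replace the try/except block; p.flush() is still needed at the end to wait for delivery.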

Consumer code

    import sys

    from confluent_kafka import KafkaError, KafkaException

    # c is assumed to be a Consumer already subscribed to the topic, e.g. in consumer.py

    # Read messages from Kafka, print to stdout
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                # Error or event
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    # Any other error is fatal
                    raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    finally:
        # Leave the group; offsets are committed here when auto-commit is enabled
        c.close()
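
The consumer loop runs until it is interrupted. For scripts or tests, it can be handy to stop after a fixed number of messages or after the topic has been idle for a while. The sketch below is an assumption, not SDK code. take_messages and its parameters are hypothetical, and it only assumes a poll function that returns None or an object with error() and value() methods, like the messages returned by c.poll(). Error events are handed back to the caller so that it can apply the _PARTITION_EOF check shown above.

```python
def take_messages(poll, max_messages, max_idle_polls):
    """Poll until max_messages payloads arrive or max_idle_polls polls in a row
    return no payload (timeout or error/event).

    poll: zero-argument callable, e.g. lambda: c.poll(timeout=1.0).
    Returns (values, errors): message payloads and error objects, in order.
    """
    values, errors = [], []
    idle = 0
    while len(values) < max_messages and idle < max_idle_polls:
        msg = poll()
        if msg is not None and not msg.error():
            values.append(msg.value())
            idle = 0  # a real message resets the idle counter
            continue
        idle += 1  # timeout or error/event: no payload this round
        if msg is not None:
            errors.append(msg.error())  # e.g. partition-EOF events
    return values, errors
```

For example, values, errors = take_messages(lambda: c.poll(timeout=1.0), max_messages=10, max_idle_polls=5) reads at most ten messages and gives up after about five seconds without new data; call c.close() afterwards.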