Python SDK
This chapter describes how to access a DMS Kafka queue by using a Linux Python client. To obtain related configuration information, see Preparing the Environment.
DMS is compatible with native Kafka APIs. For details on how to call native Kafka APIs, see the Kafka Documentation.
Preparing the Client Environment
- Select an SDK package of the required version based on the Python encoding format.
- Download the SDK package and decompress the following from the package:
- kafka-dms-python-code2.tar.gz
- kafka-dms-python-code4.tar.gz
- Run the following command in any directory to add the test.py file:
vi test.py
Add the following content to the file and save the file.
import sys if sys.maxunicode > 65535: print 'Your python is suitable for kafka-dms-python-code4.tar.gz' else: print 'Your python is suitable for kafka-dms-python-code2.tar.gz'
- Run the following command to select a Python SDK package:
- Select the required SDK package as prompted.
- Download the SDK package and decompress the following from the package:
- Decompress the SDK package.
The following assumes that the SDK package is decompressed in the {root_dir} package.
- Run the following commands to configure environment variables:
vi ~/.bash_profile
Add the following content to the configuration file, and then save and exit.
export LD_LIBRARY_PATH={root_dir}/kafka-dms-python/confluent_kafka/lib:{root_dir}/kafka-dms-python/confluent_kafka/thirdPart
Run the following command to make the modification take effect:
source ~/.bash_profile
- Configure the following parameters in the {root_dir}/kafka-dms-python/conf.ini file:
sasl.project.id=your project sasl.access.key=your ak sasl.security.key=your sk
For details on how to obtain the values of these parameters, see Preparing the Environment. After the modification is complete, save the file.
Running Example
- Run the following command to produce messages:
cd {root_dir}/kafka-dms-python/
python producer.py <bootstrap-brokers> <topic> <msg1> <msg2> <msg3> ..
Set bootstrap-brokers to a Kafka endpoint and topic to a Kafka Topic ID. For details on how to obtain an endpoint and a Kafka topic ID, see Preparing the Environment.
Examples:
python producer.py dms-kafka.cn-north-1.myhuaweicloud.com:37003 k-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299 123 456 789
- Run the following command to start consuming message from the earliest message in the queue.
python consumer.py <bootstrap-brokers> <group> <topic>
Set group to the ID of a consumer group for the Kafka queue. For details on how to obtain a consumer group ID, see Preparing the Environment.
Examples:
python consumer.py dms-kafka.cn-north-1.myhuaweicloud.com:37003 g-7ec0caac-01fb-4f91-a4f2-0a9dd48f8af7 k-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299
Code Example
Producer parameters
# Read lines from stdin, produce each line to Kafka for msg in msgs: try: # Produce line (without newline) p.produce(topic, msg.rstrip(), callback=delivery_callback) except BufferError as e: sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p)) # Serve delivery callback queue. # NOTE: Since produce() is an asynchronous API this poll() call # will most likely not serve the delivery callback for the # last produce()d message. p.poll(0)
Consumer parameters
# Read messages from Kafka, print to stdout try: while True: msg = c.poll(timeout=1.0) if msg is None: continue if msg.error(): # Error or event if msg.error().code() == KafkaError._PARTITION_EOF: # End of partition event sys.stderr.write('%% %s [%d] reached end at offset %d\n' % (msg.topic(), msg.partition(), msg.offset())) elif msg.error(): # Error raise KafkaException(msg.error()) else: # Proper message sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' % (msg.topic(), msg.partition(), msg.offset(), str(msg.key()))) print(msg.value()) except KeyboardInterrupt: sys.stderr.write('%% Aborted by user\n')
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot