更新时间:2022-02-21 GMT+08:00
Python SDK
本节以Linux系统为例介绍Python客户端如何连接DMS服务的Kafka队列,获取相关配置信息请参考准备环境章节。
DMS服务兼容Kafka原生API,相关调用方法请参考Kafka官网说明。
客户端环境搭建
- 根据python编码格式选择对应版本的SDK包。
- 下载SDK包,解压后获取如下两个文件。
- kafka-dms-python-code2.tar.gz
- kafka-dms-python-code4.tar.gz
- 在任意目录下,执行如下命令新增test.py文件。
将如下内容写入文件中,并保存。
import sys if sys.maxunicode > 65535: print 'Your python is suitable for kafka-dms-python-code4.tar.gz' else: print 'Your python is suitable for kafka-dms-python-code2.tar.gz'
- 执行如下命令选择Python SDK包。
- 根据界面提示选择SDK包。
- 下载SDK包,解压后获取如下两个文件。
- 解压缩SDK文件。
以下假设解压目录为{root_dir}。
- 执行如下命令配置环境变量。
vi ~/.bash_profile
将如下行写入配置文件中,保存退出。
export LD_LIBRARY_PATH={root_dir}/kafka-dms-python/confluent_kafka/lib:{root_dir}/kafka-dms-python/confluent_kafka/thirdPart
执行如下命令使修改生效。
source ~/.bash_profile
- 编辑{root_dir}/kafka-dms-python/conf.ini文件,配置如下项:
sasl.project.id=your project sasl.access.key=your ak sasl.security.key=your sk
获取方法请参考准备环境,修改完成后保存文件。
运行示例
- 执行如下命令生产消息。
cd {root_dir}/kafka-dms-python/
python producer.py <bootstrap-brokers> <topic> <msg1> <msg2> <msg3> ..
说明:bootstrap-brokers配置为Kafka的endpoint地址,topic配置为Kafka Topic的id,获取方法请参考准备环境。
示例:
python producer.py dms-kafka.cn-north-1.myhuaweicloud.com:37003 k-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299 123 456 789
- 执行如下命令从队列中最早的消息开始消费消息。
python consumer.py <bootstrap-brokers> <group> <topic>
说明:group配置为Kafka队列的消费组id,获取方法请参考准备环境。
示例:
python consumer.py dms-kafka.cn-north-1.myhuaweicloud.com:37003 g-7ec0caac-01fb-4f91-a4f2-0a9dd48f8af7 k-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299
示例代码
生产消息
# Read lines from stdin, produce each line to Kafka for msg in msgs: try: # Produce line (without newline) p.produce(topic, msg.rstrip(), callback=delivery_callback) except BufferError as e: sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p)) # Serve delivery callback queue. # NOTE: Since produce() is an asynchronous API this poll() call # will most likely not serve the delivery callback for the # last produce()d message. p.poll(0)
消费消息
# Read messages from Kafka, print to stdout try: while True: msg = c.poll(timeout=1.0) if msg is None: continue if msg.error(): # Error or event if msg.error().code() == KafkaError._PARTITION_EOF: # End of partition event sys.stderr.write('%% %s [%d] reached end at offset %d\n' % (msg.topic(), msg.partition(), msg.offset())) elif msg.error(): # Error raise KafkaException(msg.error()) else: # Proper message sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' % (msg.topic(), msg.partition(), msg.offset(), str(msg.key()))) print(msg.value()) except KeyboardInterrupt: sys.stderr.write('%% Aborted by user\n')
父主题: Kafka普通版开发指南