更新时间:2022-02-21 GMT+08:00
Python SDK
本节以Linux系统为例介绍Python客户端如何连接DMS服务的Kafka队列,获取相关配置信息请参考准备环境章节。
DMS服务兼容Kafka原生API,相关调用方法请参考Kafka官网说明。
客户端环境搭建
- 根据python编码格式选择对应版本的SDK包。
- 下载SDK包,解压后获取如下两个文件。
- kafka-dms-python-code2.tar.gz
- kafka-dms-python-code4.tar.gz
- 在任意目录下,执行如下命令新增test.py文件。
将如下内容写入文件中,并保存。
import sys if sys.maxunicode > 65535: print 'Your python is suitable for kafka-dms-python-code4.tar.gz' else: print 'Your python is suitable for kafka-dms-python-code2.tar.gz' - 执行如下命令选择Python SDK包。
- 根据界面提示选择SDK包。
- 下载SDK包,解压后获取如下两个文件。
- 解压缩SDK文件。
以下假设解压目录为{root_dir}。
- 执行如下命令配置环境变量。
vi ~/.bash_profile
将如下行写入配置文件中,保存退出。
export LD_LIBRARY_PATH={root_dir}/kafka-dms-python/confluent_kafka/lib:{root_dir}/kafka-dms-python/confluent_kafka/thirdPart执行如下命令使修改生效。
source ~/.bash_profile
- 编辑{root_dir}/kafka-dms-python/conf.ini文件,配置如下项:
sasl.project.id=your project sasl.access.key=your ak sasl.security.key=your sk
获取方法请参考准备环境,修改完成后保存文件。
运行示例
- 执行如下命令生产消息。
cd {root_dir}/kafka-dms-python/
python producer.py <bootstrap-brokers> <topic> <msg1> <msg2> <msg3> ..
说明:bootstrap-brokers配置为Kafka的endpoint地址,topic配置为Kafka Topic的id,获取方法请参考准备环境。
示例:
python producer.py dms-kafka.cn-north-1.myhuaweicloud.com:37003 k-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299 123 456 789
- 执行如下命令从队列中最早的消息开始消费消息。
python consumer.py <bootstrap-brokers> <group> <topic>
说明:group配置为Kafka队列的消费组id,获取方法请参考准备环境。
示例:
python consumer.py dms-kafka.cn-north-1.myhuaweicloud.com:37003 g-7ec0caac-01fb-4f91-a4f2-0a9dd48f8af7 k-bd67aaead60940d688b872c31bdc653b-4df89da6-ede4-4072-93e0-28dc6e866299
示例代码
生产消息
# Read lines from stdin, produce each line to Kafka
for msg in msgs:
try:
# Produce line (without newline)
p.produce(topic, msg.rstrip(), callback=delivery_callback)
except BufferError as e:
sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
len(p))
# Serve delivery callback queue.
# NOTE: Since produce() is an asynchronous API this poll() call
# will most likely not serve the delivery callback for the
# last produce()d message.
p.poll(0)
消费消息
# Read messages from Kafka, print to stdout
try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
# Error or event
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
elif msg.error():
# Error
raise KafkaException(msg.error())
else:
# Proper message
sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
(msg.topic(), msg.partition(), msg.offset(),
str(msg.key())))
print(msg.value())
except KeyboardInterrupt:
sys.stderr.write('%% Aborted by user\n')
父主题: Kafka普通版开发指南