C SDK
本节以Linux系统为例介绍C客户端如何连接DMS服务的Kafka队列,获取相关配置信息请参考准备环境章节。
DMS服务兼容Kafka原生API,相关调用方法请参考Kafka官网说明。
客户端环境搭建
- 下载SDK包。
解压后获取kafka-dms-c.tar.gz文件。
- 安装gcc工具。
执行如下命令检查是否已安装gcc工具。
gcc -v
如果没有gcc,需要安装gcc。- ubuntu操作系统:
- 执行apt-cache search gcc命令检查软件仓库中是否有gcc。如没有,请自行配置软件仓库的地址。
- 执行apt-get install gcc命令安装gcc。
- suse操作系统:
- 执行zypper search gcc命令检查软件仓库中是否有gcc。如没有,请自行配置软件仓库的地址。
- 执行zypper install gcc命令安装gcc。
- centos/redhat操作系统:
- yum search gcc
- yum install gcc
- ubuntu操作系统:
- 解压缩kafka-dms-c.tar.gz。
以下假设解压目录为{DMSPATH}。
- 执行如下命令,拷贝kafka依赖到本地include目录下。
cp {DMSPATH}/kafka-dms-c/librdkafka/include/librdkafka /usr/include/ -R
- 执行如下命令,拷贝kafka依赖到本地的lib库中。
cp {DMSPPATH}/kafka-dms-c/librdkafka/lib/* /usr/lib/ -R
- 执行如下命令编译示例代码。
cd {DMSPATH}/kafka-dms-c/example
gcc my_consumer.c -o my_consumer -lrdkafka -lz -lpthread -lrt
gcc my_producer.c -o my_producer -lrdkafka -lz -lpthread -lrt
说明:如果提示“cannot find -lz”,执行“apt-get install zlib1g-dev”即可。
运行示例
- 执行如下命令进行生产消息。
./my_producer <broker> <topic> <project_id> <access_key> <secrect_key>
表1 参数说明 参数
说明
获取方式
broker
配置为Kafka的endpoint地址。
请参考准备环境
topic
配置为Kafka Topic的id。
project_id
项目ID。
access_key
租户AK。
secrect_key
租户SK。
执行完命令后输入消息内容,按回车键发送消息到Kafka队列中,输入的每一行内容都将作为一条消息发送到Kafka队列。
如需停止生产使用Ctrl+C命令退出。
- 执行如下命令消费消息。
./my_consumer <broker> <topic> <group> <project_id> <access_key> <secrect_key>
表2 参数说明 参数
说明
获取方式
broker
配置为Kafka的endpoint地址。
请参考准备环境
topic
配置为Kafka Topic的id。
group
配置为Kafka队列的消费组id。
project_id
项目ID。
access_key
租户AK。
secrect_key
租户SK。
执行完命令后自动从队列中最早的消息开始消费,如需停止消费使用Ctrl+C命令退出。
示例代码
生产消息
if (rd_kafka_produce( /* Topic object */ rkt, /*使用内置的分区来选择分区*/ RD_KAFKA_PARTITION_UA, /*生成payload的副本*/ RD_KAFKA_MSG_F_COPY, /*消息体和长度*/ buf, len, /*可选键及其长度*/ NULL, 0, NULL) == -1){ fprintf(stderr, "%% Failed to produce to topic %s: %s\n", rd_kafka_topic_name(rkt), rd_kafka_err2str(rd_kafka_last_error())); if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){ /*如果内部队列满,等待消息传输完成并retry, 内部队列表示要发送的消息和已发送或失败的消息, 内部队列受限于queue.buffering.max.messages配置项*/ rd_kafka_poll(rk, 1000); goto retry; } }else{ fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n", len, rd_kafka_topic_name(rkt));
消费消息
if(!initKafka(brokers, group, topic, project_id, access_key, secrect_key)){ fprintf(stderr, "kafka server initialize error\n"); }else{ while(run){ rd_kafka_message_t *rkmessage; /*-轮询消费者的消息或事件,最多阻塞timeout_ms -应用程序应该定期调用consumer_poll(),即使没有预期的消息,注册过rebalance_cb,该操作尤为重要, 因为它需要被正确地调用和处理以同步内部消费者状态 */ rkmessage = rd_kafka_consumer_poll(rk, 1000); if(rkmessage){ msg_consume(rkmessage, NULL); /*释放rkmessage的资源,并把所有权还给rdkafka*/ rd_kafka_message_destroy(rkmessage); } } }