更新时间:2022-02-21 GMT+08:00

C SDK

本节以Linux系统为例介绍C客户端如何连接DMS服务的Kafka队列,获取相关配置信息请参考准备环境章节。

DMS服务兼容Kafka原生API,相关调用方法请参考Kafka官网说明

客户端环境搭建

  1. 下载SDK包

    解压后获取kafka-dms-c.tar.gz文件。

  2. 安装gcc工具。

    执行如下命令检查是否已安装gcc工具。

    gcc -v

    如果没有gcc,需要安装gcc。
    • ubuntu操作系统:
      • 执行apt-cache search gcc命令检查软件仓库中是否有gcc。如没有,请自行配置软件仓库的地址。
      • 执行apt-get install gcc命令安装gcc。
    • suse操作系统:
      • 执行zypper search gcc命令检查软件仓库中是否有gcc。如没有,请自行配置软件仓库的地址。
      • 执行zypper install gcc命令安装gcc。
    • centos/redhat操作系统:
      • yum search gcc
      • yum install gcc

  3. 解压缩kafka-dms-c.tar.gz。

    以下假设解压目录为{DMSPATH}。

  4. 执行如下命令,拷贝kafka依赖到本地include目录下。

    cp {DMSPATH}/kafka-dms-c/librdkafka/include/librdkafka /usr/include/ -R

  5. 执行如下命令,拷贝kafka依赖到本地的lib库中。

    cp {DMSPPATH}/kafka-dms-c/librdkafka/lib/* /usr/lib/ -R

  6. 执行如下命令编译示例代码。

    cd {DMSPATH}/kafka-dms-c/example

    gcc my_consumer.c -o my_consumer -lrdkafka -lz -lpthread -lrt

    gcc my_producer.c -o my_producer -lrdkafka -lz -lpthread -lrt

    说明:如果提示“cannot find -lz”,执行“apt-get install zlib1g-dev”即可。

运行示例

  1. 执行如下命令进行生产消息。

    ./my_producer <broker> <topic> <project_id> <access_key> <secrect_key>

    表1 参数说明

    参数

    说明

    获取方式

    broker

    配置为Kafka的endpoint地址。

    请参考准备环境

    topic

    配置为Kafka Topic的id。

    project_id

    项目ID。

    access_key

    租户AK。

    secrect_key

    租户SK。

    执行完命令后输入消息内容,按回车键发送消息到Kafka队列中,输入的每一行内容都将作为一条消息发送到Kafka队列。

    如需停止生产使用Ctrl+C命令退出。

  2. 执行如下命令消费消息。

    ./my_consumer <broker> <topic> <group> <project_id> <access_key> <secrect_key>

    表2 参数说明

    参数

    说明

    获取方式

    broker

    配置为Kafka的endpoint地址。

    请参考准备环境

    topic

    配置为Kafka Topic的id。

    group

    配置为Kafka队列的消费组id。

    project_id

    项目ID。

    access_key

    租户AK。

    secrect_key

    租户SK。

    执行完命令后自动从队列中最早的消息开始消费,如需停止消费使用Ctrl+C命令退出。

示例代码

生产消息

        if (rd_kafka_produce(
                    /* Topic object */
                     rkt,
                    /*使用内置的分区来选择分区*/
                     RD_KAFKA_PARTITION_UA,
                    /*生成payload的副本*/
                     RD_KAFKA_MSG_F_COPY,
                    /*消息体和长度*/
                     buf, len,
                    /*可选键及其长度*/
                     NULL, 0,
                     NULL) == -1){
             fprintf(stderr, 
                 "%% Failed to produce to topic %s: %s\n", 
                 rd_kafka_topic_name(rkt),
                 rd_kafka_err2str(rd_kafka_last_error()));
 
             if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){
                /*如果内部队列满,等待消息传输完成并retry,
                内部队列表示要发送的消息和已发送或失败的消息,
                内部队列受限于queue.buffering.max.messages配置项*/
                 rd_kafka_poll(rk, 1000);
                 goto retry;
             }  
         }else{
             fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n", 
                 len, rd_kafka_topic_name(rkt));

消费消息

    if(!initKafka(brokers, group, topic, project_id, access_key, secrect_key)){
        fprintf(stderr, "kafka server initialize error\n");
    }else{
        while(run){
            rd_kafka_message_t *rkmessage;
            /*-轮询消费者的消息或事件,最多阻塞timeout_ms
             -应用程序应该定期调用consumer_poll(),即使没有预期的消息,注册过rebalance_cb,该操作尤为重要,
             因为它需要被正确地调用和处理以同步内部消费者状态 */
            rkmessage = rd_kafka_consumer_poll(rk, 1000);
            if(rkmessage){
                msg_consume(rkmessage, NULL);
                /*释放rkmessage的资源,并把所有权还给rdkafka*/
                rd_kafka_message_destroy(rkmessage);
            }
        }
    }