Updated on 2023-11-30 GMT+08:00

C SDK

This section describes how to access a DMS Kafka queue by using a Linux C client. To obtain related configuration information, see Preparing the Environment.

DMS is compatible with native Kafka APIs. For details on how to call native Kafka APIs, see the Kafka Documentation.

Preparing the Client Environment

  1. Download the SDK package.

    Extract the kafka-dms-c.tar.gz file from the SDK package.

  2. Install the GNU Compiler Collection (GCC) tool.

    Run the following command to check whether the GCC tool has been installed:

    gcc -v

    Install the GCC tool if it has not been installed.
    • Ubuntu OS:
      • Run the apt-cache search gcc command to check whether GCC exists in the software repository. If not, configure the correct software repository address.
      • Run the apt-get install gcc command to install GCC.
    • SUSE OS:
      • Run the zypper search gcc command to check whether GCC exists in the software repository. If not, configure the correct software repository address.
      • Run the zypper install gcc command to install GCC.
    • CentOS or Red Hat OS:
      • Run the yum search gcc command to check whether GCC exists in the software repository. If not, configure the correct software repository address.
      • Run the yum install gcc command to install GCC.

  3. Decompress the kafka-dms-c.tar.gz package.

    The following assumes that the package is decompressed in the {DMSPATH} directory.

  4. Run the following command to copy Kafka dependency packages to the local include directory:

    cp {DMSPATH}/kafka-dms-c/librdkafka/include/librdkafka /usr/include/ -R

  5. Run the following command to copy the Kafka dependency packages to the lib directory:

    cp {DMSPATH}/kafka-dms-c/librdkafka/lib/* /usr/lib/ -R

  6. Run the following commands to compile the sample code:

    cd {DMSPATH}/kafka-dms-c/example

    gcc my_consumer.c -o my_consumer -lrdkafka -lz -lpthread -lrt

    gcc my_producer.c -o my_producer -lrdkafka -lz -lpthread -lrt

    If the message "cannot find -lz" is displayed, install the zlib development package: run the apt-get install zlib1g-dev command on Ubuntu, or the yum install zlib-devel command on CentOS or Red Hat.

Running Example

  1. Run the following command to produce messages:

    ./my_producer <broker> <topic> <project_id> <access_key> <secret_key>

    Table 1 Parameter description

    | Parameter  | Description    | How to Obtain                               |
    |------------|----------------|---------------------------------------------|
    | broker     | Kafka endpoint | For details, see Preparing the Environment. |
    | topic      | Kafka topic ID | For details, see Preparing the Environment. |
    | project_id | Project ID     | For details, see Preparing the Environment. |
    | access_key | Tenant AK      | For details, see Preparing the Environment. |
    | secret_key | Tenant SK      | For details, see Preparing the Environment. |

    After running the preceding command, type a message and press Enter to send it to the Kafka instance. Each line is sent as one message.

    To stop producing messages, press Ctrl+C to exit.

  2. Run the following command to consume messages:

    ./my_consumer <broker> <topic> <group> <project_id> <access_key> <secret_key>

    Table 2 Parameter description

    | Parameter  | Description             | How to Obtain                               |
    |------------|-------------------------|---------------------------------------------|
    | broker     | Kafka endpoint          | For details, see Preparing the Environment. |
    | topic      | Kafka topic ID          | For details, see Preparing the Environment. |
    | group      | Kafka consumer group ID | For details, see Preparing the Environment. |
    | project_id | Project ID              | For details, see Preparing the Environment. |
    | access_key | Tenant AK               | For details, see Preparing the Environment. |
    | secret_key | Tenant SK               | For details, see Preparing the Environment. |

    After the preceding command is run, messages will be consumed from the earliest message in the queue. To stop consuming messages, press Ctrl+C to exit.

Code Example

Producer code example

        if (rd_kafka_produce(
                /* Topic object */
                rkt,
                /* Use the built-in partitioner to select the partition. */
                RD_KAFKA_PARTITION_UA,
                /* Make a copy of the payload. */
                RD_KAFKA_MSG_F_COPY,
                /* Message body and length */
                buf, len,
                /* Optional key and its length */
                NULL, 0,
                NULL) == -1) {
            fprintf(stderr,
                "%% Failed to produce to topic %s: %s\n",
                rd_kafka_topic_name(rkt),
                rd_kafka_err2str(rd_kafka_last_error()));

            if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                /* If the internal queue is full, wait for messages to be
                 * delivered and then retry. The internal queue holds both
                 * messages waiting to be sent and messages that have been
                 * sent or have failed. Its size is bounded by the
                 * queue.buffering.max.messages configuration. */
                rd_kafka_poll(rk, 1000);
                goto retry;
            }
        } else {
            fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n",
                len, rd_kafka_topic_name(rkt));
        }

Consumer code example

    if (!initKafka(brokers, group, topic, project_id, access_key, secrect_key)) {
        fprintf(stderr, "kafka server initialize error\n");
    } else {
        while (run) {
            rd_kafka_message_t *rkmessage;
            /* Poll for consumer messages or events; blocks for at most
             * timeout_ms. The application should call
             * rd_kafka_consumer_poll() regularly even when no messages are
             * expected, especially when a rebalance_cb has been registered,
             * because the poll call synchronizes the consumer's internal
             * state. */
            rkmessage = rd_kafka_consumer_poll(rk, 1000);
            if (rkmessage) {
                msg_consume(rkmessage, NULL);
                /* Release rkmessage and return ownership to rdkafka. */
                rd_kafka_message_destroy(rkmessage);
            }
        }
    }