C SDK
This section describes how to access a DMS Kafka queue by using a Linux C client. To obtain related configuration information, see Preparing the Environment.
DMS is compatible with native Kafka APIs. For details on how to call native Kafka APIs, see the Kafka Documentation.
Preparing the Client Environment
- Download the SDK package.
Decompress the kafka-dms-c.tar.gz file from the SDK package.
- Install the GNU Compiler Collection (GCC) tool.
Run the following command to check whether the GCC tool has been installed:
gcc -v
Install the GCC tool if it has not been installed.
- Ubuntu OS:
- Run the apt-cache search gcc command to check whether GCC exists in the software repository. If not, configure the correct software repository address.
- Run the apt-get install gcc command to install GCC.
- SUSE OS:
- Run the zypper search gcc command to check whether GCC exists in the software repository. If not, configure the correct software repository address.
- Run the zypper install gcc command to install GCC.
- CentOS or Red Hat OS:
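- Run the yum search gcc command to check whether GCC exists in the software repository. If not, configure the correct software repository address.
- Run the yum install gcc command to install GCC.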
- Decompress the kafka-dms-c.tar.gz package.
The following assumes that the package is decompressed in the {DMSPATH} directory.
- Run the following command to copy Kafka dependency packages to the local include directory:
cp {DMSPATH}/kafka-dms-c/librdkafka/include/librdkafka /usr/include/ -R
- Run the following command to copy the Kafka dependency packages to the lib directory:
cp {DMSPATH}/kafka-dms-c/librdkafka/lib/* /usr/lib/ -R
- Run the following commands to compile the sample code:
cd {DMSPATH}/kafka-dms-c/example
gcc my_consumer.c -o my_consumer -lrdkafka -lz -lpthread -lrt
gcc my_producer.c -o my_producer -lrdkafka -lz -lpthread -lrt
If the message "cannot find -lz" is displayed, the zlib development files are missing. On Ubuntu, run the apt-get install zlib1g-dev command to install them, and then compile again.
Running the Example
- Run the following command to produce messages:
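./my_producer <broker> <topic> <project_id> <access_key> <secret_key>

Table 1 Parameter description

| Parameter | Description | How to Obtain |
| --- | --- | --- |
| broker | Kafka endpoint | For details, see Preparing the Environment. |
| topic | Kafka topic ID | For details, see Preparing the Environment. |
| project_id | Project ID | For details, see Preparing the Environment. |
| access_key | Tenant AK | For details, see Preparing the Environment. |
| secret_key | Tenant SK | For details, see Preparing the Environment. |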
After running the preceding command, type a message and press Enter to send it to the Kafka instance. Each line of input is sent as one message.
To stop producing messages, press Ctrl+C to exit.
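The line-per-message behavior described above can be sketched in plain C. This is a minimal illustration, not the code shipped in my_producer.c: the send_fn callback type, strip_newline(), and send_lines() are hypothetical names, and skipping empty lines is an assumption rather than documented behavior. The callback stands in for whatever passes the payload to the Kafka client, for example a wrapper around rd_kafka_produce().

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical callback type: stands in for the code that hands one payload
 * to the Kafka client. Returns 0 on success. */
typedef int (*send_fn)(const char *payload, size_t len, void *ctx);

/* Strip one trailing "\n" or "\r\n" in place and return the new length. */
size_t strip_newline(char *line)
{
    assert(line != NULL);
    size_t len = strlen(line);
    if (len > 0 && line[len - 1] == '\n')
        line[--len] = '\0';
    if (len > 0 && line[len - 1] == '\r')
        line[--len] = '\0';
    return len;
}

/* Read `in` line by line and pass every non-empty line to `send`.
 * Lines longer than the buffer are split into several payloads.
 * Returns the number of payloads handed over successfully. */
size_t send_lines(FILE *in, send_fn send, void *ctx)
{
    char buf[512];
    size_t sent = 0;

    while (fgets(buf, sizeof(buf), in) != NULL) {
        size_t len = strip_newline(buf);
        if (len == 0)
            continue;   /* assumption: empty lines are not sent */
        if (send(buf, len, ctx) == 0)
            sent++;
    }
    return sent;
}
```

A caller would invoke send_lines(stdin, my_send, NULL) from main, where my_send is the application's own function. The loop ends at end of input; pressing Ctrl+C terminates the process as described above.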
- Run the following command to consume messages:
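./my_consumer <broker> <topic> <group> <project_id> <access_key> <secret_key>

Table 2 Parameter description

| Parameter | Description | How to Obtain |
| --- | --- | --- |
| broker | Kafka endpoint | For details, see Preparing the Environment. |
| topic | Kafka topic ID | For details, see Preparing the Environment. |
| group | Kafka consumer group ID | For details, see Preparing the Environment. |
| project_id | Project ID | For details, see Preparing the Environment. |
| access_key | Tenant AK | For details, see Preparing the Environment. |
| secret_key | Tenant SK | For details, see Preparing the Environment. |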
After the preceding command is run, messages will be consumed from the earliest message in the queue. To stop consuming messages, press Ctrl+C to exit.
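The consumer takes six positional arguments in the order shown in the command. As a rough sketch of how a program might validate and name them, the following uses a hypothetical struct consumer_args and parse_consumer_args() function; neither is taken from my_consumer.c, and rejecting empty arguments is an assumption.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical container for the six positional arguments of the consumer. */
struct consumer_args {
    const char *broker;
    const char *topic;
    const char *group;
    const char *project_id;
    const char *access_key;
    const char *secret_key;
};

/* Validate argc/argv and fill `out`.
 * Returns 0 on success, -1 if an argument is missing or empty. */
int parse_consumer_args(int argc, char **argv, struct consumer_args *out)
{
    assert(out != NULL);
    if (argv == NULL || argc != 7)   /* program name + 6 arguments */
        return -1;
    for (int i = 1; i < argc; i++) {
        if (argv[i] == NULL || argv[i][0] == '\0')
            return -1;
    }
    out->broker     = argv[1];
    out->topic      = argv[2];
    out->group      = argv[3];
    out->project_id = argv[4];
    out->access_key = argv[5];
    out->secret_key = argv[6];
    return 0;
}
```

When the function returns -1, a program would typically print a usage line and exit instead of contacting the Kafka endpoint with incomplete credentials.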
Code Example
Producer parameters
retry:
    if (rd_kafka_produce(
            /* Topic object */
            rkt,
            /* Use the built-in partitioner to select the partition. */
            RD_KAFKA_PARTITION_UA,
            /* Make a copy of the payload. */
            RD_KAFKA_MSG_F_COPY,
            /* Message body and length */
            buf, len,
            /* Optional key and its length */
            NULL, 0,
            /* Per-message opaque pointer, passed to the delivery report callback */
            NULL) == -1) {
        fprintf(stderr, "%% Failed to produce to topic %s: %s\n",
                rd_kafka_topic_name(rkt),
                rd_kafka_err2str(rd_kafka_last_error()));
        if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
            /* The internal queue is full. Wait for queued messages to be
             * delivered, and then try again. The internal queue holds
             * messages waiting to be sent as well as messages that have been
             * sent or have failed to be sent. Its size is limited by the
             * queue.buffering.max.messages configuration. */
            rd_kafka_poll(rk, 1000);
            goto retry;
        }
    } else {
        fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n",
                len, rd_kafka_topic_name(rkt));
    }
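The wait-and-retry handling of a full internal queue can be tried out without a Kafka instance. The sketch below is a toy model, not librdkafka code: struct toy_queue, toy_produce(), and toy_poll() are made-up stand-ins for the client queue, rd_kafka_produce(), and rd_kafka_poll(), and QUEUE_CAPACITY plays the role of queue.buffering.max.messages. Unlike the goto-based sample above, it bounds the number of retries, which is a design choice of this sketch.

```c
#include <assert.h>
#include <stddef.h>

#define QUEUE_CAPACITY 4   /* stand-in for queue.buffering.max.messages */

/* Toy stand-in for the client's internal queue; not part of librdkafka. */
struct toy_queue {
    size_t pending;   /* messages enqueued but not yet delivered */
    size_t polls;     /* number of times the caller had to wait */
};

/* Mimics rd_kafka_produce(): returns -1 when the queue is full, 0 otherwise. */
int toy_produce(struct toy_queue *q)
{
    assert(q != NULL);
    if (q->pending >= QUEUE_CAPACITY)
        return -1;
    q->pending++;
    return 0;
}

/* Mimics rd_kafka_poll(): pretends that one pending message was delivered. */
void toy_poll(struct toy_queue *q)
{
    assert(q != NULL);
    if (q->pending > 0)
        q->pending--;
    q->polls++;
}

/* Enqueue one message, waiting and retrying while the queue is full.
 * Returns 0 on success, or -1 after max_retries unsuccessful retries. */
int produce_with_retry(struct toy_queue *q, int max_retries)
{
    for (int attempt = 0; attempt <= max_retries; attempt++) {
        if (toy_produce(q) == 0)
            return 0;
        toy_poll(q);   /* let deliveries drain, then try again */
    }
    return -1;
}
```

Bounding the retries keeps a producer from spinning forever if the broker stays unreachable. Whether a bound is appropriate depends on how the application should react to prolonged back pressure.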
Consumer parameters
if (!initKafka(brokers, group, topic, project_id, access_key, secrect_key)) {
    fprintf(stderr, "kafka server initialize error\n");
} else {
    while (run) {
        rd_kafka_message_t *rkmessage;
        /* Poll the consumer for messages or events. The call blocks for
         * at most timeout_ms (1000 ms here).
         * The application should call rd_kafka_consumer_poll() periodically
         * even if no messages are expected. This is especially important
         * when rebalance_cb has been registered, because the callback must
         * be called and handled properly to synchronize the internal
         * consumer state. */
        rkmessage = rd_kafka_consumer_poll(rk, 1000);
        if (rkmessage) {
            msg_consume(rkmessage, NULL);
            /* Release the resources of rkmessage and return ownership to rdkafka. */
            rd_kafka_message_destroy(rkmessage);
        }
    }
}