Help Center/
MapReduce Service/
Developer Guide (Normal_Earlier Than 3.x)/
Kafka Development Guide/
Developing a Kafka Application/
Kafka Producer API Usage Sample
Updated on 2024-08-16 GMT+08:00
Kafka Producer API Usage Sample
Function Description
The following code snippet belongs to the com.huawei.bigdata.kafka.example.Producer class. It is used by the new Producer APIs to produce messages for the security topic.
Sample Code
Consumption logic in the run method of the Producer threads
public void run() { LOG.info("New Producer: start."); int messageNo = 1; // Specify the number of messages to be sent before the thread sleeps for one second. int intervalMessages=10; while (messageNo <= messageNumToSend) { String messageStr = "Message_" + messageNo; long startTime = System.currentTimeMillis(); // Construct a message record. ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic, messageNo, messageStr); if (isAsync) { // Send asynchronously. producer.send(record, new DemoCallBack(startTime, messageNo, messageStr)); } else { try { // Send synchronously. producer.send(record).get(); } catch (InterruptedException ie) { LOG.info("The InterruptedException occured : {}.", ie); } catch (ExecutionException ee) { LOG.info("The ExecutionException occured : {}.", ee); } } messageNo++; if (messageNo % intervalMessages == 0) { // Send the number of messages specified for intervalMessage before the thread sleeps for one second. try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } LOG.info("The Producer have send {} messages.", messageNo); } } }
Parent topic: Developing a Kafka Application
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
The system is busy. Please try again later.
For any further questions, feel free to contact us through the chatbot.
Chatbot