Producer API Usage Sample
Function Description
The following code snippet belongs to the com.huawei.bigdata.kafka.example.Producer class. It is used by the new Producer APIs to produce messages for the security topic.
Sample Code
Consumption logic in the run method of the Producer threads
public void run() { LOG.info("New Producer: start."); int messageNo = 1; // Specify the number of messages to be sent before the thread sleeps for one second. int intervalMessages=10; while (messageNo <= messageNumToSend) { String messageStr = "Message_" + messageNo; long startTime = System.currentTimeMillis(); // Construct a message record. ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic, messageNo, messageStr); if (isAsync) { // Send asynchronously. producer.send(record, new DemoCallBack(startTime, messageNo, messageStr)); } else { try { // Send synchronously. producer.send(record).get(); } catch (InterruptedException ie) { LOG.info("The InterruptedException occured : {}.", ie); } catch (ExecutionException ee) { LOG.info("The ExecutionException occured : {}.", ee); } } messageNo++; if (messageNo % intervalMessages == 0) { // Send the number of messages specified for intervalMessage before the thread sleeps for one second. try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } LOG.info("The Producer have send {} messages.", messageNo); } } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.