Updated on 2023-08-31 GMT+08:00

Producer API Sample

Function

The following code snippet belongs to the run method in the com.huawei.bigdata.kafka.example.Producer class. It is used by the Producer APIs to consume messages in the security topic.

Sample Code

/** 
* The producer thread executes a function to send messages periodically.
 */ 
public void run() {
    LOG.info("New Producer: start.");
    int messageNo = 1;
    
    while (messageNo <= MESSAGE_NUM) {
        String messageStr = "Message_" + messageNo;
        long startTime = System.currentTimeMillis();
        
        // Construct a message record.
        ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic, messageNo, messageStr);
        
        if (isAsync) {
            // Send data asynchronously.
            producer.send(record, new DemoCallBack(startTime, messageNo, messageStr));
        } else {
            try {
                // Send data synchronously.
                producer.send(record).get();
                long elapsedTime = System.currentTimeMillis() - startTime;
                LOG.info("message(" + messageNo + ", " + messageStr + ") sent to topic(" + topic + ") in " + elapsedTime + " ms.");
            } catch (InterruptedException ie) {
                LOG.info("The InterruptedException occured : {}.", ie);
            } catch (ExecutionException ee) {
                LOG.info("The ExecutionException occured : {}.", ee);
            }
        }
        messageNo++;
    }
}