Updated on 2024-08-16 GMT+08:00

Kafka Producer API Usage Sample

Function Description

The following code snippet belongs to the com.huawei.bigdata.kafka.example.Producer class. It is used by the new Producer APIs to produce messages for the security topic.

Sample Code

Consumption logic in the run method of the Producer threads

    public void run()
    {
        LOG.info("New Producer: start.");
        int messageNo = 1;
        // Specify the number of messages to be sent before the thread sleeps for one second.
        int intervalMessages=10; 
        
        while (messageNo <= messageNumToSend)
        {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            
            // Construct a message record.
            ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic, messageNo, messageStr);
            
            if (isAsync)
            {
                // Send asynchronously.  
                producer.send(record, new DemoCallBack(startTime, messageNo, messageStr));
            }
            else
            {
                try
                {
                    // Send synchronously.
                    producer.send(record).get();
                }
                catch (InterruptedException ie)
                {
                    LOG.info("The InterruptedException occured : {}.", ie);
                }
                catch (ExecutionException ee)
                {
                    LOG.info("The ExecutionException occured : {}.", ee);
                }
            }
            messageNo++;
            
            if (messageNo % intervalMessages == 0)
            {
                // Send the number of messages specified for intervalMessage before the thread sleeps for one second.
                try
                {
                    Thread.sleep(1000);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                LOG.info("The Producer have send {} messages.", messageNo);
            }
        }
        
    }