On this page

Producer API Usage Sample

Updated on 2022-09-14 GMT+08:00

Function Description

The following code snippet belongs to the com.huawei.bigdata.kafka.example.Producer class. It is used by the new Producer APIs to produce messages for the security topic.

Sample Code

Consumption logic in the run method of the Producer threads

    public void run()
    {
        LOG.info("New Producer: start.");
        int messageNo = 1;
        // Specify the number of messages to be sent before the thread sleeps for one second.
        int intervalMessages=10; 
        
        while (messageNo <= messageNumToSend)
        {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            
            // Construct a message record.
            ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic, messageNo, messageStr);
            
            if (isAsync)
            {
                // Send asynchronously.  
                producer.send(record, new DemoCallBack(startTime, messageNo, messageStr));
            }
            else
            {
                try
                {
                    // Send synchronously.
                    producer.send(record).get();
                }
                catch (InterruptedException ie)
                {
                    LOG.info("The InterruptedException occured : {}.", ie);
                }
                catch (ExecutionException ee)
                {
                    LOG.info("The ExecutionException occured : {}.", ee);
                }
            }
            messageNo++;
            
            if (messageNo % intervalMessages == 0)
            {
                // Send the number of messages specified for intervalMessage before the thread sleeps for one second.
                try
                {
                    Thread.sleep(1000);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                LOG.info("The Producer have send {} messages.", messageNo);
            }
        }
        
    }
Feedback

Feedback

Feedback

0/500

Selected Content

Submit selected content with the feedback