更新时间:2024-06-14 GMT+08:00
分享

Kafka Producer API使用样例

功能介绍

下面代码片段在com.huawei.bigdata.kafka.example.Producer类中,用于实现新Producer API向安全Topic生产消息。

样例代码

Producer线程run方法中的消费逻辑

    public void run()
    {
        LOG.info("New Producer: start.");
        int messageNo = 1;
        // 指定发送多少条消息后sleep1秒
        int intervalMessages=10; 
        
        while (messageNo <= messageNumToSend)
        {
            String messageStr = "Message_" + messageNo;
            long startTime = System.currentTimeMillis();
            
            // 构造消息记录
            ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic, messageNo, messageStr);
            
            if (isAsync)
            {
                // 异步发送
                producer.send(record, new DemoCallBack(startTime, messageNo, messageStr));
            }
            else
            {
                try
                {
                    // 同步发送
                    producer.send(record).get();
                }
                catch (InterruptedException ie)
                {
                    LOG.info("The InterruptedException occured : {}.", ie);
                }
                catch (ExecutionException ee)
                {
                    LOG.info("The ExecutionException occured : {}.", ee);
                }
            }
            messageNo++;
            
            if (messageNo % intervalMessages == 0)
            {
                // 每发送intervalMessage条消息sleep1秒
                try
                {
                    Thread.sleep(1000);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
                LOG.info("The Producer have send {} messages.", messageNo);
            }
        }
        
    }

相关文档