更新时间:2022-07-14 GMT+08:00
分享

Producer API使用样例

功能简介

下面代码片段在com.huawei.bigdata.kafka.example.Producer类的run方法中,用于实现Producer API向安全Topic生产消息。

代码样例

/** 
 * 生产者线程执行函数,循环发送消息。 
 */ 
public void run() {
    LOG.info("New Producer: start.");
    int messageNo = 1;
    
    while (messageNo <= MESSAGE_NUM) {
        String messageStr = "Message_" + messageNo;
        long startTime = System.currentTimeMillis();
        
        // 构造消息记录
        ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic, messageNo, messageStr);
        
        if (isAsync) {
            // 异步发送
            producer.send(record, new DemoCallBack(startTime, messageNo, messageStr));
        } else {
            try {
                // 同步发送
                producer.send(record).get();
                long elapsedTime = System.currentTimeMillis() - startTime;
                LOG.info("message(" + messageNo + ", " + messageStr + ") sent to topic(" + topic + ") in " + elapsedTime + " ms.");
            } catch (InterruptedException ie) {
                LOG.info("The InterruptedException occured : {}.", ie);
            } catch (ExecutionException ee) {
                LOG.info("The ExecutionException occured : {}.", ee);
            }
        }
        messageNo++;
    }
}

分享:

    相关文档

    相关产品