更新时间:2024-10-18 GMT+08:00
使用Producer API向安全Topic生产消息
功能简介
用于实现Producer API向安全Topic生产消息。
代码样例
以下为用于实现Producer API向安全Topic生产消息的代码片段。
详细内容在com.huawei.bigdata.kafka.example.Producer类的run方法中。
/** * 生产者线程执行函数,循环发送消息。 */ public void run() { LOG.info("New Producer: start."); int messageNo = 1; while (messageNo <= MESSAGE_NUM) { String messageStr = "Message_" + messageNo; long startTime = System.currentTimeMillis(); // 构造消息记录 ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic, messageNo, messageStr); if (isAsync) { // 异步发送 producer.send(record, new DemoCallBack(startTime, messageNo, messageStr)); } else { try { // 同步发送 producer.send(record).get(); long elapsedTime = System.currentTimeMillis() - startTime; LOG.info("message(" + messageNo + ", " + messageStr + ") sent to topic(" + topic + ") in " + elapsedTime + " ms."); } catch (InterruptedException ie) { LOG.info("The InterruptedException occured : {}.", ie); } catch (ExecutionException ee) { LOG.info("The ExecutionException occured : {}.", ee); } } messageNo++; } }
父主题: 开发Kafka应用