更新时间:2024-10-18 GMT+08:00
使用Producer API向安全Topic生产消息
功能简介
用于实现Producer API向安全Topic生产消息。
代码样例
以下为用于实现Producer API向安全Topic生产消息的代码片段。
详细内容在com.huawei.bigdata.kafka.example.Producer类的run方法中。
/**
* 生产者线程执行函数,循环发送消息。
*/
public void run() {
LOG.info("New Producer: start.");
int messageNo = 1;
while (messageNo <= MESSAGE_NUM) {
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
// 构造消息记录
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic, messageNo, messageStr);
if (isAsync) {
// 异步发送
producer.send(record, new DemoCallBack(startTime, messageNo, messageStr));
} else {
try {
// 同步发送
producer.send(record).get();
long elapsedTime = System.currentTimeMillis() - startTime;
LOG.info("message(" + messageNo + ", " + messageStr + ") sent to topic(" + topic + ") in " + elapsedTime + " ms.");
} catch (InterruptedException ie) {
LOG.info("The InterruptedException occured : {}.", ie);
} catch (ExecutionException ee) {
LOG.info("The ExecutionException occured : {}.", ee);
}
}
messageNo++;
}
}
父主题: 开发Kafka应用