更新时间:2024-06-14 GMT+08:00
Kafka Producer API使用样例
功能介绍
下面代码片段在com.huawei.bigdata.kafka.example.Producer类中,用于实现新Producer API向安全Topic生产消息。
样例代码
Producer线程run方法中的消费逻辑
public void run() { LOG.info("New Producer: start."); int messageNo = 1; // 指定发送多少条消息后sleep1秒 int intervalMessages=10; while (messageNo <= messageNumToSend) { String messageStr = "Message_" + messageNo; long startTime = System.currentTimeMillis(); // 构造消息记录 ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic, messageNo, messageStr); if (isAsync) { // 异步发送 producer.send(record, new DemoCallBack(startTime, messageNo, messageStr)); } else { try { // 同步发送 producer.send(record).get(); } catch (InterruptedException ie) { LOG.info("The InterruptedException occured : {}.", ie); } catch (ExecutionException ee) { LOG.info("The ExecutionException occured : {}.", ee); } } messageNo++; if (messageNo % intervalMessages == 0) { // 每发送intervalMessage条消息sleep1秒 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } LOG.info("The Producer have send {} messages.", messageNo); } } }
父主题: 开发Kafka应用