更新时间:2024-12-10 GMT+08:00
Kafka Producer API使用样例
功能介绍
下面代码片段在com.huawei.bigdata.kafka.example.Producer类中,用于实现新Producer API向安全Topic生产消息。
样例代码
Producer线程run方法中的消费逻辑。
样例代码获取方式请参考获取MRS应用开发样例工程。
代码样例:
public void run()
{
LOG.info("New Producer: start.");
int messageNo = 1;
// 指定发送多少条消息后sleep1秒
int intervalMessages=10;
while (messageNo <= messageNumToSend)
{
String messageStr = "Message_" + messageNo;
long startTime = System.currentTimeMillis();
// 构造消息记录
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic, messageNo, messageStr);
if (isAsync)
{
// 异步发送
producer.send(record, new DemoCallBack(startTime, messageNo, messageStr));
}
else
{
try
{
// 同步发送
producer.send(record).get();
}
catch (InterruptedException ie)
{
LOG.info("The InterruptedException occurred : {}.", ie);
}
catch (ExecutionException ee)
{
LOG.info("The ExecutionException occurred : {}.", ee);
}
}
messageNo++;
if (messageNo % intervalMessages == 0)
{
// 每发送intervalMessage条消息sleep1秒
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
LOG.info("The Producer have send {} messages.", messageNo);
}
}
}
父主题: 开发Kafka应用