更新时间:2024-12-10 GMT+08:00
Kafka 多线程Producer API使用样例
功能介绍
在Kafka Producer API使用样例基础上,实现了多线程Producer,可启动多个Producer线程,并通过指定相同key值的方式,使每个线程对应向特定Partition发送消息。
下面代码片段在com.huawei.bigdata.kafka.example.ProducerMultThread类中,用于实现多线程生产数据。
代码样例
生产者类线程类的run方法逻辑。
/**
* 生产者线程执行函数,循环发送消息。
*/
public void run()
{
LOG.info("Producer: start.");
// 用于记录消息条数
int messageCount = 1;
// 每个线程发送的消息条数
int messagesPerThread = 5;
while (messageCount <= messagesPerThread)
{
// 待发送的消息内容
String messageStr = new String("Message_" + sendThreadId + "_" + messageCount);
// 此处对于同一线程指定相同Key值,确保每个线程只向同一个Partition生产消息
Integer key = new Integer(sendThreadId);
long startTime = System.currentTimeMillis();
// 构造消息记录
ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic, key, messageStr);
if (isAsync)
{
// 异步发送
producer.send(record, new DemoCallBack(startTime, key, messageStr));
}
else
{
try
{
// 同步发送
producer.send(record).get();
}
catch (InterruptedException ie)
{
LOG.info("The InterruptedException occurred : {}.", ie);
}
catch (ExecutionException ee)
{
LOG.info("The ExecutionException occurred : {}.", ee);
}
}
LOG.info("Producer: send " + messageStr + " to " + topic + " with key: " + key);
messageCount++;
// 每隔1s,发送1条消息
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
ProducerMultThread主类的线程启动逻辑
/**
* 启动多个线程进行发送
*/
public void run()
{
// 是否使用异步发送模式
final boolean asyncEnable = false;
// 指定的线程号,仅用于区分不同的线程
for (int threadNum = 0; threadNum < PRODUCER_THREAD_COUNT; threadNum++)
{
ProducerThread producerThread = new ProducerThread(topic, asyncEnable, threadNum);
producerThread.start();
}
}
父主题: 开发Kafka应用