更新时间:2024-08-05 GMT+08:00

使用多线程Producer发送消息

功能简介

使用Producer API向安全Topic生产消息基础上,实现了多线程Producer,可启动多个Producer线程,并通过指定相同key值的方式,使每个线程对应向特定Partition发送消息。

下面代码片段在com.huawei.bigdata.kafka.example.ProducerMultThread类的run方法中,用于实现多线程生产数据。

代码样例

/**
 * 指定Key值为当前ThreadId,发送数据。
 */
public void run()
{
LOG.info("Producer: start.");

    // 用于记录消息条数。
int messageCount = 1;

    // 每个线程发送的消息条数。
int messagesPerThread = 5;
while (messageCount <= messagesPerThread)
    {

        // 待发送的消息内容。
        String messageStr = new String("Message_" + sendThreadId + "_" + messageCount);

        // 此处对于同一线程指定相同Key值,确保每个线程只向同一个Partition生产消息。
        String key = String.valueOf(sendThreadId);

        // 消息发送。
        producer.send(new KeyedMessage<String, String>(sendTopic, key, messageStr));
LOG.info("Producer: send " + messageStr + " to " + sendTopic + " with key: " + key);
        messageCount++;
     }
}