Updated on 2023-08-31 GMT+08:00

Multi-thread Producer Example

Function

The multi-thread producer function builds on the code sample described in Producer API Sample. Multiple producer threads can be started. Each thread uses its thread ID as the message key, so all messages from one thread are sent to the same partition.
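The key-based routing described above can be illustrated with a minimal, self-contained sketch. It is not Kafka's partitioner: the class name KeyPartitionSketch, the partitionFor method, and the partition count of 3 are assumptions made for illustration, and the real mapping depends on the partitioner configured for the producer. The sketch only shows the general idea that a fixed key always maps to the same partition index.

```java
/**
 * Hypothetical illustration of key-based routing. This is NOT Kafka's
 * partitioner; it only shows that a fixed key yields a fixed partition.
 */
final class KeyPartitionSketch {

    // Map a key to a partition index in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even if hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count, for illustration only

        // Each thread ID is used as the key, as in the sample's run method.
        for (int threadId = 0; threadId < 4; threadId++) {
            String key = String.valueOf(threadId);
            System.out.println("key " + key + " -> partition "
                    + partitionFor(key, numPartitions));
        }
    }
}
```

Because the mapping depends only on the key and the number of partitions, every message sent by the same thread is routed to the same partition. Two threads may still share a partition when there are more threads than partitions.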

The code snippet in Sample Code below is from the run method of the com.huawei.bigdata.kafka.example.ProducerMultThread class. It enables multiple threads to produce data.

Sample Code

/**
 * Specify the key value as the current thread ID and send data.
 */
public void run()
{
    LOG.info("Producer: start.");

    // Record the number of messages.
    int messageCount = 1;

    // Specify the number of messages sent by each thread.
    int messagesPerThread = 5;
    while (messageCount <= messagesPerThread)
    {
        // Specify the content of messages to be sent.
        String messageStr = "Message_" + sendThreadId + "_" + messageCount;

        // Specify a key value for each thread to enable the thread to send messages to only a specified partition.
        String key = String.valueOf(sendThreadId);

        // Send a message.
        producer.send(new KeyedMessage<String, String>(sendTopic, key, messageStr));
        LOG.info("Producer: send " + messageStr + " to " + sendTopic + " with key: " + key);
        messageCount++;
    }
}
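
The run method above covers a single thread. The following sketch shows one way several such threads might be started and joined. It is an assumption-laden illustration, not the sample project's actual entry point: the names MultiThreadLaunchSketch, Sender, ProducerThread, and runProducers are hypothetical, and the Sender interface stands in for the Kafka producer so that the sketch runs without a cluster.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class MultiThreadLaunchSketch {

    /** Hypothetical stand-in for the Kafka producer; records instead of sending. */
    interface Sender {
        void send(String topic, String key, String message);
    }

    /** Mirrors the run method above: a fixed number of messages keyed by thread ID. */
    static final class ProducerThread extends Thread {
        private static final int MESSAGES_PER_THREAD = 5;
        private final int sendThreadId;
        private final String sendTopic;
        private final Sender sender;

        ProducerThread(int sendThreadId, String sendTopic, Sender sender) {
            this.sendThreadId = sendThreadId;
            this.sendTopic = sendTopic;
            this.sender = sender;
        }

        @Override
        public void run() {
            for (int messageCount = 1; messageCount <= MESSAGES_PER_THREAD; messageCount++) {
                String messageStr = "Message_" + sendThreadId + "_" + messageCount;
                // The thread ID is the key, so one thread always uses one key.
                String key = String.valueOf(sendThreadId);
                sender.send(sendTopic, key, messageStr);
            }
        }
    }

    /** Starts threadCount producer threads, waits for all of them, and returns what was sent. */
    static List<String> runProducers(int threadCount, String topic) {
        // Thread-safe list, because all producer threads append concurrently.
        List<String> sent = new CopyOnWriteArrayList<>();
        Sender recorder = (t, key, message) -> sent.add(key + ":" + message);

        List<Thread> threads = new ArrayList<>();
        for (int id = 0; id < threadCount; id++) {
            Thread thread = new ProducerThread(id, topic, recorder);
            threads.add(thread);
            thread.start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for producers", e);
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> sent = runProducers(2, "example-topic");
        System.out.println("Messages sent: " + sent.size());
    }
}
```

In a real deployment the Sender implementation would delegate to the producer used in the sample. Whether one producer instance is shared across threads or each thread creates its own is a design choice that this sketch does not settle.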