Updated on 2024-08-16 GMT+08:00

Kafka Multi-Thread Producer API Usage Sample

Function Description

The multi-thread producer sample builds on the code described in Kafka Producer API Usage Sample. It starts multiple producer threads, and each thread uses its thread ID as the message key, so all messages sent by one thread go to the same partition.
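The following minimal sketch illustrates why a fixed key keeps a thread's messages in one partition: the partition is computed deterministically from the key. It is a simplification, not Kafka's implementation. Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas this sketch uses Integer.hashCode() as a stand-in. The class name KeyPartitionSketch, the method partitionFor, and the partition count of 3 are assumptions made for illustration.

```java
// Illustrative stand-in only. Kafka's default partitioner hashes the serialized
// key bytes with murmur2; Integer.hashCode() is used here to keep the sketch short.
final class KeyPartitionSketch {

    /** Maps a key to a partition index in the range [0, numPartitions). */
    static int partitionFor(Integer key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count of the topic
        for (int threadId = 0; threadId < 5; threadId++) {
            System.out.println("thread " + threadId + " -> partition "
                    + partitionFor(threadId, numPartitions));
        }
    }
}
```

Because the mapping depends only on the key and the partition count, a thread that always uses the same key always reaches the same partition. When there are more threads than partitions, several threads share a partition.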

The following code snippets belong to the com.huawei.bigdata.kafka.example.ProducerMultThread class. They show how multiple threads produce data.

Sample Code

Run method logic of the producer thread class

        /**
         * Executed by the producer thread to send messages periodically.
         */
        public void run()
        {
            LOG.info("Producer: start.");
            // Record the number of messages.
            int messageCount = 1;

            // Specify the number of messages sent by each thread.
            int messagesPerThread = 5;

            while (messageCount <= messagesPerThread)
            {
                // Specify the content of messages to be sent.
                String messageStr = "Message_" + sendThreadId + "_" + messageCount;

                // Use the thread ID as the key so that this thread sends messages to only one partition.
                Integer key = Integer.valueOf(sendThreadId);

                long startTime = System.currentTimeMillis();

                // Construct a message record.
                ProducerRecord<Integer, String> record = new ProducerRecord<>(topic, key, messageStr);

                if (isAsync)
                {
                    // Send asynchronously.
                    producer.send(record, new DemoCallBack(startTime, key, messageStr));
                }
                else
                {
                    try
                    {
                        // Send synchronously.
                        producer.send(record).get();
                    }
                    catch (InterruptedException ie)
                    {
                        // Pass the exception as the last argument so that the stack trace is logged.
                        LOG.error("The InterruptedException occurred.", ie);
                    }
                    catch (ExecutionException ee)
                    {
                        LOG.error("The ExecutionException occurred.", ee);
                    }
                }
                LOG.info("Producer: send " + messageStr + " to " + topic + " with key: " + key);
                messageCount++;

                // Wait one second before sending the next message.
                try
                {
                    Thread.sleep(1000);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }
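The difference between the two sending modes in the run method can be sketched without a Kafka cluster. The example below is an analogy built only on java.util.concurrent, not the Kafka API: fakeSend is a hypothetical stand-in for producer.send(record), and the class name SendModeSketch and the "ack:" prefix are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

final class SendModeSketch {

    /** Hypothetical stand-in for producer.send(record): completes later with an acknowledgement. */
    static CompletableFuture<String> fakeSend(String message) {
        return CompletableFuture.supplyAsync(() -> "ack:" + message);
    }

    /** Synchronous style: block on get() until the acknowledgement arrives. */
    static String sendSync(String message) {
        try {
            return fakeSend(message).get();
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so that callers can still observe the interruption.
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException ee) {
            return null;
        }
    }

    /** Asynchronous style: return immediately and run the callback when the send completes. */
    static CompletableFuture<Void> sendAsync(String message, Consumer<String> onAck) {
        return fakeSend(message).thenAccept(onAck);
    }

    public static void main(String[] args) {
        System.out.println("sync result: " + sendSync("Message_0_1"));
        // join() is called here only so that the demo does not exit before the callback runs.
        sendAsync("Message_0_2", ack -> System.out.println("async callback: " + ack)).join();
    }
}
```

In synchronous mode the sending thread learns the outcome before it continues, at the cost of waiting for each message. In asynchronous mode the outcome is handled in the callback, which is the role DemoCallBack plays in the sample.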

Thread startup logic of the ProducerMultThread main class

    /**
     * Start multiple threads for sending.
     */
    public void run()
    {
        // Specify whether to use the asynchronous sending mode.
        final boolean asyncEnable = false;

        // Specify the thread number, which is the unique identifier of a thread.
        for (int threadNum = 0; threadNum < PRODUCER_THREAD_COUNT; threadNum++)
        {
            ProducerThread producerThread = new ProducerThread(topic, asyncEnable, threadNum);
            producerThread.start();
        }

    }
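The startup logic above starts the threads and returns without waiting for them. If the caller needs to know when every thread has finished, one option is to keep references to the threads and join them. The sketch below shows this pattern with plain Java threads that append to an in-memory list instead of sending to Kafka. The class name ThreadStartupSketch, the value 2 for PRODUCER_THREAD_COUNT, and the in-memory list are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class ThreadStartupSketch {
    static final int PRODUCER_THREAD_COUNT = 2; // assumed value
    static final int MESSAGES_PER_THREAD = 5;   // same count as in the sample

    /** Starts the worker threads, waits for all of them, and returns what they "sent". */
    static List<String> runAll() {
        // A thread-safe list stands in for the Kafka topic.
        List<String> sent = new CopyOnWriteArrayList<>();
        List<Thread> workers = new ArrayList<>();

        for (int threadNum = 0; threadNum < PRODUCER_THREAD_COUNT; threadNum++) {
            final int id = threadNum; // the thread number identifies the thread
            Thread worker = new Thread(() -> {
                for (int n = 1; n <= MESSAGES_PER_THREAD; n++) {
                    sent.add("Message_" + id + "_" + n);
                }
            }, "producer-" + id);
            workers.add(worker);
            worker.start();
        }

        // Wait for every worker to finish before returning.
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println("messages sent: " + runAll().size());
    }
}
```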