Kafka Multi-Thread Producer API Usage Sample
Function Description
The multi-thread producer function is implemented based on the code sample described in Kafka Producer API Usage Sample. Multiple producer threads can be started. Each thread sends messages to the partition whose key is the same as the thread ID.
The following code snippets belong to the com.huawei.bigdata.kafka.example.ProducerMultThread class. They are used to enable multiple threads to produce data.
Sample Code
Run method logic of the producer thread class
/** * The Producer thread executes a function to send messages periodically. */ public void run() { LOG.info("Producer: start."); // Record the number of messages. int messageCount = 1; // Specify the number of messages sent by each thread. int messagesPerThread = 5; while (messageCount <= messagesPerThread) { // Specify the content of messages to be sent. String messageStr = new String("Message_" + sendThreadId + "_" + messageCount); // Specify a key value for each thread to enable the thread to send messages to only a specified partition. Integer key = new Integer(sendThreadId); long startTime = System.currentTimeMillis(); // Construct a message record. ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic, key, messageStr); if (isAsync) { // Send asynchronously. producer.send(record, new DemoCallBack(startTime, key, messageStr)); } else { try { // Send synchronously. producer.send(record).get(); } catch (InterruptedException ie) { LOG.info("The InterruptedException occurred : {}.", ie); } catch (ExecutionException ee) { LOG.info("The ExecutionException occurred : {}.", ee); } } LOG.info("Producer: send " + messageStr + " to " + topic + " with key: " + key); messageCount++; // Send a message every other second. try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
Thread startup logic of the ProducerMultThread main class
/** * Start multiple threads for sending. */ public void run() { // Specify whether to use the asynchronous sending mode. final boolean asyncEnable = false; // Specify the thread number, which is the unique identifier of a thread. for (int threadNum = 0; threadNum < PRODUCER_THREAD_COUNT; threadNum++) { ProducerThread producerThread = new ProducerThread(topic, asyncEnable, threadNum); producerThread.start(); } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot