更新时间:2024-08-03 GMT+08:00

Kafka 多线程Producer API使用样例

功能介绍

Kafka Producer API使用样例基础上,实现了多线程Producer,可启动多个Producer线程,并通过指定相同key值的方式,使每个线程对应向特定Partition发送消息。

下面代码片段在com.huawei.bigdata.kafka.example.ProducerMultThread类中,用于实现多线程生产数据。

代码样例

生产者类线程类的run方法逻辑

        /**
         * 生产者线程执行函数,循环发送消息。
         */
        public void run()
        {
            LOG.info("Producer: start.");
            // 用于记录消息条数
            int messageCount = 1;

            // 每个线程发送的消息条数
            int messagesPerThread = 5;

            while (messageCount <= messagesPerThread)
            {
                // 待发送的消息内容
                String messageStr = new String("Message_" + sendThreadId + "_" + messageCount);

                // 此处对于同一线程指定相同Key值,确保每个线程只向同一个Partition生产消息
                Integer key = new Integer(sendThreadId);

                long startTime = System.currentTimeMillis();

                // 构造消息记录
                ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(topic, key, messageStr);

                if (isAsync)
                {
                    // 异步发送
                    producer.send(record, new DemoCallBack(startTime, key, messageStr));
                }
                else
                {
                    try
                    {
                        // 同步发送
                        producer.send(record).get();
                    }
                    catch (InterruptedException ie)
                    {
                        LOG.info("The InterruptedException occured : {}.", ie);
                    }
                    catch (ExecutionException ee)
                    {
                        LOG.info("The ExecutionException occured : {}.", ee);
                    }
                }
                LOG.info("Producer: send " + messageStr + " to " + topic + " with key: " + key);
                messageCount++;

                // 每隔1s,发送1条消息
                try
                {
                    Thread.sleep(1000);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }

ProducerMultThread主类的线程启动逻辑

    /**
     * 启动多个线程进行发送
     */
    public void run()
    {
        // 是否使用异步发送模式
        final boolean asyncEnable = false;

        // 指定的线程号,仅用于区分不同的线程
        for (int threadNum = 0; threadNum < PRODUCER_THREAD_COUNT; threadNum++)
        {
            ProducerThread producerThread = new ProducerThread(topic, asyncEnable, threadNum);
            producerThread.start();
        }

    }