Updated on 2022-12-08 GMT+08:00

Kafka Producer Writes Oversized Records

Symptom

A user develops a Kafka application that uses the new Producer interface (org.apache.kafka.clients.producer.*) to write data to Kafka. A single record is 1100055 bytes, which exceeds the message.max.bytes value (1000012) in the Kafka configuration file server.properties. After message.max.bytes and replica.fetch.max.bytes in the Kafka service configuration are both changed to 5242880, the exception persists. The error information is as follows:

..........
14749 [Thread-0] INFO  com.xxxxxx.bigdata.kafka.example.NewProducer  - The ExecutionException occured : {}.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1100093 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:739)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:483)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
at com.xxxxxx.bigdata.kafka.example.NewProducer.run(NewProducer.java:150)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is **** bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
.......

Cause Analysis

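Before sending a record, the Kafka client compares the record's serialized size with the value of the Producer parameter max.request.size. If the record is larger than that value (1048576 bytes by default when the parameter is not set), the client reports the preceding exception. Because this check runs in the Producer, changing the broker-side parameters message.max.bytes and replica.fetch.max.bytes does not affect it.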
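The comparison can be pictured with the following simplified sketch. It is only a model of the check, not the Kafka client's actual code: the class and method names are hypothetical, and the sizes are taken from the error message and configuration values quoted in this article.

```java
// Simplified model of the Producer-side size check (hypothetical names,
// not the Kafka client's actual implementation).
final class RequestSizeCheck {
    // Default value of the Producer parameter max.request.size, in bytes.
    static final int DEFAULT_MAX_REQUEST_SIZE = 1048576;

    // Returns true when a serialized record is larger than the limit, which is
    // the case in which the Producer raises RecordTooLargeException.
    static boolean exceedsLimit(int serializedSize, int maxRequestSize) {
        return serializedSize > maxRequestSize;
    }

    public static void main(String[] args) {
        int serializedSize = 1100093; // size reported in the error message

        // Default limit: the record is too large.
        System.out.println(exceedsLimit(serializedSize, DEFAULT_MAX_REQUEST_SIZE)); // true

        // Limit raised to 5242880: the record fits.
        System.out.println(exceedsLimit(serializedSize, 5242880)); // false
    }
}
```

With the default limit the 1100093-byte record is rejected before it leaves the client, regardless of what the broker is configured to accept.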

Solution

  1. Set max.request.size when initializing the Kafka Producer instance.

    For example, set this parameter to 5242880, the same value as message.max.bytes on the broker:
            // Protocol type: currently, SASL_PLAINTEXT or PLAINTEXT can be used.
            props.put(securityProtocol, kafkaProc.getValues(securityProtocol, "SASL_PLAINTEXT"));
            // Service name
            props.put(saslKerberosServiceName, "kafka");
            // Maximum size of a request that the Producer can send, in bytes
            props.put("max.request.size", "5242880");
            .......
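For reference, a minimal, self-contained sketch of building such a configuration is shown below. It is an illustration under stated assumptions rather than the sample project's code: the class name LargeRecordProducerConfig, the build method, and the broker address broker-host:9092 are hypothetical, security settings are omitted, and 5242880 is used so that the Producer limit matches the broker-side message.max.bytes value described in the Symptom section.

```java
import java.util.Properties;

// Minimal sketch of a Producer configuration with a raised max.request.size.
// Class name, method name, and broker address are placeholders for illustration.
final class LargeRecordProducerConfig {

    static Properties build(String bootstrapServers, int maxRequestSize) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Producer-side limit on the size of a request, in bytes.
        props.put("max.request.size", String.valueOf(maxRequestSize));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("broker-host:9092", 5242880);
        // With the kafka-clients library on the classpath, these properties
        // would be passed to the Producer constructor, for example:
        //   new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props)
        System.out.println(props.getProperty("max.request.size"));
    }
}
```

Keeping the Producer limit no larger than the broker-side message.max.bytes avoids the situation in which a record passes the client check and is then rejected by the broker.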