Kafka Consumer Reads Oversized Records

Symptom

After data is written to Kafka, a user develops an application that invokes the Kafka consumer API (org.apache.kafka.clients.consumer.*) to read the data from Kafka as a consumer. However, the read fails and the following error is reported:

..........
1687 [KafkaConsumerExample] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Successfully joined group DemoConsumer with generation 1
1688 [KafkaConsumerExample] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Setting newly assigned partitions [default-0, default-1, default-2] for group DemoConsumer
2053 [KafkaConsumerExample] ERROR com.XXXXXX.bigdata.kafka.example.NewConsumer  - [KafkaConsumerExample], Error due to 
org.apache.kafka.common.errors.RecordTooLargeException: There are some messages at [Partition=Offset]: {default-0=177} whose size is larger than the fetch size 1048576 and hence cannot be ever returned. Increase the fetch size on the client (using max.partition.fetch.bytes), or decrease the maximum message size the broker will allow (using message.max.bytes).
2059 [KafkaConsumerExample] INFO  com.XXXXXX.bigdata.kafka.example.NewConsumer  - [KafkaConsumerExample], Stopped 
.......

Cause Analysis

When fetching data, the Kafka client compares the size of each record to be read with the value of max.partition.fetch.bytes (1048576 bytes, that is, 1 MB, by default, which is the fetch size shown in the error message). If a record is larger than max.partition.fetch.bytes, the preceding exception is reported.
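
The exception surfaces in application code from the consumer's poll() call. The following is a minimal sketch of such a poll loop, not taken from the product sample code: the bootstrap address is a placeholder, the topic name default and group ID DemoConsumer are taken from the log above, and the security settings required by an actual cluster are omitted.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class OversizedRecordDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:21005"); // placeholder address
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // max.partition.fetch.bytes is left at its default of 1048576 bytes (1 MB),
            // the fetch size reported in the error message above.

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("default"));
                while (true) {
                    // On the client version shown in the log, poll() throws
                    // RecordTooLargeException when a record in an assigned partition
                    // is larger than max.partition.fetch.bytes.
                    for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                        System.out.println(record.offset() + ": " + record.value());
                    }
                }
            } catch (RecordTooLargeException e) {
                // Corresponds to the "Error due to" line in the symptom log.
                e.printStackTrace();
            }
        }
    }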

Solution

  1. When creating the Kafka Consumer instance during initialization, set max.partition.fetch.bytes to a value larger than the largest record that needs to be read.

    For example, you can set this parameter to 5252880 as follows (a fuller, self-contained sketch follows this excerpt):
    ......
    // Security protocol type
    props.put(securityProtocol, kafkaProc.getValues(securityProtocol, "SASL_PLAINTEXT"));
    // Service name
    props.put(saslKerberosServiceName, "kafka");

    // Maximum amount of data returned per partition in one fetch request
    props.put("max.partition.fetch.bytes","5252880");
    ......
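
    The excerpt above comes from sample code, so variables such as securityProtocol, saslKerberosServiceName, and kafkaProc are defined elsewhere in that sample. For reference, the following is a self-contained sketch of the same initialization that uses literal property keys; it assumes a Kerberos-enabled cluster, and the bootstrap address and deserializers are placeholders to adjust for your environment.

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class LargeFetchConsumerInit {
        public static KafkaConsumer<String, String> createConsumer() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:21007"); // placeholder address
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            // Security settings for a Kerberos-enabled cluster; adjust to your environment.
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.kerberos.service.name", "kafka");

            // Raise the per-partition fetch limit above the 1 MB default so that
            // larger records can be returned. 5252880 is the value used above.
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5252880");

            return new KafkaConsumer<>(props);
        }
    }

    If even larger records are expected, increase the value accordingly. The broker-side message.max.bytes setting mentioned in the error message controls how large a record the broker accepts in the first place.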