Help Center/ MapReduce Service/ Developer Guide (LTS)/ Flink Development Guide (Normal Mode)/ More Information/ FAQ/ Why Does Non-static KafkaPartitioner Class Object Fail to Construct FlinkKafkaProducer010?
Updated on 2022-07-11 GMT+08:00

Why Does Non-static KafkaPartitioner Class Object Fail to Construct FlinkKafkaProducer010?

Question

After Flink kernel is upgraded to 1.3.0 or later versions, if Kafka calls the FlinkKafkaProducer010 that contains the non-static KafkaPartitioner class object as parameter to construct functions, an error is reported.

The error message is as follows:

org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaPartitioner is not serializable. The object probably contains or references non serializable fields.

Answer

In the 1.3.0 version of Flink, the FlinkKafkaDelegatePartitioner class is added, so that Flink allows APIs that use KafkaPartitioner, for example, FlinkKafkaProducer010 that contains KafkaPartitioner object, to construct functions.

The FlinkKafkaDelegatePartitioner class defines the member variable kafkaPartitioner.

private final KafkaPartitioner<T> kafkaPartitioner;

When Flink input parameter KafkaPartitioner constructs FlinkKafkaProducer010, the call stack is as follows:

FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner)
->  FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner)
---->  FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner)
------>  ClosureCleaner::clean(Object func, boolean checkSerializable)

Run the KafkaPartitioner object to construct a FlinkKafkaDelegatePartitioner object, and then check whether the object can be serializable. The ClosureCleaner::clean function is a static function. If the KafkaPartitioner object in a case is non-static, the ClosureCleaner::clean function cannot access the non-static member variable kafkaPartitioner in the KafkaDelegatePartitioner class and an exception is reported.

Either of the following methods can be used to solve the problem:

  • Change the KafkaPartitioner class into static class.

  • Use the FlinkKafkaProducer010 that contains FlinkKafkaPartitioner as the parameter to construct functions. In this case, FlinkKafkaDelegatePartitioner is not constructed and the exception about member variable is avoided.