Why Does Non-static KafkaPartitioner Class Object Fail to Construct FlinkKafkaProducer010?
Question
After Flink kernel is upgraded to 1.3.0 or later versions, if Kafka calls the FlinkKafkaProducer010 that contains the non-static KafkaPartitioner class object as parameter to construct functions, an error is reported.
The error message is as follows:
org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaPartitioner is not serializable. The object probably contains or references non serializable fields.
Answer
In the 1.3.0 version of Flink, the FlinkKafkaDelegatePartitioner class is added, so that Flink allows APIs that use KafkaPartitioner, for example, FlinkKafkaProducer010 that contains KafkaPartitioner object, to construct functions.
The FlinkKafkaDelegatePartitioner class defines the member variable kafkaPartitioner.
private final KafkaPartitioner<T> kafkaPartitioner;
When Flink input parameter KafkaPartitioner constructs FlinkKafkaProducer010, the call stack is as follows:
FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) -> FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) ----> FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) ------> ClosureCleaner::clean(Object func, boolean checkSerializable)
Run the KafkaPartitioner object to construct a FlinkKafkaDelegatePartitioner object, and then check whether the object can be serializable. The ClosureCleaner::clean function is a static function. If the KafkaPartitioner object in a case is non-static, the ClosureCleaner::clean function cannot access the non-static member variable kafkaPartitioner in the KafkaDelegatePartitioner class and an exception is reported.
Either of the following methods can be used to solve the problem:
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.