Why Does Non-static KafkaPartitioner Class Object Fail to Construct FlinkKafkaProducer010?
Issue
After the Flink kernel is upgraded to 1.3.0, an error is reported when FlinkKafkaProducer010 is constructed with a non-static KafkaPartitioner class object as a constructor parameter.
The error message is as follows:
org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaPartitioner is not serializable. The object probably contains or references non serializable fields.
Answer
The FlinkKafkaDelegatePartitioner class was added in Flink 1.3.0 so that Flink remains compatible with APIs that use KafkaPartitioner, for example, the FlinkKafkaProducer010 constructor that takes a KafkaPartitioner class object as a parameter.
The FlinkKafkaDelegatePartitioner class defines the member variable kafkaPartitioner.
private final KafkaPartitioner<T> kafkaPartitioner;
When a KafkaPartitioner object is passed as a parameter to the FlinkKafkaProducer010 constructor, the following call chain is invoked:
FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner)
-> FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner)
----> FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner)
------> ClosureCleaner::clean(Object func, boolean checkSerializable)
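In this call chain, the KafkaPartitioner object is used to construct a FlinkKafkaDelegatePartitioner object, and ClosureCleaner::clean then checks whether that object is serializable. ClosureCleaner::clean is a static function that works only on the object passed to it. If the KafkaPartitioner class is a non-static inner class, its object holds an implicit reference to the enclosing object, and ClosureCleaner::clean cannot clean the kafkaPartitioner member variable held inside the FlinkKafkaDelegatePartitioner object. If the enclosing object is not serializable, the serialization check fails and the error is reported.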
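The failure can be reproduced without Flink. The following is a minimal sketch in plain Java, not the Flink implementation: the Partitioner interface and the DelegatePartitioner class are hypothetical stand-ins for KafkaPartitioner and FlinkKafkaDelegatePartitioner, and standard Java serialization stands in for the check performed by ClosureCleaner::clean.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// The enclosing class is deliberately NOT Serializable, like a typical job class.
public class InnerPartitionerDemo {

    private final int offset = 1;

    /** Hypothetical stand-in for KafkaPartitioner. */
    interface Partitioner extends Serializable {
        int partition(int key, int numPartitions);
    }

    /** Non-static inner class: each instance keeps a hidden reference to InnerPartitionerDemo. */
    class InnerPartitioner implements Partitioner {
        @Override
        public int partition(int key, int numPartitions) {
            // Reads a field of the enclosing instance through the hidden reference.
            return Math.floorMod(key + offset, numPartitions);
        }
    }

    /** Hypothetical stand-in for FlinkKafkaDelegatePartitioner. */
    static class DelegatePartitioner implements Serializable {
        private final Partitioner kafkaPartitioner;

        DelegatePartitioner(Partitioner kafkaPartitioner) {
            this.kafkaPartitioner = kafkaPartitioner;
        }
    }

    /** Returns the name of the class that blocked serialization, or null if serialization succeeded. */
    static String firstNonSerializableClass(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Partitioner inner = new InnerPartitionerDemo().new InnerPartitioner();
        // The wrapper is Serializable, but the wrapped inner object drags in the enclosing instance.
        System.out.println(firstNonSerializableClass(new DelegatePartitioner(inner)));
    }
}
```

In this sketch, the wrapper and the partitioner both implement Serializable, yet serialization still fails because the inner object references its non-serializable enclosing instance.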
Either of the following methods can be used to solve the problem:
- Change the KafkaPartitioner class into a static class.
- Use the FlinkKafkaProducer010 constructor that takes FlinkKafkaPartitioner as a parameter. In this case, FlinkKafkaDelegatePartitioner is not constructed, and the error related to its member variable is avoided.
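The first method can be illustrated with a minimal plain-Java sketch. The Partitioner interface and the StaticPartitioner class are hypothetical names used only for illustration, and standard Java serialization is assumed as a stand-in for the check that Flink performs. A static nested class holds no reference to its enclosing object, so it can be serialized even though the enclosing class is not serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// The enclosing class is still NOT Serializable.
public class StaticPartitionerDemo {

    /** Hypothetical stand-in for KafkaPartitioner. */
    interface Partitioner extends Serializable {
        int partition(int key, int numPartitions);
    }

    /** Static nested class: no hidden reference to StaticPartitionerDemo. */
    static class StaticPartitioner implements Partitioner {
        // Any state the partitioner needs is passed in explicitly.
        private final int offset;

        StaticPartitioner(int offset) {
            this.offset = offset;
        }

        @Override
        public int partition(int key, int numPartitions) {
            return Math.floorMod(key + offset, numPartitions);
        }
    }

    /** Serializes and deserializes the partitioner, returning the copy. */
    static Partitioner roundTrip(Partitioner p) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(p);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Partitioner) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Partitioner copy = roundTrip(new StaticPartitioner(1));
        System.out.println(copy.partition(6, 4));
    }
}
```

State that the inner class previously read from the enclosing object is passed through the constructor instead, so the partitioner carries only serializable fields.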