Updated on 2024-08-16 GMT+08:00

Why Does Non-static KafkaPartitioner Class Object Fail to Construct FlinkKafkaProducer010?

Issue

After the Flink kernel is upgraded to 1.3.0, an error is reported when an application constructs FlinkKafkaProducer010 by calling the constructor that takes a KafkaPartitioner object as a parameter and the class of that object is non-static.

The error message is as follows:

org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaPartitioner is not serializable. The object probably contains or references non serializable fields.

Answer

Flink 1.3.0 adds the FlinkKafkaDelegatePartitioner class so that Flink remains compatible with APIs that still use KafkaPartitioner, for example, the FlinkKafkaProducer010 constructor that takes a KafkaPartitioner object.

The FlinkKafkaDelegatePartitioner class defines the member variable kafkaPartitioner.

private final KafkaPartitioner<T> kafkaPartitioner;

When a KafkaPartitioner object is passed as a parameter to construct FlinkKafkaProducer010, the following call chain is invoked:

FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner)
->  FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner)
---->  FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner)
------>  ClosureCleaner::clean(Object func, boolean checkSerializable)

Flink first uses the KafkaPartitioner object to construct a FlinkKafkaDelegatePartitioner object and then checks whether that object is serializable. ClosureCleaner::clean is a static function that examines only the object passed to it, so it cannot reach the non-static member variable kafkaPartitioner in the FlinkKafkaDelegatePartitioner class. If the KafkaPartitioner class is non-static, its instance holds an implicit reference to the enclosing object. When that enclosing object is not serializable, the serializability check fails and the error is reported.
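
The serialization failure described above can be reproduced with the JDK alone. The following is a minimal sketch, not Flink code: Partitioner, DelegatePartitioner, and Job are hypothetical stand-ins for KafkaPartitioner, FlinkKafkaDelegatePartitioner, and a user job class, and isSerializable only approximates the check that Flink performs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// All types here are hypothetical stand-ins, not Flink classes.
class InnerPartitionerDemo {

    // Stand-in for KafkaPartitioner: serializable by contract.
    abstract static class Partitioner implements Serializable {
        abstract int partition(String record, int numPartitions);
    }

    // Stand-in for FlinkKafkaDelegatePartitioner: keeps the partitioner in a field.
    static class DelegatePartitioner implements Serializable {
        private final Partitioner kafkaPartitioner;

        DelegatePartitioner(Partitioner kafkaPartitioner) {
            this.kafkaPartitioner = kafkaPartitioner;
        }
    }

    // Stand-in for a user job class that is NOT serializable.
    static class Job {
        private final int offset = 1;

        // Non-static inner class: every instance carries a hidden reference to its Job.
        class InnerPartitioner extends Partitioner {
            @Override
            int partition(String record, int numPartitions) {
                return Math.floorMod(record.hashCode() + offset, numPartitions);
            }
        }

        Partitioner newPartitioner() {
            return new InnerPartitioner();
        }
    }

    // Tries Java serialization and reports whether it succeeded.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Partitioner inner = new Job().newPartitioner();
        // The delegate drags in the partitioner, which drags in the non-serializable Job.
        System.out.println(isSerializable(new DelegatePartitioner(inner))); // prints "false"
    }
}
```

The delegate itself is serializable, but serializing it also serializes the wrapped partitioner and, through the hidden outer reference, the Job instance, which is where the failure occurs.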

Either of the following methods can be used to solve the problem:

  • Declare the custom KafkaPartitioner class as a static class.
  • Use the FlinkKafkaProducer010 constructor that takes a FlinkKafkaPartitioner object as a parameter. In this case, no FlinkKafkaDelegatePartitioner object is constructed, and the error related to the member variable is avoided.
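
The first method can be illustrated with a JDK-only sketch. It is an assumption-laden model rather than Flink code: Partitioner and DelegatePartitioner are hypothetical stand-ins for KafkaPartitioner and FlinkKafkaDelegatePartitioner, and Job stands for a non-serializable user class. Because the partitioner is declared static, it carries no reference to Job, and the wrapped object serializes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// All types here are hypothetical stand-ins, not Flink classes.
class StaticPartitionerDemo {

    // Stand-in for KafkaPartitioner: serializable by contract.
    abstract static class Partitioner implements Serializable {
        abstract int partition(String record, int numPartitions);
    }

    // Stand-in for FlinkKafkaDelegatePartitioner: keeps the partitioner in a field.
    static class DelegatePartitioner implements Serializable {
        private final Partitioner kafkaPartitioner;

        DelegatePartitioner(Partitioner kafkaPartitioner) {
            this.kafkaPartitioner = kafkaPartitioner;
        }
    }

    // Stand-in for a user job class that is NOT serializable.
    static class Job {
        // Static nested class: no hidden reference to the enclosing Job instance.
        static class StaticPartitioner extends Partitioner {
            @Override
            int partition(String record, int numPartitions) {
                return Math.floorMod(record.hashCode(), numPartitions);
            }
        }

        Partitioner newPartitioner() {
            return new StaticPartitioner();
        }
    }

    // Tries Java serialization and reports whether it succeeded.
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Partitioner fixed = new Job().newPartitioner();
        // Only the partitioner itself is written; Job is never touched.
        System.out.println(isSerializable(new DelegatePartitioner(fixed))); // prints "true"
    }
}
```

Any state the partitioner needs from the enclosing class should be passed in through its constructor as serializable values, since a static class cannot read the outer instance's fields.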