为什么非static的KafkaPartitioner类对象去构造FlinkKafkaProducer010,运行时会报错?
问题
Flink内核升级到1.3.0之后,当kafka调用带有非static的KafkaPartitioner类对象为参数的FlinkKafkaProducer010去构造函数时,运行时会报错。
报错内容如下:
org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaPartitioner is not serializable. The object probably contains or references non serializable fields.
回答
Flink的1.3.0版本,为了兼容原有那些使用KafkaPartitioner的API接口,如FlinkKafkaProducer010带KafkaPartitioner对象的构造函数,增加了FlinkKafkaDelegatePartitioner类。
该类定义了一个成员变量,即kafkaPartitioner:
private final KafkaPartitioner<T> kafkaPartitioner;
当Flink传入参数是KafkaPartitioner去构造FlinkKafkaProducer010时,调用栈如下:
FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) -> FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) ----> FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) ------> ClosureCleaner::clean(Object func, boolean checkSerializable)
首先使用KafkaPartitioner对象去构造一个FlinkKafkaDelegatePartitioner对象,然后再检查该对象是否可序列化。由于ClosureCleaner::clean函数是static函数,当用例中的KafkaPartitioner对象是非static时,ClosureCleaner::clean函数无法访问KafkaDelegatePartitioner类内的非static成员变量kafkaPartitioner,导致报错。
解决方法如下,两者任选其一:
- 将KafkaPartitioner类改成static类。
- 改用以FlinkKafkaPartitioner为参数的FlinkKafkaProducer010构造函数,内部实现不会去构造FlinkKafkaDelegatePartitioner,也就不会存在成员变量的问题。