文档首页 > > 开发指南> Flink应用开发> FAQ> 为什么非static的KafkaPartitioner类对象去构造FlinkKafkaProducer010,运行时会报错?

为什么非static的KafkaPartitioner类对象去构造FlinkKafkaProducer010,运行时会报错?

分享
更新时间: 2019/09/06 GMT+08:00

问题

Flink内核升级到1.3.0之后,当kafka调用带有非static的KafkaPartitioner类对象为参数的FlinkKafkaProducer010去构造函数时,运行时会报错。

报错内容如下:

org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaPartitioner is not serializable. The object probably contains or references non serializable fields.

回答

Flink的1.3.0版本,为了兼容原有那些使用KafkaPartitioner的API接口,如FlinkKafkaProducer010带KafkaPartitioner对象的构造函数,增加了FlinkKafkaDelegatePartitioner类。

该类定义了一个成员变量,即kafkaPartitioner:

private final KafkaPartitioner<T> kafkaPartitioner;

当Flink传入参数是KafkaPartitioner去构造FlinkKafkaProducer010时,调用栈如下:

FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner)
->  FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner)
---->  FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner)
------>  ClosureCleaner::clean(Object func, boolean checkSerializable)

首先使用KafkaPartitioner对象去构造一个FlinkKafkaDelegatePartitioner对象,然后再检查该对象是否可序列化。由于ClosureCleaner::clean函数是static函数,当用例中的KafkaPartitioner对象是非static时,ClosureCleaner::clean函数无法访问KafkaDelegatePartitioner类内的非static成员变量kafkaPartitioner,导致报错。

解决方法如下,两者任选其一:

  • 将KafkaPartitioner类改成static类。
  • 改用以FlinkKafkaPartitioner为参数的FlinkKafkaProducer010构造函数,内部实现不会去构造FlinkKafkaDelegatePartitioner,也就不会存在成员变量的问题。
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

智能客服提问云社区提问