更新时间:2022-12-14 GMT+08:00

数据序列化

操作场景

Spark支持两种方式的序列化 :

  • Java原生序列化JavaSerializer
  • Kryo序列化KryoSerializer

序列化对于Spark应用的性能来说,具有很大的影响。在特定的数据格式的情况下,KryoSerializer的性能可以达到JavaSerializer的10倍以上,而对于一些Int之类的基本类型数据,性能的提升就几乎可以忽略。

KryoSerializer依赖Twitter的Chill库来实现,相对于JavaSerializer,主要的问题在于不是所有的Java Serializable对象都能支持,兼容性不好,所以需要手动注册类。

序列化功能用在两个地方:序列化任务和序列化数据。Spark任务序列化只支持JavaSerializer,数据序列化支持JavaSerializer和KryoSerializer。

操作步骤

Spark程序运行时,在shuffle和RDD Cache等过程中,会有大量的数据需要序列化,默认使用JavaSerializer,通过配置让KryoSerializer作为数据序列化器来提升序列化性能。

在开发应用程序时,添加如下代码来使用KryoSerializer作为数据序列化器。

  • 实现类注册器并手动注册类。
    package com.etl.common;
    
    import com.esotericsoftware.kryo.Kryo;
    import org.apache.spark.serializer.KryoRegistrator; 
    
    public class DemoRegistrator implements KryoRegistrator
    {
        @Override
        public void registerClasses(Kryo kryo)
        {
            //以下为示例类,请注册自定义的类
            kryo.register(AggrateKey.class);
            kryo.register(AggrateValue.class);
        }
    }

    您可以在Spark客户端对spark.kryo.registrationRequired参数进行配置,设置是否需要Kryo注册序列化。

    当参数设置为true时,如果工程中存在未被序列化的类,则会抛出异常。如果设置为false(默认值),Kryo会自动将未注册的类名写到对应的对象中。此操作会对系统性能造成影响。设置为true时,用户需手动注册类,针对未序列化的类,系统不会自动写入类名,而是抛出异常,相对比false,其性能较好。

  • 配置KryoSerializer作为数据序列化器和类注册器。
    val conf = new SparkConf()
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "com.etl.common.DemoRegistrator")