更新时间:2024-08-03 GMT+08:00

Flink应用性能调优建议

配置内存

Flink是依赖内存计算,计算过程中内存不够对Flink的执行效率影响很大。可以通过监控GC(Garbage Collection),评估内存使用及剩余情况来判断内存是否变成性能瓶颈,并根据情况优化。

监控节点进程的YARN的Container GC日志,如果频繁出现Full GC,需要优化GC。

GC的配置:在客户端的“conf/flink-conf.yaml”配置文件中,在“env.java.opts”配置项中添加参数:“-Xloggc:<LOG_DIR>/gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=20 -XX:GCLogFileSize=20M”。 此处默认已经添加GC日志。

  • 优化GC。

    调整老年代和新生代的比值。在客户端的“conf/flink-conf.yaml”配置文件中,在“env.java.opts”配置项中添加参数:“-XX:NewRatio”。如“ -XX:NewRatio=2”,则表示老年代与新生代的比值为2:1,新生代占整个堆空间的1/3,老年代占2/3。

  • 开发Flink应用程序时,优化DataStream的数据分区或分组操作。
    • 当分区导致数据倾斜时,需要考虑优化分区。
    • 避免非并行度操作,有些对DataStream的操作会导致无法并行,例如WindowAll。
    • keyBy尽量不要使用String。

设置并行度

并行度控制任务的数量,影响操作后数据被切分成的块数。调整并行度让任务的数量和每个任务处理的数据与机器的处理能力达到最优。

查看CPU使用情况和内存占用情况,当任务和数据不是平均分布在各节点,而是集中在个别节点时,可以增大并行度使任务和数据更均匀的分布在各个节点。增加任务的并行度,充分利用集群机器的计算能力。

任务的并行度可以通过以下四种层次(按优先级从高到低排列)指定,用户可以根据实际的内存、CPU、数据以及应用程序逻辑的情况调整并行度参数。

  • 算子层次

    一个算子、数据源和sink的并行度可以通过调用setParallelism()方法来指定,例如

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> text = [...]
    DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new LineSplitter())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1).setParallelism(5);
    wordCounts.print();
    env.execute("Word Count Example");
  • 执行环境层次

    Flink程序运行在执行环境中。执行环境为所有执行的算子、数据源、data sink定义了一个默认的并行度。

    执行环境的默认并行度可以通过调用setParallelism()方法指定。例如:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(3);
    DataStream<String> text = [...]
    DataStream<Tuple2<String, Integer>> wordCounts = [...]
    wordCounts.print();
    env.execute("Word Count Example");
  • 客户端层次

    并行度可以在客户端将job提交到Flink时设定。对于CLI客户端,可以通过“-p”参数指定并行度。例如:

    ./bin/flink run -p 10 ../examples/*WordCount-java*.jar
  • 系统层次

    在系统级可以通过修改Flink客户端conf目录下“flink-conf.yaml”文件中的“parallelism.default”配置选项来指定所有执行环境的默认并行度。

配置进程参数

Flink on YARN模式下,有JobManager和TaskManager两种进程。在任务调度和运行的过程中,JobManager和TaskManager承担了很大的责任。

因而JobManager和TaskManager的参数配置对Flink应用的执行有着很大的影响意义。用户可通过如下操作对Flink集群性能做优化。

  1. 配置JobManager内存。

    JobManager负责任务的调度,以及TaskManager、RM之间的消息通信。当任务数变多,任务平行度增大时,JobManager内存都需要相应增大。

    您可以根据实际任务数量的多少,为JobManager设置一个合适的内存。

    • 在使用yarn-session命令时,添加“-jm MEM”参数设置内存。
    • 在使用yarn-cluster命令时,添加“-yjm MEM”参数设置内存。

  2. 配置TaskManager个数。

    每个TaskManager每个核同时能跑一个task,所以增加了TaskManager的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加TaskManager的个数,以提高运行效率。

    • 在使用yarn-session命令时,添加“-n NUM”参数设置TaskManager个数。
    • 在使用yarn-cluster命令时,添加“-yn NUM”参数设置TaskManager个数。

  3. 配置TaskManager Slot数。

    每个TaskManager多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用TaskManager的内存,所以要在内存和核数之间做好平衡。

    • 在使用yarn-session命令时,添加“-s NUM”参数设置SLOT数。
    • 在使用yarn-cluster命令时,添加“-ys NUM”参数设置SLOT数。

  4. 配置TaskManager内存。

    TaskManager的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加。

    • 将在使用yarn-session命令时,添加“-tm MEM”参数设置内存。
    • 将在使用yarn-cluster命令时,添加“-ytm MEM”参数设置内存。

设计分区方法

合理的设计分区依据,可以优化task的切分。在程序编写过程中要尽量分区均匀,这样可以实现每个task数据不倾斜,防止由于某个task的执行时间过长导致整个任务执行缓慢。

以下是几种分区方法。

  • 随机分区:将元素随机的进行分区。
    dataStream.shuffle();
  • Rebalancing (Round-robin partitioning)基于round-robin对元素进行分区,使得每个分区负载均衡。对于存在数据倾斜的性能优化是很有用的。
    dataStream.rebalance();
  • Rescaling以round-robin的形式将元素分区到下游操作的子集中。如果你想要将数据从一个源的每个并行实例中散发到一些mappers的子集中,用来分散负载,但是又不想要完全的rebalance 介入(引入`rebalance()`),这会非常有用。
    dataStream.rescale();
  • 广播:广播每个元素到所有分区。
    dataStream.broadcast();
  • 自定义分区:使用一个用户自定义的Partitioner对每一个元素选择目标task,由于用户对自己的数据更加熟悉,可以按照某个特征进行分区,从而优化任务执行。

    简单示例如下所示:

    // fromElements构造简单的Tuple2流
    DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("hello",1), Tuple2.of("test",2), Tuple2.of("world",100));
    // 定义用于分区的key值,返回即属于哪个partition的,该值加1就是对应的子任务的id号
    Partitioner<Tuple2<String, Integer>> strPartitioner = new Partitioner<Tuple2<String, Integer>>() {
        @Override
        public int partition(Tuple2<String, Integer> key, int numPartitions) {
            return (key.f0.length() + key.f1) % numPartitions;
        }
    };
    // 使用Tuple2进行分区的key值
    dataStream.partitionCustom(strPartitioner, new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
            return value;
        }
    }).print();

配置netty网络通信

Flink通信主要依赖netty网络,所以在Flink应用执行过程中,netty的设置尤为重要,网络通信的好坏直接决定着数据交换的速度以及任务执行的效率。

以下配置均可在客户端的“conf/flink-conf.yaml”配置文件中进行修改适配,默认已经是相对较优解,请谨慎修改,防止性能下降。

  • “taskmanager.network.netty.num-arenas”: 默认是“taskmanager.numberOfTaskSlots”,表示netty的域的数量。
  • “taskmanager.network.netty.server.numThreads”和“taskmanager.network.netty.client.numThreads”:默认是“taskmanager.numberOfTaskSlots”,表示netty的客户端和服务端的线程数目设置。
  • “taskmanager.network.netty.client.connectTimeoutSec”:默认是120s,表示taskmanager的客户端连接超时的时间。
  • “taskmanager.network.netty.sendReceiveBufferSize”:默认是系统缓冲区大小(cat /proc/sys/net/ipv4/tcp_[rw]mem) ,一般为4MB,表示netty的发送和接收的缓冲区大小。
  • “taskmanager.network.netty.transport”:默认为“nio”方式,表示netty的传输方式,有“nio”和“epoll”两种方式。

经验总结

数据倾斜

当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。

  • 需要重新设计key,以更小粒度的key使得task大小合理化。
  • 修改并行度。
  • 调用rebalance操作,使数据分区均匀。

缓冲区超时设置

  • 由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。
  • 当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。

    示例可以参考如下:

    env.setBufferTimeout(timeoutMillis);
    env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);