Updated on 2024-10-09 GMT+08:00

RocksDB State Backend of Flink Jobs

This topic is available for MRS 3.3.0 and later versions only.

Flink Job RocksDB

When RocksDB is used as the state backend of a job, a large amount of state data can degrade RocksDB read and write performance. To check whether operator performance is affected by RocksDB, use either of the following methods:

  • In the ThreadDump of the TaskManager, check whether the operator thread stays in RocksDB operations for a long time. If a stack like the following is still displayed after multiple refreshes, the operator is spending a long time in RocksDB operations.
    "Join[5] -> Calc[6] -> Sink: print[7] (1/1)#0" Id=113 RUNNABLE (in native)
    	at org.rocksdb.RocksDB.put(Native Method)
    	at org.rocksdb.RocksDB.put(RocksDB.java:955)
    	at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:103)
  • Enable the flame graph (set rest.flamegraph.enabled to true) and submit the job again to view operator hotspots. Operator hotspots reach 100% in the figure below.
    Figure 1 Viewing operator hotspots in a flame graph
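
The thread dump check above can be automated in a rough way. The following Python sketch assumes that several thread dumps have been saved as text and that each thread starts with a header line containing " Id=", as in the stack shown above. The function names and the 80% ratio are hypothetical choices for illustration and are not part of Flink or MRS.

```python
import re

# A frame inside the RocksDB Java bindings, as seen in the stack above.
ROCKSDB_FRAME = re.compile(r"^\s*at org\.rocksdb\.")


def in_rocksdb(thread_dump: str, operator: str) -> bool:
    """Return True if the thread whose header contains `operator`
    has at least one org.rocksdb frame on its stack."""
    lines = thread_dump.splitlines()
    for i, line in enumerate(lines):
        if operator in line and " Id=" in line:
            # Scan the stack frames that follow this thread header.
            for frame in lines[i + 1:]:
                if not frame.strip().startswith("at "):
                    break  # end of this thread's stack
                if ROCKSDB_FRAME.match(frame):
                    return True
    return False


def stuck_in_rocksdb(dumps, operator, min_ratio=0.8):
    """Hypothetical heuristic: treat the operator as RocksDB-bound
    if most of the sampled dumps show it inside RocksDB."""
    hits = sum(in_rocksdb(d, operator) for d in dumps)
    return bool(dumps) and hits / len(dumps) >= min_ratio


SAMPLE = '''"Join[5] -> Calc[6] -> Sink: print[7] (1/1)#0" Id=113 RUNNABLE (in native)
	at org.rocksdb.RocksDB.put(Native Method)
	at org.rocksdb.RocksDB.put(RocksDB.java:955)
	at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:103)
'''

# Five identical samples stand in for five refreshes of the page.
print(stuck_in_rocksdb([SAMPLE] * 5, "Join[5]"))  # True
```

A single dump only shows where the thread is at one instant, which is why the sketch looks at the ratio over several samples rather than at one stack.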

When the RocksDB read/write latency is high, you can enable RocksDB monitoring and alarm reporting, and then tune RocksDB parameters based on the monitoring and alarm items. After the job is optimized, you are advised to disable RocksDB monitoring and alarm reporting because they reduce RocksDB performance by 5% to 10%.

To avoid affecting other jobs, RocksDB monitoring is configured for each job through user-defined parameters. This section describes how to enable RocksDB monitoring and alarm reporting and how to set the optimization parameters.
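
As a rough illustration of what such user-defined parameters look like, the following Python sketch builds a small set of monitoring parameters as key-value pairs. The keys and the histogram items are taken from Table 1 of this topic. The helper names and the printed "key: value" layout are assumptions for illustration only; the actual input format of the job development page may differ.

```python
# A subset of the monitoring switches listed in Table 1 of this topic.
MONITORING_SWITCHES = [
    "state.backend.rocksdb.metrics.statistics.enabled",
    "state.backend.rocksdb.metrics.num-immutable-mem-table",
    "state.backend.rocksdb.metrics.mem-table-flush-pending",
    "state.backend.rocksdb.metrics.compaction-pending",
    "state.backend.rocksdb.metrics.actual-delayed-write-rate",
    "state.backend.rocksdb.metrics.is-write-stopped",
]

HISTOGRAM_KEY = "state.backend.rocksdb.metrics.statistics.histogram"
HISTOGRAM_ITEMS = "db.get.micros,db.write.micros,db.flush.micros,compaction.times.micros"


def append_item(value: str, item: str) -> str:
    """Append a monitoring item to a comma-separated value, skipping duplicates."""
    items = [v.strip() for v in value.split(",") if v.strip()]
    if item not in items:
        items.append(item)
    return ",".join(items)


def monitoring_params() -> dict:
    """Build the job-level parameters (hypothetical helper)."""
    params = {key: "true" for key in MONITORING_SWITCHES}
    params[HISTOGRAM_KEY] = HISTOGRAM_ITEMS
    return params


for key, value in monitoring_params().items():
    print(f"{key}: {value}")
```

Because the parameters are set on one job only, other jobs on the same cluster do not pay the monitoring overhead.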

Enabling RocksDB Monitoring for Flink Jobs

  1. Log in to FusionInsight Manager as a user with FlinkServer administrator rights.
  2. Choose Cluster > Services > Flink. In the Basic Information area, click the link next to Flink WebUI to access the Flink web UI.
  3. Click Job Management. The job management page is displayed.
  4. Locate the job that is to be optimized and is not in the Running state, and click Develop in the Operation column to go to the job development page.
  5. In the Custom Configuration area on the job development page, add the following parameters and save the settings:

    • Enabling RocksDB monitoring
      Table 1 RocksDB monitoring configuration

      | Parameter | Value | Description |
      | --- | --- | --- |
      | state.backend.rocksdb.metrics.hot.enabled | true | Enables non-statistical monitoring of RocksDB, which covers the monitoring items exposed as RocksDB properties. |
      | state.backend.rocksdb.metrics.statistics.enabled | true | Enables statistics-based monitoring of RocksDB. |
      | state.backend.rocksdb.metrics.num-immutable-mem-table | true | Monitors the number of immutable memtables in RocksDB. If the value keeps increasing or exceeds the threshold, the write performance will be affected. |
      | state.backend.rocksdb.metrics.mem-table-flush-pending | true | Monitors the number of pending memtable flushes in RocksDB. |
      | state.backend.rocksdb.metrics.compaction-pending | true | Monitors whether compactions are pending in RocksDB. If there are any pending compactions, 1 is returned. Otherwise, 0 is returned. |
      | state.backend.rocksdb.metrics.background-errors | true | Monitors the number of background errors in RocksDB. |
      | state.backend.rocksdb.metrics.cur-size-active-mem-table | true | Monitors the approximate size of the active memtable, in bytes. |
      | state.backend.rocksdb.metrics.cur-size-all-mem-tables | true | Monitors the approximate size of the active and unflushed immutable memtables, in bytes. |
      | state.backend.rocksdb.metrics.size-all-mem-tables | true | Monitors the approximate size of the active, unflushed immutable, and pinned immutable memtables, in bytes. |
      | state.backend.rocksdb.metrics.num-entries-active-mem-table | true | Monitors the total number of entries in the active memtable. |
      | state.backend.rocksdb.metrics.num-entries-imm-mem-tables | true | Monitors the total number of entries in the unflushed immutable memtables. |
      | state.backend.rocksdb.metrics.num-deletes-active-mem-table | true | Monitors the total number of delete entries in the active memtable. |
      | state.backend.rocksdb.metrics.num-deletes-imm-mem-tables | true | Monitors the total number of delete entries in the unflushed immutable memtables. |
      | state.backend.rocksdb.metrics.estimate-num-keys | true | Monitors the estimated number of keys in RocksDB. |
      | state.backend.rocksdb.metrics.estimate-table-readers-mem | true | Monitors the estimated memory used to read SST tables, in bytes, excluding the memory used in the block cache (such as filter and index blocks). |
      | state.backend.rocksdb.metrics.num-snapshots | true | Monitors the number of unreleased snapshots in the database. |
      | state.backend.rocksdb.metrics.num-live-versions | true | Monitors the number of live versions. A version is an internal data structure. More live versions often mean that more SST files are held from being deleted by iterators or unfinished compactions. |
      | state.backend.rocksdb.metrics.estimate-live-data-size | true | Monitors the estimated amount of live data, in bytes (usually smaller than the total size of SST files due to space amplification). |
      | state.backend.rocksdb.metrics.total-sst-files-size | true | Monitors the total size of SST files of all versions, in bytes. Collecting this metric may slow down queries if there are too many files. |
      | state.backend.rocksdb.metrics.live-sst-files-size | true | Monitors the total size of all SST files of the latest version, in bytes. Collecting this metric may slow down queries if there are too many files. |
      | state.backend.rocksdb.metrics.estimate-pending-compaction-bytes | true | Monitors the estimated total number of bytes that compaction needs to rewrite to bring all levels below their target sizes. This metric is valid only for level-based compaction. |
      | state.backend.rocksdb.metrics.num-running-compactions | true | Monitors the number of running compactions. If all background threads are busy, the write performance may be affected. |
      | state.backend.rocksdb.metrics.num-running-flushes | true | Monitors the number of running flush tasks. If all background threads are busy, the write performance may be affected. |
      | state.backend.rocksdb.metrics.actual-delayed-write-rate | true | Monitors the actual delayed write rate. If 0 is returned, there is no delay. |
      | state.backend.rocksdb.metrics.is-write-stopped | true | Monitors whether writes to RocksDB are stopped. If writes are stopped, 1 is returned. Otherwise, 0 is returned. |
      | state.backend.rocksdb.metrics.block-cache-capacity | true | Monitors the block cache capacity. |
      | state.backend.rocksdb.metrics.block-cache-usage | true | Monitors the memory occupied by data in the block cache. |
      | state.backend.rocksdb.metrics.block-cache-pinned-usage | true | Monitors the memory occupied by pinned data in the block cache. |
      | state.backend.rocksdb.metrics.compression-ratio | true | Monitors the compression ratio of each level. |
      | state.backend.rocksdb.metrics.compression-ratio-levelN | 7 | Number of levels whose compression ratio is to be monitored. The value must be at least 0 and not greater than the configured number of levels. |
      | state.backend.rocksdb.metrics.num-files | true | Monitors the number of files at each level. |
      | state.backend.rocksdb.metrics.num-files-levelN | 7 | Number of levels whose file quantity is to be monitored. The value must be at least 0 and not greater than the configured number of levels. |
      | state.backend.rocksdb.metrics.statistics.ticker | block.cache.miss,block.cache.hit,block.cache.index.miss,block.cache.index.hit,block.cache.filter.miss,block.cache.filter.hit,block.cache.data.miss,block.cache.data.hit,bloom.filter.useful,memtable.hit,memtable.miss,l0.hit,l1.hit,l2andup.hit,stall.micros | Ticker statistics items to monitor. To add a monitoring item, append it to the end of the value and separate monitoring items with commas (,). |
      | state.backend.rocksdb.metrics.statistics.histogram | db.get.micros,db.write.micros,db.flush.micros,compaction.times.micros | Histogram statistics items to monitor. To add a monitoring item, append it to the end of the value and separate monitoring items with commas (,). |

    • Enabling the RocksDB alarm function
      Table 2 RocksDB alarm configuration

      | Configuration Item | Value | Description |
      | --- | --- | --- |
      | metrics.reporter.alarm.job.alarm.rocksdb.metrics.enable | true | Whether to enable RocksDB alarm monitoring. This function is disabled by default; set this parameter to true to enable it. This configuration is valid only when the state backend is RocksDB. |
      | metrics.reporter.alarm.job.alarm.rocksdb.metrics.duration | 180s | Period over which RocksDB metrics are evaluated for alarms. |
      | metrics.reporter.alarm.job.alarm.rocksdb.metrics.print.enabled | true | Whether to print RocksDB monitoring information to TaskManager. If metrics.reporter.alarm.job.alarm.rocksdb.metrics.enable is set to true, this parameter defaults to true. |
      | metrics.reporter.alarm.job.alarm.rocksdb.metrics.print.interval | 5min | Interval for printing RocksDB monitoring information to TaskManager. |
      | metrics.reporter.alarm.job.alarm.rocksdb.get.micros.threshold | 1000 | Time threshold of the Get operation, in μs. If the Get latency of a job continuously exceeds the threshold for the period specified by metrics.reporter.alarm.job.alarm.rocksdb.metrics.duration, an alarm is reported. |
      | metrics.reporter.alarm.job.alarm.rocksdb.write.micros.threshold | 3000 | Time threshold of the Write operation, in μs. If the Write latency of a job continuously exceeds the threshold for the period specified by metrics.reporter.alarm.job.alarm.rocksdb.metrics.duration, an alarm is reported. |
      | metrics.reporter.alarm.job.alarm.actual-delayed-write-rate.threshold | 0 | If the write rate of a job is continuously limited for the period specified by metrics.reporter.alarm.job.alarm.rocksdb.metrics.duration, an alarm is reported. The value 0 indicates that the write rate is not limited. |
      | metrics.reporter.alarm.job.alarm.rocksdb.background.jobs.multiplier | 2 | An alarm is reported when the number of flush or compaction requests exceeds the value of state.backend.rocksdb.thread.num multiplied by this multiplier. |
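
The alarm conditions in Table 2 can be read as simple rules. The following Python sketch is a simplified model of two of them, written only from the descriptions above: a latency that stays above its threshold for the whole duration, and a backlog of flush or compaction requests that exceeds state.backend.rocksdb.thread.num multiplied by the multiplier. The function names and the sample values are hypothetical, and the actual evaluation logic of the alarm reporter may differ.

```python
def exceeds_continuously(samples, threshold, duration_s):
    """samples: list of (timestamp_in_seconds, value), sorted by time.
    Return True if the value stays above `threshold` across a window
    of at least `duration_s` seconds without dropping back."""
    start = None
    for ts, value in samples:
        if value > threshold:
            if start is None:
                start = ts  # first sample of the current violation run
            if ts - start >= duration_s:
                return True
        else:
            start = None  # the run is broken, start over
    return False


def background_jobs_alarm(pending_jobs, thread_num=2, multiplier=2):
    """True if pending flush or compaction requests exceed thread_num * multiplier."""
    return pending_jobs > thread_num * multiplier


# Made-up Get latencies in microseconds, sampled every 60 seconds.
get_latency = [(0, 1200), (60, 1500), (120, 1100), (180, 1300)]

print(exceeds_continuously(get_latency, threshold=1000, duration_s=180))  # True
print(background_jobs_alarm(pending_jobs=5))  # True, because 5 > 2 * 2
```

Under this reading, a single slow sample does not raise an alarm; only a sustained violation over the whole duration does.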

  6. On the Job Management page, click Start to run the job. Based on the RocksDB monitoring and alarm information, add the following parameters in the Custom Parameters area on the job development page to optimize the job. After job optimization is complete, you are advised to disable RocksDB monitoring and alarm reporting.

    Table 3 RocksDB optimization parameters

    | Parameter | Value | Description |
    | --- | --- | --- |
    | state.backend.rocksdb.writebuffer.count | 2 | Maximum number of memtables (active plus immutable). If data is written too fast or there are too few flush threads, writes are blocked. When SPINNING_DISK_OPTIMIZED_HIGH_MEM is enabled, the default value is 4. It is recommended that the value be greater than or equal to the value of state.backend.rocksdb.writebuffer.number-to-merge plus 2. |
    | state.backend.rocksdb.writebuffer.size | 64MB | Memtable size. |
    | state.backend.rocksdb.thread.num | 2 | Number of RocksDB flush and compaction threads. When SPINNING_DISK_OPTIMIZED_HIGH_MEM is enabled, the default value is 4. |
    | state.backend.rocksdb.writebuffer.number-to-merge | 1 | Minimum number of immutable memtables that are merged before being flushed. Duplicate keys are removed when n immutable memtables are merged. When SPINNING_DISK_OPTIMIZED_HIGH_MEM is enabled, the default value is 3. |
    | state.backend.rocksdb.compaction.level.max-size-level-base | 256MB | Total size of SST files at level 1. When SPINNING_DISK_OPTIMIZED_HIGH_MEM is enabled, the default value is 1 GB. |
    | state.backend.rocksdb.compaction.level.target-file-size-base | 64MB | Size of an SST file at level 1 and above. When SPINNING_DISK_OPTIMIZED_HIGH_MEM is enabled, the default value is 128 MB. |
    | state.backend.rocksdb.num_levels | 7 | Number of RocksDB levels. |
    | state.backend.rocksdb.level0_slowdown_writes_trigger | 20 | Number of level-0 files at which writes are slowed down. If the value is smaller than 0, slowdown will never be triggered. |
    | state.backend.rocksdb.level0_stop_writes_trigger | 36 | Number of level-0 files at which writes are stopped. |
    | state.backend.rocksdb.max_compaction_bytes | - | Maximum number of bytes in a compaction. The default value is the value of state.backend.rocksdb.compaction.level.target-file-size-base multiplied by 25. |
    | state.backend.rocksdb.level0_file_num_compaction_trigger | 4 | Compaction from level 0 to level 1 is triggered when the number of level-0 SST files reaches the threshold. |
    | state.backend.rocksdb.compression | snappy | SST file compression algorithm. The value can be null, snappy, zlib, bzip2, lz4, lz4hc, xpress, or zstd. |
    | state.backend.rocksdb.bottommost_compression | snappy | Compression algorithm for the bottommost level. Data at this level may be cold data, so a heavyweight compression type can be used to reduce space. To enable this function, you are advised to use zstd or zlib. The value can be null, snappy, zlib, bzip2, lz4, lz4hc, xpress, or zstd. |
    | state.backend.rocksdb.max_bytes_for_level_multiplier | 10 | Data volume multiplier between two adjacent levels, starting from level 1. |
    | state.backend.rocksdb.hard-pending-compaction-bytes-limit | 256GB | When the pending compaction size exceeds the threshold, write operations are stopped. |
    | state.backend.rocksdb.soft-pending-compaction-bytes-limit | 64GB | When the pending compaction size exceeds the threshold, the write rate is limited. |
    | state.backend.rocksdb.use-bloom-filter | true | Whether to use Bloom filters. After this function is enabled, each newly created SST file contains a Bloom filter. |
    | state.backend.rocksdb.block.cache-size | 8MB | Block cache size. When SPINNING_DISK_OPTIMIZED_HIGH_MEM is enabled, the default value is 256 MB. |
    | state.backend.rocksdb.block.blocksize | 4KB | Block size. When SPINNING_DISK_OPTIMIZED_HIGH_MEM is enabled, the default value is 128 KB. |
    | state.backend.rocksdb.files.open | -1 | Maximum number of open file handles, which are mainly used for SST files. The value -1 indicates that the number is not limited. |
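
Several values in Table 3 depend on each other. The following Python sketch works through three of these relationships with the values listed in the table: the recommended lower bound for state.backend.rocksdb.writebuffer.count, the default of state.backend.rocksdb.max_compaction_bytes, and the target size of each level. The level-size formula assumes the usual static level sizing of RocksDB (each level is the previous level multiplied by the multiplier), which this topic does not state explicitly. The function names are hypothetical.

```python
MB = 1024 ** 2


def min_writebuffer_count(number_to_merge: int) -> int:
    """Recommended lower bound: writebuffer.number-to-merge plus 2."""
    return number_to_merge + 2


def default_max_compaction_bytes(target_file_size_base: int) -> int:
    """Default of max_compaction_bytes: target-file-size-base multiplied by 25."""
    return target_file_size_base * 25


def level_target_sizes(max_size_level_base: int, multiplier: int, num_levels: int) -> dict:
    """Target size in bytes of levels 1..num_levels-1.
    Assumption: static sizing, level n = level-1 size * multiplier ** (n - 1)."""
    return {
        level: max_size_level_base * multiplier ** (level - 1)
        for level in range(1, num_levels)
    }


# Values from Table 3.
print(min_writebuffer_count(1))                     # 3
print(default_max_compaction_bytes(64 * MB) // MB)  # 1600 (MB)

sizes = level_target_sizes(256 * MB, 10, 7)
print(sizes[1] // MB, sizes[2] // MB)               # 256 2560 (MB)
```

With number-to-merge set to 1, the recommendation gives a write buffer count of at least 3, so it is worth checking both parameters together whenever one of them is changed.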