RocksDB State Backend of Flink Jobs
This topic is available for MRS 3.3.0 and later versions only.
Flink Job RocksDB
When RocksDB is enabled as the state backend for jobs, a large amount of state data can degrade RocksDB read and write performance. Use either of the following methods to check whether operator performance is affected by RocksDB:
- In the TaskManager thread dump, check whether the operator thread stays inside RocksDB calls for a long time. If the following stack is displayed after multiple refreshes, the operator is spending most of its time in RocksDB operations.
"Join[5] -> Calc[6] -> Sink: print[7] (1/1)#0" Id=113 RUNNABLE (in native)
    at org.rocksdb.RocksDB.put(Native Method)
    at org.rocksdb.RocksDB.put(RocksDB.java:955)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:103)
- Enable the flame graph (set rest.flamegraph.enabled to true) and submit the job again to view operator hotspots. In the figure below, the operator hotspot reaches 100%.
Figure 1 Viewing operator hotspots in a flame graph
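The thread dump check can also be scripted when several dumps have been saved as text. The following is a minimal sketch, assuming each dump is available as a string. The helper names rocksdb_calls and stuck_in_rocksdb are hypothetical and are not part of Flink or MRS. The frame pattern is taken from the sample stack shown earlier.

```python
import re

# Matches stack frames such as "at org.rocksdb.RocksDB.put(Native Method)".
# The pattern is derived from the sample dump; other RocksDB entry points
# (for example iterators) are not covered by this sketch.
ROCKSDB_FRAME = re.compile(r"\bat org\.rocksdb\.RocksDB\.(\w+)\(")


def rocksdb_calls(thread_dump: str) -> list[str]:
    """Return the RocksDB method names found in one thread dump, in order."""
    return ROCKSDB_FRAME.findall(thread_dump)


def stuck_in_rocksdb(dumps: list[str]) -> bool:
    """True if every sampled dump shows a thread inside a RocksDB call."""
    return bool(dumps) and all(rocksdb_calls(d) for d in dumps)


sample = (
    '"Join[5] -> Calc[6] -> Sink: print[7] (1/1)#0" Id=113 RUNNABLE (in native)\n'
    "    at org.rocksdb.RocksDB.put(Native Method)\n"
    "    at org.rocksdb.RocksDB.put(RocksDB.java:955)\n"
    "    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update"
    "(RocksDBValueState.java:103)\n"
)

print(rocksdb_calls(sample))
print(stuck_in_rocksdb([sample, sample, sample]))
```

A single dump that shows a RocksDB frame is not conclusive, which is why the sketch requires the frame to appear in every sampled dump.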
When RocksDB read/write latency is high, you can enable RocksDB monitoring and alarm reporting, and then tune RocksDB parameters based on the monitoring and alarm items. After the job is optimized, you are advised to disable RocksDB monitoring and alarm reporting because they degrade RocksDB performance by 5% to 10%.
To avoid impact on other jobs, RocksDB monitoring is configured by setting user-defined parameters. This section describes how to enable RocksDB monitoring, alarm reporting, and optimization parameters.
Enabling RocksDB Monitoring for Flink Jobs
- Log in to FusionInsight Manager as a user with the FlinkServer administrator rights.
- Choose Cluster > Services > Flink. In the Basic Information area, click the link next to Flink WebUI to access the Flink web UI.
- Click Job Management. The job management page is displayed.
- Locate the job that is to be optimized and is not in the Running state, and click Develop in the Operation column to go to the job development page.
- In the Custom Configuration area on the job development page, add the following parameters and save the settings:
- Enabling RocksDB monitoring
Table 1 RocksDB monitoring configuration
Parameter
Value
Description
state.backend.rocksdb.metrics.hot.enabled
true
Enables non-statistical RocksDB monitoring, which covers the monitoring items exposed as RocksDB properties.
state.backend.rocksdb.metrics.statistics.enabled
true
Enables statistics-based RocksDB monitoring.
state.backend.rocksdb.metrics.num-immutable-mem-table
true
Monitors the number of immutable memtables in RocksDB. If the value keeps increasing or exceeds the threshold, the write performance will be affected.
state.backend.rocksdb.metrics.mem-table-flush-pending
true
Monitors the number of pending memtable flushes in RocksDB.
state.backend.rocksdb.metrics.compaction-pending
true
Monitors the number of pending compactions in RocksDB. If there are any pending compactions, 1 is returned. Otherwise, 0 is returned.
state.backend.rocksdb.metrics.background-errors
true
Monitors the number of background errors in RocksDB.
state.backend.rocksdb.metrics.cur-size-active-mem-table
true
Monitors the approximate size of the active memtable, in bytes.
state.backend.rocksdb.metrics.cur-size-all-mem-tables
true
Monitors the approximate size of active and unflushed immutable memtables, in bytes.
state.backend.rocksdb.metrics.size-all-mem-tables
true
Monitors the approximate size of active, unflushed immutable, and pinned immutable memtables, in bytes.
state.backend.rocksdb.metrics.num-entries-active-mem-table
true
Monitors the total number of entries in active memtables.
state.backend.rocksdb.metrics.num-entries-imm-mem-tables
true
Monitors the total number of entries in immutable memtables.
state.backend.rocksdb.metrics.num-deletes-active-mem-table
true
Monitors the total number of deleted entries in active memtables.
state.backend.rocksdb.metrics.num-deletes-imm-mem-tables
true
Monitors the total number of deleted entries in unflushed immutable memtables.
state.backend.rocksdb.metrics.estimate-num-keys
true
Monitors the estimated number of keys in RocksDB.
state.backend.rocksdb.metrics.estimate-table-readers-mem
true
Monitors the estimated memory used to read SST tables, in bytes, excluding memory used in the block cache (such as filter and index blocks).
state.backend.rocksdb.metrics.num-snapshots
true
Monitors the number of unreleased snapshots in the database.
state.backend.rocksdb.metrics.num-live-versions
true
Monitors the number of live versions. A version is an internal data structure. A large number of live versions usually means that more SST files are being kept from deletion by iterators or unfinished compactions.
state.backend.rocksdb.metrics.estimate-live-data-size
true
Monitors the estimated amount of live data, in bytes (usually smaller than the total SST file size because of space amplification).
state.backend.rocksdb.metrics.total-sst-files-size
true
Monitors the total size of SST files of all versions, in bytes. If there are too many files, collecting this metric may slow down queries.
state.backend.rocksdb.metrics.live-sst-files-size
true
Monitors the total size of all SST files of the latest version, in bytes. If there are too many files, collecting this metric may slow down queries.
state.backend.rocksdb.metrics.estimate-pending-compaction-bytes
true
Monitors the estimated total number of bytes that compaction must rewrite to bring all levels below their target size. This metric is valid only for level-based compaction.
state.backend.rocksdb.metrics.num-running-compactions
true
Monitors the number of running compactions. If all threads are in the Running state, the write performance may be affected.
state.backend.rocksdb.metrics.num-running-flushes
true
Monitors the number of running flush tasks. If all threads are in the Running state, the write performance may be affected.
state.backend.rocksdb.metrics.actual-delayed-write-rate
true
Monitors the actual delayed write rate. If 0 is returned, there is no delay.
state.backend.rocksdb.metrics.is-write-stopped
true
Monitors whether writes to RocksDB are stopped. If writes are stopped, 1 is returned. Otherwise, 0 is returned.
state.backend.rocksdb.metrics.block-cache-capacity
true
Monitors the block cache capacity.
state.backend.rocksdb.metrics.block-cache-usage
true
Monitors the memory occupied by data in the block cache.
state.backend.rocksdb.metrics.block-cache-pinned-usage
true
Monitors the memory occupied by pinned data in the block cache.
state.backend.rocksdb.metrics.compression-ratio
true
Monitors the compression ratio of each level.
state.backend.rocksdb.metrics.compression-ratio-levelN
7
Number of levels whose compression ratio is to be monitored. The value must be at least 0 and not greater than the configured number of levels.
state.backend.rocksdb.metrics.num-files
true
Monitors the number of files at each level.
state.backend.rocksdb.metrics.num-files-levelN
7
Number of levels whose file quantity is to be monitored. The value must be at least 0 and not greater than the configured number of levels.
state.backend.rocksdb.metrics.statistics.ticker
block.cache.miss,block.cache.hit,block.cache.index.miss,block.cache.index.hit,block.cache.filter.miss,block.cache.filter.hit,block.cache.data.miss,block.cache.data.hit,bloom.filter.useful,memtable.hit,memtable.miss,l0.hit,l1.hit,l2andup.hit,stall.micros
Statistics ticker monitoring item
To add a monitoring item, add it to the end of the value and separate monitoring items with commas (,).
state.backend.rocksdb.metrics.statistics.histogram
db.get.micros,db.write.micros,db.flush.micros,compaction.times.micros
Statistics histogram monitoring item
To add a monitoring item, add it to the end of the value and separate monitoring items with commas (,).
- Enabling the RocksDB alarm function
Table 2 RocksDB alarm configuration
Configuration Item
Default Value
Description
metrics.reporter.alarm.job.alarm.rocksdb.metrics.enable
true
Whether to enable RocksDB monitoring. This function is disabled by default. This configuration is valid only when the state backend is RocksDB.
metrics.reporter.alarm.job.alarm.rocksdb.metrics.duration
180s
Period over which RocksDB alarm conditions are checked
metrics.reporter.alarm.job.alarm.rocksdb.metrics.print.enabled
true
Whether to print RocksDB monitoring information to TaskManager
If metrics.reporter.alarm.job.alarm.rocksdb.metrics.enable is set to true, this parameter defaults to true.
metrics.reporter.alarm.job.alarm.rocksdb.metrics.print.interval
5min
Interval for printing RocksDB monitoring information to TaskManager
metrics.reporter.alarm.job.alarm.rocksdb.get.micros.threshold
1000
Latency threshold of the Get operation, in μs. If the Get latency of a job continuously exceeds the threshold throughout the period specified by metrics.reporter.alarm.job.alarm.rocksdb.metrics.duration, an alarm is reported.
metrics.reporter.alarm.job.alarm.rocksdb.write.micros.threshold
3000
Latency threshold of the Write operation, in μs. If the Write latency of a job continuously exceeds the threshold throughout the period specified by metrics.reporter.alarm.job.alarm.rocksdb.metrics.duration, an alarm is reported.
metrics.reporter.alarm.job.alarm.actual-delayed-write-rate.threshold
0
If the write rate of a job is continuously limited throughout the period specified by metrics.reporter.alarm.job.alarm.rocksdb.metrics.duration, an alarm is reported. The value 0 indicates that the write rate is not limited.
metrics.reporter.alarm.job.alarm.rocksdb.background.jobs.multiplier
2
An alarm is reported when the number of flush or compaction requests exceeds the value of state.backend.rocksdb.thread.num multiplied by this value.
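The alarm conditions in Table 2 can be read as two kinds of checks: a metric that stays over its threshold for the whole alarm period, and a request count that exceeds a multiple of the thread count. The following is a minimal sketch of that reading. It is an interpretation of the table descriptions, not the actual alarm reporter logic, which this section does not show. The names AlarmConfig, sustained_breach, and too_many_background_jobs, and the (timestamp, value) sample format, are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class AlarmConfig:
    # Values mirror Table 2; field names are hypothetical shorthand.
    duration_s: int = 180               # ...rocksdb.metrics.duration
    get_micros_threshold: int = 1000    # ...rocksdb.get.micros.threshold
    write_micros_threshold: int = 3000  # ...rocksdb.write.micros.threshold
    background_jobs_multiplier: int = 2 # ...rocksdb.background.jobs.multiplier
    thread_num: int = 2                 # state.backend.rocksdb.thread.num


def sustained_breach(samples, threshold, duration_s):
    """samples: list of (timestamp_s, value) pairs, oldest first.

    Assumed rule: report only if the samples cover the whole period and
    every sample inside the period is above the threshold.
    """
    if not samples:
        return False
    window_start = samples[-1][0] - duration_s
    if samples[0][0] > window_start:
        return False  # not enough history to cover the whole period
    return all(value > threshold for ts, value in samples if ts >= window_start)


def too_many_background_jobs(pending_requests, cfg):
    """Flush or compaction requests above multiplier * thread count."""
    return pending_requests > cfg.background_jobs_multiplier * cfg.thread_num


cfg = AlarmConfig()
get_latency = [(0, 1500), (60, 1700), (120, 1600), (180, 1800)]
print(sustained_breach(get_latency, cfg.get_micros_threshold, cfg.duration_s))
print(too_many_background_jobs(5, cfg))
```

With the values in Table 2, the second check fires when more than 4 flush or compaction requests are outstanding (2 threads multiplied by 2).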
- On the Job Management page, click Start to run the job. Based on the RocksDB monitoring and alarm information, add the following parameters in the Custom Parameters area on the job development page to optimize the job. After job optimization is complete, you are advised to disable RocksDB monitoring and alarm reporting.
Table 3 RocksDB optimization parameters
Parameter
Value
Description
state.backend.rocksdb.writebuffer.count
2
Maximum number of memtables (active plus immutable). If data is written too fast or there are too few flush threads, writes are blocked. When SPINNING_DISK_OPTIMIZED_HIGH_MEM is enabled, the default value is 4.
It is recommended that the value be greater than or equal to the value of state.backend.rocksdb.writebuffer.number-to-merge plus 2.
state.backend.rocksdb.writebuffer.size
64MB
Memtable size
state.backend.rocksdb.thread.num
2
Number of RocksDB flush and compaction threads. When SPINNING_DISK_OPTIMIZED_HIGH_MEM is enabled, the default value is 4.
state.backend.rocksdb.writebuffer.number-to-merge
1
Minimum number of immutable memtables that are merged before being flushed. Entries are deduplicated when n immutable memtables are merged. When SPINNING_DISK_OPTIMIZED_HIGH_MEM is enabled, the default value is 3.
state.backend.rocksdb.compaction.level.max-size-level-base
256MB
Total size of SST files at level 1. When SPINNING_DISK_OPTIMIZED_HIGH_MEM is enabled, the default value is 1 GB.
state.backend.rocksdb.compaction.level.target-file-size-base
64MB
Size of SST files at level 1 and higher. When SPINNING_DISK_OPTIMIZED_HIGH_MEM is enabled, the default value is 128 MB.
state.backend.rocksdb.num_levels
7
Number of RocksDB levels
state.backend.rocksdb.level0_slowdown_writes_trigger
20
Number of files that trigger slowdown at level 0. If the value is smaller than 0, slowdown will never be triggered.
state.backend.rocksdb.level0_stop_writes_trigger
36
Number of level-0 files at which writes are stopped
state.backend.rocksdb.max_compaction_bytes
-
Maximum number of bytes in a compaction. The default value is the value of state.backend.rocksdb.compaction.level.target-file-size-base multiplied by 25.
state.backend.rocksdb.level0_file_num_compaction_trigger
4
Compaction from level 0 to level 1 is triggered when the number of Level 0 SSTs reaches the threshold.
state.backend.rocksdb.compression
snappy
SST file compression algorithm
The value can be null, snappy, zlib, bzip2, lz4, lz4hc, xpress, or zstd.
state.backend.rocksdb.bottommost_compression
snappy
Compression algorithm for the bottommost level. Data at this level is likely to be cold, so a heavier compression type can be used there to save space. If you enable this function, you are advised to use zstd or zlib.
The value can be null, snappy, zlib, bzip2, lz4, lz4hc, xpress, or zstd.
state.backend.rocksdb.max_bytes_for_level_multiplier
10
Multiplier for the data volume between two adjacent levels, from level 1 upward
state.backend.rocksdb.hard-pending-compaction-bytes-limit
256GB
When the pending compaction size exceeds the threshold, write operations are stopped.
state.backend.rocksdb.soft-pending-compaction-bytes-limit
64GB
When the pending compaction size exceeds the threshold, the write traffic is limited.
state.backend.rocksdb.use-bloom-filter
true
Whether to use a Bloom filter. After this function is enabled, each newly created SST file contains a Bloom filter.
state.backend.rocksdb.block.cache-size
8MB
Block cache size. When SPINNING_DISK_OPTIMIZED_HIGH_MEM is enabled, the default value is 256 MB.
state.backend.rocksdb.block.blocksize
4KB
Block size. When SPINNING_DISK_OPTIMIZED_HIGH_MEM is enabled, the default value is 128 KB.
state.backend.rocksdb.files.open
-1
Maximum number of open file handles, which are mainly used for SST files. The value -1 indicates that the number is not limited.
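Two relationships in Table 3 are easy to check before saving the custom parameters: the recommendation that state.backend.rocksdb.writebuffer.count be at least state.backend.rocksdb.writebuffer.number-to-merge plus 2, and the default of state.backend.rocksdb.max_compaction_bytes as 25 times the target file size. The following is a minimal sketch of both. The helpers to_bytes, check, and default_max_compaction_bytes are hypothetical, and the sketch assumes binary units (1 MB = 1024 * 1024 bytes).

```python
# Assumption: size suffixes are binary units.
UNITS = {"KB": 1024, "MB": 1024**2, "GB": 1024**3}

COUNT = "state.backend.rocksdb.writebuffer.count"
MERGE = "state.backend.rocksdb.writebuffer.number-to-merge"
TARGET = "state.backend.rocksdb.compaction.level.target-file-size-base"


def to_bytes(size: str) -> int:
    """Parse sizes such as '64MB' or '4KB' into bytes."""
    size = size.strip().upper()
    for suffix, factor in UNITS.items():
        if size.endswith(suffix):
            return int(size[: -len(suffix)]) * factor
    return int(size)


def check(params: dict) -> list[str]:
    """Return warnings for the writebuffer recommendation in Table 3."""
    warnings = []
    count = int(params.get(COUNT, 2))  # fallback values are the Table 3 values
    merge = int(params.get(MERGE, 1))
    if count < merge + 2:
        warnings.append(
            f"{COUNT}={count} is below the recommended minimum {merge + 2} "
            f"({MERGE} + 2)"
        )
    return warnings


def default_max_compaction_bytes(params: dict) -> int:
    """Default stated in Table 3: target-file-size-base multiplied by 25."""
    return to_bytes(params.get(TARGET, "64MB")) * 25


tuned = {COUNT: "4", MERGE: "2", TARGET: "64MB"}
print(check(tuned))                         # no warnings: 4 >= 2 + 2
print(default_max_compaction_bytes(tuned))  # 64 MB * 25 = 1677721600 bytes
```

Note that the values listed in Table 3 (a count of 2 with a number-to-merge of 1) do not meet the recommendation themselves, so the check reports a warning when no overrides are given.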