Updated on 2024-10-09 GMT+08:00

Separate Storage of Cold and Hot Data for Flink Job State Backend

This topic is available for MRS 3.3.0 or later only.

In wide-table join scenarios, each table contains a large number of fields, so the state backend holds a large volume of data and processing slows down significantly. To address this, you can enable tiered storage that separates hot and cold data.

Procedure

  1. Install a client that contains services such as Flink and HBase, for example, in /opt/client.
  2. Log in to the node where the client is installed as the client installation user, and copy all configuration files in the HBase client directory /opt/client/HBase/hbase/conf/ to an empty directory on every node where FlinkServer is deployed, for example, /tmp/client/HBase/hbase/conf/.

    On each FlinkServer node, change the owner of the configuration file directory and its parent directories up to /tmp/client/HBase/ to omm:

    chown -R omm: /tmp/client/HBase/

    • FlinkServer nodes:

      To find the FlinkServer nodes, log in to FusionInsight Manager, choose Cluster > Services > Flink > Instances, and check the Service IP Address of each FlinkServer instance.

    • If a FlinkServer instance is deployed on the node where the HBase client is installed, skip this step on that node.

  3. Log in to Manager and choose Cluster > Services > Flink. Click Configurations and then All Configurations, search for the HBASE_CONF_DIR parameter, and set its Value to the directory on the FlinkServer nodes to which the HBase configuration files were copied in step 2 (for example, /tmp/client/HBase/hbase/conf/).
  4. Click Save, confirm the modification, and click OK.
  5. Click Instances, select all FlinkServer instances, choose More > Restart Instance, enter the password, and click OK to restart the instances.
  6. Log in to FusionInsight Manager as a user with the FlinkServer administrator rights.
  7. Choose Cluster > Services > Flink. In the Basic Information area, click the link next to Flink WebUI to access the Flink web UI.
  8. Click Job Management. The job management page is displayed.
  9. Locate the job to be optimized, which must not be in the Running state, and click Develop in the Operation column to go to the job development page.
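  10. In the Custom Parameters area on the Job Development page, add the following parameters as required and save the settings. Hot data is frequently accessed data, and cold data is infrequently accessed data. Table 1 describes the parameters for storing hot and cold data separately in the RocksDB state backend, and Table 2 describes the parameters for using HBase as level-2 storage for cold data.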

    Table 1 RocksDB state backend storage

    | Parameter | Description | Example Value |
    |---|---|---|
    | table.exec.state.cold.enabled | Whether to enable RocksDB storage that keeps hot and cold data separately. false (default value): hot and cold data are not stored separately. true: hot and cold data are stored separately. | false |
    | state.backend.rocksdb.cold.localdir | Directory for storing cold data | - |
    | state.backend.rocksdb.cold.predefined-options | Predefined RocksDB options for cold data. DEFAULT (default value): RocksDB does not force data to be written to disk. This value is recommended. SPINNING_DISK_OPTIMIZED_HIGH_MEM: options tuned for RocksDB disk writes. Because Flink job recovery does not depend on RocksDB, this value is not recommended. | DEFAULT |
    | state.backend | State backend storage medium. Set this parameter to rocksdb. | rocksdb |

    Table 2 HBase as the state backend storage for level-2 cold data

    | Parameter | Description | Example Value |
    |---|---|---|
    | table.exec.state.cold.enabled | Whether to enable tiered storage for hot and cold data. false (default value): tiered storage is disabled. true: tiered storage is enabled. | false |
    | state.backend.cold | State backend storage for cold data. Currently, only hbase is supported. | hbase |
    | table.exec.state.ttl | Time-to-live (TTL) of state data. If table.exec.state.cold.enabled is true, this parameter specifies when hot data expires: hot data stored longer than this value becomes cold data. If table.exec.state.cold.enabled is false, all expired data is deleted. Default value: 0, indicating that the data never expires. | 0 |
    | state.backend.hbase.zookeeper.quorum | ZooKeeper connection address used to access HBase, in the format IP address:port,IP address:port,IP address:port, where each IP address is the service IP address of a ZooKeeper quorumpeer instance and port is the ZooKeeper client port. | 192.168.10.10:24002,192.168.10.11:24002,192.168.10.12:24002 |
    | state.backend | State backend storage medium. Set this parameter to rocksdb. | rocksdb |

    • IP address of the ZooKeeper quorumpeer instance

      To obtain the IP addresses of all ZooKeeper quorumpeer instances, log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Instance and view the IP addresses of all hosts where the quorumpeer instances are located.

    • Port number of the ZooKeeper client

      Log in to FusionInsight Manager and choose Cluster > Services > ZooKeeper. On the displayed page, click Configurations and check the value of clientPort.
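When there are many FlinkServer nodes, the copy and ownership change in step 2 can be planned with a small script. The Python sketch below only builds a list of shell commands; plan_conf_copy is a hypothetical helper, and it assumes the HBase client node can reach each FlinkServer node with ssh/scp as root, which is an assumption rather than part of the procedure. Nodes that already host the HBase client are skipped, as step 2 allows.

```python
# Hypothetical helper: ssh/scp access as root is an assumption; the paths are
# the example paths used in step 2.
SRC_CONF_DIR = "/opt/client/HBase/hbase/conf"
DST_CONF_DIR = "/tmp/client/HBase/hbase/conf"
OWNED_DIR = "/tmp/client/HBase"


def plan_conf_copy(flinkserver_ips, client_node_ip, login_user="root"):
    """Return shell commands that copy the HBase config to FlinkServer nodes."""
    commands = []
    for ip in flinkserver_ips:
        if ip == client_node_ip:
            # FlinkServer runs on the HBase client node: skip this node.
            continue
        target = f"{login_user}@{ip}"
        # Create the empty target directory on the FlinkServer node.
        commands.append(f"ssh {target} mkdir -p {DST_CONF_DIR}")
        # Copy all configuration files from the client installation.
        commands.append(f"scp {SRC_CONF_DIR}/* {target}:{DST_CONF_DIR}/")
        # Make omm the owner of the whole directory tree, recursively.
        commands.append(f"ssh {target} chown -R omm: {OWNED_DIR}")
    return commands
```

Printing the plan first, instead of running it directly, lets you review the target IP addresses obtained from the Instances page before anything is changed on the nodes.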
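The Table 1 settings can be assembled and checked before they are entered as Custom Parameters. In this Python sketch, the key names and the two allowed predefined options come from Table 1; the function name rocksdb_hot_cold_params and the cold-data directory used in the example are hypothetical placeholders.

```python
# Hypothetical sketch: key names and allowed values are taken from Table 1;
# the cold-data directory is a placeholder chosen by the caller.
COLD_PRESETS = ("DEFAULT", "SPINNING_DISK_OPTIMIZED_HIGH_MEM")


def rocksdb_hot_cold_params(cold_localdir, preset="DEFAULT"):
    """Return Custom Parameters that keep hot and cold data apart in RocksDB."""
    if preset not in COLD_PRESETS:
        raise ValueError(f"unsupported predefined options: {preset!r}")
    return {
        "state.backend": "rocksdb",               # required by Table 1
        "table.exec.state.cold.enabled": "true",  # the default is false
        "state.backend.rocksdb.cold.localdir": cold_localdir,
        # DEFAULT is the value Table 1 recommends.
        "state.backend.rocksdb.cold.predefined-options": preset,
    }


params = rocksdb_hot_cold_params("/srv/flink/cold-state")  # placeholder path
```

Each key/value pair in the returned dictionary corresponds to one entry in the Custom Parameters area.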
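For the Table 2 setup, the state.backend.hbase.zookeeper.quorum value is built from the quorumpeer service IP addresses and the clientPort value described above. The Python sketch below joins them in the documented IP:port,IP:port format and assembles the remaining Table 2 keys. The function names are hypothetical, the IP addresses and port are the table's example values, and the "1 h" duration syntax follows open-source Flink; whether MRS accepts exactly that syntax is an assumption to verify.

```python
# Hypothetical sketch: key names come from Table 2. The IP addresses and the
# 24002 port are the table's example values; read your own values from the
# quorumpeer instances and the clientPort setting.
def zookeeper_quorum(ips, client_port):
    """Join quorumpeer service IPs into the IP:port,IP:port,... format."""
    if not ips:
        raise ValueError("at least one quorumpeer IP address is required")
    return ",".join(f"{ip}:{client_port}" for ip in ips)


def hbase_cold_params(quorum, hot_ttl):
    """Return Custom Parameters that move expired hot data to HBase."""
    if hot_ttl.strip() == "0":
        # Per Table 2, 0 means data never expires, so nothing would turn cold.
        raise ValueError("table.exec.state.ttl must be non-zero for tiering")
    return {
        "state.backend": "rocksdb",
        "state.backend.cold": "hbase",  # only hbase is supported
        "table.exec.state.cold.enabled": "true",
        "table.exec.state.ttl": hot_ttl,  # duration syntax is an assumption
        "state.backend.hbase.zookeeper.quorum": quorum,
    }


quorum = zookeeper_quorum(["192.168.10.10", "192.168.10.11", "192.168.10.12"], 24002)
params = hbase_cold_params(quorum, "1 h")
```

Rejecting a TTL of 0 is a choice made in this sketch: with the default value, hot data never expires, so enabling tiered storage would have no effect.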
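The table.exec.state.ttl rules in Table 2 determine what happens to a state entry after it has been stored for a given time. The Python sketch below is a hypothetical model that only reproduces those documented outcomes (hot, cold, or deleted); it does not describe how Flink or HBase actually store the state.

```python
# Hypothetical model of the table.exec.state.ttl rules described in Table 2.
# It reproduces the documented outcomes only, not Flink's internal storage.
def state_tier(age_ms, ttl_ms, cold_enabled):
    """Return where a state entry ends up: 'hot', 'cold', or 'deleted'."""
    if ttl_ms == 0:
        return "hot"  # 0 (default value): the data never expires
    if age_ms <= ttl_ms:
        return "hot"  # not stored longer than the TTL yet
    # Expired: moved to cold storage if tiering is on, otherwise deleted.
    return "cold" if cold_enabled else "deleted"
```

For example, with a TTL of one hour, an entry stored for two hours is classified as cold when table.exec.state.cold.enabled is true and as deleted when it is false.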