Updated on 2022-09-14 GMT+08:00

Colocation

Function Description

Colocation is to store associated data or data on which associated operations are performed on the same storage node. The HDFS Colocation feature stores files on which associated operations are performed on the same data node so that data can be obtained from the same data node during associated operations. This greatly reduces network bandwidth consumption.

Before using the Colocation function, you are advised to be familiar with the internal mechanisms of Colocation, including:

Colocation node allocation principle

Capacity expansion and Colocation allocation

Colocation and data node capacity

  • Colocation node allocation principle

    Colocation allocates data nodes to locators evenly according to the allocation node quantity.

    The allocation algorithm principle is as follows: Colocation queries all locators, reads the data nodes allocated to the locators, and records the number of times. Based on the number of times, Colocation sorts the data nodes. The data nodes that are rarely used are placed at the beginning and selected first. The count increase by 1 each time after a node is selected. The nodes are shorted again, and the subsequent node will be selected.

  • Capacity expansion and Colocation allocation
    After cluster capacity expansion, you can select one of the following two policies shown in Table 1 to balance the usage of data nodes and ensure that the allocation frequency of the newly added nodes is consistent with that of the old data nodes.
    Table 1 Allocation policies

    No.

    Policy

    Description

    1

    Delete the original locators and create new locators for all data nodes in the cluster.

    1. The original locator before the capacity expansion evenly uses all data nodes. After the capacity expansion, the newly added nodes are not allocated to existing locators, so Colocation stores data only to the old data nodes.
    2. Data nodes are allocated to specific locators. Therefore, after capacity expansion, Colocation needs to reallocated data nodes to locators.

    2

    Create new locators and plan the data storage mode again.

    The old locators use the old data nodes, while the newly created locators mainly use the new data nodes. Therefore, locators need to be planned again based on the actual service requirements on data.

    Generally, you are advised to use the policy to reallocate data nodes to locators after capacity expansion to prevent data from being stored only to the new data nodes.

  • Colocation and data node capacity

    When Colocation is used to store data, the data is stored to the data node of a specified locator. If no locator planning is performed, the data node capacity will be uneven. Table 2 summarizes the two usage principles to ensure even data node capacity.

    Table 2 Usage Principle

    No.

    Usage Principle

    Description

    1

    All the data nodes are used in the same frequency in locators.

    Assume that there are N data nodes, the number of locators must an integral multiple of N (N, 2 N, ...).

    2

    A proper data storage plan must be made for all locators to ensure that data is evenly stored in the locators.

    None

During HDFS secondary development, you can obtain the DFSColocationAdmin and DFSColocationClient instances to create groups, delete groups, write files, and delete files in or from the location.

  • When the Colocation function is enabled and users specify DataNodes, the data volume will be large on some nodes. Serious data skew will result in HDFS data write failures.
  • Because of data skew, MapReduce accesses only several nodes. In this case, the load is heavy on these nodes, while other nodes are idle.
  • For a single application task, the DFSColocationAdmin and DFSColocationClient instances can be used only once. If the instances are used for many time, excessive HDFS links will be created and use up HDFS resources.
  • If you need to perform the balance operation for a file uploaded by colocation, you can set the oi.dfs.colocation.file.pattern parameter on FusionInsight Manager to the file path to avoid invalid colocation. If there are multiple files, use commas (,) to separate the file paths, for example, /test1,/test2.
  • Colocation stores associated data or data on which associated operations are performed on the same storage. When Balancer- or Mover-related operations are performed, data blocks will be moved. As a result, the Colocation function becomes unavailable. Therefore, when using the Colocation function, you are advised to set the HDFS configuration item dfs.datanode.block-pinning.enabled to true. In this case, files written by Colocation will not be moved when Balancer- or Mover-related operations are performed in the cluster, ensuring file colocation.

Example Codes

For complete example codes, see com.huawei.bigdata.hdfs.examples.ColocationExample.

  • Specify a user to run a Colocation project. This user must be a member of the supergroup group.
  • When the Colocation project is run, the HDFS parameter fs.defaultFS cannot be set to viewfs://ClusterX.
  1. Initialization

    The user to run the Colocation project must be specified before the running.

    private static void init() throws IOException {
        // //Set the user. If HADOOP_USER_NAME is not set for the use, use USER.
        if (System.getenv("HADOOP_USER_NAME") == null && System.getProperty("HADOOP_USER_NAME") == null) {
            System.setProperty("HADOOP_USER_NAME", USER);
        }
    }
  2. Obtain instances.

    Example: Colocation operations require the DFSColocationAdmin and DFSColocationClient instances. Therefore, the instances must be obtained before operations, such as creating a group.

      dfsAdmin = new DFSColocationAdmin(conf);
      dfs = new DFSColocationClient();
      dfs.initialize(URI.create(conf.get("fs.defaultFS")), conf);
  3. Create a group.

    Example: Create a gid01 group, which contains three locators.

    /**
     * create group
     * 
     * @throws java.io.IOException
     */
    private static void createGroup() throws IOException {
        dfsAdmin.createColocationGroup(COLOCATION_GROUP_GROUP01,
                Arrays.asList(new String[] { "lid01", "lid02", "lid03" }));
    }
  4. Write data into a file. The related group must be created before writing data into the file.
    Example: Write data into the testfile.txt file.
    /**
     * create and write file
     * 
     * @throws java.io.IOException
     */
    private static void put() throws IOException {
        FSDataOutputStream out = dfs.create(new Path(TESTFILE_TXT), true, COLOCATION_GROUP_GROUP01, "lid01");
        // the data to be written to the hdfs.
        byte[] readBuf = "Hello World".getBytes("UTF-8");
        out.write(readBuf, 0, readBuf.length);
        out.close();
    }
  5. Delete a file.
    Example: Delete the testfile.txt file.
     /**
     * delete file
     *
     * @throws java.io.IOException
     */
    @SuppressWarnings("deprecation")
    private static void delete() throws IOException {
        dfs.delete(new Path(TESTFILE_TXT));
    }
  6. Delete a group.
    Example: Delete gid01.
    /**
     * delete group
     * 
     * @throws java.io.IOException
     */
    private static void deleteGroup() throws IOException {
        dfsAdmin.deleteColocationGroup(COLOCATION_GROUP_GROUP01);
    }