Using the Spark BulkLoad Tool to Synchronize Data to HBase Tables

To quickly synchronize Hive or Spark table data to HBase tables, you can use the Spark BulkLoad tool. The tool can import full or incremental data in ORC or Parquet format.

Pay attention to the following when using the Spark BulkLoad tool:

  • For details about data type conversion, see Table 1. Date values are converted to the string type before being stored in HBase. Number, string, and Boolean values are converted directly to byte arrays and stored in HBase. During parsing, the system converts the byte arrays back to the corresponding types and checks whether the values are null.
  • Do not directly synchronize table data of the Struct, Map, or Seq type to HBase tables. These types cannot be converted to byte arrays; they are converted to strings instead, which may fail to be restored to the original types.

This topic is available for MRS 3.5.0 and later versions only.

Table 1 Data type conversion relationship

Hive/Spark Table | HBase Table | Parsing Mode
-----------------|-------------|------------------------------------
TINYINT          | Byte        | Returns the first value in byte[].
SMALLINT         | Short       | Bytes.toShort(byte[])
INT/INTEGER      | Integer     | Bytes.toInt(byte[])
BIGINT           | Long        | Bytes.toLong(byte[], int, int)
FLOAT            | Float       | Bytes.toFloat(byte[])
DOUBLE           | Double      | Bytes.toDouble(byte[])
DECIMAL/NUMERIC  | BigDecimal  | Bytes.toBigDecimal(byte[])
TIMESTAMP        | String      | Bytes.toString(byte[])
DATE             | String      | Bytes.toString(byte[])
STRING           | String      | Bytes.toString(byte[])
VARCHAR          | String      | Bytes.toString(byte[])
CHAR             | String      | Bytes.toString(byte[])
BOOLEAN          | Boolean     | Bytes.toBoolean(byte[])
BINARY           | byte[]      | No need to parse.
ARRAY            | String      | Bytes.toString(byte[])
MAP              | String      | Bytes.toString(byte[])
STRUCT           | String      | Bytes.toString(byte[])
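
As a minimal illustration of the parsing modes in Table 1 (the class and variable names are illustrative, not part of the tool), the following Java snippet converts values to byte arrays with org.apache.hadoop.hbase.util.Bytes and parses them back, checking for null or empty values:

import org.apache.hadoop.hbase.util.Bytes;

public class ParseExample {
    public static void main(String[] args) {
        // Write side: values are converted to byte arrays before being stored in HBase.
        byte[] intBytes = Bytes.toBytes(42);            // INT is parsed with Bytes.toInt(byte[])
        byte[] dateBytes = Bytes.toBytes("2024-12-13"); // DATE is stored as a string
        byte[] boolBytes = Bytes.toBytes(true);         // BOOLEAN is parsed with Bytes.toBoolean(byte[])

        // Read side: convert the byte arrays back and check for null or empty values.
        Integer intVal = (intBytes == null || intBytes.length == 0) ? null : Bytes.toInt(intBytes);
        String dateVal = dateBytes == null ? null : Bytes.toString(dateBytes);
        Boolean boolVal = (boolBytes == null || boolBytes.length == 0) ? null : Bytes.toBoolean(boolBytes);

        System.out.println(intVal + ", " + dateVal + ", " + boolVal);
    }
}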

Prerequisites

  • The Spark and Hive services have been installed in the cluster.
  • The user who imports data must have the Spark permission (the SELECT permission on the source table), the HBase permission (the RWXA permission on the HBase namespace), and the HDFS permission (the read and write permission on the HFile output directory).
  • If Kerberos authentication is enabled for the cluster (the cluster is in security mode), set the value of spark.yarn.security.credentials.hbase.enabled to true in the Spark client installation directory/Spark/spark/conf/spark-defaults.conf configuration file.
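
    For example, the entry in the spark-defaults.conf file looks like this:

    spark.yarn.security.credentials.hbase.enabled = true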

Spark BulkLoad Commands

The command format is as follows:

spark-submit --master yarn --deploy-mode cluster --jars Client installation directory/HBase/hbase/lib/protobuf-java-2.5.0.jar,Client installation directory/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool Client installation directory/HBase/hbase/lib/hbase-it-bulk-load-*.jar [-cf <arg>] [-comp <arg>] [-enc <arg>] -op <arg> -rc <arg> [-rn <arg>] [-sm <arg>] [-sp <arg>] -sql <arg> [-sr] -tb <arg>

  • --jars specifies the path of the protobuf-java-2.5.0.jar file and the path of the HBase client configuration file. The HBase client configuration file is stored in Client installation directory/HBase/hbase/conf.
  • The number of executors, memory, and CPU can be specified in the command for resource control. For example, the following parameters can be specified during command submission:

    --driver-memory=20G --num-executors=10 --executor-memory=4G --executor-cores=2

Other parameters that can be configured are as follows:

  • -sql,--export-sql <arg>

    Sets the SQL statements for exporting data. When reading data from Hive/Spark tables, you can set this parameter to filter out data that does not need to be synchronized.
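
    For example, to synchronize only one day's data, you could pass a filter such as the following (the dt partition column is a hypothetical example):

    -sql "select * from test.orc_table where dt = '2024-12-01'"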

  • -rc,--rowkey-columns <arg>
    Specifies the columns that compose the HBase Rowkey in the source table. If there are multiple columns, separate them with commas (,).

    The Spark BulkLoad task will fail if the query is abnormal, for example, because of an incorrect SQL statement, nonexistent data, or duplicate data. Ensure that the SQL statement is correct and that the data combination of the Rowkey fields is unique.

  • -sp,--rowkey-separator <arg>

    (Optional) Specifies the separator between field values when multiple column values are used as a rowkey. The default value is #. The column values are concatenated with this separator to form the rowkey.

    The separator can contain only one character. Avoid characters that appear in the Rowkey field values; otherwise, the column values cannot be parsed correctly. A composite rowkey (data rowkey) consists of multiple columns joined by the specified separator. To parse such a rowkey, locate the separator, split the rowkey, and convert the data types. For example:

    A rowkey consists of two columns separated by a number sign (#). Table 2 shows the corresponding relationship. The code for parsing it is as follows:

    // Required imports for this snippet:
    // import java.nio.charset.StandardCharsets;
    // import org.apache.commons.lang3.ArrayUtils;
    // import org.apache.hadoop.hbase.util.Bytes;
    // Locate the separator.
    int idx = Bytes.indexOf(row, "#".getBytes(StandardCharsets.UTF_8)[0]);
    // Split the Rowkey and convert the data types.
    byte[] aBytes = ArrayUtils.subarray(row, 0, idx);
    String aStr = Bytes.toString(aBytes);
    byte[] bBytes = ArrayUtils.subarray(row, idx + 1, row.length);
    // For a rowkey such as "b#", the second part is empty, so check the length as well.
    Integer bInt = (bBytes == null || bBytes.length == 0) ? null : Bytes.toInt(bBytes);
    Table 2 Composite rowkey example

    Column A (String) | Column B (int) | Data Rowkey
    ------------------|----------------|------------
    a                 | 1              | a#1
    b                 | null           | b#

  • -tb,--table <arg>

    Specifies the target HBase table. If the target table does not exist, sampling will be performed and the target table will be created.

  • -op,--output-path <arg>

    Specifies the output path of the HFiles. The exported HFiles are stored in a temporary directory under this path and are deleted after the import succeeds.

    If HDFS federation is enabled, the HFile output path and the HBase to which data is to be imported must be in the same NameService.

    Table 3 shows an example of mounted HDFS directories. If the HBase service directory is mounted to NS1, the output path of the Spark BulkLoad tool must also be mounted to NS1. For example, you can set the output path to a directory under /tmpns1.

    Table 3 HDFS directory examples

    Global Directory | Target NameService | Object Directory
    -----------------|--------------------|-----------------
    /hbase           | NS1                | /hbase
    /tmp             | hacluster          | /tmp
    /tmpns1          | NS1                | /tmpns1

  • -rn,--region-nums <arg>

    Specifies the number of target HBase regions. If the target table does not exist, this parameter value will be used to pre-partition the target table. The default value is 100.

    Evaluate the number of regions based on the amount of data to be exported from the source table. The estimation method is as follows:

    Number of regions = Size of the source table (three copies) x Decompression expansion rate of the source table x HBase data expansion rate (estimated at 10)/Upper limit of a single region (usually 10 GB)/Compression and encoding rate of the target table

    For example, if the source table is stored in ORC format and occupies 100 GB, its decompression expansion rate can be estimated at 5. If the sampled data is SNAPPY-compressed and FAST_DIFF-encoded in the target table, the compression and encoding rate can be estimated at 3. The minimum number of regions is then 100 x 5 x 10/10/3 ≈ 167. If you need to perform incremental data synchronization later, you can set the number of regions to 200.
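
    As a minimal sketch, the same estimate can be expressed in code (all factors are the assumptions from the example above):

    public class RegionEstimate {
        public static void main(String[] args) {
            double sourceSizeGb = 100;    // ORC source table size (three copies)
            double decompressRate = 5;    // decompression expansion rate of the ORC source
            double hbaseExpansion = 10;   // HBase data expansion rate
            double regionLimitGb = 10;    // upper limit of a single region
            double compressionRate = 3;   // SNAPPY + FAST_DIFF compression and encoding rate
            long regions = (long) Math.ceil(
                    sourceSizeGb * decompressRate * hbaseExpansion / regionLimitGb / compressionRate);
            System.out.println(regions);  // prints 167
        }
    }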

  • -cf,--column-family <arg>

    (Optional) Specifies the column family name of the target HBase table to which data is to be imported. If the column family does not exist, data synchronization will fail. If the target table does not exist, a table that contains this column family will be created in HBase. The default column family is info.

  • -comp,--compression <arg>

    (Optional) Specifies the compression format of the target HBase table. Currently, SNAPPY, NONE, ZSTD, and GZ are supported. If the target table does not exist, a table using the specified compression format will be created in HBase. The default compression format is SNAPPY.

  • -enc,--block-encoding <arg>

    (Optional) Specifies the data block encoding mode of the target HBase table. Currently, NONE, PREFIX, DIFF, FAST_DIFF, and ROW_INDEX_V1 are supported. If the target table does not exist, a table using the specified data block encoding mode will be created in HBase. The default value is FAST_DIFF.
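
    If the target table does not exist, the table the tool creates is roughly equivalent to the following HBase shell statement (the table name is illustrative; the pre-splitting controlled by -rn is omitted here):

    create 'test:orc_table', {NAME => 'info', COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'FAST_DIFF'}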

  • -sr,--skip-store-rowcol

    (Optional) Specifies whether to skip storing the columns that make up the Rowkey. By default, the Rowkey columns are also redundantly stored as regular columns in the HBase table. When the Rowkey consists of multiple columns, this option reduces storage usage; the original column values can still be recovered by parsing the Rowkey.

  • -sm,--sampling-multiple <arg>

    (Optional) Specifies the sampling multiple, that is, the maximum number of HFiles that can be generated in a single region. A larger value produces more ranges during sampling, which can improve the tool's performance.

    Note: A larger value also means more generated HFiles, which increases the HBase compaction pressure. The value range is [1,10] and the default value is 1. Set this parameter based on the actual resources.
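
For reference, a submission that sets the optional parameters explicitly might look as follows (the database, table, and column names are illustrative):

spark-submit --master yarn --deploy-mode cluster --jars Client installation directory/HBase/hbase/lib/protobuf-java-2.5.0.jar,Client installation directory/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool Client installation directory/HBase/hbase/lib/hbase-it-bulk-load-*.jar -sql "select * from test.orc_table" -tb "test:orc_table" -rc "id,uuid" -sp "#" -op "/tmp/orc_table" -rn 200 -cf "info" -comp "SNAPPY" -enc "FAST_DIFF" -sm 2 -sr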

Procedure

  1. Log in to the node where the client is installed as the client installation user.
  2. Go to the client directory.

    cd Client installation directory

  3. Configure environment variables.

    source bigdata_env

  4. If Kerberos authentication is enabled for the cluster, authenticate the user.

    kinit Component service user

    If Kerberos authentication is disabled for the cluster, set the Hadoop username.

    export HADOOP_USER_NAME=hbase

  5. Go to the Spark client directory and synchronize data to the target HBase table.

    cd Spark/spark/bin

    For example, run the following command to synchronize all data in the test.orc_table table to the test:orc_table table of HBase, using the id and uuid columns as the rowkey and /tmp/orc_table as the output path:

    spark-submit --master yarn --deploy-mode cluster --jars Client installation directory/HBase/hbase/lib/protobuf-java-2.5.0.jar,Client installation directory/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool Client installation directory/HBase/hbase/lib/hbase-it-bulk-load-*.jar -sql "select * from test.orc_table" -tb "test:orc_table" -rc "id,uuid" -op "/tmp/orc_table"
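
    After the task succeeds, you can check the imported data, for example, in the HBase shell:

    hbase shell
    scan 'test:orc_table', {LIMIT => 10}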