文档首页/ MapReduce服务 MRS/ 组件操作指南(LTS版)/ 使用HBase/ HBase企业级能力增强/ 自研Spark BulkLoad工具/ 使用Global Index Spark BulkLoad工具同步数据到HBase全局二级索引表
更新时间:2025-12-10 GMT+08:00
分享

使用Global Index Spark BulkLoad工具同步数据到HBase全局二级索引表

Global Spark BulkLoad工具支持快速同步Hive或Spark表数据到HBase全局二级索引表中,支持全量或增量导入ORC/PARQUET等格式的表数据。

使用Global Index Spark BulkLoad同步数据到HBase表中时,存在以下限制:

  • 数据类型转换的对应关系请参见表1。日期类型会被先转换为String类型,再存储到HBase中; 数字类型、字符串类型、布尔类型均会直接转换为byte数组存储到HBase中,解析数据时,请将byte数组直接转换为对应类型,同时需要注意判断空值。开启强制转String功能后,所有类型数据均会先转成String类型,再转换成byte数组存储到HBase中,该功能存在一定风险,需经过评估后再开启,参数说明请参见fts参数介绍
  • 不建议将含有Strcuct、Map和Seq三种复杂类型的表数据直接同步到HBase表中,这些类型无法直接转换为byte数组,会先被转为String,再存储到HBase中,可能会导致无法还原数据。
  • 索引列类型转换的对应关系请见表2,目前索引列数据类型仅支持STRING、INTEGER、FLOAT、LONG、DOUBLE、SHORT、BYTE,不建议使用其他类型的列作为索引列,其他类型会被转换为String类型。
  • 不支持索引更新操作,并且要求导出的数据中不能包含目标中已有的数据(即相同Rowkey的数据),否则会导致残留无用索引数据,引发索引与数据表不一致问题。
  • 导出数据时,要求Rowkey列和索引列必须存在(允许为空值)。
  • 增量导出数据时,不支持为已有的数据创建新索引,如需创建新索引,请使用GlobalTableIndexer工具新建索引,并全量生成对应的索引数据后再进行导出。
  • 增量导出数据时目标表必须包含全局二级索引,且至少存在一个可用索引(索引状态为ACTIVEUNUSABLEBUILDING)。
  • 导出数据时,不建议修改目标表的索引(包含修改状态、新增索引、删除索引),否则可能会导致任务失败或索引表与数据表不一致。
  • 该章节操作仅适用于MRS 3.6.0-LTS及之后版本。
表1 数据类型转换对应关系

原始数据类型

默认模式

开启数据强制转String功能

Hive/Spark表

HBase表

解析方式

HBase表

解析方式

TINYINT

Byte

byte[]取第一个值

String

Bytes.toString(byte[])

SMALLINT

Short

Bytes.toShort(byte[])

String

Bytes.toString(byte[])

INT/INTEGER

Integer

Bytes.toInt(byte[])

String

Bytes.toString(byte[])

BIGINT

Long

Bytes.toLong(byte[], int, int)

String

Bytes.toString(byte[])

FLOAT

Float

Bytes.toFloat(byte[])

String

Bytes.toString(byte[])

DOUBLE

Double

Bytes.toDouble(byte[])

String

Bytes.toString(byte[])

DECIMAL/NUMERIC

BigDecimal

Bytes.toBigDecimal(byte[])

String

Bytes.toString(byte[])

TIMESTAMP

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

DATE

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

STRING

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

VARCHAR

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

CHAR

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

BOOLEAN

Boolean

Bytes.toBoolean(byte[])

String

Bytes.toString(byte[])

BINARY

byte[]

无需解析

String

Bytes.toString(byte[])

ARRAY

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

MAP

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

STRUCT

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

表2 原始数据类型与索引列类型对应关系

原始Hive/Spark表数据类型

默认模式对应的HBase索引列类型

开启数据强制转String功能对应的HBase索引列类型

TINYINT

BYTE

STRING

SMALLINT

SHORT

STRING

INT/INTEGER

INTEGER

STRING

BIGINT

LONG

STRING

FLOAT

FLOAT

STRING

DOUBLE

DOUBLE

STRING

DECIMAL/NUMERIC

STRING

STRING

TIMESTAMP

STRING

STRING

DATE

STRING

STRING

STRING

STRING

STRING

VARCHAR

STRING

STRING

CHAR

STRING

STRING

BOOLEAN

STRING

STRING

BINARY

STRING

STRING

ARRAY

STRING

STRING

MAP

STRING

STRING

STRUCT

STRING

STRING

前提条件

  • 集群安装了Spark及Hive服务。
  • 执行数据导入的用户需要同时具有Spark(对应源表的SELECT权限)、HBase权限(对应HBase NameSpace的RWXA权限)和HDFS权限(对应HFile输出目录的读写权限)。
  • 如果集群已启用Kerberos认证(安全模式),需修改Spark“客户端安装目录/Spark/spark/conf/spark-defaults.conf”配置文件中的“spark.yarn.security.credentials.hbase.enabled”参数值为“true”。

Global Index Spark BulkLoad命令介绍

Global Index Spark BulkLoad数据同步工具命令格式如下:

spark-submit --master yarn --deploy-mode cluster --jars 客户端安装目录/HBase/hbase/lib/protobuf-java-2.5.0.jar,客户端安装目录/HBase/hbase/lib/hbase-global-index-*.jar,客户端安装目录/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool 客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar com.huawei.hadoop.hbase.tools.bulkload.GlobalIndexSparkBulkLoadTool [-cf <arg>] [-comp <arg>] [-enc <arg>] [-fts] [-ic <arg>] [-icaf <arg>] [-icf <arg>] [-ita <arg>] -op <arg> -rc <arg> [-rn <arg>] [-sm <arg>] [-sp <arg>] -sql <arg> [-sr] -tb <arg>

  • --jars用于指定“protobuf-java-2.5.0.jar”文件所在路径和HBase客户端配置文件所在路径。HBase客户端配置文件所在路径为“客户端安装目录/HBase/hbase/conf”。
  • 支持在提交命令中指定executor数量、内存、CPU等实现资源控制,例如,提交时指定以下参数:

    --driver-memory=20G --num-executors=10 --executor-memory=4G --executor-cores=2

支持配置的其他参数如下:

  • -sql,--export-sql <arg>

    导出数据SQL设置。从Hive/Spark表读取数据时,设置该参数可自行过滤部分无需同步的数据。

  • -rc,--rowkey-columns <arg>
    指定源表中组成HBase Rowkey的列,如果有多个列,请使用逗号分隔。

    SQL错误导致查询异常、查询数据为空、数据重复都会导致Spark BulkLoad任务执行失败,请保证SQL的正确性,以及Rowkey字段对应的数据组合不会出现重复。

  • -sp,--rowkey-separator <arg>

    可选参数,使用多个列值组合Rowkey时,字段值分隔符,默认值为“#”,拼接完成后整体作为Rowkey。

    该分隔符仅支持单个字符,同时需要确保该字符不会出现在Rowkey字段数据中,否则会导致无法从Rowkey中解析对应的列值。使用多个列作为组合Rowkey时,最终生成的数据Rowkey会以该字符作为分隔符,解析Rowkey时需要先获取分隔符的位置,再进行拆分转换,例如:

    分隔符为“#”、Rowkey由两个列组合而成(不开启强制转String功能),对应的Rowkey关系如表3所示,解析代码示例如下:

    // 先找到分隔符位置
    int idx = Bytes.indexOf(row, "#".getBytes(StandardCharsets.UTF_8)[0]);
    // 分割Rowkey、转换数据类型
    byte[] aBytes = ArrayUtils.subarray(row, 0, idx);
    String aStr = Bytes.toString(aBytes);
    byte[] bBytes = ArrayUtils.subarray(row, idx + 1, row.length);
    Integer bInt = bBytes == null ? null : Bytes.toInt(bBytes);
    表3 组合Rowkey示例

    A列(String)

    B列(int)

    数据Rowkey

    a

    1

    a#1

    b

    null

    b#

  • -tb,--table <arg>

    目标HBase表,若目标表不存在时,会进行采样并创建目标表。

  • -op,--output-path

    HFile文件输出路径,最终导出的HFile会在该目录下的一个临时目录中,导入成功后会被清除。

    如果开启了HDFS联邦,HFile文件输出路径必须和需要导入数据的HBase在同一个NameService中。

    例如:HDFS目录挂载情况如表4所示,如果HBase服务目录挂载在NS1上,则Spark Bulkload工具输出路径必须挂载在NS1上,因此,可以指定输出路径在“/tmpns1”目录下。

    表4 HDFS目录挂载示例

    全局路径

    目标NameService

    目标路径

    /hbase

    NS1

    /hbase

    /tmp

    hacluster

    /tmp

    /tmpns1

    NS1

    /tmpns1

  • -rn,--region-nums <arg>

    目标HBase Region个数,目标表不存在时,会使用该参数值预分区目标表,默认值为“100”。

    建议根据源表需要导出的数据量来评估Region个数,估算方式如下:

    源表大小(3副本) * 源表解压膨胀率 * HBase数据膨胀率(可估计为10)/ 单个Region上限(通常为10GB)/ 压缩及编码压缩率

    请根据实际业务需求进行评估,例如,源表采用ORC格式存储,占用空间100GB,源表解压膨胀率可估计为5,目标表采样SNAPPY压缩以及FAST_DIFF编码,压缩率可以估计为3,得出最少Region数量为:100 * 5 * 10 / 10 / 3 ≈ 167。如果后续还需要执行增量同步数据,可以设置Region数为200。

  • -cf,--column-family <arg>

    可选参数,指定要导入的目标HBase表的列族名,如果目标表存在,但是该列族不存在,会添加该列族;如果目标表不存在,则会在HBase中创建一个列族为该参数值的表,默认列族为“info”。

  • -comp,--compression <arg>

    可选参数,目标HBase表的压缩格式,目前支持SNAPPY、NONE、GZ;如果目标表不存在,则会在HBase中创建一个压缩格式为该参数值的表,默认压缩格式为“SNAPPY”。

  • -enc,--block-encoding <arg>

    可选参数,目标HBase表的DATA BLOCK编码,目前支持NONE、PREFIX、DIFF、FAST_DIFF和ROW_INDEX_V1;目标表不存在时,会在HBase中创建一个DATA BLOCK编码为该参数值的表,默认值为“FAST_DIFF”。

  • -sr,--skip-store-rowcol

    可选参数,是否跳过存储Rowkey对应的列,默认会把Rowkey列冗余存储到HBase表中,当Rowkey组合比较复杂时,可避免解析Rowkey,如果需要节省存储占用,可以添加此参数。

  • -sm,--sampling-multiple <arg>

    可选参数,用于设置采样倍数,执行采样时,可以划分为更多的区间,单个Region下最多生成该参数值个文件,用于提升工具性能。

    注意:该值越大,生成的HFile越多,会导致HBase compaction压力增大,该参数取值范围为[1,10],默认值为“1”,建议根据实际资源情况进行设置。

  • -ita,--indexspecs-to-add <arg>

    可选参数,仅用于全量导出时使用,用于指定要创建的索引的索引列,SQL查询的结果必须包含索引列(可以为空值),否则无法导出数据。参数值格式为“索引1=>索引列1, 索引列2#索引2=>索引列3”,索引分隔符为“#”,索引名与索引列分隔符为“=>”,索引列分隔符为“,”。例如,要创建两个索引idx1idx2,索引列分别是col1col2,则参数值为“idx1=>col1#idx2=>col2”。

    不建议索引列中包含Rowkey列,该场景下,必须冗余存储Rowkey列到HBase数据表,命令中不允许携带-sr参数,会导致冗余存储。

  • -ic,--indexspecs-covered <arg>

    可选参数,仅用于全量导出时使用,用于指定要创建的索引的覆盖列,该参数值必须与-ita参数中指定的索引相匹配。参数值格式为“索引1=>覆盖列1, 覆盖列2#索引2=>覆盖列3”,索引分隔符为“#”,索引名与索引列分隔符为“=>”,覆盖列分隔符为“,”。例如,要创建两个索引idx1idx2,其中idx1为覆盖索引,覆盖列为col3,则参数值为“idx1=>col3”。

  • -icf,--indexspecs-covered-family <arg>

    可选参数,仅用于全量导出时使用,用于指定要创建的索引是否覆盖-cf指定的列族,该参数值必须与-ita参数中指定的索引相匹配。参数值格式为“索引1#索引2”,索引分隔符为“#”。例如,要创建两个索引idx1idx2,都需要覆盖指定的列族,则参数值为“idx1#idx2”。

  • -icaf,--indexspecs-covered-all-family <arg>

    可选参数,仅用于全量导出时使用,用于指定要创建的索引是否覆盖所有列族,该参数值必须与-ita参数中指定的索引相匹配。参数值格式为“索引1#索引2”,索引分隔符为“#”。例如,要创建两个索引idx1idx2,其中idx1需要覆盖所有列族,则参数值为“idx1”。

  • -fts,--force-to-str

    可选参数,指定该参数时,源表数据会先被转为String类型,后存储到HBase表。

    开启此参数后,会带来以下问题:

    • 无法支持使用原始类型的过滤器进行数据筛选。
    • 数据存储膨胀,HBase表占用空间将更大。

操作步骤

  1. 以客户端安装用户,登录安装客户端的节点。
  2. 执行以下命令切换到客户端目录。

    cd 客户端安装目录

  3. 执行以下命令配置环境变量。

    source bigdata_env

  4. 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户

    kinit 组件业务用户

    如果当前集群未启用Kerberos认证,则执行以下命令设置Hadoop用户名:

    export HADOOP_USER_NAME=hbase

  5. 进入Spark客户端目录,执行如下命令,创建全局二级索引,同时同步数据到目标表中。

    cd Spark/spark/bin

    例如,执行以下命令同步test.orc_table表的所有数据到HBase的test:orc_table表中,使用id+uuid组合作为Rowkey列,输出路径指定为“/tmp/orc_table”:

    spark-submit --master yarn --deploy-mode cluster --jars 客户端安装目录/HBase/hbase/lib/protobuf-java-2.5.0.jar,客户端安装目录/HBase/hbase/lib/hbase-global-index-*.jar,客户端安装目录/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.GlobalIndexSparkBulkLoadTool 客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar -sql "select * from test.orc_table" -tb "test:orc_table" -rc "id,uuid" -op "/tmp/orc_table" -ita "idx1=>str_col1" -icaf "idx1"

相关文档