更新时间:2025-12-10 GMT+08:00
分享

使用Spark BulkLoad工具同步数据到HBase表中

操作场景

Spark BulkLoad工具支持快速同步Hive或Spark表数据到HBase表中,支持全量或增量导入ORC/PARQUET等格式的表数据。

约束与限制

使用Spark BulkLoad同步数据类型数据到HBase表中时,存在以下限制:

  • 数据类型转换的对应关系请参见表1。默认模式下,日期类型会被先转换为String类型,再存储到HBase中; 数字类型、字符串类型、布尔类型均会直接转为byte数组存储到HBase中,解析数据时,请将byte数组直接转换为对应类型,同时需要注意判断空值。MRS 3.6.0-LTS及之后版本,开启强制转String功能后,所有类型数据均会先转换为String类型,再转换成byte数组存储到HBase中,该功能存在一定风险,需经过评估后再开启,参数说明请参见表2中的fts参数介绍
  • 不建议将含有Struct、Map和Seq三种复杂类型的表数据直接同步到HBase表中,这些类型无法直接转换为byte数组,会先被转为String,再存储到HBase中,可能会导致无法还原数据。

该章节内容仅适用于MRS 3.5.0及之后版本。

表1 数据类型转换对应关系

Hive/Spark表

默认模式

开启数据强制转String功能(MRS 3.6.0-LTS 及之后版本)

开启数据强制转String功能(MRS 3.6.0-LTS 及之后版本)

HBase表

解析方式

HBase表

解析方式

TINYINT

Byte

byte[]取第一个值

String

Bytes.toString(byte[])

SMALLINT

Short

Bytes.toShort(byte[])

String

Bytes.toString(byte[])

INT/INTEGER

Integer

Bytes.toInt(byte[])

String

Bytes.toString(byte[])

BIGINT

Long

Bytes.toLong(byte[], int, int)

String

Bytes.toString(byte[])

FLOAT

Float

Bytes.toFloat(byte[])

String

Bytes.toString(byte[])

DOUBLE

Double

Bytes.toDouble(byte[])

String

Bytes.toString(byte[])

DECIMAL/NUMERIC

BigDecimal

Bytes.toBigDecimal(byte[])

String

Bytes.toString(byte[])

TIMESTAMP

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

DATE

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

STRING

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

VARCHAR

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

CHAR

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

BOOLEAN

Boolean

Bytes.toBoolean(byte[])

String

Bytes.toString(byte[])

BINARY

byte[]

无需解析

String

Bytes.toString(byte[])

ARRAY

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

MAP

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

STRUCT

String

Bytes.toString(byte[])

String

Bytes.toString(byte[])

前提条件

  • 集群安装了Spark及Hive服务。
  • 执行数据导入的用户需要同时具有Spark(对应源表的SELECT权限)、HBase权限(对应HBase NameSpace的RWXA权限)和HDFS权限(对应HFile输出目录的读写权限)。
  • 如果集群已启用Kerberos认证(安全模式),需修改Spark“客户端安装目录/Spark/spark/conf/spark-defaults.conf”配置文件中的“spark.yarn.security.credentials.hbase.enabled”参数值为“true”。

Spark BulkLoad命令介绍

Spark BulkLoad数据同步工具命令格式如下:

spark-submit --master yarn --deploy-mode cluster --jars 客户端安装目录/HBase/hbase/lib/protobuf-java-2.5.0.jar,客户端安装目录/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool 客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool [-cf <arg>] [-comp <arg>] [-enc <arg>] -op <arg> -rc <arg> [-rn <arg>] [-sp <arg>] -sql <arg> [-sr] -tb <arg>

还支持在提交命令中指定executor数量、内存、CPU等实现资源控制,例如,提交时指定以下参数:

--driver-memory=20G --num-executors=10 --executor-memory=4G --executor-cores=2

支持配置的其他参数请参见表2

表2 Spark BulkLoad命令参数介绍

参数

参数说明

--jars

用于指定“protobuf-java-2.5.0.jar”文件所在路径和HBase客户端配置文件所在路径。HBase客户端配置文件所在路径为“客户端安装目录/HBase/hbase/conf”。

-sql,--export-sql <arg>

导出数据SQL设置。从Hive/Spark表读取数据时,设置该参数可自行过滤部分无需同步的数据。

-rc,--rowkey-columns <arg>

指定源表中组成HBase Rowkey的列,如果有多个列,请使用逗号分隔。

SQL错误导致查询异常、查询数据为空、数据重复都会导致Spark BulkLoad任务执行失败,请保证SQL的正确性,以及Rowkey字段对应的数据组合不会出现重复。

-sp,--rowkey-separator <arg>(可选)

使用多个列值组合Rowkey时,用于设置字段值分隔符,默认值为“#”,拼接完成后整体作为Rowkey。

该分隔符仅支持单个字符,同时需要确保该字符不会出现在Rowkey字段数据中,否则会导致无法从Rowkey中解析对应的列值。

使用多个列作为组合Rowkey时,最终生成的数据Rowkey会以该字符作为分隔符,解析Rowkey时需要先获取分隔符的位置,再进行拆分转换,例如:

分隔符为“#”、Rowkey由两个列组合而成(不开启强制转String功能),对应的Rowkey关系如表3所示,解析代码示例如下:

// 先找到分隔符位置
int idx = Bytes.indexOf(row, "#".getBytes(StandardCharsets.UTF_8)[0]);
// 分割Rowkey、转换数据类型
byte[] aBytes = ArrayUtils.subarray(row, 0, idx);
String aStr = Bytes.toString(aBytes);
byte[] bBytes = ArrayUtils.subarray(row, idx + 1, row.length);
Integer bInt = bBytes == null ? null : Bytes.toInt(bBytes);

-tb,--table <arg>

用于设置目标HBase表,如果目标表不存在,则会进行采样并创建目标表。

-op,--output-path

用于设置HFile文件输出路径,最终导出的HFile会在该目录下的一个临时目录中,导入成功后会被清除。

如果开启了HDFS联邦,HFile文件输出路径必须和需要导入数据的HBase在同一个NameService中。

例如:HDFS目录挂载情况如表4所示,如果HBase服务目录挂载在NS1上,则Spark Bulkload工具输出路径必须挂载在NS1上,因此,可以指定输出路径在“/tmpns1”目录下。

-rn,--region-nums <arg>

用于设置目标HBase Region个数,目标表不存在时,会使用该参数值预分区目标表,默认值为“100”。

建议根据源表需要导出的数据量来评估Region个数,估算方式如下:

源表大小(3副本) * 源表解压膨胀率 * HBase数据膨胀率(可估计为10)/ 单个Region上限(通常为10GB)/ 压缩及编码压缩率

请根据实际业务需求进行评估,例如,源表采用ORC格式存储,占用空间100GB,源表解压膨胀率可估计为5,目标表采样SNAPPY压缩以及FAST_DIFF编码,压缩率可以估计为3,得出最少Region数量为:100 * 5 * 10 / 10 / 3 ≈ 167。

如果后续还需要执行增量同步数据,可以设置Region数为200。

-cf,--column-family <arg>(可选)

用于指定要导入的目标HBase表的列族名,如果目标表存在,但是该列族不存在,会添加该列族;如果目标表不存在,则会在HBase中创建一个列族为该参数值的表,默认列族为“info”。

-comp,--compression <arg>

用于设置目标HBase表的压缩格式,目前支持SNAPPY、NONE、GZ;如果目标表不存在,则会在HBase中创建一个压缩格式为该参数值的表,默认压缩格式为“SNAPPY”。

-enc,--block-encoding <arg>

用于设置目标HBase表的DATA BLOCK编码,目前支持NONE、PREFIX、DIFF、FAST_DIFF和ROW_INDEX_V1;目标表不存在时,会在HBase中创建一个DATA BLOCK编码为该参数值的表,默认值为“FAST_DIFF”。

-sr,--skip-store-rowcol

是否跳过存储Rowkey对应的列,默认会把Rowkey列冗余存储到HBase表中,当Rowkey组合比较复杂时,可避免解析Rowkey,如果需要节省存储占用,可以添加此参数。

-sm,--sampling-multiple <arg>

用于设置采样倍数,执行采样时,可以划分为更多的区间,单个Region下最多生成该参数值个文件,用于提升工具性能。

注意:该值越大,生成的HFile越多,会导致HBase compaction压力增大,该参数取值范围为[1,10],默认值为“1”,建议根据实际资源情况进行设置。

-fts,--force-to-str

MRS 3.6.0-LTS及之后版本,可选参数,指定该参数时,源表数据会先被转为String类型,后存储到HBase表。

注意:

开启此参数后,会带来以下问题:

  • 无法支持使用原始类型的过滤器进行数据筛选。
  • 数据存储膨胀,HBase表占用空间将更大。
表3 组合Rowkey示例

A列(String)

B列(int)

数据Rowkey

a

1

a#1

b

null

b#

表4 HDFS目录挂载示例

全局路径

目标NameService

目标路径

/hbase

NS1

/hbase

/tmp

hacluster

/tmp

/tmpns1

NS1

/tmpns1

操作步骤

  1. 以客户端安装用户,登录安装客户端的节点。
  2. 执行以下命令切换到客户端目录

    cd 客户端安装目录

  3. 执行以下命令配置环境变量

    source bigdata_env

  4. 如果当前集群已启用Kerberos认证(安全模式),执行以下命令认证当前用户

    kinit 组件业务用户

    如果当前集群未启用Kerberos认证(普通模式),则执行以下命令设置Hadoop用户名:

    export HADOOP_USER_NAME=hbase

  5. 进入Spark客户端目录,执行如下命令,同步数据到HBase目标表中

    cd Spark/spark/bin

    例如,执行以下命令同步test.orc_table表的所有数据到HBase的test:orc_table表中,使用id+uuid组合作为rowkey列,输出路径指定为“/tmp/orc_table”:

    spark-submit --master yarn --deploy-mode cluster --jars 客户端安装目录/HBase/hbase/lib/protobuf-java-2.5.0.jar,客户端安装目录/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool 客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar -sql "select * from test.orc_table" -tb "test:orc_table" -rc "id,uuid" -op "/tmp/orc_table"

相关文档