使用Spark BulkLoad工具同步数据到HBase表中
Spark BulkLoad工具支持快速同步Hive或Spark表数据到HBase表中,支持全量或增量导入ORC/PAQUET等格式的表数据。
使用Spark BulkLoad同步数据类型数据到HBase表中时,存在以下限制:
- 数据类型转换的对应关系请参见表1。日期类型会被先转换为String类型,再存储到HBase中; 数字类型、字符串类型、布尔类型均会直接转为byte数组存储到HBase中,解析数据时,请将byte数组直接转换为对应类型,同时需要注意判断空值。
- 不建议将含有Strcuct、Map和Seq三种复杂类型的表数据直接同步到HBase表中,这些类型无法直接转换为byte数组,会先被转为String,再存储到HBase中,可能会导致无法还原数据。
该章节内容仅适用于MRS 3.5.0及之后版本。
Hive/Spark表 |
HBase表 |
解析方式 |
---|---|---|
TINYINT |
Byte |
byte[]取第一个值 |
SMALLINT |
Short |
Bytes.toShort(byte[]) |
INT/INTEGER |
Integer |
Bytes.toInt(byte[]) |
BIGINT |
Long |
Bytes.toLong(byte[], int, int) |
FLOAT |
Float |
Bytes.toFloat(byte[]) |
DOUBLE |
Double |
Bytes.toDouble(byte[]) |
DECIMAL/NUMERIC |
BigDecimal |
Bytes.toBigDecimal(byte[]) |
TIMESTAMP |
String |
Bytes.toString(byte[]) |
DATE |
String |
Bytes.toString(byte[]) |
STRING |
String |
Bytes.toString(byte[]) |
VARCHAR |
String |
Bytes.toString(byte[]) |
CHAR |
String |
Bytes.toString(byte[]) |
BOOLEAN |
Boolean |
Bytes.toBoolean(byte[]) |
BINARY |
byte[] |
无需解析 |
ARRAY |
String |
Bytes.toString(byte[]) |
MAP |
String |
Bytes.toString(byte[]) |
STRUCT |
String |
Bytes.toString(byte[]) |
前提条件
- 集群安装了Spark及Hive服务。
- 执行数据导入的用户需要同时具有Spark(对应源表的SELECT权限)、HBase权限(对应HBase NameSpace的RWXA权限)和HDFS权限(对应HFile输出目录的读写权限)。
- 如果集群已启用Kerberos认证(安全模式),需修改Spark“客户端安装目录/Spark/spark/conf/spark-defaults.conf”配置文件中的“spark.yarn.security.credentials.hbase.enabled”参数值为“true”。
Spark BulkLoad命令介绍
Spark BulkLoad数据同步工具命令格式如下:
spark-submit --master yarn --deploy-mode cluster --jars 客户端安装目录/HBase/hbase/lib/protobuf-java-2.5.0.jar,客户端安装目录/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool 客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool [-cf <arg>] [-comp <arg>] [-enc <arg>] -op <arg> -rc <arg> [-rn <arg>] [-sp <arg>] -sql <arg> [-sr] -tb <arg>
支持配置的其他参数如下:
- -sql,--export-sql <arg>
导出数据SQL设置。从Hive/Spark表读取数据时,设置该参数可自行过滤部分无需同步的数据。
- -rc,--rowkey-columns <arg>
指定源表中组成HBase Rowkey的列,如果有多个列,请使用逗号分隔。
SQL错误导致查询异常、查询数据为空、数据重复都会导致Spark BulkLoad任务执行失败,请保证SQL的正确性,以及Rowkey字段对应的数据组合不会出现重复。
- -sp,--rowkey-separator <arg>
可选参数,使用多个列值组合Rowkey时,字段值分隔符,默认值为“#”,拼接完成后整体作为Rowkey。
该分隔符仅支持单个字符,同时需要确保该字符不会出现在Rowkey字段数据中,否则会导致无法从Rowkey中解析对应的列值。使用多个列作为组合Rowkey时,最终生成的数据Rowkey会以该字符作为分隔符,解析Rowkey时需要先获取分隔符的位置,再进行拆分转换,例如:
分隔符为“#”、Rowkey由两个列组合而成,对应的Rowkey关系如表2所示,解析代码示例如下:
// 先找到分隔符位置 int idx = Bytes.indexOf(row, "#".getBytes(StandardCharsets.UTF_8)[0]); // 分割Rowkey、转换数据类型 byte[] aBytes = ArrayUtils.subarray(row, 0, idx); String aStr = Bytes.toString(aBytes); byte[] bBytes = ArrayUtils.subarray(row, idx + 1, row.length); Integer bInt = bBytes == null ? null : Bytes.toInt(bBytes);
- -tb,--table <arg>
目标HBase表,若目标表不存在时,会进行采样并创建目标表。
- -op,--output-path
HFile文件输出路径,最终导出的HFile会在该目录下的一个临时目录中,导入成功后会被清除。
如果开启了HDFS联邦,HFile文件输出路径必须和需要导入数据的HBase在同一个NameService中。
例如:HDFS目录挂载情况如表3所示,如果HBase服务目录挂载在NS1上,则Spark Bulkload工具输出路径必须挂载在NS1上,因此,可以指定输出路径在“/tmpns1”目录下。
- -rn,--region-nums <arg>
目标HBase Region个数,目标表不存在时,会使用该参数值预分区目标表,默认值为“100”。
建议根据源表需要导出的数据量来评估Region个数,估算方式如下:
源表大小(3副本) * 原表解压膨胀率 * HBase数据膨胀率(可估计为10)/ 单个Region上限(通常为10GB)/ 压缩及编码压缩率
请根据实际业务需求进行评估,例如,源表采用ORC格式存储,占用空间100GB,原表解压膨胀率可估计为5,目标表采样SNAPPY压缩以及FAST_DIFF编码,压缩率可以估计为3,得出最少Region数量为:100 * 5 * 10 / 10 / 3 ≈ 167。如果后续还需要执行增量同步数据,可以设置Region数为200。
- -cf,--column-family <arg>
可选参数,指定要导入的目标HBase表的列族名,如果目标表存在,但是该列族不存在,会添加该列族;如果目标表不存在,则会在HBase中创建一个列族为该参数值的表,默认列族为“info”。
- -comp,--compression <arg>
可选参数,目标HBase表的压缩格式,目前支持SNAPPY、NONE、ZSTD、GZ;如果目标表不存在,则会在HBase中创建一个压缩格式为该参数值的表,默认压缩格式为“SNAPPY”。
- -enc,--block-encoding <arg>
可选参数,目标HBase表的DATA BLOCK编码,目前支持NONE、PREFIX、DIFF、FAST_DIFF和ROW_INDEX_V1;目标表不存在时,会在HBase中创建一个DATA BLOCK编码为该参数值的表,默认值为“FAST_DIFF”。
- -sr,--skip-store-rowcol
可选参数,是否跳过存储Rowkey对应的列,默认会把Rowkey列冗余存储到HBase表中,当Rowkey组合比较复杂时,可避免解析Rowkey,如果需要节省存储占用,可以添加此参数。
- -sm,--sampling-multiple <arg>
可选参数,用于设置采样倍数,执行采样时,可以划分为更多的区间,单个Region下最多生成该参数值个文件,用于提升工具性能。
注意:该值越大,生成的HFile越多,会导致HBase compaction压力增大,该参数取值范围为[1,10],默认值为“1”,建议根据实际资源情况进行设置。
操作步骤
- 以客户端安装用户,登录安装客户端的节点。
- 执行以下命令切换到客户端目录。
cd 客户端安装目录
- 执行以下命令配置环境变量。
source bigdata_env
- 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户
kinit 组件业务用户
如果当前集群未启用Kerberos认证,则执行以下命令设置Hadoop用户名:
export HADOOP_USER_NAME=hbase
- 进入Spark客户端目录,执行如下命令,同步数据到HBase目标表中。
cd Spark/spark/bin
例如,执行以下命令同步test.orc_table表的所有数据到HBase的test:orc_table表中,使用id+uuid组合作为rowkey列,输出路径指定为“/tmp/orc_table”:
spark-submit --master yarn --deploy-mode cluster --jars 客户端安装目录/HBase/hbase/lib/protobuf-java-2.5.0.jar,客户端安装目录/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool 客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar -sql "select * from test.orc_table" -tb "test:orc_table" -rc "id,uuid" -op "/tmp/orc_table"