更新时间:2024-11-26 GMT+08:00
分享

使用Spark BulkLoad工具同步数据到HBase表中

Spark BulkLoad工具支持快速同步Hive或Spark表数据到HBase表中,支持全量或增量导入ORC/PAQUET等格式的表数据。

使用Spark BulkLoad同步数据类型数据到HBase表中时,存在以下限制:

  • 数据类型转换的对应关系请参见表1。日期类型会被先转换为String类型,再存储到HBase中; 数字类型、字符串类型、布尔类型均会直接转为byte数组存储到HBase中,解析数据时,请将byte数组直接转换为对应类型,同时需要注意判断空值。
  • 不建议将含有Strcuct、Map和Seq三种复杂类型的表数据直接同步到HBase表中,这些类型无法直接转换为byte数组,会先被转为String,再存储到HBase中,可能会导致无法还原数据。

该章节内容仅适用于MRS 3.5.0及之后版本。

表1 数据类型转换对应关系

Hive/Spark表

HBase表

解析方式

TINYINT

Byte

byte[]取第一个值

SMALLINT

Short

Bytes.toShort(byte[])

INT/INTEGER

Integer

Bytes.toInt(byte[])

BIGINT

Long

Bytes.toLong(byte[], int, int)

FLOAT

Float

Bytes.toFloat(byte[])

DOUBLE

Double

Bytes.toDouble(byte[])

DECIMAL/NUMERIC

BigDecimal

Bytes.toBigDecimal(byte[])

TIMESTAMP

String

Bytes.toString(byte[])

DATE

String

Bytes.toString(byte[])

STRING

String

Bytes.toString(byte[])

VARCHAR

String

Bytes.toString(byte[])

CHAR

String

Bytes.toString(byte[])

BOOLEAN

Boolean

Bytes.toBoolean(byte[])

BINARY

byte[]

无需解析

ARRAY

String

Bytes.toString(byte[])

MAP

String

Bytes.toString(byte[])

STRUCT

String

Bytes.toString(byte[])

前提条件

  • 集群安装了Spark及Hive服务。
  • 执行数据导入的用户需要同时具有Spark(对应源表的SELECT权限)、HBase权限(对应HBase NameSpace的RWXA权限)和HDFS权限(对应HFile输出目录的读写权限)。
  • 如果集群已启用Kerberos认证(安全模式),需修改Spark“客户端安装目录/Spark/spark/conf/spark-defaults.conf”配置文件中的“spark.yarn.security.credentials.hbase.enabled”参数值为“true”。

Spark BulkLoad命令介绍

Spark BulkLoad数据同步工具命令格式如下:

spark-submit --master yarn --deploy-mode cluster --jars 客户端安装目录/HBase/hbase/lib/protobuf-java-2.5.0.jar,客户端安装目录/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool 客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool [-cf <arg>] [-comp <arg>] [-enc <arg>] -op <arg> -rc <arg> [-rn <arg>] [-sp <arg>] -sql <arg> [-sr] -tb <arg>

  • --jars用于指定“protobuf-java-2.5.0.jar”文件所在路径和HBase客户端配置文件所在路径。HBase客户端配置文件所在路径为“客户端安装目录/HBase/hbase/conf”。
  • 支持在提交命令中指定executor数量、内存、CPU等实现资源控制,例如,提交时指定以下参数:

    --driver-memory=20G --num-executors=10 --executor-memory=4G --executor-cores=2

支持配置的其他参数如下:

  • -sql,--export-sql <arg>

    导出数据SQL设置。从Hive/Spark表读取数据时,设置该参数可自行过滤部分无需同步的数据。

  • -rc,--rowkey-columns <arg>
    指定源表中组成HBase Rowkey的列,如果有多个列,请使用逗号分隔。

    SQL错误导致查询异常、查询数据为空、数据重复都会导致Spark BulkLoad任务执行失败,请保证SQL的正确性,以及Rowkey字段对应的数据组合不会出现重复。

  • -sp,--rowkey-separator <arg>

    可选参数,使用多个列值组合Rowkey时,字段值分隔符,默认值为“#”,拼接完成后整体作为Rowkey。

    该分隔符仅支持单个字符,同时需要确保该字符不会出现在Rowkey字段数据中,否则会导致无法从Rowkey中解析对应的列值。使用多个列作为组合Rowkey时,最终生成的数据Rowkey会以该字符作为分隔符,解析Rowkey时需要先获取分隔符的位置,再进行拆分转换,例如:

    分隔符为“#”、Rowkey由两个列组合而成,对应的Rowkey关系如表2所示,解析代码示例如下:

    // 先找到分隔符位置
    int idx = Bytes.indexOf(row, "#".getBytes(StandardCharsets.UTF_8)[0]);
    // 分割Rowkey、转换数据类型
    byte[] aBytes = ArrayUtils.subarray(row, 0, idx);
    String aStr = Bytes.toString(aBytes);
    byte[] bBytes = ArrayUtils.subarray(row, idx + 1, row.length);
    Integer bInt = bBytes == null ? null : Bytes.toInt(bBytes);
    表2 组合Rowkey示例

    A列(String)

    B列(int)

    数据Rowkey

    a

    1

    a#1

    b

    null

    b#

  • -tb,--table <arg>

    目标HBase表,若目标表不存在时,会进行采样并创建目标表。

  • -op,--output-path

    HFile文件输出路径,最终导出的HFile会在该目录下的一个临时目录中,导入成功后会被清除。

    如果开启了HDFS联邦,HFile文件输出路径必须和需要导入数据的HBase在同一个NameService中。

    例如:HDFS目录挂载情况如表3所示,如果HBase服务目录挂载在NS1上,则Spark Bulkload工具输出路径必须挂载在NS1上,因此,可以指定输出路径在“/tmpns1”目录下。

    表3 HDFS目录挂载示例

    全局路径

    目标NameService

    目标路径

    /hbase

    NS1

    /hbase

    /tmp

    hacluster

    /tmp

    /tmpns1

    NS1

    /tmpns1

  • -rn,--region-nums <arg>

    目标HBase Region个数,目标表不存在时,会使用该参数值预分区目标表,默认值为“100”。

    建议根据源表需要导出的数据量来评估Region个数,估算方式如下:

    源表大小(3副本) * 原表解压膨胀率 * HBase数据膨胀率(可估计为10)/ 单个Region上限(通常为10GB)/ 压缩及编码压缩率

    请根据实际业务需求进行评估,例如,源表采用ORC格式存储,占用空间100GB,原表解压膨胀率可估计为5,目标表采样SNAPPY压缩以及FAST_DIFF编码,压缩率可以估计为3,得出最少Region数量为:100 * 5 * 10 / 10 / 3 ≈ 167。如果后续还需要执行增量同步数据,可以设置Region数为200。

  • -cf,--column-family <arg>

    可选参数,指定要导入的目标HBase表的列族名,如果目标表存在,但是该列族不存在,会添加该列族;如果目标表不存在,则会在HBase中创建一个列族为该参数值的表,默认列族为“info”。

  • -comp,--compression <arg>

    可选参数,目标HBase表的压缩格式,目前支持SNAPPY、NONE、ZSTD、GZ;如果目标表不存在,则会在HBase中创建一个压缩格式为该参数值的表,默认压缩格式为“SNAPPY”。

  • -enc,--block-encoding <arg>

    可选参数,目标HBase表的DATA BLOCK编码,目前支持NONE、PREFIX、DIFF、FAST_DIFF和ROW_INDEX_V1;目标表不存在时,会在HBase中创建一个DATA BLOCK编码为该参数值的表,默认值为“FAST_DIFF”。

  • -sr,--skip-store-rowcol

    可选参数,是否跳过存储Rowkey对应的列,默认会把Rowkey列冗余存储到HBase表中,当Rowkey组合比较复杂时,可避免解析Rowkey,如果需要节省存储占用,可以添加此参数。

  • -sm,--sampling-multiple <arg>

    可选参数,用于设置采样倍数,执行采样时,可以划分为更多的区间,单个Region下最多生成该参数值个文件,用于提升工具性能。

    注意:该值越大,生成的HFile越多,会导致HBase compaction压力增大,该参数取值范围为[1,10],默认值为“1”,建议根据实际资源情况进行设置。

操作步骤

  1. 以客户端安装用户,登录安装客户端的节点。
  2. 执行以下命令切换到客户端目录。

    cd 客户端安装目录

  3. 执行以下命令配置环境变量。

    source bigdata_env

  4. 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户

    kinit 组件业务用户

    如果当前集群未启用Kerberos认证,则执行以下命令设置Hadoop用户名:

    export HADOOP_USER_NAME=hbase

  5. 进入Spark客户端目录,执行如下命令,同步数据到HBase目标表中。

    cd Spark/spark/bin

    例如,执行以下命令同步test.orc_table表的所有数据到HBase的test:orc_table表中,使用id+uuid组合作为rowkey列,输出路径指定为“/tmp/orc_table”:

    spark-submit --master yarn --deploy-mode cluster --jars 客户端安装目录/HBase/hbase/lib/protobuf-java-2.5.0.jar,客户端安装目录/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool 客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar -sql "select * from test.orc_table" -tb "test:orc_table" -rc "id,uuid" -op "/tmp/orc_table"

相关文档