使用Spark BulkLoad工具同步数据到HBase表中
Spark BulkLoad工具支持快速同步Hive或Spark表数据到HBase表中,支持全量或增量导入ORC/PAQUET等格式的表数据。
约束与限制
使用Spark BulkLoad同步数据类型数据到HBase表中时,存在以下限制:
- 数据类型转换的对应关系请参见表1。默认模式下,日期类型会被先转换为String类型,再存储到HBase中; 数字类型、字符串类型、布尔类型均会直接转为byte数组存储到HBase中,解析数据时,请将byte数组直接转换为对应类型,同时需要注意判断空值。
- 不建议将含有Struct、Map和Seq三种复杂类型的表数据直接同步到HBase表中,这些类型无法直接转换为byte数组,会先被转为String,再存储到HBase中,可能会导致无法还原数据。
该章节内容仅适用于MRS 3.5.0及之后版本。
| Hive/Spark表 | 默认模式 | |
|---|---|---|
| HBase表 | 解析方式 | |
| TINYINT | Byte | byte[]取第一个值 | 
| SMALLINT | Short | Bytes.toShort(byte[]) | 
| INT/INTEGER | Integer | Bytes.toInt(byte[]) | 
| BIGINT | Long | Bytes.toLong(byte[], int, int) | 
| FLOAT | Float | Bytes.toFloat(byte[]) | 
| DOUBLE | Double | Bytes.toDouble(byte[]) | 
| DECIMAL/NUMERIC | BigDecimal | Bytes.toBigDecimal(byte[]) | 
| TIMESTAMP | String | Bytes.toString(byte[]) | 
| DATE | String | Bytes.toString(byte[]) | 
| STRING | String | Bytes.toString(byte[]) | 
| VARCHAR | String | Bytes.toString(byte[]) | 
| CHAR | String | Bytes.toString(byte[]) | 
| BOOLEAN | Boolean | Bytes.toBoolean(byte[]) | 
| BINARY | byte[] | 无需解析 | 
| ARRAY | String | Bytes.toString(byte[]) | 
| MAP | String | Bytes.toString(byte[]) | 
| STRUCT | String | Bytes.toString(byte[]) | 
前提条件
- 集群安装了Spark及Hive服务。
- 执行数据导入的用户需要同时具有Spark(对应源表的SELECT权限)、HBase权限(对应HBase NameSpace的RWXA权限)和HDFS权限(对应HFile输出目录的读写权限)。
- 如果集群已启用Kerberos认证(安全模式),需修改Spark“客户端安装目录/Spark/spark/conf/spark-defaults.conf”配置文件中的“spark.yarn.security.credentials.hbase.enabled”参数值为“true”。
Spark BulkLoad命令介绍
Spark BulkLoad数据同步工具命令格式如下:
spark-submit --master yarn --deploy-mode cluster --jars 客户端安装目录/HBase/hbase/lib/protobuf-java-2.5.0.jar,客户端安装目录/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool 客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool [-cf <arg>] [-comp <arg>] [-enc <arg>] -op <arg> -rc <arg> [-rn <arg>] [-sp <arg>] -sql <arg> [-sr] -tb <arg>
 
 
   
  支持配置的其他参数如下:
- -sql,--export-sql <arg>
    导出数据SQL设置。从Hive/Spark表读取数据时,设置该参数可自行过滤部分无需同步的数据。 
- -rc,--rowkey-columns <arg>
    指定源表中组成HBase Rowkey的列,如果有多个列,请使用逗号分隔。  SQL错误导致查询异常、查询数据为空、数据重复都会导致Spark BulkLoad任务执行失败,请保证SQL的正确性,以及Rowkey字段对应的数据组合不会出现重复。 
- -sp,--rowkey-separator <arg>
    可选参数,使用多个列值组合Rowkey时,字段值分隔符,默认值为“#”,拼接完成后整体作为Rowkey。   该分隔符仅支持单个字符,同时需要确保该字符不会出现在Rowkey字段数据中,否则会导致无法从Rowkey中解析对应的列值。使用多个列作为组合Rowkey时,最终生成的数据Rowkey会以该字符作为分隔符,解析Rowkey时需要先获取分隔符的位置,再进行拆分转换,例如: 分隔符为“#”、Rowkey由两个列组合而成(不开启强制转String功能),对应的Rowkey关系如表2所示,解析代码示例如下: // 先找到分隔符位置 int idx = Bytes.indexOf(row, "#".getBytes(StandardCharsets.UTF_8)[0]); // 分割Rowkey、转换数据类型 byte[] aBytes = ArrayUtils.subarray(row, 0, idx); String aStr = Bytes.toString(aBytes); byte[] bBytes = ArrayUtils.subarray(row, idx + 1, row.length); Integer bInt = bBytes == null ? null : Bytes.toInt(bBytes); 
- -tb,--table <arg>
    目标HBase表,若目标表不存在时,会进行采样并创建目标表。 
- -op,--output-path
    HFile文件输出路径,最终导出的HFile会在该目录下的一个临时目录中,导入成功后会被清除。   如果开启了HDFS联邦,HFile文件输出路径必须和需要导入数据的HBase在同一个NameService中。 例如:HDFS目录挂载情况如表3所示,如果HBase服务目录挂载在NS1上,则Spark Bulkload工具输出路径必须挂载在NS1上,因此,可以指定输出路径在“/tmpns1”目录下。 
- -rn,--region-nums <arg>
    目标HBase Region个数,目标表不存在时,会使用该参数值预分区目标表,默认值为“100”。   建议根据源表需要导出的数据量来评估Region个数,估算方式如下: 源表大小(3副本) * 源表解压膨胀率 * HBase数据膨胀率(可估计为10)/ 单个Region上限(通常为10GB)/ 压缩及编码压缩率 请根据实际业务需求进行评估,例如,源表采用ORC格式存储,占用空间100GB,源表解压膨胀率可估计为5,目标表采样SNAPPY压缩以及FAST_DIFF编码,压缩率可以估计为3,得出最少Region数量为:100 * 5 * 10 / 10 / 3 ≈ 167。 如果后续还需要执行增量同步数据,可以设置Region数为200。 
- -cf,--column-family <arg>
    可选参数,指定要导入的目标HBase表的列族名,如果目标表存在,但是该列族不存在,会添加该列族;如果目标表不存在,则会在HBase中创建一个列族为该参数值的表,默认列族为“info”。 
- -comp,--compression <arg>
    可选参数,目标HBase表的压缩格式,目前支持SNAPPY、NONE、GZ;如果目标表不存在,则会在HBase中创建一个压缩格式为该参数值的表,默认压缩格式为“SNAPPY”。 
- -enc,--block-encoding <arg>
    可选参数,目标HBase表的DATA BLOCK编码,目前支持NONE、PREFIX、DIFF、FAST_DIFF和ROW_INDEX_V1;目标表不存在时,会在HBase中创建一个DATA BLOCK编码为该参数值的表,默认值为“FAST_DIFF”。 
- -sr,--skip-store-rowcol
    可选参数,是否跳过存储Rowkey对应的列,默认会把Rowkey列冗余存储到HBase表中,当Rowkey组合比较复杂时,可避免解析Rowkey,如果需要节省存储占用,可以添加此参数。 
- -sm,--sampling-multiple <arg>
    可选参数,用于设置采样倍数,执行采样时,可以划分为更多的区间,单个Region下最多生成该参数值个文件,用于提升工具性能。 注意:该值越大,生成的HFile越多,会导致HBase compaction压力增大,该参数取值范围为[1,10],默认值为“1”,建议根据实际资源情况进行设置。 
操作步骤
- 以客户端安装用户,登录安装客户端的节点。
- 执行以下命令切换到客户端目录。
    
    cd 客户端安装目录 
- 执行以下命令配置环境变量。
    
    source bigdata_env 
- 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户
    
    kinit 组件业务用户 如果当前集群未启用Kerberos认证,则执行以下命令设置Hadoop用户名: export HADOOP_USER_NAME=hbase 
- 进入Spark客户端目录,执行如下命令,同步数据到HBase目标表中。
    
    cd Spark/spark/bin 例如,执行以下命令同步test.orc_table表的所有数据到HBase的test:orc_table表中,使用id+uuid组合作为rowkey列,输出路径指定为“/tmp/orc_table”: spark-submit --master yarn --deploy-mode cluster --jars 客户端安装目录/HBase/hbase/lib/protobuf-java-2.5.0.jar,客户端安装目录/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool 客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar -sql "select * from test.orc_table" -tb "test:orc_table" -rc "id,uuid" -op "/tmp/orc_table" 
 
  