使用Global Index Spark BulkLoad工具同步数据到HBase全局二级索引表
Global Spark BulkLoad工具支持快速同步Hive或Spark表数据到HBase全局二级索引表中,支持全量或增量导入ORC/PARQUET等格式的表数据。
使用Global Index Spark BulkLoad同步数据到HBase表中时,存在以下限制:
- 数据类型转换的对应关系请参见表1。日期类型会被先转换为String类型,再存储到HBase中; 数字类型、字符串类型、布尔类型均会直接转换为byte数组存储到HBase中,解析数据时,请将byte数组直接转换为对应类型,同时需要注意判断空值。开启强制转String功能后,所有类型数据均会先转成String类型,再转换成byte数组存储到HBase中,该功能存在一定风险,需经过评估后再开启,参数说明请参见fts参数介绍。
- 不建议将含有Strcuct、Map和Seq三种复杂类型的表数据直接同步到HBase表中,这些类型无法直接转换为byte数组,会先被转为String,再存储到HBase中,可能会导致无法还原数据。
- 索引列类型转换的对应关系请见表2,目前索引列数据类型仅支持STRING、INTEGER、FLOAT、LONG、DOUBLE、SHORT、BYTE,不建议使用其他类型的列作为索引列,其他类型会被转换为String类型。
- 不支持索引更新操作,并且要求导出的数据中不能包含目标中已有的数据(即相同Rowkey的数据),否则会导致残留无用索引数据,引发索引与数据表不一致问题。
- 导出数据时,要求Rowkey列和索引列必须存在(允许为空值)。
- 增量导出数据时,不支持为已有的数据创建新索引,如需创建新索引,请使用GlobalTableIndexer工具新建索引,并全量生成对应的索引数据后再进行导出。
- 增量导出数据时,目标表必须包含全局二级索引,且至少存在一个可用索引(索引状态为ACTIVE、UNUSABLE或BUILDING)。
- 导出数据时,不建议修改目标表的索引(包含修改状态、新增索引、删除索引),否则可能会导致任务失败或索引表与数据表不一致。
- 该章节操作仅适用于MRS 3.6.0-LTS及之后版本。
|
原始数据类型 |
默认模式 |
开启数据强制转String功能 |
||
|---|---|---|---|---|
|
Hive/Spark表 |
HBase表 |
解析方式 |
HBase表 |
解析方式 |
|
TINYINT |
Byte |
byte[]取第一个值 |
String |
Bytes.toString(byte[]) |
|
SMALLINT |
Short |
Bytes.toShort(byte[]) |
String |
Bytes.toString(byte[]) |
|
INT/INTEGER |
Integer |
Bytes.toInt(byte[]) |
String |
Bytes.toString(byte[]) |
|
BIGINT |
Long |
Bytes.toLong(byte[], int, int) |
String |
Bytes.toString(byte[]) |
|
FLOAT |
Float |
Bytes.toFloat(byte[]) |
String |
Bytes.toString(byte[]) |
|
DOUBLE |
Double |
Bytes.toDouble(byte[]) |
String |
Bytes.toString(byte[]) |
|
DECIMAL/NUMERIC |
BigDecimal |
Bytes.toBigDecimal(byte[]) |
String |
Bytes.toString(byte[]) |
|
TIMESTAMP |
String |
Bytes.toString(byte[]) |
String |
Bytes.toString(byte[]) |
|
DATE |
String |
Bytes.toString(byte[]) |
String |
Bytes.toString(byte[]) |
|
STRING |
String |
Bytes.toString(byte[]) |
String |
Bytes.toString(byte[]) |
|
VARCHAR |
String |
Bytes.toString(byte[]) |
String |
Bytes.toString(byte[]) |
|
CHAR |
String |
Bytes.toString(byte[]) |
String |
Bytes.toString(byte[]) |
|
BOOLEAN |
Boolean |
Bytes.toBoolean(byte[]) |
String |
Bytes.toString(byte[]) |
|
BINARY |
byte[] |
无需解析 |
String |
Bytes.toString(byte[]) |
|
ARRAY |
String |
Bytes.toString(byte[]) |
String |
Bytes.toString(byte[]) |
|
MAP |
String |
Bytes.toString(byte[]) |
String |
Bytes.toString(byte[]) |
|
STRUCT |
String |
Bytes.toString(byte[]) |
String |
Bytes.toString(byte[]) |
|
原始Hive/Spark表数据类型 |
默认模式对应的HBase索引列类型 |
开启数据强制转String功能对应的HBase索引列类型 |
|---|---|---|
|
TINYINT |
BYTE |
STRING |
|
SMALLINT |
SHORT |
STRING |
|
INT/INTEGER |
INTEGER |
STRING |
|
BIGINT |
LONG |
STRING |
|
FLOAT |
FLOAT |
STRING |
|
DOUBLE |
DOUBLE |
STRING |
|
DECIMAL/NUMERIC |
STRING |
STRING |
|
TIMESTAMP |
STRING |
STRING |
|
DATE |
STRING |
STRING |
|
STRING |
STRING |
STRING |
|
VARCHAR |
STRING |
STRING |
|
CHAR |
STRING |
STRING |
|
BOOLEAN |
STRING |
STRING |
|
BINARY |
STRING |
STRING |
|
ARRAY |
STRING |
STRING |
|
MAP |
STRING |
STRING |
|
STRUCT |
STRING |
STRING |
前提条件
- 集群安装了Spark及Hive服务。
- 执行数据导入的用户需要同时具有Spark(对应源表的SELECT权限)、HBase权限(对应HBase NameSpace的RWXA权限)和HDFS权限(对应HFile输出目录的读写权限)。
- 如果集群已启用Kerberos认证(安全模式),需修改Spark“客户端安装目录/Spark/spark/conf/spark-defaults.conf”配置文件中的“spark.yarn.security.credentials.hbase.enabled”参数值为“true”。
Global Index Spark BulkLoad命令介绍
Global Index Spark BulkLoad数据同步工具命令格式如下:
spark-submit --master yarn --deploy-mode cluster --jars 客户端安装目录/HBase/hbase/lib/protobuf-java-2.5.0.jar,客户端安装目录/HBase/hbase/lib/hbase-global-index-*.jar,客户端安装目录/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool 客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar com.huawei.hadoop.hbase.tools.bulkload.GlobalIndexSparkBulkLoadTool [-cf <arg>] [-comp <arg>] [-enc <arg>] [-fts] [-ic <arg>] [-icaf <arg>] [-icf <arg>] [-ita <arg>] -op <arg> -rc <arg> [-rn <arg>] [-sm <arg>] [-sp <arg>] -sql <arg> [-sr] -tb <arg>
支持配置的其他参数如下:
- -sql,--export-sql <arg>
导出数据SQL设置。从Hive/Spark表读取数据时,设置该参数可自行过滤部分无需同步的数据。
- -rc,--rowkey-columns <arg>
指定源表中组成HBase Rowkey的列,如果有多个列,请使用逗号分隔。
SQL错误导致查询异常、查询数据为空、数据重复都会导致Spark BulkLoad任务执行失败,请保证SQL的正确性,以及Rowkey字段对应的数据组合不会出现重复。
- -sp,--rowkey-separator <arg>
可选参数,使用多个列值组合Rowkey时,字段值分隔符,默认值为“#”,拼接完成后整体作为Rowkey。
该分隔符仅支持单个字符,同时需要确保该字符不会出现在Rowkey字段数据中,否则会导致无法从Rowkey中解析对应的列值。使用多个列作为组合Rowkey时,最终生成的数据Rowkey会以该字符作为分隔符,解析Rowkey时需要先获取分隔符的位置,再进行拆分转换,例如:
分隔符为“#”、Rowkey由两个列组合而成(不开启强制转String功能),对应的Rowkey关系如表3所示,解析代码示例如下:
// 先找到分隔符位置 int idx = Bytes.indexOf(row, "#".getBytes(StandardCharsets.UTF_8)[0]); // 分割Rowkey、转换数据类型 byte[] aBytes = ArrayUtils.subarray(row, 0, idx); String aStr = Bytes.toString(aBytes); byte[] bBytes = ArrayUtils.subarray(row, idx + 1, row.length); Integer bInt = bBytes == null ? null : Bytes.toInt(bBytes);
- -tb,--table <arg>
目标HBase表,若目标表不存在时,会进行采样并创建目标表。
- -op,--output-path
HFile文件输出路径,最终导出的HFile会在该目录下的一个临时目录中,导入成功后会被清除。
如果开启了HDFS联邦,HFile文件输出路径必须和需要导入数据的HBase在同一个NameService中。
例如:HDFS目录挂载情况如表4所示,如果HBase服务目录挂载在NS1上,则Spark Bulkload工具输出路径必须挂载在NS1上,因此,可以指定输出路径在“/tmpns1”目录下。
- -rn,--region-nums <arg>
目标HBase Region个数,目标表不存在时,会使用该参数值预分区目标表,默认值为“100”。
建议根据源表需要导出的数据量来评估Region个数,估算方式如下:
源表大小(3副本) * 源表解压膨胀率 * HBase数据膨胀率(可估计为10)/ 单个Region上限(通常为10GB)/ 压缩及编码压缩率
请根据实际业务需求进行评估,例如,源表采用ORC格式存储,占用空间100GB,源表解压膨胀率可估计为5,目标表采样SNAPPY压缩以及FAST_DIFF编码,压缩率可以估计为3,得出最少Region数量为:100 * 5 * 10 / 10 / 3 ≈ 167。如果后续还需要执行增量同步数据,可以设置Region数为200。
- -cf,--column-family <arg>
可选参数,指定要导入的目标HBase表的列族名,如果目标表存在,但是该列族不存在,会添加该列族;如果目标表不存在,则会在HBase中创建一个列族为该参数值的表,默认列族为“info”。
- -comp,--compression <arg>
可选参数,目标HBase表的压缩格式,目前支持SNAPPY、NONE、GZ;如果目标表不存在,则会在HBase中创建一个压缩格式为该参数值的表,默认压缩格式为“SNAPPY”。
- -enc,--block-encoding <arg>
可选参数,目标HBase表的DATA BLOCK编码,目前支持NONE、PREFIX、DIFF、FAST_DIFF和ROW_INDEX_V1;目标表不存在时,会在HBase中创建一个DATA BLOCK编码为该参数值的表,默认值为“FAST_DIFF”。
- -sr,--skip-store-rowcol
可选参数,是否跳过存储Rowkey对应的列,默认会把Rowkey列冗余存储到HBase表中,当Rowkey组合比较复杂时,可避免解析Rowkey,如果需要节省存储占用,可以添加此参数。
- -sm,--sampling-multiple <arg>
可选参数,用于设置采样倍数,执行采样时,可以划分为更多的区间,单个Region下最多生成该参数值个文件,用于提升工具性能。
注意:该值越大,生成的HFile越多,会导致HBase compaction压力增大,该参数取值范围为[1,10],默认值为“1”,建议根据实际资源情况进行设置。
- -ita,--indexspecs-to-add <arg>
可选参数,仅用于全量导出时使用,用于指定要创建的索引的索引列,SQL查询的结果必须包含索引列(可以为空值),否则无法导出数据。参数值格式为“索引1=>索引列1, 索引列2#索引2=>索引列3”,索引分隔符为“#”,索引名与索引列分隔符为“=>”,索引列分隔符为“,”。例如,要创建两个索引idx1和idx2,索引列分别是col1和col2,则参数值为“idx1=>col1#idx2=>col2”。
不建议索引列中包含Rowkey列,该场景下,必须冗余存储Rowkey列到HBase数据表,命令中不允许携带-sr参数,会导致冗余存储。
- -ic,--indexspecs-covered <arg>
可选参数,仅用于全量导出时使用,用于指定要创建的索引的覆盖列,该参数值必须与-ita参数中指定的索引相匹配。参数值格式为“索引1=>覆盖列1, 覆盖列2#索引2=>覆盖列3”,索引分隔符为“#”,索引名与索引列分隔符为“=>”,覆盖列分隔符为“,”。例如,要创建两个索引idx1和idx2,其中idx1为覆盖索引,覆盖列为col3,则参数值为“idx1=>col3”。
- -icf,--indexspecs-covered-family <arg>
可选参数,仅用于全量导出时使用,用于指定要创建的索引是否覆盖-cf指定的列族,该参数值必须与-ita参数中指定的索引相匹配。参数值格式为“索引1#索引2”,索引分隔符为“#”。例如,要创建两个索引idx1和idx2,都需要覆盖指定的列族,则参数值为“idx1#idx2”。
- -icaf,--indexspecs-covered-all-family <arg>
可选参数,仅用于全量导出时使用,用于指定要创建的索引是否覆盖所有列族,该参数值必须与-ita参数中指定的索引相匹配。参数值格式为“索引1#索引2”,索引分隔符为“#”。例如,要创建两个索引idx1和idx2,其中idx1需要覆盖所有列族,则参数值为“idx1”。
- -fts,--force-to-str
可选参数,指定该参数时,源表数据会先被转为String类型,后存储到HBase表。
开启此参数后,会带来以下问题:
- 无法支持使用原始类型的过滤器进行数据筛选。
- 数据存储膨胀,HBase表占用空间将更大。
操作步骤
- 以客户端安装用户,登录安装客户端的节点。
- 执行以下命令切换到客户端目录。
cd 客户端安装目录
- 执行以下命令配置环境变量。
source bigdata_env
- 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户
kinit 组件业务用户
如果当前集群未启用Kerberos认证,则执行以下命令设置Hadoop用户名:
export HADOOP_USER_NAME=hbase
- 进入Spark客户端目录,执行如下命令,创建全局二级索引,同时同步数据到目标表中。
cd Spark/spark/bin
例如,执行以下命令同步test.orc_table表的所有数据到HBase的test:orc_table表中,使用id+uuid组合作为Rowkey列,输出路径指定为“/tmp/orc_table”:
spark-submit --master yarn --deploy-mode cluster --jars 客户端安装目录/HBase/hbase/lib/protobuf-java-2.5.0.jar,客户端安装目录/HBase/hbase/lib/hbase-global-index-*.jar,客户端安装目录/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.GlobalIndexSparkBulkLoadTool 客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar -sql "select * from test.orc_table" -tb "test:orc_table" -rc "id,uuid" -op "/tmp/orc_table" -ita "idx1=>str_col1" -icaf "idx1"