批量加载索引数据
场景介绍
HBase本身支持使用ImportTsv和LoadIncremental工具批量加载用户数据,还支持使用GlobalIndexImportTsv和GlobalIndexBulkLoadHFilesTool加载用户数据的同时批量加载全局索引数据。GlobalIndexImportTsv继承了HBase批量加载数据工具ImportTsv的所有功能。
若在执行GlobalIndexImportTsv工具之前未建表,直接运行该工具,将会在创建表时创建全局索引,并在生成用户数据的同时生成索引数据。由于自动建表不会进行预分区,可能导致性能问题,因此建议提前建表后再运行GlobalIndexImportTsv工具进行数据加载。
操作步骤
- 以客户端安装用户登录安装了客户端的节点,并执行以下命令:
cd 客户端安装目录
source bigdata_env
kinit 组件业务用户(集群未启用Kerberos认证(普通模式)请跳过该操作)
- 将数据导入到HDFS中。
hdfs dfs -mkdir <inputdir>
hdfs dfs -put <local_data_file> <inputdir>
例如定义数据文件“data.txt”,内容如下:
12005000201,Zhang San,Male,19,City a,Province a 12005000202,Li Wanting,Female,23,City b,Province b 12005000203,Wang Ming,Male,26,City c,Province c 12005000204,Li Gang,Male,18,City d,Province d 12005000205,Zhao Enru,Female,21,City e,Province e 12005000206,Chen Long,Male,32,City f,Province f 12005000207,Zhou Wei,Female,29,City g,Province g 12005000208,Yang Yiwen,Female,30,City h,Province h 12005000209,Xu Bing,Male,26,City i,Province i 12005000210,Xiao Kai,Male,25,City j,Province j
执行以下命令导入至HDFS中:
hdfs dfs -mkdir /datadirImport
hdfs dfs -put data.txt /datadirImport
- 执行以下命令创建表bulkTable:
hbase shell
create 'bulkTable', {NAME => 'info',COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'FAST_DIFF'},{NAME=>'address'}
表创建成功后退出HBase Shell命令行。
- 执行如下命令,创建全局索引:
hbase org.apache.hadoop.hbase.hindex.global.mapreduce.GlobalTableIndexer -Dtablename.to.index='bulkTable' -Dindexspecs.to.add='index_bulk=>info:[age->String]' -Dindexspecs.coveredallcolumn.to.add='index_bulk=>true' -Dindexspecs.splitkeys.to.set='index_bulk=>[\x010,\x011,\x012]'
命令的具体使用请参考创建索引。
- 执行如下命令,生成HFile文件(StoreFiles):
hbase org.apache.hadoop.hbase.hindex.global.mapreduce.GlobalIndexImportTsv -Dimporttsv.separator=<separator>
-Dimporttsv.bulk.output=</path/for/output> <columns> tableName <inputdir>
- -Dimport.separator:分隔符,例如:-Dimport.separator=','。
- -Dimport.bulk.output=</path/for/output>:表示执行结果输出路径,需指定一个不存在的路径。
- <columns>:表示导入数据在表中的对应关系,例如:-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,address:city,address:province。
- <tablename>:表示要操作的表名。
- <inputdir>:表示要批量导入的数据目录。
- -Dindexspecs.covered.to.add(可选):表示索引中冗余存储的数据表的列(覆盖列定义),例如:-Dindexspecs.covered.to.add='IDX1=>cf1:[q1];cf2:[q1]#IDX2=>cf0:[q5]'。
- -Dindexspecs.covered.family.to.add(可选):表示索引表冗余存储的数据表的列族(覆盖列族定义),例如:-Dindexspecs.covered.family.to.add='IDX1=>cf_0#IDX2=>cf_1;cf_2'。
- -Dindexspecs.coveredallcolumn.to.add(可选):表示索引表冗余存储数据表中的所有数据(覆盖所有列),例如: -Dindexspecs.coveredallcolumn.to.add='IDX1=>true#IDX2=>true'。
- -Dindexspecs.splitkeys.to.set(可选):表示索引表预分区切分点,建议指定,避免索引表Region成为热点,例如,预分区指定格式为:
- '#'用于分隔索引。
- splitkey包含在'[]'中。
- ','用于分隔splitkey。
例如:-Dindexspecs.splitkeys.to.set='IDX1=>[1,2,3]#IDX2=>[a,b,c]'
- -Dindexspecs.to.add=<indexspecs>(可选):表示索引名与列的映射,例如:-Dindexspecs.to.add='index_bulk=>info:[age->String]'。 其构成可以表示如下:
indexNameN=>familyN :[columnQualifierN-> columnQualifierDataType], [columnQualifierM-> columnQualifierDataType];familyM: [columnQualifierO-> columnQualifierDataType]# indexNameN=> familyM: [columnQualifierO-> columnQualifierDataType]
其中:
- 列限定符用逗号(,)分隔。例如:index1 => f1 :[c1-> String], [c2-> String]
- 列族由分号(;)分隔。例如:index1 => f1 :[c1-> String], [c2-> String]; f2 :[c3-> Long]
- 多个索引由#号键(#)分隔。例如:index1 => f1 :[c1-> String], [c2-> String]; f2 :[c3-> Long]#index2 => f2 :[c3-> Long]
- 列限定的数据类型。
- 数据类型不区分大小写。
- indexspecs.covered.to.add、indexspecs.covered.family.to.add、indexspecs.coveredallcolumn.to.add、indexspecs.splitkeys.to.set、indexspecs.to.add五个参数只有在要操作的表不存在并且需要自动建表时才会生效。
例如执行以下命令:
hbase org.apache.hadoop.hbase.hindex.global.mapreduce.GlobalIndexImportTsv -Dimporttsv.separator=',' -Dimporttsv.bulk.output=/dataOutput -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,address:city,address:province bulkTable /datadirImport/data.txt
- 执行如下命令将生成的HFile导入HBase中:
hbase org.apache.hadoop.hbase.tool.GlobalIndexBulkLoadHFilesTool </path/for/output> <tablename>
例如执行以下命令:
hbase org.apache.hadoop.hbase.tool.GlobalIndexBulkLoadHFilesTool /dataOutput bulkTable
输出内容为:
2024-01-13 18:29:03,043 INFO [GlobalIndexBulkLoadHFiles-0] hdfs.DFSClient: Created token for admintest: HDFS_DELEGATION_TOKEN owner=admintest@HADOOP.COM, renewer=renewer, realUser=, issueDate=1705141743030, maxDate=1705746543030, sequenceNumber=4261, masterKeyId=5 on ha-hdfs:hacluster 2024-01-13 18:29:03,123 INFO [LoadIncrementalHFiles-0] compress.CodecPool: Got brand-new decompressor [.snappy] 2024-01-13 18:29:03,127 INFO [LoadIncrementalHFiles-0] compress.CodecPool: Got brand-new decompressor [.snappy] 2024-01-13 18:29:03,127 INFO [LoadIncrementalHFiles-1] compress.CodecPool: Got brand-new decompressor [.snappy] 2024-01-13 18:29:03,127 INFO [LoadIncrementalHFiles-4] compress.CodecPool: Got brand-new decompressor [.snappy] 2024-01-13 18:29:03,128 INFO [LoadIncrementalHFiles-0] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/0/8610217824254455849576409ebf8f53 first=Optional[\x0118\x00\x0112005000204\x00] last=Optional[\x0119\x00\x0112005000201\x00] 2024-01-13 18:29:03,128 INFO [LoadIncrementalHFiles-1] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/0/fa17bc8e753341ffa0ba9e702200c04a first=Optional[\x0121\x00\x0112005000205\x00] last=Optional[\x0132\x00\x0112005000206\x00] 2024-01-13 18:29:03,129 INFO [LoadIncrementalHFiles-2] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/address/7a0308810d264d61bda32c385f50260c first=Optional[\x0121\x00\x0112005000205\x00] last=Optional[\x0132\x00\x0112005000206\x00] 2024-01-13 18:29:03,129 INFO [LoadIncrementalHFiles-4] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/info/27cb42f48cb14597badb6cf8b302d4e8 first=Optional[\x0118\x00\x0112005000204\x00] last=Optional[\x0119\x00\x0112005000201\x00] 2024-01-13 18:29:03,130 INFO [LoadIncrementalHFiles-3] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/address/fe8487c5e2cf4bbaaeb9e638b8acc2c1 first=Optional[\x0118\x00\x0112005000204\x00] last=Optional[\x0119\x00\x0112005000201\x00] 2024-01-13 18:29:03,131 INFO [LoadIncrementalHFiles-5] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/info/657937b1edd6401b8f5575e42e7ec92b first=Optional[\x0121\x00\x0112005000205\x00] last=Optional[\x0132\x00\x0112005000206\x00] 2024-01-13 18:29:03,539 INFO [GlobalIndexBulkLoadHFiles-0] hdfs.DFSClient: Cancelling token for admintest: HDFS_DELEGATION_TOKEN owner=admintest@HADOOP.COM, renewer=renewer, realUser=, issueDate=1705141743030, maxDate=1705746543030, sequenceNumber=4261, masterKeyId=5 on ha-hdfs:hacluster 2024-01-13 18:29:03,571 INFO [GlobalIndexBulkLoadHFiles-0] client.ConnectionImplementation: Closing master protocol: MasterService 2024-01-13 18:29:03,678 INFO [GlobalIndexBulkLoadHFiles-0-EventThread] zookeeper.ClientCnxn: EventThread shut down for session: 0x3201ef383210e59e 2024-01-13 18:29:03,678 INFO [GlobalIndexBulkLoadHFiles-0] zookeeper.ZooKeeper: Connection: 0x3201ef383210e59e closed 2024-01-13 18:29:03,679 INFO [GlobalIndexBulkLoadHFiles-0] client.ConnectionImplementation: Connection has been closed by GlobalIndexBulkLoadHFiles-0.
索引数据生成和加载过程中,禁止对索引进行更改,包括但不限于新增索引、删除索引、索引状态变更等操作,否则会因为无法保证索引数据的一致性导致运行的任务失败,需要待索引状态稳定后再重新执行任务。