Batch Loading HBase Data and Generating Local Secondary Indexes
Scenario
HBase itself provides the ImportTsv and LoadIncrementalHFiles tools for bulk loading user data. HIndexImportTsv is additionally provided so that index data can be bulk loaded together with the user data. HIndexImportTsv inherits all functions of the HBase bulk-load tool ImportTsv. Furthermore, if the table does not exist when HIndexImportTsv is run, the tool creates the table together with its indexes and generates the index data along with the user data.
Prerequisites
- The client has been installed. For details, see the section on installing a client.
- A component service user with the required permissions has been created based on service requirements. A machine-machine user needs to download the keytab file; a human-machine user must change the password at the first login.
Using HIndexImportTsv to Batch Generate HBase Local Secondary Index Data
- Log in to the node where the client is installed as the client installation user.
- Run the following commands to configure environment variables and authenticate the user:
source bigdata_env
kinit Component service user (skip this step for clusters with Kerberos authentication disabled)
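For example, assuming the client is installed under /opt/client and the service user is named hbase_bulk_user (both are placeholders; substitute the values of your own environment):
cd /opt/client
source bigdata_env
kinit hbase_bulk_user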
- Import the data into HDFS.
hdfs dfs -put <local_data_file> <inputdir>
For example, define a data file data.txt with the following content:
12005000201,Zhang San,Male,19,City a, Province a
12005000202,Li Wanting,Female,23,City b, Province b
12005000203,Wang Ming,Male,26,City c, Province c
12005000204,Li Gang,Male,18,City d, Province d
12005000205,Zhao Enru,Female,21,City e, Province e
12005000206,Chen Long,Male,32,City f, Province f
12005000207,Zhou Wei,Female,29,City g, Province g
12005000208,Yang Yiwen,Female,30,City h, Province h
12005000209,Xu Bing,Male,26,City i, Province i
12005000210,Xiao Kai,Male,25,City j, Province j
Run the following commands:
hdfs dfs -mkdir /datadirImport
hdfs dfs -put data.txt /datadirImport
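Optionally, you can confirm that the file is now in HDFS before continuing (standard HDFS commands; this check is not part of the original procedure):
hdfs dfs -ls /datadirImport
hdfs dfs -cat /datadirImport/data.txt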
- Run the following commands to create the table bulkTable:
hbase shell
create 'bulkTable', {NAME => 'info', COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'FAST_DIFF'}, {NAME => 'address'}
After the command is executed, run !quit to exit the HBase shell.
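Before running the bulk load, you can optionally check the table definition from the HBase shell (an extra verification step, not part of the original procedure):
hbase shell
describe 'bulkTable'
!quit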
- Run the following command to generate HFiles (StoreFiles):
hbase org.apache.hadoop.hbase.hindex.mapreduce.HIndexImportTsv -Dimporttsv.separator=<separator> -Dimporttsv.bulk.output=</path/for/output> -Dindexspecs.to.add=<indexspecs> -Dimporttsv.columns=<columns> <tablename> <inputdir>
- -Dimporttsv.separator: the field separator. For example, -Dimporttsv.separator=','.
- -Dimporttsv.bulk.output=</path/for/output>: the output path of the execution result. Specify a path that does not exist yet.
- <columns>: the mapping between the imported data and the table columns. For example, -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,address:city,address:province.
- <tablename>: the name of the table to operate on.
- <inputdir>: the directory containing the data to be bulk loaded.
- -Dindexspecs.to.add=<indexspecs>: the mapping between index names and columns. For example, -Dindexspecs.to.add='index_bulk=>info:[age->String]'. Its format is as follows (an illustrative two-index value is sketched after this list):
indexNameN=>familyN:[columnQualifierN->columnQualifierDataType],[columnQualifierM->columnQualifierDataType];familyM:[columnQualifierO->columnQualifierDataType]#indexNameM=>familyM:[columnQualifierO->columnQualifierDataType]
Where indexNameN and indexNameM are index names, familyN and familyM are column families, columnQualifierN/M/O are column qualifiers, and columnQualifierDataType is the data type of the column. Columns of the same column family are separated by ',', column families of the same index by ';', and multiple indexes by '#'.
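For illustration only (hypothetical index and column names, not part of the original example), a value that defines two indexes in this format could look like:
-Dindexspecs.to.add='index_name=>info:[name->String],[age->String]#index_city=>address:[city->String]'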
For example, run the following command:
hbase org.apache.hadoop.hbase.hindex.mapreduce.HIndexImportTsv -Dimporttsv.separator=',' -Dimporttsv.bulk.output=/dataOutput -Dindexspecs.to.add='index_bulk=>info:[age->String]' -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,address:city,address:province bulkTable /datadirImport/data.txt
The following output is displayed:
[root@shap000000406 opt]# hbase org.apache.hadoop.hbase.hindex.mapreduce.HIndexImportTsv -Dimporttsv.separator=',' -Dimporttsv.bulk.output=/dataOutput -Dindexspecs.to.add='index_bulk=>info:[age->String]' -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,address:city,address:province bulkTable /datadirImport/data.txt
2018-05-08 21:29:16,059 INFO [main] mapreduce.HFileOutputFormat2: Incremental table bulkTable output configured.
2018-05-08 21:29:16,069 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
2018-05-08 21:29:16,069 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x80007c2cb4fd5b4d
2018-05-08 21:29:16,072 INFO [main] zookeeper.ZooKeeper: Session: 0x80007c2cb4fd5b4d closed
2018-05-08 21:29:16,072 INFO [main-EventThread] zookeeper.ClientCnxn: EventThread shut down for session: 0x80007c2cb4fd5b4d
2018-05-08 21:29:16,379 INFO [main] client.ConfiguredRMFailoverProxyProvider: Failing over to 147
2018-05-08 21:29:17,328 INFO [main] input.FileInputFormat: Total input files to process : 1
2018-05-08 21:29:17,413 INFO [main] mapreduce.JobSubmitter: number of splits:1
2018-05-08 21:29:17,430 INFO [main] Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
2018-05-08 21:29:17,687 INFO [main] mapreduce.JobSubmitter: Submitting tokens for job: job_1525338489458_0002
2018-05-08 21:29:18,100 INFO [main] impl.YarnClientImpl: Submitted application application_1525338489458_0002
2018-05-08 21:29:18,136 INFO [main] mapreduce.Job: The url to track the job: http://shap000000407:8088/proxy/application_1525338489458_0002/
2018-05-08 21:29:18,136 INFO [main] mapreduce.Job: Running job: job_1525338489458_0002
2018-05-08 21:29:28,248 INFO [main] mapreduce.Job: Job job_1525338489458_0002 running in uber mode : false
2018-05-08 21:29:28,249 INFO [main] mapreduce.Job: map 0% reduce 0%
2018-05-08 21:29:38,344 INFO [main] mapreduce.Job: map 100% reduce 0%
2018-05-08 21:29:51,421 INFO [main] mapreduce.Job: map 100% reduce 100%
2018-05-08 21:29:51,428 INFO [main] mapreduce.Job: Job job_1525338489458_0002 completed successfully
2018-05-08 21:29:51,523 INFO [main] mapreduce.Job: Counters: 50
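Optionally, list the output directory to confirm that HFiles were generated for the user column families (info, address) and for the index data (an extra check, not part of the original procedure):
hdfs dfs -ls -R /dataOutput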
- Run the following command to load the generated HFiles into HBase:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles </path/for/output> <tablename>
For example, run the following command:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /dataOutput bulkTable
The following output is displayed:
[root@shap000000406 opt]# hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /dataOutput bulkTable
2018-05-08 21:30:01,398 WARN [main] mapreduce.LoadIncrementalHFiles: Skipping non-directory hdfs://hacluster/dataOutput/_SUCCESS
2018-05-08 21:30:02,006 INFO [LoadIncrementalHFiles-0] hfile.CacheConfig: Created cacheConfig: CacheConfig:disabled
2018-05-08 21:30:02,006 INFO [LoadIncrementalHFiles-2] hfile.CacheConfig: Created cacheConfig: CacheConfig:disabled
2018-05-08 21:30:02,006 INFO [LoadIncrementalHFiles-1] hfile.CacheConfig: Created cacheConfig: CacheConfig:disabled
2018-05-08 21:30:02,085 INFO [LoadIncrementalHFiles-2] compress.CodecPool: Got brand-new decompressor [.snappy]
2018-05-08 21:30:02,120 INFO [LoadIncrementalHFiles-0] mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/address/042426c252f74e859858c7877b95e510 first=12005000201 last=12005000210
2018-05-08 21:30:02,120 INFO [LoadIncrementalHFiles-2] mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/info/f3995920ae0247a88182f637aa031c49 first=12005000201 last=12005000210
2018-05-08 21:30:02,128 INFO [LoadIncrementalHFiles-1] mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/d/c53b252248af42779f29442ab84f86b8 first=\x00index_bulk\x00\x00\x00\x00\x00\x00\x00\x0018\x00\x0012005000204 last=\x00index_bulk\x00\x00\x00\x00\x00\x00\x00\x0032\x00\x0012005000206
2018-05-08 21:30:02,231 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
2018-05-08 21:30:02,231 INFO [main] client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x81007c2cf0f55cc5
2018-05-08 21:30:02,235 INFO [main] zookeeper.ZooKeeper: Session: 0x81007c2cf0f55cc5 closed
2018-05-08 21:30:02,235 INFO [main-EventThread] zookeeper.ClientCnxn: EventThread shut down for session: 0x81007c2cf0f55cc5
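After the load completes, you can optionally verify the imported data from the HBase shell, for example by scanning the first few rows of bulkTable (not part of the original procedure):
hbase shell
scan 'bulkTable', {LIMIT => 3}
!quit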