更新时间:2024-11-29 GMT+08:00

批量加载索引数据

场景介绍

HBase本身提供了ImportTsv&LoadIncremental工具来批量加载用户数据。当前提供了HIndexImportTsv来支持加载用户数据的同时可以完成对索引数据的批量加载。HIndexImportTsv继承了HBase批量加载数据工具ImportTsv的所有功能。此外,若在执行HIndexImportTsv工具之前未建表,直接运行该工具,将会在创建表时创建索引,并在生成用户数据的同时生成索引数据。

操作步骤

  1. 将数据导入到HDFS中。

    hdfs dfs -mkdir <inputdir>

    hdfs dfs -put <local_data_file> <inputdir>

    例如定义数据文件“data.txt”,内容如下:

    12005000201,Zhang San,Male,19,City a, Province a
    12005000202,Li Wanting,Female,23,City b, Province b
    12005000203,Wang Ming,Male,26,City c, Province c
    12005000204,Li Gang,Male,18,City d, Province d
    12005000205,Zhao Enru,Female,21,City e, Province e
    12005000206,Chen Long,Male,32,City f, Province f
    12005000207,Zhou Wei,Female,29,City g, Province g
    12005000208,Yang Yiwen,Female,30,City h, Province h
    12005000209,Xu Bing,Male,26,City i, Province i
    12005000210,Xiao Kai,Male,25,City j, Province j

    执行以下命令:

    hdfs dfs -mkdir /datadirImport

    hdfs dfs -put data.txt /datadirImport

  2. 建表bulkTable,进入hbase shell,执行命令建表,例如:

    create 'bulkTable', {NAME => 'info',COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'FAST_DIFF'},{NAME=>'address'}

    执行完成后退出hbase shell。

  3. 执行如下命令,生成HFile文件(StoreFiles):

    hbase org.apache.hadoop.hbase.hindex.mapreduce.HIndexImportTsv -Dimporttsv.separator=<separator>

    -Dimporttsv.bulk.output=</path/for/output> -Dindexspecs.to.add=<indexspecs> -Dimporttsv.columns=<columns> tableName <inputdir>

    • -Dimport.separator:分隔符,例如,-Dimport.separator=','。
    • -Dimport.bulk.output=</path/for/output>:指的是执行结果输出路径,需指定一个不存在的路径。
    • <columns>:指的是导入数据在表中的对应关系,例如,-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,address:city,address:province。
    • <tablename>:指的是要操作的表名。
    • <inputdir>:指的是要批量导入的数据目录。
    • -Dindexspecs.to.add=<indexspecs>:指的是索引名与列的映射,例如-Dindexspecs.to.add='index_bulk=>info:[age->String]'。 其构成可以表示如下:

      indexNameN=>familyN :[columnQualifierN-> columnQualifierDataType], [columnQualifierM-> columnQualifierDataType];familyM: [columnQualifierO-> columnQualifierDataType]# indexNameN=> familyM: [columnQualifierO-> columnQualifierDataType]

      其中,列限定符用逗号(,)分隔

      例如:“index1 => f1:[c1-> String],[c2-> String]”

      列族由分号(;)分隔

      例如:“index1 => f1:[c1-> String],[c2-> String]; f2:[c3-> Long]”

      多个索引由#号键(#)分隔

      例如:“index1 => f1:[c1-> String],[c2-> String]; f2:[c3-> Long]#index2 => f2:[c3-> Long]”

      列限定的数据类型:

      可用的数据类型有:STRING,INTEGER,FLOAT,LONG,DOUBLE,SHORT,BYTE,CHAR

      数据类型也可以用小写传递。

    例如执行以下命令:

    hbase org.apache.hadoop.hbase.hindex.mapreduce.HIndexImportTsv -Dimporttsv.separator=',' -Dimporttsv.bulk.output=/dataOutput -Dindexspecs.to.add='index_bulk=>info:[age->String]' -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,address:city,address:province bulkTable /datadirImport/data.txt

    输出:

    [root@shap000000406 opt]# hbase org.apache.hadoop.hbase.hindex.mapreduce.HIndexImportTsv -Dimporttsv.separator=',' -Dimporttsv.bulk.output=/dataOutput -Dindexspecs.to.add='index_bulk=>info:[age->String]' -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,address:city,address:province bulkTable /datadirImport/data.txt
    2018-05-08 21:29:16,059 INFO  [main] mapreduce.HFileOutputFormat2: Incremental table bulkTable output configured.
    2018-05-08 21:29:16,069 INFO  [main] client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
    2018-05-08 21:29:16,069 INFO  [main] client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x80007c2cb4fd5b4d
    2018-05-08 21:29:16,072 INFO  [main] zookeeper.ZooKeeper: Session: 0x80007c2cb4fd5b4d closed
    2018-05-08 21:29:16,072 INFO  [main-EventThread] zookeeper.ClientCnxn: EventThread shut down for session: 0x80007c2cb4fd5b4d
    2018-05-08 21:29:16,379 INFO  [main] client.ConfiguredRMFailoverProxyProvider: Failing over to 147
    2018-05-08 21:29:17,328 INFO  [main] input.FileInputFormat: Total input files to process : 1
    2018-05-08 21:29:17,413 INFO  [main] mapreduce.JobSubmitter: number of splits:1
    2018-05-08 21:29:17,430 INFO  [main] Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
    2018-05-08 21:29:17,687 INFO  [main] mapreduce.JobSubmitter: Submitting tokens for job: job_1525338489458_0002
    2018-05-08 21:29:18,100 INFO  [main] impl.YarnClientImpl: Submitted application application_1525338489458_0002
    2018-05-08 21:29:18,136 INFO  [main] mapreduce.Job: The url to track the job: http://shap000000407:8088/proxy/application_1525338489458_0002/
    2018-05-08 21:29:18,136 INFO  [main] mapreduce.Job: Running job: job_1525338489458_0002
    2018-05-08 21:29:28,248 INFO  [main] mapreduce.Job: Job job_1525338489458_0002 running in uber mode : false
    2018-05-08 21:29:28,249 INFO  [main] mapreduce.Job:  map 0% reduce 0%
    2018-05-08 21:29:38,344 INFO  [main] mapreduce.Job:  map 100% reduce 0%
    2018-05-08 21:29:51,421 INFO  [main] mapreduce.Job:  map 100% reduce 100%
    2018-05-08 21:29:51,428 INFO  [main] mapreduce.Job: Job job_1525338489458_0002 completed successfully
    2018-05-08 21:29:51,523 INFO  [main] mapreduce.Job: Counters: 50
  4. 执行如下命令将生成的HFile导入HBase中:

    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles </path/for/output> <tablename>

    例如执行以下命令:

    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /dataOutput bulkTable

    输出:

    [root@shap000000406 opt]# hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /dataOutput bulkTable
    2018-05-08 21:30:01,398 WARN  [main] mapreduce.LoadIncrementalHFiles: Skipping non-directory hdfs://hacluster/dataOutput/_SUCCESS
    2018-05-08 21:30:02,006 INFO  [LoadIncrementalHFiles-0] hfile.CacheConfig: Created cacheConfig: CacheConfig:disabled
    2018-05-08 21:30:02,006 INFO  [LoadIncrementalHFiles-2] hfile.CacheConfig: Created cacheConfig: CacheConfig:disabled
    2018-05-08 21:30:02,006 INFO  [LoadIncrementalHFiles-1] hfile.CacheConfig: Created cacheConfig: CacheConfig:disabled
    2018-05-08 21:30:02,085 INFO  [LoadIncrementalHFiles-2] compress.CodecPool: Got brand-new decompressor [.snappy]
    2018-05-08 21:30:02,120 INFO  [LoadIncrementalHFiles-0] mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/address/042426c252f74e859858c7877b95e510 first=12005000201 last=12005000210
    2018-05-08 21:30:02,120 INFO  [LoadIncrementalHFiles-2] mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/info/f3995920ae0247a88182f637aa031c49 first=12005000201 last=12005000210
    2018-05-08 21:30:02,128 INFO  [LoadIncrementalHFiles-1] mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/d/c53b252248af42779f29442ab84f86b8 first=\x00index_bulk\x00\x00\x00\x00\x00\x00\x00\x0018\x00\x0012005000204 last=\x00index_bulk\x00\x00\x00\x00\x00\x00\x00\x0032\x00\x0012005000206
    2018-05-08 21:30:02,231 INFO  [main] client.ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
    2018-05-08 21:30:02,231 INFO  [main] client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x81007c2cf0f55cc5
    2018-05-08 21:30:02,235 INFO  [main] zookeeper.ZooKeeper: Session: 0x81007c2cf0f55cc5 closed
    2018-05-08 21:30:02,235 INFO  [main-EventThread] zookeeper.ClientCnxn: EventThread shut down for session: 0x81007c2cf0f55cc5