更新时间:2024-11-29 GMT+08:00

批量加载索引数据

场景介绍

HBase本身支持使用ImportTsv和LoadIncremental工具批量加载用户数据,还支持使用GlobalIndexImportTsv和GlobalIndexBulkLoadHFilesTool加载用户数据的同时批量加载全局索引数据。GlobalIndexImportTsv继承了HBase批量加载数据工具ImportTsv的所有功能。

若在执行GlobalIndexImportTsv工具之前未建表,直接运行该工具,将会在创建表时创建全局索引,并在生成用户数据的同时生成索引数据。由于自动建表不会进行预分区,可能导致性能问题,因此建议提前建表后再运行GlobalIndexImportTsv工具进行数据加载。

操作步骤

  1. 以客户端安装用户登录安装了客户端的节点,并执行以下命令:

    cd 客户端安装目录

    source bigdata_env

    kinit 组件业务用户集群未启用Kerberos认证(普通模式)请跳过该操作)

  2. 将数据导入到HDFS中。

    hdfs dfs -mkdir <inputdir>

    hdfs dfs -put <local_data_file> <inputdir>

    例如定义数据文件“data.txt”,内容如下:

    12005000201,Zhang San,Male,19,City a,Province a
    12005000202,Li Wanting,Female,23,City b,Province b
    12005000203,Wang Ming,Male,26,City c,Province c
    12005000204,Li Gang,Male,18,City d,Province d
    12005000205,Zhao Enru,Female,21,City e,Province e
    12005000206,Chen Long,Male,32,City f,Province f
    12005000207,Zhou Wei,Female,29,City g,Province g
    12005000208,Yang Yiwen,Female,30,City h,Province h
    12005000209,Xu Bing,Male,26,City i,Province i
    12005000210,Xiao Kai,Male,25,City j,Province j

    执行以下命令导入至HDFS中:

    hdfs dfs -mkdir /datadirImport

    hdfs dfs -put data.txt /datadirImport

  3. 执行以下命令创建表bulkTable

    hbase shell

    create 'bulkTable', {NAME => 'info',COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'FAST_DIFF'},{NAME=>'address'}

    表创建成功后退出HBase Shell命令行。

  4. 执行如下命令,创建全局索引:

    hbase org.apache.hadoop.hbase.hindex.global.mapreduce.GlobalTableIndexer -Dtablename.to.index='bulkTable' -Dindexspecs.to.add='index_bulk=>info:[age->String]' -Dindexspecs.coveredallcolumn.to.add='index_bulk=>true' -Dindexspecs.splitkeys.to.set='index_bulk=>[\x010,\x011,\x012]'

    命令的具体使用请参考创建索引

  5. 执行如下命令,生成HFile文件(StoreFiles):

    hbase org.apache.hadoop.hbase.hindex.global.mapreduce.GlobalIndexImportTsv -Dimporttsv.separator=<separator>

    -Dimporttsv.bulk.output=</path/for/output> <columns> tableName <inputdir>

    • -Dimport.separator:分隔符,例如:-Dimport.separator=','
    • -Dimport.bulk.output=</path/for/output>:表示执行结果输出路径,需指定一个不存在的路径。
    • <columns>:表示导入数据在表中的对应关系,例如:-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,address:city,address:province
    • <tablename>:表示要操作的表名。
    • <inputdir>:表示要批量导入的数据目录。
    • -Dindexspecs.covered.to.add(可选):表示索引中冗余存储的数据表的列(覆盖列定义),例如:-Dindexspecs.covered.to.add='IDX1=>cf1:[q1];cf2:[q1]#IDX2=>cf0:[q5]'
    • -Dindexspecs.covered.family.to.add(可选):表示索引表冗余存储的数据表的列族(覆盖列族定义),例如:-Dindexspecs.covered.family.to.add='IDX1=>cf_0#IDX2=>cf_1;cf_2'
    • -Dindexspecs.coveredallcolumn.to.add(可选):表示索引表冗余存储数据表中的所有数据(覆盖所有列),例如: -Dindexspecs.coveredallcolumn.to.add='IDX1=>true#IDX2=>true'
    • -Dindexspecs.splitkeys.to.set(可选):表示索引表预分区切分点,建议指定,避免索引表Region成为热点,例如,预分区指定格式为:
      • '#'用于分隔索引。
      • splitkey包含在'[]'中。
      • ','用于分隔splitkey。

      例如:-Dindexspecs.splitkeys.to.set='IDX1=>[1,2,3]#IDX2=>[a,b,c]'

    • -Dindexspecs.to.add=<indexspecs>(可选):表示索引名与列的映射,例如:-Dindexspecs.to.add='index_bulk=>info:[age->String]'。 其构成可以表示如下:

      indexNameN=>familyN :[columnQualifierN-> columnQualifierDataType], [columnQualifierM-> columnQualifierDataType];familyM: [columnQualifierO-> columnQualifierDataType]# indexNameN=> familyM: [columnQualifierO-> columnQualifierDataType]

      其中:

      • 列限定符用逗号(,)分隔。例如:index1 => f1 :[c1-> String], [c2-> String]
      • 列族由分号(;)分隔。例如:index1 => f1 :[c1-> String], [c2-> String]; f2 :[c3-> Long]
      • 多个索引由#号键()分隔。例如:index1 => f1 :[c1-> String], [c2-> String]; f2 :[c3-> Long]#index2 => f2 :[c3-> Long]
      • 列限定的数据类型。

        可用的数据类型有:STRING、INTEGER、FLOAT\LONG、DOUBLE、SHORT、BYTE、CHAR。

    • 数据类型不区分大小写。
    • indexspecs.covered.to.addindexspecs.covered.family.to.addindexspecs.coveredallcolumn.to.addindexspecs.splitkeys.to.setindexspecs.to.add五个参数只有在要操作的表不存在并且需要自动建表时才会生效。

    例如执行以下命令:

    hbase org.apache.hadoop.hbase.hindex.global.mapreduce.GlobalIndexImportTsv -Dimporttsv.separator=',' -Dimporttsv.bulk.output=/dataOutput -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,address:city,address:province bulkTable /datadirImport/data.txt

  6. 执行如下命令将生成的HFile导入HBase中:

    hbase org.apache.hadoop.hbase.tool.GlobalIndexBulkLoadHFilesTool </path/for/output> <tablename>

    例如执行以下命令:

    hbase org.apache.hadoop.hbase.tool.GlobalIndexBulkLoadHFilesTool /dataOutput bulkTable

    输出内容为:

    2024-01-13 18:29:03,043 INFO  [GlobalIndexBulkLoadHFiles-0] hdfs.DFSClient: Created token for admintest: HDFS_DELEGATION_TOKEN owner=admintest@HADOOP.COM, renewer=renewer, realUser=, issueDate=1705141743030, maxDate=1705746543030, sequenceNumber=4261, masterKeyId=5 on ha-hdfs:hacluster
    2024-01-13 18:29:03,123 INFO  [LoadIncrementalHFiles-0] compress.CodecPool: Got brand-new decompressor [.snappy]
    2024-01-13 18:29:03,127 INFO  [LoadIncrementalHFiles-0] compress.CodecPool: Got brand-new decompressor [.snappy]
    2024-01-13 18:29:03,127 INFO  [LoadIncrementalHFiles-1] compress.CodecPool: Got brand-new decompressor [.snappy]
    2024-01-13 18:29:03,127 INFO  [LoadIncrementalHFiles-4] compress.CodecPool: Got brand-new decompressor [.snappy]
    2024-01-13 18:29:03,128 INFO  [LoadIncrementalHFiles-0] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/0/8610217824254455849576409ebf8f53 first=Optional[\x0118\x00\x0112005000204\x00] last=Optional[\x0119\x00\x0112005000201\x00]
    2024-01-13 18:29:03,128 INFO  [LoadIncrementalHFiles-1] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/0/fa17bc8e753341ffa0ba9e702200c04a first=Optional[\x0121\x00\x0112005000205\x00] last=Optional[\x0132\x00\x0112005000206\x00]
    2024-01-13 18:29:03,129 INFO  [LoadIncrementalHFiles-2] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/address/7a0308810d264d61bda32c385f50260c first=Optional[\x0121\x00\x0112005000205\x00] last=Optional[\x0132\x00\x0112005000206\x00]
    2024-01-13 18:29:03,129 INFO  [LoadIncrementalHFiles-4] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/info/27cb42f48cb14597badb6cf8b302d4e8 first=Optional[\x0118\x00\x0112005000204\x00] last=Optional[\x0119\x00\x0112005000201\x00]
    2024-01-13 18:29:03,130 INFO  [LoadIncrementalHFiles-3] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/address/fe8487c5e2cf4bbaaeb9e638b8acc2c1 first=Optional[\x0118\x00\x0112005000204\x00] last=Optional[\x0119\x00\x0112005000201\x00]
    2024-01-13 18:29:03,131 INFO  [LoadIncrementalHFiles-5] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/info/657937b1edd6401b8f5575e42e7ec92b first=Optional[\x0121\x00\x0112005000205\x00] last=Optional[\x0132\x00\x0112005000206\x00]
    2024-01-13 18:29:03,539 INFO  [GlobalIndexBulkLoadHFiles-0] hdfs.DFSClient: Cancelling token for admintest: HDFS_DELEGATION_TOKEN owner=admintest@HADOOP.COM, renewer=renewer, realUser=, issueDate=1705141743030, maxDate=1705746543030, sequenceNumber=4261, masterKeyId=5 on ha-hdfs:hacluster
    2024-01-13 18:29:03,571 INFO  [GlobalIndexBulkLoadHFiles-0] client.ConnectionImplementation: Closing master protocol: MasterService
    2024-01-13 18:29:03,678 INFO  [GlobalIndexBulkLoadHFiles-0-EventThread] zookeeper.ClientCnxn: EventThread shut down for session: 0x3201ef383210e59e
    2024-01-13 18:29:03,678 INFO  [GlobalIndexBulkLoadHFiles-0] zookeeper.ZooKeeper: Connection: 0x3201ef383210e59e closed
    2024-01-13 18:29:03,679 INFO  [GlobalIndexBulkLoadHFiles-0] client.ConnectionImplementation: Connection has been closed by GlobalIndexBulkLoadHFiles-0.

    索引数据生成和加载过程中,禁止对索引进行更改,包括但不限于新增索引、删除索引、索引状态变更等操作,否则会因为无法保证索引数据的一致性导致运行的任务失败,需要待索引状态稳定后再重新执行任务。