更新时间:2025-12-08 GMT+08:00
分享

批量加载HBase全局二级索引数据

场景介绍

HBase本身支持使用ImportTsv和LoadIncremental工具批量加载用户数据,还支持使用GlobalIndexImportTsv和GlobalIndexBulkLoadHFilesTool加载用户数据的同时批量加载全局索引数据。GlobalIndexImportTsv继承了HBase批量加载数据工具ImportTsv的所有功能。

若在执行GlobalIndexImportTsv工具之前未建表,直接运行该工具,将会在创建表时创建全局索引,并在生成用户数据的同时生成索引数据。由于自动建表不会进行预分区,可能导致性能问题,因此建议提前建表后再运行GlobalIndexImportTsv工具进行数据加载。

本章节内容仅适用于MRS 3.3.1-LTS及之后版本。

操作步骤

  1. 以客户端安装用户登录安装了客户端的节点,并执行以下命令:

    切换至客户端安装目录:

    cd 客户端安装目录

    配置环境变量:

    source bigdata_env

    认证用户,集群未启用Kerberos认证(普通模式)请跳过该操作:

    kinit 组件业务用户

  2. 将数据导入到HDFS中。

    创建HDFS目录:

    hdfs dfs -mkdir <inputdir>

    将数据文件上传至HDFS中:

    hdfs dfs -put <local_data_file> <inputdir>

    例如定义数据文件“data.txt”,内容如下:

    12005000201,Zhang San,Male,19,City a,Province a
    12005000202,Li Wanting,Female,23,City b,Province b
    12005000203,Wang Ming,Male,26,City c,Province c
    12005000204,Li Gang,Male,18,City d,Province d
    12005000205,Zhao Enru,Female,21,City e,Province e
    12005000206,Chen Long,Male,32,City f,Province f
    12005000207,Zhou Wei,Female,29,City g,Province g
    12005000208,Yang Yiwen,Female,30,City h,Province h
    12005000209,Xu Bing,Male,26,City i,Province i
    12005000210,Xiao Kai,Male,25,City j,Province j

    执行以下命令导入“data.txt”文件至HDFS中:

    创建“datadirImport”目录:

    hdfs dfs -mkdir /datadirImport

    上传“data.txt”文件至HDFS的“datadirImport”目录中:

    hdfs dfs -put data.txt /datadirImport

  3. 执行以下命令创建表bulkTable

    1. 登录HBase客户端:
      hbase shell
    2. 创建表bulkTable
      create 'bulkTable', {NAME => 'info',COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'FAST_DIFF'},{NAME=>'address'}
    3. 表创建成功后,执行以下命令退出hbase shell
      !quit

  4. 执行如下命令,创建全局索引:

    hbase org.apache.hadoop.hbase.hindex.global.mapreduce.GlobalTableIndexer -Dtablename.to.index='bulkTable' -Dindexspecs.to.add='index_bulk=>info:[age->String]' -Dindexspecs.coveredallcolumn.to.add='index_bulk=>true' -Dindexspecs.splitkeys.to.set='index_bulk=>[\x010,\x011,\x012]'

    命令的具体使用请参考创建HBase全局二级索引

  5. 执行如下命令,生成HFile文件(StoreFiles):

    hbase org.apache.hadoop.hbase.hindex.global.mapreduce.GlobalIndexImportTsv -Dimporttsv.separator=<separator> -Dimporttsv.bulk.output=</path/for/output> <columns> tableName <inputdir>

    例如执行以下命令:

    hbase org.apache.hadoop.hbase.hindex.global.mapreduce.GlobalIndexImportTsv -Dimporttsv.separator=',' -Dimporttsv.bulk.output=/dataOutput -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,address:city,address:province bulkTable /datadirImport/data.txt

    参数介绍请参见表1

    • 数据类型不区分大小写。
    • indexspecs.covered.to.addindexspecs.covered.family.to.addindexspecs.coveredallcolumn.to.addindexspecs.splitkeys.to.setindexspecs.to.add五个参数只有在要操作的表不存在并且需要自动建表时才会生效。
    表1 参数介绍

    参数

    参数说明

    -Dimport.separator

    用于指定分隔符,例如-Dimport.separator=','

    -Dimport.bulk.output

    表示执行结果输出路径,需指定一个不存在的路径。

    <columns>

    表示导入数据在表中的对应关系,例如-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:gender,info:age,address:city,address:province

    tableName

    表示要操作的表名。

    <inputdir>

    表示要批量导入的数据目录。

    -Dindexspecs.covered.to.add(可选)

    表示索引中冗余存储的数据表的列(覆盖列定义),例如:-Dindexspecs.covered.to.add='IDX1=>cf1:[q1];cf2:[q1]#IDX2=>cf0:[q5]'

    -Dindexspecs.covered.family.to.add(可选)

    表示索引表冗余存储的数据表的列族(覆盖列族定义),例如:-Dindexspecs.covered.family.to.add='IDX1=>cf_0#IDX2=>cf_1;cf_2'

    -Dindexspecs.coveredallcolumn.to.add(可选)

    表示索引表冗余存储数据表中的所有数据(覆盖所有列),例如: -Dindexspecs.coveredallcolumn.to.add='IDX1=>true#IDX2=>true'

    -Dindexspecs.splitkeys.to.set(可选)

    表示索引表预分区切分点,建议指定,避免索引表Region成为热点,例如,预分区指定格式为:
    • '#'用于分隔索引。
    • splitkey包含在'[]'中。
    • ','用于分隔splitkey。

    例如:-Dindexspecs.splitkeys.to.set='IDX1=>[1,2,3]#IDX2=>[a,b,c]'

    -Dindexspecs.to.add=<indexspecs>(可选)

    表示索引名与列的映射,例如:-Dindexspecs.to.add='index_bulk=>info:[age->String]'。 其构成可以表示如下:

    indexNameN=>familyN :[columnQualifierN-> columnQualifierDataType], [columnQualifierM-> columnQualifierDataType];familyM: [columnQualifierO-> columnQualifierDataType]# indexNameN=> familyM: [columnQualifierO-> columnQualifierDataType]

    其中:

    • 列限定符用逗号(,)分隔。例如:index1 => f1 :[c1-> String], [c2-> String]
    • 列族由分号(;)分隔。例如:index1 => f1 :[c1-> String], [c2-> String]; f2 :[c3-> Long]
    • 多个索引由#号键()分隔。例如:index1 => f1 :[c1-> String], [c2-> String]; f2 :[c3-> Long]#index2 => f2 :[c3-> Long]
    • 列限定的数据类型。

      可用的数据类型有:STRING、INTEGER、FLOAT\LONG、DOUBLE、SHORT、BYTE、CHAR。

  6. 执行如下命令将生成的HFile导入HBase中:

    hbase org.apache.hadoop.hbase.tool.GlobalIndexBulkLoadHFilesTool </path/for/output> <tablename>

    索引数据生成和加载过程中,禁止对索引进行更改,包括但不限于新增索引、删除索引、索引状态变更等操作,否则会因为无法保证索引数据的一致性导致运行的任务失败,需要待索引状态稳定后再重新执行任务。

    例如执行以下命令:

    hbase org.apache.hadoop.hbase.tool.GlobalIndexBulkLoadHFilesTool /dataOutput bulkTable

    输出内容为:

    2024-01-13 18:29:03,043 INFO  [GlobalIndexBulkLoadHFiles-0] hdfs.DFSClient: Created token for admintest: HDFS_DELEGATION_TOKEN owner=admintest@HADOOP.COM, renewer=renewer, realUser=, issueDate=1705141743030, maxDate=1705746543030, sequenceNumber=4261, masterKeyId=5 on ha-hdfs:hacluster
    2024-01-13 18:29:03,123 INFO  [LoadIncrementalHFiles-0] compress.CodecPool: Got brand-new decompressor [.snappy]
    2024-01-13 18:29:03,127 INFO  [LoadIncrementalHFiles-0] compress.CodecPool: Got brand-new decompressor [.snappy]
    2024-01-13 18:29:03,127 INFO  [LoadIncrementalHFiles-1] compress.CodecPool: Got brand-new decompressor [.snappy]
    2024-01-13 18:29:03,127 INFO  [LoadIncrementalHFiles-4] compress.CodecPool: Got brand-new decompressor [.snappy]
    2024-01-13 18:29:03,128 INFO  [LoadIncrementalHFiles-0] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/0/8610217824254455849576409ebf8f53 first=Optional[\x0118\x00\x0112005000204\x00] last=Optional[\x0119\x00\x0112005000201\x00]
    2024-01-13 18:29:03,128 INFO  [LoadIncrementalHFiles-1] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/0/fa17bc8e753341ffa0ba9e702200c04a first=Optional[\x0121\x00\x0112005000205\x00] last=Optional[\x0132\x00\x0112005000206\x00]
    2024-01-13 18:29:03,129 INFO  [LoadIncrementalHFiles-2] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/address/7a0308810d264d61bda32c385f50260c first=Optional[\x0121\x00\x0112005000205\x00] last=Optional[\x0132\x00\x0112005000206\x00]
    2024-01-13 18:29:03,129 INFO  [LoadIncrementalHFiles-4] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/info/27cb42f48cb14597badb6cf8b302d4e8 first=Optional[\x0118\x00\x0112005000204\x00] last=Optional[\x0119\x00\x0112005000201\x00]
    2024-01-13 18:29:03,130 INFO  [LoadIncrementalHFiles-3] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/address/fe8487c5e2cf4bbaaeb9e638b8acc2c1 first=Optional[\x0118\x00\x0112005000204\x00] last=Optional[\x0119\x00\x0112005000201\x00]
    2024-01-13 18:29:03,131 INFO  [LoadIncrementalHFiles-5] tool.LoadIncrementalHFiles: Trying to load hfile=hdfs://hacluster/dataOutput/bulkTable.index_bulk/info/657937b1edd6401b8f5575e42e7ec92b first=Optional[\x0121\x00\x0112005000205\x00] last=Optional[\x0132\x00\x0112005000206\x00]
    2024-01-13 18:29:03,539 INFO  [GlobalIndexBulkLoadHFiles-0] hdfs.DFSClient: Cancelling token for admintest: HDFS_DELEGATION_TOKEN owner=admintest@HADOOP.COM, renewer=renewer, realUser=, issueDate=1705141743030, maxDate=1705746543030, sequenceNumber=4261, masterKeyId=5 on ha-hdfs:hacluster
    2024-01-13 18:29:03,571 INFO  [GlobalIndexBulkLoadHFiles-0] client.ConnectionImplementation: Closing master protocol: MasterService
    2024-01-13 18:29:03,678 INFO  [GlobalIndexBulkLoadHFiles-0-EventThread] zookeeper.ClientCnxn: EventThread shut down for session: 0x3201ef383210e59e
    2024-01-13 18:29:03,678 INFO  [GlobalIndexBulkLoadHFiles-0] zookeeper.ZooKeeper: Connection: 0x3201ef383210e59e closed
    2024-01-13 18:29:03,679 INFO  [GlobalIndexBulkLoadHFiles-0] client.ConnectionImplementation: Connection has been closed by GlobalIndexBulkLoadHFiles-0.

相关文档