更新时间:2025-12-08 GMT+08:00

使用BulkLoad工具批量导入HBase数据

操作场景

您可以按照自定义的方式,通过命令批量导入数据到HBase中并创建索引。可以在“configuration.xml”文件中定义多个方式来批量导入数据,但导入数据时可不创建索引。

约束与限制

  • 列的名称不能包含特殊字符,只能由字母、数字和下划线组成。
  • BulkLoad支持的数据源格式为带分隔符的文本文件。
  • 如果批量导入数据时创建二级索引,还需注意:
    • 当将列的类型设置为string时,不能设置其长度。例如“<column index="1" type="string" length="1" >COLUMN_1</column>”,此类型不支持。
    • 当将列的类型设置为date时,不能设置其日期格式。例如“<column index="13" type="date" format="yyyy-MM-dd hh:mm:ss">COLUMN_13</column>”,此类型不支持。
    • 不能针对组合列建立二级索引。
    • 当前工具仅支持本地索引,不支持全局索引,全局索引操作请参见批量加载HBase全局二级索引数据

前提条件

已安装客户端。例如安装目录为“/opt/hadoopclient”,以下操作的客户端目录只是举例,请根据实际安装目录修改。

使用BulkLoad工具批量导入HBase数据

  1. 以客户端安装用户,登录安装客户端的节点。
  2. 执行以下命令切换到客户端目录

    cd /opt/hadoopclient

  3. 执行以下命令配置环境变量

    source bigdata_env

  4. 如果当前集群已启用Kerberos认证(安全模式),执行以下命令认证当前用户,当前用户需要具有提交Yarn任务的权限、创建和写入HBase表的权限和HDFS的操作权限:

    kinit 组件业务用户

    如果当前集群未启用Kerberos认证(普通模式),则执行以下命令设置Hadoop用户名:

    export HADOOP_USER_NAME=hbase

  5. 将数据导入到HDFS中。

    创建HDFS目录:

    hdfs dfs -mkdir <inputdir>

    将文件上传至HDFS目录中:

    hdfs dfs -put <local_data_file> <inputdir>

    例如定义数据文件“data.txt”,内容如下:

    001,Hadoop,citya
    002,HBaseFS,cityb
    003,HBase,cityc
    004,Hive,cityd
    005,Streaming,citye
    006,MapReduce,cityf
    007,Kerberos,cityg
    008,LdapServer,cityh

    执行以下命令将数据文件导入到HDFS中

    创建“/datadirImport”目录:

    hdfs dfs -mkdir /datadirImport

    上传“data.txt”文件至HDFS的“/datadirImport”目录中:

    hdfs dfs -put data.txt /datadirImport

  6. 进入hbase shell,创建表ImportTable并创建“configuration.xml”文件(该文件可以参考模板文件进行编辑,模板文件获取路径为:“/opt/client/HBase/hbase/conf/import.xml.template”)。

    例如执行以下命令建表:

    create 'ImportTable', {NAME => 'f1',COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'FAST_DIFF'},{NAME=>'f2'}

    例如自定义导入模板文件“configuration.xml”,内容如下:

    • column_num要和数据文件中的列的数量对应。
    • family的指定要和表的列族名称对应。
    • 仅当批量导入数据时创建二级索引才需配置以下参数,且索引类型的首字母需要大写,例如type="String";以下片段中length="30"表示索引列“H_ID”的列值不能超过30个字符:
                           <indices> 
                               <index name="IDX1"> 
                                       <index_column family="f1"> 
                                               <qualifier type="String" length="30">H_ID</qualifier> 
                                       </index_column> 
                               </index> 
                       </indices> 
    <?xml version="1.0" encoding="UTF-8"?> 
      
     <configuration> 
             <import id="first" column_num="3"> 
                     <columns> 
                             <column index="1" type="int">SMS_ID</column> 
                             <column index="2" type="string">SMS_NAME</column> 
                             <column index="3" type="string">SMS_ADDRESS</column> 
                     </columns> 
      
                     <rowkey> 
                             SMS_ID+'_'+substring(SMS_NAME,1,4)+'_'+reverse(SMS_ADDRESS) 
                     </rowkey> 
      
                     <qualifiers> 
                             <normal family="f1"> 
                                     <qualifier column="SMS_ID">H_ID</qualifier> 
                                     <qualifier column="SMS_NAME">H_NAME</qualifier> 
                                     <qualifier column="SMS_ADDRESS">H_ADDRESS</qualifier> 
                             </normal> 
      
                             <!-- Define composite columns --> 
                             <composite family="f2"> 
                                     <qualifier class="com.huawei.H_COMBINE_1">H_COMBINE_1</qualifier> 
                                     <columns> 
                                             <column>SMS_ADDRESS</column> 
                                             <column>SMS_NAME</column> 
                                     </columns> 
                             </composite> 
      
                     </qualifiers> 
      
                         <indices> 
                             <index name="IDX1"> 
                                     <index_column family="f1"> 
                                             <qualifier type="String" length="30">H_ID</qualifier> 
                                     </index_column> 
                             </index> 
                     </indices> 
      
                     <badlines>SMS_ID &lt; 7000 &amp;&amp; SMS_NAME == 'HBase'</badlines>
             </import> 
     </configuration>     

  7. 执行如下命令,生成HFile文件

    hbase com.huawei.hadoop.hbase.tools.bulkload.ImportData -Dimport.skip.bad.lines=true -Dimport.separator=<separator> -Dimport.bad.lines.output=</path/badlines/output> -Dimport.hfile.output=</path/for/output> <configuration xmlfile> <tablename> <inputdir>
    • 如果HBase配置了透明加密功能,执行bulkload命令的HBase用户需要添加到对应集群的hadoop用户组,且具有HBase根目录的加密key的读权限。
    • 检查目录“/tmp/hbase”的权限,需要手动添加当前用户对该目录的写权限。
    表1 生成HFile文件参数介绍

    参数

    参数说明

    -Dimport.skip.bad.lines

    表示遇到不合适的行是否继续执行:

    • 指定值为“false”,表示遇到不适用的行则停止执行。
    • 指定值为“true”,表示遇到不适用的数据行则跳过该行继续执行。

    如果没有在“configuration.xml”中定义不适用行,该参数不需要添加。

    -Dimport.separator

    表示分隔符,例如-Dimport.separator=','

    -Dimport.bad.lines.output

    表示不适用的数据行输出路径,如果没有在“configuration.xml”中定义不适用行,该参数不需要添加。

    -Dimport.hfile.output

    表示执行结果输出路径。

    HBase如果配置了透明加密功能,在执行bulkload命令生成HFile时,“-Dimport.hfile.output”指定的HFile路径必须为“/HBase根目录/extdata”的子目录,例如“/hbase/extdata/bulkloadTmp/hfile”。

    <configuration xmlfile>

    表示configuration配置文件。

    <tablename>

    表示要操作的表名。

    <inputdir>

    表示要批量上传的数据目录。

    例如执行以下命令:

    • 示例一:
      hbase com.huawei.hadoop.hbase.tools.bulkload.ImportData -Dimport.skip.bad.lines=true -Dimport.separator=',' -Dimport.bad.lines.output=/badline -Dimport.hfile.output=/hfile configuration.xml ImportTable /datadirImport
    • 示例二:
      hbase com.huawei.hadoop.hbase.tools.bulkload.IndexImportData -Dimport.skip.bad.lines=true -Dimport.separator=',' -Dimport.bad.lines.output=/badline -Dimport.hfile.output=/hfile configuration_index.xml IndexImportTable /datadirIndexImport

  8. 执行如下命令将HFile导入HBase

    • 批量导入数据:
      hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles </path/for/output> <tablename>

      例如执行以下命令:

      hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /hfile ImportTable
    • 批量导入数据时创建二级索引:
      hbase org.apache.hadoop.hbase.hindex.mapreduce.HIndexLoadIncrementalHFiles </path/for/output> <tablename>

      例如执行以下命令:

      hbase org.apache.hadoop.hbase.hindex.mapreduce.HIndexLoadIncrementalHFiles /hfile IndexImportTable

相关文档

大任务下MapReduce任务运行失败,处理方法请参考MapReduce任务运行失败,ApplicationMaster出现物理内存溢出异常