更新时间:2024-09-18 GMT+08:00

使用BulkLoad工具批量导入HBase数据

操作场景

您可以按照自定义的方式,通过命令批量导入数据到HBase中并创建索引。

您可以在“configuration.xml”文件中定义多个方式来批量导入数据,导入数据时可不创建索引。

  • 列的名称不能包含特殊字符,只能由字母、数字和下划线组成。
  • 大任务下MapReduce任务运行失败,请参考MapReduce任务运行失败,ApplicationMaster出现物理内存溢出异常进行处理。
  • BulkLoad支持的数据源格式为带分隔符的文本文件。
  • 已安装客户端。例如安装目录为“/opt/hadoopclient”,以下操作的客户端目录只是举例,请根据实际安装目录修改。
  • 若批量导入数据时创建二级索引,还需注意:
    • 当将列的类型设置为string时,不能设置其长度。例如“<column index="1" type="string" length="1" >COLOUMN_1</column>”,此类型不支持。
    • 当将列的类型设置为date时,不能设置其日期格式。例如“<column index="13" type="date" format="yyyy-MM-dd hh:mm:ss">COLOUMN_13</column>”,此类型不支持。
    • 不能针对组合列建立二级索引。

使用BulkLoad工具批量导入HBase数据

  1. 以客户端安装用户,登录安装客户端的节点。
  2. 执行以下命令切换到客户端目录。

    cd /opt/hadoopclient

  3. 执行以下命令配置环境变量。

    source bigdata_env

  4. 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户,当前用户需要具有提交Yarn任务的权限、创建和写入HBase表的权限和HDFS的操作权限:

    kinit 组件业务用户

    如果当前集群未启用Kerberos认证,则执行以下命令设置Hadoop用户名:

    export HADOOP_USER_NAME=hbase

  5. 将数据导入到HDFS中。

    hdfs dfs -mkdir<inputdir>

    hdfs dfs -put<local_data_file> <inputdir>

    例如定义数据文件“data.txt”,内容如下:

    001,Hadoop,citya
    002,HBaseFS,cityb
    003,HBase,cityc
    004,Hive,cityd
    005,Streaming,citye
    006,MapReduce,cityf
    007,Kerberos,cityg
    008,LdapServer,cityh

    执行以下命令:

    hdfs dfs -mkdir /datadirImport

    hdfs dfs -put data.txt /datadirImport

  6. 进入hbase shell,创建表ImportTable并创建“configuration.xml”文件(该文件可以参考模板文件进行编辑,模板文件获取路径为:“/opt/client/HBase/hbase/conf/import.xml.template”)。

    例如执行以下命令建表:

    create 'ImportTable', {NAME => 'f1',COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'FAST_DIFF'},{NAME=>'f2'}

    例如自定义导入模板文件“configuration.xml”,内容如下:

    • column_num要和数据文件中的列的数量对应。
    • family的指定要和表的列族名称对应。
    • 仅当批量导入数据时创建二级索引才需配置以下参数,且索引类型的首字母需要大写,例如type="String";以下片段中length="30"表示索引列“H_ID”的列值不能超过30个字符:
                           <indices> 
                               <index name="IDX1"> 
                                       <index_column family="f1"> 
                                               <qualifier type="String" length="30">H_ID</qualifier> 
                                       </index_column> 
                               </index> 
                       </indices> 
    <?xml version="1.0" encoding="UTF-8"?> 
      
     <configuration> 
             <import id="first" column_num="3"> 
                     <columns> 
                             <column index="1" type="int">SMS_ID</column> 
                             <column index="2" type="string">SMS_NAME</column> 
                             <column index="3" type="string">SMS_ADDRESS</column> 
                     </columns> 
      
                     <rowkey> 
                             SMS_ID+'_'+substring(SMS_NAME,1,4)+'_'+reverse(SMS_ADDRESS) 
                     </rowkey> 
      
                     <qualifiers> 
                             <normal family="f1"> 
                                     <qualifier column="SMS_ID">H_ID</qualifier> 
                                     <qualifier column="SMS_NAME">H_NAME</qualifier> 
                                     <qualifier column="SMS_ADDRESS">H_ADDRESS</qualifier> 
                             </normal> 
      
                             <!-- Define composite columns --> 
                             <composite family="f2"> 
                                     <qualifier class="com.huawei.H_COMBINE_1">H_COMBINE_1</qualifier> 
                                     <columns> 
                                             <column>SMS_ADDRESS</column> 
                                             <column>SMS_NAME</column> 
                                     </columns> 
                             </composite> 
      
                     </qualifiers> 
      
                         <indices> 
                             <index name="IDX1"> 
                                     <index_column family="f1"> 
                                             <qualifier type="String" length="30">H_ID</qualifier> 
                                     </index_column> 
                             </index> 
                     </indices> 
      
                     <badlines>SMS_ID &lt; 7000 &amp;&amp; SMS_NAME == 'HBase'</badlines>
             </import> 
     </configuration>     

  7. 执行如下命令,生成HFile文件。

    hbase com.huawei.hadoop.hbase.tools.bulkload.ImportData -Dimport.skip.bad.lines=true-Dimport.separator=<separator>-Dimport.bad.lines.output=</path/badlines/output>-Dimport.hfile.output=</path/for/output> <configuration xmlfile> <tablename> <inputdir>

    • -Dimport.skip.bad.lines:指定值为“false”,表示遇到不适用的行则停止执行。指定值为“true”,表示遇到不适用的数据行则跳过该行继续执行,如果没有在“configuration.xml”中定义不适用行,该参数不需要添加。
    • -Dimport.separator:分隔符,例如,-Dimport.separator=','。
    • -Dimport.bad.lines.output=</path/badlines/output>:指的是不适用的数据行输出路径,如果没有在“configuration.xml”中定义不适用行,该参数不需要添加。
    • -Dimport.hfile.output=< /path/for/output>:指的是执行结果输出路径。
    • <configuration xmlfile>:指向configuration配置文件。
    • <tablename>:表示要操作的表名。
    • <inputdir>:表示要批量上传的数据目录。

    例如执行以下命令:

    • hbase com.huawei.hadoop.hbase.tools.bulkload.ImportData -Dimport.skip.bad.lines=true -Dimport.separator=',' -Dimport.bad.lines.output=/badline -Dimport.hfile.output=/hfile configuration.xml ImportTable /datadirImport
    • hbase com.huawei.hadoop.hbase.tools.bulkload.IndexImportData -Dimport.skip.bad.lines=true -Dimport.separator=',' -Dimport.bad.lines.output=/badline -Dimport.hfile.output=/hfile configuration_index.xml IndexImportTable /datadirIndexImport
    • 当HBase已经配置透明加密后,在执行bulkload命令生成HFile时,“-Dimport.hfile.output”指定的HFile路径必须为“/HBase根目录/extdata”的子目录,例如“/hbase/extdata/bulkloadTmp/hfile”。
    • 当HBase已经配置透明加密后,执行bulkload命令的HBase用户需要添加到对应集群的hadoop用户组(非FusionInsight Manager下第一个安装的集群,用户组为“c<集群ID>_hadoop”,例如“c2_hadoop”),且具有HBase根目录的加密key的读权限。
    • 检查目录“/tmp/hbase”的权限,需要手动添加当前用户对该目录的写权限。

  8. 执行如下命令将HFile导入HBase。

    • 批量导入数据:

      hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles </path/for/output> <tablename>

    • 批量导入数据时创建二级索引:

      hbase org.apache.hadoop.hbase.hindex.mapreduce.HIndexLoadIncrementalHFiles </path/for/output> <tablename>

    例如执行以下命令:

    • hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /hfile ImportTable
    • hbase org.apache.hadoop.hbase.hindex.mapreduce.HIndexLoadIncrementalHFiles /hfile IndexImportTable