更新时间:2024-04-16 GMT+08:00
分享

数据表复制样例代码

用户可以开发应用程序,通过调用HBase API接口的方式,将一个表的部分数据或全部数据复制到同一个集群或另一个集群中。目标表必须预先存在。

环境准备

  1. 要将表复制到另一个集群,请先获取CloudTable目标集群的“ZK链接地址(内网)”。

    登录表格存储服务管理控制台,在左侧导航树单击集群管理,然后在集群列表中找到所需要的集群,并获取相应的“ZK链接地址(内网)”。

  2. 在复制表数据前,需要先确保在CloudTable目标集群中目标表已经存在。如果目标表不存在,请先创建目标表。

    创建表的具体步骤,请参考创建HBase集群

  3. 下载样例代码,然后参考准备开发环境章节准备开发环境。

    如果您不准备使用样例代码的工程,需要将下载的样例代码中“cloudtable-example\lib”目录下的jar包拷贝到您的项目中,并在项目工程中将这些Jar包添加到依赖路径中。

数据表复制样例代码

用户可以根据实际的业务需求基于HBase API开发应用程序,复制表数据到集群。以下样例代码,可供用户参考。

以下样例代码的场景是将一个集群的表数据复制到另一个集群中。

package com.huawei.cloudtable.hbase.tool.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class DataCopyer {
  public static final Log LOG = LogFactory.getLog(DataCopyer.class);
  private static int batchSize;
  private static int threads;

  public static void main(String[] args) throws IOException {
    if (args.length != 5) {
      System.err.println(
          "Command : ./hbase com.huawei.cloudtable.hbase.tool.client.DataCopyer [srcZK] [dstZK] [tableName] [batchSize] [threads]");
      System.exit(-1);
    }
    batchSize = Integer.valueOf(args[3]);
    threads = Integer.valueOf(args[4]);
    copyData(args[0], args[1], args[2]);
  }

  public static void copyData(String srcZk, String dstZk, String tableName) throws IOException {
    // 创建源HBase链接
    Configuration srcConf = HBaseConfiguration.create();
    srcConf.set("hbase.zookeeper.quorum", srcZk);
    //srcConf.set("zookeeper.znode.parent", "/hbase");
    Connection srcConnection = ConnectionFactory.createConnection(srcConf);

    // 创建目的HBase链接
    Configuration dstConf = HBaseConfiguration.create();
    dstConf.set("hbase.zookeeper.quorum", dstZk);
    //dstConf.set("zookeeper.znode.parent", "/hbase_1.3.1");
    Connection dstConnection = ConnectionFactory.createConnection(dstConf);

    // 复制数据表
    copyDataTable(srcConnection, dstConnection, TableName.valueOf(tableName));

  }

  private static void copyDataTable(final Connection srcConnection, final Connection dstConnection,
      final TableName tableName) throws IOException {
    try (Admin admin = srcConnection.getAdmin()) {
      // 先获取DATA表的Region信息
      List<HRegionInfo> tableRegions = admin.getTableRegions(tableName);
      Collections.shuffle(tableRegions);

      // 创建线程池,并发复制数据
      BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(10000);
      ThreadPoolExecutor executor = new ThreadPoolExecutor(threads, threads, 60, TimeUnit.SECONDS,
          workQueue);

      for (final HRegionInfo regionInfo : tableRegions) {
        executor.submit(new Runnable() {
          @Override
          public void run() {
            try (final Table srcTable = srcConnection.getTable(tableName);
                final Table dstTable = dstConnection.getTable(tableName)) {
              LOG.info("Start to copy region " + regionInfo.toString());
              Scan scan = new Scan();
              scan.setStartRow(regionInfo.getStartKey());
              scan.setStopRow(regionInfo.getEndKey());
              scan.setCaching(batchSize);
              copyOneRegion(srcTable, dstTable, scan, batchSize);
            } catch (IOException e) {
              LOG.error("CopyData failed .............", e);
              System.exit(-1);
            }
            LOG.info("Stop to copy region " + regionInfo.toString());
          }
        });
      }
    }
  }

  private static void copyOneRegion(Table srcTable, Table dstTable, Scan scan, int batchSize)
      throws IOException {
    ResultScanner scanner = srcTable.getScanner(scan);
    Result result = null;
    List<Put> puts = new ArrayList<>();
    long counter = 0;
    while ((result = scanner.next()) != null) {
      Put put = new Put(result.getRow());
      for (Cell cell : result.rawCells()) {
        put.add(cell);
      }
      puts.add(put);

      counter += 1;
      if (puts.size() >= batchSize) {
        dstTable.put(puts);
        puts.clear();
      }
      if (counter % 10000 == 0) {
        LOG.info(srcTable + " has send: " + counter);
      }
    }
    if (puts.size() != 0) {
      dstTable.put(puts);
    }
  }
}

相关文档