Updated on 2024-04-29 GMT+08:00

Sample Code for Copying a Table

You can develop an application and call HBase APIs to copy some or all data in a table either within the same cluster or to another cluster. The target table must already exist in the destination cluster before you copy data.

Environment Preparation

  1. If you want to copy a table to another cluster, obtain the access address (Intranet) of the target CloudTable cluster first.

    Log in to the CloudTable console and choose Cluster Management. In the cluster list, locate the required cluster and obtain the address in the Access Address (Intranet) column.

  2. Before copying table data, ensure that the target table exists in the target CloudTable cluster. If the target table does not exist, create it first.

    For details about how to create a table, see Creating an HBase Cluster.

  3. Download the sample code and prepare the development environment by referring to the corresponding section.

    If you do not use the sample code project, copy the JAR file in the cloudtable-example\lib directory in the downloaded sample code to your project and add the JAR file to the dependency path in the project.

Sample Code for Copying a Table

You can develop applications based on HBase APIs to copy some or all table data within a cluster or between clusters to meet service requirements. The following sample code is for reference only.

The following sample code is used to copy table data from one cluster to another cluster.

package com.huawei.cloudtable.hbase.tool.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class DataCopyer {
  public static final Log LOG = LogFactory.getLog(DataCopyer.class);

  // Rows buffered per batch put, and worker-thread count; both set from CLI args in main().
  private static int batchSize;
  private static int threads;

  /**
   * Entry point.
   *
   * <p>Usage: ./hbase com.huawei.cloudtable.hbase.tool.client.DataCopyer
   * [srcZK] [dstZK] [tableName] [batchSize] [threads]
   *
   * @throws IOException if the copy fails
   */
  public static void main(String[] args) throws IOException {
    if (args.length != 5) {
      System.err.println(
          "Command : ./hbase com.huawei.cloudtable.hbase.tool.client.DataCopyer [srcZK] [dstZK] [tableName] [batchSize] [threads]");
      System.exit(-1);
    }
    batchSize = Integer.parseInt(args[3]);
    threads = Integer.parseInt(args[4]);
    copyData(args[0], args[1], args[2]);
  }

  /**
   * Copies every row of {@code tableName} from the cluster behind {@code srcZk} to the
   * identically named table in the cluster behind {@code dstZk}. The target table must
   * already exist. Blocks until the copy completes.
   *
   * @param srcZk     ZooKeeper quorum of the source cluster
   * @param dstZk     ZooKeeper quorum of the target cluster
   * @param tableName name of the table to copy
   * @throws IOException if a connection cannot be created or the copy fails
   */
  public static void copyData(String srcZk, String dstZk, String tableName) throws IOException {
    // Source cluster configuration.
    Configuration srcConf = HBaseConfiguration.create();
    srcConf.set("hbase.zookeeper.quorum", srcZk);
    //srcConf.set("zookeeper.znode.parent", "/hbase");

    // Target cluster configuration.
    Configuration dstConf = HBaseConfiguration.create();
    dstConf.set("hbase.zookeeper.quorum", dstZk);
    //dstConf.set("zookeeper.znode.parent", "/hbase_1.3.1");

    // try-with-resources so both heavyweight connections are closed even on failure
    // (the original leaked them; unclosed connections keep the JVM alive).
    try (Connection srcConnection = ConnectionFactory.createConnection(srcConf);
        Connection dstConnection = ConnectionFactory.createConnection(dstConf)) {
      copyDataTable(srcConnection, dstConnection, TableName.valueOf(tableName));
    }
  }

  /**
   * Copies the table region by region on a fixed-size thread pool and waits for every
   * region copy to finish before returning.
   *
   * @throws IOException if region lookup fails or the wait is interrupted
   */
  private static void copyDataTable(final Connection srcConnection, final Connection dstConnection,
      final TableName tableName) throws IOException {
    try (Admin admin = srcConnection.getAdmin()) {
      // Obtain the region information of the data table; shuffle so concurrent workers
      // spread across region servers instead of scanning in key order.
      List<HRegionInfo> tableRegions = admin.getTableRegions(tableName);
      Collections.shuffle(tableRegions);

      // Bounded queue plus CallerRunsPolicy: when the queue is full the submitting thread
      // copies the region itself, instead of aborting with RejectedExecutionException
      // as the original's default AbortPolicy would on tables with many regions.
      BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(10000);
      ThreadPoolExecutor executor = new ThreadPoolExecutor(threads, threads, 60, TimeUnit.SECONDS,
          workQueue, new ThreadPoolExecutor.CallerRunsPolicy());

      for (final HRegionInfo regionInfo : tableRegions) {
        executor.submit(new Runnable() {
          @Override
          public void run() {
            // Table instances are cheap, per-thread, and must be closed after use.
            try (final Table srcTable = srcConnection.getTable(tableName);
                final Table dstTable = dstConnection.getTable(tableName)) {
              LOG.info("Start to copy region " + regionInfo.toString());
              Scan scan = new Scan();
              scan.setStartRow(regionInfo.getStartKey());
              scan.setStopRow(regionInfo.getEndKey());
              scan.setCaching(batchSize);
              copyOneRegion(srcTable, dstTable, scan, batchSize);
            } catch (IOException e) {
              LOG.error("CopyData failed .............", e);
              System.exit(-1);
            }
            LOG.info("Stop to copy region " + regionInfo.toString());
          }
        });
      }

      // Wait for all submitted region copies to finish. The original returned immediately;
      // its non-daemon core threads never time out, so the JVM hung forever after the copy.
      executor.shutdown();
      try {
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while waiting for region copy tasks", e);
      }
    }
  }

  /**
   * Streams one region's rows from {@code srcTable} and writes them to {@code dstTable}
   * in batches of {@code batchSize} puts.
   *
   * @throws IOException if scanning or writing fails
   */
  private static void copyOneRegion(Table srcTable, Table dstTable, Scan scan, int batchSize)
      throws IOException {
    // try-with-resources closes the scanner; the original leaked it on both the
    // normal and the exception path.
    try (ResultScanner scanner = srcTable.getScanner(scan)) {
      Result result;
      List<Put> puts = new ArrayList<>();
      long counter = 0;
      while ((result = scanner.next()) != null) {
        // Re-wrap every cell of the source row into a Put for the target table.
        Put put = new Put(result.getRow());
        for (Cell cell : result.rawCells()) {
          put.add(cell);
        }
        puts.add(put);

        counter += 1;
        // Flush a full batch to the target table.
        if (puts.size() >= batchSize) {
          dstTable.put(puts);
          puts.clear();
        }
        if (counter % 10000 == 0) {
          LOG.info(srcTable + " has sent: " + counter);
        }
      }
      // Flush the final partial batch.
      if (!puts.isEmpty()) {
        dstTable.put(puts);
      }
    }
  }
}