Sample Code for Copying Table Data
You can develop an application that calls the HBase API to copy some or all of the data in a table to the same cluster or to another cluster. The target table must already exist.
Preparing the Environment
- To copy a table to another cluster, first obtain the "ZK link address (intranet)" of the target CloudTable cluster.
Log in to the CloudTable management console, click Cluster Management in the navigation tree on the left, find the required cluster in the cluster list, and obtain its "ZK link address (intranet)".
- Before copying table data, ensure that the target table already exists in the target CloudTable cluster. If it does not, create it first; a minimal sketch for checking its existence programmatically is shown after this list.
For details about how to create a table, see Creating an HBase Cluster.
- Download the sample code, and then prepare the development environment by referring to the Preparing the Development Environment section.
If you do not plan to use the sample code project, copy the JAR files in the "cloudtable-example\lib" directory of the downloaded sample code to your own project, and add them to the project's dependency path.
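Before running the copy tool, you may want to verify from code that the target table exists. The following is a minimal sketch of such a check, assuming an HBase 1.x client; the class name, ZooKeeper address, and table name are placeholders to be replaced with your own values.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class TargetTableChecker {
  public static void main(String[] args) throws IOException {
    // Placeholder: the "ZK link address (intranet)" of the target cluster.
    String dstZk = "zk1.example.com,zk2.example.com,zk3.example.com";
    // Placeholder table name.
    TableName tableName = TableName.valueOf("your_table");

    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", dstZk);
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin()) {
      if (!admin.tableExists(tableName)) {
        // The copy tool requires the target table to exist; create it in advance.
        System.err.println("Target table does not exist. Create it before copying.");
      }
    }
  }
}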
Sample Code for Copying Table Data
You can develop an application based on the HBase API to copy table data according to your service requirements. The following sample code, provided for reference, copies the data of a table from one cluster to another.
package com.huawei.cloudtable.hbase.tool.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class DataCopyer {

  public static final Log LOG = LogFactory.getLog(DataCopyer.class);

  private static int batchSize;
  private static int threads;

  public static void main(String[] args) throws IOException {
    if (args.length != 5) {
      System.err.println(
          "Command : ./hbase com.huawei.cloudtable.hbase.tool.client.DataCopyer [srcZK] [dstZK] [tableName] [batchSize] [threads]");
      System.exit(-1);
    }
    batchSize = Integer.valueOf(args[3]);
    threads = Integer.valueOf(args[4]);
    copyData(args[0], args[1], args[2]);
  }

  public static void copyData(String srcZk, String dstZk, String tableName) throws IOException {
    // Create a connection to the source HBase cluster.
    Configuration srcConf = HBaseConfiguration.create();
    srcConf.set("hbase.zookeeper.quorum", srcZk);
    //srcConf.set("zookeeper.znode.parent", "/hbase");
    Connection srcConnection = ConnectionFactory.createConnection(srcConf);

    // Create a connection to the destination HBase cluster.
    Configuration dstConf = HBaseConfiguration.create();
    dstConf.set("hbase.zookeeper.quorum", dstZk);
    //dstConf.set("zookeeper.znode.parent", "/hbase_1.3.1");
    Connection dstConnection = ConnectionFactory.createConnection(dstConf);

    // Copy the data table, then release both connections.
    try {
      copyDataTable(srcConnection, dstConnection, TableName.valueOf(tableName));
    } finally {
      srcConnection.close();
      dstConnection.close();
    }
  }

  private static void copyDataTable(final Connection srcConnection,
      final Connection dstConnection, final TableName tableName) throws IOException {
    try (Admin admin = srcConnection.getAdmin()) {
      // Obtain the region information of the table to be copied.
      List<HRegionInfo> tableRegions = admin.getTableRegions(tableName);
      Collections.shuffle(tableRegions);

      // Create a thread pool and copy the regions concurrently.
      BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(10000);
      ThreadPoolExecutor executor =
          new ThreadPoolExecutor(threads, threads, 60, TimeUnit.SECONDS, workQueue);

      for (final HRegionInfo regionInfo : tableRegions) {
        executor.submit(new Runnable() {
          @Override
          public void run() {
            // Each task copies a single region, scanning only its key range.
            try (final Table srcTable = srcConnection.getTable(tableName);
                final Table dstTable = dstConnection.getTable(tableName)) {
              LOG.info("Start to copy region " + regionInfo.toString());
              Scan scan = new Scan();
              scan.setStartRow(regionInfo.getStartKey());
              scan.setStopRow(regionInfo.getEndKey());
              scan.setCaching(batchSize);
              copyOneRegion(srcTable, dstTable, scan, batchSize);
            } catch (IOException e) {
              LOG.error("CopyData failed .............", e);
              System.exit(-1);
            }
            LOG.info("Stop to copy region " + regionInfo.toString());
          }
        });
      }

      // Wait until all copy tasks have finished, then release the thread pool.
      executor.shutdown();
      try {
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  private static void copyOneRegion(Table srcTable, Table dstTable, Scan scan, int batchSize)
      throws IOException {
    try (ResultScanner scanner = srcTable.getScanner(scan)) {
      Result result = null;
      List<Put> puts = new ArrayList<>();
      long counter = 0;
      while ((result = scanner.next()) != null) {
        // Rebuild each scanned row as a Put that carries all of its cells.
        Put put = new Put(result.getRow());
        for (Cell cell : result.rawCells()) {
          put.add(cell);
        }
        puts.add(put);
        counter += 1;
        // Write to the destination table in batches of batchSize.
        if (puts.size() >= batchSize) {
          dstTable.put(puts);
          puts.clear();
        }
        if (counter % 10000 == 0) {
          LOG.info(srcTable + " has sent: " + counter);
        }
      }
      // Flush any remaining buffered rows.
      if (!puts.isEmpty()) {
        dstTable.put(puts);
      }
    }
  }
}
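As the usage message in main() indicates, the tool can be started through the hbase command line. A sample invocation is shown below; the ZooKeeper addresses and table name are placeholders for your own values.

./hbase com.huawei.cloudtable.hbase.tool.client.DataCopyer <srcZK> <dstZK> <tableName> 1000 10

Here batchSize (1000) sets both the scanner caching size and the number of Put operations buffered before each batched write to the destination table, and threads (10) is the number of regions copied in parallel.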