更新时间:2023-09-14 GMT+08:00
分享

示例

Configuration可以设置的参数

为了能够建立一个HBase Client端到HBase Server端的连接,需要设置如下几个参数。

  • hbase.zookeeper.quorum: ZooKeeper的IP。多个ZooKeeper节点的话,中间用“,”隔开。
  • hbase.zookeeper.property.clientPort: Zookeeper的端口。

通过HBaseConfiguration.create()创建的Configuration实例,会自动加载如下配置文件中的配置项:

  • core-default.xml
  • core-site.xml
  • hbase-default.xml
  • hbase-site.xml

因此,这4个配置文件,应该要放置在“Source Folder”下面(将一个文件夹设置为Source Folder的方法:如果在工程下面建立了一个resource的文件夹,那么,可以在该文件夹上右键鼠标,依次选择“Build Path”->“Use as Source Folder”即可,可参考下图)

下面是客户端可配置的一些参数集合。

在通常情况下,这些值都不建议修改。

参数名

参数解释

hbase.client.pause

每次异常或者其它情况下重试等待相关的时间参数(实际等待时间将根据该值与已重试次数计算得出)。

hbase.client.retries.number

异常或者其它情况下的重试次数。

hbase.client.retries.longer.multiplier

与重试次数有关。

hbase.client.rpc.maxattempts

RPC请求不可达时的重试次数。

hbase.regionserver.lease.period

与Scanner超时时间有关(单位ms)。

hbase.client.write.buffer

在启用AutoFlush的情况下,该值不起作用。如果未启用AotoFlush的话,HBase Client端会首先缓存写入的数据,达到设定的大小后才向HBase集群下发一次写入操作。

hbase.client.scanner.caching

Scan时一次next请求获取的行数。

hbase.client.keyvalue.maxsize

一条keyvalue数据的最大值。

hbase.htable.threads.max

HTable实例中与数据操作有关的最大线程数。

hbase.client.prefetch.limit

客户端在写数据或者读取数据时,需要首先获取对应的Region所在的地址。客户端可以预缓存一些Region地址,这个参数就是与缓存的数目有关的配置。

正确设置参数的方法:

hbaseConfig = HBaseConfiguration.create();

//如下参数,如果在配置文件中已经存在,则无须再配置

hbaseConfig.set("hbase.zookeeper.quorum", "10.5.100.1,10.5.100.2,10.5.100.3");

hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181");

HTablePool在多线程写入操作中的应用

  1. 有多个写数据线程时,可以采用HTablePool。现在先简单介绍下该类的使用方法和注意点:
  2. 多个写数据的线程之间,应共享同一个HTablePool实例。

    实例化HTablePool的时候,应要指定最大的HTableInterface实例个数maxSize,即需要通过如下构造函数实例化该类:

    public HTablePool(final Configuration config, final int maxSize)

    关于maxSize的值,可以根据写数据的线程数Threads以及涉及到的用户表个数Tables来定,理论上,不应该超过(Threads*Tables)。

  3. 客户端线程通过HTablePool#getTable(tableName)的方法,获取一个表名为tableName的HTableInterface实例。
  4. 同一个HTableInterface实例,在同一个时刻只能给一个线程使用。
  5. 如果HTableInterface使用完了,需要调用HTablePool#putTable(HTableInterface table)方法将它放回去。
示例代码:
/**
* 写数据失败后需要一定的重试次数,每一次重试的等待时间,需要根据已经重试的次数而定.
*/
private static final int[] RETRIES_WAITTIME = {1, 1, 1, 2, 2, 4, 4, 8, 16, 32};
/**
* 限定的重试次数
*/
private static final int RETRIES = 10;
/**
* 失败后等待的基本时间单位
*/
private static final int PAUSE_UNIT = 1000;
private static Configuration hadoopConfig;
private static HTablePool tablePool;
private static String[] tables;
/**
* <初始化HTablePool>
* <功能详细描述>
* @param config
* @see [类、类#方法、类#成员]
*/
public static void initTablePool()
{
DemoConfig config = DemoConfig.getInstance();
if (hadoopConfig == null)
{
hadoopConfig = HBaseConfiguration.create();
hadoopConfig.set("hbase.zookeeper.quorum", config.getZookeepers());
hadoopConfig.set("hbase.zookeeper.property.clientPort", config.getZookeeperPort());
}
if (tablePool == null)
{
tablePool = new HTablePool(hadoopConfig, config.getTablePoolMaxSize());
tables = config.getTables().split(",");
}
}
public void run()
{
// 初始化HTablePool.因为这是多线程间共享的一个实例, 仅被实例化一次.
initTablePool();
for (;;)
{
Map<String, Object> data = DataStorage.takeList();
String tableName = tables[(Integer)data.get("table")];
List<Put> list = (List)data.get("list");
// 以Row为Key,保存List中所有的Put.该集合仅使用于写入失败时查找失败的数据记录.
// 因为从Server端仅返回了失败的数据记录的Row值.
Map<byte[], Put> rowPutMap = null;
// 如果失败了(哪怕是部分数据失败),需要重试.每一次重试,都仅提交失败的数据条目
INNER_LOOP :
for (int retry = 0; retry < RETRIES; retry++)
{
// 从HTablePool中获取一个HTableInterface实例.用完后需要放回去.
HTableInterface table = tablePool.getTable(tableName);
try
{
table.put(list);
// 如果执行到这里,说明成功了 .
break INNER_LOOP;
}
catch (IOException e)
{
// 如果是RetriesExhaustedWithDetailsException类型的异常,
// 说明这些数据中有部分是写入失败的这通常都是因为HBase集群
// 的进程异常引起,当然有时也会因为有大量的Region正在被转移,
// 导致尝试一定的次数后失败.
// 如果非RetriesExhaustedWithDetailsException异常,则需要将
// list中的所有数据都要重新插入.
if (e instanceof RetriesExhaustedWithDetailsException)
{
RetriesExhaustedWithDetailsException ree =
(RetriesExhaustedWithDetailsException)e;
int failures = ree.getNumExceptions();
System.out.println("本次插入失败了[" + failures + "]条数据.");
// 第一次失败且重试时,实例化该Map.
if (rowPutMap == null)
{
rowPutMap = new HashMap<byte[], Put>(failures);
for (int m = 0; m < list.size(); m++)
{
Put put = list.get(m);
rowPutMap.put(put.getRow(), put);
}
}
//先Clear掉原数据,然后将失败的数据添加进来
list.clear();
for (int m = 0; m < failures; m++)
{
list.add(rowPutMap.get(ree.getRow(m)));
}
}
}
finally
{
// 用完之后,再将该实例放回去
tablePool.putTable(table);
}
// 如果异常了,就暂时等待一段时间.该等待应该在将HTableInterface实例放回去之后
try
{
sleep(getWaitTime(retry));
}
catch (InterruptedException e1)
{
System.out.println("Interruped");
}
}
}
}

Put实例的创建

HBase是一个面向列的数据库,一行数据,可能对应多个列族,而一个列族又可以对应多个列。通常,写入数据的时候,需要指定要写入的列(含列族名称和列名称):

如果要往HBase表中写入一行数据,需要首先构建一个Put实例。Put中包含了数据的Key值和相应的Value值,Value值可以有多个(即可以有多列值)。

有一点需要注意:在往Put实例中add一条KeyValue数据时,传入的family,qualifier,value都是字节数组。在将一个字符串转换为字节数组时,需要使用Bytes.toBytes方法,不要使用String.toBytes方法,因为后者无法保证编码,尤其是在Key或Value中出现中文字符的时候,就会出现问题。

代码示例:

//列族的名称为privateInfo
private final static byte[] FAMILY_PRIVATE = Bytes.toBytes("privateInfo");
//列族privateInfo中总共有两个列"name"&"address"
private final static byte[] COLUMN_NAME = Bytes.toBytes("name");
private final static byte[] COLUMN_ADDR = Bytes.toBytes("address");
/**
* <创建一个Put实例>
* <在该方法中,将会创建一个具有1个列族,2列数据的Put>
* @param rowKey Key值
* @param name 人名
* @param address 地址
* @return
* @see [类、类#方法、类#成员]
*/
public Put createPut(String rowKey, String name, String address)
{
Put put = new Put(Bytes.toBytes(rowKey));
put.add(FAMILY_PRIVATE, COLUMN_NAME, Bytes.toBytes(name));
        put.add(FAMILY_PRIVATE, COLUMN_ADDR, Bytes.toBytes(address));
return put;
}

HBaseAdmin实例的创建以及常用方法

代码示例:

private Configuration demoConf = null;
private HBaseAdmin hbaseAdmin = null;
/**
* <构造函数>
* 需要将已经实例化好的Configuration实例传递进来
*/
public HBaseAdminDemo(Configuration conf)
{
this.demoConf = conf;
try
{
// 实例化HBaseAdmin
hbaseAdmin = new HBaseAdmin(this.demoConf);
}
catch (MasterNotRunningException e)
{
e.printStackTrace();
}
catch (ZooKeeperConnectionException e)
{
e.printStackTrace();
}
}
/**
* <一些方法使用示例>
* <更多的方法,请参考HBase接口文档>
* @throws IOException
* @throws ZooKeeperConnectionException
* @throws MasterNotRunningException
* @see [类、类#方法、类#成员]
*/
public void demo() throws MasterNotRunningException, ZooKeeperConnectionException, IOException
{
byte[] regionName = Bytes.toBytes("mrtest,jjj,1315449869513.fc41d70b84e9f6e91f9f01affdb06703.");
byte[] encodeName = Bytes.toBytes("fc41d70b84e9f6e91f9f01affdb06703");
// 重新分配一个Reigon.
hbaseAdmin.unassign(regionName, false);
// 主动触发Balance.
hbaseAdmin.balancer();
// 移动一个Region,第2个参数,是RegionServer的HostName+StartCode,例如:
// host187.example.com,60020,1289493121758.如果将该参数设置为null,则会随机移动该Region
hbaseAdmin.move(encodeName, null);
// 判断一个表是否存在
hbaseAdmin.tableExists("tableName");
// 判断一个表是否被激活
hbaseAdmin.isTableEnabled("tableName");
}
/**
* <快速创建一个表的方法>
* <首先创建一个HTableDescriptor实例,它里面包含了即将要创建的HTable的描述信息,同时,需要创建相应的列族。列族关联的实例是HColumnDescriptor。在本示例中,创建的列族名称为“columnName”>
* @param tableName 表名
* @return
* @see [类、类#方法、类#成员]
*/
public boolean createTable(String tableName)
{
try {
if (hbaseAdmin.tableExists(tableName)) {
return false;
}
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
HColumnDescriptor fieldADesc = new HColumnDescriptor("columnName".getBytes());
fieldADesc.setBlocksize(640 * 1024);
tableDesc.addFamily(fieldADesc);
hbaseAdmin.createTable(tableDesc);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
分享:

    相关文档

    相关产品