配置HBase多语言访问
操作场景
用户通过指定的host和port访问对应的ThriftServer实例，对HBase表进行创建、删除以及数据读写等操作。
前提条件
已创建HBase集群，且该集群的ThriftServer处于开启状态（查看与开启方法见下方操作步骤）。
操作步骤
- 登录表格存储服务控制台。
- 在页面左上角选择区域。
- 单击“集群管理”,进入集群管理界面。
- 单击HBase集群名称，进入集群详情页面查看ThriftServer的状态：如果ThriftServer为开启状态，则无需执行开启操作；如果ThriftServer为关闭状态，则返回集群管理界面，单击“更多 > 开启ThriftServer”，等待操作完成后即可使用。
- ThriftServer当前不具备负载均衡能力，用户需要在代码中将请求分散到不同的ThriftServer实例，避免所有请求同时访问同一个ThriftServer实例而导致单实例过载。
- 用户需要在应用代码中增加重试机制，确保当某个ThriftServer实例故障或重启时，可以改为重试其他ThriftServer实例。
- 参考Thrift官方指导在客户端节点安装Thrift安装包。
- 使用Thrift命令，根据HBase的Thrift定义文件hbase.thrift生成目标语言的接口文件，支持C++、Python等语言。下文样例代码基于thrift2接口（THBaseService），因此需使用HBase thrift2版本的hbase.thrift。参考命令如下：
thrift --gen <语言> hbase.thrift
<语言>为要生成的目标语言，例如cpp（C++）、py（Python）等。
以Python为例，执行的命令为：
thrift --gen py hbase.thrift
生成的代码默认位于当前目录下的gen-py目录中，下方Python样例会将该目录加入模块搜索路径。
C++代码样例
#include "THBaseService.h"
#include <config.h>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <ostream>
#include <string>
#include <vector>
#include "transport/TSocket.h"
#include <transport/TBufferTransports.h>
#include <protocol/TBinaryProtocol.h>
using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::hadoop::hbase::thrift2;
using boost::shared_ptr;
int main(int argc, char **argv) {
// ThriftServer的ip和端口号
std::string host = "x.x.x.x";
int port = 9090;
boost::shared_ptr<TSocket> socket(new TSocket(host, port));
boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
// 设置表名
std::string ns("default");
std::string table("test");
TTableName tableName;
tableName.__set_ns(ns);
tableName.__set_qualifier(table);
try {
// 创建连接
transport->open();
printf("Opened connection\n");
// 初始化客户端接口
THBaseServiceClient client(protocol);
// 建表
TColumnFamilyDescriptor column;
column.__set_name("f1");
column.__set_maxVersions(10);
std::vector<TColumnFamilyDescriptor> columns;
columns.push_back(column);
TTableDescriptor tableDescriptor;
tableDescriptor.__set_tableName(tableName);
tableDescriptor.__set_columns(columns);
std::vector<std::string> splitKeys;
splitKeys.push_back("row2");
splitKeys.push_back("row4");
splitKeys.push_back("row8");
printf("Creating table: %s\n", table.c_str());
try {
client.createTable(tableDescriptor, splitKeys);
} catch (const TException &te) {
std::cerr << "ERROR: " << te.what() << std::endl;
}
// Put写入单条数据
TColumnValue columnValue;
columnValue.__set_family("f1");
columnValue.__set_qualifier("q1");
columnValue.__set_value("val_001");
std::vector<TColumnValue> columnValues;
columnValues.push_back(columnValue);
TPut put;
put.__set_row("row1");
put.__set_columnValues(columnValues);
client.put(table, put);
printf("Put single row success\n");
// Put写入多条数据
TColumnValue columnValue2;
columnValue2.__set_family("f1");
columnValue2.__set_qualifier("q1");
columnValue2.__set_value("val_003");
std::vector<TColumnValue> columnValues2;
columnValues2.push_back(columnValue2);
TPut put2;
put2.__set_row("row3");
put2.__set_columnValues(columnValues2);
TColumnValue columnValue3;
columnValue3.__set_family("f1");
columnValue3.__set_qualifier("q1");
columnValue3.__set_value("val_005");
std::vector<TColumnValue> columnValues3;
columnValues3.push_back(columnValue3);
TPut put3;
put3.__set_row("row5");
put3.__set_columnValues(columnValues3);
std::vector<TPut> puts;
puts.push_back(put2);
puts.push_back(put3);
client.putMultiple(table, puts);
printf("Put multiple rows success\n");
// Get查询单条数据
TResult result;
TGet get;
get.__set_row("row1");
client.get(result, table, get);
std::vector<TColumnValue> list=result.columnValues;
std::vector<TColumnValue>::const_iterator iter;
std::string row = result.row;
for(iter=list.begin();iter!=list.end();iter++) {
printf("%s=%s, %s,%s\n",row.c_str(),(*iter).family.c_str(),(*iter).qualifier.c_str(),(*iter).value.c_str());
}
printf("Get single row success.\n");
// Get查询多条数据
std::vector<TGet> multiGets;
TGet get1;
get1.__set_row("row1");
multiGets.push_back(get1);
TGet get2;
get2.__set_row("row5");
multiGets.push_back(get2);
std::vector<TResult> multiRows;
client.getMultiple(multiRows, table, multiGets);
for(std::vector<TResult>::const_iterator iter1=multiRows.begin();iter1!=multiRows.end();iter1++) {
std::vector<TColumnValue> list=(*iter1).columnValues;
std::vector<TColumnValue>::const_iterator iter2;
std::string row = (*iter1).row;
for(iter2=list.begin();iter2!=list.end();iter2++) {
printf("%s=%s, %s,%s\n",row.c_str(),(*iter2).family.c_str(),(*iter2).qualifier.c_str(),(*iter2).value.c_str());
}
}
printf("Get multiple rows success.\n");
// Scan查询数据
TScan scan;
scan.__set_startRow("row1");
scan.__set_stopRow("row7");
int32_t nbRows = 2;
std::vector<TResult> results;
TResult* current = NULL;
while (true) {
client.getScannerResults(results, table, scan, nbRows);
if (results.size() == 0) {
printf("No more result.\n");
break;
}
std::vector<TResult>::const_iterator itx;
for(itx=results.begin();itx!=results.end();itx++) {
current = (TResult*) &(*itx);
if (current == NULL) {
break;
} else {
std::vector<TColumnValue> values=(*current).columnValues;
std::vector<TColumnValue>::const_iterator iterator;
for(iterator=list.begin();iterator!=list.end();iterator++) {
printf("%s=%s, %s,%s\n",(*current).row.c_str(),(*iterator).family.c_str(),(*iterator).qualifier.c_str(),(*iterator).value.c_str());
}
}
}
if (current == NULL) {
printf("Scan data done.\n");
break;
} else {
scan.__set_startRow((*current).row + (char)0);
}
}
//禁用和删除表
client.disableTable(tableName);
printf("Disabled %s\n", table.c_str());
client.deleteTable(tableName);
printf("Deleted %s\n", table.c_str());
transport->close();
printf("Closed connection\n");
} catch (const TException &tx) {
std::cerr << "ERROR(exception): " << tx.what() << std::endl;
}
return 0;
}
Python代码样例
# -*- coding: utf-8 -*-
# Standard-library modules
import sys
import os
# Modules shipped with the Thrift Python library; install it with "pip install thrift" if missing
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.transport import THttpClient
from thrift.transport import TSocket
# Modules generated from hbase.thrift ("thrift --gen py hbase.thrift" writes them to ./gen-py)
gen_py_path = os.path.abspath('gen-py')
sys.path.append(gen_py_path)
from hbase import THBaseService
from hbase.ttypes import TColumnValue, TColumn, TTableName, TTableDescriptor, TColumnFamilyDescriptor, TGet, TPut, TScan


def _single_cell_put(row, value):
    """Build a TPut that writes `value` into column cf1:q1 of `row`.

    Both arguments are str; they are UTF-8 encoded because the Thrift fields are binary.
    """
    return TPut(row=row.encode("utf8"),
                columnValues=[TColumnValue(family="cf1".encode("utf8"),
                                           qualifier="q1".encode("utf8"),
                                           value=value.encode("utf8"))])


# IP of the ThriftServer of the CloudTable HBase cluster; it is shown on the cluster details page
host = "x.x.x.x"
port = 9090
socket = TSocket.TSocket(host, port)
transport = TTransport.TBufferedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = THBaseService.Client(protocol)
transport.open()
try:
    # Test table default:test
    tableNameStr = "test"
    tableNameInBytes = tableNameStr.encode("utf8")
    tableName = TTableName(ns="default".encode("utf8"), qualifier=tableNameInBytes)
    # Split keys used to pre-split the table into regions
    splitKeys = ["row3".encode("utf8"), "row5".encode("utf8")]
    # Create the table with a single column family cf1
    client.createTable(TTableDescriptor(tableName=tableName, columns=[TColumnFamilyDescriptor(name="cf1".encode("utf8"))]), splitKeys)
    print("Create table %s success." % tableNameStr)
    # Put a single row
    put = _single_cell_put("row1", "test_value1")
    client.put(tableNameInBytes, put)
    print("Put single row success.")
    # Put several rows in one call
    puts = [_single_cell_put(row, "test_value1") for row in ("row4", "row6", "row8")]
    client.putMultiple(tableNameInBytes, puts)
    print("Put rows success.")
    # Get a single row
    get = TGet(row="row1".encode("utf8"))
    result = client.get(tableNameInBytes, get)
    print("Get Result: ", result)
    # Get several rows in one call
    gets = [TGet(row="row4".encode("utf8")), TGet(row="row8".encode("utf8"))]
    results = client.getMultiple(tableNameInBytes, gets)
    print("Get multiple rows: ", results)
    # Scan rows in [row4, row9), fetching `caching` rows per call
    startRow, stopRow = "row4".encode("utf8"), "row9".encode("utf8")
    scan = TScan(startRow=startRow, stopRow=stopRow)
    caching = 1
    results = []
    while True:
        scannerResult = client.getScannerResults(tableNameInBytes, scan, caching)
        # No more data: stop scanning
        if not scannerResult:
            break
        for result in scannerResult:
            results.append(result)
            print("Scan Result: ", result)
        # Each call scans from scan.startRow, so restart just after the last row returned:
        # appending a 0x00 byte gives the smallest row key strictly greater than it
        newStartRow = bytes(scannerResult[-1].row) + b"\x00"
        scan = TScan(startRow=newStartRow, stopRow=stopRow)
    # Disable and then delete the table
    client.disableTable(tableName)
    print("Disable table %s success." % tableNameStr)
    client.deleteTable(tableName)
    print("Delete table %s success." % tableNameStr)
finally:
    # Close the connection once all operations are done, even if one of them failed
    transport.close()