更新时间:2024-11-28 GMT+08:00
访问ThriftServer操作表
操作场景
用户根据指定的host和port访问对应的ThriftServer实例,进行HBase表的创建,删除等操作。
前提条件
操作步骤
- 登录表格存储服务控制台。
- 在页面左上角选择区域。
- 单击“集群管理”,进入集群管理界面。
- 单击HBase集群名称,进入集群详情页面查看ThriftServer的状态。如果ThriftServer为开启状态,则无需执行开启操作;如果ThriftServer为关闭状态,则需返回集群管理界面,单击“更多 > 开启ThriftServer”,等待开启完成后即可使用。
- ThriftServer当前不具备负载均衡的能力,用户需要避免在代码里面同时访问同一个ThriftServer实例,避免单实例过载。
- 用户需要在应用代码里面增加重试机制,保证单ThriftServer实例故障或者重启时,可以重试其他ThriftServer实例。
- 参考Thrift官方指导在客户端节点安装Thrift安装包。
- 使用Thrift命令将HBase Thrift定义文件生成对应语言的接口文件,支持的语言有C++,Python等。参考命令如下:
thrift --gen <语言> hbase.thrift
<语言>为要生成的目标语言,支持cpp(C++)、py(Python)等。
以Python为例,执行命令为:thrift --gen py hbase.thrift
C++代码样例
#include "THBaseService.h"
#include <config.h>
#include <cstdio>
#include <string>
#include <vector>
#include <ostream>
#include <iostream>
#include "transport/TSocket.h"
#include <transport/TBufferTransports.h>
#include <protocol/TBinaryProtocol.h>

using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::hadoop::hbase::thrift2;
using boost::shared_ptr;

// Prints every cell of one row as "<row>=<family>, <qualifier>,<value>".
static void printResult(const TResult &result) {
    std::vector<TColumnValue>::const_iterator cell;
    for (cell = result.columnValues.begin(); cell != result.columnValues.end(); ++cell) {
        printf("%s=%s, %s,%s\n", result.row.c_str(), cell->family.c_str(),
               cell->qualifier.c_str(), cell->value.c_str());
    }
}

// Sample client: connects to a ThriftServer instance, creates a table,
// writes and reads a few rows (put / putMultiple / get / getMultiple / scan),
// then disables and deletes the table.
// Any TException raised after the connection attempt is reported on stderr;
// the program returns 0 in all cases.
int main(int argc, char **argv) {
    // IP address and port of the ThriftServer instance
    std::string host = "x.x.x.x";
    int port = 9090;
    boost::shared_ptr<TSocket> socket(new TSocket(host, port));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));

    // Table name (namespace + qualifier)
    std::string ns("default");
    std::string table("test");
    TTableName tableName;
    tableName.__set_ns(ns);
    tableName.__set_qualifier(table);

    try {
        // Open the connection
        transport->open();
        printf("Opened connection\n");

        // Initialize the client interface
        THBaseServiceClient client(protocol);

        // Create the table with one column family and three split keys
        TColumnFamilyDescriptor column;
        column.__set_name("f1");
        column.__set_maxVersions(10);
        std::vector<TColumnFamilyDescriptor> columns;
        columns.push_back(column);

        TTableDescriptor tableDescriptor;
        tableDescriptor.__set_tableName(tableName);
        tableDescriptor.__set_columns(columns);

        std::vector<std::string> splitKeys;
        splitKeys.push_back("row2");
        splitKeys.push_back("row4");
        splitKeys.push_back("row8");

        printf("Creating table: %s\n", table.c_str());
        try {
            client.createTable(tableDescriptor, splitKeys);
        } catch (const TException &te) {
            // A failed create is only logged; the sample carries on with the
            // remaining operations.
            std::cerr << "ERROR: " << te.what() << std::endl;
        }

        // Put: write a single row
        TColumnValue columnValue;
        columnValue.__set_family("f1");
        columnValue.__set_qualifier("q1");
        columnValue.__set_value("val_001");
        std::vector<TColumnValue> columnValues;
        columnValues.push_back(columnValue);

        TPut put;
        put.__set_row("row1");
        put.__set_columnValues(columnValues);
        client.put(table, put);
        printf("Put single row success\n");

        // Put: write multiple rows in one call
        TColumnValue columnValue2;
        columnValue2.__set_family("f1");
        columnValue2.__set_qualifier("q1");
        columnValue2.__set_value("val_003");
        std::vector<TColumnValue> columnValues2;
        columnValues2.push_back(columnValue2);
        TPut put2;
        put2.__set_row("row3");
        put2.__set_columnValues(columnValues2);

        TColumnValue columnValue3;
        columnValue3.__set_family("f1");
        columnValue3.__set_qualifier("q1");
        columnValue3.__set_value("val_005");
        std::vector<TColumnValue> columnValues3;
        columnValues3.push_back(columnValue3);
        TPut put3;
        put3.__set_row("row5");
        put3.__set_columnValues(columnValues3);

        std::vector<TPut> puts;
        puts.push_back(put2);
        puts.push_back(put3);
        client.putMultiple(table, puts);
        printf("Put multiple rows success\n");

        // Get: read a single row
        TResult result;
        TGet get;
        get.__set_row("row1");
        client.get(result, table, get);
        printResult(result);
        printf("Get single row success.\n");

        // Get: read multiple rows in one call
        std::vector<TGet> multiGets;
        TGet get1;
        get1.__set_row("row1");
        multiGets.push_back(get1);
        TGet get2;
        get2.__set_row("row5");
        multiGets.push_back(get2);

        std::vector<TResult> multiRows;
        client.getMultiple(multiRows, table, multiGets);
        std::vector<TResult>::const_iterator rowIter;
        for (rowIter = multiRows.begin(); rowIter != multiRows.end(); ++rowIter) {
            printResult(*rowIter);
        }
        printf("Get multiple rows success.\n");

        // Scan: read the range [row1, row7) in batches of nbRows rows
        TScan scan;
        scan.__set_startRow("row1");
        scan.__set_stopRow("row7");
        int32_t nbRows = 2;
        std::vector<TResult> results;
        while (true) {
            client.getScannerResults(results, table, scan, nbRows);
            if (results.empty()) {
                printf("No more result.\n");
                break;
            }
            // Print the cells of each scanned row itself (not those of the
            // earlier Get result).
            std::vector<TResult>::const_iterator itx;
            for (itx = results.begin(); itx != results.end(); ++itx) {
                printResult(*itx);
            }
            // Resume from the last returned row key with a 0x00 byte
            // appended, i.e. the smallest key that sorts after it, so the
            // next batch does not return that row again.
            scan.__set_startRow(results.back().row + '\0');
        }

        // Disable and delete the table
        client.disableTable(tableName);
        printf("Disabled %s\n", table.c_str());
        client.deleteTable(tableName);
        printf("Deleted %s\n", table.c_str());

        transport->close();
        printf("Closed connection\n");
    } catch (const TException &tx) {
        std::cerr << "ERROR(exception): " << tx.what() << std::endl;
    }
    return 0;
}
Python代码样例
# -*- coding: utf-8 -*-
# Sample client: connects to a ThriftServer instance, creates a table, writes
# and reads a few rows (put / putMultiple / get / getMultiple / scan), then
# disables and deletes the table.

# Standard-library modules
import sys
import os

# Modules shipped with Thrift; run "pip install thrift" if they are missing
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.transport import THttpClient
from thrift.transport import TSocket

# Modules generated from hbase.thrift (expected in ./gen-py)
gen_py_path = os.path.abspath('gen-py')
sys.path.append(gen_py_path)
from hbase import THBaseService
from hbase.ttypes import TColumnValue, TColumn, TTableName, TTableDescriptor, TColumnFamilyDescriptor, TGet, TPut, TScan

# IP of the ThriftServer of the CloudTable HBase cluster; it can be obtained
# from the cluster details page
host = "x.x.x.x"
socket = TSocket.TSocket(host, 9090)
transport = TTransport.TBufferedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = THBaseService.Client(protocol)

transport.open()
try:
    # Test table name
    tableNameInBytes = "test".encode("utf8")
    tableName = TTableName(ns="default".encode("utf8"), qualifier=tableNameInBytes)

    # Split keys used to pre-split the table into regions
    splitKeys = []
    splitKeys.append("row3".encode("utf8"))
    splitKeys.append("row5".encode("utf8"))

    # Create the table
    client.createTable(
        TTableDescriptor(
            tableName=tableName,
            columns=[TColumnFamilyDescriptor(name="cf1".encode("utf8"))]),
        splitKeys)
    print("Create table %s success." % tableName)

    # Put a single row
    put = TPut(
        row="row1".encode("utf8"),
        columnValues=[TColumnValue(family="cf1".encode("utf8"),
                                   qualifier="q1".encode("utf8"),
                                   value="test_value1".encode("utf8"))])
    client.put(tableNameInBytes, put)
    print("Put single row success.")

    # Put multiple rows
    puts = []
    puts.append(TPut(
        row="row4".encode("utf8"),
        columnValues=[TColumnValue(family="cf1".encode("utf8"),
                                   qualifier="q1".encode("utf8"),
                                   value="test_value1".encode("utf8"))]))
    puts.append(TPut(
        row="row6".encode("utf8"),
        columnValues=[TColumnValue(family="cf1".encode("utf8"),
                                   qualifier="q1".encode("utf8"),
                                   value="test_value1".encode("utf8"))]))
    puts.append(TPut(
        row="row8".encode("utf8"),
        columnValues=[TColumnValue(family="cf1".encode("utf8"),
                                   qualifier="q1".encode("utf8"),
                                   value="test_value1".encode("utf8"))]))
    client.putMultiple(tableNameInBytes, puts)
    print("Put rows success.")

    # Get a single row
    get = TGet(row="row1".encode("utf8"))
    result = client.get(tableNameInBytes, get)
    print("Get Result: ", result)

    # Get multiple rows
    gets = []
    gets.append(TGet(row="row4".encode("utf8")))
    gets.append(TGet(row="row8".encode("utf8")))
    results = client.getMultiple(tableNameInBytes, gets)
    print("Get multiple rows: ", results)

    # Scan rows from "row4" up to "row9", `caching` rows per request
    startRow, stopRow = "row4".encode("utf8"), "row9".encode("utf8")
    scan = TScan(startRow=startRow, stopRow=stopRow)
    caching = 1
    results = []
    while True:
        scannerResult = client.getScannerResults(tableNameInBytes, scan, caching)
        lastOne = None
        for result in scannerResult:
            results.append(result)
            print("Scan Result: ", result)
            lastOne = result
        # No more data: stop scanning
        if lastOne is None:
            break
        else:
            # Build the startRow of the next scan: the last returned row key
            # with a 0x00 byte appended, so that row is not returned again
            newStartRow = bytearray(lastOne.row)
            newStartRow.append(0x00)
            scan = TScan(startRow=newStartRow, stopRow=stopRow)

    # Disable and delete the table
    client.disableTable(tableName)
    print("Disable table %s success." % tableName)
    client.deleteTable(tableName)
    print("Delete table %s success." % tableName)
finally:
    # Close the connection once all operations are done, and also when one of
    # them raised, so the connection is not left open
    transport.close()