更新时间:2024-11-28 GMT+08:00

访问ThriftServer操作表

操作场景

用户根据指定的host和port访问对应的ThriftServer实例,进行HBase表的创建,删除等操作。

前提条件

  • 集群已启用ThriftServer并从集群详情页面获取到ThriftServer IP。
  • 已下载Thrift安装包,安装包下载地址:链接
  • 已下载HBase Thrift定义文件,文件下载地址:地址

操作步骤

  1. 登录表格存储服务控制台。
  2. 在页面左上角选择区域。
  3. 单击“集群管理”,进入集群管理界面。
  4. 单击HBase集群名称,进入集群详情页面查看ThriftServer的状态,如果ThriftServer为开启状态,无需开启操作;如果ThriftServer为关闭状态,则选返回集群管理界面,单击“更多 > 开启ThriftServer”,等待完成后,即可进行使用。

    • ThriftServer当前不具备负载均衡的能力,用户需要避免在代码里面同时访问同一个ThriftServer实例,避免单实例过载。
    • 用户需要在应用代码里面增加重试机制,保证单ThriftServer实例故障或者重启时,可以重试其他ThriftServer实例。

  5. 参考Thrift官方指导在客户端节点安装Thrift安装包。
  6. 使用Thrift命令将HBase Thrift定义文件生成对应语言的接口文件,支持的语言有C++,Python等。参考命令如下:

    thrift --gen <语言> hbase.thrift

    <语言>为要生成的目标语言,支持cpp(C++)、py(Python)等。

    以Python为例,执行命令为:thrift --gen py hbase.thrif

C++代码样例

#include "THBaseService.h"
#include <config.h>
#include <vector>
#include <ostream>
#include <iostream>
#include "transport/TSocket.h"
#include <transport/TBufferTransports.h>
#include <protocol/TBinaryProtocol.h>
using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::hadoop::hbase::thrift2;
using boost::shared_ptr;
int main(int argc, char **argv) {
    // ThriftServer的ip和端口号
    std::string host = "x.x.x.x";
    int port = 9090;
    boost::shared_ptr<TSocket> socket(new TSocket(host, port));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
    // 设置表名
    std::string ns("default");
    std::string table("test");
    TTableName tableName;
    tableName.__set_ns(ns);
    tableName.__set_qualifier(table);
     try {
        // 创建连接
        transport->open();
        printf("Opened connection\n");
        // 初始化客户端接口
        THBaseServiceClient  client(protocol);
        
        // 建表
        TColumnFamilyDescriptor column;
        column.__set_name("f1");
        column.__set_maxVersions(10);
        std::vector<TColumnFamilyDescriptor> columns;
        columns.push_back(column);
        
        TTableDescriptor tableDescriptor;
        tableDescriptor.__set_tableName(tableName);
        tableDescriptor.__set_columns(columns);
        std::vector<std::string> splitKeys;
        splitKeys.push_back("row2");
        splitKeys.push_back("row4");
        splitKeys.push_back("row8");
        printf("Creating table: %s\n", table.c_str());
        try {
            client.createTable(tableDescriptor, splitKeys);
        } catch (const TException  &te) {
            std::cerr << "ERROR: " << te.what() << std::endl;
        }
        // Put写入单条数据
        TColumnValue columnValue;
        columnValue.__set_family("f1");
        columnValue.__set_qualifier("q1");
        columnValue.__set_value("val_001");
        std::vector<TColumnValue> columnValues;
        columnValues.push_back(columnValue);
        TPut put;
        put.__set_row("row1");
        put.__set_columnValues(columnValues);
        client.put(table, put);
        printf("Put single row success\n");
        // Put写入多条数据
        TColumnValue columnValue2;
        columnValue2.__set_family("f1");
        columnValue2.__set_qualifier("q1");
        columnValue2.__set_value("val_003");
        std::vector<TColumnValue> columnValues2;
        columnValues2.push_back(columnValue2);
        TPut put2;
        put2.__set_row("row3");
        put2.__set_columnValues(columnValues2);
        TColumnValue columnValue3;
        columnValue3.__set_family("f1");
        columnValue3.__set_qualifier("q1");
        columnValue3.__set_value("val_005");
        std::vector<TColumnValue> columnValues3;
        columnValues3.push_back(columnValue3);
        TPut put3;
        put3.__set_row("row5");
        put3.__set_columnValues(columnValues3);
        std::vector<TPut> puts;
        puts.push_back(put2);
        puts.push_back(put3);
        client.putMultiple(table, puts);
        printf("Put multiple rows success\n");
        // Get查询单条数据
        TResult result;
        TGet get;
        get.__set_row("row1");
        client.get(result, table, get);
        std::vector<TColumnValue> list=result.columnValues;
        std::vector<TColumnValue>::const_iterator iter;
        std::string row = result.row; 
        for(iter=list.begin();iter!=list.end();iter++) {  
            printf("%s=%s, %s,%s\n",row.c_str(),(*iter).family.c_str(),(*iter).qualifier.c_str(),(*iter).value.c_str());
        }
        printf("Get single row success.\n");
        // Get查询多条数据
        std::vector<TGet> multiGets;
        TGet get1;
        get1.__set_row("row1");
        multiGets.push_back(get1);
        TGet get2;
        get2.__set_row("row5");
        multiGets.push_back(get2);
        
        std::vector<TResult> multiRows;
        client.getMultiple(multiRows, table, multiGets);
        for(std::vector<TResult>::const_iterator iter1=multiRows.begin();iter1!=multiRows.end();iter1++) {
            std::vector<TColumnValue> list=(*iter1).columnValues;
            std::vector<TColumnValue>::const_iterator iter2;
            std::string row = (*iter1).row; 
            for(iter2=list.begin();iter2!=list.end();iter2++) {  
                printf("%s=%s, %s,%s\n",row.c_str(),(*iter2).family.c_str(),(*iter2).qualifier.c_str(),(*iter2).value.c_str());
            }
        }
        printf("Get multiple rows success.\n");
        // Scan查询数据
        TScan scan;
        scan.__set_startRow("row1");
        scan.__set_stopRow("row7");
        int32_t nbRows = 2;
        std::vector<TResult> results;
        TResult* current = NULL;
        while (true) {
            client.getScannerResults(results, table, scan, nbRows);
            if (results.size() == 0) {
                printf("No more result.\n");
                break;
            }
            std::vector<TResult>::const_iterator itx;
            for(itx=results.begin();itx!=results.end();itx++) {
                current = (TResult*) &(*itx);
                if (current == NULL) {
                    break;
                } else {
                    std::vector<TColumnValue> values=(*current).columnValues;
                    std::vector<TColumnValue>::const_iterator iterator;
                    for(iterator=list.begin();iterator!=list.end();iterator++) {
                        printf("%s=%s, %s,%s\n",(*current).row.c_str(),(*iterator).family.c_str(),(*iterator).qualifier.c_str(),(*iterator).value.c_str());
                    }
                }
            }
            if (current == NULL) {
                printf("Scan data done.\n");
                break;
            } else {
                scan.__set_startRow((*current).row + (char)0);
            }
        }
        //禁用和删除表
        client.disableTable(tableName);
        printf("Disabled %s\n", table.c_str());
        client.deleteTable(tableName);
        printf("Deleted %s\n", table.c_str());
        transport->close();
        printf("Closed connection\n");
    } catch (const TException &tx) {
        std::cerr << "ERROR(exception): " << tx.what() << std::endl;
    }
    return 0;  
}

Python代码样例

# -*- coding: utf-8  -*-

# 引入公共模块
import sys
import os

# 引入Thrift自带模块,如果不存在则需要执行pip install thrift安装
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.transport import THttpClient
from thrift.transport import TSocket

# 引入通过hbase.thrift生成的模块
gen_py_path = os.path.abspath('gen-py')
sys.path.append(gen_py_path)
from hbase import THBaseService
from hbase.ttypes import TColumnValue, TColumn, TTableName, TTableDescriptor, TColumnFamilyDescriptor, TGet, TPut, TScan
# 配置CloudTable HBase集群的ThriftServer的IP,可以通过集群的详情页面获取
host = "x.x.x.x"

socket = TSocket.TSocket(host, 9090)
transport = TTransport.TBufferedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = THBaseService.Client(protocol)
transport.open()

# 测试表名
tableNameInBytes = "test".encode("utf8")

tableName = TTableName(ns="default".encode("utf8"), qualifier=tableNameInBytes)
# 预分region的split key
splitKeys=[]
splitKeys.append("row3".encode("utf8"))
splitKeys.append("row5".encode("utf8"))
# 建表操作
client.createTable(TTableDescriptor(tableName=tableName, columns=[TColumnFamilyDescriptor(name="cf1".encode("utf8"))]), splitKeys)
print("Create table %s success." % tableName)

# Put单条数据
put = TPut(row="row1".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))])
client.put(tableNameInBytes, put)
print("Put single row success.")

# Put多条数据
puts = []
puts.append(TPut(row="row4".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))]))
puts.append(TPut(row="row6".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))]))
puts.append(TPut(row="row8".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))]))
client.putMultiple(tableNameInBytes, puts)
print("Put rows success.")

# Get单条数据
get = TGet(row="row1".encode("utf8"))
result = client.get(tableNameInBytes, get)
print("Get Result: ", result)

# Get多条数据
gets = []
gets.append(TGet(row="row4".encode("utf8")))
gets.append(TGet(row="row8".encode("utf8")))
results = client.getMultiple(tableNameInBytes, gets)
print("Get multiple rows: ", results)

# Scan数据
startRow, stopRow = "row4".encode("utf8"), "row9".encode("utf8")
scan = TScan(startRow=startRow, stopRow=stopRow)
caching=1
results = []
while True:
	scannerResult = client.getScannerResults(tableNameInBytes, scan, caching)
	lastOne = None
	for result in scannerResult:
		results.append(result)
		print("Scan Result: ", result)
		lastOne = result
	# 没有更多数据,退出
	if lastOne is None:
		break
	else:
		# 重新生成下一次scan的startRow
		newStartRow = bytearray(lastOne.row)
		newStartRow.append(0x00)
		scan = TScan(startRow=newStartRow, stopRow=stopRow)


# 禁用和删除表
client.disableTable(tableName)
print("Disable table %s success." % tableName)
client.deleteTable(tableName)
print("Delete table %s success." % tableName)

# 所有操作都结束后,关闭连接
transport.close()