Updated on 2024-11-28 GMT+08:00

Accessing the ThriftServer Operation Table

Scenario

Users connect to a ThriftServer instance by using its host and port to create and delete HBase tables and to read and write data in them.

Prerequisites

  • ThriftServer has been enabled for the cluster. You have obtained the ThriftServer IP address from the cluster details page.
  • You have downloaded the Thrift installation package from link.
  • You have downloaded the HBase Thrift definition file from address.

Procedure

  1. Log in to the CloudTable console.
  2. Select a region in the upper left corner of the page.
  3. Click Cluster Management to go to the cluster management page.
  4. Click the name of an HBase cluster to go to the cluster details page and check the Thrift Server status. If Thrift Server is enabled, no further action is required. If Thrift Server is disabled, return to the cluster management page and choose More > Enable Thrift Server.

    • ThriftServer does not support load balancing. To prevent overloading a single instance, avoid accessing the same ThriftServer instance concurrently in your code.
    • Implement a retry mechanism in the application code so that requests are retried on other ThriftServer instances if a single instance fails or is restarted.

  5. Install the Thrift installation package on the client node. For details, see the Thrift official guide.
  6. Run the Thrift command to convert the HBase Thrift definition file to the interface file of the corresponding language. The supported languages include C++ and Python. The following command is used as an example.

    thrift --gen <language> hbase.thrift

    <language> specifies the target language of the generated interface files. The value can be cpp (C++) or py (Python).

    Take Python as an example. Run the thrift --gen py hbase.thrift command.

C++ Code Example

#include "THBaseService.h"
#include <config.h>
#include <vector>
#include <ostream>
#include <iostream>
#include "transport/TSocket.h"
#include <transport/TBufferTransports.h>
#include <protocol/TBinaryProtocol.h>
using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::hadoop::hbase::thrift2;
using boost::shared_ptr;
/**
 * Sample client for the HBase thrift2 service exposed by ThriftServer.
 *
 * Connects to the configured host/port and then, in order: creates the table
 * default:test with column family "f1", writes one row, writes two rows in a
 * batch, reads one row, reads two rows in a batch, scans the range
 * [row1, row7) page by page, and finally disables and deletes the table.
 *
 * Any Thrift exception raised after the connection attempt is reported on
 * stderr. The command-line arguments are not used.
 *
 * @return always 0.
 */
int main(int argc, char **argv) {
    // IP address and port number of ThriftServer
    std::string host = "x.x.x.x";
    int port = 9090;
    boost::shared_ptr<TSocket> socket(new TSocket(host, port));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));

    // Set the table name.
    std::string ns("default");
    std::string table("test");
    TTableName tableName;
    tableName.__set_ns(ns);
    tableName.__set_qualifier(table);

    try {
        // Create a connection.
        transport->open();
        printf("Opened connection\n");

        // Initialize the client.
        THBaseServiceClient client(protocol);

        // Create a table with one column family and three split keys.
        TColumnFamilyDescriptor column;
        column.__set_name("f1");
        column.__set_maxVersions(10);
        std::vector<TColumnFamilyDescriptor> columns;
        columns.push_back(column);

        TTableDescriptor tableDescriptor;
        tableDescriptor.__set_tableName(tableName);
        tableDescriptor.__set_columns(columns);
        std::vector<std::string> splitKeys;
        splitKeys.push_back("row2");
        splitKeys.push_back("row4");
        splitKeys.push_back("row8");
        printf("Creating table: %s\n", table.c_str());
        try {
            client.createTable(tableDescriptor, splitKeys);
        } catch (const TException &te) {
            // Best effort: a failed creation is reported and the sample
            // carries on with the remaining operations.
            std::cerr << "ERROR: " << te.what() << std::endl;
        }

        // Put a single piece of data.
        TColumnValue columnValue;
        columnValue.__set_family("f1");
        columnValue.__set_qualifier("q1");
        columnValue.__set_value("val_001");
        std::vector<TColumnValue> columnValues;
        columnValues.push_back(columnValue);
        TPut put;
        put.__set_row("row1");
        put.__set_columnValues(columnValues);
        client.put(table, put);
        printf("Put single row success\n");

        // Put multiple pieces of data.
        TColumnValue columnValue2;
        columnValue2.__set_family("f1");
        columnValue2.__set_qualifier("q1");
        columnValue2.__set_value("val_003");
        std::vector<TColumnValue> columnValues2;
        columnValues2.push_back(columnValue2);
        TPut put2;
        put2.__set_row("row3");
        put2.__set_columnValues(columnValues2);

        TColumnValue columnValue3;
        columnValue3.__set_family("f1");
        columnValue3.__set_qualifier("q1");
        columnValue3.__set_value("val_005");
        std::vector<TColumnValue> columnValues3;
        columnValues3.push_back(columnValue3);
        TPut put3;
        put3.__set_row("row5");
        put3.__set_columnValues(columnValues3);

        std::vector<TPut> puts;
        puts.push_back(put2);
        puts.push_back(put3);
        client.putMultiple(table, puts);
        printf("Put multiple rows success\n");

        // Get a single data record.
        TResult result;
        TGet get;
        get.__set_row("row1");
        client.get(result, table, get);
        const std::vector<TColumnValue> &singleCells = result.columnValues;
        for (std::vector<TColumnValue>::const_iterator cell = singleCells.begin();
             cell != singleCells.end(); ++cell) {
            printf("%s=%s, %s,%s\n", result.row.c_str(), cell->family.c_str(),
                   cell->qualifier.c_str(), cell->value.c_str());
        }
        printf("Get single row success.\n");

        // Get multiple data records.
        std::vector<TGet> multiGets;
        TGet get1;
        get1.__set_row("row1");
        multiGets.push_back(get1);
        TGet get2;
        get2.__set_row("row5");
        multiGets.push_back(get2);

        std::vector<TResult> multiRows;
        client.getMultiple(multiRows, table, multiGets);
        for (std::vector<TResult>::const_iterator rowIt = multiRows.begin();
             rowIt != multiRows.end(); ++rowIt) {
            const std::vector<TColumnValue> &cells = rowIt->columnValues;
            for (std::vector<TColumnValue>::const_iterator cell = cells.begin();
                 cell != cells.end(); ++cell) {
                printf("%s=%s, %s,%s\n", rowIt->row.c_str(), cell->family.c_str(),
                       cell->qualifier.c_str(), cell->value.c_str());
            }
        }
        printf("Get multiple rows success.\n");

        // Scan to query data, fetching nbRows rows per request.
        TScan scan;
        scan.__set_startRow("row1");
        scan.__set_stopRow("row7");
        int32_t nbRows = 2;
        std::vector<TResult> results;
        while (true) {
            client.getScannerResults(results, table, scan, nbRows);
            if (results.empty()) {
                printf("No more result.\n");
                break;
            }
            for (std::vector<TResult>::const_iterator rowIt = results.begin();
                 rowIt != results.end(); ++rowIt) {
                // Print the cells of the row currently being scanned.
                const std::vector<TColumnValue> &values = rowIt->columnValues;
                for (std::vector<TColumnValue>::const_iterator cell = values.begin();
                     cell != values.end(); ++cell) {
                    printf("%s=%s, %s,%s\n", rowIt->row.c_str(), cell->family.c_str(),
                           cell->qualifier.c_str(), cell->value.c_str());
                }
            }
            // Resume right after the last returned row: appending a zero byte
            // yields the smallest row key that sorts after it, so the row is
            // not returned again by the next request.
            std::string nextStartRow = results.back().row;
            nextStartRow.push_back('\0');
            scan.__set_startRow(nextStartRow);
        }

        // Disable and delete a table.
        client.disableTable(tableName);
        printf("Disabled %s\n", table.c_str());
        client.deleteTable(tableName);
        printf("Deleted %s\n", table.c_str());

        transport->close();
        printf("Closed connection\n");
    } catch (const TException &tx) {
        std::cerr << "ERROR(exception): " << tx.what() << std::endl;
    }
    return 0;
}

Python Code Example

# -*- coding: utf-8  -*-

# Import the common module.
import sys
import os

# Import the built-in module of Thrift. If the module does not exist, run the pip install thrift command to install it.
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.transport import THttpClient
from thrift.transport import TSocket

# Import the module generated by hbase.thrift.
gen_py_path = os.path.abspath('gen-py')
sys.path.append(gen_py_path)
from hbase import THBaseService
from hbase.ttypes import TColumnValue, TColumn, TTableName, TTableDescriptor, TColumnFamilyDescriptor, TGet, TPut, TScan
# Configure the IP address of ThriftServer in the CloudTable HBase cluster. You can obtain the IP address from the cluster details page.
host = "x.x.x.x"

socket = TSocket.TSocket(host, 9090)
transport = TTransport.TBufferedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
client = THBaseService.Client(protocol)
transport.open()

# Everything after open() runs inside try/finally so that the connection is
# closed even if one of the operations raises.
try:
    # Test table name
    tableNameInBytes = "test".encode("utf8")

    tableName = TTableName(ns="default".encode("utf8"), qualifier=tableNameInBytes)
    # Split key for region pre-splitting
    splitKeys = []
    splitKeys.append("row3".encode("utf8"))
    splitKeys.append("row5".encode("utf8"))
    # Create a table.
    client.createTable(TTableDescriptor(tableName=tableName, columns=[TColumnFamilyDescriptor(name="cf1".encode("utf8"))]), splitKeys)
    print("Create table %s success." % tableName)

    # Put a single data record.
    put = TPut(row="row1".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))])
    client.put(tableNameInBytes, put)
    print("Put single row success.")

    # Put multiple data records.
    puts = []
    puts.append(TPut(row="row4".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))]))
    puts.append(TPut(row="row6".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))]))
    puts.append(TPut(row="row8".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))]))
    client.putMultiple(tableNameInBytes, puts)
    print("Put rows success.")

    # Get a single data record.
    get = TGet(row="row1".encode("utf8"))
    result = client.get(tableNameInBytes, get)
    print("Get Result: ", result)

    # Get multiple data records.
    gets = []
    gets.append(TGet(row="row4".encode("utf8")))
    gets.append(TGet(row="row8".encode("utf8")))
    results = client.getMultiple(tableNameInBytes, gets)
    print("Get multiple rows: ", results)

    # Scan data, requesting `caching` rows per call.
    startRow, stopRow = "row4".encode("utf8"), "row9".encode("utf8")
    scan = TScan(startRow=startRow, stopRow=stopRow)
    caching = 1
    results = []
    while True:
        scannerResult = client.getScannerResults(tableNameInBytes, scan, caching)
        lastOne = None
        for result in scannerResult:
            results.append(result)
            print("Scan Result: ", result)
            lastOne = result
        # No more data. Exit.
        if lastOne is None:
            break
        # Regenerate the start row of the next scan: appending a zero byte to
        # the last returned row key gives the smallest key that sorts after it,
        # so that row is not returned again. The key stays an immutable bytes
        # value, like every other binary field passed to the client.
        newStartRow = lastOne.row + b"\x00"
        scan = TScan(startRow=newStartRow, stopRow=stopRow)

    # Disable and delete a table.
    client.disableTable(tableName)
    print("Disable table %s success." % tableName)
    client.deleteTable(tableName)
    print("Delete table %s success." % tableName)
finally:
    # Close the connection after all operations are complete (or have failed).
    transport.close()