Accessing the ThriftServer Operation Table
Scenario
Users access the corresponding ThriftServer instance based on the specified host and port to create and delete HBase tables.
Prerequisites
Procedure
- Log in to the CloudTable console.
- Select a region in the upper left corner of the page.
- Click Cluster Management to go to the cluster management page.
- Click the name of an HBase cluster to go to the cluster details page and check the Thrift Server status. If Thrift Server is enabled, no further action is required. If Thrift Server is disabled, return to the cluster management page and choose More > Enable Thrift Server.
- ThriftServer does not support load balancing. To prevent overloading a single instance, avoid accessing the same ThriftServer instance concurrently in your code.
- Implement a retry mechanism in the application code to ensure that other ThriftServer instances are retried if a single instance fails or is restarted.
- Install the Thrift installation package on the client node. For details, see the Thrift official guide.
- Run the Thrift command to convert the HBase Thrift definition file to the interface file of the corresponding language. The supported languages include C++ and Python. The following command is used as an example.
thrift --gen <language> hbase.thrift
<Language> indicates the target language to be generated. The value can be cpp (C++) or py (Python).
Take Python as an example. Run the thrift --gen py hbase.thrif command.
C++ Code Example
#include "THBaseService.h" #include <config.h> #include <vector> #include <ostream> #include <iostream> #include "transport/TSocket.h" #include <transport/TBufferTransports.h> #include <protocol/TBinaryProtocol.h> using namespace std; using namespace apache::thrift; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; using namespace apache::hadoop::hbase::thrift2; using boost::shared_ptr; int main(int argc, char **argv) { // IP address and port number of ThriftServer std::string host = "x.x.x.x"; int port = 9090; boost::shared_ptr<TSocket> socket(new TSocket(host, port)); boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket)); boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); // Set the table name. std::string ns("default"); std::string table("test"); TTableName tableName; tableName.__set_ns(ns); tableName.__set_qualifier(table); try { // Create a connection. transport->open(); printf("Opened connection\n"); // Initialize the client. THBaseServiceClient client(protocol); // Create a table. TColumnFamilyDescriptor column; column.__set_name("f1"); column.__set_maxVersions(10); std::vector<TColumnFamilyDescriptor> columns; columns.push_back(column); TTableDescriptor tableDescriptor; tableDescriptor.__set_tableName(tableName); tableDescriptor.__set_columns(columns); std::vector<std::string> splitKeys; splitKeys.push_back("row2"); splitKeys.push_back("row4"); splitKeys.push_back("row8"); printf("Creating table: %s\n", table.c_str()); try { client.createTable(tableDescriptor, splitKeys); } catch (const TException &te) { std::cerr << "ERROR: " << te.what() << std::endl; } // Put a single piece of data. TColumnValue columnValue; columnValue.__set_family("f1"); columnValue.__set_qualifier("q1"); columnValue.__set_value("val_001"); std::vector<TColumnValue> columnValues; columnValues.push_back(columnValue); TPut put; put.__set_row("row1"); put.__set_columnValues(columnValues); client.put(table, put); printf("Put single row success\n"); // Put multiple pieces of data. TColumnValue columnValue2; columnValue2.__set_family("f1"); columnValue2.__set_qualifier("q1"); columnValue2.__set_value("val_003"); std::vector<TColumnValue> columnValues2; columnValues2.push_back(columnValue2); TPut put2; put2.__set_row("row3"); put2.__set_columnValues(columnValues2); TColumnValue columnValue3; columnValue3.__set_family("f1"); columnValue3.__set_qualifier("q1"); columnValue3.__set_value("val_005"); std::vector<TColumnValue> columnValues3; columnValues3.push_back(columnValue3); TPut put3; put3.__set_row("row5"); put3.__set_columnValues(columnValues3); std::vector<TPut> puts; puts.push_back(put2); puts.push_back(put3); client.putMultiple(table, puts); printf("Put multiple rows success\n"); // Get a single data record. TResult result; TGet get; get.__set_row("row1"); client.get(result, table, get); std::vector<TColumnValue> list=result.columnValues; std::vector<TColumnValue>::const_iterator iter; std::string row = result.row; for(iter=list.begin();iter!=list.end();iter++) { printf("%s=%s, %s,%s\n",row.c_str(),(*iter).family.c_str(),(*iter).qualifier.c_str(),(*iter).value.c_str()); } printf("Get single row success.\n"); // Get multiple data records. std::vector<TGet> multiGets; TGet get1; get1.__set_row("row1"); multiGets.push_back(get1); TGet get2; get2.__set_row("row5"); multiGets.push_back(get2); std::vector<TResult> multiRows; client.getMultiple(multiRows, table, multiGets); for(std::vector<TResult>::const_iterator iter1=multiRows.begin();iter1!=multiRows.end();iter1++) { std::vector<TColumnValue> list=(*iter1).columnValues; std::vector<TColumnValue>::const_iterator iter2; std::string row = (*iter1).row; for(iter2=list.begin();iter2!=list.end();iter2++) { printf("%s=%s, %s,%s\n",row.c_str(),(*iter2).family.c_str(),(*iter2).qualifier.c_str(),(*iter2).value.c_str()); } } printf("Get multiple rows success.\n"); // Scan to query data. TScan scan; scan.__set_startRow("row1"); scan.__set_stopRow("row7"); int32_t nbRows = 2; std::vector<TResult> results; TResult* current = NULL; while (true) { client.getScannerResults(results, table, scan, nbRows); if (results.size() == 0) { printf("No more result.\n"); break; } std::vector<TResult>::const_iterator itx; for(itx=results.begin();itx!=results.end();itx++) { current = (TResult*) &(*itx); if (current == NULL) { break; } else { std::vector<TColumnValue> values=(*current).columnValues; std::vector<TColumnValue>::const_iterator iterator; for(iterator=list.begin();iterator!=list.end();iterator++) { printf("%s=%s, %s,%s\n",(*current).row.c_str(),(*iterator).family.c_str(),(*iterator).qualifier.c_str(),(*iterator).value.c_str()); } } } if (current == NULL) { printf("Scan data done.\n"); break; } else { scan.__set_startRow((*current).row + (char)0); } } // Disable and delete a table. client.disableTable(tableName); printf("Disabled %s\n", table.c_str()); client.deleteTable(tableName); printf("Deleted %s\n", table.c_str()); transport->close(); printf("Closed connection\n"); } catch (const TException &tx) { std::cerr << "ERROR(exception): " << tx.what() << std::endl; } return 0; }
Python Code Example
# -*- coding: utf-8 -*- # Import the common module. import sys import os # Import the built-in module of Thrift. If the module does not exist, run the pip install thrift command to install it. from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol from thrift.transport import THttpClient from thrift.transport import TSocket # Import the module generated by hbase.thrift. gen_py_path = os.path.abspath('gen-py') sys.path.append(gen_py_path) from hbase import THBaseService from hbase.ttypes import TColumnValue, TColumn, TTableName, TTableDescriptor, TColumnFamilyDescriptor, TGet, TPut, TScan # Configure the IP address of ThriftServer in the CloudTable HBase cluster. You can obtain the IP address from the cluster details page. host = "x.x.x.x" socket = TSocket.TSocket(host, 9090) transport = TTransport.TBufferedTransport(socket) protocol = TBinaryProtocol.TBinaryProtocol(transport) client = THBaseService.Client(protocol) transport.open() # Test table name tableNameInBytes = "test".encode("utf8") tableName = TTableName(ns="default".encode("utf8"), qualifier=tableNameInBytes) # Split key for region pre-splitting splitKeys=[] splitKeys.append("row3".encode("utf8")) splitKeys.append("row5".encode("utf8")) # Create a table. client.createTable(TTableDescriptor(tableName=tableName, columns=[TColumnFamilyDescriptor(name="cf1".encode("utf8"))]), splitKeys) print("Create table %s success." % tableName) # Put a single data record. put = TPut(row="row1".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))]) client.put(tableNameInBytes, put) print("Put single row success.") # Put multiple data records. puts = [] puts.append(TPut(row="row4".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))])) puts.append(TPut(row="row6".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))])) puts.append(TPut(row="row8".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"), qualifier="q1".encode("utf8"), value="test_value1".encode("utf8"))])) client.putMultiple(tableNameInBytes, puts) print("Put rows success.") # Get a single data record. get = TGet(row="row1".encode("utf8")) result = client.get(tableNameInBytes, get) print("Get Result: ", result) # Get multiple data records. gets = [] gets.append(TGet(row="row4".encode("utf8"))) gets.append(TGet(row="row8".encode("utf8"))) results = client.getMultiple(tableNameInBytes, gets) print("Get multiple rows: ", results) # Scan data. startRow, stopRow = "row4".encode("utf8"), "row9".encode("utf8") scan = TScan(startRow=startRow, stopRow=stopRow) caching=1 results = [] while True: scannerResult = client.getScannerResults(tableNameInBytes, scan, caching) lastOne = None for result in scannerResult: results.append(result) print("Scan Result: ", result) lastOne = result # No more data. Exit. if lastOne is None: break else: # Regenerate the start row of the next scan. newStartRow = bytearray(lastOne.row) newStartRow.append(0x00) scan = TScan(startRow=newStartRow, stopRow=stopRow) # Disable and delete a table. client.disableTable(tableName) print("Disable table %s success." % tableName) client.deleteTable(tableName) print("Delete table %s success." % tableName) # Close the connection after all operations are complete. transport.close()
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot