Updated on 2024-06-21 GMT+08:00

Accessing ThriftServer to Operate Tables

Scenario

Given the host where a ThriftServer instance runs and the port on which it provides services, you can create a Thrift client using the authentication credential and configuration files, connect to ThriftServer, and then list the table names in a specified namespace, create a table, and delete a table.

Procedure

  1. Log in to the CloudTable console.
  2. Select a region in the upper left corner of the page.
  3. Click Cluster Management to go to the cluster management page.
  4. Click the name of an HBase cluster to go to the cluster details page and check the Thrift Server status. If Thrift Server is enabled, no further action is required. If Thrift Server is disabled, return to the cluster management page and choose More > Enable Thrift Server.

    • Currently, Thrift Server cannot interconnect with ELB. A single client instance cannot be used by multiple threads.
    • Download the Thrift installation package from https://www.apache.org/dyn/closer.cgi?path=/thrift/0.11.0/thrift-0.11.0.tar.gz.
    • Download the ThriftServer code example package from https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.3.0/src/hbase-examples/hbase-thrift-example, decompress the package, and obtain the code example.
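
    Because a single client cannot be used by multiple threads, one common approach is to give each worker thread its own client. The following is a minimal sketch of that idea, not the sample project's code. StubClient is a hypothetical stand-in for a real Thrift client, and the pool size and request count are arbitrary.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PerThreadClientSketch {

    /** Hypothetical stand-in for a Thrift client; it is not part of the sample project. */
    static final class StubClient {
        private final Thread owner = Thread.currentThread();

        String call(String request) {
            // Fail loudly if the client is used from a thread other than its creator.
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("client shared across threads");
            }
            return "ok:" + request;
        }
    }

    // One client per thread, created lazily on the first call to get().
    private static final ThreadLocal<StubClient> CLIENT = ThreadLocal.withInitial(StubClient::new);

    /** Runs the requests on a thread pool and returns how many distinct clients served them. */
    static int run(int threads, int requests) throws InterruptedException {
        Set<StubClient> used = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < requests; i++) {
            final int n = i;
            pool.submit(() -> {
                StubClient client = CLIENT.get();
                client.call("req-" + n);
                used.add(client);
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return used.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("distinct clients used: " + run(4, 20));
    }
}
```

    With a real client, each thread would also need to open and close its own transport.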

Java Code Example

  • Preparing for compilation

    Add the following configuration to the pom.xml file of the hbase-thrift-example project:

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.3</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.huawei.bigdata.hbase.examples.TestMain</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assemble</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  • Initializing configuration
    The following code snippets belong to the TestMain class in the com.huawei.bigdata.hbase.examples package of the hbase-thrift-example sample project.
        private static void init() throws IOException {
            // Load the default configuration from the conf directory.
            conf = HBaseConfiguration.create();

            // [1] Resolve the compiled conf directory on the classpath.
            String userdir = TestMain.class.getClassLoader().getResource("conf").getPath() + File.separator;
            // In a Linux environment, use the following line instead:
            // String userdir = System.getProperty("user.dir") + File.separator + "conf" + File.separator;
            conf.addResource(new Path(userdir + "core-site.xml"), false);
            conf.addResource(new Path(userdir + "hdfs-site.xml"), false);
            conf.addResource(new Path(userdir + "hbase-site.xml"), false);
        }

    [1] userdir obtains the conf directory in the resource path after compilation. The core-site.xml, hdfs-site.xml, and hbase-site.xml files used for initial configuration must be stored in the src/main/resources/conf directory.
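
    Since init() expects all three files in one directory, it can help to check that they are present before starting the client. The following is a small sketch of such a check using only the JDK. The class name ConfDirCheck and the default ./conf path are assumptions made for illustration; they are not part of the sample project.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class ConfDirCheck {

    // File names taken from the init() snippet.
    static final String[] REQUIRED = {"core-site.xml", "hdfs-site.xml", "hbase-site.xml"};

    /** Returns the required file names that are not present as regular files in confDir. */
    static List<String> missingFiles(Path confDir) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            if (!Files.isRegularFile(confDir.resolve(name))) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Assumption: the directory is passed as the first argument and defaults to ./conf.
        Path confDir = Paths.get(args.length > 0 ? args[0] : "conf");
        List<String> missing = missingFiles(confDir);
        if (missing.isEmpty()) {
            System.out.println("All configuration files found in " + confDir.toAbsolutePath());
        } else {
            System.out.println("Missing in " + confDir.toAbsolutePath() + ": " + missing);
        }
    }
}
```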

  • Connecting to a ThriftServer instance
    The following code snippets belong to the TestMain class in the com.huawei.bigdata.hbase.examples package of the hbase-thrift-example sample project.
        try {
            test = new ThriftSample();
            // [2] Replace xxx.xxx.xxx.xxx with the IP address of the ThriftServer node.
            test.test("xxx.xxx.xxx.xxx", THRIFT_PORT, conf);
        } catch (TException | IOException e) {
            LOG.error("Test thrift error", e);
        }

    [2] The first argument of test.test() is the IP address of the node where the ThriftServer instance to be accessed is located. Replace it with the actual IP address. This IP address must also be configured in the hosts file of the local host on which the sample code runs.

    THRIFT_PORT is the value of hbase.regionserver.thrift.port configured for the ThriftServer instance.
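
    The sample ships with a placeholder host, so a quick sanity check on the host and port before connecting can catch a forgotten edit early. The following sketch shows one way to do this. The class and method names are hypothetical, and the default port 9090 is only the value used in the C++ run example on this page, not necessarily the port configured for your instance.

```java
public class EndpointCheck {

    /** Parses a TCP port and rejects values outside 1..65535. */
    static int parsePort(String raw) {
        int port;
        try {
            port = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Port is not a number: " + raw, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return port;
    }

    /** Returns true if the host is blank or still contains the xxx placeholder. */
    static boolean isPlaceholderHost(String host) {
        return host == null || host.trim().isEmpty() || host.contains("xxx");
    }

    public static void main(String[] args) {
        // Assumption: host and port are passed on the command line for this sketch.
        String host = args.length > 0 ? args[0] : "xxx.xxx.xxx.xxx";
        String port = args.length > 1 ? args[1] : "9090";
        if (isPlaceholderHost(host)) {
            System.out.println("Replace the placeholder host before connecting: " + host);
            return;
        }
        System.out.println("Would connect to " + host + ":" + parsePort(port));
    }
}
```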

  • Invoking methods
    // Get the tables in the specified namespace.
    getTableNamesByNamespace(client, "default");
    // Create a table.
    createTable(client, TABLE_NAME);
    // Delete the specified table.
    deleteTable(client, TABLE_NAME);
  • Obtaining table names based on the specified namespace

    The following code snippet is the getTableNamesByNamespace method of the ThriftSample class in the hbase-thrift-example\src\main\java\com\huawei\hadoop\hbase\examples package.

    private void getTableNamesByNamespace(THBaseService.Iface client, String namespace) throws TException {
        client.getTableNamesByNamespace(namespace)
            .forEach(
                tTableName -> LOGGER.info("{}", TableName.valueOf(tTableName.getNs(), tTableName.getQualifier())));
    }
  • Creating a table

    The following code snippet is the createTable method of the ThriftSample class in the hbase-thrift-example\src\main\java\com\huawei\hadoop\hbase\examples package.

    private void createTable(THBaseService.Iface client, String tableName) throws TException, IOException {
        TTableName table = getTableName(tableName);
        TTableDescriptor descriptor = new TTableDescriptor(table);
        descriptor.setColumns(
            Collections.singletonList(new TColumnFamilyDescriptor().setName(COLUMN_FAMILY.getBytes())));
        // If the table already exists, disable and delete it before re-creating it.
        if (client.tableExists(table)) {
            LOGGER.warn("Table {} already exists, deleting it.", tableName);
            client.disableTable(table);
            client.deleteTable(table);
        }
        client.createTable(descriptor, null);
        if (client.tableExists(table)) {
            LOGGER.info("Created {}.", tableName);
        } else {
            LOGGER.error("Create {} failed.", tableName);
        }
    }
  • Deleting a table

    The following code snippet is the deleteTable method of the ThriftSample class in the hbase-thrift-example\src\main\java\com\huawei\hadoop\hbase\examples package.

    private void deleteTable(THBaseService.Iface client, String tableName) throws TException, IOException {
        TTableName table = getTableName(tableName);
        if (client.tableExists(table)) {
            // A table must be disabled before it can be deleted.
            client.disableTable(table);
            client.deleteTable(table);
            LOGGER.info("Deleted {}.", tableName);
        } else {
            LOGGER.warn("{} does not exist.", tableName);
        }
    }
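
    Both createTable and deleteTable follow the same order: check that the table exists, disable it, then delete it. The following sketch replays that order against an in-memory stand-in so the flow can be tried without a cluster. StubAdmin is hypothetical and only mimics the rule that an enabled table cannot be deleted; it is not the THBaseService interface.

```java
import java.util.HashMap;
import java.util.Map;

public class DropTableFlowSketch {

    /** Hypothetical in-memory stand-in; it tracks whether each table is enabled. */
    static final class StubAdmin {
        private final Map<String, Boolean> enabled = new HashMap<>();

        void createTable(String name) {
            if (enabled.containsKey(name)) {
                throw new IllegalStateException("table exists: " + name);
            }
            enabled.put(name, Boolean.TRUE);
        }

        boolean tableExists(String name) {
            return enabled.containsKey(name);
        }

        void disableTable(String name) {
            if (!enabled.containsKey(name)) {
                throw new IllegalStateException("no such table: " + name);
            }
            enabled.put(name, Boolean.FALSE);
        }

        void deleteTable(String name) {
            Boolean state = enabled.get(name);
            if (state == null) {
                throw new IllegalStateException("no such table: " + name);
            }
            if (state) {
                throw new IllegalStateException("table is still enabled: " + name);
            }
            enabled.remove(name);
        }
    }

    /** Disables and deletes the table if it exists; returns whether anything was deleted. */
    static boolean dropIfExists(StubAdmin admin, String name) {
        if (!admin.tableExists(name)) {
            return false;
        }
        admin.disableTable(name);
        admin.deleteTable(name);
        return true;
    }

    /** Drops any existing table of that name, then creates it again. */
    static void recreate(StubAdmin admin, String name) {
        dropIfExists(admin, name);
        admin.createTable(name);
    }

    public static void main(String[] args) {
        StubAdmin admin = new StubAdmin();
        recreate(admin, "demo");
        System.out.println("exists after recreate: " + admin.tableExists("demo"));
        System.out.println("dropped: " + dropIfExists(admin, "demo"));
    }
}
```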

C++ Code Example

  • Install Thrift on the client node. Then log in to the node using an SSH tool and check the environment.
[192.168.0.82_ testThrift]#cat /etc/redhat-release
CentOS Linux release 7.5.1804 (Core)
[192.168.0.82_ testThrift]#thrift -version
Thrift version 0.11.0
  • In the /opt directory on the client node, create the hbase.thrift file. The file content is as follows:
namespace java org.apache.hadoop.hbase.thrift2.generated
namespace cpp apache.hadoop.hbase.thrift2
namespace rb Apache.Hadoop.Hbase.Thrift2
namespace py hbase
namespace perl Hbase

struct TTimeRange {
  1: required i64 minStamp,
  2: required i64 maxStamp
}
struct TColumn {
  1: required binary family,
  2: optional binary qualifier,
  3: optional i64 timestamp
}
struct TColumnValue {
  1: required binary family,
  2: required binary qualifier,
  3: required binary value,
  4: optional i64 timestamp,
  5: optional binary tags
}
struct TColumnIncrement {
  1: required binary family,
  2: required binary qualifier,
  3: optional i64 amount = 1
}
struct TResult {
  1: optional binary row,
  2: required list<TColumnValue> columnValues
}
enum TDeleteType {
  DELETE_COLUMN = 0,
  DELETE_COLUMNS = 1
}
enum TDurability {
  SKIP_WAL = 1,
  ASYNC_WAL = 2,
  SYNC_WAL = 3,
  FSYNC_WAL = 4
}
struct TAuthorization {
 1: optional list<string> labels
}
struct TCellVisibility {
 1: optional string expression
}
struct TGet {
  1: required binary row,
  2: optional list<TColumn> columns,

  3: optional i64 timestamp,
  4: optional TTimeRange timeRange,

  5: optional i32 maxVersions,
  6: optional binary filterString,
  7: optional map<binary, binary> attributes
  8: optional TAuthorization authorizations
}
struct TPut {
  1: required binary row,
  2: required list<TColumnValue> columnValues
  3: optional i64 timestamp,
  5: optional map<binary, binary> attributes,
  6: optional TDurability durability,
  7: optional TCellVisibility cellVisibility
}
struct TDelete {
  1: required binary row,
  2: optional list<TColumn> columns,
  3: optional i64 timestamp,
  4: optional TDeleteType deleteType = 1,
  6: optional map<binary, binary> attributes,
  7: optional TDurability durability
}
struct TIncrement {
  1: required binary row,
  2: required list<TColumnIncrement> columns,
  4: optional map<binary, binary> attributes,
  5: optional TDurability durability
  6: optional TCellVisibility cellVisibility
}
struct TAppend {
  1: required binary row,
  2: required list<TColumnValue> columns,
  3: optional map<binary, binary> attributes,
  4: optional TDurability durability
  5: optional TCellVisibility cellVisibility
}
struct TScan {
  1: optional binary startRow,
  2: optional binary stopRow,
  3: optional list<TColumn> columns
  4: optional i32 caching,
  5: optional i32 maxVersions=1,
  6: optional TTimeRange timeRange,
  7: optional binary filterString,
  8: optional i32 batchSize,
  9: optional map<binary, binary> attributes
  10: optional TAuthorization authorizations
  11: optional bool reversed
  12: optional bool cacheBlocks
}
union TMutation {
  1: TPut put,
  2: TDelete deleteSingle,
}
struct TRowMutations {
  1: required binary row
  2: required list<TMutation> mutations
}
struct THRegionInfo {
  1: required i64 regionId
  2: required binary tableName
  3: optional binary startKey
  4: optional binary endKey
  5: optional bool offline
  6: optional bool split
  7: optional i32 replicaId
}
struct TServerName {
  1: required string hostName
  2: optional i32 port
  3: optional i64 startCode
}
struct THRegionLocation {
  1: required TServerName serverName
  2: required THRegionInfo regionInfo
}
enum TCompareOp {
  LESS = 0,
  LESS_OR_EQUAL = 1,
  EQUAL = 2,
  NOT_EQUAL = 3,
  GREATER_OR_EQUAL = 4,
  GREATER = 5,
  NO_OP = 6
}
exception TIOError {
  1: optional string message
}
exception TIllegalArgument {
  1: optional string message
}
service THBaseService {
  bool exists(
    1: required binary table,
    2: required TGet tget
  ) throws (1:TIOError io)
  TResult get(
    1: required binary table,
    2: required TGet tget
  ) throws (1: TIOError io)
  list<TResult> getMultiple(
    1: required binary table,
    2: required list<TGet> tgets
  ) throws (1: TIOError io)
  void put(
    1: required binary table,
    2: required TPut tput
  ) throws (1: TIOError io)
  bool checkAndPut(
    1: required binary table,
    2: required binary row,
    3: required binary family,
    4: required binary qualifier,
    5: binary value,
    6: required TPut tput
  ) throws (1: TIOError io)
  void putMultiple(
    1: required binary table,
    2: required list<TPut> tputs
  ) throws (1: TIOError io)
  void deleteSingle(
    1: required binary table,
    2: required TDelete tdelete
  ) throws (1: TIOError io)
  list<TDelete> deleteMultiple(
    1: required binary table,
    2: required list<TDelete> tdeletes
  ) throws (1: TIOError io)
  bool checkAndDelete(
    1: required binary table,
    2: required binary row,
    3: required binary family,
    4: required binary qualifier,
    5: binary value,
    6: required TDelete tdelete
  ) throws (1: TIOError io)
  TResult increment(
    1: required binary table,
    2: required TIncrement tincrement
  ) throws (1: TIOError io)
  TResult append(
    1: required binary table,
    2: required TAppend tappend
  ) throws (1: TIOError io)
  i32 openScanner(
    1: required binary table,
    2: required TScan tscan,
  ) throws (1: TIOError io)
  list<TResult> getScannerRows(
    1: required i32 scannerId,
    2: i32 numRows = 1
  ) throws (
    1: TIOError io,
    2: TIllegalArgument ia
  )
  void closeScanner(
    1: required i32 scannerId
  ) throws (
    1: TIOError io,
    2: TIllegalArgument ia
  )
  void mutateRow(
    1: required binary table,
    2: required TRowMutations trowMutations
  ) throws (1: TIOError io)
  list<TResult> getScannerResults(
    1: required binary table,
    2: required TScan tscan,
    3: i32 numRows = 1
  ) throws (
    1: TIOError io
  )
  THRegionLocation getRegionLocation(
    1: required binary table,
    2: required binary row,
    3: bool reload,
  ) throws (
    1: TIOError io
  )
  list<THRegionLocation> getAllRegionLocations(
    1: required binary table,
  ) throws (
    1: TIOError io
  )
  bool checkAndMutate(
    1: required binary table,
    2: required binary row,
    3: required binary family,
    4: required binary qualifier,
    5: required TCompareOp compareOp,
    6: binary value,
    7: required TRowMutations rowMutations
  ) throws (1: TIOError io)
}
  • In the /opt directory, run the thrift --gen cpp /opt/hbase.thrift command. After the command is successfully executed, the gen-cpp directory is generated in the current directory (/opt).
  • In the /opt directory on the client node, create the Makefile file. The file content is as follows:
THRIFT_DIR = /usr/local/include/thrift
LIB_DIR = /usr/local/lib

GEN_SRC =  ./gen-cpp/hbase_types.cpp ./gen-cpp/hbase_constants.cpp ./gen-cpp/THBaseService.cpp

.PHONY: clean help

default: HbaseClient
HbaseClient: HbaseClient.cpp
	g++  -o HbaseClient -I${THRIFT_DIR}  -I./gen-cpp -L${LIB_DIR} -Wl,-rpath=${LIB_DIR} HbaseClient.cpp ${GEN_SRC} -lthrift -g

clean:
	rm -rf HbaseClient

help:
	$(warning "Makefile for C++ Hbase Thrift HbaseClient. Modify THRIFT_DIR and LIB_DIR in the \
file to point to correct locations. See $${HBASE_ROOT}/hbase-examples/README.txt for \
details.")
	@:
  • In the /opt directory on the client node, create the HbaseClient.cpp file. The file content is as follows:
#include "THBaseService.h"
#include <config.h>
#include <cstdio>
#include <cstdlib>
#include <string>
#include <vector>
#include <ostream>
#include <iostream>
#include "transport/TSocket.h"
#include <transport/TBufferTransports.h>
#include <protocol/TBinaryProtocol.h>

using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::hadoop::hbase::thrift2;

using boost::shared_ptr;

int readdb(int argc, char** argv) {
    fprintf(stderr, "readdb start\n");

    int port = atoi(argv[2]);
    boost::shared_ptr<TSocket> socket(new TSocket(argv[1], port));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
    try {
        transport->open();
        printf("open\n");
        THBaseServiceClient client(protocol);
        TResult tresult;
        TGet get;

        const std::string table("test");
        const std::string thisrow = "row-1-1";
        get.__set_row(thisrow);

        // Check whether the row exists, then read it back.
        bool be = client.exists(table, get);
        printf("exists result value = %d\n", be);
        client.get(tresult, table, get);

        // Print the family, qualifier, and value of each returned cell.
        const std::vector<TColumnValue>& values = tresult.columnValues;
        std::vector<TColumnValue>::const_iterator iter;
        for (iter = values.begin(); iter != values.end(); ++iter) {
            printf("%s, %s,%s\n", iter->family.c_str(), iter->qualifier.c_str(), iter->value.c_str());
        }
        transport->close();
        printf("close\n");
    } catch (const TException &tx) {
        std::cerr << "ERROR(exception): " << tx.what() << std::endl;
    }
    fprintf(stderr, "readdb stop\n");
    return 0;
}

int writedb(int argc, char** argv) {
    fprintf(stderr, "writedb start\n");

    int port = atoi(argv[2]);
    boost::shared_ptr<TSocket> socket(new TSocket(argv[1], port));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
    try {
        char buf[128];
        transport->open();
        printf("open\n");
        THBaseServiceClient client(protocol);
        std::vector<TPut> puts;
        const std::string table("test");
        for (int i = 0; i < 10; i++) {
            fprintf(stderr, "%d, ", i);
            for (int j = 0; j < 10; j++) {
                TPut put;
                std::vector<TColumnValue> cvs;
                // Row key is row-<i>-<j>; the cell info:age holds i * j.
                snprintf(buf, sizeof(buf), "row-%d-%d", i, j);
                const std::string thisrow(buf);
                put.__set_row(thisrow);
                TColumnValue tcv;
                tcv.__set_family("info");
                tcv.__set_qualifier("age");
                snprintf(buf, sizeof(buf), "%d", i * j);
                tcv.__set_value(buf);
                cvs.push_back(tcv);
                put.__set_columnValues(cvs);
                puts.push_back(put);
            }
            // Write the 10 rows of this batch in one call, then reuse the vector.
            client.putMultiple(table, puts);
            puts.clear();
        }

        transport->close();
        printf("close\n");
    } catch (const TException &tx) {
        std::cerr << "ERROR(exception): " << tx.what() << std::endl;
    }
    fprintf(stderr, "writedb stop\n");
    return 0;
}

int main(int argc, char **argv) {
    if (argc != 3) {
        fprintf(stderr, "Usage: %s <thrift_server_ip> <thrift_server_port>\n", argv[0]);
        return -1;
    }
    writedb(argc, argv);
    readdb(argc, argv);
    return 0;
}
  • Then, run the make command in the /opt directory to compile the provided C++ example code.
  • Go to the HBase shell and run the create 'test', 'info' command to create a test table.
  • Run the ./HbaseClient thrift2_server_ip thrift_server_port command, for example, ./HbaseClient 192.168.5.238 9090, to check whether the test table can be read and written. The verification result is as follows:
[192.168.0.82_ thrift2]#./HbaseClient 192.168.5.238 9090
writedb start
open
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, close
writedb stop
readdb start
open
exists result value = 1
info, age,1
close
readdb stop
[192.168.0.82_ thrift2]#
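
The data line info, age,1 in the output follows from how writedb builds its rows: row row-i-j stores the value i * j in info:age, and readdb reads row-1-1. The following sketch reproduces that key and value scheme in plain Java so the expected content of any row can be checked without a cluster. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RowKeySchemeSketch {

    /** Row key format used by writedb: row-<i>-<j>. */
    static String rowKey(int i, int j) {
        return "row-" + i + "-" + j;
    }

    /** Value written to info:age for that row: the product i * j as text. */
    static String cellValue(int i, int j) {
        return Integer.toString(i * j);
    }

    /** Builds the row key to value mapping for i and j in 0..9, in write order. */
    static Map<String, String> expectedCells() {
        Map<String, String> cells = new LinkedHashMap<>();
        for (int i = 0; i < 10; i++) {
            for (int j = 0; j < 10; j++) {
                cells.put(rowKey(i, j), cellValue(i, j));
            }
        }
        return cells;
    }

    public static void main(String[] args) {
        Map<String, String> cells = expectedCells();
        System.out.println("rows written: " + cells.size());
        System.out.println("row-1-1 -> " + cells.get("row-1-1"));
    }
}
```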