Updated: 2024-06-21 GMT+08:00

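Accessing ThriftServer to Operate Tables

Scenario

Pass in the host where the ThriftServer instance runs and the port on which it serves, create a Thrift client from the authentication credentials and configuration files, and access ThriftServer to list the table names in a specified namespace, create a table, and delete a table.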

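Procedure

  1. Log in to the CloudTable console.
  2. Select a region in the upper left corner of the page.
  3. Click “Cluster Management” to open the cluster management page.
  4. Click the name of the HBase cluster to open the cluster details page and check the ThriftServer status. If ThriftServer is enabled, no action is needed. If ThriftServer is disabled, return to the cluster management page, click “More > Enable ThriftServer”, and wait for the operation to complete before using it.

    • ThriftServer currently does not integrate with ELB, and a single client does not support multithreading.
    • Download the Thrift installation package from: https://www.apache.org/dyn/closer.cgi?path=/thrift/0.11.0/thrift-0.11.0.tar.gz
    • Download the ThriftServer sample code from https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.3.0/src/hbase-examples/hbase-thrift-example , then extract the archive locally to obtain the sample code.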
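
Because a single client does not support multithreading, one possible way to call ThriftServer from several threads is to give each thread its own client. The sketch below shows that pattern with `ThreadLocal`. `StubClient` is a hypothetical stand-in for a real Thrift client and its transport; it is not part of the sample project.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PerThreadClient {
    /** Hypothetical stand-in for a Thrift client plus its transport. */
    static final class StubClient {
        private static final AtomicInteger NEXT_ID = new AtomicInteger();
        final int id = NEXT_ID.incrementAndGet();
    }

    // Each thread lazily creates its own client on first use and never shares it.
    private static final ThreadLocal<StubClient> CLIENT = ThreadLocal.withInitial(StubClient::new);

    static StubClient client() {
        return CLIENT.get();
    }

    /** Runs on a new thread and returns the id of the client that thread obtained. */
    static int clientIdOnNewThread() {
        int[] seen = new int[1];
        Thread worker = new Thread(() -> seen[0] = client().id);
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("main thread client id: " + client().id);
        System.out.println("worker thread client id: " + clientIdOnNewThread());
    }
}
```

In a real program the stub would be replaced by code that opens a transport per thread and closes it when the thread finishes.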

Java Sample Code

  • Preparing for compilation

    Add the following configuration to the “pom.xml” file of the “hbase-thrift-example” sample project.

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.3</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.huawei.bigdata.hbase.examples.TestMain</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assemble</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  • Initializing the configuration
    The following code is in the “TestMain” class of the “com.huawei.bigdata.hbase.examples” package in the “hbase-thrift-example” sample project.
        private static void init() throws IOException {
            // Load the default configuration from the conf directory.
            conf = HBaseConfiguration.create();

            // [1] Path of the conf directory under the compiled resources.
            String userdir = TestMain.class.getClassLoader().getResource("conf").getPath() + File.separator;
            // In a Linux environment, use the following line instead:
            // String userdir = System.getProperty("user.dir") + File.separator + "conf" + File.separator;
            conf.addResource(new Path(userdir + "core-site.xml"), false);
            conf.addResource(new Path(userdir + "hdfs-site.xml"), false);
            conf.addResource(new Path(userdir + "hbase-site.xml"), false);
        }

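    [1] userdir is the path of the conf directory under the compiled resource path. The core-site.xml, hdfs-site.xml, and hbase-site.xml files used to initialize the configuration must be placed in the "src/main/resources/conf" directory.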
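
Before building, it can help to confirm that the three configuration files are actually in place. The following is a minimal sketch using only the Java standard library. The class name `ConfCheck` and the default directory argument are assumptions made for illustration; they are not part of the sample project.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class ConfCheck {
    // File names taken from note [1] above.
    static final String[] REQUIRED = {"core-site.xml", "hdfs-site.xml", "hbase-site.xml"};

    /** Returns the required file names that are not regular files in confDir. */
    static List<String> missingConfigFiles(File confDir) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            if (!new File(confDir, name).isFile()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Assumption: run from the project root unless a directory is passed in.
        File confDir = new File(args.length > 0 ? args[0] : "src/main/resources/conf");
        List<String> missing = missingConfigFiles(confDir);
        if (missing.isEmpty()) {
            System.out.println("All configuration files found in " + confDir);
        } else {
            System.out.println("Missing in " + confDir + ": " + missing);
        }
    }
}
```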

  • Connecting to the ThriftServer instance
    The following code is in the “TestMain” class of the “com.huawei.bigdata.hbase.examples” package in the “hbase-thrift-example” sample project.
        try {
            test = new ThriftSample();
            // [2] Replace the placeholder with the IP address of the ThriftServer node.
            test.test("xxx.xxx.xxx.xxx", THRIFT_PORT, conf);
        } catch (TException | IOException e) {
            LOG.error("Test thrift error", e);
        }

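    [2] The argument passed to test.test() is the IP address of the node that hosts the ThriftServer instance to access. Change it to match the actual cluster, and add the node's IP address to the hosts file of the machine that runs the sample code.

    “THRIFT_PORT” is the value of the ThriftServer instance configuration parameter "hbase.regionserver.thrift.port".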
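
If the port is not known offhand, one option is to look up "hbase.regionserver.thrift.port" in an hbase-site.xml file. The sketch below parses the standard Hadoop-style `<property><name>…</name><value>…</value></property>` layout with the JDK XML parser. The class name `ThriftPortLookup` and the inline XML string are made up for illustration, and the value 9090 is only an example.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class ThriftPortLookup {
    static final String PORT_KEY = "hbase.regionserver.thrift.port";

    /** Returns the value of the named property, or null if it is not present. */
    static String readProperty(String siteXml, String key) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(siteXml.getBytes(StandardCharsets.UTF_8)));
            NodeList props = doc.getElementsByTagName("property");
            for (int i = 0; i < props.getLength(); i++) {
                Element prop = (Element) props.item(i);
                if (key.equals(childText(prop, "name"))) {
                    return childText(prop, "value");
                }
            }
            return null;
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse site XML", e);
        }
    }

    private static String childText(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent().trim();
    }

    public static void main(String[] args) {
        // Illustrative content only; a real hbase-site.xml has many more properties.
        String sample = "<configuration><property><name>" + PORT_KEY
            + "</name><value>9090</value></property></configuration>";
        System.out.println(Integer.parseInt(readProperty(sample, PORT_KEY))); // prints 9090
    }
}
```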

  • Calling the methods
    // Get the tables of the specified namespace.
    getTableNamesByNamespace(client, "default");
    // Create a table.
    createTable(client, TABLE_NAME);
    // Delete the specified table.
    deleteTable(client, TABLE_NAME);
  • Getting the table names in a specified namespace

    The following code snippet is in the getTableNamesByNamespace method of the “ThriftSample” class in the “hbase-thrift-example\src\main\java\com\huawei\hadoop\hbase\examples” package.

    private void getTableNamesByNamespace(THBaseService.Iface client, String namespace) throws TException {
         client.getTableNamesByNamespace(namespace)
             .forEach(
                 tTableName -> LOGGER.info("{}", TableName.valueOf(tTableName.getNs(), tTableName.getQualifier())));
     }
  • Creating a table

    The following code snippet is in the createTable method of the “ThriftSample” class in the “hbase-thrift-example\src\main\java\com\huawei\hadoop\hbase\examples” package.

    private void createTable(THBaseService.Iface client, String tableName) throws TException, IOException {
         TTableName table = getTableName(tableName);
         TTableDescriptor descriptor = new TTableDescriptor(table);
         descriptor.setColumns(
             Collections.singletonList(new TColumnFamilyDescriptor().setName(COLUMN_FAMILY.getBytes())));
         if (client.tableExists(table)) {
             LOGGER.warn("Table {} already exists, deleting it.", tableName);
             client.disableTable(table);
             client.deleteTable(table);
         }
         client.createTable(descriptor, null);
         if (client.tableExists(table)) {
             LOGGER.info("Created {}.", tableName);
         } else {
             LOGGER.error("Create {} failed.", tableName);
         }
     }
  • Deleting a table

    The following code snippet is in the deleteTable method of the “ThriftSample” class in the “hbase-thrift-example\src\main\java\com\huawei\hadoop\hbase\examples” package.

    private void deleteTable(THBaseService.Iface client, String tableName) throws TException, IOException {
         TTableName table = getTableName(tableName);
         if (client.tableExists(table)) {
             client.disableTable(table);
             client.deleteTable(table);
             LOGGER.info("Deleted {}.", tableName);
         } else {
             LOGGER.warn("{} does not exist.", tableName);
         }
     }
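
Both createTable and deleteTable above disable a table before deleting it, because HBase rejects deletion of a table that is still enabled. The sketch below replays that call order against a small in-memory model so the ordering can be tried without a cluster. `InMemoryTables` is a hypothetical stand-in, not the Thrift client API.

```java
import java.util.HashMap;
import java.util.Map;

public class DropThenCreate {
    /** Hypothetical in-memory model of the table operations used above. */
    static final class InMemoryTables {
        private final Map<String, Boolean> enabledByTable = new HashMap<>();

        boolean tableExists(String name) {
            return enabledByTable.containsKey(name);
        }

        void createTable(String name) {
            if (tableExists(name)) {
                throw new IllegalStateException(name + " already exists");
            }
            enabledByTable.put(name, true); // new tables start enabled
        }

        void disableTable(String name) {
            requireExists(name);
            enabledByTable.put(name, false);
        }

        void deleteTable(String name) {
            requireExists(name);
            if (enabledByTable.get(name)) {
                throw new IllegalStateException(name + " must be disabled first");
            }
            enabledByTable.remove(name);
        }

        private void requireExists(String name) {
            if (!tableExists(name)) {
                throw new IllegalStateException(name + " not found");
            }
        }
    }

    /** Same order of calls as the createTable sample: disable, delete, then create. */
    static void recreate(InMemoryTables tables, String name) {
        if (tables.tableExists(name)) {
            tables.disableTable(name);
            tables.deleteTable(name);
        }
        tables.createTable(name);
    }

    /** Returns true if deleting without disabling is rejected by the model. */
    static boolean deleteWithoutDisableFails(InMemoryTables tables, String name) {
        try {
            tables.deleteTable(name);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        InMemoryTables tables = new InMemoryTables();
        recreate(tables, "demo");
        recreate(tables, "demo"); // second call drops the existing table first
        System.out.println("delete without disable rejected: " + deleteWithoutDisableFails(tables, "demo"));
    }
}
```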

C++ Sample Code

  • Install Thrift on the client node, then log in with an SSH tool to check the environment:
[192.168.0.82_ testThrift]#cat /etc/redhat-release
CentOS Linux release 7.5.1804 (Core)
[192.168.0.82_ testThrift]#thrift -version
Thrift version 0.11.0
  • In the /opt directory on the client node, create a file named hbase.thrift with the following content:
namespace java org.apache.hadoop.hbase.thrift2.generated
namespace cpp apache.hadoop.hbase.thrift2
namespace rb Apache.Hadoop.Hbase.Thrift2
namespace py hbase
namespace perl Hbase

struct TTimeRange {
  1: required i64 minStamp,
  2: required i64 maxStamp
}
struct TColumn {
  1: required binary family,
  2: optional binary qualifier,
  3: optional i64 timestamp
}
struct TColumnValue {
  1: required binary family,
  2: required binary qualifier,
  3: required binary value,
  4: optional i64 timestamp,
  5: optional binary tags
}
struct TColumnIncrement {
  1: required binary family,
  2: required binary qualifier,
  3: optional i64 amount = 1
}
struct TResult {
  1: optional binary row,
  2: required list<TColumnValue> columnValues
}
enum TDeleteType {
  DELETE_COLUMN = 0,
  DELETE_COLUMNS = 1
}
enum TDurability {
  SKIP_WAL = 1,
  ASYNC_WAL = 2,
  SYNC_WAL = 3,
  FSYNC_WAL = 4
}
struct TAuthorization {
 1: optional list<string> labels
}
struct TCellVisibility {
 1: optional string expression
}
struct TGet {
  1: required binary row,
  2: optional list<TColumn> columns,

  3: optional i64 timestamp,
  4: optional TTimeRange timeRange,

  5: optional i32 maxVersions,
  6: optional binary filterString,
  7: optional map<binary, binary> attributes
  8: optional TAuthorization authorizations
}
struct TPut {
  1: required binary row,
  2: required list<TColumnValue> columnValues
  3: optional i64 timestamp,
  5: optional map<binary, binary> attributes,
  6: optional TDurability durability,
  7: optional TCellVisibility cellVisibility
}
struct TDelete {
  1: required binary row,
  2: optional list<TColumn> columns,
  3: optional i64 timestamp,
  4: optional TDeleteType deleteType = 1,
  6: optional map<binary, binary> attributes,
  7: optional TDurability durability
}
struct TIncrement {
  1: required binary row,
  2: required list<TColumnIncrement> columns,
  4: optional map<binary, binary> attributes,
  5: optional TDurability durability
  6: optional TCellVisibility cellVisibility
}
struct TAppend {
  1: required binary row,
  2: required list<TColumnValue> columns,
  3: optional map<binary, binary> attributes,
  4: optional TDurability durability
  5: optional TCellVisibility cellVisibility
}
struct TScan {
  1: optional binary startRow,
  2: optional binary stopRow,
  3: optional list<TColumn> columns
  4: optional i32 caching,
  5: optional i32 maxVersions=1,
  6: optional TTimeRange timeRange,
  7: optional binary filterString,
  8: optional i32 batchSize,
  9: optional map<binary, binary> attributes
  10: optional TAuthorization authorizations
  11: optional bool reversed
  12: optional bool cacheBlocks
}
union TMutation {
  1: TPut put,
  2: TDelete deleteSingle,
}
struct TRowMutations {
  1: required binary row
  2: required list<TMutation> mutations
}
struct THRegionInfo {
  1: required i64 regionId
  2: required binary tableName
  3: optional binary startKey
  4: optional binary endKey
  5: optional bool offline
  6: optional bool split
  7: optional i32 replicaId
}
struct TServerName {
  1: required string hostName
  2: optional i32 port
  3: optional i64 startCode
}
struct THRegionLocation {
  1: required TServerName serverName
  2: required THRegionInfo regionInfo
}
enum TCompareOp {
  LESS = 0,
  LESS_OR_EQUAL = 1,
  EQUAL = 2,
  NOT_EQUAL = 3,
  GREATER_OR_EQUAL = 4,
  GREATER = 5,
  NO_OP = 6
}
exception TIOError {
  1: optional string message
}
exception TIllegalArgument {
  1: optional string message
}
service THBaseService {
  bool exists(
    1: required binary table,
    2: required TGet tget
  ) throws (1:TIOError io)
  TResult get(
    1: required binary table,
    2: required TGet tget
  ) throws (1: TIOError io)
  list<TResult> getMultiple(
    1: required binary table,
    2: required list<TGet> tgets
  ) throws (1: TIOError io)
  void put(
    1: required binary table,
    2: required TPut tput
  ) throws (1: TIOError io)
  bool checkAndPut(
    1: required binary table,
    2: required binary row,
    3: required binary family,
    4: required binary qualifier,
    5: binary value,
    6: required TPut tput
  ) throws (1: TIOError io)
  void putMultiple(
    1: required binary table,
    2: required list<TPut> tputs
  ) throws (1: TIOError io)
  void deleteSingle(
    1: required binary table,
    2: required TDelete tdelete
  ) throws (1: TIOError io)
  list<TDelete> deleteMultiple(
    1: required binary table,
    2: required list<TDelete> tdeletes
  ) throws (1: TIOError io)
  bool checkAndDelete(
    1: required binary table,
    2: required binary row,
    3: required binary family,
    4: required binary qualifier,
    5: binary value,
    6: required TDelete tdelete
  ) throws (1: TIOError io)
  TResult increment(
    1: required binary table,
    2: required TIncrement tincrement
  ) throws (1: TIOError io)
  TResult append(
    1: required binary table,
    2: required TAppend tappend
  ) throws (1: TIOError io)
  i32 openScanner(
    1: required binary table,
    2: required TScan tscan,
  ) throws (1: TIOError io)
  list<TResult> getScannerRows(
    1: required i32 scannerId,
    2: i32 numRows = 1
  ) throws (
    1: TIOError io,
    2: TIllegalArgument ia
  )
  void closeScanner(
    1: required i32 scannerId
  ) throws (
    1: TIOError io,
    2: TIllegalArgument ia
  )
  void mutateRow(
    1: required binary table,
    2: required TRowMutations trowMutations
  ) throws (1: TIOError io)
  list<TResult> getScannerResults(
    1: required binary table,
    2: required TScan tscan,
    3: i32 numRows = 1
  ) throws (
    1: TIOError io
  )
  THRegionLocation getRegionLocation(
    1: required binary table,
    2: required binary row,
    3: bool reload,
  ) throws (
    1: TIOError io
  )
  list<THRegionLocation> getAllRegionLocations(
    1: required binary table,
  ) throws (
    1: TIOError io
  )
  bool checkAndMutate(
    1: required binary table,
    2: required binary row,
    3: required binary family,
    4: required binary qualifier,
    5: required TCompareOp compareOp,
    6: binary value,
    7: required TRowMutations rowMutations
  ) throws (1: TIOError io)
}
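  • In the /opt directory, run the command thrift --gen cpp /opt/hbase.thrift. On success, a gen-cpp directory is generated under /opt.
  • In the /opt directory on the client node, create a file named Makefile with the following content: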
THRIFT_DIR = /usr/local/include/thrift
LIB_DIR = /usr/local/lib

GEN_SRC =  ./gen-cpp/hbase_types.cpp ./gen-cpp/hbase_constants.cpp ./gen-cpp/THBaseService.cpp

.PHONY: clean help

default: HbaseClient
HbaseClient: HbaseClient.cpp
	g++  -o HbaseClient -I${THRIFT_DIR}  -I./gen-cpp -L${LIB_DIR} -Wl,-rpath=${LIB_DIR} HbaseClient.cpp ${GEN_SRC} -lthrift -g

clean:
	rm -rf HbaseClient

help:
	$(warning "Makefile for C++ Hbase Thrift HbaseClient. Modify THRIFT_DIR and LIB_DIR in the \
file to point to correct locations. See $${HBASE_ROOT}/hbase-examples/README.txt for \
details.")
	@:
  • In the /opt directory on the client node, create a file named HbaseClient.cpp with the following content:
#include "THBaseService.h"
#include <config.h>
#include <cstdio>
#include <cstdlib>
#include <string>
#include <vector>
#include <ostream>
#include <iostream>
#include "transport/TSocket.h"
#include <transport/TBufferTransports.h>
#include <protocol/TBinaryProtocol.h>

using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::hadoop::hbase::thrift2;

using boost::shared_ptr;

int readdb(int argc, char** argv) {
    fprintf(stderr, "readdb start\n");

    int port = atoi(argv[2]);
    boost::shared_ptr<TSocket> socket(new TSocket(argv[1], port));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
    try {
        transport->open();
        printf("open\n");
        THBaseServiceClient client(protocol);
        TResult tresult;
        TGet get;

        const std::string table("test");
        const std::string thisrow = "row-1-1";
        get.__set_row(thisrow);
        // Check that the row exists, then read it back.
        bool be = client.exists(table, get);
        printf("exists result value = %d\n", be);
        client.get(tresult, table, get);
        vector<TColumnValue> list = tresult.columnValues;
        std::vector<TColumnValue>::const_iterator iter;
        for (iter = list.begin(); iter != list.end(); iter++) {
            printf("%s, %s,%s\n", (*iter).family.c_str(), (*iter).qualifier.c_str(), (*iter).value.c_str());
        }
        transport->close();
        printf("close\n");
    } catch (const TException &tx) {
        std::cerr << "ERROR(exception): " << tx.what() << std::endl;
    }
    fprintf(stderr, "readdb stop\n");
    return 0;
}

int writedb(int argc, char** argv) {
    fprintf(stderr, "writedb start\n");
    int port = atoi(argv[2]);
    boost::shared_ptr<TSocket> socket(new TSocket(argv[1], port));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
    try {
        char buf[128];
        transport->open();
        printf("open\n");
        THBaseServiceClient client(protocol);
        std::vector<TPut> puts;
        const std::string table("test");
        for (int i = 0; i < 10; i++) {
            fprintf(stderr, "%d, ", i);
            for (int j = 0; j < 10; j++) {
                TPut put;
                std::vector<TColumnValue> cvs;
                // Row key is "row-<i>-<j>"; column info:age holds i * j.
                sprintf(buf, "row-%d-%d", i, j);
                const std::string thisrow(buf);
                put.__set_row(thisrow);
                TColumnValue tcv;
                tcv.__set_family("info");
                tcv.__set_qualifier("age");
                sprintf(buf, "%d", i * j);
                tcv.__set_value(buf);
                cvs.insert(cvs.end(), tcv);
                put.__set_columnValues(cvs);
                puts.insert(puts.end(), put);
            }
            // Send the 10 puts of this batch in one call.
            client.putMultiple(table, puts);
            puts.clear();
        }

        transport->close();
        printf("close\n");
    } catch (const TException &tx) {
        std::cerr << "ERROR(exception): " << tx.what() << std::endl;
    }
    fprintf(stderr, "writedb stop\n");
    return 0;
}

int main(int argc, char **argv) {
    if (argc != 3) {
        fprintf(stderr, "Usage: %s <thrift_server_ip> <thrift_server_port>\n", argv[0]);
        return -1;
    }
    writedb(argc, argv);
    readdb(argc, argv);
    return 0;
}
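  • Run the make command in the /opt directory to compile the C++ sample code.
  • In the hbase shell, run the command create 'test', 'info' to create a test table.
  • Run the command ./HbaseClient thrift2_server_ip thrift_server_port, for example ./HbaseClient 192.168.5.238 9090, to verify that the test table can be read and written. The result is as follows: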
[192.168.0.82_ thrift2]#./HbaseClient 192.168.5.238 9090
writedb start
open
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, close
writedb stop
readdb start
open
exists result value = 1
info, age,1
close
readdb stop
[192.168.0.82_ thrift2]#
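
The `info, age,1` line in the output follows from how writedb builds its data: for i and j from 0 to 9 it writes row key `row-<i>-<j>` with column info:age set to i * j, and readdb then reads `row-1-1`. The short sketch below only reproduces that key and value scheme for reference; the class name `SampleRows` is made up for illustration and the code does not contact HBase.

```java
public class SampleRows {
    /** Row key format used by writedb in the C++ sample. */
    static String rowKey(int i, int j) {
        return "row-" + i + "-" + j;
    }

    /** Value stored in column info:age for that row. */
    static String ageValue(int i, int j) {
        return String.valueOf(i * j);
    }

    public static void main(String[] args) {
        // readdb looks up "row-1-1", whose stored value is 1 * 1.
        System.out.println(rowKey(1, 1) + " -> info:age = " + ageValue(1, 1)); // prints "row-1-1 -> info:age = 1"
    }
}
```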