- 登录表格存储服务控制台。
- 在页面左上角选择区域。
- 单击“集群管理”,进入集群管理界面。
- 单击HBase集群名称,进入集群详情页面查看ThriftServer的状态。如果ThriftServer为开启状态,则无需执行开启操作;如果ThriftServer为关闭状态,则返回集群管理界面,单击“更多 > 开启ThriftServer”,等待开启完成后即可使用。
- ThriftServer目前不对接ELB,单Client不支持多线程。
- 下载Thrift安装包,下载链接:https://www.apache.org/dyn/closer.cgi?path=/thrift/0.11.0/thrift-0.11.0.tar.gz。
- ThriftServer代码样例下载地址:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.3.0/src/hbase-examples/hbase-thrift-example,下载压缩包到本地后解压,即可获取代码样例。
- 编译前准备
<!-- Build section: uses maven-assembly-plugin to package the example into a single jar. -->
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>2.3</version>
      <configuration>
        <archive>
          <manifest>
            <!-- Entry point recorded in the jar manifest. -->
            <mainClass>com.huawei.bigdata.hbase.examples.TestMain</mainClass>
          </manifest>
        </archive>
        <descriptorRefs>
          <!-- Predefined assembly descriptor that bundles the project's dependencies into the jar. -->
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <!-- Binds the assembly "single" goal to the package phase. -->
        <execution>
          <id>make-assemble</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
- 初始化配置
private static void init() throws IOException { // Default load from conf directory conf = HBaseConfiguration.create(); String userdir = TestMain.class.getClassLoader().getResource("conf").getPath() + File.separator;[1] //In Linux environment //String userdir = System.getProperty("user.dir") + File.separator + "conf" + File.separator; conf.addResource(new Path(userdir + "core-site.xml"), false); conf.addResource(new Path(userdir + "hdfs-site.xml"), false); conf.addResource(new Path(userdir + "hbase-site.xml"), false); }
- 连接ThriftServer实例
try {
    test = new ThriftSample();
    // Callout [2] in the accompanying documentation refers to this call.
    // "xxx.xxx.xxx.xxx" is a placeholder; presumably it must be replaced with the
    // address of the ThriftServer instance (confirm against ThriftSample.test).
    test.test("xxx.xxx.xxx.xxx", THRIFT_PORT, conf);
} catch (TException | IOException e) {
    LOG.error("Test thrift error", e);
}
- 方法调用
// Get table of specified namespace. getTableNamesByNamespace(client, "default"); // Create table. createTable(client, TABLE_NAME); // Delete specified table. deleteTable(client, TABLE_NAME);
- 根据指定namespace获取tablename
/**
 * Fetches the table names of a namespace through the Thrift client and logs each one.
 *
 * @param client    Thrift client used to query the table names
 * @param namespace namespace whose tables are listed
 * @throws TException if the Thrift call fails
 */
private void getTableNamesByNamespace(THBaseService.Iface client, String namespace) throws TException {
    for (TTableName entry : client.getTableNamesByNamespace(namespace)) {
        // Combine namespace and qualifier into a single TableName for logging.
        TableName fullName = TableName.valueOf(entry.getNs(), entry.getQualifier());
        LOGGER.info("{}", fullName);
    }
}
- 创建表
/**
 * Creates a table with the single column family {@code COLUMN_FAMILY}.
 * If a table with the same name already exists it is disabled and deleted first.
 * The outcome is verified with {@code tableExists} and logged.
 *
 * @param client    Thrift client used to issue the table operations
 * @param tableName name of the table to create
 * @throws TException  if a Thrift call fails
 * @throws IOException declared by the original signature
 */
private void createTable(THBaseService.Iface client, String tableName) throws TException, IOException {
    TTableName table = getTableName(tableName);
    TTableDescriptor descriptor = new TTableDescriptor(table);

    // Encode the family name explicitly as UTF-8: String.getBytes() without a
    // charset depends on the platform default encoding.
    byte[] familyName = COLUMN_FAMILY.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    descriptor.setColumns(
        Collections.singletonList(new TColumnFamilyDescriptor().setName(familyName)));

    if (client.tableExists(table)) {
        LOGGER.warn("Table {} is exists, delete it.", tableName);
        client.disableTable(table);
        client.deleteTable(table);
    }

    // Second argument is left null (presumably the split keys; confirm against the service IDL).
    client.createTable(descriptor, null);

    if (client.tableExists(table)) {
        LOGGER.info("Created {}.", tableName);
    } else {
        LOGGER.error("Create {} failed.", tableName);
    }
}
- 删除表
/**
 * Disables and deletes the given table if it exists; otherwise only logs a warning.
 *
 * @param client    Thrift client used to issue the table operations
 * @param tableName name of the table to delete
 * @throws TException  if a Thrift call fails
 * @throws IOException declared by the original signature
 */
private void deleteTable(THBaseService.Iface client, String tableName) throws TException, IOException {
    TTableName target = getTableName(tableName);

    // Nothing to delete: report and leave.
    if (!client.tableExists(target)) {
        LOGGER.warn("{} not exist.", tableName);
        return;
    }

    client.disableTable(target);
    client.deleteTable(target);
    LOGGER.info("Deleted {}.", tableName);
}
- 请在客户端节点安装Thrift应用,登录SSH工具调试环境:
[ testThrift]#cat /etc/redhat-release CentOS Linux release 7.5.1804 (Core) [ testThrift]#thrift -version Thrift version 0.11.0
- 在客户端节点的/opt目录下,创建文件hbase.thrift,内容如下:
// Thrift IDL for the THBaseService interface and the data types it exchanges.

// Target namespaces/packages for the generated code, per language.
namespace java org.apache.hadoop.hbase.thrift2.generated
namespace cpp apache.hadoop.hbase.thrift2
namespace rb Apache.Hadoop.Hbase.Thrift2
namespace py hbase
namespace perl Hbase

// Pair of timestamps: minStamp and maxStamp.
struct TTimeRange {
  1: required i64 minStamp,
  2: required i64 maxStamp
}

// Column reference: family, with optional qualifier and timestamp.
struct TColumn {
  1: required binary family,
  2: optional binary qualifier,
  3: optional i64 timestamp
}

// A single cell value addressed by family and qualifier.
struct TColumnValue {
  1: required binary family,
  2: required binary qualifier,
  3: required binary value,
  4: optional i64 timestamp,
  5: optional binary tags
}

// Increment of one column; amount defaults to 1.
struct TColumnIncrement {
  1: required binary family,
  2: required binary qualifier,
  3: optional i64 amount = 1
}

// Result: an optional row key plus a list of column values.
struct TResult {
  1: optional binary row,
  2: required list<TColumnValue> columnValues
}

enum TDeleteType {
  DELETE_COLUMN = 0,
  DELETE_COLUMNS = 1
}

enum TDurability {
  SKIP_WAL = 1,
  ASYNC_WAL = 2,
  SYNC_WAL = 3,
  FSYNC_WAL = 4
}

struct TAuthorization {
  1: optional list<string> labels
}

struct TCellVisibility {
  1: optional string expression
}

// Arguments of a get request; only the row is required.
struct TGet {
  1: required binary row,
  2: optional list<TColumn> columns,
  3: optional i64 timestamp,
  4: optional TTimeRange timeRange,
  5: optional i32 maxVersions,
  6: optional binary filterString,
  7: optional map<binary, binary> attributes
  8: optional TAuthorization authorizations
}

// Arguments of a put request. Field id 4 is not used.
struct TPut {
  1: required binary row,
  2: required list<TColumnValue> columnValues
  3: optional i64 timestamp,
  5: optional map<binary, binary> attributes,
  6: optional TDurability durability,
  7: optional TCellVisibility cellVisibility
}

// Arguments of a delete request. deleteType defaults to 1 (DELETE_COLUMNS in TDeleteType).
// Field id 5 is not used.
struct TDelete {
  1: required binary row,
  2: optional list<TColumn> columns,
  3: optional i64 timestamp,
  4: optional TDeleteType deleteType = 1,
  6: optional map<binary, binary> attributes,
  7: optional TDurability durability
}

// Arguments of an increment request. Field id 3 is not used.
struct TIncrement {
  1: required binary row,
  2: required list<TColumnIncrement> columns,
  4: optional map<binary, binary> attributes,
  5: optional TDurability durability
  6: optional TCellVisibility cellVisibility
}

// Arguments of an append request.
struct TAppend {
  1: required binary row,
  2: required list<TColumnValue> columns,
  3: optional map<binary, binary> attributes,
  4: optional TDurability durability
  5: optional TCellVisibility cellVisibility
}

// Arguments of a scan request; every field is optional. maxVersions defaults to 1.
struct TScan {
  1: optional binary startRow,
  2: optional binary stopRow,
  3: optional list<TColumn> columns
  4: optional i32 caching,
  5: optional i32 maxVersions=1,
  6: optional TTimeRange timeRange,
  7: optional binary filterString,
  8: optional i32 batchSize,
  9: optional map<binary, binary> attributes
  10: optional TAuthorization authorizations
  11: optional bool reversed
  12: optional bool cacheBlocks
}

// A mutation is either a put or a delete.
union TMutation {
  1: TPut put,
  2: TDelete deleteSingle,
}

// A list of mutations together with a row key.
struct TRowMutations {
  1: required binary row
  2: required list<TMutation> mutations
}

struct THRegionInfo {
  1: required i64 regionId
  2: required binary tableName
  3: optional binary startKey
  4: optional binary endKey
  5: optional bool offline
  6: optional bool split
  7: optional i32 replicaId
}

struct TServerName {
  1: required string hostName
  2: optional i32 port
  3: optional i64 startCode
}

// Pairs a server name with a region info.
struct THRegionLocation {
  1: required TServerName serverName
  2: required THRegionInfo regionInfo
}

// Comparison operators; used as the compareOp argument of checkAndMutate below.
enum TCompareOp {
  LESS = 0,
  LESS_OR_EQUAL = 1,
  EQUAL = 2,
  NOT_EQUAL = 3,
  GREATER_OR_EQUAL = 4,
  GREATER = 5,
  NO_OP = 6
}

// Declared in the throws clause of every service method below.
exception TIOError {
  1: optional string message
}

// Additionally declared by getScannerRows and closeScanner.
exception TIllegalArgument {
  1: optional string message
}

// Service interface; the table is passed to most methods as a binary name.
service THBaseService {

  bool exists(
    1: required binary table,
    2: required TGet tget
  ) throws (1:TIOError io)

  TResult get(
    1: required binary table,
    2: required TGet tget
  ) throws (1: TIOError io)

  list<TResult> getMultiple(
    1: required binary table,
    2: required list<TGet> tgets
  ) throws (1: TIOError io)

  void put(
    1: required binary table,
    2: required TPut tput
  ) throws (1: TIOError io)

  bool checkAndPut(
    1: required binary table,
    2: required binary row,
    3: required binary family,
    4: required binary qualifier,
    5: binary value,
    6: required TPut tput
  ) throws (1: TIOError io)

  void putMultiple(
    1: required binary table,
    2: required list<TPut> tputs
  ) throws (1: TIOError io)

  void deleteSingle(
    1: required binary table,
    2: required TDelete tdelete
  ) throws (1: TIOError io)

  list<TDelete> deleteMultiple(
    1: required binary table,
    2: required list<TDelete> tdeletes
  ) throws (1: TIOError io)

  bool checkAndDelete(
    1: required binary table,
    2: required binary row,
    3: required binary family,
    4: required binary qualifier,
    5: binary value,
    6: required TDelete tdelete
  ) throws (1: TIOError io)

  TResult increment(
    1: required binary table,
    2: required TIncrement tincrement
  ) throws (1: TIOError io)

  TResult append(
    1: required binary table,
    2: required TAppend tappend
  ) throws (1: TIOError io)

  // Returns an i32; getScannerRows and closeScanner take an i32 scannerId.
  i32 openScanner(
    1: required binary table,
    2: required TScan tscan,
  ) throws (1: TIOError io)

  list<TResult> getScannerRows(
    1: required i32 scannerId,
    2: i32 numRows = 1
  ) throws (
    1: TIOError io,
    2: TIllegalArgument ia
  )

  void closeScanner(
    1: required i32 scannerId
  ) throws (
    1: TIOError io,
    2: TIllegalArgument ia
  )

  void mutateRow(
    1: required binary table,
    2: required TRowMutations trowMutations
  ) throws (1: TIOError io)

  list<TResult> getScannerResults(
    1: required binary table,
    2: required TScan tscan,
    3: i32 numRows = 1
  ) throws (
    1: TIOError io
  )

  THRegionLocation getRegionLocation(
    1: required binary table,
    2: required binary row,
    3: bool reload,
  ) throws (
    1: TIOError io
  )

  list<THRegionLocation> getAllRegionLocations(
    1: required binary table,
  ) throws (
    1: TIOError io
  )

  bool checkAndMutate(
    1: required binary table,
    2: required binary row,
    3: required binary family,
    4: required binary qualifier,
    5: required TCompareOp compareOp,
    6: binary value,
    7: required TRowMutations rowMutations
  ) throws (1: TIOError io)
}
- 执行命令:thrift --gen cpp /opt/hbase.thrift,执行成功后会在/opt目录生成一个gen-cpp目录。
- 在客户端节点的/opt目录下,创建文件Makefile,内容如下:
# Makefile for the C++ HBase Thrift sample client (HbaseClient).
# NOTE: every recipe line below must begin with a TAB character.

# Directory passed to the compiler with -I for the Thrift headers.
THRIFT_DIR = /usr/local/include/thrift
# Directory passed to the linker with -L and recorded as the runtime rpath.
LIB_DIR = /usr/local/lib

# Sources under ./gen-cpp that are compiled together with HbaseClient.cpp.
GEN_SRC = ./gen-cpp/hbase_types.cpp ./gen-cpp/hbase_constants.cpp ./gen-cpp/THBaseService.cpp

.PHONY: clean help

# First target in the file, so a plain "make" builds the client.
default: HbaseClient

# Compiles the client and the generated sources in one step and links against libthrift.
HbaseClient: HbaseClient.cpp
	g++ -o HbaseClient -I${THRIFT_DIR} -I./gen-cpp -L${LIB_DIR} -Wl,-rpath=${LIB_DIR} HbaseClient.cpp ${GEN_SRC} -lthrift -g

# Removes the built binary.
clean:
	rm -rf HbaseClient

# Prints a usage hint through make's $(warning) function; "@:" is a silent no-op recipe.
help:
	$(warning "Makefile for C++ Hbase Thrift HbaseClient. Modify THRIFT_DIR and LIB_DIR in the \
	file to point to correct locations. See $${HBASE_ROOT}/hbase-examples/README.txt for \
	details.")
	@:
- 在客户端节点的/opt目录下,创建文件HbaseClient.cpp,内容如下:
/*
 * HbaseClient.cpp
 *
 * Sample client for the THBaseService Thrift interface.
 * Usage: ./HbaseClient <ip> <port>
 *
 * writedb() stores 100 rows ("row-<i>-<j>", column info:age = i * j) in table
 * "test"; readdb() then checks for and reads back row "row-1-1".
 */
#include "THBaseService.h"
#include <config.h>
#include <cstdio>
#include <cstdlib>
#include <string>
#include <vector>
#include <ostream>
#include <iostream>
#include "transport/TSocket.h"
#include <transport/TBufferTransports.h>
#include <protocol/TBinaryProtocol.h>

using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::hadoop::hbase::thrift2;
using boost::shared_ptr;

/**
 * Reads row "row-1-1" from table "test" and prints its column values.
 *
 * @param argc number of entries in argv; must be at least 3
 * @param argv argv[1] is the server host, argv[2] the server port
 * @return 0 on success, -1 if the arguments are missing or a Thrift exception occurred
 */
int readdb(int argc, char** argv) {
    if (argc < 3) {
        // argv[1] and argv[2] are read below; refuse to index past the array.
        fprintf(stderr, "param is :XX ip port\n");
        return -1;
    }

    fprintf(stderr, "readdb start\n");
    int status = 0;
    const int port = atoi(argv[2]);
    boost::shared_ptr<TSocket> socket(new TSocket(argv[1], port));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
    try {
        transport->open();
        printf("open\n");

        THBaseServiceClient client(protocol);
        const std::string table("test");
        const std::string thisrow = "row-1-1";

        TGet get;
        get.__set_row(thisrow);

        const bool be = client.exists(table, get);
        printf("exists result value = %d\n", be);

        TResult tresult;
        client.get(tresult, table, get);

        // Iterate over the result in place instead of copying the vector first.
        const std::vector<TColumnValue>& cells = tresult.columnValues;
        for (std::vector<TColumnValue>::const_iterator iter = cells.begin(); iter != cells.end(); ++iter) {
            printf("%s, %s,%s\n", iter->family.c_str(), iter->qualifier.c_str(), iter->value.c_str());
        }

        transport->close();
        printf("close\n");
    } catch (const TException& tx) {
        std::cerr << "ERROR(exception): " << tx.what() << std::endl;
        status = -1;  // report the failure to the caller instead of swallowing it
    }
    fprintf(stderr, "readdb stop\n");
    return status;
}

/**
 * Writes 100 rows into table "test", ten rows per putMultiple call.
 *
 * Row keys are "row-<i>-<j>" for i, j in [0, 10); each row gets one cell
 * info:age holding the decimal text of i * j.
 *
 * @param argc number of entries in argv; must be at least 3
 * @param argv argv[1] is the server host, argv[2] the server port
 * @return 0 on success, -1 if the arguments are missing or a Thrift exception occurred
 */
int writedb(int argc, char** argv) {
    if (argc < 3) {
        // argv[1] and argv[2] are read below; refuse to index past the array.
        fprintf(stderr, "param is :XX ip port\n");
        return -1;
    }

    fprintf(stderr, "writedb start\n");
    int status = 0;
    const int port = atoi(argv[2]);
    boost::shared_ptr<TSocket> socket(new TSocket(argv[1], port));
    boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
    boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
    try {
        char buf[128];
        transport->open();
        printf("open\n");

        THBaseServiceClient client(protocol);
        std::vector<TPut> puts;
        const std::string table("test");

        for (int i = 0; i < 10; i++) {
            fprintf(stderr, "%d, ", i);
            for (int j = 0; j < 10; j++) {
                TPut put;
                std::vector<TColumnValue> cvs;

                // Row key.
                snprintf(buf, sizeof(buf), "row-%d-%d", i, j);
                const std::string thisrow(buf);
                put.__set_row(thisrow);

                // Single cell info:age = i * j.
                TColumnValue tcv;
                tcv.__set_family("info");
                tcv.__set_qualifier("age");
                snprintf(buf, sizeof(buf), "%d", i * j);
                tcv.__set_value(buf);
                cvs.push_back(tcv);

                put.__set_columnValues(cvs);
                puts.push_back(put);
            }
            // Send the ten rows of this batch, then reuse the vector.
            client.putMultiple(table, puts);
            puts.clear();
        }

        transport->close();
        printf("close\n");
    } catch (const TException& tx) {
        std::cerr << "ERROR(exception): " << tx.what() << std::endl;
        status = -1;  // report the failure to the caller instead of swallowing it
    }
    fprintf(stderr, "writedb stop\n");
    return status;
}

/**
 * Entry point: expects exactly two arguments (ip and port), runs the write
 * test followed by the read test.
 *
 * @return -1 on wrong usage, 0 if both tests succeeded, 1 otherwise
 */
int main(int argc, char** argv) {
    if (argc != 3) {
        fprintf(stderr, "param is :XX ip port\n");
        return -1;
    }
    // Run both steps even if the first one fails, as before, but surface failures
    // through the exit code.
    const int writeStatus = writedb(argc, argv);
    const int readStatus = readdb(argc, argv);
    return (writeStatus == 0 && readStatus == 0) ? 0 : 1;
}
- 然后在/opt目录下执行make命令,编译提供的C++样例代码。
- 进入hbase shell执行命令create 'test', 'info',创建一张测试表。
- 执行命令 ./HbaseClient thrift2_server_ip thrift_server_port(两个参数分别为ThriftServer的IP地址和端口,缺一不可),例如:./HbaseClient xxx.xxx.xxx.xxx 9090,验证能否读写表test,验证结果如下:
[ thrift2]#./HbaseClient xxx.xxx.xxx.xxx 9090
writedb start
open
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, close
writedb stop
readdb start
open
exists result value = 1
info, age,1
close
readdb stop
[ thrift2]#