
API

This section describes the Segment APIs and how to use them. All methods are in the org.apache.spark.util.CarbonSegmentUtil class.

The following method has been deprecated:

/** 
* Returns the valid segments for the query based on the filter condition 
* present in carbonScanRdd. 
* 
* @param carbonScanRdd 
* @return Array of valid segments 
*/ 
@deprecated def getFilteredSegments(carbonScanRdd: CarbonScanRDD[InternalRow]): Array[String];

Usage

Use the following method to obtain the CarbonScanRDD from a query statement:
val df=carbon.sql("select * from table where age='12'") 
val myscan=df.queryExecution.sparkPlan.collect { 
case scan: CarbonDataSourceScan if scan.rdd.isInstanceOf[CarbonScanRDD[InternalRow]] => scan.rdd 
case scan: RowDataSourceScanExec if scan.rdd.isInstanceOf[CarbonScanRDD[InternalRow]] => scan.rdd 
}.head 
val carbonrdd=myscan.asInstanceOf[CarbonScanRDD[InternalRow]]

Example:

CarbonSegmentUtil.getFilteredSegments(carbonrdd)

The filtered segments can also be obtained by passing in a SQL statement:
/** 
* Returns an array of valid segment numbers based on the filter condition provided in the sql 
* NOTE: This API is supported only for SELECT SQL (insert into, ctas, ... are not supported) 
* 
* @param sql 
* @param sparkSession 
* @return Array of valid segments 
* @throws UnsupportedOperationException because the Get Filtered Segments API is supported 
* if and only if exactly one carbon main table is present in the query. 
*/ 
def getFilteredSegments(sql: String, sparkSession: SparkSession): Array[String];

Example:

CarbonSegmentUtil.getFilteredSegments("select * from table where age='12'”, sparkSession)

Pass in the database name and table name to obtain the list of segments that will be merged. The returned segment list can be passed as an argument to the getMergedLoadName function:

/** 
* Identifies all segments which can be merged with MAJOR compaction type. 
* NOTE: This result can be passed to getMergedLoadName API to get the merged load name. 
* 
* @param sparkSession
* @param tableName 
* @param dbName 
* @return list of LoadMetadataDetails  
*/ 
def identifySegmentsToBeMerged(sparkSession: SparkSession, 
    tableName: String, 
    dbName: String): util.List[LoadMetadataDetails];

Example:

CarbonSegmentUtil.identifySegmentsToBeMerged(sparkSession, "table_test","default") 
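
As noted in the API description, the returned list can be passed straight to getMergedLoadName. A minimal sketch of that chain, assuming a table named table_test exists in the default database:

// A minimal sketch (assumes table "table_test" exists in database "default"):
// the segments identified for MAJOR compaction are handed on to getMergedLoadName,
// which is described later in this section.
val toBeMerged = CarbonSegmentUtil.identifySegmentsToBeMerged(sparkSession, "table_test", "default")
val mergedLoadName = CarbonSegmentUtil.getMergedLoadName(toBeMerged)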

Pass in the database name, table name, and a custom segment list to obtain the list of segments that will be merged by the custom compaction operation. The returned segment list can be passed as an argument to the getMergedLoadName function:

/** 
* Identifies all segments which can be merged with CUSTOM compaction type. 
*  NOTE: This result can be passed to getMergedLoadName API to get the merged load name. 
* 
* @param sparkSession 
* @param tableName 
* @param dbName 
* @param customSegments 
* @return list of LoadMetadataDetails 
* @throws UnsupportedOperationException if customSegments is null or empty. 
* @throws MalformedCarbonCommandException if segment does not exist or is not valid 
*/ 
def identifySegmentsToBeMergedCustom(sparkSession: SparkSession, 
    tableName: String, 
    dbName: String, 
    customSegments: util.List[String]): util.List[LoadMetadataDetails];

Example:

val customSegments = new util.ArrayList[String]()
customSegments.add("1") 
customSegments.add("2")  
CarbonSegmentUtil.identifySegmentsToBeMergedCustom(sparkSession, "table_test","default", customSegments)

Given a list of segments, return the merged load name:

/** 
* Returns the Merged Load Name for given list of segments 
* 
* @param list of segments 
* @return Merged Load Name 
* @throws UnsupportedOperationException if list of segments is less than 1 
*/ 
def getMergedLoadName(list: util.List[LoadMetadataDetails]): String;

Example:

import scala.collection.JavaConverters._

val carbonTable = CarbonEnv.getCarbonTable(Option(databaseName), tableName)(sparkSession)
val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
CarbonSegmentUtil.getMergedLoadName(loadMetadataDetails.toList.asJava)