Updated: 2022-12-14 GMT+08:00
API
This section describes the Segment APIs and how to use them. All methods are in the org.apache.spark.util.CarbonSegmentUtil class.
The following method is deprecated:
/**
 * Returns the valid segments for the query based on the filter condition
 * present in carbonScanRdd.
 *
 * @param carbonScanRdd
 * @return Array of valid segments
 */
@deprecated
def getFilteredSegments(carbonScanRdd: CarbonScanRDD[InternalRow]): Array[String];
Usage
Use the following code to obtain a CarbonScanRDD from a query statement:
val df = carbon.sql("select * from table where age='12'")
val myscan = df.queryExecution.sparkPlan.collect {
  case scan: CarbonDataSourceScan if scan.rdd.isInstanceOf[CarbonScanRDD[InternalRow]] => scan.rdd
  case scan: RowDataSourceScanExec if scan.rdd.isInstanceOf[CarbonScanRDD[InternalRow]] => scan.rdd
}.head
val carbonrdd = myscan.asInstanceOf[CarbonScanRDD[InternalRow]]
Example:
CarbonSegmentUtil.getFilteredSegments(carbonrdd)
You can also pass in a SQL statement to obtain the filtered segments:
/**
 * Returns an array of valid segment numbers based on the filter condition provided in the sql.
 * NOTE: This API supports only SELECT statements (insert into, ctas, and so on are not supported).
 *
 * @param sql
 * @param sparkSession
 * @return Array of valid segments
 * @throws UnsupportedOperationException if the query does not contain exactly one
 *                                       carbon main table
 */
def getFilteredSegments(sql: String, sparkSession: SparkSession): Array[String];
Example:
CarbonSegmentUtil.getFilteredSegments("select * from table where age='12'", sparkSession)
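Because the Scaladoc above documents an UnsupportedOperationException for queries that are not supported, callers may want to guard the call. The following is a minimal sketch under that assumption. The object name FilteredSegmentsSketch and the helper filteredSegmentsOrEmpty are hypothetical and not part of CarbonData; only getFilteredSegments comes from the API described here.

```scala
import scala.util.{Failure, Success, Try}

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.CarbonSegmentUtil

object FilteredSegmentsSketch {

  // Hypothetical helper (not part of CarbonData): returns the filtered segment
  // numbers, or an empty array when the query is not supported by the API.
  def filteredSegmentsOrEmpty(sql: String, spark: SparkSession): Array[String] =
    Try(CarbonSegmentUtil.getFilteredSegments(sql, spark)) match {
      case Success(segments) => segments
      case Failure(e: UnsupportedOperationException) =>
        // Documented failure mode, for example a query without exactly one carbon main table.
        println(s"Query not supported by getFilteredSegments: ${e.getMessage}")
        Array.empty[String]
      case Failure(other) =>
        // Any other failure is unexpected here, so rethrow it unchanged.
        throw other
    }
}
```

Returning an empty array is only one possible policy; rethrowing may be more appropriate when an unsupported query indicates a programming error.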
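Pass in the database name and table name to obtain the list of segments that a MAJOR compaction will merge. The returned list can be passed as the argument to getMergedLoadName: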
/**
 * Identifies all segments which can be merged with MAJOR compaction type.
 * NOTE: This result can be passed to getMergedLoadName API to get the merged load name.
 *
 * @param sparkSession
 * @param tableName
 * @param dbName
 * @return list of LoadMetadataDetails
 */
def identifySegmentsToBeMerged(sparkSession: SparkSession,
    tableName: String,
    dbName: String): util.List[LoadMetadataDetails];
Example:
CarbonSegmentUtil.identifySegmentsToBeMerged(sparkSession, "table_test", "default")
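Pass in the database name, table name, and a custom segment list to obtain the list of segments that a CUSTOM compaction will merge. The returned list can be passed as the argument to getMergedLoadName: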
/**
 * Identifies all segments which can be merged with CUSTOM compaction type.
 * NOTE: This result can be passed to getMergedLoadName API to get the merged load name.
 *
 * @param sparkSession
 * @param tableName
 * @param dbName
 * @param customSegments
 * @return list of LoadMetadataDetails
 * @throws UnsupportedOperationException if customSegments is null or empty.
 * @throws MalformedCarbonCommandException if segment does not exist or is not valid
 */
def identifySegmentsToBeMergedCustom(sparkSession: SparkSession,
    tableName: String,
    dbName: String,
    customSegments: util.List[String]): util.List[LoadMetadataDetails];
Example:
import java.util

val customSegments = new util.ArrayList[String]()
customSegments.add("1")
customSegments.add("2")
CarbonSegmentUtil.identifySegmentsToBeMergedCustom(sparkSession, "table_test", "default", customSegments)
Given a list of segments, return the name of the new merged load:
/**
 * Returns the Merged Load Name for given list of segments
 *
 * @param list of segments
 * @return Merged Load Name
 * @throws UnsupportedOperationException if list of segments is less than 1
 */
def getMergedLoadName(list: util.List[LoadMetadataDetails]): String;
Example:
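// Needed for .asJava; databaseName and tableName are assumed to be defined by the caller.
import scala.collection.JavaConverters._

val carbonTable = CarbonEnv.getCarbonTable(Option(databaseName), tableName)(sparkSession)
val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
CarbonSegmentUtil.getMergedLoadName(loadMetadataDetails.toList.asJava)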
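The Scaladoc for identifySegmentsToBeMerged notes that its result can be passed to getMergedLoadName. The sketch below chains the two calls, assuming a table on which MAJOR compaction may or may not find candidates. The object name MergedLoadNameSketch and the helper previewMergedLoadName are hypothetical; the empty-list guard is there because getMergedLoadName is documented to throw UnsupportedOperationException when the list holds fewer than one segment.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.CarbonSegmentUtil

object MergedLoadNameSketch {

  // Hypothetical helper (not part of CarbonData): returns the merged load name
  // for the segments a MAJOR compaction would merge, or None if nothing qualifies.
  def previewMergedLoadName(
      spark: SparkSession,
      dbName: String,
      tableName: String): Option[String] = {
    // Note the argument order of the documented API: table name first, then database name.
    val candidates = CarbonSegmentUtil.identifySegmentsToBeMerged(spark, tableName, dbName)

    // Guard against a null or empty list before calling getMergedLoadName.
    if (candidates == null || candidates.isEmpty) None
    else Some(CarbonSegmentUtil.getMergedLoadName(candidates))
  }
}
```

A call such as MergedLoadNameSketch.previewMergedLoadName(sparkSession, "default", "table_test") would then yield either a load name or None without raising the documented exception for an empty list.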