mapPartitions接口使用
场景说明
用户可以在Spark应用程序中使用HBaseContext的方式去操作HBase,使用mapPartition接口并行遍历HBase表。
数据规划
使用foreachPartition接口使用章节创建的HBase数据表。
开发思路
- 构造需要遍历的HBase表中rowkey的RDD。
- 使用mapPartition接口遍历上述rowkey对应的数据信息,并进行简单的操作。
运行前置操作
安全模式下Spark Core样例代码需要读取两个文件(user.keytab、krb5.conf)。user.keytab和krb5.conf文件为安全模式下的认证文件,需要在FusionInsight Manager中下载principal用户的认证凭证,样例代码中使用的用户为:super,需要修改为准备好的开发用户。
打包项目
- 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。
- 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。
- 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上(文件上传的路径需要和生成的jar包路径一致)。
若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
提交命令
假设用例代码打包后的jar包名为 、spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。
- yarn-client模式:
java/scala版本(类名等请与实际代码保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseMapPartitionExample SparkOnHbaseJavaExample.jar table2
python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。
bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample.jar HBaseMapPartitionExample.py table2
- yarn-cluster模式:
java/scala版本(类名等请与实际代码保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseMapPartitionExample --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar table2
python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。
bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --jars SparkOnHbaseJavaExample.jar HBaseMapPartitionExample.py table2
Java样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbaseJavaExample中JavaHBaseMapPartitionExample文件:
public static void main(String args[]) throws IOException {
if(args.length <1){
System.out.println("JavaHBaseMapPartitionExample {tableName} is missing an argument");
return;
}
LoginUtil.loginWithUserKeytab();
final String tableName = args[0];
SparkConf sparkConf = new SparkConf().setAppName("HBaseMapPartitionExample " + tableName);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
try{
List<byte []> list = new ArrayList();
list.add(Bytes.toBytes("1"));
list.add(Bytes.toBytes("2"));
list.add(Bytes.toBytes("3"));
list.add(Bytes.toBytes("4"));
list.add(Bytes.toBytes("5"));
JavaRDD<byte []> rdd = jsc.parallelize(list);
Configuration hbaseconf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, hbaseconf);
JavaRDD getrdd = hbaseContext.mapPartitions(rdd, new FlatMapFunction<Tuple2<Iterator<byte[]>,Connection>, Object>() {
public Iterator call(Tuple2<Iterator<byte[]>, Connection> t)
throws Exception {
Table table = t._2.getTable(TableName.valueOf(tableName));
//go through rdd
List<String> list = new ArrayList<String>();
while(t._1.hasNext()){
byte[] bytes = t._1.next();
Result result = table.get(new Get(bytes));
Iterator<Cell> it = result.listCells().iterator();
StringBuilder sb = new StringBuilder();
sb.append(Bytes.toString(result.getRow()) + ":");
while(it.hasNext()){
Cell cell = it.next();
String column = Bytes.toString(cell.getQualifierArray());
if(column.equals("counter")){
sb.append("(" + column + "," + Bytes.toLong(cell.getValueArray()) + ")");
} else {
sb.append("(" + column + "," + Bytes.toString(cell.getValueArray()) + ")");
}
}
list.add(sb.toString());
}
return list.iterator();
}
});
List<byte[]> resultList = getrdd.collect();
if(null == resultList || 0 == resultList.size()){
System.out.println("Nothing matches!");
}else{
for(int i =0; i< resultList.size(); i++){
System.out.println(resultList.get(i));
}
}
} finally {
jsc.stop();
}
}
Scala样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbaseScalaExample中HBaseMapPartitionExample文件:
def main(args: Array[String]) {
if (args.length < 1) {
println("HBaseMapPartitionExample {tableName} is missing an argument")
return
}
LoginUtil.loginWithUserKeytab()
val tableName = args(0)
val sparkConf = new SparkConf().setAppName("HBaseMapPartitionExample " + tableName)
val sc = new SparkContext(sparkConf)
try {
//[(Array[Byte])]
val rdd = sc.parallelize(Array(
Bytes.toBytes("1"),
Bytes.toBytes("2"),
Bytes.toBytes("3"),
Bytes.toBytes("4"),
Bytes.toBytes("5")))
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
val b = new StringBuilder
val getRdd = rdd.hbaseMapPartitions[String](hbaseContext, (it, connection) => {
val table = connection.getTable(TableName.valueOf(tableName))
it.map{r =>
//batching would be faster. This is just an example
val result = table.get(new Get(r))
val it = result.listCells().iterator()
b.append(Bytes.toString(result.getRow) + ":")
while (it.hasNext) {
val cell = it.next()
val q = Bytes.toString(cell.getQualifierArray)
if (q.equals("counter")) {
b.append("(" + q + "," + Bytes.toLong(cell.getValueArray) + ")")
} else {
b.append("(" + q + "," + Bytes.toString(cell.getValueArray) + ")")
}
}
b.toString()
}
})
getRdd.collect().foreach(v => println(v))
} finally {
sc.stop()
}
}
Python样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseMapPartitionExample文件:
# -*- coding:utf-8 -*-
"""
【说明】
(1)由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现
(2)如果使用yarn-client模式运行,请确认Spark2x客户端Spark2x/spark/conf/spark-defaults.conf中
spark.yarn.security.credentials.hbase.enabled参数配置为true
"""
from py4j.java_gateway import java_import
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession\
.builder\
.appName("JavaHBaseMapPartitionExample")\
.getOrCreate()
# 向sc._jvm中导入要运行的类
java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseMapPartitionExample')
# 创建类实例并调用方法,传递sc._jsc参数
spark._jvm.JavaHBaseMapPartitionExample().execute(spark._jsc, sys.argv)
# 停止SparkSession
spark.stop()