BulkLoad接口使用
场景说明
用户可以在Spark应用程序中使用HBaseContext的方式去使用HBase,将要插入的数据的rowKey构造成rdd,然后通过HBaseContext的bulkLoad接口将rdd写入HFile中。将生成的HFile文件导入HBase表的操作采用如下格式的命令,不属于本接口范围,不在此进行详细说明:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles {hfilePath} {tableName}
开发思路
- 将要导入的数据构造成RDD。
- 以HBaseContext的方式操作HBase,通过HBaseContext的bulkLoad接口将rdd写入HFile中。
运行前置操作
安全模式下Spark Core样例代码需要读取两个文件(user.keytab、krb5.conf)。user.keytab和krb5.conf文件为安全模式下的认证文件,需要在FusionInsight Manager中下载principal用户的认证凭证,样例代码中使用的用户为:super,需要修改为准备好的开发用户。
打包项目
- 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中编包并运行Spark程序。
- 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。
- 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上(文件上传的路径需要和生成的jar包路径一致)。
若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
提交命令
假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。
- yarn-client模式:
java/scala版本(类名等请与实际代码保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkLoadExample SparkOnHbaseJavaExample.jar /tmp/hfile bulkload-table-test
python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。
bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample.jar HBaseBulkLoadExample.py /tmp/hfile bulkload-table-test
- yarn-cluster模式:
java/scala版本(类名等请与实际代码保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkLoadExample --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar /tmp/hfile bulkload-table-test
python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。
bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --jars SparkOnHbaseJavaExample.jar HBaseBulkLoadExample.py /tmp/hfile bulkload-table-test
Java样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbaseJavaExample中JavaHBaseBulkLoadExample文件:
public static void main(String[] args) throws IOException{ if (args.length < 2) { System.out.println("JavaHBaseBulkLoadExample {outputPath} {tableName}"); return; } LoginUtil.loginWithUserKeytab(); String outputPath = args[0]; String tableName = args[1]; String columnFamily1 = "f1"; String columnFamily2 = "f2"; SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkLoadExample " + tableName); JavaSparkContext jsc = new JavaSparkContext(sparkConf); try { List<String> list= new ArrayList<String>(); // row1 list.add("1," + columnFamily1 + ",b,1"); // row3 list.add("3," + columnFamily1 + ",a,2"); list.add("3," + columnFamily1 + ",b,1"); list.add("3," + columnFamily2 + ",a,1"); /* row2 */ list.add("2," + columnFamily2 + ",a,3"); list.add("2," + columnFamily2 + ",b,3"); JavaRDD<String> rdd = jsc.parallelize(list); Configuration conf = HBaseConfiguration.create(); JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); hbaseContext.bulkLoad(rdd, TableName.valueOf(tableName),new BulkLoadFunction(), outputPath, new HashMap<byte[], FamilyHFileWriteOptions>(), false, HConstants.DEFAULT_MAX_FILE_SIZE); } finally { jsc.stop(); } }
Scala样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbaseScalaExample中HBaseBulkLoadExample文件:
def main(args: Array[String]) { if(args.length < 2) { println("HBaseBulkLoadExample {outputPath} {tableName} return } LoginUtil.loginWithUserKeytab() val Array(outputPath, tableName) = args val columnFamily1 = "f1" val columnFamily2 = "f2" val sparkConf = new SparkConf().setAppName("JavaHBaseBulkLoadExample " + tableName) val sc = new SparkContext(sparkConf) try { val arr = Array("1," + columnFamily1 + ",b,1", "2," + columnFamily1 + ",a,2", "3," + columnFamily1 + ",b,1", "3," + columnFamily2 + ",a,1", "4," + columnFamily2 + ",a,3", "5," + columnFamily2 + ",b,3") val rdd = sc.parallelize(arr) val config = HBaseConfiguration.create val hbaseContext = new HBaseContext(sc, config) hbaseContext.bulkLoad[String](rdd, TableName.valueOf(tableName), (putRecord) => { if(putRecord.length > 0) { val strArray = putRecord.split(",") val kfq = new KeyFamilyQualifier(Bytes.toBytes(strArray(0)), Bytes.toBytes(strArray(1)), Bytes.toBytes(strArray(2))) val ite = (kfq, Bytes.toBytes(strArray(3))) val itea = List(ite).iterator itea } else { null } }, outputPath) } finally { sc.stop() } } }
Python样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseBulkLoadPythonExample文件:
# -*- coding:utf-8 -*- """ 【说明】 (1)由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 (2)如果使用yarn-client模式运行,请确认Spark2x客户端Spark2x/spark/conf/spark-defaults.conf中 spark.yarn.security.credentials.hbase.enabled参数配置为true """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseBulkLoadExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.HBaseBulkLoadPythonExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.HBaseBulkLoadPythonExample().hbaseBulkLoad(spark._jsc, sys.argv[1], sys.argv[2]) # 停止SparkSession spark.stop()