BulkPut接口使用
场景说明
用户可以在Spark应用程序中使用HBaseContext的方式去使用HBase,将构建的RDD写入HBase中。
数据规划
在客户端执行hbase shell,进入HBase命令行,使用下面的命令创建样例代码中要使用的HBase表:
create 'bulktable','cf1'
开发思路
- 创建RDD。
- 以HBaseContext的方式操作HBase,将上面生成的RDD写入HBase表中。
打包项目
- 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。
- 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。
若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
提交命令
假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在$SPARK_HOME目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。
- yarn-client模式:
java/scala版本(类名等请与实际代码保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkPutExample SparkOnHbaseJavaExample-1.0.jar bulktable cf1
python版本(文件名等请与实际保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkPutExample.py bulktable cf1
- yarn-cluster模式:
java/scala版本(类名等请与实际代码保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkPutExample SparkOnHbaseJavaExample-1.0.jar bulktable cf1
python版本(文件名等请与实际保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkPutExample.py bulktable cf1
Java样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbaseJavaExample中的JavaHBaseBulkPutExample文件:
public static void main(String[] args) throws Exception{
if (args.length < 2) {
System.out.println("JavaHBaseBulkPutExample " +
"{tableName} {columnFamily}");
return;
}
String tableName = args[0];
String columnFamily = args[1];
SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkPutExample " + tableName);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
try {
List<String> list = new ArrayList<String>(5);
list.add("1," + columnFamily + ",1,1");
list.add("2," + columnFamily + ",1,2");
list.add("3," + columnFamily + ",1,3");
list.add("4," + columnFamily + ",1,4");
list.add("5," + columnFamily + ",1,5");
list.add("6," + columnFamily + ",1,6");
list.add("7," + columnFamily + ",1,7");
list.add("8," + columnFamily + ",1,8");
list.add("9," + columnFamily + ",1,9");
list.add("10," + columnFamily + ",1,10");
JavaRDD<String> rdd = jsc.parallelize(list);
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.bulkPut(rdd,
TableName.valueOf(tableName),
new PutFunction());
System.out.println("Bulk put into Hbase successfully!");
} finally {
jsc.stop();
}
}
Scala样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbaseScalaExample中HBaseBulkPutExample文件:
def main(args: Array[String]) {
if (args.length < 2) {
System.out.println("HBaseBulkPutTimestampExample {tableName} {columnFamily} are missing an argument")
return
}
val tableName = args(0)
val columnFamily = args(1)
val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample " +
tableName + " " + columnFamily)
val sc = new SparkContext(sparkConf)
try {
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
(Bytes.toBytes("2"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
(Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
(Bytes.toBytes("4"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
(Bytes.toBytes("5"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))),
(Bytes.toBytes("6"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("6")))),
(Bytes.toBytes("7"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("7")))),
(Bytes.toBytes("8"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("8")))),
(Bytes.toBytes("9"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("9")))),
(Bytes.toBytes("10"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("10"))))))
val conf = HBaseConfiguration.create()
val timeStamp = System.currentTimeMillis()
val hbaseContext = new HBaseContext(sc, conf)
hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
TableName.valueOf(tableName),
(putRecord) => {
val put = new Put(putRecord._1)
putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2,
timeStamp, putValue._3))
put
})
} finally {
sc.stop()
}
}
Python样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseBulkPutExample文件:
# -*- coding:utf-8 -*-
"""
【说明】
由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现
"""
from py4j.java_gateway import java_import
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession\
.builder\
.appName("JavaHBaseBulkPutExample")\
.getOrCreate()
# 向sc._jvm中导入要运行的类
java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkPutExample')
# 创建类实例并调用方法,传递sc._jsc参数
spark._jvm.JavaHBaseBulkPutExample().execute(spark._jsc, sys.argv)
# 停止SparkSession
spark.stop()