BulkDelete接口使用
场景说明
用户可以在Spark应用程序中使用HBaseContext的方式去使用HBase,将要删除的数据的rowKey构造成rdd,然后通过HBaseContext的bulkDelete接口对HBase表上这些rowKey对应的数据进行删除。
数据规划
基于BulkPut接口使用章节创建的HBase表及其中的数据进行操作。
开发思路
- 创建包含了要删除的rowkey信息的RDD。
- 以HBaseContext的方式操作HBase,通过HBaseContext的bulkDelete接口对HBase表上这些rowKey对应的数据进行删除。
打包项目
- 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。
- 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。
若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
提交命令
假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。
- yarn-client模式:
java/scala 版本(类名等请与实际代码保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkDeleteExample SparkOnHbaseJavaExample-1.0.jar bulktable
python版本(文件名等与实际保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample-1.0.jar HBaseButDeleteExample.py bulktable
- yarn-cluster模式:
java/scala 版本(类名等请与实际代码保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkDeleteExample SparkOnHbaseJavaExample-1.0.jar bulktable
python版本(文件名等与实际保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseButDeleteExample.py bulktable
Java样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbaseJavaExample中HBaseBulkDeleteExample文件:
public static void main(String[] args) throws IOException {
if (args.length < 1) {
System.out.println("JavaHBaseBulkDeleteExample {tableName}");
return;
}
String tableName = args[0];
SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkDeleteExample " + tableName);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
try {
List<byte[]> list = new ArrayList<byte[]>(5);
list.add(Bytes.toBytes("1"));
list.add(Bytes.toBytes("2"));
list.add(Bytes.toBytes("3"));
list.add(Bytes.toBytes("4"));
list.add(Bytes.toBytes("5"));
JavaRDD<byte[]> rdd = jsc.parallelize(list);
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.bulkDelete(rdd,
TableName.valueOf(tableName), new DeleteFunction(), 4);
System.out.println("Bulk Delete successfully!");
} finally {
jsc.stop();
}
}
Scala样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbaseScalaExample中HBaseBulkDeleteExample文件:
def main(args: Array[String]) {
if (args.length < 1) {
println("HBaseBulkDeleteExample {tableName} missing an argument")
return
}
val tableName = args(0)
val sparkConf = new SparkConf().setAppName("HBaseBulkDeleteExample " + tableName)
val sc = new SparkContext(sparkConf)
try {
//[Array[Byte]]
val rdd = sc.parallelize(Array(
Bytes.toBytes("1"),
Bytes.toBytes("2"),
Bytes.toBytes("3"),
Bytes.toBytes("4"),
Bytes.toBytes("5")
))
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
hbaseContext.bulkDelete[Array[Byte]](rdd,
TableName.valueOf(tableName),
putRecord => new Delete(putRecord),
4)
} finally {
sc.stop()
}
}
Python样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseBulkDeleteExample文件:
def main(args: Array[String]) {
# -*- coding:utf-8 -*-
"""
【说明】
由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现
"""
from py4j.java_gateway import java_import
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession\
.builder\
.appName("JavaHBaseBulkDeleteExample")\
.getOrCreate()
# 向sc._jvm中导入要运行的类
java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkDeleteExample')
# 创建类实例并调用方法,传递sc._jsc参数
spark._jvm.JavaHBaseBulkDeleteExample().execute(spark._jsc, sys.argv)
# 停止SparkSession
spark.stop()