更新时间:2024-08-03 GMT+08:00

BulkPut接口使用

场景说明

用户可以在Spark应用程序中使用HBaseContext的方式去使用HBase,将构建的RDD写入HBase中。

数据规划

在客户端执行hbase shell,进入HBase命令行,使用下面的命令创建样例代码中要使用的HBase表:

create 'bulktable','cf1'

开发思路

  1. 创建RDD。
  2. 以HBaseContext的方式操作HBase,将上面生成的RDD写入HBase表中。

运行前置操作

安全模式下Spark Core样例代码需要读取两个文件(user.keytab、krb5.conf)。user.keytab和krb5.conf文件为安全模式下的认证文件,需要在FusionInsight Manager中下载principal用户的认证凭证,样例代码中使用的用户为:super,需要修改为准备好的开发用户。

打包项目

  • 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用
  • 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。
  • 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上(文件上传的路径需要和生成的jar包路径一致)。

    若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。

提交命令

假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端$SPARK_HOME目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。

  • yarn-client模式:

    java/scala版本(类名等请与实际代码保持一致,此处仅为示例)

    bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkPutExample SparkOnHbaseJavaExample.jar bulktable cf1

    python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。

    bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample.jar HBaseBulkPutExample.py bulktable cf1

  • yarn-cluster模式:

    java/scala版本(类名等请与实际代码保持一致,此处仅为示例)

    bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkPutExample --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar bulktable cf1

    python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。

    bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --jars SparkOnHbaseJavaExample.jar HBaseBulkPutExample.py bulktable cf1

Java样例代码

下面代码片段仅为演示,具体代码参见SparkOnHbaseJavaExample中的JavaHBaseBulkPutExample文件:

  public static void main(String[] args) throws Exception{
    if (args.length < 2) {
      System.out.println("JavaHBaseBulkPutExample  " +
              "{tableName} {columnFamily}");
      return;
    }
    LoginUtil.loginWithUserKeytab();
    String tableName = args[0];
    String columnFamily = args[1];
    SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseBulkPutExample " + tableName);
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    try {
      List<String> list = new ArrayList<String>(5);
      list.add("1," + columnFamily + ",1,1");
      list.add("2," + columnFamily + ",1,2");
      list.add("3," + columnFamily + ",1,3");
      list.add("4," + columnFamily + ",1,4");
      list.add("5," + columnFamily + ",1,5");
      list.add("6," + columnFamily + ",1,6");
      list.add("7," + columnFamily + ",1,7");
      list.add("8," + columnFamily + ",1,8");
      list.add("9," + columnFamily + ",1,9");
      list.add("10," + columnFamily + ",1,10");
      JavaRDD<String> rdd = jsc.parallelize(list);
      Configuration conf = HBaseConfiguration.create();
      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
      hbaseContext.bulkPut(rdd,
              TableName.valueOf(tableName),
              new PutFunction());
      System.out.println("Bulk put into Hbase successfully!");
    } finally {
      jsc.stop();
    }
  }

Scala样例代码

下面代码片段仅为演示,具体代码参见SparkOnHbaseScalaExample中HBaseBulkPutExample文件:

  def main(args: Array[String]) {
    if (args.length < 2) {
      System.out.println("HBaseBulkPutTimestampExample {tableName} {columnFamily} are missing an argument")
      return
    }
    LoginUtil.loginWithUserKeytab()
    val tableName = args(0)
    val columnFamily = args(1)
    val sparkConf = new SparkConf().setAppName("HBaseBulkPutTimestampExample " +
      tableName + " " + columnFamily)
    val sc = new SparkContext(sparkConf)
    try {
      val rdd = sc.parallelize(Array(
        (Bytes.toBytes("1"),
          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("1")))),
        (Bytes.toBytes("2"),
          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("2")))),
        (Bytes.toBytes("3"),
          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("3")))),
        (Bytes.toBytes("4"),
          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("4")))),
        (Bytes.toBytes("5"),
          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("5")))),
        (Bytes.toBytes("6"),
          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("6")))),
        (Bytes.toBytes("7"),
          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("7")))),
        (Bytes.toBytes("8"),
          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("8")))),
        (Bytes.toBytes("9"),
          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("9")))),
        (Bytes.toBytes("10"),
          Array((Bytes.toBytes(columnFamily), Bytes.toBytes("1"), Bytes.toBytes("10"))))))
      val conf = HBaseConfiguration.create()
      val timeStamp = System.currentTimeMillis()
      val hbaseContext = new HBaseContext(sc, conf)
      hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
        TableName.valueOf(tableName),
        (putRecord) => {
          val put = new Put(putRecord._1)
          putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2,
            timeStamp, putValue._3))
          put
        })
    } finally {
      sc.stop()
    }
  }

Python样例代码

下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseBulkPutExample文件:

# -*- coding:utf-8 -*-
"""
【说明】
(1)由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现
(2)如果使用yarn-client模式运行,请确认Spark2x客户端Spark2x/spark/conf/spark-defaults.conf中
   spark.yarn.security.credentials.hbase.enabled参数配置为true
"""
from py4j.java_gateway import java_import
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession\
        .builder\
        .appName("JavaHBaseBulkPutExample")\
        .getOrCreate()
# 向sc._jvm中导入要运行的类
java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkPutExample')
# 创建类实例并调用方法,传递sc._jsc参数
spark._jvm.JavaHBaseBulkPutExample().execute(spark._jsc, sys.argv)
# 停止SparkSession
spark.stop()