更新时间:2024-08-03 GMT+08:00

SparkStreaming批量写入HBase表

场景说明

用户可以在Spark应用程序中使用HBaseContext的方式去操作HBase,使用streamBulkPut接口将流数据写入Hbase表中。

数据规划

  1. 在客户端执行hbase shell进入HBase命令行。
  2. 在HBase命令执行下面的命令创建HBase表:

    create 'streamingTable','cf1'

  3. 在客户端另外一个session通过linux命令构造一个端口进行接收数据(不同操作系统的机器,命令可能不同,suse尝试使用netcat -lk 9999):

    nc -lk 9999

    在构造一个端口进行接收数据时,需要在客户端所在服务器上安装netcat

开发思路

  1. 使用SparkStreaming持续读取特定端口的数据。
  2. 将读取到的Dstream通过streamBulkPut接口写入hbase表中。

打包项目

  • 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用
  • 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。

    若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。

提交命令

假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。

  • yarn-client模式:

    java/scala版本(类名等请与实际代码保持一致,此处仅为示例),${ip}请使用实际执行nc -lk 9999的命令的机器ip

    bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.streaming.JavaHBaseStreamingBulkPutExample SparkOnHbaseJavaExample-1.0.jar ${ip} 9999 streamingTable cf1

    python版本(文件名等请与实际保持一致,此处仅为示例)

    bin/spark-submit --master yarn --jars SparkOnHbaseJavaExample-1.0.jar HBaseStreamingBulkPutExample.py ${ip} 9999 streamingTable cf1

  • yarn-cluster模式:

    java/scala版本(类名等请与实际代码保持一致,此处仅为示例),${ip}请使用实际执行nc -lk 9999的命令的机器ip

    bin/spark-submit --master yarn --deploy-mode client --deploy-mode cluster --class com.huawei.bigdata.spark.examples.streaming.JavaHBaseStreamingBulkPutExample SparkOnHbaseJavaExample-1.0.jar ${ip} 9999 streamingTable cf1

    python版本(文件名等请与实际保持一致,此处仅为示例)

    bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseStreamingBulkPutExample.py ${ip} 9999 streamingTable cf1

Java样例代码

下面代码片段仅为演示,具体代码参见SparkOnHbaseJavaExample中JavaHBaseStreamingBulkPutExample文件:

代码中通过awaitTerminationOrTimeout()方法设置了任务超时时间(单位为毫秒),建议根据期望的任务运行时间调整参数大小。

  public static void main(String[] args) throws IOException {
    if (args.length < 4) {
      System.out.println("JavaHBaseBulkPutExample  " +
              "{host} {port} {tableName}");
      return;
    }
    String host = args[0];
    String port = args[1];
    String tableName = args[2];
    String columnFamily = args[3];
    SparkConf sparkConf =
            new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " +
                    tableName + ":" + port + ":" + tableName);
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    try {
      JavaStreamingContext jssc =
              new JavaStreamingContext(jsc, new Duration(1000));
      JavaReceiverInputDStream<String> javaDstream =
              jssc.socketTextStream(host, Integer.parseInt(port));
      Configuration conf = HBaseConfiguration.create();
      JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
      hbaseContext.streamBulkPut(javaDstream,
              TableName.valueOf(tableName),
              new PutFunction(columnFamily));
      jssc.start();
      jssc.awaitTerminationOrTimeout(60000);
      jssc.stop(false,true);
    }catch(InterruptedException e){
      e.printStackTrace();
    } finally {
      jsc.stop();
    }
  }

Scala样例代码

下面代码片段仅为演示,具体代码参见SparkOnHbaseScalaExample中HBaseStreamingBulkPutExample文件:

代码中通过awaitTerminationOrTimeout()方法设置了任务超时时间(单位为毫秒),建议根据期望的任务运行时间调整参数大小。

  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1)
    val tableName = args(2)
    val columnFamily = args(3)
    val conf = new SparkConf()
    conf.setAppName("HBase Streaming Bulk Put Example")
    val sc = new SparkContext(conf)
    try {
      val config = HBaseConfiguration.create()
      val hbaseContext = new HBaseContext(sc, config)
      val ssc = new StreamingContext(sc, Seconds(1))
      val lines = ssc.socketTextStream(host, port.toInt)
      hbaseContext.streamBulkPut[String](lines,
        TableName.valueOf(tableName),
        (putRecord) => {
          if (putRecord.length() > 0) {
            val put = new Put(Bytes.toBytes(putRecord))
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("foo"), Bytes.toBytes("bar"))
            put
          } else {
            null
          }
        })
      ssc.start()
      ssc.awaitTerminationOrTimeout(60000)
      ssc.stop(stopSparkContext = false)
    } finally {
      sc.stop()
    }
  }

Python样例代码

下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseStreamingBulkPutExample文件:

# -*- coding:utf-8 -*-
"""
【说明】
由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现
"""
from py4j.java_gateway import java_import
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession\
        .builder\
        .appName("JavaHBaseStreamingBulkPutExample")\
        .getOrCreate()
# 向sc._jvm中导入要运行的类
java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.streaming.JavaHBaseStreamingBulkPutExample')
# 创建类实例并调用方法,传递sc._jsc参数
spark._jvm.JavaHBaseStreamingBulkPutExample().execute(spark._jsc, sys.argv)
# 停止SparkSession
spark.stop()