SparkStreaming批量写入HBase表
场景说明
用户可以在Spark应用程序中使用HBaseContext的方式去操作HBase,使用streamBulkPut接口将流数据写入Hbase表中。
数据规划
开发思路
- 使用SparkStreaming持续读取特定端口的数据。
- 将读取到的Dstream通过streamBulkPut接口写入hbase表中。
打包项目
- 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中编包并运行Spark程序。
- 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。
若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
提交命令
假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。
- yarn-client模式:
java/scala版本(类名等请与实际代码保持一致,此处仅为示例),${ip}请使用实际执行nc -lk 9999的命令的机器ip
bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.streaming.JavaHBaseStreamingBulkPutExample SparkOnHbaseJavaExample-1.0.jar ${ip} 9999 streamingTable cf1
python版本(文件名等请与实际保持一致,此处仅为示例)
bin/spark-submit --master yarn --jars SparkOnHbaseJavaExample-1.0.jar HBaseStreamingBulkPutExample.py ${ip} 9999 streamingTable cf1
- yarn-cluster模式:
java/scala版本(类名等请与实际代码保持一致,此处仅为示例),${ip}请使用实际执行nc -lk 9999的命令的机器ip
bin/spark-submit --master yarn --deploy-mode client --deploy-mode cluster --class com.huawei.bigdata.spark.examples.streaming.JavaHBaseStreamingBulkPutExample SparkOnHbaseJavaExample-1.0.jar ${ip} 9999 streamingTable cf1
python版本(文件名等请与实际保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseStreamingBulkPutExample.py ${ip} 9999 streamingTable cf1
Java样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbaseJavaExample中JavaHBaseStreamingBulkPutExample文件:
代码中通过awaitTerminationOrTimeout()方法设置了任务超时时间(单位为毫秒),建议根据期望的任务运行时间调整参数大小。
public static void main(String[] args) throws IOException { if (args.length < 4) { System.out.println("JavaHBaseBulkPutExample " + "{host} {port} {tableName}"); return; } String host = args[0]; String port = args[1]; String tableName = args[2]; String columnFamily = args[3]; SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseStreamingBulkPutExample " + tableName + ":" + port + ":" + tableName); JavaSparkContext jsc = new JavaSparkContext(sparkConf); try { JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(1000)); JavaReceiverInputDStream<String> javaDstream = jssc.socketTextStream(host, Integer.parseInt(port)); Configuration conf = HBaseConfiguration.create(); JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); hbaseContext.streamBulkPut(javaDstream, TableName.valueOf(tableName), new PutFunction(columnFamily)); jssc.start(); jssc.awaitTerminationOrTimeout(60000); jssc.stop(false,true); }catch(InterruptedException e){ e.printStackTrace(); } finally { jsc.stop(); } }
Scala样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbaseScalaExample中HBaseStreamingBulkPutExample文件:
代码中通过awaitTerminationOrTimeout()方法设置了任务超时时间(单位为毫秒),建议根据期望的任务运行时间调整参数大小。
def main(args: Array[String]): Unit = { val host = args(0) val port = args(1) val tableName = args(2) val columnFamily = args(3) val conf = new SparkConf() conf.setAppName("HBase Streaming Bulk Put Example") val sc = new SparkContext(conf) try { val config = HBaseConfiguration.create() val hbaseContext = new HBaseContext(sc, config) val ssc = new StreamingContext(sc, Seconds(1)) val lines = ssc.socketTextStream(host, port.toInt) hbaseContext.streamBulkPut[String](lines, TableName.valueOf(tableName), (putRecord) => { if (putRecord.length() > 0) { val put = new Put(Bytes.toBytes(putRecord)) put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("foo"), Bytes.toBytes("bar")) put } else { null } }) ssc.start() ssc.awaitTerminationOrTimeout(60000) ssc.stop(stopSparkContext = false) } finally { sc.stop() } }
Python样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseStreamingBulkPutExample文件:
# -*- coding:utf-8 -*- """ 【说明】 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseStreamingBulkPutExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.streaming.JavaHBaseStreamingBulkPutExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.JavaHBaseStreamingBulkPutExample().execute(spark._jsc, sys.argv) # 停止SparkSession spark.stop()