操作HBase数据源
场景说明
用户可以在Spark应用程序中以数据源的方式去使用HBase,将dataFrame写入HBase中,并从HBase读取数据以及对读取的数据进行过滤等操作。
数据规划
在客户端执行hbase shell,进入HBase命令行,使用下面的命令创建样例代码中要使用的HBase表:
create 'HBaseSourceExampleTable','rowkey','cf1','cf2','cf3','cf4','cf5','cf6','cf7', 'cf8'
开发思路
- 创建RDD.
- 以数据源的方式操作HBase,将上面生成的RDD写入HBase表中.
- 读取HBase表中的数据,并且对其进行简单的操作。
打包项目
- 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。
- 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。
若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
提交命令
假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行。
- yarn-client模式:
java/scala版本(类名等请与实际代码保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode client --jars /opt/female/protobuf-java-2.5.0.jar --conf spark.yarn.user.classpath.first=true --class com.huawei.bigdata.spark.examples.datasources.HBaseSource SparkOnHbaseJavaExample-1.0.jar
python版本(文件名等请与实际保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode client --conf spark.yarn.user.classpath.first=true --jars SparkOnHbaseJavaExample-1.0.jar,/opt/female/protobuf-java-2.5.0.jar HBaseSource.py
- yarn-cluster模式:
java/scala版本(类名等请与实际代码保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode cluster --jars /opt/female/protobuf-java-2.5.0.jar --conf spark.yarn.user.classpath.first=true --class com.huawei.bigdata.spark.examples.datasources.HBaseSource SparkOnHbaseJavaExample-1.0.jar
python版本(文件名等请与实际保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.user.classpath.first=true --jars SparkOnHbaseJavaExample-1.0.jar,/opt/female/protobuf-java-2.5.0.jar HBaseSource.py
Java样例代码
面代码片段仅为演示,具体代码参见SparkOnHbaseJavaExample中的HBaseSource文件:
public static void main(String args[]) throws IOException{ SparkConf sparkConf = new SparkConf().setAppName("HBaseSourceExample"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); SQLContext sqlContext = new SQLContext(jsc); Configuration conf = HBaseConfiguration.create(); JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc,conf); try{ List<HBaseRecord> list = new ArrayList<HBaseRecord>(); for(int i=0 ; i<256; i++){ list.add(new HBaseRecord(i)); } Map map = new HashMap<String, String>(); map.put(HBaseTableCatalog.tableCatalog(), cat); map.put(HBaseTableCatalog.newTable(), "5"); System.out.println("Before insert data into hbase table"); sqlContext.createDataFrame(list, HBaseRecord.class).write().options(map).format("org.apache.hadoop.hbase.spark").save(); Dataset<Row> ds = withCatalog(sqlContext, cat); System.out.println("After insert data into hbase table"); ds.printSchema(); ds.show(); ds.filter("key <= 'row5'").select("key","col1").show(); ds.registerTempTable("table1"); Dataset<Row> tempDS = sqlContext.sql("select count(col1) from table1 where key < 'row5'"); tempDS.show(); } finally { jsc.stop(); } }
Scala样例代码
面代码片段仅为演示,具体代码参见SparkOnHbaseScalaExample中的HBaseSource文件:
def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HBaseSourceExample") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) val conf = HBaseConfiguration.create() val hbaseContext = new HBaseContext(sc,conf) import sqlContext.implicits._ def withCatalog(cat: String): DataFrame = { sqlContext .read .options(Map(HBaseTableCatalog.tableCatalog->cat)) .format("org.apache.hadoop.hbase.spark") .load() } val data = (0 to 255).map { i => HBaseRecord(i) } try{ sc.parallelize(data).toDF.write.options( Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5")) .format("org.apache.hadoop.hbase.spark") .save() val df = withCatalog(cat) df.show() df.filter($"col0" <= "row005") .select($"col0", $"col1").show df.registerTempTable("table1") val c = sqlContext.sql("select count(col1) from table1 where col0 < 'row050'") c.show() } finally { sc.stop() } }
Python样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中的HBaseSource文件:
# -*- coding:utf-8 -*- """ 【说明】 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("HBaseSourceExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.datasources.HBaseSource') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.HBaseSource().execute(spark._jsc) # 停止SparkSession spark.stop()