操作Avro格式数据
场景说明
用户可以在Spark应用程序中以数据源的方式去使用HBase,本例中将数据以Avro格式存储在HBase中,并从中读取数据以及对读取的数据进行过滤等操作。
数据规划
在客户端执行hbase shell,进入HBase命令行,使用下面的命令创建样例代码中要使用的HBase表:
create 'ExampleAvrotable','rowkey','cf1'
create 'ExampleAvrotableInsert','rowkey','cf1'
开发思路
- 创建RDD。
- 以数据源的方式操作HBase,将上面生成的RDD写入HBase表中。
- 读取HBase表中的数据,并且对其进行简单的操作。
打包项目
- 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。
- 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。
若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
提交命令
假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行。
- yarn-client模式:
java/scala版本(类名等请与实际代码保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode client --jars /opt/female/protobuf-java-2.5.0.jar --conf spark.yarn.user.classpath.first=true --class com.huawei.bigdata.spark.examples.datasources.AvroSource SparkOnHbaseJavaExample-1.0.jar
python版本(文件名等请与实际保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode client --conf spark.yarn.user.classpath.first=true --jars SparkOnHbaseJavaExample-1.0.jar,/opt/female/protobuf-java-2.5.0.jar AvroSource.py
- yarn-cluster模式:
java/scala版本(类名等请与实际代码保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode cluster --jars /opt/female/protobuf-java-2.5.0.jar --conf spark.yarn.user.classpath.first=true --class com.huawei.bigdata.spark.examples.datasources.AvroSource SparkOnHbaseJavaExample-1.0.jar
python版本(文件名等请与实际保持一致,此处仅为示例)
bin/spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.user.classpath.first=true --jars SparkOnHbaseJavaExample-1.0.jar,/opt/female/protobuf-java-2.5.0.jar AvroSource.py
Java样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbaseJavaExample中的AvroSource文件:
public static void main(JavaSparkContext jsc) throws IOException { SQLContext sqlContext = new SQLContext(jsc); Configuration hbaseconf = new HBaseConfiguration().create(); JavaHBaseContext hBaseContext = new JavaHBaseContext(jsc, hbaseconf); List list = new ArrayList<AvroHBaseRecord>(); for(int i=0; i<=255 ; ++i){ list.add(AvroHBaseRecord.apply(i)); } try{ Map<String, String> map = new HashMap<String, String>(); map.put(HBaseTableCatalog.tableCatalog(), catalog); map.put(HBaseTableCatalog.newTable(), "5"); sqlContext.createDataFrame(list, AvroHBaseRecord.class).write().options(map).format("org.apache.hadoop.hbase.spark").save(); Dataset<Row> ds = withCatalog(sqlContext,catalog); ds.show(); ds.printSchema(); ds.registerTempTable("ExampleAvrotable"); Dataset<Row> c= sqlContext.sql("select count(1) from ExampleAvrotable"); c.show(); Dataset<Row> filtered = ds.select("col0", "col1.favorite_array").where("col0 = 'name1'"); filtered.show(); java.util.List<Row> collected = filtered.collectAsList(); if (collected.get(0).get(1).toString().equals("number1")) { throw new UserCustomizedSampleException("value invalid", new Throwable()); } if (collected.get(0).get(1).toString().equals("number2")) { throw new UserCustomizedSampleException("value invalid", new Throwable()); } Map avroCatalogInsertMap = new HashMap<String,String>(); avroCatalogInsertMap.put("avroSchema" , AvroHBaseRecord.schemaString); avroCatalogInsertMap.put(HBaseTableCatalog.tableCatalog(), avroCatalogInsert); ds.write().options(avroCatalogInsertMap).format("org.apache.hadoop.hbase.spark").save(); Dataset<Row> newDS = withCatalog(sqlContext,avroCatalogInsert); newDS.show(); newDS.printSchema(); if (newDS.count() != 256) { throw new UserCustomizedSampleException("value invalid", new Throwable()); } ds.filter("col1.name = 'name5' || col1.name <= 'name5'").select("col0","col1.favorite_color", "col1.favorite_number").show(); } finally{ jsc.stop(); } }
Scala样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbaseScalaExample中的AvroSource文件:
def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("AvroSourceExample") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) val hbaseConf = HBaseConfiguration.create() val hbaseContext = new HBaseContext(sc, hbaseConf) import sqlContext.implicits._ def withCatalog(cat: String): DataFrame = { sqlContext .read .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalog)) .format("org.apache.hadoop.hbase.spark") .load() } val data = (0 to 255).map { i => AvroHBaseRecord(i) } try { sc.parallelize(data).toDF.write.options( Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")) .format("org.apache.hadoop.hbase.spark") .save() val df = withCatalog(catalog) df.show() df.printSchema() df.registerTempTable("ExampleAvrotable") val c = sqlContext.sql("select count(1) from ExampleAvrotable") c.show() val filtered = df.select($"col0", $"col1.favorite_array").where($"col0" === "name001") filtered.show() val collected = filtered.collect() if (collected(0).getSeq[String](1)(0) != "number1") { throw new UserCustomizedSampleException("value invalid") } if (collected(0).getSeq[String](1)(1) != "number2") { throw new UserCustomizedSampleException("value invalid") } df.write.options( Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalogInsert, HBaseTableCatalog.newTable -> "5")) .format("org.apache.hadoop.hbase.spark") .save() val newDF = withCatalog(avroCatalogInsert) newDF.show() newDF.printSchema() if (newDF.count() != 256) { throw new UserCustomizedSampleException("value invalid") } df.filter($"col1.name" === "name005" || $"col1.name" <= "name005") .select("col0", "col1.favorite_color", "col1.favorite_number") .show() } finally { sc.stop() } }
Python样例代码
下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中的AvroSource文件:
# -*- coding:utf-8 -*- """ 【说明】 由于pyspark不提供HBase相关api,本样例使用Python调用Java的方式实现 """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("AvroSourceExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.datasources.AvroSource') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.AvroSource().execute(spark._jsc) # 停止SparkSession spark.stop()