更新时间:2024-06-27 GMT+08:00
分享

操作Avro格式数据

场景说明

用户可以在Spark应用程序中以数据源的方式去使用HBase,本例中将数据以Avro格式存储在HBase中,并从中读取数据以及对读取的数据进行过滤等操作。

数据规划

在客户端执行hbase shell,进入HBase命令行,使用下面的命令创建样例代码中要使用的HBase表:

create 'ExampleAvrotable','rowkey','cf1' (如果表已经存在,则每次执行提交命令前需清空表里的数据:truncate 'ExampleAvrotable'

create 'ExampleAvrotableInsert','rowkey','cf1' (如果表已经存在,则每次执行提交命令前需清空表里的数据:truncate 'ExampleAvrotable'

开发思路

  1. 创建RDD。
  2. 以数据源的方式操作HBase,将上面生成的RDD写入HBase表中。
  3. 读取HBase表中的数据,并且对其进行简单的操作。

运行前置操作

安全模式下Spark Core样例代码需要读取两个文件(user.keytab、krb5.conf)。user.keytab和krb5.conf文件为安全模式下的认证文件,需要在FusionInsight Manager中下载principal用户的认证凭证,样例代码中使用的用户为:super,需要修改为准备好的开发用户。

打包项目

  • 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中编包并运行Spark程序
  • 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。
  • 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上(文件上传的路径需要和生成的jar包路径一致)。

    若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。

提交命令

假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行。

  • yarn-client模式:

    java/scala版本(类名等请与实际代码保持一致,此处仅为示例)

    bin/spark-submit --master yarn --deploy-mode client --jars /opt/female/protobuf-java-2.5.0.jar --conf spark.yarn.user.classpath.first=true --class com.huawei.bigdata.spark.examples.datasources.AvroSource SparkOnHbaseJavaExample.jar

    python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。

    bin/spark-submit --master yarn --deploy-mode client --conf spark.yarn.user.classpath.first=true --jars SparkOnHbaseJavaExample.jar,/opt/female/protobuf-java-2.5.0.jar AvroSource.py

  • yarn-cluster模式:

    java/scala版本(类名等请与实际代码保持一致,此处仅为示例)

    bin/spark-submit --master yarn --deploy-mode cluster --jars /opt/female/protobuf-java-2.5.0.jar --conf spark.yarn.user.classpath.first=true --class com.huawei.bigdata.spark.examples.datasources.AvroSource --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar

    python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。

    bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --conf spark.yarn.user.classpath.first=true --jars SparkOnHbaseJavaExample.jar,/opt/female/protobuf-java-2.5.0.jar AvroSource.py

Java样例代码

下面代码片段仅为演示,具体代码参见SparkOnHbaseJavaExample中的AvroSource文件:

public static void main(JavaSparkContext jsc) throws IOException {
        LoginUtil.loginWithUserKeytab();
        SQLContext sqlContext = new SQLContext(jsc);
        Configuration hbaseconf = new HBaseConfiguration().create();
        JavaHBaseContext hBaseContext = new JavaHBaseContext(jsc, hbaseconf);
        List list = new ArrayList<AvroHBaseRecord>();
        for(int i=0; i<=255 ; ++i){
            list.add(AvroHBaseRecord.apply(i));
        }
        try{
            Map<String, String> map = new HashMap<String, String>();
            map.put(HBaseTableCatalog.tableCatalog(), catalog);
            map.put(HBaseTableCatalog.newTable(), "5");
            sqlContext.createDataFrame(list, AvroHBaseRecord.class).write().options(map).format("org.apache.hadoop.hbase.spark").save();
            Dataset<Row> ds = withCatalog(sqlContext,catalog);
            ds.show();
            ds.printSchema();
            ds.registerTempTable("ExampleAvrotable");
            Dataset<Row> c= sqlContext.sql("select count(1) from ExampleAvrotable");
            c.show();
            Dataset<Row> filtered = ds.select("col0", "col1.favorite_array").where("col0 = 'name1'");
            filtered.show();
            java.util.List<Row> collected = filtered.collectAsList();
            if (collected.get(0).get(1).toString().equals("number1")) {
                throw new UserCustomizedSampleException("value invalid", new Throwable());
            }
            if (collected.get(0).get(1).toString().equals("number2")) {
                throw new UserCustomizedSampleException("value invalid", new Throwable());
            }
            Map avroCatalogInsertMap = new HashMap<String,String>();
            avroCatalogInsertMap.put("avroSchema" , AvroHBaseRecord.schemaString);
            avroCatalogInsertMap.put(HBaseTableCatalog.tableCatalog(), avroCatalogInsert);
            ds.write().options(avroCatalogInsertMap).format("org.apache.hadoop.hbase.spark").save();
            Dataset<Row> newDS = withCatalog(sqlContext,avroCatalogInsert);
            newDS.show();
            newDS.printSchema();
            if (newDS.count() != 256) {
                throw new UserCustomizedSampleException("value invalid", new Throwable());
            }
            ds.filter("col1.name = 'name5' || col1.name <= 'name5'").select("col0","col1.favorite_color", "col1.favorite_number").show();
        } finally{
            jsc.stop();
       }
    }

Scala样例代码

下面代码片段仅为演示,具体代码参见SparkOnHbaseScalaExample中的AvroSource文件:

  def main(args: Array[String]) {
    LoginUtil.loginWithUserKeytab()
    val sparkConf = new SparkConf().setAppName("AvroSourceExample")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val hbaseConf = HBaseConfiguration.create()
    val hbaseContext = new HBaseContext(sc, hbaseConf)
    import sqlContext.implicits._
    def withCatalog(cat: String): DataFrame = {
      sqlContext
        .read
        .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalog))
        .format("org.apache.hadoop.hbase.spark")
        .load()
    }
    val data = (0 to 255).map { i =>
      AvroHBaseRecord(i)
    }
    try {
      sc.parallelize(data).toDF.write.options(
        Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
        .format("org.apache.hadoop.hbase.spark")
        .save()

      val df = withCatalog(catalog)
      df.show()
      df.printSchema()
      df.registerTempTable("ExampleAvrotable")
      val c = sqlContext.sql("select count(1) from ExampleAvrotable")
      c.show()

      val filtered = df.select($"col0", $"col1.favorite_array").where($"col0" === "name001")
      filtered.show()
      val collected = filtered.collect()
      if (collected(0).getSeq[String](1)(0) != "number1") {
        throw new UserCustomizedSampleException("value invalid")
      }
      if (collected(0).getSeq[String](1)(1) != "number2") {
        throw new UserCustomizedSampleException("value invalid")
      }

      df.write.options(
        Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalogInsert,
          HBaseTableCatalog.newTable -> "5"))
        .format("org.apache.hadoop.hbase.spark")
        .save()
      val newDF = withCatalog(avroCatalogInsert)
      newDF.show()
      newDF.printSchema()
      if (newDF.count() != 256) {
        throw new UserCustomizedSampleException("value invalid")
      }
      df.filter($"col1.name" === "name005" || $"col1.name" <= "name005")
        .select("col0", "col1.favorite_color", "col1.favorite_number")
        .show()
    } finally {
      sc.stop()
    }
  }

Python样例代码

下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中的AvroSource文件:

# -*- coding:utf-8 -*-
"""
【说明】
(1)由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现
(2)如果使用yarn-client模式运行,请确认Spark2x客户端Spark2x/spark/conf/spark-defaults.conf中
   spark.yarn.security.credentials.hbase.enabled参数配置为true
"""
from py4j.java_gateway import java_import
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession\
        .builder\
        .appName("AvroSourceExample")\
        .getOrCreate()
# 向sc._jvm中导入要运行的类
java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.datasources.AvroSource')
# 创建类实例并调用方法,传递sc._jsc参数
spark._jvm.AvroSource().execute(spark._jsc)
# 停止SparkSession
spark.stop()

相关文档