更新时间:2024-12-10 GMT+08:00

Scala样例代码

功能介绍

在Spark应用中,通过使用Spark调用Hive接口来操作hive表,然后把Hive表的数据经过分析后写到HBase表。

代码样例

下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkHivetoHbase。

样例代码获取方式请参考获取MRS应用开发样例工程

代码样例:

/**
  * 从hive表读取数据,根据key值去hbase表获取相应记录,把两者数据做操作后,更新到hbase表
  */
object SparkHivetoHbase {
  case class FemaleInfo(name: String, gender: String, stayTime: Int)
  def main(args: Array[String]) {
    if (args.length < 1) {
      printUsage
    }
    // 通过spark接口获取表中的数据
    val sparkConf = new SparkConf().setAppName("SparkHivetoHbase")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import sqlContext.implicits._
    val dataFrame = sqlContext.sql("select name, account from person")
    // 遍历hive表中的每一个partition, 然后更新到hbase表
    // 如果数据条数较少,也可以使用foreach()方法
    dataFrame.rdd.foreachPartition(x => hBaseWriter(x, args(0)))
    sc.stop()
  }
  /**
   * 在executor端更新hbase表记录
   *
   * @param iterator hive表的partition数据
   */
  def hBaseWriter(iterator: Iterator[Row], zkQuorum: String): Unit = {
    // 读取hbase
    val tableName = "table2"
    val columnFamily = "cf"
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "24002")
    conf.set("hbase.zookeeper.quorum", zkQuorum)
    var table: Table = null
    var connection: Connection = null
    try {
      connection = ConnectionFactory.createConnection(conf)
      table = connection.getTable(TableName.valueOf(tableName))
      val iteratorArray = iterator.toArray
      val rowList = new util.ArrayList[Get]()
      for (row <- iteratorArray) {
        val get = new Get(row.getString(0).getBytes)
        rowList.add(get)
      }
      // 获取hbase表记录
      val resultDataBuffer = table.get(rowList)
      // 修改hbase表记录
      val putList = new util.ArrayList[Put]()
      for (i <- 0 until iteratorArray.size) {
        // hbase row
        val resultData = resultDataBuffer(i)
        if (!resultData.isEmpty) {
          // hive表值
          var hiveValue = iteratorArray(i).getInt(1)
          // 根据列簇和列,获取hbase值
          val hbaseValue = Bytes.toString(resultData.getValue(columnFamily.getBytes, "cid".getBytes))
          val put = new Put(iteratorArray(i).getString(0).getBytes)
          // 计算结果
          val resultValue = hiveValue + hbaseValue.toInt
          // 设置结果到put对象
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(resultValue.toString))
          putList.add(put)
        }
      }
      if (putList.size() > 0) {
        table.put(putList)
      }
    } catch {
      case e: IOException =>
        e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close()
        } catch {
          case e: IOException =>
            e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          //关闭Hbase连接.
          connection.close()
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
    }
  }
  
  private def printUsage {
    System.out.println("Usage: {zkQuorum}")
    System.exit(1)
  }
}