Updated on 2024-10-23 GMT+08:00

Implementing Data Transition Between Hive and HBase (Scala)

Function

In a Spark application, you can use Spark to call a Hive API to operate a Hive table, and write the data analysis result of the Hive table to an HBase table.

Sample Code

The following code snippets are used as an example. For complete codes, see com.huawei.bigdata.spark.examples.SparkHivetoHbase.

/**
  * Read data from the Hive table, and obtain the corresponding record from the HBase table based on the key value. Sum the obtained two data records and update the sum result to the HBase table.
  */
object SparkHivetoHbase {
  case class FemaleInfo(name: String, gender: String, stayTime: Int)
  def main(args: Array[String]) {
    // Use the Spark API to obtain table data.

     val spark = SparkSession
      .builder()
      .appName("SparkHiveHbase")
      .config("spark.sql.warehouse.dir", "spaek-warehouse")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._
    val dataFrame = spark.sql("select name, account from person")
    // Traverse every partition in the Hive table and update data to the HBase table.
    // If the number of data records is small, you can use the foreach() method.
    dataFrame.rdd.foreachPartition(x => hBaseWriter(x))
    spark.stop()
  }
  /**
   * Update records in the HBase table on the executor.
   *
   * @param iterator Partition data in the Hive table.
   */
  def hBaseWriter(iterator: Iterator[Row]): Unit = {
    // Read the HBase table.
    val tableName = "table2"
    val columnFamily = "cf"
    val conf = HBaseConfiguration.create()
    var table: Table = null
    var connection: Connection = null
    try {
      connection = ConnectionFactory.createConnection(conf)
      table = connection.getTable(TableName.valueOf(tableName))
      val iteratorArray = iterator.toArray
      val rowList = new util.ArrayList[Get]()
      for (row <- iteratorArray) {
        val get = new Get(row.getString(0).getBytes)
        rowList.add(get)
      }
      // Obtain the records in the HBase table.
      val resultDataBuffer = table.get(rowList)
      // Modify records in the HBase table.
      val putList = new util.ArrayList[Put]()
      for (i <- 0 until iteratorArray.size) {
        // hbase row
        val resultData = resultDataBuffer(i)
        if (!resultData.isEmpty) {
        // Hive table value.
          var hiveValue = iteratorArray(i).getInt(1)
          // Obtain the HBase table value based on the column family and column.
          val hbaseValue = Bytes.toString(resultData.getValue(columnFamily.getBytes, "cid".getBytes))
          val put = new Put(iteratorArray(i).getString(0).getBytes)
          // Calculate the result.
          val resultValue = hiveValue + hbaseValue.toInt
          // Set the result to the Put object.
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(resultValue.toString))
          putList.add(put)
        }
      }
      if (putList.size() > 0) {
        table.put(putList)
      }
    } catch {
      case e: IOException =>
        e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close()
        } catch {
          case e: IOException =>
            e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close()
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
    }
  }

}