Scala Sample Code
Function Description
In a Spark application, you can use Spark to call the Hive API to operate on a Hive table, and then write the analysis results of the Hive table data to an HBase table.
Sample Code
The following code snippet is used as an example. For the complete code, see com.huawei.bigdata.spark.examples.SparkHivetoHbase.
/**
 * Reads rows from the Hive table `person`, fetches the matching record from the HBase table
 * `table2` using the Hive `name` column as the row key, adds the Hive `account` value to the
 * HBase `cf:cid` value, and writes the sum back to the same HBase cell.
 */
object SparkHivetoHbase {

  // Not referenced anywhere in this snippet; kept so that code outside this snippet that
  // may refer to it keeps compiling.
  case class FemaleInfo(name: String, gender: String, stayTime: Int)

  /**
   * Application entry point.
   *
   * @param args `args(0)` is the ZooKeeper quorum used to reach HBase. When it is missing,
   *             the usage text is printed and the JVM exits with status 1.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      printUsage()
    }

    // Use the Spark API to obtain table data.
    val sparkConf = new SparkConf().setAppName("SparkHivetoHbase")
    val sc = new SparkContext(sparkConf)
    try {
      val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
      val dataFrame = sqlContext.sql("select name, account from person")

      // Traverse every partition in the Hive table and update data to the HBase table.
      // If the number of data records is small, you can use the foreach() method.
      dataFrame.rdd.foreachPartition(x => hBaseWriter(x, args(0)))
    } finally {
      // Stop the context even when the job fails, so the application releases its resources.
      sc.stop()
    }
  }

  /**
   * Updates records in the HBase table on the executor.
   *
   * For every Hive row whose key exists in HBase and has a `cf:cid` cell, the cell is
   * overwritten with `hiveValue + hbaseValue` (stored as a decimal string). Rows that are
   * absent from HBase, or that have no `cf:cid` cell, are skipped. An `IOException` raised
   * by HBase is printed and swallowed; a non-numeric `cf:cid` value still raises
   * `NumberFormatException`.
   *
   * @param iterator Partition data in the Hive table: column 0 is the row key (String),
   *                 column 1 is the value to add (Int).
   * @param zkQuorum ZooKeeper quorum of the HBase cluster.
   */
  def hBaseWriter(iterator: Iterator[Row], zkQuorum: String): Unit = {
    val tableName = "table2"
    // Encode family and qualifier once, always as UTF-8, so that reads and writes agree
    // regardless of the platform default charset.
    val columnFamily = Bytes.toBytes("cf")
    val qualifier = Bytes.toBytes("cid")

    // Each row is needed twice (to build the Get and later the Put), so materialize it.
    val rows = iterator.toArray

    // Do not open an HBase connection for an empty partition.
    if (rows.nonEmpty) {
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.property.clientPort", "24002")
      conf.set("hbase.zookeeper.quorum", zkQuorum)

      // Declared outside the try block so that the finally block can close whatever was opened.
      var connection: Connection = null
      var table: Table = null
      try {
        connection = ConnectionFactory.createConnection(conf)
        table = connection.getTable(TableName.valueOf(tableName))

        // Obtain the records in the HBase table with a single batched Get.
        val gets = new util.ArrayList[Get](rows.length)
        rows.foreach(row => gets.add(new Get(Bytes.toBytes(row.getString(0)))))
        val results = table.get(gets)

        // Modify records in the HBase table. Results come back in the same order as the Gets.
        val puts = new util.ArrayList[Put]()
        for ((row, result) <- rows.zip(results) if !result.isEmpty) {
          // getValue returns null when the row has no cf:cid cell; skip such rows instead of
          // failing the whole task with a NullPointerException.
          Option(result.getValue(columnFamily, qualifier)).foreach { cell =>
            val resultValue = row.getInt(1) + Bytes.toString(cell).toInt
            val put = new Put(Bytes.toBytes(row.getString(0)))
            put.addColumn(columnFamily, qualifier, Bytes.toBytes(resultValue.toString))
            puts.add(put)
          }
        }

        if (!puts.isEmpty) {
          table.put(puts)
        }
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        closeQuietly(table)
        // Close the HBase connection.
        closeQuietly(connection)
      }
    }
  }

  /** Closes `resource` if it is non-null, printing (not propagating) any `IOException`. */
  private def closeQuietly(resource: java.io.Closeable): Unit = {
    if (resource != null) {
      try {
        resource.close()
      } catch {
        case e: IOException => e.printStackTrace()
      }
    }
  }

  /** Prints the command-line usage and terminates the JVM with exit status 1. */
  private def printUsage(): Unit = {
    System.out.println("Usage: {zkQuorum}")
    System.exit(1)
  }
}
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation.