Scala Example Code
Function
In a Spark application, use Spark to call Hive APIs to operate a Hive table, analyze the data, and then write the analyzed data to an HBase table.
Example Code
For details about the code, see com.huawei.bigdata.spark.examples.SparkHivetoHbase.
Example code
/**
 * Read data from a Hive table, combine each row with the matching row already
 * stored in HBase, and write the summed result back to the HBase table.
 */
object SparkHivetoHbase {

  case class FemaleInfo(name: String, gender: String, stayTime: Int)

  def main(args: Array[String]): Unit = {
    // Kerberos login parameters.
    // NOTE: the original listing used Java-style declarations
    // ("String userPrincipal = ...;"), which do not compile in Scala.
    val userPrincipal = "sparkuser"
    val userKeytabPath = "/opt/FIclient/user.keytab"
    val krb5ConfPath = "/opt/FIclient/KrbClient/kerberos/var/krb5kdc/krb5.conf"
    val hadoopConf = new Configuration()
    LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf)

    // Obtain the data in the table through the Spark interface.
    val spark = SparkSession
      .builder()
      .appName("SparkHiveHbase")
      // Fixed typo: original had "spaek-warehouse"; the conventional
      // Spark SQL warehouse directory is "spark-warehouse".
      .config("spark.sql.warehouse.dir", "spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._
    val dataFrame = spark.sql("select name, account from person")

    // Traverse every partition of the Hive table and update the HBase table.
    // If the data volume is small, rdd.foreach() can be used instead.
    dataFrame.rdd.foreachPartition(x => hBaseWriter(x))

    spark.stop()
  }

  /**
   * Write one partition of Hive rows to the HBase table (runs on an executor).
   *
   * For each incoming row (name, account), the existing HBase cell
   * cf:cid keyed by `name` is read, the Hive `account` value is added to it,
   * and the sum is written back. Rows with no existing HBase entry are skipped.
   *
   * @param iterator partition data from the Hive table; each Row is expected
   *                 to be (name: String, account: Int) — matches the SQL above
   */
  def hBaseWriter(iterator: Iterator[Row]): Unit = {
    val tableName = "table2"
    val columnFamily = "cf"
    val conf = HBaseConfiguration.create()
    var table: Table = null
    var connection: Connection = null
    try {
      connection = ConnectionFactory.createConnection(conf)
      table = connection.getTable(TableName.valueOf(tableName))
      // Materialize the partition so it can be indexed in step with the
      // batched Get results below.
      val rows = iterator.toArray

      // Batch-read the existing HBase rows keyed by the Hive "name" column.
      val gets = new util.ArrayList[Get]()
      for (row <- rows) {
        gets.add(new Get(row.getString(0).getBytes))
      }
      val results = table.get(gets)

      // Build the puts: new cid = hive account value + current hbase cid value.
      val puts = new util.ArrayList[Put]()
      for (i <- rows.indices) {
        val resultData = results(i)
        if (!resultData.isEmpty) {
          val hiveValue = rows(i).getInt(1)
          val hbaseValue =
            Bytes.toString(resultData.getValue(columnFamily.getBytes, "cid".getBytes))
          val resultValue = hiveValue + hbaseValue.toInt
          val put = new Put(rows(i).getString(0).getBytes)
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"),
            Bytes.toBytes(resultValue.toString))
          puts.add(put)
        }
      }
      if (puts.size() > 0) {
        table.put(puts)
      }
    } catch {
      case e: IOException => e.printStackTrace()
    } finally {
      // Close resources in reverse order of acquisition, even on failure.
      if (table != null) {
        try {
          table.close()
        } catch {
          case e: IOException => e.printStackTrace()
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close()
        } catch {
          case e: IOException => e.printStackTrace()
        }
      }
    }
  }
}
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation.