Scala Example Code
Function
In a Spark application, use the Spark Hive API to operate a Hive table, analyze the data, and then write the analyzed data to an HBase table.
Example Code
For details about the code, see com.huawei.bigdata.spark.examples.SparkHivetoHbase.
Example code
/**
 * Read data from a Hive table, combine each row with the matching value
 * already stored in an HBase table, and write the summed result back to
 * HBase.
 */
object SparkHivetoHbase {
  case class FemaleInfo(name: String, gender: String, stayTime: Int)

  def main(args: Array[String]) {
    // FIX: the original snippet used Java declaration syntax
    // (`String x = "...";`), which does not compile in Scala. Use vals.
    val userPrincipal = "sparkuser"
    val userKeytabPath = "/opt/FIclient/user.keytab"
    val krb5ConfPath = "/opt/FIclient/KrbClient/kerberos/var/krb5kdc/krb5.conf"
    val hadoopConf = new Configuration()
    LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf)

    // Obtain the data in the table through the Spark interface.
    val spark = SparkSession
      .builder()
      .appName("SparkHiveHbase")
      // FIX: corrected typo "spaek-warehouse" -> "spark-warehouse".
      .config("spark.sql.warehouse.dir", "spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._
    val dataFrame = spark.sql("select name, account from person")

    // Traverse every partition of the Hive result and update the HBase
    // table. For small data sets, rdd.foreach() would also work.
    dataFrame.rdd.foreachPartition(x => hBaseWriter(x))
    spark.stop()
  }

  /**
   * Write one partition of Hive rows to the HBase table (runs on executors).
   *
   * For each row (name, account), looks up the existing "cf:cid" value in
   * HBase keyed by name, adds the Hive account value to it, and puts the
   * sum back. Rows that have no existing HBase entry are skipped.
   *
   * @param iterator partition data from the Hive table; each Row is
   *                 assumed to be (name: String, account: Int)
   */
  def hBaseWriter(iterator: Iterator[Row]): Unit = {
    val tableName = "table2"
    val columnFamily = "cf"
    val conf = HBaseConfiguration.create()
    var table: Table = null
    var connection: Connection = null
    try {
      connection = ConnectionFactory.createConnection(conf)
      table = connection.getTable(TableName.valueOf(tableName))

      // Materialize the partition so it can be indexed by position below.
      val iteratorArray = iterator.toArray

      // Step 1: batch-read the current HBase rows keyed by name.
      val rowList = new util.ArrayList[Get]()
      for (row <- iteratorArray) {
        rowList.add(new Get(row.getString(0).getBytes))
      }
      val resultDataBuffer = table.get(rowList)

      // Step 2: build the puts, summing the Hive and HBase values.
      val putList = new util.ArrayList[Put]()
      for (i <- 0 until iteratorArray.size) {
        val resultData = resultDataBuffer(i)
        if (!resultData.isEmpty) {
          // FIX: `var` changed to `val` — the value is never reassigned.
          val hiveValue = iteratorArray(i).getInt(1)
          // Existing value stored under column family "cf", qualifier "cid".
          val hbaseValue = Bytes.toString(resultData.getValue(columnFamily.getBytes, "cid".getBytes))
          val put = new Put(iteratorArray(i).getString(0).getBytes)
          val resultValue = hiveValue + hbaseValue.toInt
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(resultValue.toString))
          putList.add(put)
        }
      }
      if (putList.size() > 0) {
        table.put(putList)
      }
    } catch {
      case e: IOException =>
        e.printStackTrace()
    } finally {
      // Close the table and the connection even if the read/write failed.
      if (table != null) {
        try {
          table.close()
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
      if (connection != null) {
        try {
          connection.close()
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
    }
  }
}
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation.