Updated on: 2024-08-03 GMT+08:00
Sample Program for Using Spark to Read Data from Hive and Write It to HBase (Scala)
Function
In a Spark application, Spark is used to call Hive APIs to operate on a Hive table, and the analyzed data of the Hive table is then written to an HBase table.
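The sample reads the Hive table person (columns name and account) and updates the HBase table table2 (column family cf, column cid). A minimal preparation sketch for the two tables is shown below; it assumes the HBase 2.x client API, the object name PrepareTables is illustrative, and in security mode the Kerberos login shown in the sample code must run first.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}
import org.apache.spark.sql.SparkSession

object PrepareTables {
  def main(args: Array[String]): Unit = {
    // Hive source table read by the sample: name (used as the HBase row key) and account.
    val spark = SparkSession.builder()
      .appName("PrepareTables")
      .enableHiveSupport()
      .getOrCreate()
    spark.sql("CREATE TABLE IF NOT EXISTS person (name STRING, account INT)")
    spark.stop()

    // HBase target table updated by the sample: row key = name, cell cf:cid holds a numeric string.
    val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    try {
      val admin = connection.getAdmin
      val tableName = TableName.valueOf("table2")
      if (!admin.tableExists(tableName)) {
        admin.createTable(
          TableDescriptorBuilder.newBuilder(tableName)
            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
            .build())
      }
      admin.close()
    } finally {
      connection.close()
    }
  }
}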
Sample Code
The following code snippet is for demonstration only. For the complete code, see com.huawei.bigdata.spark.examples.SparkHivetoHbase.
import java.io.IOException
import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Read data from a Hive table, look up the matching record in an HBase table by key,
 * combine the two values, and write the result back to the HBase table.
 */
object SparkHivetoHbase {

  case class FemaleInfo(name: String, gender: String, stayTime: Int)

  def main(args: Array[String]): Unit = {
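    // In security mode, perform the Kerberos login before any Hive or HBase access.
    // LoginUtil ships with the MRS sample project; import it from that project's package.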
    val userPrincipal = "sparkuser"
    val userKeytabPath = "/opt/FIclient/user.keytab"
    val krb5ConfPath = "/opt/FIclient/KrbClient/kerberos/var/krb5kdc/krb5.conf"
    val hadoopConf: Configuration = new Configuration()
    LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf)
    // Read the Hive table data through the Spark SQL interface.
    val spark = SparkSession
      .builder()
      .appName("SparkHiveHbase")
      .config("spark.sql.warehouse.dir", "spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._
    val dataFrame = spark.sql("select name, account from person")

    // Process each partition of the Hive table and update the HBase table.
    // For small data sets, foreach() can be used instead.
    dataFrame.rdd.foreachPartition(x => hBaseWriter(x))

    spark.stop()
  }
  /**
   * Update HBase table records on the executor side.
   *
   * @param iterator data of one partition of the Hive table
   */
  def hBaseWriter(iterator: Iterator[Row]): Unit = {
    // HBase table to read from and update.
    val tableName = "table2"
    val columnFamily = "cf"
    val conf = HBaseConfiguration.create()
    var table: Table = null
    var connection: Connection = null
    try {
      connection = ConnectionFactory.createConnection(conf)
      table = connection.getTable(TableName.valueOf(tableName))
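      // Materialize the partition and batch all row keys into a single multi-get,
      // so each partition costs one batched read instead of one request per row
      // (assumes a partition fits in executor memory).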
      val iteratorArray = iterator.toArray
      val rowList = new util.ArrayList[Get]()
      for (row <- iteratorArray) {
        val get = new Get(row.getString(0).getBytes)
        rowList.add(get)
      }
      // Fetch the HBase table records.
      val resultDataBuffer = table.get(rowList)
      // Modify the HBase table records.
      val putList = new util.ArrayList[Put]()
      for (i <- 0 until iteratorArray.size) {
        // HBase row
        val resultData = resultDataBuffer(i)
        if (!resultData.isEmpty) {
          // Value from the Hive table.
          val hiveValue = iteratorArray(i).getInt(1)
          // Value from the HBase table, located by column family and column.
          val hbaseValue = Bytes.toString(resultData.getValue(columnFamily.getBytes, "cid".getBytes))
          val put = new Put(iteratorArray(i).getString(0).getBytes)
          // Compute the result.
          val resultValue = hiveValue + hbaseValue.toInt
          // Set the result into the Put object.
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(resultValue.toString))
          putList.add(put)
        }
      }
      if (putList.size() > 0) {
        table.put(putList)
      }
    } catch {
      case e: IOException =>
        e.printStackTrace()
    } finally {
      if (table != null) {
        try {
          table.close()
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close()
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
    }
  }
}
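After the job completes, the update can be checked by scanning the target table; for example, a Hive row (name, account) = ("shirley", 50) combined with an existing HBase cell cf:cid = "100" would yield "150". Below is a minimal verification sketch; the object name VerifyHbaseResult and the example values are illustrative, table2 and cf:cid come from the sample above, and in security mode the same LoginUtil login must run first.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._

object VerifyHbaseResult {
  def main(args: Array[String]): Unit = {
    val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val table = connection.getTable(TableName.valueOf("table2"))
    try {
      // Scan the whole table and print each row's cf:cid cell.
      val scanner = table.getScanner(new Scan())
      try {
        for (result <- scanner.asScala) {
          val name = Bytes.toString(result.getRow)
          val cid = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("cid")))
          println(s"$name -> cf:cid = $cid")
        }
      } finally {
        scanner.close()
      }
    } finally {
      table.close()
      connection.close()
    }
  }
}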