文档首页/
MapReduce服务 MRS/
开发指南(普通版_3.x)/
Spark2x开发指南(普通模式)/
开发Spark应用/
Spark从Hive读取数据再写入HBase样例程序/
Spark从Hive读取数据再写入HBase样例程序(Scala)
更新时间:2024-08-05 GMT+08:00
Spark从Hive读取数据再写入HBase样例程序(Scala)
功能介绍
在Spark应用中,通过使用Spark调用Hive接口来操作hive表,然后把Hive表的数据经过分析后写到HBase表。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkHivetoHbase
/** * 从hive表读取数据,根据key值去hbase表获取相应记录,把两者数据做操作后,更新到hbase表 */ object SparkHivetoHbase { case class FemaleInfo(name: String, gender: String, stayTime: Int) def main(args: Array[String]) { // 通过spark接口获取表中的数据 val spark = SparkSession .builder() .appName("SparkHiveHbase") .config("spark.sql.warehouse.dir", "spaek-warehouse") .enableHiveSupport() .getOrCreate() import spark.implicits._ val dataFrame = spark.sql("select name, account from person") // 遍历hive表中的每一个partition, 然后更新到hbase表 // 如果数据条数较少,也可以使用foreach()方法 dataFrame.rdd.foreachPartition(x => hBaseWriter(x)) spark.stop() } /** * 在executor端更新hbase表记录 * * @param iterator hive表的partition数据 */ def hBaseWriter(iterator: Iterator[Row]): Unit = { // 读取hbase val tableName = "table2" val columnFamily = "cf" val conf = HBaseConfiguration.create() var table: Table = null var connection: Connection = null try { connection = ConnectionFactory.createConnection(conf) table = connection.getTable(TableName.valueOf(tableName)) val iteratorArray = iterator.toArray val rowList = new util.ArrayList[Get]() for (row <- iteratorArray) { val get = new Get(row.getString(0).getBytes) rowList.add(get) } // 获取hbase表记录 val resultDataBuffer = table.get(rowList) // 修改hbase表记录 val putList = new util.ArrayList[Put]() for (i <- 0 until iteratorArray.size) { // hbase row val resultData = resultDataBuffer(i) if (!resultData.isEmpty) { // hive表值 var hiveValue = iteratorArray(i).getInt(1) // 根据列簇和列,获取hbase值 val hbaseValue = Bytes.toString(resultData.getValue(columnFamily.getBytes, "cid".getBytes)) val put = new Put(iteratorArray(i).getString(0).getBytes) // 计算结果 val resultValue = hiveValue + hbaseValue.toInt // 设置结果到put对象 put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(resultValue.toString)) putList.add(put) } } if (putList.size() > 0) { table.put(putList) } } catch { case e: IOException => e.printStackTrace(); } finally { if (table != null) { try { table.close() } catch { case e: IOException => e.printStackTrace(); } } if (connection != null) { try { //关闭Hbase连接. connection.close() } catch { case e: IOException => e.printStackTrace() } } } } }