更新时间:2022-07-19 GMT+08:00
Scala样例代码
功能介绍
在Spark应用中,通过使用Spark调用Hive接口来操作Hive表,然后把Hive表的数据经过分析后写到HBase表。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkHivetoHbase
/**
 * Reads rows from a Hive table, looks up the matching record in an HBase
 * table by row key, adds the Hive value to the HBase value, and writes the
 * result back to HBase.
 */
object SparkHivetoHbase {

  case class FemaleInfo(name: String, gender: String, stayTime: Int)

  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      printUsage()
    }
    // Read the source data from Hive through the Spark SQL interface.
    val sparkConf = new SparkConf().setAppName("SparkHivetoHbase")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import sqlContext.implicits._
    val dataFrame = sqlContext.sql("select name, account from person")

    // Process one Hive partition per HBase connection to amortize setup cost.
    // For very small data sets a plain foreach() would also work.
    dataFrame.rdd.foreachPartition(x => hBaseWriter(x, args(0)))

    sc.stop()
  }

  /**
   * Runs on the executor: merges one partition of Hive rows into HBase.
   *
   * For each Hive row whose key already exists in HBase, the Hive value
   * (column 1) is added to the existing HBase cell cf:cid and the sum is
   * written back. Rows with no existing HBase record are skipped.
   *
   * @param iterator rows of one Hive partition; column 0 is the row key
   *                 (String), column 1 the integer value to add
   * @param zkQuorum ZooKeeper quorum address used to locate the HBase cluster
   */
  def hBaseWriter(iterator: Iterator[Row], zkQuorum: String): Unit = {
    val tableName = "table2"
    val columnFamily = "cf"
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "24002")
    conf.set("hbase.zookeeper.quorum", zkQuorum)

    var table: Table = null
    var connection: Connection = null
    try {
      connection = ConnectionFactory.createConnection(conf)
      table = connection.getTable(TableName.valueOf(tableName))

      // Materialize the partition so rows can be paired by index with the
      // batched HBase results below.
      val rows = iterator.toArray
      val gets = new util.ArrayList[Get]()
      for (row <- rows) {
        gets.add(new Get(row.getString(0).getBytes))
      }
      // Fetch all existing HBase records for this partition in one batch.
      val results = table.get(gets)

      // Build the updated records.
      val puts = new util.ArrayList[Put]()
      for (i <- 0 until rows.size) {
        val result = results(i)
        // Only rows that already exist in HBase are updated.
        if (!result.isEmpty) {
          val hiveValue = rows(i).getInt(1)
          // NOTE(review): assumes cf:cid always holds a parseable integer.
          // A malformed cell makes toInt throw NumberFormatException, which
          // the IOException handler below does NOT catch — confirm the data
          // contract before hardening.
          val hbaseValue = Bytes.toString(result.getValue(columnFamily.getBytes, "cid".getBytes))
          val newValue = hiveValue + hbaseValue.toInt
          val put = new Put(rows(i).getString(0).getBytes)
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(newValue.toString))
          puts.add(put)
        }
      }
      if (puts.size() > 0) {
        table.put(puts)
      }
    } catch {
      case e: IOException => e.printStackTrace()
    } finally {
      // Release HBase resources even if the write failed; close failures are
      // logged but deliberately not rethrown so one does not mask the other.
      if (table != null) {
        try {
          table.close()
        } catch {
          case e: IOException => e.printStackTrace()
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close()
        } catch {
          case e: IOException => e.printStackTrace()
        }
      }
    }
  }

  // Prints usage and terminates the JVM. Side-effecting, hence the
  // explicit parentheses and Unit return type.
  private def printUsage(): Unit = {
    System.out.println("Usage: {zkQuorum}")
    System.exit(1)
  }
}
父主题: 从Hive读取数据再写入HBase