文档首页/
MapReduce服务 MRS/
开发指南(普通版_3.x)/
Spark2x开发指南(安全模式)/
开发Spark应用/
Spark从HBase读取数据再写入HBase样例程序/
Spark从HBase读取数据再写入HBase样例程序(Scala)
更新时间:2024-08-05 GMT+08:00
Spark从HBase读取数据再写入HBase样例程序(Scala)
功能介绍
用户可以使用Spark调用HBase接口读取HBase table1表的数据,根据rowkey到table2表中获取对应记录,将两表中cf:cid列的值相加后,把结果更新到HBase table2表中。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkHbasetoHbase。
/**
 * Reads the rows of HBase table `table1`, fetches the row with the same key from `table2`,
 * adds the two `cf:cid` values together and writes the sum back to `table2`.
 */
object SparkHbasetoHbase {

  // Not referenced by the code shown here; kept so that any external user still compiles.
  case class FemaleInfo(name: String, gender: String, stayTime: Int)

  /**
   * Driver entry point: scans column family `cf` of `table1` into an RDD and hands every
   * partition to [[hBaseWriter]].
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkHbasetoHbase")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator")
    val sc = new SparkContext(conf)
    try {
      // Build the HBase configuration; hbase-site.xml must be on the classpath at this point.
      val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)

      // Describe what to read: column family "cf" of table "table1".
      val scan = new Scan()
      scan.addFamily(Bytes.toBytes("cf")) // column family
      // TableInputFormat takes the scan as a Base64-encoded protobuf string.
      val proto = ProtobufUtil.toScan(scan)
      val scanToString = Base64.encodeBytes(proto.toByteArray)
      hbConf.set(TableInputFormat.INPUT_TABLE, "table1") // table name
      hbConf.set(TableInputFormat.SCAN, scanToString)

      // Load the table data through the Spark API.
      val rdd = sc.newAPIHadoopRDD(
        hbConf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result])

      // Process table1 partition by partition and update table2.
      // For small data sets rdd.foreach() could be used instead.
      rdd.foreachPartition(x => hBaseWriter(x))
    } finally {
      // Stop the context even when the job above throws.
      sc.stop()
    }
  }

  /**
   * Updates `table2` on the executor side with the data of one `table1` partition.
   *
   * For every table1 row whose key also exists in table2, the `cf:cid` values of both rows are
   * parsed as integers, added, and the sum is written to `cf:cid` of the table2 row. Rows for
   * which either value is missing or not an integer are skipped and reported on stderr.
   * An `IOException` raised while talking to HBase is printed and not rethrown.
   *
   * @param iterator the rows of one table1 partition
   */
  def hBaseWriter(iterator: Iterator[(ImmutableBytesWritable, Result)]): Unit = {
    val tableName = "table2"
    // Encode the names once with Bytes.toBytes so reads and writes use the same byte encoding.
    val columnFamily = Bytes.toBytes("cf")
    val qualifier = Bytes.toBytes("cid")

    // An empty partition needs no HBase connection at all.
    if (iterator.hasNext) {
      val conf = HBaseConfiguration.create()
      try {
        val connection = ConnectionFactory.createConnection(conf)
        try {
          val table = connection.getTable(TableName.valueOf(tableName))
          try {
            // Only the Result half of each tuple is used below.
            val sourceRows = iterator.map(_._2).toArray

            // Fetch the table2 rows that share a row key with the table1 rows.
            val gets = new util.ArrayList[Get](sourceRows.length)
            sourceRows.foreach(row => gets.add(new Get(row.getRow)))
            val targetRows = table.get(gets)

            // Build the updates for table2.
            val puts = new util.ArrayList[Put]()
            for ((source, target) <- sourceRows.zip(targetRows) if !target.isEmpty) {
              val sum = for {
                fromTable1 <- readInt(source, columnFamily, qualifier)
                fromTable2 <- readInt(target, columnFamily, qualifier)
              } yield fromTable1 + fromTable2

              sum match {
                case Some(total) =>
                  val put = new Put(source.getRow)
                  put.addColumn(columnFamily, qualifier, Bytes.toBytes(total.toString))
                  puts.add(put)
                case None =>
                  // Previously this case threw NumberFormatException / NullPointerException
                  // and aborted the whole partition before anything was written.
                  System.err.println(
                    s"Skipping row ${Bytes.toString(source.getRow)}: " +
                      "cf:cid is missing or not an integer in table1 or table2")
              }
            }

            if (!puts.isEmpty) {
              table.put(puts)
            }
          } finally {
            closeQuietly(table)
          }
        } finally {
          // Close the HBase connection.
          closeQuietly(connection)
        }
      } catch {
        // NOTE(review): a failed read/write is only printed, so the Spark task still succeeds.
        case e: IOException => e.printStackTrace()
      }
    }
  }

  /**
   * Reads one column of a row and parses it as an integer.
   *
   * Uses `Result.getValue`; the scan built in `main` does not request multiple versions, so
   * at most one cell per column is expected — TODO confirm if the scan is ever changed.
   *
   * @return the parsed value, or `None` when the column is absent or its text is not an integer
   */
  private def readInt(row: Result, family: Array[Byte], qualifier: Array[Byte]): Option[Int] =
    Option(row.getValue(family, qualifier))
      .map(bytes => Bytes.toString(bytes))
      .flatMap(text => scala.util.Try(text.toInt).toOption)

  /** Closes a resource, printing (not propagating) an `IOException` raised by `close()`. */
  private def closeQuietly(resource: java.io.Closeable): Unit =
    try {
      resource.close()
    } catch {
      case e: IOException => e.printStackTrace()
    }
}

/**
 * Kryo serialization helper: registers the HBase classes listed below with Kryo.
 */
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable])
    kryo.register(classOf[org.apache.hadoop.hbase.client.Result])
    kryo.register(classOf[Array[(Any, Any)]])
    kryo.register(classOf[Array[org.apache.hadoop.hbase.Cell]])
    kryo.register(classOf[org.apache.hadoop.hbase.NoTagsKeyValue])
    kryo.register(classOf[org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats])
  }
}