Sample Program for Reading Data from HBase and Writing Data Back to HBase with Spark (Scala)
Function Description
You can use Spark to call HBase APIs to operate on HBase table table1, analyze the data in table1, and then write the analysis results to HBase table table2.
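The sample assumes that table1 and table2 already exist, that both use column family cf, that the value to be accumulated is stored as a string-encoded integer in column cf:cid, and that the row keys match between the two tables. The following is a minimal preparation sketch based on these assumptions (HBase 1.x client API); the object name PrepareSampleTables, the row key row1, and the inserted values are illustrative only and are not part of the sample code.
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
object PrepareSampleTables {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create() // hbase-site.xml must be on the classpath
    val connection = ConnectionFactory.createConnection(conf)
    try {
      val admin = connection.getAdmin
      // Create table1 and table2 with the column family "cf" used by the sample.
      for (name <- Seq("table1", "table2")) {
        val tableName = TableName.valueOf(name)
        if (!admin.tableExists(tableName)) {
          val descriptor = new HTableDescriptor(tableName)
          descriptor.addFamily(new HColumnDescriptor("cf"))
          admin.createTable(descriptor)
        }
      }
      admin.close()
      // Insert one illustrative row into each table; the sample job adds table1's cf:cid to table2's cf:cid.
      val table1 = connection.getTable(TableName.valueOf("table1"))
      val table2 = connection.getTable(TableName.valueOf("table2"))
      val put1 = new Put(Bytes.toBytes("row1"))
      put1.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"), Bytes.toBytes("10"))
      table1.put(put1)
      val put2 = new Put(Bytes.toBytes("row1"))
      put2.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"), Bytes.toBytes("5"))
      table2.put(put2)
      table1.close()
      table2.close()
    } finally {
      connection.close()
    }
  }
}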
Code Sample
The following code snippet is for demonstration only. For the complete code, see com.huawei.bigdata.spark.examples.SparkHbasetoHbase.
// Imports required by this snippet (HBase 1.x client API and Spark core).
import java.io.IOException
import java.util

import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put, Result, Scan, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

/**
* Read data from table1, look up the corresponding record in table2 by row key,
* add the two values together, and update the result in table2.
*/
object SparkHbasetoHbase {
case class FemaleInfo(name: String, gender: String, stayTime: Int)
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SparkHbasetoHbase")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator")
val sc = new SparkContext(conf)
// Create the configuration for connecting to HBase. Ensure that hbase-site.xml is on the classpath.
val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)
// Declare the scan information of the source table.
val scan = new Scan()
scan.addFamily(Bytes.toBytes("cf")) // column family
val proto = ProtobufUtil.toScan(scan)
val scanToString = Base64.encodeBytes(proto.toByteArray)
hbConf.set(TableInputFormat.INPUT_TABLE, "table1") // table name
hbConf.set(TableInputFormat.SCAN, scanToString)
// Obtain the data in the table through the Spark API.
val rdd = sc.newAPIHadoopRDD(hbConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
// Traverse every partition of HBase table1 and update the corresponding records in HBase table2.
// If the amount of data is small, rdd.foreach() can be used instead.
rdd.foreachPartition(x => hBaseWriter(x))
sc.stop()
}
/**
* Update the records of table2 on the executor side.
*
* @param iterator partition data of table1
*/
def hBaseWriter(iterator: Iterator[(ImmutableBytesWritable, Result)]): Unit = {
// Prepare the information for reading from and writing to HBase table2.
val tableName = "table2"
val columnFamily = "cf"
val qualifier = "cid"
val conf = HBaseConfiguration.create()
var table: Table = null
var connection: Connection = null
try {
connection = ConnectionFactory.createConnection(conf)
table = connection.getTable(TableName.valueOf(tableName))
val iteratorArray = iterator.toArray
val rowList = new util.ArrayList[Get]()
for (row <- iteratorArray) {
val get = new Get(row._2.getRow)
rowList.add(get)
}
// Obtain the records of table2.
val resultDataBuffer = table.get(rowList)
// Modify the records of table2.
val putList = new util.ArrayList[Put]()
for (i <- 0 until iteratorArray.size) {
val resultData = resultDataBuffer(i) // row from table2
if (!resultData.isEmpty) {
// Query the value from table1.
var hbase1Value = ""
val it = iteratorArray(i)._2.listCells().iterator()
while (it.hasNext) {
val c = it.next()
// Check whether the column family and qualifier are the same.
if (columnFamily.equals(Bytes.toString(CellUtil.cloneFamily(c)))
&& qualifier.equals(Bytes.toString(CellUtil.cloneQualifier(c)))) {
hbase1Value = Bytes.toString(CellUtil.cloneValue(c))
}
}
val hbase2Value = Bytes.toString(resultData.getValue(columnFamily.getBytes, qualifier.getBytes))
val put = new Put(iteratorArray(i)._2.getRow)
// Calculate the result.
val resultValue = hbase1Value.toInt + hbase2Value.toInt
// Set the result into the Put object.
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(resultValue.toString))
putList.add(put)
}
}
if (putList.size() > 0) {
table.put(putList)
}
} catch {
case e: IOException =>
e.printStackTrace()
} finally {
if (table != null) {
try {
table.close()
} catch {
case e: IOException =>
e.printStackTrace()
}
}
if (connection != null) {
try {
// Close the HBase connection.
connection.close()
} catch {
case e: IOException =>
e.printStackTrace()
}
}
}
}
}
/**
* Serialization helper class.
*/
class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable])
kryo.register(classOf[org.apache.hadoop.hbase.client.Result])
kryo.register(classOf[Array[(Any, Any)]])
kryo.register(classOf[Array[org.apache.hadoop.hbase.Cell]])
kryo.register(classOf[org.apache.hadoop.hbase.NoTagsKeyValue])
kryo.register(classOf[org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats])
}
}
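After the job finishes, the updated values can be verified by reading table2 directly. The following is a minimal verification sketch under the same assumptions as the sample (table table2, column family cf, qualifier cid); the object name VerifyTable2 and the row key row1 are illustrative only and are not part of the sample class.
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
object VerifyTable2 {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create() // hbase-site.xml must be on the classpath
    val connection = ConnectionFactory.createConnection(conf)
    try {
      val table = connection.getTable(TableName.valueOf("table2"))
      // "row1" is an illustrative row key; replace it with a key that exists in table1 and table2.
      val result = table.get(new Get(Bytes.toBytes("row1")))
      val value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("cid"))
      if (value != null) {
        println(s"table2 cf:cid = ${Bytes.toString(value)}")
      } else {
        println("No cf:cid value found for the given row key.")
      }
      table.close()
    } finally {
      connection.close()
    }
  }
}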