Scala Example Code
Function
Use Spark to call HBase APIs to read data from HBase table1, analyze the data, and then write the analysis result to HBase table2.
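The example assumes that table1 and table2 already exist, each with a column family cf, and that matching row keys hold numeric values under the cf:cid qualifier. The snippet below is only a minimal preparation sketch under those assumptions; it is not part of the sample project, and the row key row1 and the value 10 are made-up test data.

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical helper: create table1/table2 with column family cf and insert test data.
object PrepareTables {
  def main(args: Array[String]): Unit = {
    val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val admin = connection.getAdmin
    try {
      for (name <- Seq("table1", "table2")) {
        val tableName = TableName.valueOf(name)
        if (!admin.tableExists(tableName)) {
          val desc = new HTableDescriptor(tableName)
          desc.addFamily(new HColumnDescriptor("cf")) // column family used by the example
          admin.createTable(desc)
        }
        // Insert a made-up numeric test value into cf:cid for an assumed row key.
        val table = connection.getTable(tableName)
        val put = new Put(Bytes.toBytes("row1"))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"), Bytes.toBytes("10"))
        table.put(put)
        table.close()
      }
    } finally {
      admin.close()
      connection.close()
    }
  }
}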
Example Code
For details about the code, see com.huawei.bigdata.spark.examples.SparkHbasetoHbase.
Example code:
import java.io.IOException
import java.util

import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put, Result, Scan, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

/**
 * Calculate data from HBase table1 and table2, then update table2.
 */
object SparkHbasetoHbase {

  case class FemaleInfo(name: String, gender: String, stayTime: Int)

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkHbasetoHbase")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator")
    val sc = new SparkContext(conf)

    // Create the configuration parameter to connect to HBase. The hbase-site.xml file must be included in the classpath.
    val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)

    // Declare the information of the table to be queried.
    val scan = new Scan()
    scan.addFamily(Bytes.toBytes("cf")) // column family
    val proto = ProtobufUtil.toScan(scan)
    val scanToString = Base64.encodeBytes(proto.toByteArray)
    hbConf.set(TableInputFormat.INPUT_TABLE, "table1") // table name
    hbConf.set(TableInputFormat.SCAN, scanToString)

    // Obtain the data in the table through the Spark interface.
    val rdd = sc.newAPIHadoopRDD(hbConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

    // Traverse every partition in HBase table1 and update HBase table2.
    // If the amount of data is small, you can use rdd.foreach().
    rdd.foreachPartition(x => hBaseWriter(x))

    sc.stop()
  }

  /**
   * Write to table2 in the executor.
   *
   * @param iterator partition data from table1
   */
  def hBaseWriter(iterator: Iterator[(ImmutableBytesWritable, Result)]): Unit = {
    // Read HBase.
    val tableName = "table2"
    val columnFamily = "cf"
    val qualifier = "cid"
    val conf = HBaseConfiguration.create()
    var table: Table = null
    var connection: Connection = null
    try {
      connection = ConnectionFactory.createConnection(conf)
      table = connection.getTable(TableName.valueOf(tableName))
      val iteratorArray = iterator.toArray
      val rowList = new util.ArrayList[Get]()
      for (row <- iteratorArray) {
        val get = new Get(row._2.getRow)
        rowList.add(get)
      }
      // Get data from HBase table2.
      val resultDataBuffer = table.get(rowList)
      // Set data for HBase.
      val putList = new util.ArrayList[Put]()
      for (i <- 0 until iteratorArray.size) {
        val resultData = resultDataBuffer(i) // table2 row
        if (!resultData.isEmpty) {
          // Query the value from table1.
          var hbase1Value = ""
          val it = iteratorArray(i)._2.listCells().iterator()
          while (it.hasNext) {
            val c = it.next()
            // Query the table1 value by column family and column qualifier.
            if (columnFamily.equals(Bytes.toString(CellUtil.cloneFamily(c)))
              && qualifier.equals(Bytes.toString(CellUtil.cloneQualifier(c)))) {
              hbase1Value = Bytes.toString(CellUtil.cloneValue(c))
            }
          }
          val hbase2Value = Bytes.toString(resultData.getValue(columnFamily.getBytes, qualifier.getBytes))
          val put = new Put(iteratorArray(i)._2.getRow)
          // Calculate the result value.
          val resultValue = hbase1Value.toInt + hbase2Value.toInt
          // Set data to put.
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(resultValue.toString))
          putList.add(put)
        }
      }
      if (putList.size() > 0) {
        table.put(putList)
      }
    } catch {
      case e: IOException => e.printStackTrace()
    } finally {
      if (table != null) {
        try {
          table.close()
        } catch {
          case e: IOException => e.printStackTrace()
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close()
        } catch {
          case e: IOException => e.printStackTrace()
        }
      }
    }
  }
}

/**
 * Define the serializer class.
 */
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable])
    kryo.register(classOf[org.apache.hadoop.hbase.client.Result])
    kryo.register(classOf[Array[(Any, Any)]])
    kryo.register(classOf[Array[org.apache.hadoop.hbase.Cell]])
    kryo.register(classOf[org.apache.hadoop.hbase.NoTagsKeyValue])
    kryo.register(classOf[org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats])
  }
}
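Because classes such as ImmutableBytesWritable and Result are not Java-serializable, the sample switches to the Kryo serializer and registers them in MyRegistrator, and it uses rdd.foreachPartition() so that each partition opens a single HBase connection and batches its Get and Put operations. After the job finishes, every table2 row that also exists in table1 holds the sum of the two original cf:cid values. The following sketch is a hypothetical verification helper, not part of the sample project; the row key row1 is an assumed test value.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical helper: read one row of table2 back to check the summed value.
object VerifyResult {
  def main(args: Array[String]): Unit = {
    val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    try {
      val table = connection.getTable(TableName.valueOf("table2"))
      // Read cf:cid for an assumed test row key and print the value written by the job.
      val result = table.get(new Get(Bytes.toBytes("row1")))
      val value = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("cid")))
      println(s"table2 row1 cf:cid = $value")
      table.close()
    } finally {
      connection.close()
    }
  }
}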