Implementing Bidirectional Data Exchange with HBase (Scala)
Function
You can use Spark to call an HBase API to operate HBase table1 and write the data analysis result of table1 to HBase table2.
Sample Code
The following code snippets are used as an example. For complete codes, see com.huawei.bigdata.spark.examples.SparkHbasetoHbase.
/** * Read data from table1, and obtain the corresponding record from table2 based on the key value. Sum the obtained two data records and update the sum result to table2. */ object SparkHbasetoHbase { case class FemaleInfo(name: String, gender: String, stayTime: Int) def main(args: Array[String]) { val conf = new SparkConf().setAppName("SparkHbasetoHbase") conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator") val sc = new SparkContext(conf) // Create a configuration parameter to connect to HBase and ensure that hbase-site.xml is in classpath. val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration) // Declare table information. val scan = new Scan() scan.addFamily(Bytes.toBytes("cf"))//colomn family val proto = ProtobufUtil.toScan(scan) val scanToString = Base64.encodeBytes(proto.toByteArray) hbConf.set(TableInputFormat.INPUT_TABLE, "table1")//table name hbConf.set(TableInputFormat.SCAN, scanToString) // Use the Spark API to obtain table data. val rdd = sc.newAPIHadoopRDD(hbConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) // Traverse every partition in HBase table1 and update data to HBase table2. // If the number of data records is small, you can use the rdd.foreach() method. rdd.foreachPartition(x => hBaseWriter(x)) sc.stop() } /** * Update records in table2 on the executor. * * @param iterator partition data in table1. */ def hBaseWriter(iterator: Iterator[(ImmutableBytesWritable, Result)]): Unit = { // Prepare for reading HBase data. val tableName = "table2" val columnFamily = "cf" val qualifier = "cid" val conf = HBaseConfiguration.create() var table: Table = null var connection: Connection = null try { connection = ConnectionFactory.createConnection(conf) table = connection.getTable(TableName.valueOf(tableName)) val iteratorArray = iterator.toArray val rowList = new util.ArrayList[Get]() for (row <- iteratorArray) { val get = new Get(row._2.getRow) rowList.add(get) } // Obtain the records in table2. val resultDataBuffer = table.get(rowList) // Modify records in table2. val putList = new util.ArrayList[Put]() for (i <- 0 until iteratorArray.size) { val resultData = resultDataBuffer(i) //hbase2 row if (!resultData.isEmpty) { // Query hbase1Value. var hbase1Value = "" val it = iteratorArray(i)._2.listCells().iterator() while (it.hasNext) { val c = it.next() // Check whether the values of cf and qualifile are the same. if (columnFamily.equals(Bytes.toString(CellUtil.cloneFamily(c))) && qualifier.equals(Bytes.toString(CellUtil.cloneQualifier(c)))) { hbase1Value = Bytes.toString(CellUtil.cloneValue(c)) } } val hbase2Value = Bytes.toString(resultData.getValue(columnFamily.getBytes, qualifier.getBytes)) val put = new Put(iteratorArray(i)._2.getRow) // Calculate the result. val resultValue = hbase1Value.toInt + hbase2Value.toInt // Set the result to the Put object. put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(resultValue.toString)) putList.add(put) } } if (putList.size() > 0) { table.put(putList) } } catch { case e: IOException => e.printStackTrace(); } finally { if (table != null) { try { table.close() } catch { case e: IOException => e.printStackTrace(); } } if (connection != null) { try { //Close the HBase connection. connection.close() } catch { case e: IOException => e.printStackTrace() } } } } } /** * Sequential auxiliary class */ class MyRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.register(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable]) kryo.register(classOf[org.apache.hadoop.hbase.client.Result]) kryo.register(classOf[Array[(Any, Any)]]) kryo.register(classOf[Array[org.apache.hadoop.hbase.Cell]]) kryo.register(classOf[org.apache.hadoop.hbase.NoTagsKeyValue]) kryo.register(classOf[org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionLoadStats]) } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot