Use Spark to call the HBase API to read data from HBase table1, analyze the data, and then write the analysis results to HBase table2.

For details about the code, see com.huawei.bigdata.spark.examples.SparkHbasetoHbase.

Example code:

/** Calculate data from HBase table1 and table2, then update table2 with the result. */
object SparkHbasetoHbase {

  case class FemaleInfo(name: String, gender: String, stayTime: Int)

  /**
    * Driver entry point: scans column family "cf" of HBase table "table1" as an RDD and
    * hands every partition to [[hBaseWriter]], which updates "table2".
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("SparkHbasetoHbase")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator")
    val sc = new SparkContext(conf)

    try {
      // Create the configuration used to connect to HBase. hbase-site.xml must be on the classpath.
      val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)

      // Describe the scan of the source table: only the "cf" column family is read.
      val scan = new Scan()
      scan.addFamily(Bytes.toBytes("cf")) // column family
      // TableInputFormat expects the scan as a Base64-encoded protobuf string.
      val proto = ProtobufUtil.toScan(scan)
      val scanToString = Base64.encodeBytes(proto.toByteArray)
      hbConf.set(TableInputFormat.INPUT_TABLE, "table1") // table name
      hbConf.set(TableInputFormat.SCAN, scanToString)

      // Obtain the data in table1 through the Spark interface.
      val rdd = sc.newAPIHadoopRDD(hbConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

      // Traverse every partition of table1 and update table2.
      // For small data sets rdd.foreach() can be used instead.
      rdd.foreachPartition(x => hBaseWriter(x))
    } finally {
      // Always release the SparkContext, even when the job fails.
      sc.stop()
    }
  }

  /**
    * Writes to table2 on the executor.
    *
    * For every table1 row in the partition, the row with the same key is fetched from table2.
    * When that row exists, the "cf:cid" values of both tables are added as integers and the
    * sum is written back to table2 under the same row key. Rows missing from table2 are skipped.
    *
    * IOExceptions raised while talking to HBase are printed and not rethrown.
    *
    * @param iterator partition data from table1
    */
  def hBaseWriter(iterator: Iterator[(ImmutableBytesWritable, Result)]): Unit = {
    val tableName = "table2"
    val columnFamily = "cf"
    val qualifier = "cid"
    // Encode once with HBase's helper rather than String.getBytes, which uses the platform charset.
    val familyBytes = Bytes.toBytes(columnFamily)
    val qualifierBytes = Bytes.toBytes(qualifier)
    val conf = HBaseConfiguration.create()

    // Kept as nullable vars so the finally block can close whatever was actually opened.
    var table: Table = null
    var connection: Connection = null

    try {
      connection = ConnectionFactory.createConnection(conf)
      table = connection.getTable(TableName.valueOf(tableName))

      // Materialize the partition: it is traversed twice (to build the Gets and to build the Puts).
      val iteratorArray = iterator.toArray
      val rowList = new util.ArrayList[Get]()
      for (row <- iteratorArray) {
        rowList.add(new Get(row._2.getRow))
      }

      // Batch-read the matching rows from table2; results are positionally aligned with rowList.
      val resultDataBuffer = table.get(rowList)

      // Build the updates for table2.
      val putList = new util.ArrayList[Put]()
      for (i <- iteratorArray.indices) {
        val resultData = resultDataBuffer(i) // table2 row
        if (!resultData.isEmpty) {
          // Look up the table1 value by column family and column qualifier.
          var hbase1Value = ""
          val it = iteratorArray(i)._2.listCells().iterator()
          while (it.hasNext) {
            val c = it.next()
            if (columnFamily.equals(Bytes.toString(CellUtil.cloneFamily(c)))
              && qualifier.equals(Bytes.toString(CellUtil.cloneQualifier(c)))) {
              hbase1Value = Bytes.toString(CellUtil.cloneValue(c))
            }
          }

          val hbase2Value = Bytes.toString(resultData.getValue(familyBytes, qualifierBytes))
          val put = new Put(iteratorArray(i)._2.getRow)

          // Calculate the result value.
          // NOTE(review): toInt throws when cf:cid is missing or non-numeric in either table; that is
          // not an IOException, so it propagates and fails the task — confirm this is intended.
          val resultValue = hbase1Value.toInt + hbase2Value.toInt
          put.addColumn(familyBytes, qualifierBytes, Bytes.toBytes(resultValue.toString))
          putList.add(put)
        }
      }

      if (!putList.isEmpty) {
        table.put(putList)
      }
    } catch {
      case e: IOException =>
        e.printStackTrace()
    } finally {
      // Close the table before the connection that created it.
      closeQuietly(table)
      closeQuietly(connection)
    }
  }

  /** Closes an HBase resource if it was opened, printing (not rethrowing) any IOException. */
  private def closeQuietly(resource: java.io.Closeable): Unit = {
    if (resource != null) {
      try {
        resource.close()
      } catch {
        case e: IOException =>
          e.printStackTrace()
      }
    }
  }
}

/** Define the Kryo registrator class used for serialization. */
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Array[(Any, Any)]])