Updated on 2022-06-01 GMT+08:00

Scala Sample Code

Function Description

In Spark applications, you can use HBase APIs to create a table, read data from the table, and insert data into the table.

Sample Code

The following code snippets are examples only. For the complete code, see SparkOnHbaseScalaExample.

Example: Creating an HBase table

    // Build the HBase configuration from Spark's Hadoop configuration.
    // hbase-site.xml must be on the classpath so the cluster settings are picked up.
    val conf: SparkConf = new SparkConf
    val sc: SparkContext = new SparkContext(conf)
    try {
      val hbConf: Configuration = HBaseConfiguration.create(sc.hadoopConfiguration)
      // Create a connection channel to connect to HBase.
      val connection: Connection = ConnectionFactory.createConnection(hbConf)
      try {
        // Declare table description: table "shb1" with a single column family "info".
        val userTable = TableName.valueOf("shb1")
        val tableDescr = new HTableDescriptor(userTable)
        tableDescr.addFamily(new HColumnDescriptor("info".getBytes))

        // Create a table.
        println("Creating table shb1. ")
        val admin = connection.getAdmin
        try {
          // An existing table must be disabled before it can be deleted;
          // drop it so the create call below starts from a clean state.
          if (admin.tableExists(userTable)) {
            admin.disableTable(userTable)
            admin.deleteTable(userTable)
          }
          admin.createTable(tableDescr)
        } finally {
          // Release the Admin handle even if a table operation fails.
          admin.close()
        }
      } finally {
        // Always release the HBase connection.
        connection.close()
      }
    } finally {
      // Always stop the SparkContext, even if HBase setup failed.
      sc.stop()
    }
    println("Done!")

Example: Inserting data into the HBase table

    // Build the HBase configuration from Spark's Hadoop configuration.
    // hbase-site.xml must be on the classpath so the cluster settings are picked up.
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)

    // Declare table information.
    val tableName = "shb1"
    val familyName = Bytes.toBytes("info")
    try {
      // Obtain the HBase connection.
      val connection = ConnectionFactory.createConnection(hbConf)
      try {
        // Obtain the table object. It is scoped to this block so that the
        // matching finally closes the same handle that was opened here.
        val table = connection.getTable(TableName.valueOf(tableName))
        try {
          // Read the input file named by the first program argument. Each line
          // is split on commas and its first four fields are kept.
          val data = sc.textFile(args(0)).map { line =>
            val value = line.split(",")
            (value(0), value(1), value(2), value(3))
          }.collect()

          // Write one row per input line, keyed "row0", "row1", ...,
          // with the four fields stored in columns c11..c14 of family "info".
          data.zipWithIndex.foreach { case (line, i) =>
            val put = new Put(Bytes.toBytes("row" + i))
            put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1))
            put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2))
            put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3))
            put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4))
            table.put(put)
          }
        } finally {
          // Close the table object.
          table.close()
        }
      } finally {
        // Close the HBase connection, even if closing the table failed.
        connection.close()
      }
    } catch {
      // I/O failures from HBase (including failures while closing) are reported, not rethrown.
      case e: IOException =>
        e.printStackTrace()
    } finally {
      sc.stop()
    }

Example: Reading HBase table data

   // Build the HBase configuration from Spark's Hadoop configuration; hbase-site.xml must be on the classpath.
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)

    // Describe the query: scan only the "info" column family of table "shb1".
    // The scan is passed to TableInputFormat as a Base64-encoded protobuf string.
    val infoScan = new Scan()
    infoScan.addFamily(Bytes.toBytes("info"))
    val encodedScan = Base64.encodeBytes(ProtobufUtil.toScan(infoScan).toByteArray)
    hbConf.set(TableInputFormat.INPUT_TABLE, "shb1")
    hbConf.set(TableInputFormat.SCAN, encodedScan)

    // Use the Spark API to load the table as an RDD of (row key, result) pairs.
    val rows = sc.newAPIHadoopRDD(
      hbConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // Bring every row to the driver and print each of its cells.
    rows.collect().foreach { case (rowKey, result) =>
      // The row key is rendered to a string here but is not part of the printed output.
      val key = rowKey.toString
      val cells = result.listCells().iterator()
      while (cells.hasNext) {
        val cell = cells.next()
        val family = Bytes.toString(CellUtil.cloneFamily(cell))
        val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
        val value = Bytes.toString(CellUtil.cloneValue(cell))
        val timestamp = cell.getTimestamp
        println(s" Family=$family Qualifier=$qualifier Value=$value TimeStamp=$timestamp")
      }
    }
    sc.stop()