Scala Sample Code
Function Description
In Spark applications, users can use HBase APIs to create a table, read data from the table, and insert data into it.
Sample Code
The following code snippets are examples only. For the complete code, see SparkOnHbaseScalaExample.
Example: Creating an HBase table
// Create a configuration parameter to connect to HBase and ensure that hbase-site.xml is in the classpath.
val conf: SparkConf = new SparkConf
val sc: SparkContext = new SparkContext(conf)
val hbConf: Configuration = HBaseConfiguration.create(sc.hadoopConfiguration)

// Create a connection to HBase.
val connection: Connection = ConnectionFactory.createConnection(hbConf)

// Declare the table descriptor.
val userTable = TableName.valueOf("shb1")
val tableDescr = new HTableDescriptor(userTable)
tableDescr.addFamily(new HColumnDescriptor("info".getBytes))

// Create the table.
println("Creating table shb1.")
val admin = connection.getAdmin
if (admin.tableExists(userTable)) {
  admin.disableTable(userTable)
  admin.deleteTable(userTable)
}
admin.createTable(tableDescr)

connection.close()
sc.stop()
println("Done!")
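Note that HTableDescriptor and HColumnDescriptor are deprecated in HBase 2.x clients. The following is a minimal sketch of the same table creation using the builder API instead; it assumes an HBase 2.x client on the classpath and reuses the shb1 table and info column family from the example above.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}

// A minimal sketch, assuming an HBase 2.x client: build the table descriptor
// with TableDescriptorBuilder instead of the deprecated HTableDescriptor.
val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
val admin = connection.getAdmin
val userTable = TableName.valueOf("shb1")
val descriptor = TableDescriptorBuilder
  .newBuilder(userTable)
  .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
  .build()
if (!admin.tableExists(userTable)) {
  admin.createTable(descriptor)
}
connection.close()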
Example: Inserting data into the HBase table
// Create a configuration parameter to connect to HBase and ensure that hbase-site.xml is in the classpath.
val conf = new SparkConf()
val sc = new SparkContext(conf)
val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)

// Declare table information.
val tableName = "shb1"
val familyName = Bytes.toBytes("info")
var table: Table = null
var connection: Connection = null
try {
  // Obtain the HBase connection.
  connection = ConnectionFactory.createConnection(hbConf)
  // Obtain the table object.
  table = connection.getTable(TableName.valueOf(tableName))
  val data = sc.textFile(args(0)).map { line =>
    val value = line.split(",")
    (value(0), value(1), value(2), value(3))
  }.collect()
  var i = 0
  for (line <- data) {
    val put = new Put(Bytes.toBytes("row" + i))
    put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1))
    put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2))
    put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3))
    put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4))
    i += 1
    table.put(put)
  }
} catch {
  case e: IOException => e.printStackTrace()
} finally {
  if (table != null) {
    try {
      // Close the table object.
      table.close()
    } catch {
      case e: IOException => e.printStackTrace()
    }
  }
  if (connection != null) {
    try {
      // Close the HBase connection.
      connection.close()
    } catch {
      case e: IOException => e.printStackTrace()
    }
  }
  sc.stop()
}
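The example above sends one Put per RPC. Where write throughput matters, a BufferedMutator can batch mutations on the client side before sending them. The following is a minimal sketch, assuming the shb1 table with the info family from this section already exists; the row keys and values are illustrative only.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{BufferedMutator, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

// A minimal sketch, assuming table shb1 with family info exists:
// BufferedMutator buffers Puts and flushes them to HBase in batches.
val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
val mutator: BufferedMutator = connection.getBufferedMutator(TableName.valueOf("shb1"))
try {
  for (i <- 0 until 100) {
    val put = new Put(Bytes.toBytes("row" + i))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("c11"), Bytes.toBytes("v" + i))
    mutator.mutate(put) // buffered; sent when the buffer fills or on flush/close
  }
  mutator.flush()
} finally {
  mutator.close()
  connection.close()
}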
Example: Reading HBase table data
// Create a configuration parameter to connect to HBase and ensure that hbase-site.xml is in the classpath.
val conf = new SparkConf()
val sc = new SparkContext(conf)
val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)

// Declare information about the table to be queried.
val scan = new Scan()
scan.addFamily(Bytes.toBytes("info"))
val proto = ProtobufUtil.toScan(scan)
val scanToString = Base64.encodeBytes(proto.toByteArray)
hbConf.set(TableInputFormat.INPUT_TABLE, "shb1")
hbConf.set(TableInputFormat.SCAN, scanToString)

// Use the Spark API to obtain table data.
val rdd = sc.newAPIHadoopRDD(hbConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

// Traverse every row in the HBase table and print the results.
rdd.collect().foreach(x => {
  val key = Bytes.toString(x._1.get())
  val it = x._2.listCells().iterator()
  while (it.hasNext) {
    val c = it.next()
    val family = Bytes.toString(CellUtil.cloneFamily(c))
    val qualifier = Bytes.toString(CellUtil.cloneQualifier(c))
    val value = Bytes.toString(CellUtil.cloneValue(c))
    val tm = c.getTimestamp
    println("Row=" + key + " Family=" + family + " Qualifier=" + qualifier +
      " Value=" + value + " TimeStamp=" + tm)
  }
})
sc.stop()
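A TableInputFormat scan is appropriate for reading a whole table into an RDD. For a point lookup of a single row, the client Get API is simpler and avoids launching a Spark job. The following is a minimal sketch, assuming the shb1 table and the "row0" key written by the insert example above.

import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

// A minimal sketch, assuming row "row0" was written by the insert example above.
val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
val table = connection.getTable(TableName.valueOf("shb1"))
try {
  val result = table.get(new Get(Bytes.toBytes("row0")))
  // listCells() returns null if the row does not exist.
  if (result.listCells() != null) {
    val it = result.listCells().iterator()
    while (it.hasNext) {
      val c = it.next()
      println(Bytes.toString(CellUtil.cloneQualifier(c)) + "=" +
        Bytes.toString(CellUtil.cloneValue(c)))
    }
  }
} finally {
  table.close()
  connection.close()
}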