Scala Sample Code
Function Description
In Spark applications, users can use HBase APIs to create a table, read data from it, and insert data into it.
Sample Code
The following code snippets are examples only. For the complete code, see SparkOnHbaseScalaExample.
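The snippets below omit their import statements. A minimal sketch of the imports they rely on, assuming the HBase 1.x client API used here (the authoritative list is in SparkOnHbaseScalaExample):
import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Result, Scan, Table}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}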
Example: Creating an HBase table
// Create a configuration parameter to connect to HBase and ensure that hbase-site.xml is in the classpath.
val conf: SparkConf = new SparkConf
val sc: SparkContext = new SparkContext(conf)
val hbConf: Configuration = HBaseConfiguration.create(sc.hadoopConfiguration)
// Create a connection channel to connect to HBase.
val connection: Connection = ConnectionFactory.createConnection(hbConf)
// Declare the table description.
val userTable = TableName.valueOf("shb1")
val tableDescr = new HTableDescriptor(userTable)
tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
// Create the table.
println("Creating table shb1.")
val admin = connection.getAdmin
if (admin.tableExists(userTable)) {
  admin.disableTable(userTable)
  admin.deleteTable(userTable)
}
admin.createTable(tableDescr)
connection.close()
sc.stop()
println("Done!")
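Note that HTableDescriptor and HColumnDescriptor are deprecated in HBase 2.x. If the cluster runs a 2.x client, the table declaration can be rewritten with the builder API; a minimal sketch, not part of the original sample:
// Equivalent declaration using the HBase 2.x builder API
// (org.apache.hadoop.hbase.client.{TableDescriptorBuilder, ColumnFamilyDescriptorBuilder}).
val tableDescr2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("shb1"))
  .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
  .build()
admin.createTable(tableDescr2)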
Example: Inserting data into the HBase table
// Create a configuration parameter to connect to HBase and ensure that hbase-site.xml is in the classpath.
val conf = new SparkConf()
val sc = new SparkContext(conf)
val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)
// Declare the table information.
val tableName = "shb1"
val familyName = Bytes.toBytes("info")
var table: Table = null
var connection: Connection = null
try {
  // Obtain the HBase connection.
  connection = ConnectionFactory.createConnection(hbConf)
  // Obtain the table object.
  table = connection.getTable(TableName.valueOf(tableName))
  val data = sc.textFile(args(0)).map { line =>
    val value = line.split(",")
    (value(0), value(1), value(2), value(3))
  }.collect()
  var i = 0
  for (line <- data) {
    val put = new Put(Bytes.toBytes("row" + i))
    put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1))
    put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2))
    put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3))
    put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4))
    i += 1
    table.put(put)
  }
} catch {
  case e: IOException => e.printStackTrace()
} finally {
  if (table != null) {
    try {
      // Close the table object.
      table.close()
    } catch {
      case e: IOException => e.printStackTrace()
    }
  }
  if (connection != null) {
    try {
      // Close the HBase connection.
      connection.close()
    } catch {
      case e: IOException => e.printStackTrace()
    }
  }
  sc.stop()
}
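The insert example reads its input path from args(0) and expects each line to contain four comma-separated fields, which are written to qualifiers c11 through c14. A hypothetical input file (illustration only) could look like this:
Zhang,19,male,100
Li,20,female,99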
Example: Reading HBase table data
// Create a configuration parameter to connect to HBase and ensure that hbase-site.xml is in the classpath.
val conf = new SparkConf()
val sc = new SparkContext(conf)
val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)
// Declare the information about the table to be queried.
val scan = new Scan()
scan.addFamily(Bytes.toBytes("info"))
val proto = ProtobufUtil.toScan(scan)
val scanToString = Base64.encodeBytes(proto.toByteArray)
hbConf.set(TableInputFormat.INPUT_TABLE, "shb1")
hbConf.set(TableInputFormat.SCAN, scanToString)
// Use the Spark API to obtain the table data.
val rdd = sc.newAPIHadoopRDD(hbConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
// Traverse every row in the HBase table and print the results.
rdd.collect().foreach(x => {
  val key = Bytes.toString(x._1.get())
  val it = x._2.listCells().iterator()
  while (it.hasNext) {
    val c = it.next()
    val family = Bytes.toString(CellUtil.cloneFamily(c))
    val qualifier = Bytes.toString(CellUtil.cloneQualifier(c))
    val value = Bytes.toString(CellUtil.cloneValue(c))
    val tm = c.getTimestamp
    println("Row=" + key + " Family=" + family + " Qualifier=" + qualifier + " Value=" + value + " TimeStamp=" + tm)
  }
})
sc.stop()
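Depending on the Spark serializer, collecting raw (ImmutableBytesWritable, Result) pairs to the driver can raise serialization issues, so it is often cleaner to map the scanned pairs into plain Scala values before any collect or shuffle. A minimal sketch, assuming the shb1 layout created above (the rows variable name is ours):
// Convert each (row key, Result) pair into a tuple of plain strings so that
// later Spark transformations no longer depend on HBase classes.
val rows = rdd.map { case (key, result) =>
  val rowKey = Bytes.toString(key.get())
  // getValue returns null if the cell is absent; Bytes.toString(null) yields null.
  val c11 = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("c11")))
  val c12 = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("c12")))
  (rowKey, c11, c12)
}
println("Number of rows read from shb1: " + rows.count())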