Java Sample Code
Function Description
In Spark applications, users can call HBase APIs to create a table, read data from it, and insert data into it.
Sample Code
The following code snippets are provided as examples. For the complete code, see SparkOnHbaseJavaExample.
Example: Creating an HBase table
public class TableCreation {
  public static void main(String[] args) throws IOException {
    SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

    // Create a connection to HBase.
    Connection connection = ConnectionFactory.createConnection(hbConf);

    // Declare the table descriptor with one column family "info".
    TableName userTable = TableName.valueOf("shb1");
    HTableDescriptor tableDescr = new HTableDescriptor(userTable);
    tableDescr.addFamily(new HColumnDescriptor("info".getBytes()));

    // Create the table. If it already exists, drop it first.
    System.out.println("Creating table shb1.");
    Admin admin = connection.getAdmin();
    if (admin.tableExists(userTable)) {
      admin.disableTable(userTable);
      admin.deleteTable(userTable);
    }
    admin.createTable(tableDescr);

    connection.close();
    jsc.stop();
    System.out.println("Done!");
  }
}
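The sample uses the HTableDescriptor and HColumnDescriptor classes, which are deprecated in newer HBase client versions. If your cluster ships an HBase 2.x client, the same table can be created with the builder API instead; the following is a minimal sketch under that assumption (the class name TableCreationBuilderStyle is illustrative and not part of the sample project):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class TableCreationBuilderStyle {
  public static void main(String[] args) throws IOException {
    Configuration hbConf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(hbConf);
         Admin admin = connection.getAdmin()) {
      TableName userTable = TableName.valueOf("shb1");
      // Build the table descriptor with one column family "info" (HBase 2.x builder API).
      TableDescriptor tableDescr = TableDescriptorBuilder.newBuilder(userTable)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
          .build();
      if (admin.tableExists(userTable)) {
        admin.disableTable(userTable);
        admin.deleteTable(userTable);
      }
      admin.createTable(tableDescr);
    }
  }
}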
Example: Inserting data into the HBase table
public class TableInputData {
  public static void main(String[] args) throws IOException {
    // Create the configuration used to connect to HBase. Ensure that hbase-site.xml is in the classpath.
    SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

    // Declare table information.
    Table table = null;
    String tableName = "shb1";
    byte[] familyName = Bytes.toBytes("info");
    Connection connection = null;

    try {
      // Obtain the HBase connection.
      connection = ConnectionFactory.createConnection(hbConf);
      // Obtain the table object.
      table = connection.getTable(TableName.valueOf(tableName));

      // Read the input file (args[0]) and split each line into four fields.
      List<Tuple4<String, String, String, String>> data = jsc.textFile(args[0]).map(
          new Function<String, Tuple4<String, String, String, String>>() {
            @Override
            public Tuple4<String, String, String, String> call(String s) throws Exception {
              String[] tokens = s.split(",");
              return new Tuple4<String, String, String, String>(tokens[0], tokens[1], tokens[2], tokens[3]);
            }
          }).collect();

      // Write each record into the table, one Put per row.
      Integer i = 0;
      for (Tuple4<String, String, String, String> line : data) {
        Put put = new Put(Bytes.toBytes("row" + i));
        put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1()));
        put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2()));
        put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3()));
        put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4()));
        i += 1;
        table.put(put);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      jsc.stop();
    }
  }
}
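The loop above issues one Put, and therefore one RPC, per record. For larger input files you may prefer to batch the writes with a single Table.put(List<Put>) call. The following is a minimal sketch, assuming the same shb1 table, familyName, and Tuple4 records as in the example above (the class name BatchTableInput and the method putAll are illustrative, not part of the sample project):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import scala.Tuple4;

public class BatchTableInput {
  // Write all records with a single Table.put(List<Put>) call instead of one RPC per row.
  public static void putAll(Table table, byte[] familyName,
                            List<Tuple4<String, String, String, String>> data) throws IOException {
    List<Put> puts = new ArrayList<>();
    int i = 0;
    for (Tuple4<String, String, String, String> line : data) {
      Put put = new Put(Bytes.toBytes("row" + i));
      put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1()));
      put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2()));
      put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3()));
      put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4()));
      puts.add(put);
      i += 1;
    }
    table.put(puts);
  }
}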
Example: Reading HBase table data
public class TableOutputData {
  public static void main(String[] args) throws IOException {
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    System.setProperty("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator");

    // Create the configuration used to connect to HBase. Ensure that hbase-site.xml is in the classpath.
    SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

    // Declare information about the table to be queried.
    Scan scan = new org.apache.hadoop.hbase.client.Scan();
    scan.addFamily(Bytes.toBytes("info"));
    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    String scanToString = Base64.encodeBytes(proto.toByteArray());
    hbConf.set(TableInputFormat.INPUT_TABLE, "shb1");
    hbConf.set(TableInputFormat.SCAN, scanToString);

    // Use the Spark API to obtain the table data as an RDD.
    JavaPairRDD rdd = jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class,
        ImmutableBytesWritable.class, Result.class);

    // Traverse every row in the HBase table and print the results.
    List<Tuple2<ImmutableBytesWritable, Result>> rddList = rdd.collect();
    for (int i = 0; i < rddList.size(); i++) {
      Tuple2<ImmutableBytesWritable, Result> t2 = rddList.get(i);
      ImmutableBytesWritable key = t2._1();
      Iterator<Cell> it = t2._2().listCells().iterator();
      while (it.hasNext()) {
        Cell c = it.next();
        String family = Bytes.toString(CellUtil.cloneFamily(c));
        String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
        String value = Bytes.toString(CellUtil.cloneValue(c));
        Long tm = c.getTimestamp();
        System.out.println(" Family=" + family + " Qualifier=" + qualifier
            + " Value=" + value + " TimeStamp=" + tm);
      }
    }
    jsc.stop();
  }
}
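Note that collect() pulls every scanned row back to the driver, which is acceptable for a small demo table but not for large tables. A distributed alternative is to keep the per-row processing inside the RDD and return only an aggregate to the driver. The following is a minimal sketch, assuming hbConf has already been prepared with the scan and table name as in the example above (the class name TableRowCount and the method countRows are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TableRowCount {
  // Count the rows of the scanned HBase table without collecting them to the driver.
  public static long countRows(JavaSparkContext jsc, Configuration hbConf) {
    JavaPairRDD<ImmutableBytesWritable, Result> rdd = jsc.newAPIHadoopRDD(
        hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
    return rdd.count();
  }
}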