Java Sample Code
Function Description
In Spark applications, users can use HBase APIs to create a table, read data from the table, and insert data into it.
Sample Code
The following code snippets are provided as examples. For the complete code, see SparkOnHbaseJavaExample.
Example: Creating an HBase table
public class TableCreation {
    public static void main(String[] args) throws IOException {
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // Create a connection channel to connect to HBase.
        Connection connection = ConnectionFactory.createConnection(hbConf);

        // Declare the table description.
        TableName userTable = TableName.valueOf("shb1");
        HTableDescriptor tableDescr = new HTableDescriptor(userTable);
        tableDescr.addFamily(new HColumnDescriptor("info".getBytes()));

        // Create the table.
        System.out.println("Creating table shb1.");
        Admin admin = connection.getAdmin();
        if (admin.tableExists(userTable)) {
            admin.disableTable(userTable);
            admin.deleteTable(userTable);
        }
        admin.createTable(tableDescr);

        connection.close();
        jsc.stop();
        System.out.println("Done!");
    }
}
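Note that HTableDescriptor and HColumnDescriptor are deprecated in HBase 2.x clients. If the cluster runs a newer HBase client (an assumption, not part of the original sample), a minimal sketch of the same table creation using the builder API might look like this:

// Sketch only: assumes an HBase 2.x client where the builder API replaces HTableDescriptor.
TableName userTable = TableName.valueOf("shb1");
TableDescriptor tableDescr = TableDescriptorBuilder.newBuilder(userTable)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
        .build();
Admin admin = connection.getAdmin();
if (admin.tableExists(userTable)) {
    admin.disableTable(userTable);
    admin.deleteTable(userTable);
}
admin.createTable(tableDescr);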
Example: Inserting data into the HBase table
public class TableInputData {
    public static void main(String[] args) throws IOException {
        // Create a configuration parameter to connect to HBase and ensure that hbase-site.xml is in the classpath.
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // Declare table information.
        Table table = null;
        String tableName = "shb1";
        byte[] familyName = Bytes.toBytes("info");
        Connection connection = null;

        try {
            // Obtain the HBase connection.
            connection = ConnectionFactory.createConnection(hbConf);
            // Obtain the table object.
            table = connection.getTable(TableName.valueOf(tableName));

            List<Tuple4<String, String, String, String>> data = jsc.textFile(args[0]).map(
                new Function<String, Tuple4<String, String, String, String>>() {
                    @Override
                    public Tuple4<String, String, String, String> call(String s) throws Exception {
                        String[] tokens = s.split(",");
                        return new Tuple4<String, String, String, String>(tokens[0], tokens[1], tokens[2], tokens[3]);
                    }
                }).collect();

            Integer i = 0;
            for (Tuple4<String, String, String, String> line : data) {
                Put put = new Put(Bytes.toBytes("row" + i));
                put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1()));
                put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2()));
                put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3()));
                put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4()));
                i += 1;
                table.put(put);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    // Close the HBase connection.
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            jsc.stop();
        }
    }
}
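In the snippet above, each record triggers a separate table.put call. As a sketch of an optional optimization that is not part of the original sample, the Put objects can be collected into a list and written in one batch with Table.put(List&lt;Put&gt;):

// Sketch: batch the writes instead of calling table.put once per record.
List<Put> puts = new ArrayList<>();
int i = 0;
for (Tuple4<String, String, String, String> line : data) {
    Put put = new Put(Bytes.toBytes("row" + i));
    put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1()));
    put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2()));
    put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3()));
    put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4()));
    puts.add(put);
    i += 1;
}
table.put(puts); // a single batched call reduces RPC round trips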
Example: Reading HBase table data
public class TableOutputData {
    public static void main(String[] args) throws IOException {
        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        System.setProperty("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator");

        // Create a configuration parameter to connect to HBase and ensure that hbase-site.xml is in the classpath.
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // Declare information about the table to be queried.
        Scan scan = new org.apache.hadoop.hbase.client.Scan();
        scan.addFamily(Bytes.toBytes("info"));
        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String scanToString = Base64.encodeBytes(proto.toByteArray());
        hbConf.set(TableInputFormat.INPUT_TABLE, "shb1");
        hbConf.set(TableInputFormat.SCAN, scanToString);

        // Use the Spark API to obtain table data.
        JavaPairRDD<ImmutableBytesWritable, Result> rdd = jsc.newAPIHadoopRDD(hbConf,
            TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

        // Traverse every row in the HBase table and print the results.
        List<Tuple2<ImmutableBytesWritable, Result>> rddList = rdd.collect();
        for (int i = 0; i < rddList.size(); i++) {
            Tuple2<ImmutableBytesWritable, Result> t2 = rddList.get(i);
            ImmutableBytesWritable key = t2._1();
            Iterator<Cell> it = t2._2().listCells().iterator();
            while (it.hasNext()) {
                Cell c = it.next();
                String family = Bytes.toString(CellUtil.cloneFamily(c));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
                String value = Bytes.toString(CellUtil.cloneValue(c));
                Long tm = c.getTimestamp();
                System.out.println(" Family=" + family + " Qualifier=" + qualifier + " Value=" + value + " TimeStamp=" + tm);
            }
        }
        jsc.stop();
    }
}
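This snippet sets spark.kryo.registrator to com.huawei.bigdata.spark.examples.MyRegistrator, which is part of the complete sample project. A minimal sketch of such a registrator, assuming it only needs to register the HBase types carried in the RDD of this example, could look like this:

import com.esotericsoftware.kryo.Kryo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.spark.serializer.KryoRegistrator;

public class MyRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        // Register the HBase classes that appear in the RDD so Kryo can serialize them.
        kryo.register(ImmutableBytesWritable.class);
        kryo.register(Result.class);
    }
}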