Java Example Code
Function
Use Spark to call the HBase API to read data from HBase table1, analyze the data, and then write the analysis results to HBase table2.
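The example assumes that table1 and table2 already exist in HBase, that both contain the column family cf, and that the cf:cid column of each row holds an integer encoded as a string. The following is a minimal preparation sketch using the HBase 1.x Java client; the table, column family, and qualifier names match the example, while the class name PrepareTables, the row key row1, and the sample values are illustrative assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PrepareTables {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      // Create table1 and table2, each with the column family "cf".
      for (String name : new String[]{"table1", "table2"}) {
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name));
        desc.addFamily(new HColumnDescriptor("cf"));
        if (!admin.tableExists(desc.getTableName())) {
          admin.createTable(desc);
        }
      }
      // Insert one sample row with the same row key into both tables;
      // cf:cid must hold an integer encoded as a string.
      try (Table t1 = connection.getTable(TableName.valueOf("table1"));
           Table t2 = connection.getTable(TableName.valueOf("table2"))) {
        Put p1 = new Put(Bytes.toBytes("row1"));
        p1.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"), Bytes.toBytes("10"));
        t1.put(p1);
        Put p2 = new Put(Bytes.toBytes("row1"));
        p2.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"), Bytes.toBytes("5"));
        t2.put(p2);
      }
    }
  }
}

With this sample data, one run of the example updates cf:cid of row1 in table2 from 5 to 15 (10 + 5).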
Example Code
For details about the code, see the class com.huawei.bigdata.spark.examples.SparkHbasetoHbase.
Example code:
package com.huawei.bigdata.spark.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

/**
 * Calculate data from table1 and table2, then update table2.
 */
public class SparkHbasetoHbase {

  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("SparkHbasetoHbase");
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Create the configuration parameter to connect to HBase.
    // The hbase-site.xml file must be included in the classpath.
    Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

    // Declare the information of the table to be queried.
    Scan scan = new org.apache.hadoop.hbase.client.Scan();
    scan.addFamily(Bytes.toBytes("cf")); // column family
    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    String scanToString = Base64.encodeBytes(proto.toByteArray());
    hbConf.set(TableInputFormat.INPUT_TABLE, "table1"); // table name
    hbConf.set(TableInputFormat.SCAN, scanToString);

    // Obtain the data in the table through the Spark interface.
    JavaPairRDD<ImmutableBytesWritable, Result> rdd =
        jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

    // Traverse every partition in HBase table1 and update HBase table2.
    // If the amount of data is small, rdd.foreach() can be used instead.
    rdd.foreachPartition(
        new VoidFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>>() {
          public void call(Iterator<Tuple2<ImmutableBytesWritable, Result>> iterator) throws Exception {
            hBaseWriter(iterator);
          }
        }
    );

    jsc.stop();
  }

  /**
   * Write data to table2 in the executor.
   *
   * @param iterator partition data from table1
   */
  private static void hBaseWriter(Iterator<Tuple2<ImmutableBytesWritable, Result>> iterator) throws IOException {
    // Connection parameters for table2.
    String tableName = "table2";
    String columnFamily = "cf";
    String qualifier = "cid";
    Configuration conf = HBaseConfiguration.create();
    Connection connection = null;
    Table table = null;
    try {
      connection = ConnectionFactory.createConnection(conf);
      table = connection.getTable(TableName.valueOf(tableName));
      List<Get> rowList = new ArrayList<Get>();
      List<Tuple2<ImmutableBytesWritable, Result>> table1List =
          new ArrayList<Tuple2<ImmutableBytesWritable, Result>>();
      while (iterator.hasNext()) {
        Tuple2<ImmutableBytesWritable, Result> item = iterator.next();
        Get get = new Get(item._2().getRow());
        table1List.add(item);
        rowList.add(get);
      }

      // Obtain the rows with the same keys from table2.
      Result[] resultDataBuffer = table.get(rowList);

      // Build the Put list for table2.
      List<Put> putList = new ArrayList<Put>();
      for (int i = 0; i < resultDataBuffer.length; i++) {
        Result resultData = resultDataBuffer[i]; // row in table2
        if (!resultData.isEmpty()) {
          // Query the value from table1.
          String hbase1Value = "";
          Iterator<Cell> it = table1List.get(i)._2().listCells().iterator();
          while (it.hasNext()) {
            Cell c = it.next();
            // Match the table1 value by column family and column qualifier.
            if (columnFamily.equals(Bytes.toString(CellUtil.cloneFamily(c)))
                && qualifier.equals(Bytes.toString(CellUtil.cloneQualifier(c)))) {
              hbase1Value = Bytes.toString(CellUtil.cloneValue(c));
            }
          }
          String hbase2Value = Bytes.toString(resultData.getValue(columnFamily.getBytes(), qualifier.getBytes()));
          Put put = new Put(table1List.get(i)._2().getRow());

          // Calculate the result value.
          int resultValue = Integer.parseInt(hbase1Value) + Integer.parseInt(hbase2Value);

          // Set the result into the Put object.
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier),
              Bytes.toBytes(String.valueOf(resultValue)));
          putList.add(put);
        }
      }
      if (putList.size() > 0) {
        table.put(putList);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }
}
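The driver configuration above points spark.kryo.registrator at the class com.huawei.bigdata.spark.examples.MyRegistrator, whose source is not listed in this section. A minimal sketch of what such a Kryo registrator could look like is given here; the specific classes registered are an assumption based on the types this example serializes:

import com.esotericsoftware.kryo.Kryo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.spark.serializer.KryoRegistrator;

public class MyRegistrator implements KryoRegistrator {
  public void registerClasses(Kryo kryo) {
    // Register the HBase types that Spark serializes in this example
    // (assumed registration set; extend it for your own job).
    kryo.register(ImmutableBytesWritable.class);
    kryo.register(Result.class);
  }
}

Registering the classes up front lets Kryo write a compact class identifier instead of the full class name with every serialized record.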