Implementing Bidirectional Data Exchange with HBase (Java)
Function
You can use Spark to call HBase APIs to read data from HBase table1 and write the analysis results of table1 to HBase table2.
Sample Code
The following code snippet is used as an example. For the complete code, see com.huawei.bigdata.spark.examples.SparkHbasetoHbase.
/** * Read data from table1, and obtain the corresponding record from table2 based on the key value. Sum the obtained two data records and update the sum result to table2. */ public class SparkHbasetoHbase { public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf().setAppName("SparkHbasetoHbase"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator"); JavaSparkContext jsc = new JavaSparkContext(conf); // Create a configuration parameter to connect to HBase and ensure that hbase-site.xml is in classpath. Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration()); // Declare table information. Scan scan = new org.apache.hadoop.hbase.client.Scan(); scan.addFamily(Bytes.toBytes("cf"));//colomn family org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan); String scanToString = Base64.encodeBytes(proto.toByteArray()); hbConf.set(TableInputFormat.INPUT_TABLE, "table1");//table name hbConf.set(TableInputFormat.SCAN, scanToString); // Use the Spark API to obtain table data. JavaPairRDD rdd = jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class); // Traverse every partition in HBase table1 and update data to HBase table2. // If the number of data records is small, you can use the rdd.foreach() method. rdd.foreachPartition( new VoidFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>>() { public void call(Iterator<Tuple2<ImmutableBytesWritable, Result>> iterator) throws Exception { hBaseWriter(iterator); } } ); jsc.stop(); } /** * Update records in table2 on the executor. * * @param iterator partition data in table1. */ private static void hBaseWriter(Iterator<Tuple2<ImmutableBytesWritable, Result>> iterator) throws IOException { // Prepare for reading HBase data. 
String tableName = "table2"; String columnFamily = "cf"; String qualifier = "cid"; Configuration conf = HBaseConfiguration.create(); Connection connection = null; Table table = null; try { connection = ConnectionFactory.createConnection(conf); table = connection.getTable(TableName.valueOf(tableName)); List<Get> rowList = new ArrayList<Get>(); List<Tuple2<ImmutableBytesWritable, Result>> table1List = new ArrayList<Tuple2<ImmutableBytesWritable, Result>>(); while (iterator.hasNext()) { Tuple2<ImmutableBytesWritable, Result> item = iterator.next(); Get get = new Get(item._2().getRow()); table1List.add(item); rowList.add(get); } // Obtain the records in table2. Result[] resultDataBuffer = table.get(rowList); // Modify records in table2. List<Put> putList = new ArrayList<Put>(); for (int i = 0; i < resultDataBuffer.length; i++) { Result resultData = resultDataBuffer[i];//hbase2 row if (!resultData.isEmpty()) { // Query hbase1Value. String hbase1Value = ""; Iterator<Cell> it = table1List.get(i)._2().listCells().iterator(); while (it.hasNext()) { Cell c = it.next(); // Check whether the values of cf and qualifile are the same. if (columnFamily.equals(Bytes.toString(CellUtil.cloneFamily(c))) && qualifier.equals(Bytes.toString(CellUtil.cloneQualifier(c)))) { hbase1Value = Bytes.toString(CellUtil.cloneValue(c)); } } String hbase2Value = Bytes.toString(resultData.getValue(columnFamily.getBytes(), qualifier.getBytes())); Put put = new Put(table1List.get(i)._2().getRow()); // Calculate the result. int resultValue = Integer.parseInt(hbase1Value) + Integer.parseInt(hbase2Value); // Set the result to the Put object. 
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(String.valueOf(resultValue))); putList.add(put); } } if (putList.size() > 0) { table.put(putList); } } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } if (connection != null) { try { // Close the HBase connection. connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot