Java Sample Code
Function Description
In a Spark application, you can use Spark to call Hive APIs to operate on a Hive table and write the analysis results of the Hive table data to an HBase table.
Sample Code
The following code snippet is provided as an example. For the complete code, see com.huawei.bigdata.spark.examples.SparkHivetoHbase.
/** * Read data from the Hive table, and obtain the corresponding record from the HBase table based on the key value. Sum the obtained two data records and update the sum result to the HBase table. */ public class SparkHivetoHbase { public static void main(String[] args) throws Exception { if (args.length < 1) { printUsage(); } // Use the Spark API to obtain table data. SparkConf conf = new SparkConf().setAppName("SparkHivetoHbase"); JavaSparkContext jsc = new JavaSparkContext(conf); HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(jsc); DataFrame dataFrame = sqlContext.sql("select name, account from person"); // Traverse every partition in the Hive table and update data to the HBase table. // If the number of data records is small, you can use the foreach() method. final String zkQuorum = args[0]; dataFrame.toJavaRDD().foreachPartition( new VoidFunction<Iterator<Row>>() { public void call(Iterator<Row> iterator) throws Exception { hBaseWriter(iterator,zkQuorum); } } ); jsc.stop(); } /** * Update records in the HBase table on the executor. * * @param iterator Partition data in the Hive table. */ private static void hBaseWriter(Iterator<Row> iterator, String zkQuorum) throws IOException { // Read the HBase table. String tableName = "table2"; String columnFamily = "cf"; Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.property.clientPort", "24002"); conf.set("hbase.zookeeper.quorum", zkQuorum); Connection connection = null; Table table = null; try { connection = ConnectionFactory.createConnection(conf); table = connection.getTable(TableName.valueOf(tableName)); List<Row> table1List = new ArrayList<Row>(); List<Get> rowList = new ArrayList<Get>(); while (iterator.hasNext()) { Row item = iterator.next(); Get get = new Get(item.getString(0).getBytes()); table1List.add(item); rowList.add(get); } // Obtain the records in the HBase table. Result[] resultDataBuffer = table.get(rowList); // Modify records in the HBase table. 
List<Put> putList = new ArrayList<Put>(); for (int i = 0; i < resultDataBuffer.length; i++) { // Hive table value Result resultData = resultDataBuffer[i]; if (!resultData.isEmpty()) { // get hiveValue int hiveValue = table1List.get(i).getInt(1); // Obtain the HBase table value based on the column family and column. String hbaseValue = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes())); Put put = new Put(table1List.get(i).getString(0).getBytes()); // Calculate the result. int resultValue = hiveValue + Integer.valueOf(hbaseValue); // Set the result to the Put object. put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue))); putList.add(put); } } if (putList.size() > 0) { table.put(putList); } } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } if (connection != null) { try { // Close the HBase connection. connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } private static void printUsage() { System.out.println("Usage: {zkQuorum}"); System.exit(1); } }
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot