
Java Example Code

Function

In a Spark application, use Spark to call Hive APIs to query and analyze data in a Hive table, and then write the analyzed data to an HBase table.
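The sample assumes that a Hive table person (with a string column name and an integer column account) and an HBase table table2 with column family cf already exist. Table setup is not part of the shipped sample; as an illustration only, creating the HBase table with the HBase 2.x client API might look like the following sketch (the class name CreateHbaseTable is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateHbaseTable {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      TableName tableName = TableName.valueOf("table2");
      if (!admin.tableExists(tableName)) {
        // One column family "cf"; the sample stores the qualifier "cid" in it.
        admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
            .build());
      }
    }
  }
}

The Hive table can be created and populated beforehand with the usual CREATE TABLE and INSERT statements.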

Example Code

For details about the code, see com.huawei.bigdata.spark.examples.SparkHivetoHbase.


import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Calculate data from a Hive table and an HBase table, then update the HBase table.
 */
public class SparkHivetoHbase {

  public static void main(String[] args) throws Exception {

    // Obtain the data in the Hive table through the Spark SQL interface.

    SparkSession spark = SparkSession
      .builder()
      .appName("SparkHivetoHbase")
      .config("spark.sql.warehouse.dir", "spark-warehouse)
      .enableHiveSupport()
      .getOrCreate();

    Dataset<Row> dataFrame = spark.sql("select name, account from person");

    // Traverse every partition in the Hive table and update the HBase table.
    // If the amount of data is small, rdd.foreach() can be used instead.

    dataFrame.toJavaRDD().foreachPartition(
      new VoidFunction<Iterator<Row>>() {
        public void call(Iterator<Row> iterator) throws Exception {
          hBaseWriter(iterator);
        }
      }
    );

    spark.stop();
  }

  /**
   * Write data to the HBase table in the executor.
   *
   * @param iterator partition data from the Hive table
   */
  private static void hBaseWriter(Iterator<Row> iterator) throws IOException {
    // HBase table and column family to read from and write back to
    String tableName = "table2";
    String columnFamily = "cf";
    Configuration conf = HBaseConfiguration.create();

    Connection connection = null;
    Table table = null;
    try {
      connection = ConnectionFactory.createConnection(conf);
      table = connection.getTable(TableName.valueOf(tableName));

      List<Row> table1List = new ArrayList<Row>();
      List<Get> rowList = new ArrayList<Get>();
      while (iterator.hasNext()) {
        Row item = iterator.next();
        Get get = new Get(Bytes.toBytes(item.getString(0)));
        table1List.add(item);
        rowList.add(get);
      }

      // get data from hbase table
      Result[] resultDataBuffer = table.get(rowList);

      // set data for hbase
      List<Put> putList = new ArrayList<Put>();
      for (int i = 0; i < resultDataBuffer.length; i++) {
        // hbase row
        Result resultData = resultDataBuffer[i];
        if (!resultData.isEmpty()) {
          // get hiveValue
          int hiveValue = table1List.get(i).getInt(1);

          // get hbaseValue by column family and column qualifier
          String hbaseValue = Bytes.toString(resultData.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes("cid")));
          Put put = new Put(Bytes.toBytes(table1List.get(i).getString(0)));

          // calculate result value
          int resultValue = hiveValue + Integer.parseInt(hbaseValue);

          // set data to put
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue)));
          putList.add(put);
        }
      }

      if (putList.size() > 0) {
        table.put(putList);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }
}
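Because the sample targets Java 8 or later, the anonymous VoidFunction above can equivalently be written as a lambda; this is an alternative sketch, not part of the shipped example:

// Equivalent Java 8 lambda form of the foreachPartition call in main().
dataFrame.toJavaRDD().foreachPartition(iterator -> hBaseWriter(iterator));

foreachPartition is used rather than foreach so that each partition opens one HBase connection and performs one batched table.get() and one batched table.put(), instead of issuing an RPC per row.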