Updated on 2024-08-10 GMT+08:00

Implementing Data Transition Between Hive and HBase (Java)

Function

In a Spark application, you can use Spark to call the Hive API to query a Hive table, and then write the result of analyzing the Hive table data to an HBase table.

Sample Code

The following code snippet is provided as an example. For the complete code, see com.huawei.bigdata.spark.examples.SparkHivetoHbase.

/**
 * Read data from the Hive table, and obtain the corresponding record from the HBase table based on the key value. Sum the obtained two data records and update the sum result to the HBase table.
 */
public class SparkHivetoHbase {

  public static void main(String[] args) throws Exception {

    String userPrincipal = "sparkuser";
    String userKeytabPath = "/opt/FIclient/user.keytab";
    String krb5ConfPath = "/opt/FIclient/KrbClient/kerberos/var/krb5kdc/krb5.conf";
    Configuration hadoopConf = new Configuration();
    LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf);
    // Use the Spark API to obtain table data.
    SparkConf conf = new SparkConf().setAppName("SparkHivetoHbase");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(jsc);
    SparkSession spark = SparkSession
      .builder()
      .appName("SparkHivetoHbase")
      .config("spark.sql.warehouse.dir", "spark-warehouse)
      .enableHiveSupport()
      .getOrCreate();

    Dataset<Row>dataFrame = sqlContext.sql("select name, account from person");

    // Traverse every partition in the Hive table and update data to the HBase table.
    // If the number of data records is small, you can use the foreach() method.
    dataFrame.toJavaRDD().foreachPartition(
      new VoidFunction<Iterator<Row>>() {
        public void call(Iterator<Row> iterator) throws Exception {
          hBaseWriter(iterator);
        }
      }
    );

    spark.stop();
  }

 /**
   * Update records in the HBase table on the executor.
   *
   * @param iterator Partition data in the Hive table.
   */
  private static void hBaseWriter(Iterator<Row> iterator) throws IOException {
    // Read the HBase table.
    String tableName = "table2";
    String columnFamily = "cf";
    Configuration conf = HBaseConfiguration.create();
    Connection connection = null;
    Table table = null;
    try {
      connection = ConnectionFactory.createConnection(conf);
      table = connection.getTable(TableName.valueOf(tableName));
      List<Row> table1List = new ArrayList<Row>();
      List<Get> rowList = new ArrayList<Get>();
      while (iterator.hasNext()) {
        Row item = iterator.next();
        Get get = new Get(item.getString(0).getBytes());
        table1List.add(item);
        rowList.add(get);
      }
      // Obtain the records in the HBase table.
      Result[] resultDataBuffer = table.get(rowList);
      // Modify records in the HBase table.
      List<Put> putList = new ArrayList<Put>();
      for (int i = 0; i < resultDataBuffer.length; i++) {
        // Hive table value.
        Result resultData = resultDataBuffer[i];
        if (!resultData.isEmpty()) {
          // get hiveValue
          int hiveValue = table1List.get(i).getInt(1);
          // Obtain the HBase table value based on the column family and column.
          String hbaseValue = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes()));
          Put put = new Put(table1List.get(i).getString(0).getBytes());
          // Calculate the result.
          int resultValue = hiveValue + Integer.valueOf(hbaseValue);
          // Set the result to the Put object.
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue)));
          putList.add(put);
        }
      }
      if (putList.size() > 0) {
        table.put(putList);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }
}