更新时间:2024-12-10 GMT+08:00

Java样例代码

功能介绍

在Spark应用中,通过使用Spark调用Hive接口来操作hive表,然后把Hive表的数据经过分析后写到HBase表。

代码样例

下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkHivetoHbase。

样例代码获取方式请参考获取MRS应用开发样例工程

代码样例:

/**
 * 从hive表读取数据,根据key值去hbase表获取相应记录,把两者数据做操作后,更新到hbase表
 */
public class SparkHivetoHbase {

  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      printUsage();
    }

    // 通过spark接口获取表中的数据
    SparkConf conf = new SparkConf().setAppName("SparkHivetoHbase");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(jsc);
    DataFrame dataFrame = sqlContext.sql("select name, account from person");

    // 遍历hive表中的每一个partition, 然后更新到hbase表
    // 如果数据条数较少,也可以使用foreach()方法
    final String zkQuorum = args[0];
    dataFrame.toJavaRDD().foreachPartition(
      new VoidFunction<Iterator<Row>>() {
        public void call(Iterator<Row> iterator) throws Exception {
          hBaseWriter(iterator,zkQuorum);
        }
      }
    );

    jsc.stop();
  }

 /**
   * 在executor端更新hbase表记录
   *
   * @param iterator hive表的partition数据
   */
  private static void hBaseWriter(Iterator<Row> iterator, String zkQuorum) throws IOException {
    // 读取hbase
    String tableName = "table2";
    String columnFamily = "cf";
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.property.clientPort", "24002");
    conf.set("hbase.zookeeper.quorum", zkQuorum);
    Connection connection = null;
    Table table = null;
    try {
      connection = ConnectionFactory.createConnection(conf);
      table = connection.getTable(TableName.valueOf(tableName));
      List<Row> table1List = new ArrayList<Row>();
      List<Get> rowList = new ArrayList<Get>();
      while (iterator.hasNext()) {
        Row item = iterator.next();
        Get get = new Get(item.getString(0).getBytes());
        table1List.add(item);
        rowList.add(get);
      }
      // 获取hbase表记录
      Result[] resultDataBuffer = table.get(rowList);
      // 修改hbase表记录
      List<Put> putList = new ArrayList<Put>();
      for (int i = 0; i < resultDataBuffer.length; i++) {
        // hive表值
        Result resultData = resultDataBuffer[i];
        if (!resultData.isEmpty()) {
          // get hiveValue
          int hiveValue = table1List.get(i).getInt(1);
          // 根据列簇和列,获取hbase值
          String hbaseValue = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes()));
          Put put = new Put(table1List.get(i).getString(0).getBytes());
          // 计算结果
          int resultValue = hiveValue + Integer.valueOf(hbaseValue);
          // 设置结果到put对象
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue)));
          putList.add(put);
        }
      }
      if (putList.size() > 0) {
        table.put(putList);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // 关闭Hbase连接.
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }
  

  private static void printUsage() {
    System.out.println("Usage: {zkQuorum}");
    System.exit(1);
  }
}