更新时间:2024-08-03 GMT+08:00

Spark从HBase读取数据再写入HBase样例程序(Java)

功能介绍

用户可以使用Spark调用HBase接口来操作HBase table1表,然后把table1表的数据经过分析后写到HBase table2表中。

代码样例

下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkHbasetoHbase。

/**
 * 从table1表读取数据,根据key值去table2表获取相应记录,把两者数据后,更新到table2表 
*/
public class SparkHbasetoHbase {

  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("SparkHbasetoHbase");
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    // 建立连接hbase的配置参数,此时需要保证hbase-site.xml在classpath中
    Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

    // 声明表的信息
    Scan scan = new org.apache.hadoop.hbase.client.Scan();
    scan.addFamily(Bytes.toBytes("cf"));//colomn family
    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    String scanToString = Base64.encodeBytes(proto.toByteArray());
    hbConf.set(TableInputFormat.INPUT_TABLE, "table1");//table name
    hbConf.set(TableInputFormat.SCAN, scanToString);

    // 通过spark接口获取表中的数据
    JavaPairRDD rdd = jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

    // 遍历hbase table1表中的每一个partition, 然后更新到Hbase table2表
    // 如果数据条数较少,也可以使用rdd.foreach()方法
    rdd.foreachPartition(
      new VoidFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>>() {
        public void call(Iterator<Tuple2<ImmutableBytesWritable, Result>> iterator) throws Exception {
          hBaseWriter(iterator);
        }
      }
    );

    jsc.stop();
  }

/**
   * 在executor端更新table2表记录
   *
   * @param iterator table1表的partition数据 
   */
  private static void hBaseWriter(Iterator<Tuple2<ImmutableBytesWritable, Result>> iterator) throws IOException {
    // 准备读取hbase
    String tableName = "table2";
    String columnFamily = "cf";
    String qualifier = "cid";
    Configuration conf = HBaseConfiguration.create();
    Connection connection = null;
    Table table = null;
    try {
      connection = ConnectionFactory.createConnection(conf);
      table = connection.getTable(TableName.valueOf(tableName));
      List<Get> rowList = new ArrayList<Get>();
      List<Tuple2<ImmutableBytesWritable, Result>> table1List = new ArrayList<Tuple2<ImmutableBytesWritable, Result>>();
      while (iterator.hasNext()) {
        Tuple2<ImmutableBytesWritable, Result> item = iterator.next();
        Get get = new Get(item._2().getRow());
        table1List.add(item);
        rowList.add(get);
      }
      // 获取table2表记录
      Result[] resultDataBuffer = table.get(rowList);
      // 修改table2表记录
      List<Put> putList = new ArrayList<Put>();
      for (int i = 0; i < resultDataBuffer.length; i++) {
        Result resultData = resultDataBuffer[i];//hbase2 row
        if (!resultData.isEmpty()) {
          // 查询hbase1Value
          String hbase1Value = "";
          Iterator<Cell> it = table1List.get(i)._2().listCells().iterator();
          while (it.hasNext()) {
            Cell c = it.next();
            // 判断cf和qualifile是否相同
            if (columnFamily.equals(Bytes.toString(CellUtil.cloneFamily(c)))
              && qualifier.equals(Bytes.toString(CellUtil.cloneQualifier(c)))) {
              hbase1Value = Bytes.toString(CellUtil.cloneValue(c));
            }
          }
          String hbase2Value = Bytes.toString(resultData.getValue(columnFamily.getBytes(), qualifier.getBytes()));
          Put put = new Put(table1List.get(i)._2().getRow());
          // 计算结果
          int resultValue = Integer.parseInt(hbase1Value) + Integer.parseInt(hbase2Value);
          // 设置结果到put对象
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(String.valueOf(resultValue)));
          putList.add(put);
        }
      }
      if (putList.size() > 0) {
        table.put(putList);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // 关闭Hbase连接
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }
}