文档首页/
MapReduce服务 MRS/
开发指南(LTS版)/
Spark2x开发指南(安全模式)/
开发Spark应用/
Spark从HBase读取数据再写入HBase样例程序/
Spark从HBase读取数据再写入HBase样例程序(Java)
更新时间:2024-08-03 GMT+08:00
Spark从HBase读取数据再写入HBase样例程序(Java)
功能介绍
用户可以使用Spark调用HBase接口来操作HBase table1表,然后把table1表的数据经过分析后写到HBase table2表中。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkHbasetoHbase。
/** * 从table1表读取数据,根据key值去table2表获取相应记录,把两者数据后,更新到table2表 */ public class SparkHbasetoHbase { public static void main(String[] args) throws Exception { SparkConf conf = new SparkConf().setAppName("SparkHbasetoHbase"); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf.set("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator"); JavaSparkContext jsc = new JavaSparkContext(conf); // 建立连接hbase的配置参数,此时需要保证hbase-site.xml在classpath中 Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration()); // 声明表的信息 Scan scan = new org.apache.hadoop.hbase.client.Scan(); scan.addFamily(Bytes.toBytes("cf"));//colomn family org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan); String scanToString = Base64.encodeBytes(proto.toByteArray()); hbConf.set(TableInputFormat.INPUT_TABLE, "table1");//table name hbConf.set(TableInputFormat.SCAN, scanToString); // 通过spark接口获取表中的数据 JavaPairRDD rdd = jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class); // 遍历hbase table1表中的每一个partition, 然后更新到Hbase table2表 // 如果数据条数较少,也可以使用rdd.foreach()方法 rdd.foreachPartition( new VoidFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>>() { public void call(Iterator<Tuple2<ImmutableBytesWritable, Result>> iterator) throws Exception { hBaseWriter(iterator); } } ); jsc.stop(); } /** * 在executor端更新table2表记录 * * @param iterator table1表的partition数据 */ private static void hBaseWriter(Iterator<Tuple2<ImmutableBytesWritable, Result>> iterator) throws IOException { // 准备读取hbase String tableName = "table2"; String columnFamily = "cf"; String qualifier = "cid"; Configuration conf = HBaseConfiguration.create(); Connection connection = null; Table table = null; try { connection = ConnectionFactory.createConnection(conf); table = connection.getTable(TableName.valueOf(tableName)); List<Get> rowList = new ArrayList<Get>(); List<Tuple2<ImmutableBytesWritable, Result>> table1List = new ArrayList<Tuple2<ImmutableBytesWritable, Result>>(); while (iterator.hasNext()) { Tuple2<ImmutableBytesWritable, Result> item = iterator.next(); Get get = new Get(item._2().getRow()); table1List.add(item); rowList.add(get); } // 获取table2表记录 Result[] resultDataBuffer = table.get(rowList); // 修改table2表记录 List<Put> putList = new ArrayList<Put>(); for (int i = 0; i < resultDataBuffer.length; i++) { Result resultData = resultDataBuffer[i];//hbase2 row if (!resultData.isEmpty()) { // 查询hbase1Value String hbase1Value = ""; Iterator<Cell> it = table1List.get(i)._2().listCells().iterator(); while (it.hasNext()) { Cell c = it.next(); // 判断cf和qualifile是否相同 if (columnFamily.equals(Bytes.toString(CellUtil.cloneFamily(c))) && qualifier.equals(Bytes.toString(CellUtil.cloneQualifier(c)))) { hbase1Value = Bytes.toString(CellUtil.cloneValue(c)); } } String hbase2Value = Bytes.toString(resultData.getValue(columnFamily.getBytes(), qualifier.getBytes())); Put put = new Put(table1List.get(i)._2().getRow()); // 计算结果 int resultValue = Integer.parseInt(hbase1Value) + Integer.parseInt(hbase2Value); // 设置结果到put对象 put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(String.valueOf(resultValue))); putList.add(put); } } if (putList.size() > 0) { table.put(putList); } } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } if (connection != null) { try { // 关闭Hbase连接 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }