Java Sample Code
Function Description
Users can use Spark to call HBase APIs to operate HBase table "table1", analyze the data, and then write the analysis results to HBase table "table2".
Sample Code
The following code snippet is for demonstration only. For the complete code, see com.huawei.bigdata.spark.examples.SparkHbasetoHbase.
For details about how to obtain the sample code, see Obtaining the MRS Application Development Sample Project.
Sample code:
package com.huawei.bigdata.spark.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * Read data from table1, fetch the corresponding record from table2 by key,
 * sum the two values, and update the result back to table2.
 */
public class SparkHbasetoHbase {

  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      printUsage();
    }

    SparkConf conf = new SparkConf().setAppName("SparkHbasetoHbase");
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    // Create the configuration for connecting to HBase. Ensure that hbase-site.xml is in the classpath.
    Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

    // Declare the information of the table to scan.
    Scan scan = new org.apache.hadoop.hbase.client.Scan();
    scan.addFamily(Bytes.toBytes("cf")); // column family
    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    String scanToString = Base64.encodeBytes(proto.toByteArray());
    hbConf.set(TableInputFormat.INPUT_TABLE, "table1"); // table name
    hbConf.set(TableInputFormat.SCAN, scanToString);

    // Read the data of table1 through the Spark API.
    JavaPairRDD<ImmutableBytesWritable, Result> rdd =
        jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

    // Traverse every partition of HBase table1 and write the updates to HBase table2.
    // If the number of records is small, rdd.foreach() can also be used.
    final String zkQuorum = args[0];
    rdd.foreachPartition(
        new VoidFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>>() {
          public void call(Iterator<Tuple2<ImmutableBytesWritable, Result>> iterator) throws Exception {
            hBaseWriter(iterator, zkQuorum);
          }
        });

    jsc.stop();
  }
  /**
   * Update the records of table2 on the executor side.
   *
   * @param iterator partition data of table1
   * @param zkQuorum ZooKeeper quorum address of the HBase cluster
   */
  private static void hBaseWriter(Iterator<Tuple2<ImmutableBytesWritable, Result>> iterator, String zkQuorum)
      throws IOException {
    // Prepare the parameters for connecting to HBase.
    String tableName = "table2";
    String columnFamily = "cf";
    String qualifier = "cid";
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.property.clientPort", "24002");
    conf.set("hbase.zookeeper.quorum", zkQuorum);
    Connection connection = null;
    Table table = null;
    try {
      connection = ConnectionFactory.createConnection(conf);
      table = connection.getTable(TableName.valueOf(tableName));
      List<Get> rowList = new ArrayList<Get>();
      List<Tuple2<ImmutableBytesWritable, Result>> table1List =
          new ArrayList<Tuple2<ImmutableBytesWritable, Result>>();
      while (iterator.hasNext()) {
        Tuple2<ImmutableBytesWritable, Result> item = iterator.next();
        Get get = new Get(item._2().getRow());
        table1List.add(item);
        rowList.add(get);
      }

      // Obtain the records of table2 with the same row keys.
      Result[] resultDataBuffer = table.get(rowList);

      // Modify the records of table2.
      List<Put> putList = new ArrayList<Put>();
      for (int i = 0; i < resultDataBuffer.length; i++) {
        Result resultData = resultDataBuffer[i]; // row of table2
        if (!resultData.isEmpty()) {
          // Query the value of the cf:cid column in the table1 record.
          String hbase1Value = "";
          Iterator<Cell> it = table1List.get(i)._2().listCells().iterator();
          while (it.hasNext()) {
            Cell c = it.next();
            // Check whether the column family and qualifier match the expected ones.
            if (columnFamily.equals(Bytes.toString(CellUtil.cloneFamily(c)))
                && qualifier.equals(Bytes.toString(CellUtil.cloneQualifier(c)))) {
              hbase1Value = Bytes.toString(CellUtil.cloneValue(c));
            }
          }
          String hbase2Value = Bytes.toString(resultData.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)));
          Put put = new Put(table1List.get(i)._2().getRow());

          // Calculate the sum of the two values.
          int resultValue = Integer.parseInt(hbase1Value) + Integer.parseInt(hbase2Value);

          // Set the result into the Put object.
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(String.valueOf(resultValue)));
          putList.add(put);
        }
      }
      if (putList.size() > 0) {
        table.put(putList);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }
  private static void printUsage() {
    System.out.println("Usage: {zkQuorum}");
    System.exit(1);
  }
}
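Before the sample is run, table1 and table2 must already exist in HBase and their cf:cid columns must hold numeric strings; otherwise Integer.parseInt in hBaseWriter fails. The following is a minimal sketch of how such test tables could be prepared with the same HBase 1.x client API used above. The class name PrepareTestTables, the row key "1", and the values "100" and "200" are illustrative only and are not part of the sample project.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PrepareTestTables {
  public static void main(String[] args) throws Exception {
    // hbase-site.xml (or explicit quorum settings, as in hBaseWriter) must be on the classpath.
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      for (String name : new String[]{"table1", "table2"}) {
        TableName tableName = TableName.valueOf(name);
        if (!admin.tableExists(tableName)) {
          // Create the table with a single column family "cf".
          HTableDescriptor descriptor = new HTableDescriptor(tableName);
          descriptor.addFamily(new HColumnDescriptor("cf"));
          admin.createTable(descriptor);
        }
        // Write one test row whose cf:cid column holds a numeric string.
        try (Table table = connection.getTable(tableName)) {
          Put put = new Put(Bytes.toBytes("1"));
          put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"),
              Bytes.toBytes(name.equals("table1") ? "100" : "200"));
          table.put(put);
        }
      }
    }
  }
}

With this test data, running the sample would update the cf:cid value of row "1" in table2 to the sum of the two values (300 in this illustration).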