Java Sample Code
Function
In a Spark application, HBase APIs are used to create a table, read data from a table, and insert data into a table.
Sample Code
The following code snippets are for demonstration only. For the complete code, see SparkOnHbaseJavaExample.
Example: Creating an HBase table
public class TableCreation {
  public static void main(String[] args) throws IOException {
    SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

    // Create a connection to HBase.
    Connection connection = ConnectionFactory.createConnection(hbConf);

    // Declare the table descriptor.
    TableName userTable = TableName.valueOf("shb1");
    HTableDescriptor tableDescr = new HTableDescriptor(userTable);
    tableDescr.addFamily(new HColumnDescriptor("info".getBytes()));

    // Create the table. If it already exists, disable and delete it first.
    System.out.println("Creating table shb1. ");
    Admin admin = connection.getAdmin();
    if (admin.tableExists(userTable)) {
      admin.disableTable(userTable);
      admin.deleteTable(userTable);
    }
    admin.createTable(tableDescr);

    connection.close();
    jsc.stop();
    System.out.println("Done!");
  }
}
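The HTableDescriptor and HColumnDescriptor classes used above are deprecated in HBase 2.x. If your cluster runs an HBase 2.x client, a minimal sketch of the same table creation with the builder-style API could look as follows. This is an assumption about the cluster version, not part of SparkOnHbaseJavaExample; the class name TableCreationBuilderStyle is hypothetical, and the Spark context is omitted because table creation only needs the HBase client:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

// A sketch only: creates table shb1 with the HBase 2.x builder-style descriptors.
public class TableCreationBuilderStyle {
  public static void main(String[] args) throws IOException {
    // hbase-site.xml must be in the classpath, as in the samples above.
    Configuration hbConf = HBaseConfiguration.create();
    TableName userTable = TableName.valueOf("shb1");
    // TableDescriptorBuilder/ColumnFamilyDescriptorBuilder replace the deprecated
    // HTableDescriptor/HColumnDescriptor classes.
    TableDescriptor tableDescr = TableDescriptorBuilder.newBuilder(userTable)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
        .build();
    try (Connection connection = ConnectionFactory.createConnection(hbConf);
         Admin admin = connection.getAdmin()) {
      if (admin.tableExists(userTable)) {
        admin.disableTable(userTable);
        admin.deleteTable(userTable);
      }
      admin.createTable(tableDescr);
    }
  }
}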
Example: Inserting data into an HBase table
public class TableInputData {
  public static void main(String[] args) throws IOException {
    // Create the configuration for connecting to HBase. hbase-site.xml must be in the classpath.
    SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

    // Declare the table information.
    Table table = null;
    String tableName = "shb1";
    byte[] familyName = Bytes.toBytes("info");
    Connection connection = null;

    try {
      // Obtain the HBase connection.
      connection = ConnectionFactory.createConnection(hbConf);
      // Obtain the Table object.
      table = connection.getTable(TableName.valueOf(tableName));

      // Read the input file and split each comma-separated line into four fields.
      List<Tuple4<String, String, String, String>> data = jsc.textFile(args[0]).map(
        new Function<String, Tuple4<String, String, String, String>>() {
          @Override
          public Tuple4<String, String, String, String> call(String s) throws Exception {
            String[] tokens = s.split(",");
            return new Tuple4<String, String, String, String>(tokens[0], tokens[1], tokens[2], tokens[3]);
          }
        }).collect();

      // Write each record into the table, one Put per row.
      int i = 0;
      for (Tuple4<String, String, String, String> line : data) {
        Put put = new Put(Bytes.toBytes("row" + i));
        put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1()));
        put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2()));
        put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3()));
        put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4()));
        i += 1;
        table.put(put);
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      jsc.stop();
    }
  }
}
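If the input file is large, collecting it to the driver and writing one Put at a time can become a bottleneck. A possible variation, shown below as a sketch only and not part of SparkOnHbaseJavaExample, writes from the executors with foreachPartition and a BufferedMutator. It assumes hbase-site.xml is also on the executor classpath, and, because the driver-side row counter is not available on the executors, it uses the first field of each line as the row key; adjust this to your own key design. Extra imports needed: java.util.Iterator, org.apache.spark.api.java.function.VoidFunction, and org.apache.hadoop.hbase.client.BufferedMutator.
// A sketch only: write to shb1 from the executors instead of the driver.
jsc.textFile(args[0]).foreachPartition(new VoidFunction<Iterator<String>>() {
  @Override
  public void call(Iterator<String> lines) throws Exception {
    // Build the configuration on the executor; hbase-site.xml must be on its classpath.
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("shb1"))) {
      while (lines.hasNext()) {
        String[] tokens = lines.next().split(",");
        // Assumption: the first field serves as the row key.
        Put put = new Put(Bytes.toBytes(tokens[0]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("c11"), Bytes.toBytes(tokens[0]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("c12"), Bytes.toBytes(tokens[1]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("c13"), Bytes.toBytes(tokens[2]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("c14"), Bytes.toBytes(tokens[3]));
        mutator.mutate(put);
      }
    }
  }
});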
Example: Reading data from an HBase table
public class TableOutputData {
  public static void main(String[] args) throws IOException {
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    System.setProperty("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator");

    // Create the configuration for connecting to HBase. hbase-site.xml must be in the classpath.
    SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

    // Declare the table and the scan to run, and pass them to TableInputFormat.
    Scan scan = new org.apache.hadoop.hbase.client.Scan();
    scan.addFamily(Bytes.toBytes("info"));
    org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
    String scanToString = Base64.encodeBytes(proto.toByteArray());
    hbConf.set(TableInputFormat.INPUT_TABLE, "shb1");
    hbConf.set(TableInputFormat.SCAN, scanToString);

    // Obtain the table data as an RDD through the Spark API.
    JavaPairRDD<ImmutableBytesWritable, Result> rdd =
        jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

    // Traverse every row in the HBase table and print the cells.
    List<Tuple2<ImmutableBytesWritable, Result>> rddList = rdd.collect();
    for (int i = 0; i < rddList.size(); i++) {
      Tuple2<ImmutableBytesWritable, Result> t2 = rddList.get(i);
      ImmutableBytesWritable key = t2._1();
      Iterator<Cell> it = t2._2().listCells().iterator();
      while (it.hasNext()) {
        Cell c = it.next();
        String family = Bytes.toString(CellUtil.cloneFamily(c));
        String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
        String value = Bytes.toString(CellUtil.cloneValue(c));
        long tm = c.getTimestamp();
        System.out.println(" Family=" + family + " Qualifier=" + qualifier + " Value=" + value + " TimeStamp=" + tm);
      }
    }
    jsc.stop();
  }
}
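The sample above collects the entire table to the driver before printing it, which is practical only for small tables. A minimal sketch of an alternative, an assumption and not part of the sample project, formats each row on the executors and brings back only a few rows with take(); it reuses the rdd variable and the imports from the sample above:
// A sketch only: format rows on the executors and fetch a small sample to the driver.
List<String> sampleRows = rdd.map(new Function<Tuple2<ImmutableBytesWritable, Result>, String>() {
  @Override
  public String call(Tuple2<ImmutableBytesWritable, Result> t2) throws Exception {
    // Start with the row key, then append qualifier=value pairs for every cell.
    StringBuilder sb = new StringBuilder(Bytes.toString(t2._1().copyBytes())).append(":");
    for (Cell c : t2._2().listCells()) {
      sb.append(" ").append(Bytes.toString(CellUtil.cloneQualifier(c)))
        .append("=").append(Bytes.toString(CellUtil.cloneValue(c)));
    }
    return sb.toString();
  }
}).take(10);
for (String row : sampleRows) {
  System.out.println(row);
}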