Java Sample Code
Function Description
In a Spark application, the HBase APIs are used to create a table, read data from the table, and insert data into it.
Code Samples
The following code snippets are for demonstration only. For the complete code, see the SparkOnHbaseJavaExample project:
Example: Creating an HBase Table
public class TableCreation {
    public static void main(String[] args) throws IOException {
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // Create a connection to HBase.
        Connection connection = ConnectionFactory.createConnection(hbConf);

        // Declare the table descriptor.
        TableName userTable = TableName.valueOf("shb1");
        HTableDescriptor tableDescr = new HTableDescriptor(userTable);
        tableDescr.addFamily(new HColumnDescriptor("info".getBytes()));

        // Create the table.
        System.out.println("Creating table shb1. ");
        Admin admin = connection.getAdmin();
        if (admin.tableExists(userTable)) {
            admin.disableTable(userTable);
            admin.deleteTable(userTable);
        }
        admin.createTable(tableDescr);

        connection.close();
        jsc.stop();
        System.out.println("Done!");
    }
}
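The sample builds the table definition with HTableDescriptor and HColumnDescriptor. If the cluster ships an HBase 2.x client, those classes are deprecated in favor of the builder API; the fragment below is a minimal sketch of the equivalent descriptor construction under that assumption, meant to replace the two descriptor lines above rather than to be run on its own.
        // Sketch: HBase 2.x builder-style equivalent of the HTableDescriptor/HColumnDescriptor lines above.
        TableDescriptor tableDescr = TableDescriptorBuilder.newBuilder(TableName.valueOf("shb1"))
                .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
                .build();
        admin.createTable(tableDescr);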
Example: Inserting Data into an HBase Table
public class TableInputData {
    public static void main(String[] args) throws IOException {
        // Create the configuration for connecting to HBase. Make sure hbase-site.xml is on the classpath.
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // Declare the table information.
        Table table = null;
        String tableName = "shb1";
        byte[] familyName = Bytes.toBytes("info");
        Connection connection = null;

        try {
            // Obtain the HBase connection.
            connection = ConnectionFactory.createConnection(hbConf);
            // Obtain the Table object.
            table = connection.getTable(TableName.valueOf(tableName));
            List<Tuple4<String, String, String, String>> data = jsc.textFile(args[0]).map(
                new Function<String, Tuple4<String, String, String, String>>() {
                    @Override
                    public Tuple4<String, String, String, String> call(String s) throws Exception {
                        String[] tokens = s.split(",");
                        return new Tuple4<String, String, String, String>(tokens[0], tokens[1], tokens[2], tokens[3]);
                    }
                }).collect();
            Integer i = 0;
            for (Tuple4<String, String, String, String> line : data) {
                Put put = new Put(Bytes.toBytes("row" + i));
                put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1()));
                put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2()));
                put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3()));
                put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4()));
                i += 1;
                table.put(put);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    // Close the HBase connection.
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            jsc.stop();
        }
    }
}
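In this sample, args[0] must point to a text file in which each line contains four comma-separated fields; they are written to the qualifiers c11 through c14 of the info column family. The loop above issues one table.put() call per row. For larger inputs the writes can be batched with Table.put(List&lt;Put&gt;), which is part of the standard HBase client API. The fragment below is a minimal sketch of that variant of the same loop (it assumes a java.util.ArrayList import and drops into the try block above); only how the writes are flushed changes, not the data layout.
            // Sketch: buffer the Put objects and send them to HBase in a single batch call.
            List<Put> puts = new ArrayList<>();
            int i = 0;
            for (Tuple4<String, String, String, String> line : data) {
                Put put = new Put(Bytes.toBytes("row" + i));
                put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1()));
                put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2()));
                put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3()));
                put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4()));
                puts.add(put);
                i += 1;
            }
            table.put(puts);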
Example: Reading Data from an HBase Table
public class TableOutputData {
    public static void main(String[] args) throws IOException {
        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        System.setProperty("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator");

        // Create the configuration for connecting to HBase. Make sure hbase-site.xml is on the classpath.
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // Declare the information of the table to be queried.
        Scan scan = new org.apache.hadoop.hbase.client.Scan();
        scan.addFamily(Bytes.toBytes("info"));
        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String scanToString = Base64.encodeBytes(proto.toByteArray());
        hbConf.set(TableInputFormat.INPUT_TABLE, "shb1");
        hbConf.set(TableInputFormat.SCAN, scanToString);

        // Obtain the table data through the Spark API.
        JavaPairRDD<ImmutableBytesWritable, Result> rdd =
            jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

        // Traverse every row of the HBase table and print the results.
        List<Tuple2<ImmutableBytesWritable, Result>> rddList = rdd.collect();
        for (int i = 0; i < rddList.size(); i++) {
            Tuple2<ImmutableBytesWritable, Result> t2 = rddList.get(i);
            ImmutableBytesWritable key = t2._1();
            Iterator<Cell> it = t2._2().listCells().iterator();
            while (it.hasNext()) {
                Cell c = it.next();
                String family = Bytes.toString(CellUtil.cloneFamily(c));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
                String value = Bytes.toString(CellUtil.cloneValue(c));
                Long tm = c.getTimestamp();
                System.out.println(" Family=" + family + " Qualifier=" + qualifier + " Value=" + value + " TimeStamp=" + tm);
            }
        }
        jsc.stop();
    }
}
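The sample points spark.kryo.registrator at com.huawei.bigdata.spark.examples.MyRegistrator, a class that ships with the SparkOnHbaseJavaExample project and is not reproduced here. As a hypothetical illustration only (not the project's actual implementation), a Kryo registrator for this job could look like the following minimal sketch, which registers the record classes carried by the RDD.
import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

// Hypothetical sketch of a registrator such as MyRegistrator; the real class is
// provided by SparkOnHbaseJavaExample and may register additional classes.
public class MyRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(org.apache.hadoop.hbase.io.ImmutableBytesWritable.class);
        kryo.register(org.apache.hadoop.hbase.client.Result.class);
    }
}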
Parent topic: Spark on HBase Programs