Updated on 2022-07-19 GMT+08:00

Java Sample Code

Function Description

In a Spark application, use the HBase API to create a table, read data from the table, and insert data into the table.

Code Samples

The following code snippets are for demonstration only. For the complete code, see SparkOnHbaseJavaExample:

Sample: Creating an HBase Table

public class TableCreation {
    public static void main (String[] args) throws IOException {

        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // Create a connection to HBase.
        Connection connection = ConnectionFactory.createConnection(hbConf);

        // Declare the table descriptor.
        TableName userTable  = TableName.valueOf("shb1");
        HTableDescriptor tableDescr = new HTableDescriptor(userTable);
        tableDescr.addFamily(new HColumnDescriptor("info".getBytes()));

        // Create the table.
        System.out.println("Creating table shb1. ");
        Admin admin = connection.getAdmin();
        if (admin.tableExists(userTable)) {
            admin.disableTable(userTable);
            admin.deleteTable(userTable);
        }
        admin.createTable(tableDescr);

        connection.close();
        jsc.stop();
        System.out.println("Done!");
        
    }
}

Sample: Inserting Data into an HBase Table

public class TableInputData {
    public static void main (String[] args) throws IOException {

        // Create the configuration for connecting to HBase. Ensure that hbase-site.xml is in the classpath.
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // Declare the table information.
        Table table = null;
        String tableName = "shb1";
        byte[] familyName = Bytes.toBytes("info");
        Connection connection = null;

        try {
            // Obtain the HBase connection.
            connection = ConnectionFactory.createConnection(hbConf);
            // Obtain the Table object.
            table = connection.getTable(TableName.valueOf(tableName));
            List<Tuple4<String, String, String, String>> data = jsc.textFile(args[0]).map(
                    new Function<String, Tuple4<String, String, String, String>>() {
                        @Override
                        public Tuple4<String, String, String, String> call(String s) throws Exception {
                            String[] tokens = s.split(",");

                            return new Tuple4<String, String, String, String>(tokens[0], tokens[1], tokens[2], tokens[3]);
                        }
                    }).collect();

            int i = 0;
            for(Tuple4<String, String, String, String> line:  data) {
                Put put = new Put(Bytes.toBytes("row" + i));
                put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1()));
                put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2()));
                put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3()));
                put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4()));
                i += 1;
                table.put(put);
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    // Close the HBase connection.
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            jsc.stop();
        }
    }
}
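
The input file passed to TableInputData through args[0] is expected to contain one record per line with four comma-separated fields. As a purely hypothetical illustration, a first line such as value1,value2,value3,value4 would be written to the row key row0, with value1 to value4 stored in columns c11 to c14 of the info column family.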

Sample: Reading Data from an HBase Table

public class TableOutputData {
    public static void main(String[] args) throws IOException {

        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        System.setProperty("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator");

        // Create the configuration for connecting to HBase. Ensure that hbase-site.xml is in the classpath.
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // Declare the table to be queried and the scan to run.
        Scan scan = new org.apache.hadoop.hbase.client.Scan();
        scan.addFamily(Bytes.toBytes("info"));
        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String scanToString = Base64.encodeBytes(proto.toByteArray());
        hbConf.set(TableInputFormat.INPUT_TABLE, "shb1");
        hbConf.set(TableInputFormat.SCAN, scanToString);

        // Obtain the data in the table through the Spark API.
        JavaPairRDD<ImmutableBytesWritable, Result> rdd = jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

        // Traverse each row in the HBase table and print the results.
        List<Tuple2<ImmutableBytesWritable, Result>> rddList = rdd.collect();
        for (int i = 0; i < rddList.size(); i++) {
            Tuple2<ImmutableBytesWritable, Result> t2 = rddList.get(i);
            ImmutableBytesWritable key = t2._1();
            Iterator<Cell> it = t2._2().listCells().iterator();
            while (it.hasNext()) {
                Cell c = it.next();
                String family = Bytes.toString(CellUtil.cloneFamily(c));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
                String value = Bytes.toString(CellUtil.cloneValue(c));
                Long tm = c.getTimestamp();
                System.out.println(" Family=" + family + " Qualifier=" + qualifier + " Value=" + value + " TimeStamp=" + tm);
            }
        }
        jsc.stop();
    }
}
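
The read sample sets spark.kryo.registrator to com.huawei.bigdata.spark.examples.MyRegistrator, which belongs to the SparkOnHbaseJavaExample project and is not listed here. As a minimal sketch only (the actual class in the sample project may register additional types), such a registrator implements Spark's KryoRegistrator interface and registers the HBase classes carried in the scanned RDD:

import com.esotericsoftware.kryo.Kryo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.spark.serializer.KryoRegistrator;

public class MyRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        // Register the classes contained in the ImmutableBytesWritable/Result pairs
        // returned by newAPIHadoopRDD so that Kryo can serialize them efficiently
        // without writing full class names.
        kryo.register(ImmutableBytesWritable.class);
        kryo.register(Result.class);
    }
}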
