Deze pagina is nog niet beschikbaar in uw eigen taal. We werken er hard aan om meer taalversies toe te voegen. Bedankt voor uw steun.

On this page

Java Sample Code

Updated on 2022-09-14 GMT+08:00

Function Description

In Spark applications, you can use HBase APIs to create a table, insert data into the table, and read data from the table.

Sample Code

The following code snippets are examples only. For the complete code, see SparkOnHbaseJavaExample.

Example: Creating an HBase table

/**
 * Spark application that (re)creates the HBase table {@code shb1} with a single
 * column family named {@code info}.
 *
 * <p>If a table named {@code shb1} already exists, it is disabled (when still enabled)
 * and deleted before the new table is created.
 */
public class TableCreation {
    /**
     * Creates the table and then stops the Spark context.
     *
     * @param args command-line arguments; not used
     * @throws IOException if the HBase connection or any admin operation fails
     */
    public static void main (String[] args) throws IOException {

        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

            // Declare table description.
            TableName userTable = TableName.valueOf("shb1");
            HTableDescriptor tableDescr = new HTableDescriptor(userTable);
            // Encode the family name with an explicit charset instead of the platform default.
            tableDescr.addFamily(
                    new HColumnDescriptor("info".getBytes(java.nio.charset.StandardCharsets.UTF_8)));

            // Connection and Admin are both closed automatically, even when an admin call throws.
            try (Connection connection = ConnectionFactory.createConnection(hbConf);
                 Admin admin = connection.getAdmin()) {
                // Create a table.
                System.out.println("Creating table shb1. ");
                if (admin.tableExists(userTable)) {
                    // Disabling an already-disabled table fails, so only disable when needed.
                    if (admin.isTableEnabled(userTable)) {
                        admin.disableTable(userTable);
                    }
                    admin.deleteTable(userTable);
                }
                admin.createTable(tableDescr);
            }
        } finally {
            // Always release the Spark context, including on failure.
            jsc.stop();
        }
        System.out.println("Done!");
    }
}

Example: Inserting data into the HBase table

/**
 * Spark application that loads a comma-separated text file and writes its rows into the
 * HBase table {@code shb1}, column family {@code info}.
 *
 * <p>Each input line must contain at least four comma-separated fields. The first four are
 * stored in the qualifiers {@code c11}, {@code c12}, {@code c13} and {@code c14} of a row
 * whose key is {@code "row" + index}, where the index starts at 0 and follows the order of
 * the collected lines.
 */
public class TableInputData {
    /**
     * Reads the input file, builds one {@code Put} per line and writes them to HBase.
     *
     * @param args {@code args[0]} is the path of the text file to load
     * @throws IllegalArgumentException if no input path is supplied
     * @throws IOException declared for compatibility; I/O failures while talking to HBase
     *         are caught and their stack trace is printed
     */
    public static void main (String[] args) throws IOException {
        // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException later on.
        if (args.length < 1) {
            throw new IllegalArgumentException("Usage: TableInputData <inputPath>");
        }

        // Create a configuration parameter to connect to HBase and ensure that hbase-site.xml is in classpath.
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // Declare table information.
        String tableName = "shb1";
        byte[] familyName = Bytes.toBytes("info");

        // The table and then the connection are closed automatically, in that order.
        try (Connection connection = ConnectionFactory.createConnection(hbConf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            List<Tuple4<String, String, String, String>> data = jsc.textFile(args[0]).map(
                    new Function<String, Tuple4<String, String, String, String>>() {
                        @Override
                        public Tuple4<String, String, String, String> call(String s) throws Exception {
                            String[] tokens = s.split(",");
                            if (tokens.length < 4) {
                                throw new IllegalArgumentException(
                                        "Expected at least 4 comma-separated fields but found "
                                                + tokens.length + " in line: " + s);
                            }
                            return new Tuple4<String, String, String, String>(tokens[0], tokens[1], tokens[2], tokens[3]);
                        }
                    }).collect();

            // Build all mutations first so they can be sent in one batch rather than one call per row.
            List<Put> puts = new java.util.ArrayList<Put>(data.size());
            int i = 0;
            for (Tuple4<String, String, String, String> line : data) {
                Put put = new Put(Bytes.toBytes("row" + i));
                put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1()));
                put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2()));
                put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3()));
                put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4()));
                puts.add(put);
                i++;
            }
            if (!puts.isEmpty()) {
                table.put(puts);
            }
        } catch (IOException e) {
            // Kept from the original sample: report the failure and let the job end normally.
            e.printStackTrace();
        } finally {
            jsc.stop();
        }
    }
}

Example: Reading HBase table data

/**
 * Spark application that scans the {@code info} column family of the HBase table
 * {@code shb1} and prints every cell to standard output.
 *
 * <p>The whole scan result is collected and printed by this {@code main} method.
 */
public class TableOutputData {
    /**
     * Configures the scan, loads the table as an RDD and prints each cell's family,
     * qualifier, value and timestamp.
     *
     * @param args command-line arguments; not used
     * @throws IOException if the scan cannot be serialized
     */
    public static void main(String[] args) throws IOException {

        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        System.setProperty("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator");

        // Create a configuration parameter to connect to HBase and ensure that hbase-site.xml is in classpath.
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

            // Declare information about the table to be queried.
            Scan scan = new org.apache.hadoop.hbase.client.Scan();
            scan.addFamily(Bytes.toBytes("info"));
            // TableInputFormat reads the scan from the configuration as a Base64-encoded protobuf.
            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            String scanToString = Base64.encodeBytes(proto.toByteArray());
            hbConf.set(TableInputFormat.INPUT_TABLE, "shb1");
            hbConf.set(TableInputFormat.SCAN, scanToString);

            // Use the Spark API to obtain table data. The RDD is fully typed to avoid raw-type conversions.
            JavaPairRDD<ImmutableBytesWritable, Result> rdd =
                    jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

            // Traverse every row in the HBase table and print the results.
            List<Tuple2<ImmutableBytesWritable, Result>> rows = rdd.collect();
            for (Tuple2<ImmutableBytesWritable, Result> row : rows) {
                List<Cell> cells = row._2().listCells();
                // listCells() returns null when the result holds no cells.
                if (cells == null) {
                    continue;
                }
                for (Cell c : cells) {
                    String family = Bytes.toString(CellUtil.cloneFamily(c));
                    String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
                    String value = Bytes.toString(CellUtil.cloneValue(c));
                    long tm = c.getTimestamp();
                    System.out.println(" Family=" + family + " Qualifier=" + qualifier + " Value=" + value + " TimeStamp=" + tm);
                }
            }
        } finally {
            // Always release the Spark context, including on failure.
            jsc.stop();
        }
    }
}
Feedback

Feedback

Feedback

0/500

Selected Content

Submit selected content with the feedback