更新时间:2022-07-19 GMT+08:00
Scala样例代码
功能简介
在Spark应用中,通过使用HBase接口来实现创建表,读取表,往表中插入数据等操作。
代码样例
下面代码片段仅为演示,具体代码参见SparkOnHbaseScalaExample:
样例:创建HBase表
// Spark/HBase configuration; hbase-site.xml must be on the classpath
// so that HBaseConfiguration picks up the cluster settings.
val conf: SparkConf = new SparkConf
val sc: SparkContext = new SparkContext(conf)
val hbConf: Configuration = HBaseConfiguration.create(sc.hadoopConfiguration)

// Open the connection channel to HBase.
val connection: Connection = ConnectionFactory.createConnection(hbConf)

// Describe the table to create: table "shb1" with one column family "info".
val userTable = TableName.valueOf("shb1")
val tableDescr = new HTableDescriptor(userTable)
tableDescr.addFamily(new HColumnDescriptor("info".getBytes))

// Create the table, first dropping any existing table with the same name
// (a table must be disabled before it can be deleted).
println("Creating table shb1. ")
val admin = connection.getAdmin
if (admin.tableExists(userTable)) {
  admin.disableTable(userTable)
  admin.deleteTable(userTable)
}
admin.createTable(tableDescr)

// Release the HBase connection and stop the Spark context.
connection.close()
sc.stop()
println("Done!")
样例:在HBase表中插入数据
// Spark/HBase configuration; hbase-site.xml must be on the classpath
// so that HBaseConfiguration picks up the cluster settings.
val conf = new SparkConf()
val sc = new SparkContext(conf)
val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)

// Target table and column family.
val tableName = "shb1"
val familyName = Bytes.toBytes("info")
var connection: Connection = null
// FIX: the original declared an outer `val table: HTable = null` and then
// SHADOWED it with a new `val table = connection.getTable(...)` inside the
// try block, so the finally block only ever saw the null outer reference and
// table.close() was never called (Table handle leak). Use a single mutable
// binding that both the try and finally blocks refer to.
var table: Table = null
try {
  // Obtain the HBase connection and the Table handle.
  connection = ConnectionFactory.createConnection(hbConf)
  table = connection.getTable(TableName.valueOf(tableName))

  // Load the input file (args(0)): each line is four comma-separated values.
  val data = sc.textFile(args(0)).map { line =>
    val value = line.split(",")
    (value(0), value(1), value(2), value(3))
  }.collect()

  // Write one Put per input line, with row keys "row0", "row1", ...
  // and columns c11..c14 in the "info" family.
  var i = 0
  for (line <- data) {
    val put = new Put(Bytes.toBytes("row" + i))
    put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1))
    put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2))
    put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3))
    put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4))
    i += 1
    table.put(put)
  }
} catch {
  case e: IOException => e.printStackTrace()
} finally {
  if (table != null) {
    try {
      // Close the Table handle.
      table.close()
    } catch {
      case e: IOException => e.printStackTrace()
    }
  }
  if (connection != null) {
    try {
      // Close the HBase connection.
      connection.close()
    } catch {
      case e: IOException => e.printStackTrace()
    }
  }
  sc.stop()
}
样例:读取HBase表数据
// Spark/HBase configuration; hbase-site.xml must be on the classpath
// so that HBaseConfiguration picks up the cluster settings.
val conf = new SparkConf()
val sc = new SparkContext(conf)
val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)

// Build the scan: read only the "info" column family of table "shb1".
// TableInputFormat expects the scan serialized as a Base64 string.
val scan = new Scan()
scan.addFamily(Bytes.toBytes("info"))
val proto = ProtobufUtil.toScan(scan)
val scanToString = Base64.encodeBytes(proto.toByteArray)
hbConf.set(TableInputFormat.INPUT_TABLE, "shb1")
hbConf.set(TableInputFormat.SCAN, scanToString)

// Read the table through Spark's Hadoop-input-format interface.
val rdd = sc.newAPIHadoopRDD(hbConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

// Iterate over every row in the HBase table and print each cell.
// FIX: the original computed `val key = x._1.toString` and never used it
// (and ImmutableBytesWritable.toString would not yield a readable key
// anyway — Bytes.toString(x._1.get()) would be needed); removed the
// unused local.
rdd.collect().foreach(x => {
  val it = x._2.listCells().iterator()
  while (it.hasNext) {
    val c = it.next()
    val family = Bytes.toString(CellUtil.cloneFamily(c))
    val qualifier = Bytes.toString(CellUtil.cloneQualifier(c))
    val value = Bytes.toString(CellUtil.cloneValue(c))
    val tm = c.getTimestamp
    println(" Family=" + family + " Qualifier=" + qualifier + " Value=" + value + " TimeStamp=" + tm)
  }
})
sc.stop()
父主题: Spark on HBase程序