Scala Sample Code
Function Description
In a Spark application, use HBase APIs to create a table, read data from a table, and insert data into a table.
Sample Code
The following code snippets are for demonstration only. For the complete code, see SparkOnHbaseScalaExample.
Example: Creating an HBase Table
// Required imports for this snippet.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.spark.{SparkConf, SparkContext}

// Set up the configuration for connecting to HBase. hbase-site.xml must be in the classpath.
val conf: SparkConf = new SparkConf()
val sc: SparkContext = new SparkContext(conf)
val hbConf: Configuration = HBaseConfiguration.create(sc.hadoopConfiguration)
// Create a connection to HBase.
val connection: Connection = ConnectionFactory.createConnection(hbConf)
// Declare the table descriptor: table shb1 with a single column family "info".
val userTable = TableName.valueOf("shb1")
val tableDescr = new HTableDescriptor(userTable)
tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
// Create the table, dropping any existing table of the same name first.
println("Creating table shb1.")
val admin = connection.getAdmin
if (admin.tableExists(userTable)) {
  admin.disableTable(userTable)
  admin.deleteTable(userTable)
}
admin.createTable(tableDescr)
connection.close()
sc.stop()
println("Done!")
Example: Inserting Data into an HBase Table
// Required imports for this snippet.
import java.io.IOException
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

// Set up the configuration for connecting to HBase. hbase-site.xml must be in the classpath.
val conf = new SparkConf()
val sc = new SparkContext(conf)
val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)
// Declare the target table. `table` must be a var declared outside the try
// block so that the finally block can close the instance assigned inside it.
var table: Table = null
val tableName = "shb1"
val familyName = Bytes.toBytes("info")
var connection: Connection = null
try {
  // Obtain the HBase connection.
  connection = ConnectionFactory.createConnection(hbConf)
  // Obtain the Table object.
  table = connection.getTable(TableName.valueOf(tableName))
  // Read the input file (args(0)) and split each line into four fields.
  val data = sc.textFile(args(0)).map { line =>
    val value = line.split(",")
    (value(0), value(1), value(2), value(3))
  }.collect()
  var i = 0
  for (line <- data) {
    val put = new Put(Bytes.toBytes("row" + i))
    put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1))
    put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2))
    put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3))
    put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4))
    i += 1
    table.put(put)
  }
} catch {
  case e: IOException =>
    e.printStackTrace()
} finally {
  if (table != null) {
    try {
      // Close the Table object.
      table.close()
    } catch {
      case e: IOException =>
        e.printStackTrace()
    }
  }
  if (connection != null) {
    try {
      // Close the HBase connection.
      connection.close()
    } catch {
      case e: IOException =>
        e.printStackTrace()
    }
  }
  sc.stop()
}
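Calling table.put() once per row sends one RPC per Put. For larger inputs, a minimal alternative sketch, assuming the same connection and data values as above, batches the writes with HBase's BufferedMutator:
// Sketch only: BufferedMutator buffers Puts client-side and flushes them in bulk.
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{BufferedMutator, Put}
import org.apache.hadoop.hbase.util.Bytes

val mutator: BufferedMutator = connection.getBufferedMutator(TableName.valueOf("shb1"))
try {
  for ((line, i) <- data.zipWithIndex) {
    val put = new Put(Bytes.toBytes("row" + i))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("c11"), Bytes.toBytes(line._1))
    mutator.mutate(put) // queued client-side, not yet sent
  }
} finally {
  mutator.close() // flushes any remaining buffered mutations
}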
Example: Reading Data from an HBase Table
// Required imports for this snippet (HBase 1.x client API).
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}

// Set up the configuration for connecting to HBase. hbase-site.xml must be in the classpath.
val conf = new SparkConf()
val sc = new SparkContext(conf)
val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)
// Declare the table and scan to query: serialize the Scan and pass it to
// TableInputFormat through the configuration.
val scan = new Scan()
scan.addFamily(Bytes.toBytes("info"))
val proto = ProtobufUtil.toScan(scan)
val scanToString = Base64.encodeBytes(proto.toByteArray)
hbConf.set(TableInputFormat.INPUT_TABLE, "shb1")
hbConf.set(TableInputFormat.SCAN, scanToString)
// Obtain the table data as an RDD through the Spark API.
val rdd = sc.newAPIHadoopRDD(hbConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
// Traverse every row in the HBase table and print its cells.
rdd.collect().foreach(x => {
  val key = Bytes.toString(x._1.copyBytes())
  val it = x._2.listCells().iterator()
  while (it.hasNext) {
    val c = it.next()
    val family = Bytes.toString(CellUtil.cloneFamily(c))
    val qualifier = Bytes.toString(CellUtil.cloneQualifier(c))
    val value = Bytes.toString(CellUtil.cloneValue(c))
    val tm = c.getTimestamp
    println("Row=" + key + " Family=" + family + " Qualifier=" + qualifier + " Value=" + value + " TimeStamp=" + tm)
  }
})
sc.stop()
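collect() pulls the entire table to the driver, and Hadoop RDDs may reuse the Writable key objects, so for anything beyond a demo it is safer to map each record to plain values on the executors first. A minimal sketch, assuming the rdd and the "info"/"c11" names from above:
// Sketch only: convert each Result to a plain (rowKey, value) pair on the executors.
val pairs = rdd.map { case (k, result) =>
  val rowKey = Bytes.toString(k.copyBytes())
  // getValue returns null when the cell is absent, hence the Option wrapper.
  val c11 = Option(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("c11")))
    .map(b => Bytes.toString(b)).getOrElse("")
  (rowKey, c11)
}
println("Total rows in shb1: " + pairs.count())
pairs.take(10).foreach { case (row, v) => println(row + " -> " + v) }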