
Scala Sample Code

Function

In a Spark application, use the HBase API to create a table, read data from it, and insert data into it.
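
For reference, the snippets below assume roughly the following imports. The package paths match the HBase 1.x client, where org.apache.hadoop.hbase.util.Base64 and org.apache.hadoop.hbase.protobuf.ProtobufUtil are available; HBase 2.x relocates both, so adjust accordingly:

    // Assumed imports for the snippets in this section (HBase 1.x client packages).
    import java.io.IOException

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Result, Scan, Table}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.spark.{SparkConf, SparkContext}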

Sample Code

The following code snippets are for demonstration only. For the complete code, see SparkOnHbaseScalaExample:

Example: Creating an HBase Table

    // Set up the configuration for connecting to HBase; hbase-site.xml must be on the classpath.
    val conf: SparkConf = new SparkConf()
    val sc: SparkContext = new SparkContext(conf)
    val hbConf: Configuration = HBaseConfiguration.create(sc.hadoopConfiguration)
    // Open a connection to HBase.
    val connection: Connection = ConnectionFactory.createConnection(hbConf)
    
    // Declare the table descriptor: table shb1 with one column family, "info".
    val userTable = TableName.valueOf("shb1")
    val tableDescr = new HTableDescriptor(userTable)
    tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
    
    // Create the table, dropping and recreating it if it already exists.
    println("Creating table shb1.")
    val admin = connection.getAdmin
    if (admin.tableExists(userTable)) {
      admin.disableTable(userTable)
      admin.deleteTable(userTable)
    }
    admin.createTable(tableDescr)
    admin.close()

    connection.close()
    sc.stop()
    println("Done!")

Example: Inserting Data into an HBase Table

    // Set up the configuration for connecting to HBase; hbase-site.xml must be on the classpath.
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)

    // Declare the target table.
    var table: Table = null
    val tableName = "shb1"
    val familyName = Bytes.toBytes("info")
    var connection: Connection = null
    try {
      // Obtain the HBase connection.
      connection = ConnectionFactory.createConnection(hbConf)
      // Obtain the Table object. Assign to the outer variable (do not shadow it
      // with a local val), so that the finally block can close it.
      table = connection.getTable(TableName.valueOf(tableName))
      // Read the input file (path passed as args(0)), split each line into four
      // comma-separated fields, and collect the records to the driver.
      val data = sc.textFile(args(0)).map { line =>
        val value = line.split(",")
        (value(0), value(1), value(2), value(3))
      }.collect()

      // Write one Put per record, using "row<i>" as the row key and columns
      // c11..c14 in the "info" column family.
      var i = 0
      for (line <- data) {
        val put = new Put(Bytes.toBytes("row" + i))
        put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1))
        put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2))
        put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3))
        put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4))
        i += 1
        table.put(put)
      }
    } catch {
      case e: IOException =>
        e.printStackTrace()
    } finally {
      if (table != null) {
        try {
          // Close the Table object.
          table.close()
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
      if (connection != null) {
        try {
          // Close the HBase connection.
          connection.close()
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
      sc.stop()
    }
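
Collecting all records to the driver and writing them one Put at a time is fine for a demo, but does not scale to large inputs. A minimal sketch of a distributed write through TableOutputFormat, reusing `hbConf`, `sc`, and the `args(0)` input path from the snippet above (same shb1/info schema assumed; c13/c14 handled analogously):

    // Sketch only: write Puts from the executors via TableOutputFormat instead of
    // collecting the data to the driver.
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.mapreduce.Job

    hbConf.set(TableOutputFormat.OUTPUT_TABLE, "shb1")
    val job = Job.getInstance(hbConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    sc.textFile(args(0)).zipWithIndex().map { case (line, idx) =>
      val value = line.split(",")
      val put = new Put(Bytes.toBytes("row" + idx))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("c11"), Bytes.toBytes(value(0)))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("c12"), Bytes.toBytes(value(1)))
      (new ImmutableBytesWritable(Bytes.toBytes("row" + idx)), put)
    }.saveAsNewAPIHadoopDataset(job.getConfiguration)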

Example: Reading Data from an HBase Table

    // Set up the configuration for connecting to HBase; hbase-site.xml must be on the classpath.
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)
    
    // Declare the table and scan to query: serialize the Scan (restricted to the
    // "info" column family) and pass it to TableInputFormat through the configuration.
    val scan = new Scan()
    scan.addFamily(Bytes.toBytes("info"))
    val proto = ProtobufUtil.toScan(scan)
    val scanToString = Base64.encodeBytes(proto.toByteArray)
    hbConf.set(TableInputFormat.INPUT_TABLE, "shb1")
    hbConf.set(TableInputFormat.SCAN, scanToString)

    // Obtain the table data as an RDD through the Spark API.
    val rdd = sc.newAPIHadoopRDD(hbConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    
    // Iterate over every row in the HBase table and print its cells.
    rdd.collect().foreach(x => {
      val key = Bytes.toString(x._1.get)
      val it = x._2.listCells().iterator()
      while (it.hasNext) {
        val c = it.next()
        val family = Bytes.toString(CellUtil.cloneFamily(c))
        val qualifier = Bytes.toString(CellUtil.cloneQualifier(c))
        val value = Bytes.toString(CellUtil.cloneValue(c))
        val tm = c.getTimestamp
        println(" Family=" + family + " Qualifier=" + qualifier + " Value=" + value + " TimeStamp=" + tm)
      }
    })
    sc.stop()
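
Here too, collect() pulls the whole table to the driver. For large tables, operate on the RDD directly (before calling sc.stop()). A minimal sketch that counts rows and extracts one column, reusing `rdd` from the snippet above and assuming the same shb1/info schema:

    // Sketch only: process the RDD on the executors instead of collecting it.
    val rowCount = rdd.count()
    println("Total rows in shb1: " + rowCount)

    // Extract the value of info:c11 from each row (None if the column is absent).
    val c11Values = rdd.map { case (_, result) =>
      Option(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("c11")))
        .map(Bytes.toString)
    }
    c11Values.take(10).foreach(println)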