更新时间:2024-12-10 GMT+08:00

Scala样例代码

功能介绍

在Spark应用中,通过使用Streaming调用kafka接口来获取数据,然后把数据经过分析后,找到对应的HBase表记录,再写到HBase表。

代码样例

下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase。

样例代码获取方式请参考获取MRS应用开发样例工程

代码样例:

/**
  * 运行streaming任务,根据value值从hbase table1表读取数据,把两者数据做操作后,更新到hbase table1表
  */
object SparkOnStreamingToHbase {
  def main(args: Array[String]) {
    if (args.length < 4) {
      printUsage
    }

    val Array(checkPointDir, topics, brokers, zkQuorum) = args
    val sparkConf = new SparkConf().setAppName("DirectStreamToHbase")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 设置Streaming的CheckPoint目录
    if (!"nocp".equals(checkPointDir)) {
      ssc.checkpoint(checkPointDir)
    }

    val columnFamily = "cf"
    val zkClientPort = "24002"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers
    )

    val topicArr = topics.split(",")
    val topicSet = topicArr.toSet
    // map(_._1)是消息的key, map(_._2)是消息的value
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).map(_._2)
    lines.foreachRDD(rdd => {
      //partition运行在executor上
      rdd.foreachPartition(iterator => hBaseWriter(iterator, zkClientPort, zkQuorum, columnFamily))
    })

    ssc.start()
    ssc.awaitTermination()
  }

  
  /**
   * 在executor端写入数据
   * @param iterator  消息
   * @param zkClientPort
   * @param zkQuorum
   * @param columnFamily
   */
  def hBaseWriter(iterator: Iterator[String], zkClientPort: String, zkQuorum: String, columnFamily: String): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", zkClientPort)
    conf.set("hbase.zookeeper.quorum", zkQuorum)
    var table: Table = null
    var connection: Connection = null
    try {
      connection = ConnectionFactory.createConnection(conf)
      table = connection.getTable(TableName.valueOf("table1"))
      val iteratorArray = iterator.toArray
      val rowList = new util.ArrayList[Get]()
      for (row <- iteratorArray) {
        val get = new Get(row.getBytes)
        rowList.add(get)
      }
      // 获取table1的数据
      val resultDataBuffer = table.get(rowList)
      // 设置table1的数据
      val putList = new util.ArrayList[Put]()
      for (i <- 0 until iteratorArray.size) {
        val row = iteratorArray(i)
        val resultData = resultDataBuffer(i)
        if (!resultData.isEmpty) {
          // 根据列簇和列,获取旧值
          val aCid = Bytes.toString(resultData.getValue(columnFamily.getBytes, "cid".getBytes))
          val put = new Put(Bytes.toBytes(row))
          // 计算结果
          val resultValue = row.toInt + aCid.toInt
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(resultValue.toString))
          putList.add(put)
        }
      }
      if (putList.size() > 0) {
        table.put(putList)
      }
    } catch {
      case e: IOException =>
        e.printStackTrace();
    } finally {
      if (table != null) {
        try {
          table.close()
        } catch {
          case e: IOException =>
            e.printStackTrace();
        }
      }
      if (connection != null) {
        try {
          // 关闭Hbase连接.
          connection.close()
        } catch {
          case e: IOException =>
            e.printStackTrace()
        }
      }
    }
  }
  

  private def printUsage {
    System.out.println("Usage: {checkPointDir} {topic} {brokerList} {zkQuorum}")
    System.exit(1)
  }
}