更新时间:2024-11-25 GMT+08:00
Scala样例代码
功能介绍
在Spark应用中,通过Spark Streaming调用Kafka接口获取数据,对数据进行分析后,找到对应的HBase表记录,再将计算结果写回HBase表。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase。
样例代码获取方式请参考获取MRS应用开发样例工程。
代码样例:
/**
 * Runs a Spark Streaming job that consumes message values from Kafka and, for every value,
 * reads the row with that key from the HBase table "table1", adds the message value to the
 * stored "cid" column value, and writes the sum back to the same row of "table1".
 */
object SparkOnStreamingToHbase {
  import scala.util.{Failure, Success, Try}

  /**
   * Entry point of the streaming application.
   *
   * @param args checkPointDir, topics (comma separated), brokers and zkQuorum, in that order;
   *             passing "nocp" as checkPointDir disables checkpointing, and any arguments
   *             after the fourth are ignored
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      printUsage()
    }

    // The trailing `_*` tolerates extra arguments instead of failing with a MatchError.
    val Array(checkPointDir, topics, brokers, zkQuorum, _*) = args

    val sparkConf = new SparkConf().setAppName("DirectStreamToHbase")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Set the Streaming checkpoint directory
    if (!"nocp".equals(checkPointDir)) {
      ssc.checkpoint(checkPointDir)
    }

    val columnFamily = "cf"
    val zkClientPort = "24002"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers
    )

    val topicSet = topics.split(",").toSet

    // map(_._1) would select the message key, map(_._2) selects the message value
    val lines = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
      .map(_._2)

    lines.foreachRDD(rdd => {
      // Partitions are processed on the executors
      rdd.foreachPartition(iterator => hBaseWriter(iterator, zkClientPort, zkQuorum, columnFamily))
    })

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Writes one partition of messages to HBase on the executor side.
   *
   * Each message is used as a row key of "table1". When the row exists and both the message and
   * the stored "cid" value are integers, their sum is written back to the "cid" column. Rows
   * that are missing are ignored; rows without a "cid" cell, with non-numeric values, or whose
   * sum would overflow an Int are reported on stderr and skipped, so a single bad record does
   * not fail the whole partition.
   *
   * @param iterator     messages of the partition (each one is a row key and an integer operand)
   * @param zkClientPort ZooKeeper client port used by the HBase client
   * @param zkQuorum     ZooKeeper quorum used by the HBase client
   * @param columnFamily column family that holds the "cid" column
   */
  def hBaseWriter(iterator: Iterator[String], zkClientPort: String, zkQuorum: String, columnFamily: String): Unit = {
    // Kept as nullable vars so the finally block can close whatever was actually opened.
    var table: Table = null
    var connection: Connection = null
    try {
      val rows = iterator.toArray
      // An empty partition needs no HBase connection at all.
      if (rows.nonEmpty) {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.property.clientPort", zkClientPort)
        conf.set("hbase.zookeeper.quorum", zkQuorum)

        connection = ConnectionFactory.createConnection(conf)
        table = connection.getTable(TableName.valueOf("table1"))

        // Bytes.toBytes is used for every key so that Get and Put encode row keys identically.
        val family = Bytes.toBytes(columnFamily)
        val qualifier = Bytes.toBytes("cid")

        val gets = new util.ArrayList[Get](rows.length)
        rows.foreach(row => gets.add(new Get(Bytes.toBytes(row))))

        // Read the current table1 data; results are paired with the requested rows by position.
        val results = table.get(gets)

        // Build the updated table1 data
        val puts = new util.ArrayList[Put]()
        for ((row, result) <- rows.zip(results) if !result.isEmpty) {
          // Read the old value by column family and column; the cell may be absent (null).
          Option(result.getValue(family, qualifier)).map(Bytes.toString) match {
            case None =>
              System.err.println(s"Skipping row '$row': no value in column $columnFamily:cid")
            case Some(oldValue) =>
              // addExact fails on overflow instead of silently writing a wrapped value.
              Try(Math.addExact(row.toInt, oldValue.toInt)) match {
                case Success(sum) =>
                  val put = new Put(Bytes.toBytes(row))
                  put.addColumn(family, qualifier, Bytes.toBytes(sum.toString))
                  puts.add(put)
                case Failure(e) =>
                  System.err.println(s"Skipping row '$row' with stored value '$oldValue': $e")
              }
          }
        }

        if (!puts.isEmpty) {
          table.put(puts)
        }
      }
    } catch {
      // Best effort, as before: an HBase I/O failure is reported and the partition is dropped.
      case e: IOException => e.printStackTrace()
    } finally {
      closeQuietly(table)
      // Close the HBase connection.
      closeQuietly(connection)
    }
  }

  /** Closes `resource` if it is non-null, printing (not propagating) any IOException. */
  private def closeQuietly(resource: java.io.Closeable): Unit = {
    if (resource != null) {
      try {
        resource.close()
      } catch {
        case e: IOException => e.printStackTrace()
      }
    }
  }

  /** Prints the expected command-line arguments and terminates the JVM with exit code 1. */
  private def printUsage(): Unit = {
    System.out.println("Usage: {checkPointDir} {topic} {brokerList} {zkQuorum}")
    System.exit(1)
  }
}