Scala Sample Code
Function Description
In a Spark application, use Spark Streaming to consume data through the Kafka APIs and write the data analysis results to an HBase table.
Sample Code
The following code snippet is an example. For the complete code, see com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase.
/**
 * Runs a Spark Streaming job that consumes messages from Kafka and uses each
 * message value as a row key of the HBase table `table1`.
 *
 * For every row key that already exists in `table1`, the integer stored in the
 * `cid` column is read, the integer value of the row key itself is added to it,
 * and the sum is written back to the same cell.
 */
object SparkOnStreamingToHbase {

  /**
   * Entry point.
   *
   * @param args `checkPointDir topics brokers zkQuorum`; pass `nocp` as
   *             `checkPointDir` to disable checkpointing. `topics` is a
   *             comma-separated list. Arguments after the fourth are ignored.
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      printUsage()
    }

    // take(4) keeps the destructuring from failing with a MatchError when
    // extra trailing arguments are supplied.
    val Array(checkPointDir, topics, brokers, zkQuorum) = args.take(4)

    val sparkConf = new SparkConf().setAppName("DirectStreamToHbase")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Set the CheckPoint directory of Streaming.
    if (!"nocp".equals(checkPointDir)) {
      ssc.checkpoint(checkPointDir)
    }

    val columnFamily = "cf"
    val zkClientPort = "24002"

    // The direct stream needs to know the Kafka brokers; the key must not be empty.
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers
    )

    val topicSet = topics.split(",").toSet

    // map(_._1) is the key of the message, and map(_._2) is the value of the message.
    val lines = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
      .map(_._2)

    lines.foreachRDD(rdd => {
      // Partitions run on the executor.
      rdd.foreachPartition(iterator => hBaseWriter(iterator, zkClientPort, zkQuorum, columnFamily))
    })

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Updates HBase `table1` with the messages of one partition.
   *
   * Each message is used as a row key. Rows that do not exist in the table are
   * left untouched. Records whose row key or stored `cid` value is not an
   * integer (or whose `cid` cell is missing) are skipped with a message on
   * stderr instead of failing the whole partition. I/O errors are printed and
   * swallowed, as before.
   *
   * @param iterator     Messages of the partition; each one is a row key
   * @param zkClientPort ZooKeeper client port used by the HBase client
   * @param zkQuorum     ZooKeeper quorum used by the HBase client
   * @param columnFamily Column family that holds the `cid` column
   */
  def hBaseWriter(iterator: Iterator[String], zkClientPort: String, zkQuorum: String, columnFamily: String): Unit = {
    // Materialize the partition: the rows are needed for the batch Get and again for the Puts.
    val rows = iterator.toArray

    // Do not open an HBase connection for an empty partition.
    if (rows.nonEmpty) {
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.property.clientPort", zkClientPort)
      conf.set("hbase.zookeeper.quorum", zkQuorum)

      // Encode once, and always through Bytes, so Get and Put agree on the encoding.
      val family = Bytes.toBytes(columnFamily)
      val qualifier = Bytes.toBytes("cid")

      var table: Table = null
      var connection: Connection = null
      try {
        connection = ConnectionFactory.createConnection(conf)
        table = connection.getTable(TableName.valueOf("table1"))

        val getList = new util.ArrayList[Get](rows.length)
        rows.foreach(row => getList.add(new Get(Bytes.toBytes(row))))

        // Obtain data in table1. The results are in the same order as the Gets.
        val results = table.get(getList)

        // Set data in table1.
        val putList = new util.ArrayList[Put]()
        rows.zip(results).foreach { case (row, result) =>
          if (!result.isEmpty) {
            // Obtain the old value based on the column family and column.
            // Option(...) guards against a row that exists but has no cid cell.
            val oldValue = Option(result.getValue(family, qualifier)).map(bytes => Bytes.toString(bytes))

            // Calculate the result.
            val sum = for {
              increment <- parseInt(row)
              current <- oldValue.flatMap(parseInt)
            } yield increment + current

            sum match {
              case Some(resultValue) =>
                val put = new Put(Bytes.toBytes(row))
                put.addColumn(family, qualifier, Bytes.toBytes(resultValue.toString))
                putList.add(put)
              case None =>
                System.err.println(s"Skipping record '$row': row key or stored cid is not an integer")
            }
          }
        }

        if (!putList.isEmpty) {
          table.put(putList)
        }
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        closeQuietly(table)
        // Close the HBase connection.
        closeQuietly(connection)
      }
    }
  }

  /** Parses a decimal integer, returning None for null or malformed input. */
  private def parseInt(text: String): Option[Int] =
    try {
      Option(text).map(_.toInt)
    } catch {
      case _: NumberFormatException => None
    }

  /** Closes the resource if it is non-null; an IOException is printed, not rethrown. */
  private def closeQuietly(resource: java.io.Closeable): Unit =
    Option(resource).foreach { r =>
      try {
        r.close()
      } catch {
        case e: IOException => e.printStackTrace()
      }
    }

  /** Prints the expected command-line arguments and terminates the JVM with exit code 1. */
  private def printUsage(): Unit = {
    System.out.println("Usage: {checkPointDir} {topic} {brokerList} {zkQuorum}")
    System.exit(1)
  }
}
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.