文档首页/
MapReduce服务 MRS/
开发指南(普通版_2.x及之前)/
Spark开发指南/
开发Spark应用/
Streaming从Kafka读取数据再写入HBase/
Scala样例代码
更新时间:2024-12-10 GMT+08:00
Scala样例代码
功能介绍
在Spark应用中,通过使用Streaming调用kafka接口来获取数据,然后把数据经过分析后,找到对应的HBase表记录,再写到HBase表。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase。
样例代码获取方式请参考获取MRS应用开发样例工程。
代码样例:
/**
 * Streaming job: for each message value read from Kafka, look up the matching
 * row in HBase table "table1" (keyed by the value), add the stored cell value
 * to the message value, and write the sum back to table1.
 */
object SparkOnStreamingToHbase {
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      printUsage()
    }
    // Take exactly the four expected arguments; destructuring the raw array
    // would throw a MatchError if extra trailing arguments were passed.
    val Array(checkPointDir, topics, brokers, zkQuorum) = args.take(4)
    val sparkConf = new SparkConf().setAppName("DirectStreamToHbase")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // Enable Streaming checkpointing unless the caller opted out with "nocp".
    if (!"nocp".equals(checkPointDir)) {
      ssc.checkpoint(checkPointDir)
    }
    val columnFamily = "cf"
    val zkClientPort = "24002"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers
    )
    val topicSet = topics.split(",").toSet
    // createDirectStream yields (key, value) pairs; only the value is needed.
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).map(_._2)
    lines.foreachRDD(rdd => {
      // Each partition is processed on an executor; the HBase connection is
      // created there rather than on the driver (it is not serializable).
      rdd.foreachPartition(iterator => hBaseWriter(iterator, zkClientPort, zkQuorum, columnFamily))
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Writes one partition's worth of messages to HBase; runs on an executor.
   *
   * @param iterator     message values; each is used both as a row key and as an integer addend
   * @param zkClientPort ZooKeeper client port
   * @param zkQuorum     ZooKeeper quorum hosts
   * @param columnFamily column family holding the "cid" qualifier
   */
  def hBaseWriter(iterator: Iterator[String], zkClientPort: String, zkQuorum: String, columnFamily: String): Unit = {
    val rows = iterator.toArray
    // Skip empty partitions entirely: no point opening an HBase connection.
    if (rows.nonEmpty) {
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.property.clientPort", zkClientPort)
      conf.set("hbase.zookeeper.quorum", zkQuorum)
      var table: Table = null
      var connection: Connection = null
      try {
        connection = ConnectionFactory.createConnection(conf)
        table = connection.getTable(TableName.valueOf("table1"))
        val getList = new util.ArrayList[Get](rows.length)
        for (row <- rows) {
          // Encode row keys with Bytes.toBytes (UTF-8) consistently; the
          // original String.getBytes used the platform default charset, which
          // could differ from the UTF-8 keys written by the Puts below.
          getList.add(new Get(Bytes.toBytes(row)))
        }
        // Batch-read the current rows from table1.
        val resultDataBuffer = table.get(getList)
        // Build the updated cells for table1.
        val putList = new util.ArrayList[Put]()
        for (i <- rows.indices) {
          val row = rows(i)
          val resultData = resultDataBuffer(i)
          if (!resultData.isEmpty) {
            // Fetch the old value from <columnFamily>:cid.
            val aCid = Bytes.toString(resultData.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes("cid")))
            val put = new Put(Bytes.toBytes(row))
            // New value = message value + stored value (both expected to be
            // numeric strings; non-numeric input would throw NumberFormatException).
            val resultValue = row.toInt + aCid.toInt
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(resultValue.toString))
            putList.add(put)
          }
        }
        if (putList.size() > 0) {
          table.put(putList)
        }
      } catch {
        case e: IOException =>
          e.printStackTrace()
      } finally {
        if (table != null) {
          try {
            table.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
        if (connection != null) {
          try {
            // Close the HBase connection.
            connection.close()
          } catch {
            case e: IOException =>
              e.printStackTrace()
          }
        }
      }
    }
  }

  /** Prints the expected command-line arguments and terminates the JVM. */
  private def printUsage(): Unit = {
    System.out.println("Usage: {checkPointDir} {topic} {brokerList} {zkQuorum}")
    System.exit(1)
  }
}