文档首页/
MapReduce服务 MRS/
开发指南(LTS版)/
Spark2x开发指南(普通模式)/
开发Spark应用/
Spark Streaming对接Kafka0-10样例程序/
Spark Streaming对接Kafka0-10样例程序(Scala)
更新时间:2024-08-03 GMT+08:00
Spark Streaming对接Kafka0-10样例程序(Scala)
功能介绍
在Spark应用中,通过使用Streaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数,或将数据写入Kafka0-10。
Streaming读取Kafka0-10代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.KafkaWordCount。
/**
* 从Kafka的一个或多个主题消息。
* <checkPointDir>是Spark Streaming检查点目录。
* <brokers>是用于自举,制作人只会使用它来获取元数据
* <topics>是要消费的一个或多个kafka主题的列表
* <batchTime>是Spark Streaming批次持续时间(以秒为单位)。
*/
object KafkaWordCount {
def main(args: Array[String]) {
val ssc = createContext(args)
//启动Streaming系统。
ssc.start()
ssc.awaitTermination()
}
def createContext(args : Array[String]) : StreamingContext = {
val Array(checkPointDir, brokers, topics, batchSize) = args
// 新建一个Streaming启动环境。
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(batchSize.toLong))
//配置Streaming的CheckPoint目录。
//由于窗口概念的存在,此参数是必需的。
ssc.checkpoint(checkPointDir)
// 获取kafka使用的topic列表。
val topicArr = topics.split(",")
val topicSet = topicArr.toSet
val kafkaParams = Map[String, String](
"bootstrap.servers" -> brokers,
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"group.id" -> "DemoConsumer"
);
val locationStrategy = LocationStrategies.PreferConsistent
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)
// 用brokers and topics新建direct kafka stream
//从Kafka接收数据并生成相应的DStream。
val stream = KafkaUtils.createDirectStream[String, String](ssc, locationStrategy, consumerStrategy)
// 获取每行中的字段属性。
val tf = stream.transform ( rdd =>
rdd.map(r => (r.value, 1L))
)
// 汇总计算字数的总时间。
val wordCounts = tf.reduceByKey(_ + _)
val totalCounts = wordCounts.updateStateByKey(updataFunc)
totalCounts.print()
ssc
}
def updataFunc(values : Seq[Long], state : Option[Long]) : Option[Long] =
Some(values.sum + state.getOrElse(0L))
}
Streaming Write To Kafka 0-10样例代码
下面代码片段仅为演示,具体代码参见com.huawei.bigdata.spark.examples.DstreamKafkaWriter。
建议使用新的API createDirectStream代替原有API createStream进行应用程序开发。原有API仍然可以使用,但新的API性能和稳定性更好。
/**
* 参数解析:
* <checkPointDir>为checkPoint目录。
* <topics>为Kafka中订阅的主题,多以逗号分隔。
* <brokers>为获取元数据的Kafka地址。
*/
object DstreamKafkaWriterTest1 {
def main(args: Array[String]) {
if (args.length != 4) {
System.err.println("Usage: DstreamKafkaWriterTest <checkPointDir> <brokers> <topic>")
System.exit(1)
}
val Array(checkPointDir, brokers, topic) = args
val sparkConf = new SparkConf().setAppName("KafkaWriter")
//填写Kafka的properties。
val kafkaParams = Map[String, String](
"bootstrap.servers" -> brokers,
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.serializer" -> "org.apache.kafka.common.serialization.ByteArraySerializer",
"key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
"group.id" -> "dstreamKafkaWriterFt",
"auto.offset.reset" -> "latest"
)
//创建Streaming的context。
val ssc = new StreamingContext(sparkConf, Milliseconds(500));
val sentData = Seq("kafka_writer_test_msg_01", "kafka_writer_test_msg_02", "kafka_writer_test_msg_03")
//创建RDD队列。
val sent = new mutable.Queue[RDD[String]]()
sent.enqueue(ssc.sparkContext.makeRDD(sentData))
//创建写数据的DStream。
val wStream = ssc.queueStream(sent)
//使用writetokafka API把数据写入Kafka。
wStream.writeToKafka(kafkaParams,
(x: String) => new ProducerRecord[String, Array[Byte]](topic, x.getBytes))
//启动streaming的context。
ssc.start()
ssc.awaitTermination()
}
}