更新时间:2022-07-19 GMT+08:00
Scala样例代码
功能介绍
实时统计连续网购时间超过半个小时的女性网民信息,将统计结果直接打印或者输出写入到Kafka中。
Spark Streaming Write To Print代码样例
下面代码片段仅为演示,具体代码参见com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint:
// 参数解析:
// <batchTime>为Streaming分批的处理间隔。
// <windowTime>为统计数据的时间跨度,时间单位都是秒。
// <topics>为Kafka中订阅的主题,多以逗号分隔。
// <brokers>为获取元数据的kafka地址。
val Array(batchTime, windowTime, topics, brokers) = args
val batchDuration = Seconds(batchTime.toInt)
val windowDuration = Seconds(windowTime.toInt)
// 建立Streaming启动环境
val sparkConf = new SparkConf()
sparkConf.setAppName("DataSightStreamingExample")
val ssc = new StreamingContext(sparkConf, batchDuration)
// 设置Streaming的CheckPoint目录,由于窗口概念存在,该参数必须设置
ssc.checkpoint("checkpoint")
// 组装Kafka的主题列表
val topicsSet = topics.split(",").toSet
// 通过brokers和topics直接创建kafka stream
// 1.接收Kafka中数据,生成相应DStream
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet).map(_._2)
// 2.获取每一个行的字段属性
val records = lines.map(getRecord)
// 3.筛选女性网民上网时间数据信息
val femaleRecords = records.filter(_._2 == "female")
.map(x => (x._1, x._3))
// 4.汇总在一个时间窗口内每个女性上网时间
val aggregateRecords = femaleRecords
.reduceByKeyAndWindow(_ + _, _ - _, windowDuration)
// 5.筛选连续上网时间超过阈值的用户,并获取结果
aggregateRecords.filter(_._2 > 0.9 * windowTime.toInt).print()
// 6.Streaming系统启动
ssc.start()
ssc.awaitTermination()
上述代码会引用以下函数
// 获取字段函数
def getRecord(line: String): (String, String, Int) = {
val elems = line.split(",")
val name = elems(0)
val sexy = elems(1)
val time = elems(2).toInt
(name, sexy, time)
}
Spark Streaming Write To Kafka代码样例
下面代码片段仅为演示,具体代码参见com.huawei.bigdata.spark.examples.DstreamKafkaWriter:
- Spark版本升级后,推荐使用新接口createDirectStream,老接口createStream仍然存在,但是性能和稳定性差,建议不要使用老接口开发应用程序。
- 该样例代码只存在于mrs-sample-project-1.6.0.zip中
// 参数解析:
//<groupId> 消费者的group.id.
//<brokers> broker的IP和端口.
//<topic> kafka的topic.
if (args.length != 3) {
System.err.println("Usage: DstreamKafkaWriter <groupId> <brokers> <topic>")
System.exit(1)
}
val Array(groupId, brokers, topic) = args
val sparkConf = new SparkConf().setAppName("KafkaWriter")
// 配置Kafka
val kafkaParams = new Properties()
kafkaParams.put("metadata.broker.list", brokers)
kafkaParams.put("group.id", groupId)
kafkaParams.put("auto.offset.reset", "smallest")
// 创建一个Java streaming context
val ssc = new StreamingContext(sparkConf, Milliseconds(500))
// 向kafka发送数据
val sentData = Seq("kafka_writer_test_msg_01", "kafka_writer_test_msg_02",
"kafka_writer_test_msg_03")
// 创建RDD queue
val sent = new mutable.Queue[RDD[String]]()
sent.enqueue(ssc.sparkContext.makeRDD(sentData))
// 使用写入的数据创建Dstream
val wStream = ssc.queueStream(sent)
// 写入Kafka
wStream.writeToKafka(kafkaParams,
(x: String) => new KeyedMessage[String, Array[Byte]](topic, x.getBytes))
ssc.start()
ssc.awaitTermination()
父主题: Spark Streaming程序