更新时间:2022-07-19 GMT+08:00
Scala样例代码
功能介绍
实时统计连续网购时间超过半个小时的女性网民信息,将统计结果直接打印或者输出写入到Kafka中。
Spark Streaming Write To Print代码样例
下面代码片段仅为演示,具体代码参见com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint:
// 参数解析: // <batchTime>为Streaming分批的处理间隔。 // <windowTime>为统计数据的时间跨度,时间单位都是秒。 // <topics>为Kafka中订阅的主题,多以逗号分隔。 // <brokers>为获取元数据的kafka地址。 val Array(batchTime, windowTime, topics, brokers) = args val batchDuration = Seconds(batchTime.toInt) val windowDuration = Seconds(windowTime.toInt) // 建立Streaming启动环境 val sparkConf = new SparkConf() sparkConf.setAppName("DataSightStreamingExample") val ssc = new StreamingContext(sparkConf, batchDuration) // 设置Streaming的CheckPoint目录,由于窗口概念存在,该参数必须设置 ssc.checkpoint("checkpoint") // 组装Kafka的主题列表 val topicsSet = topics.split(",").toSet // 通过brokers和topics直接创建kafka stream // 1.接收Kafka中数据,生成相应DStream val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet).map(_._2) // 2.获取每一个行的字段属性 val records = lines.map(getRecord) // 3.筛选女性网民上网时间数据信息 val femaleRecords = records.filter(_._2 == "female") .map(x => (x._1, x._3)) // 4.汇总在一个时间窗口内每个女性上网时间 val aggregateRecords = femaleRecords .reduceByKeyAndWindow(_ + _, _ - _, windowDuration) // 5.筛选连续上网时间超过阈值的用户,并获取结果 aggregateRecords.filter(_._2 > 0.9 * windowTime.toInt).print() // 6.Streaming系统启动 ssc.start() ssc.awaitTermination()
上述代码会引用以下函数
// 获取字段函数 def getRecord(line: String): (String, String, Int) = { val elems = line.split(",") val name = elems(0) val sexy = elems(1) val time = elems(2).toInt (name, sexy, time) }
Spark Streaming Write To Kafka代码样例
下面代码片段仅为演示,具体代码参见com.huawei.bigdata.spark.examples.DstreamKafkaWriter:
- Spark版本升级后,推荐使用新接口createDirectStream,老接口createStream仍然存在,但是性能和稳定性差,建议不要使用老接口开发应用程序。
- 该样例代码只存在于mrs-sample-project-1.6.0.zip中
// 参数解析: //<groupId> 消费者的group.id. //<brokers> broker的IP和端口. //<topic> kafka的topic. if (args.length != 3) { System.err.println("Usage: DstreamKafkaWriter <groupId> <brokers> <topic>") System.exit(1) } val Array(groupId, brokers, topic) = args val sparkConf = new SparkConf().setAppName("KafkaWriter") // 配置Kafka val kafkaParams = new Properties() kafkaParams.put("metadata.broker.list", brokers) kafkaParams.put("group.id", groupId) kafkaParams.put("auto.offset.reset", "smallest") // 创建一个Java streaming context val ssc = new StreamingContext(sparkConf, Milliseconds(500)) // 向kafka发送数据 val sentData = Seq("kafka_writer_test_msg_01", "kafka_writer_test_msg_02", "kafka_writer_test_msg_03") // 创建RDD queue val sent = new mutable.Queue[RDD[String]]() sent.enqueue(ssc.sparkContext.makeRDD(sentData)) // 使用写入的数据创建Dstream val wStream = ssc.queueStream(sent) // 写入Kafka wStream.writeToKafka(kafkaParams, (x: String) => new KeyedMessage[String, Array[Byte]](topic, x.getBytes)) ssc.start() ssc.awaitTermination()
父主题: Spark Streaming程序