更新时间:2022-07-19 GMT+08:00

Scala样例代码

功能介绍

实时统计连续网购时间超过半个小时的女性网民信息,将统计结果直接打印或者输出写入到Kafka中。

Spark Streaming Write To Print代码样例

下面代码片段仅为演示,具体代码参见com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint:

    // 参数解析:
    // <batchTime>为Streaming分批的处理间隔。
    // <windowTime>为统计数据的时间跨度,时间单位都是秒。
    // <topics>为Kafka中订阅的主题,多以逗号分隔。
    // <brokers>为获取元数据的kafka地址。
    val Array(batchTime, windowTime, topics, brokers) = args
    val batchDuration = Seconds(batchTime.toInt)
    val windowDuration = Seconds(windowTime.toInt)

    // 建立Streaming启动环境
    val sparkConf = new SparkConf()
    sparkConf.setAppName("DataSightStreamingExample")
    val ssc = new StreamingContext(sparkConf, batchDuration)

    // 设置Streaming的CheckPoint目录,由于窗口概念存在,该参数必须设置
    ssc.checkpoint("checkpoint")

    // 组装Kafka的主题列表
     val topicsSet = topics.split(",").toSet

    // 通过brokers和topics直接创建kafka stream
    // 1.接收Kafka中数据,生成相应DStream
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet).map(_._2)

    // 2.获取每一个行的字段属性
    val records = lines.map(getRecord)

    // 3.筛选女性网民上网时间数据信息
    val femaleRecords = records.filter(_._2 == "female")
      .map(x => (x._1, x._3))

    // 4.汇总在一个时间窗口内每个女性上网时间
    val aggregateRecords = femaleRecords
      .reduceByKeyAndWindow(_ + _, _ - _, windowDuration)

    // 5.筛选连续上网时间超过阈值的用户,并获取结果
    aggregateRecords.filter(_._2 > 0.9 * windowTime.toInt).print()

    // 6.Streaming系统启动
    ssc.start()
    ssc.awaitTermination()

上述代码会引用以下函数

  // 获取字段函数
  def getRecord(line: String): (String, String, Int) = {
    val elems = line.split(",")
    val name = elems(0)
    val sexy = elems(1)
    val time = elems(2).toInt
    (name, sexy, time)
  }

Spark Streaming Write To Kafka代码样例

下面代码片段仅为演示,具体代码参见com.huawei.bigdata.spark.examples.DstreamKafkaWriter:

  • Spark版本升级后,推荐使用新接口createDirectStream,老接口createStream仍然存在,但是性能和稳定性差,建议不要使用老接口开发应用程序。
  • 该样例代码只存在于mrs-sample-project-1.6.0.zip中
    // 参数解析:
    //<groupId> 消费者的group.id.
    //<brokers> broker的IP和端口.
    //<topic> kafka的topic.
    if (args.length != 3) {
      System.err.println("Usage: DstreamKafkaWriter <groupId> <brokers> <topic>")
      System.exit(1)
    }

    val Array(groupId, brokers, topic) = args
    val sparkConf = new SparkConf().setAppName("KafkaWriter")

    // 配置Kafka
    val kafkaParams = new Properties()
    kafkaParams.put("metadata.broker.list", brokers)
    kafkaParams.put("group.id", groupId)
    kafkaParams.put("auto.offset.reset", "smallest")

    // 创建一个Java streaming context
    val ssc = new StreamingContext(sparkConf, Milliseconds(500))

    // 向kafka发送数据
    val sentData = Seq("kafka_writer_test_msg_01", "kafka_writer_test_msg_02",
      "kafka_writer_test_msg_03")

    // 创建RDD queue
    val sent = new mutable.Queue[RDD[String]]()
    sent.enqueue(ssc.sparkContext.makeRDD(sentData))

    // 使用写入的数据创建Dstream
    val wStream = ssc.queueStream(sent)

    // 写入Kafka
    wStream.writeToKafka(kafkaParams,
      (x: String) => new KeyedMessage[String, Array[Byte]](topic, x.getBytes))

    ssc.start()
    ssc.awaitTermination()