Updated on 2022-06-01 GMT+08:00

Scala Sample Code

Function Description

In real time, collect statistics on female netizens who spend more than half an hour continuously shopping online. The results are either printed directly or written to Kafka.

Spark Streaming Write To Print Sample Code

The following code snippets are used as an example. For the complete code, see com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint.

    // Parameter description:
    // <batchTime>: Interval for Streaming processing in batches. The unit is second.
    // <windowTime>: Time span of the statistics data. The unit is second.
    // <topics>: Kafka topics to subscribe to. Multiple topics are separated by commas (,).
    // <brokers>: Kafka broker address used for obtaining metadata.
    val Array(batchTime, windowTime, topics, brokers) = args
    val batchDuration = Seconds(batchTime.toInt)
    val windowDuration = Seconds(windowTime.toInt)

    // Set up a Streaming startup environment.
    val sparkConf = new SparkConf()
    sparkConf.setAppName("DataSightStreamingExample")
    val ssc = new StreamingContext(sparkConf, batchDuration)

    // Set the checkpoint directory of Streaming. This setting is mandatory because the windowed aggregation below uses an inverse reduce function.
    ssc.checkpoint("checkpoint")

    // Assemble a Kafka topic list.
    val topicsSet = topics.split(",").toSet

    // Create a Kafka stream by using brokers and topics.
    // 1. Receive data from Kafka and generate the corresponding DStream.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet).map(_._2)

    // 2. Obtain the field attribute of each row.
    val records = lines.map(getRecord)

    // 3. Keep only the records of female netizens, retaining the name and the time spent online.
    val femaleRecords = records.filter(_._2 == "female")
      .map(x => (x._1, x._3))

    // 4. Summarize the total time that each female netizen spends online within a time window. 
    val aggregateRecords = femaleRecords
      .reduceByKeyAndWindow(_ + _, _ - _, windowDuration)

    // 5. Filter data about netizens whose consecutive online duration exceeds the threshold, and obtain the results. 
    aggregateRecords.filter(_._2 > 0.9 * windowTime.toInt).print()

    // 6. Start Streaming.
    ssc.start()
    ssc.awaitTermination()
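
Steps 3 to 5 can be tried out without a cluster. The following is a minimal sketch in plain Scala collections, not the Spark implementation. The object name WindowAggregationSketch, the helpers longStayers and slide, and the sample records are illustrative assumptions. The threshold 54.0 stands for 0.9 * windowTime with a hypothetical windowTime of 60. The slide helper mimics how the function pair (_ + _, _ - _) allows a windowed sum to be updated incrementally: values of the batch entering the window are added, and values of the batch leaving it are subtracted.

```scala
object WindowAggregationSketch {
  // One parsed record: (name, gender, time online), the same shape that getRecord returns.
  type Record = (String, String, Int)

  // Steps 3-5 applied to the records of a single window:
  // keep female netizens, sum the online time per name, keep totals above the threshold.
  def longStayers(window: Seq[Record], threshold: Double): Map[String, Int] =
    window
      .filter(_._2 == "female")
      .map(r => (r._1, r._3))
      .groupBy(_._1)
      .map { case (name, rows) => (name, rows.map(_._2).sum) }
      .filter(_._2 > threshold)

  // Incremental window update: add the entering batch, subtract the leaving batch.
  def slide(previous: Map[String, Int],
            entering: Seq[(String, Int)],
            leaving: Seq[(String, Int)]): Map[String, Int] = {
    val added = entering.foldLeft(previous) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0) + v)
    }
    leaving.foldLeft(added) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0) - v)
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample records for one window.
    val window = Seq(
      ("LiuYang", "female", 50), ("YuanJing", "male", 60),
      ("LiuYang", "female", 10), ("CaiXuyu", "female", 5))
    println(longStayers(window, 54.0)) // prints Map(LiuYang -> 60)

    val updated = slide(Map("LiuYang" -> 60),
      entering = Seq(("LiuYang", 20)), leaving = Seq(("LiuYang", 50)))
    println(updated) // prints Map(LiuYang -> 30)
  }
}
```

Because the incremental form depends on the previous window result, that state has to be kept between batches, which is why the Streaming sample sets a checkpoint directory.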

The Spark Streaming sample code above calls the following function to parse each line:

  // Parse one comma-separated line into (name, gender, time online).
  def getRecord(line: String): (String, String, Int) = {
    val elems = line.split(",")
    val name = elems(0)
    val gender = elems(1)
    val time = elems(2).toInt
    (name, gender, time)
  }
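
getRecord throws an exception if a line has fewer than three fields or if the third field is not an integer. The following is a minimal sketch of a defensive variant, assuming input lines of the form name,gender,time. The object name RecordParsingSketch, the function name parseRecord, and the sample lines are hypothetical and are not part of the sample project.

```scala
import scala.util.Try

object RecordParsingSketch {
  // Hypothetical defensive variant of getRecord:
  // returns None instead of throwing when a line is malformed.
  def parseRecord(line: String): Option[(String, String, Int)] =
    line.split(",").map(_.trim) match {
      case Array(name, gender, time) =>
        Try(time.toInt).toOption.map(t => (name, gender, t))
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical input: one valid line and two malformed lines.
    val lines = Seq("LiuYang,female,20", "broken line", "GuoYijun,male,abc")
    // Malformed lines are dropped; only the valid record remains.
    lines.flatMap(line => parseRecord(line).toList).foreach(println) // prints (LiuYang,female,20)
  }
}
```

In a DStream pipeline, a function like this would be applied with flatMap rather than map so that malformed lines are skipped. Whether silently dropping such lines is acceptable depends on the application.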

Spark Streaming Write To Kafka Sample Code

The following code snippets are used as an example. For the complete code, see com.huawei.bigdata.spark.examples.DstreamKafkaWriter.

  • After Spark is upgraded, the new API createDirectStream is recommended. The old API createStream still exists, but its performance and stability are poor. You are advised not to use the old API to develop applications.
  • The sample code exists only in mrs-sample-project-1.6.0.zip.

    // Parameter description:
    // <groupId>: group.id of the consumer.
    // <brokers>: IP address and port number of the broker.
    // <topic>: Kafka topic.
    if (args.length != 3) {
      System.err.println("Usage: DstreamKafkaWriter <groupId> <brokers> <topic>")
      System.exit(1)
    }

    val Array(groupId, brokers, topic) = args
    val sparkConf = new SparkConf().setAppName("KafkaWriter")

    // Configure Kafka.
    val kafkaParams = new Properties()
    kafkaParams.put("metadata.broker.list", brokers)
    kafkaParams.put("group.id", groupId)
    kafkaParams.put("auto.offset.reset", "smallest")

    // Create a streaming context with a batch interval of 500 ms.
    val ssc = new StreamingContext(sparkConf, Milliseconds(500))

    // Define the data to be sent to Kafka.
    val sentData = Seq("kafka_writer_test_msg_01", "kafka_writer_test_msg_02",
      "kafka_writer_test_msg_03")

    // Create an RDD queue.
    val sent = new mutable.Queue[RDD[String]]()
    sent.enqueue(ssc.sparkContext.makeRDD(sentData))

    // Use the queued data to create a DStream.
    val wStream = ssc.queueStream(sent)

    // Write data to Kafka.
    wStream.writeToKafka(kafkaParams,
      (x: String) => new KeyedMessage[String, Array[Byte]](topic, x.getBytes))

    ssc.start()
    ssc.awaitTermination()
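
The Kafka settings and the message payload used above can be checked in isolation. The following is a minimal sketch that uses only the Java standard library. The object name KafkaWriterConfigSketch, the helpers buildKafkaParams and toPayload, and the broker address host1:9092 are hypothetical. The property keys are the ones used in the sample. The sample calls x.getBytes without arguments, which may depend on the default charset of the JVM. The sketch passes UTF-8 explicitly as one possible way to make the encoding predictable.

```scala
import java.nio.charset.StandardCharsets
import java.util.Properties

object KafkaWriterConfigSketch {
  // Hypothetical helper: collects the three settings used in the sample into one Properties object.
  def buildKafkaParams(brokers: String, groupId: String): Properties = {
    val props = new Properties()
    props.setProperty("metadata.broker.list", brokers)
    props.setProperty("group.id", groupId)
    props.setProperty("auto.offset.reset", "smallest")
    props
  }

  // Encode a message with an explicit charset instead of the JVM default.
  def toPayload(message: String): Array[Byte] =
    message.getBytes(StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    // Placeholder broker address and group ID, for illustration only.
    val params = buildKafkaParams("host1:9092", "example-group")
    println(params.getProperty("metadata.broker.list")) // prints host1:9092

    val payload = toPayload("kafka_writer_test_msg_01")
    println(payload.length) // prints 24
  }
}
```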