Updated on 2024-08-10 GMT+08:00

Connecting Spark Streaming to Kafka 0-10 (Scala)

Function

These samples show how a Spark Streaming application calls Kafka APIs to read word records from Kafka 0-10 or to write data to Kafka 0-10. The word records that are read are grouped by word to count the number of records for each word.

Sample Code for Spark Streaming to Read from Kafka 0-10

The following code is an example. For details, see com.huawei.bigdata.spark.examples.KafkaWordCount.

/**
  * Consumes word records from one or more Kafka 0-10 topics and prints, after every
  * batch, the running total number of records seen for each word.
  *
  * Usage: KafkaWordCount <checkPointDir> <brokers> <topics> <batchTime>
  *   <checkPointDir> is the Spark Streaming checkpoint directory.
  *   <brokers> is the Kafka bootstrap server list; the consumer only uses it to obtain
  *             cluster metadata.
  *   <topics> is a comma-separated list of one or more Kafka topics to consume.
  *   <batchTime> is the duration (in seconds) of one Spark Streaming batch.
  */
object KafkaWordCount {

  def main(args: Array[String]): Unit = {
    // Fail with a readable usage message instead of a MatchError from the destructuring below.
    if (args.length != 4) {
      System.err.println("Usage: KafkaWordCount <checkPointDir> <brokers> <topics> <batchTime>")
      System.exit(1)
    }

    val ssc = createContext(args)

    // Start the Streaming system and block until it is stopped.
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Builds (but does not start) the StreamingContext for the word-count job.
    *
    * @param args exactly four values: checkPointDir, brokers, topics, batchTime (seconds)
    * @return the configured StreamingContext
    * @throws IllegalArgumentException if `args` does not contain exactly four values
    */
  def createContext(args: Array[String]): StreamingContext = {
    require(args.length == 4,
      s"Expected 4 arguments <checkPointDir> <brokers> <topics> <batchTime>, got ${args.length}")
    val Array(checkPointDir, brokers, topics, batchTime) = args

    // Create a Streaming startup environment.
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(batchTime.toLong))

    // Checkpointing is mandatory here because updateStateByKey (below) keeps state across batches.
    ssc.checkpoint(checkPointDir)

    // Parse the topic list, tolerating spaces around commas and empty entries.
    val topicSet = topics.split(",").map(_.trim).filter(_.nonEmpty).toSet
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> brokers,
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "DemoConsumer"
    )

    val locationStrategy = LocationStrategies.PreferConsistent
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)

    // Create a direct Kafka stream from the brokers and topics; each element is one Kafka record.
    val stream = KafkaUtils.createDirectStream[String, String](ssc, locationStrategy, consumerStrategy)

    // Treat each record value as one word and pair it with a count of 1.
    val wordPairs = stream.map(record => (record.value, 1L))

    // Count words within the batch, then fold the batch counts into the running totals.
    val wordCounts = wordPairs.reduceByKey(_ + _)
    val totalCounts = wordCounts.updateStateByKey(updataFunc)
    totalCounts.print()

    ssc
  }

  /**
    * State-update function for updateStateByKey: adds this batch's counts to the previous total.
    * (The name keeps its original spelling so that existing callers keep working.)
    *
    * @param values counts for one word in the current batch
    * @param state  running total from previous batches, if any
    * @return the new running total (always defined, so words are never dropped from state)
    */
  def updataFunc(values: Seq[Long], state: Option[Long]): Option[Long] =
    Some(values.sum + state.getOrElse(0L))
}

Sample Code for Spark Streaming to Write to Kafka 0-10

The following code snippet is an example. For the complete code, see com.huawei.bigdata.spark.examples.DstreamKafkaWriter.

You are advised to develop applications with the new createDirectStream API instead of the old createStream API. Although the old API still works, the new API provides better performance and stability.

/**
 * Writes a fixed set of test messages to a Kafka 0-10 topic from a Spark Streaming DStream.
 *
 * Parameter description (in command-line order):
 * <checkPointDir> is the checkpoint directory (accepted for compatibility but currently unused).
 * <brokers> is the Kafka address for obtaining metadata.
 * <topic> is the Kafka topic that the messages are written to.
 */
object DstreamKafkaWriterTest1 {

  def main(args: Array[String]): Unit = {
    // Exactly three arguments are destructured below, so require exactly three.
    if (args.length != 3) {
      System.err.println("Usage: DstreamKafkaWriterTest <checkPointDir> <brokers> <topic>")
      System.exit(1)
    }

    // NOTE(review): checkPointDir is parsed but never used. Spark's queueStream does not support
    // checkpointing, so enabling ssc.checkpoint(...) here would likely fail — confirm before wiring it in.
    val Array(checkPointDir, brokers, topic) = args
    val sparkConf = new SparkConf().setAppName("KafkaWriter")

    // Enter the properties of Kafka. Values are sent as raw bytes (ByteArraySerializer).
    // NOTE(review): the deserializer, group.id and auto.offset.reset entries look like consumer-side
    // settings; presumably writeToKafka ignores them — verify before removing.
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> brokers,
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.serializer" -> "org.apache.kafka.common.serialization.ByteArraySerializer",
      "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
      "group.id" -> "dstreamKafkaWriterFt",
      "auto.offset.reset" -> "latest"
    )

    // Create the context of the Streaming with a 500 ms batch interval.
    val ssc = new StreamingContext(sparkConf, Milliseconds(500))
    val sentData = Seq("kafka_writer_test_msg_01", "kafka_writer_test_msg_02", "kafka_writer_test_msg_03")

    // Create an RDD queue holding the test messages.
    val sent = new mutable.Queue[RDD[String]]()
    sent.enqueue(ssc.sparkContext.makeRDD(sentData))

    // Create a DStream for writing data.
    val wStream = ssc.queueStream(sent)

    // Use the writeToKafka API to write data to Kafka. Encode explicitly as UTF-8 so the bytes
    // do not depend on the JVM's platform default charset.
    wStream.writeToKafka(kafkaParams,
      (x: String) => new ProducerRecord[String, Array[Byte]](topic,
        x.getBytes(java.nio.charset.StandardCharsets.UTF_8)))

    // Start the context of Streaming.
    ssc.start()
    ssc.awaitTermination()
  }
}