更新时间:2022-07-19 GMT+08:00

Scala样例代码

功能介绍

在Spark应用中,通过使用Streaming调用kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数,或将数据写入Kafka0-10。

Streaming读取Kafka0-10代码样例

下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SecurityKafkaWordCount。

普通集群需要将样例代码中com.huawei.bigdata.spark.examples.SecurityKafkaWordCount.scala类中第60行代码

“"security.protocol" -> "SASL_PLAINTEXT",”注释掉。

/**
  *从Kafka的一个或多个主题消息。
  * <checkPointDir>是Spark Streaming检查点目录。
  * <brokers>是用于自举,制作人只会使用它来获取元数据
  * <topics>是要消费的一个或多个kafka主题的列表
  * <batchTime>是Spark Streaming批次持续时间(以秒为单位)。
  */
object SecurityKafkaWordCount {

  def main(args: Array[String]) {
    val ssc = createContext(args)

    //启动Streaming系统。
    ssc.start()
    ssc.awaitTermination()
  }

  def createContext(args : Array[String]) : StreamingContext = {
    val Array(checkPointDir, brokers, topics, batchSize) = args

    //新建一个Streaming启动环境。
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(batchSize.toLong))

    //配置Streaming的CheckPoint目录。
    //由于窗口概念的存在,此参数是必需的。
    ssc.checkpoint(checkPointDir)

    //获取获取kafka使用的topic列表。
    val topicArr = topics.split(",")
    val topicSet = topicArr.toSet
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> brokers,
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "DemoConsumer",
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.kerberos.service.name" -> "kafka",
      "kerberos.domain.name" -> "hadoop.hadoop.com"
    );

    val locationStrategy = LocationStrategies.PreferConsistent
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams)

    // 用brokers and topics新建direct kafka stream
    //从Kafka接收数据并生成相应的DStream。
    val stream = KafkaUtils.createDirectStream[String, String](ssc, locationStrategy, consumerStrategy)

    //获取每行中的字段属性。
    val tf = stream.transform ( rdd =>
      rdd.map(r => (r.value, 1L))
    )

    //汇总计算字数的总时间。
    val wordCounts = tf.reduceByKey(_ + _)
    val totalCounts = wordCounts.updateStateByKey(updataFunc)
    totalCounts.print()
    ssc
  }

  def updataFunc(values : Seq[Long], state : Option[Long]) : Option[Long] =
    Some(values.sum + state.getOrElse(0L))
}

Streaming Write To Kafka 0-10样例代码

下面代码片段仅为演示,具体代码参见com.huawei.bigdata.spark.examples.DstreamKafkaWriter。

  • 建议使用新的API createDirectStream代替原有API createStream进行应用程序开发。原有API仍然可以使用,但新的API性能和稳定性更好。
  • 该样例代码只存在于mrs-sample-project-1.6.0.zip中。
/**
 * 参数解析:
 * <checkPointDir>为checkPoint目录。
 * <topics>为Kafka中订阅的主题,多以逗号分隔。
 * <brokers>为获取元数据的Kafka地址。
 */
object DstreamKafkaWriterTest1 {

  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Usage: DstreamKafkaWriterTest <checkPointDir> <brokers> <topic>")
      System.exit(1)
    }

    val Array(checkPointDir, brokers, topic) = args
    val sparkConf = new SparkConf().setAppName("KafkaWriter")

    //填写Kafka的properties。
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> brokers,
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.serializer" -> "org.apache.kafka.common.serialization.ByteArraySerializer",
      "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
      "group.id" -> "dstreamKafkaWriterFt",
      "auto.offset.reset" -> "latest"
    )

    //创建Streaming的context。
    val ssc = new StreamingContext(sparkConf, Milliseconds(500));
    val sentData = Seq("kafka_writer_test_msg_01", "kafka_writer_test_msg_02", "kafka_writer_test_msg_03")

    //创建RDD队列。
    val sent = new mutable.Queue[RDD[String]]()
    sent.enqueue(ssc.sparkContext.makeRDD(sentData))

    //创建写数据的DStream。
    val wStream = ssc.queueStream(sent)

    //使用writetokafka API把数据写入Kafka。
    wStream.writeToKafka(kafkaParams,
      (x: String) => new ProducerRecord[String, Array[Byte]](topic, x.getBytes))

    //启动streaming的context。
    ssc.start()
    ssc.awaitTermination()
  }
}