Scala Sample Code
Function Description
In a Spark application, use Structured Streaming to read word records from Kafka, then group the records by word to count how many times each word occurs.
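The counting step described above can be tried out on a local collection before wiring it to Kafka. The following is a minimal sketch in plain Scala, not part of the sample project: the object name WordCountSketch and the helper countWords are hypothetical, and the logic only mirrors the split-on-space, group-by-word, count sequence of the streaming job.

```scala
object WordCountSketch {
  // Splits each record on single spaces and counts how often each word occurs.
  // Like the streaming job, it does not drop empty strings produced by
  // consecutive spaces.
  def countWords(records: Seq[String]): Map[String, Long] =
    records
      .flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size.toLong }

  def main(args: Array[String]): Unit = {
    val records = Seq("apple banana", "banana cherry banana")
    // Sort by word so the printed order is stable.
    countWords(records).toSeq.sortBy(_._1).foreach {
      case (word, count) => println(s"$word: $count")
    }
  }
}
```

For the two records in main, the counts are apple 1, banana 3, and cherry 1. In the streaming job the same result is maintained incrementally as new Kafka records arrive.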
Sample Code
The following code snippet is an example. For the complete code, see com.huawei.bigdata.spark.examples.SecurityKafkaWordCount.
- For a normal cluster, comment out .option("kafka.security.protocol", protocol) on line 49 of com.huawei.bigdata.spark.examples.SecurityKafkaWordCount.scala in the sample code.
- outputMode specifies which data is written to the streaming sink when new data becomes available in the streaming DataFrame/Dataset. The default value is append.
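To make the outputMode note more concrete, the sketch below simulates in plain Scala what a sink would receive after each micro-batch in complete mode and in update mode. It is an illustration only and does not call Spark: OutputModeSketch, merge, completeOutput, and updateOutput are hypothetical names, and the running counts stand in for the query's result table. Note that Spark rejects append mode for a streaming aggregation without a watermark, which is why a word count query like this one is typically run with complete or update.

```scala
object OutputModeSketch {
  type Counts = Map[String, Long]

  // Merges one micro-batch of words into the running counts (the result table).
  def merge(state: Counts, batch: Seq[String]): Counts =
    batch.foldLeft(state) { (acc, word) =>
      acc.updated(word, acc.getOrElse(word, 0L) + 1L)
    }

  // complete: the whole result table is written after every batch.
  def completeOutput(state: Counts, batch: Seq[String]): Counts =
    merge(state, batch)

  // update: only the rows whose count changed in this batch are written.
  def updateOutput(state: Counts, batch: Seq[String]): Counts =
    merge(state, batch).filter { case (word, _) => batch.contains(word) }

  def main(args: Array[String]): Unit = {
    val afterFirstBatch = merge(Map.empty, Seq("a", "b"))
    val secondBatch = Seq("b", "c")
    println("complete: " + completeOutput(afterFirstBatch, secondBatch).toSeq.sortBy(_._1))
    println("update:   " + updateOutput(afterFirstBatch, secondBatch).toSeq.sortBy(_._1))
  }
}
```

After the second batch, the simulated complete output contains all three words (a, b, c), while the simulated update output contains only b and c, the two rows that changed.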
import org.apache.spark.sql.SparkSession

object SecurityKafkaWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 6) {
      System.err.println("Usage: SecurityKafkaWordCount <bootstrap-servers> " +
        "<subscribe-type> <topics> <protocol> <service> <domain>")
      System.exit(1)
    }

    val Array(bootstrapServers, subscribeType, topics, protocol, service, domain) = args

    val spark = SparkSession
      .builder
      .appName("SecurityKafkaWordCount")
      .getOrCreate()

    // Needed for the .as[String] encoder and the typed flatMap below.
    import spark.implicits._

    // Create a Dataset representing the stream of input lines from Kafka.
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .option("kafka.security.protocol", protocol)
      .option("kafka.sasl.kerberos.service.name", service)
      .option("kafka.kerberos.domain.name", domain)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate the running word counts.
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    // Start running the query that prints the running counts to the console.
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    // Block until the streaming query is stopped or fails.
    query.awaitTermination()
  }
}
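As an alternative to commenting out a line for normal clusters, the Kafka source options could be assembled in a map and the security protocol added only when needed. The following is a minimal sketch under that assumption, not the sample project's approach: KafkaOptionsSketch, kafkaOptions, and the securityEnabled flag are hypothetical, and the host, topic, and domain values in main are placeholders.

```scala
object KafkaOptionsSketch {
  // Builds the Kafka source options used by the sample. The
  // kafka.security.protocol entry is included only when securityEnabled is
  // true, which corresponds to commenting that option out for a normal cluster.
  def kafkaOptions(
      bootstrapServers: String,
      subscribeType: String,
      topics: String,
      protocol: String,
      service: String,
      domain: String,
      securityEnabled: Boolean): Map[String, String] = {
    val base = Map(
      "kafka.bootstrap.servers" -> bootstrapServers,
      subscribeType -> topics,
      "kafka.sasl.kerberos.service.name" -> service,
      "kafka.kerberos.domain.name" -> domain)
    if (securityEnabled) base + ("kafka.security.protocol" -> protocol) else base
  }

  def main(args: Array[String]): Unit = {
    // Placeholder values for illustration only.
    val options = kafkaOptions("host1:9092", "subscribe", "topic1",
      "SASL_PLAINTEXT", "kafka", "hadoop.example.com", securityEnabled = false)
    options.toSeq.sortBy(_._1).foreach { case (key, value) => println(s"$key=$value") }
  }
}
```

The resulting map can be passed to the reader in one call, for example spark.readStream.format("kafka").options(options).load(), in place of the individual .option(...) calls.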