Java Sample Code
Function
In a Spark application, use Structured Streaming to call the Kafka interface and obtain word records, and then count the number of occurrences of each word.
Code Sample
The following code snippet is an example. For the complete code, see com.huawei.bigdata.spark.examples.KafkaWordCount.
When new data becomes available in the streaming DataFrame/Dataset, outputMode determines what is written to the streaming sink. This sample uses the complete mode, which writes the entire updated result table after each trigger.
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class KafkaWordCount {
    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("Usage: KafkaWordCount <bootstrap-servers> "
                + "<subscribe-type> <topics>");
            System.exit(1);
        }

        String bootstrapServers = args[0];
        // Name of the Kafka source option used to select topics:
        // assign, subscribe, or subscribePattern
        String subscribeType = args[1];
        String topics = args[2];

        SparkSession spark = SparkSession
            .builder()
            .appName("KafkaWordCount")
            .getOrCreate();

        // Create a Dataset representing the stream of input lines from Kafka
        Dataset<String> lines = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrapServers)
            .option(subscribeType, topics)
            .load()
            .selectExpr("CAST(value AS STRING)")
            .as(Encoders.STRING());

        // Generate the running word count: split each line on spaces,
        // then group by word and count
        Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String x) {
                return Arrays.asList(x.split(" ")).iterator();
            }
        }, Encoders.STRING()).groupBy("value").count();

        // Start the query that prints the running counts to the console
        StreamingQuery query = wordCounts.writeStream()
            .outputMode("complete")
            .format("console")
            .start();

        // Block until the query is stopped or fails
        query.awaitTermination();
    }
}
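To make the effect of outputMode easier to see without a cluster, the following plain-Java sketch imitates the running word count over two micro-batches. It does not use Spark or Kafka. The class name RunningWordCountSketch and its methods addBatch and snapshot are hypothetical names introduced only for this illustration. snapshot returns the full result table, which is roughly what the complete mode writes after each trigger, and addBatch returns only the words whose count changed, which is roughly what the update mode would write.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: plain Java, no Spark or Kafka involved.
public class RunningWordCountSketch {
    // Running totals across all batches; stands in for the aggregation state.
    // A TreeMap keeps the words sorted so the printed output is deterministic.
    private final Map<String, Long> totals = new TreeMap<>();

    // Folds one micro-batch of lines into the totals and returns only the
    // words whose count changed in this batch (similar to "update" mode).
    public Map<String, Long> addBatch(List<String> lines) {
        Map<String, Long> changed = new TreeMap<>();
        for (String line : lines) {
            // Same splitting rule as the sample: a single space separator
            for (String word : line.split(" ")) {
                long count = totals.merge(word, 1L, Long::sum);
                changed.put(word, count);
            }
        }
        return changed;
    }

    // Returns a copy of the full result table (similar to "complete" mode).
    public Map<String, Long> snapshot() {
        return new TreeMap<>(totals);
    }

    public static void main(String[] args) {
        RunningWordCountSketch counter = new RunningWordCountSketch();

        counter.addBatch(Arrays.asList("hello world", "hello spark"));
        // prints: complete after batch 1: {hello=2, spark=1, world=1}
        System.out.println("complete after batch 1: " + counter.snapshot());

        Map<String, Long> changed = counter.addBatch(Arrays.asList("hello kafka"));
        // prints: update after batch 2: {hello=3, kafka=1}
        System.out.println("update after batch 2: " + changed);
        // prints: complete after batch 2: {hello=3, kafka=1, spark=1, world=1}
        System.out.println("complete after batch 2: " + counter.snapshot());
    }
}
```

In the second batch, the complete-style output still contains spark and world even though neither word appeared in that batch, whereas the update-style output lists only hello and kafka.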