文档首页/
MapReduce服务 MRS/
开发指南(普通版_3.x)/
Spark2x开发指南(普通模式)/
开发Spark应用/
Spark Structured Streaming样例程序/
Spark Structured Streaming样例程序(Java)
更新时间:2024-08-05 GMT+08:00
Spark Structured Streaming样例程序(Java)
功能介绍
在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.KafkaWordCount。
当Streaming DataFrame/Dataset中有新的可用数据时,outputMode用于配置写入Streaming接收器的数据。
public class KafkaWordCount { public static void main(String[] args) throws Exception { if (args.length < 3) { System.err.println("Usage: KafkaWordCount <bootstrap-servers> " + "<subscribe-type> <topics>"); System.exit(1); } String bootstrapServers = args[0]; String subscribeType = args[1]; String topics = args[2]; SparkSession spark = SparkSession .builder() .appName("KafkaWordCount") .getOrCreate(); //创建表示来自kafka的输入行流的DataSet。 Dataset<String> lines = spark .readStream() .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers) .option(subscribeType, topics) .load() .selectExpr("CAST(value AS STRING)") .as(Encoders.STRING()); //生成运行字数。 Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String x) { return Arrays.asList(x.split(" ")).iterator(); } }, Encoders.STRING()).groupBy("value").count(); //开始运行将运行计数打印到控制台的查询。 StreamingQuery query = wordCounts.writeStream() .outputMode("complete") .format("console") .start(); query.awaitTermination(); } }