文档首页/
MapReduce服务 MRS/
开发指南(普通版_3.x)/
Spark2x开发指南(安全模式)/
开发Spark应用/
Spark Structured Streaming样例程序/
Spark Structured Streaming样例程序(Java)
更新时间:2024-08-05 GMT+08:00
Spark Structured Streaming样例程序(Java)
功能介绍
在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。
代码样例
下面代码片段仅为演示,具体代码参见:com.huawei.bigdata.spark.examples.SecurityKafkaWordCount。
当Streaming DataFrame/Dataset中有新的可用数据时,outputMode用于配置写入Streaming接收器的数据。
public class SecurityKafkaWordCount
{
public static void main(String[] args) throws Exception {
if (args.length < 6) {
System.err.println("Usage: SecurityKafkaWordCount <bootstrap-servers> " +
"<subscribe-type> <topics> <protocol> <service> <domain>");
System.exit(1);
}
String bootstrapServers = args[0];
String subscribeType = args[1];
String topics = args[2];
String protocol = args[3];
String service = args[4];
String domain = args[5];
SparkSession spark = SparkSession
.builder()
.appName("SecurityKafkaWordCount")
.getOrCreate();
//创建表示来自kafka的输入行流的DataSet。
Dataset<String> lines = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.option("kafka.security.protocol", protocol)
.option("kafka.sasl.kerberos.service.name", service)
.option("kafka.kerberos.domain.name", domain)
.load()
.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING());
//生成运行字数。
Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(x.split(" ")).iterator();
}
}, Encoders.STRING()).groupBy("value").count();
//开始运行将运行计数打印到控制台的查询。
StreamingQuery query = wordCounts.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();
}
}