Spark Structured Streaming Sample Project (Java)
Function
This project uses Structured Streaming in a Spark application to read word records from Kafka through the Kafka APIs. The records are grouped by word, and the number of records for each word is counted.
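The counting logic can be illustrated without Spark or Kafka. The following is a minimal plain-Java sketch, assuming each record is a line of space-separated words; the class name WordCountSketch and the method countWords are hypothetical and are not part of the sample project.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: not part of the sample project and not Spark code.
final class WordCountSketch {

    // Splits every record on single spaces and counts how often each word occurs.
    static Map<String, Long> countWords(List<String> records) {
        Map<String, Long> counts = new TreeMap<>();
        for (String record : records) {
            for (String word : record.split(" ")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("apple banana", "banana cherry banana");
        // Prints {apple=1, banana=3, cherry=1}
        System.out.println(countWords(records));
    }
}
```

In the streaming application the same split-and-count step runs continuously over records arriving from Kafka instead of over a fixed list.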
Sample Code
The following code snippet is provided as an example. For the complete code, see com.huawei.bigdata.spark.examples.SecurityKafkaWordCount.
When new data arrives in a streaming DataFrame/Dataset, outputMode determines which data is written to the streaming sink. The sample uses the complete mode, which writes the entire updated result table to the sink on every trigger.
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class SecurityKafkaWordCount {
    public static void main(String[] args) throws Exception {
        if (args.length < 6) {
            System.err.println("Usage: SecurityKafkaWordCount <bootstrap-servers> "
                + "<subscribe-type> <topics> <protocol> <service> <domain>");
            System.exit(1);
        }

        String bootstrapServers = args[0];
        String subscribeType = args[1];
        String topics = args[2];
        String protocol = args[3];
        String service = args[4];
        String domain = args[5];

        SparkSession spark = SparkSession
            .builder()
            .appName("SecurityKafkaWordCount")
            .getOrCreate();

        // Create a dataset that represents the stream of input lines from Kafka.
        Dataset<String> lines = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrapServers)
            .option(subscribeType, topics)
            .option("kafka.security.protocol", protocol)
            .option("kafka.sasl.kerberos.service.name", service)
            .option("kafka.kerberos.domain.name", domain)
            .load()
            .selectExpr("CAST(value AS STRING)")
            .as(Encoders.STRING());

        // Generate the running word counts: split each line on spaces, then group by word.
        Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String x) {
                return Arrays.asList(x.split(" ")).iterator();
            }
        }, Encoders.STRING()).groupBy("value").count();

        // Start running the query that prints the running counts to the console.
        StreamingQuery query = wordCounts.writeStream()
            .outputMode("complete")
            .format("console")
            .start();

        // Block until the query is stopped or fails.
        query.awaitTermination();
    }
}
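To make the effect of outputMode more concrete, the following plain-Java sketch simulates two micro-batches of words and contrasts what a sink would receive in complete mode (the whole result table) with what it would receive in update mode (only the rows that changed in that batch). This is a simplified model, not Spark code; the class OutputModeSketch and its methods are hypothetical names introduced for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of output-mode semantics: not Spark code.
final class OutputModeSketch {

    // Running result table: word -> count across all batches seen so far.
    private final Map<String, Long> resultTable = new TreeMap<>();

    // Folds one micro-batch into the result table and returns only the rows
    // whose count changed in this batch (what update mode would emit).
    Map<String, Long> processBatch(List<String> words) {
        Map<String, Long> changed = new TreeMap<>();
        for (String word : words) {
            changed.put(word, resultTable.merge(word, 1L, Long::sum));
        }
        return changed;
    }

    // Returns a copy of the whole result table (what complete mode would emit).
    Map<String, Long> completeOutput() {
        return new TreeMap<>(resultTable);
    }

    public static void main(String[] args) {
        OutputModeSketch sketch = new OutputModeSketch();

        sketch.processBatch(Arrays.asList("a", "b"));
        Map<String, Long> updateRows = sketch.processBatch(Arrays.asList("b", "c"));

        // After the second batch:
        System.out.println("update:   " + updateRows);              // update:   {b=2, c=1}
        System.out.println("complete: " + sketch.completeOutput()); // complete: {a=1, b=2, c=1}
    }
}
```

With complete mode, as used in the sample, the console sink shows every word counted so far after each trigger, including words that did not appear in the latest batch.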